commit 3d07345798a581db2a60dd358e39a835737e2964
parent 0018a8a7c9dc8ef3f98c711161048ab58ca60ab9
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 03:36:31 -0700
runtime: fan out stored event offsets
- publish stored source event offsets when runtime EVENT handling accepts new persistent events
- expose event-bus subscriptions and offset fanout through the runtime handle
- wire websocket sessions to consume offset broadcasts and deliver visible subscription matches
- cover stored-offset publication, duplicate suppression, and offset-based live fanout
Diffstat:
5 files changed, 291 insertions(+), 53 deletions(-)
diff --git a/crates/tangle_runtime/src/event_bus.rs b/crates/tangle_runtime/src/event_bus.rs
@@ -46,6 +46,10 @@ pub struct TangleEventReceiver {
}
impl TangleEventReceiver {
+ pub async fn recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> {
+ self.receiver.recv().await.map_err(Into::into)
+ }
+
pub fn try_recv(&mut self) -> Result<StoreOffset, TangleEventReceiveError> {
self.receiver.try_recv().map_err(Into::into)
}
@@ -68,6 +72,15 @@ impl From<broadcast::error::TryRecvError> for TangleEventReceiveError {
}
}
+impl From<broadcast::error::RecvError> for TangleEventReceiveError {
+ fn from(error: broadcast::error::RecvError) -> Self {
+ match error {
+ broadcast::error::RecvError::Closed => Self::Closed,
+ broadcast::error::RecvError::Lagged(skipped) => Self::Lagged(skipped),
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::{TangleEventBus, TangleEventReceiveError};
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -23,6 +23,36 @@ pub struct BaseRelay {
groups: Option<GroupService>,
}
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct BaseRelayEventWrite {
+ message: RelayMessage,
+ stored_offset: Option<StoreOffset>,
+}
+
+impl BaseRelayEventWrite {
+ fn stored(message: RelayMessage, stored_offset: StoreOffset) -> Self {
+ Self {
+ message,
+ stored_offset: Some(stored_offset),
+ }
+ }
+
+ fn unstored(message: RelayMessage) -> Self {
+ Self {
+ message,
+ stored_offset: None,
+ }
+ }
+
+ pub(crate) fn stored_offset(&self) -> Option<StoreOffset> {
+ self.stored_offset
+ }
+
+ pub(crate) fn into_message(self) -> RelayMessage {
+ self.message
+ }
+}
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BaseRelayShutdownReport {
closed_subscriptions: usize,
@@ -176,6 +206,7 @@ impl BaseRelay {
pub fn handle_event(&mut self, event: Event) -> Result<RelayMessage, BaseRelayError> {
self.handle_event_with_group_auth(event, &GroupAuthContext::unauthenticated())
+ .map(BaseRelayEventWrite::into_message)
}
pub fn handle_event_with_auth(
@@ -183,6 +214,15 @@ impl BaseRelay {
event: Event,
auth: &BaseAuthState,
) -> Result<RelayMessage, BaseRelayError> {
+ self.handle_event_with_auth_report(event, auth)
+ .map(BaseRelayEventWrite::into_message)
+ }
+
+ pub(crate) fn handle_event_with_auth_report(
+ &mut self,
+ event: Event,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayEventWrite, BaseRelayError> {
self.handle_event_with_group_auth(
event,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -211,10 +251,13 @@ impl BaseRelay {
&mut self,
event: Event,
auth: &GroupAuthContext,
- ) -> Result<RelayMessage, BaseRelayError> {
+ ) -> Result<BaseRelayEventWrite, BaseRelayError> {
let event_id = event.id().clone();
if let Err(error) = verify_event_signature(&event) {
- return Ok(ok_rejected(event_id, format!("invalid: {error}")));
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
+ event_id,
+ format!("invalid: {error}"),
+ )));
}
let group_limits = self
.groups
@@ -223,31 +266,42 @@ impl BaseRelay {
.unwrap_or_default();
let class = match validate_client_group_event_structure(&event, group_limits) {
Ok(class) => class,
- Err(error) => return Ok(ok_rejected(event_id, error.prefixed_message())),
+ Err(error) => {
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
+ event_id,
+ error.prefixed_message(),
+ )));
+ }
};
if !matches!(class, GroupEventClass::NonGroup) {
let Some(groups) = self.groups.as_ref() else {
- return Ok(ok_rejected(
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
event_id,
"blocked: NIP-29 group events are not accepted before group service".to_owned(),
- ));
+ )));
};
if let Err(error) = groups.check_event(&self.store, &event, &class, auth) {
- return Ok(ok_rejected(event_id, error.prefixed_message()));
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
+ event_id,
+ error.prefixed_message(),
+ )));
}
}
if event.unsigned().kind().is_ephemeral() {
- return Ok(ok_accepted(event_id, String::new()));
+ return Ok(BaseRelayEventWrite::unstored(ok_accepted(
+ event_id,
+ String::new(),
+ )));
}
if self
.store
.event_by_id(pocket_event_id(&event_id)?)?
.is_some()
{
- return Ok(ok_accepted(
+ return Ok(BaseRelayEventWrite::unstored(ok_accepted(
event_id,
"duplicate: already have this event".to_owned(),
- ));
+ )));
}
let pocket_event = tangle_event_to_pocket(&event)?;
let store_offset = StoreOffset::new(self.store.store_event(&pocket_event)?);
@@ -257,7 +311,10 @@ impl BaseRelay {
groups.after_source_event_stored(&self.store, &event, &class, store_offset)?;
}
self.store.sync()?;
- Ok(ok_accepted(event_id, String::new()))
+ Ok(BaseRelayEventWrite::stored(
+ ok_accepted(event_id, String::new()),
+ store_offset,
+ ))
}
pub fn handle_req(
@@ -473,6 +530,20 @@ impl BaseRelay {
.unwrap_or(Ok(true))
.map_err(BaseRelayError::from)
}
+
+ pub(crate) fn fanout_offset(
+ &self,
+ offset: StoreOffset,
+ subscriptions: &mut LiveSubscriptionSet,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ let event = self.event_by_offset(offset)?;
+ let groups = self.groups.as_ref();
+ Ok(subscriptions.fanout(&event, |event, auth| {
+ groups
+ .map(|groups| groups.event_visible_to_auth(event, auth).unwrap_or(false))
+ .unwrap_or(true)
+ }))
+ }
}
#[cfg(test)]
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -3,11 +3,12 @@
use crate::{
config::BaseRelayRuntimeConfig,
errors::BaseRelayError,
- event_bus::TangleEventBus,
+ event_bus::{TangleEventBus, TangleEventReceiver},
ops::BaseRelayReadinessState,
relay::{
auth::BaseAuthState,
core::{BaseRelay, BaseRelayShutdownReport},
+ live::LiveSubscriptionSet,
},
};
use std::{
@@ -112,11 +113,26 @@ impl TangleRuntimeHandle {
auth: &mut BaseAuthState,
now: UnixTimestamp,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
- self.inner
- .lock()
- .await
- .relay_mut()
- .handle_client_message(message, auth, now)
+ let mut runtime = self.inner.lock().await;
+ match message {
+ ClientMessage::Event(event) => {
+ let result = runtime
+ .relay_mut()
+ .handle_event_with_auth_report(event, auth)?;
+ if let Some(offset) = result.stored_offset() {
+ runtime.metrics().record_stored_event_offset();
+ runtime.event_bus().publish(offset);
+ }
+ Ok(vec![result.into_message()])
+ }
+ message => runtime
+ .relay_mut()
+ .handle_client_message(message, auth, now),
+ }
+ }
+
+ pub async fn subscribe_events(&self) -> TangleEventReceiver {
+ self.inner.lock().await.event_bus().subscribe()
}
pub(crate) async fn query_req_with_auth(
@@ -144,6 +160,18 @@ impl TangleRuntimeHandle {
.event_by_offset_with_auth(offset, auth)
}
+ pub(crate) async fn fanout_event_offset(
+ &self,
+ offset: StoreOffset,
+ subscriptions: &mut LiveSubscriptionSet,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.inner
+ .lock()
+ .await
+ .relay()
+ .fanout_offset(offset, subscriptions)
+ }
+
pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
self.inner.lock().await.shutdown()
}
@@ -300,12 +328,17 @@ impl Default for TangleShutdownSignal {
#[cfg(test)]
mod tests {
- use super::{TangleRuntime, TangleRuntimeLimits};
+ use super::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits};
use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
- use crate::event_bus::TangleEventBus;
+ use crate::event_bus::{TangleEventBus, TangleEventReceiveError};
+ use crate::relay::live::LiveSubscriptionSet;
use serde_json::json;
use std::path::{Path, PathBuf};
- use tangle_groups::StoreOffset;
+ use tangle_groups::{GroupAuthContext, StoreOffset};
+ use tangle_protocol::{
+ ClientMessage, RelayMessage, SubscriptionId, UnixTimestamp, filter_from_value,
+ };
+ use tangle_test_support::{FixtureKey, tangle_v2_event};
#[test]
fn tangle_runtime_opens_owned_process_shell_from_config() {
@@ -369,6 +402,78 @@ mod tests {
assert!(TangleEventBus::new(0).is_err());
}
+ #[tokio::test]
+ async fn runtime_publishes_stored_event_offsets_for_live_fanout() {
+ let root = temp_root("runtime-offset-fanout");
+ let _ = std::fs::remove_dir_all(&root);
+ let handle = TangleRuntimeHandle::new(
+ TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"),
+ );
+ let mut offsets = handle.subscribe_events().await;
+ let mut auth = handle.auth_state().await.expect("auth");
+ let mut subscriptions = LiveSubscriptionSet::new(8).expect("subscriptions");
+ let subscription_id = SubscriptionId::new("live-offset").expect("subscription");
+ subscriptions
+ .subscribe(
+ subscription_id.clone(),
+ vec![filter_from_value(&json!({"kinds":[1]})).expect("filter")],
+ GroupAuthContext::unauthenticated(),
+ )
+ .expect("subscribe");
+ let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "live")
+ .expect("event");
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Event(event.clone()),
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("event"),
+ vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let offset = offsets.try_recv().expect("offset");
+ assert!(matches!(
+ handle
+ .fanout_event_offset(offset, &mut subscriptions)
+ .await
+ .expect("fanout")
+ .as_slice(),
+ [RelayMessage::Event {
+ subscription_id: delivered,
+ event: found
+ }] if delivered == &subscription_id && found.id() == event.id()
+ ));
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Event(event.clone()),
+ &mut auth,
+ UnixTimestamp::new(1_714_124_434)
+ )
+ .await
+ .expect("duplicate"),
+ vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: "duplicate: already have this event".to_owned()
+ }]
+ );
+ assert_eq!(
+ offsets.try_recv().expect_err("no duplicate offset"),
+ TangleEventReceiveError::Empty
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
fn runtime_config(root: &Path, max_pending_events: usize) -> BaseRelayRuntimeConfig {
let raw = json!({
"server": {
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -125,24 +125,29 @@ async fn tangle_root(
headers: HeaderMap,
) -> Response {
match websocket {
- Ok(websocket) => match state.runtime.auth_state().await.and_then(|auth| {
- TangleWebSocketSession::new(
- state.outbound_queue_capacity,
- state.shutdown.subscribe(),
- state.runtime.clone(),
- auth,
- )
- }) {
- Ok(session) => websocket
- .protocols(["nostr"])
- .on_upgrade(move |socket| session.run(socket))
- .into_response(),
- Err(error) => (
- http::StatusCode::INTERNAL_SERVER_ERROR,
- error.prefixed_message(),
- )
- .into_response(),
- },
+ Ok(websocket) => {
+ let session = match state.runtime.auth_state().await {
+ Ok(auth) => TangleWebSocketSession::new(
+ state.outbound_queue_capacity,
+ state.shutdown.subscribe(),
+ state.runtime.clone(),
+ auth,
+ state.runtime.subscribe_events().await,
+ ),
+ Err(error) => Err(error),
+ };
+ match session {
+ Ok(session) => websocket
+ .protocols(["nostr"])
+ .on_upgrade(move |socket| session.run(socket))
+ .into_response(),
+ Err(error) => (
+ http::StatusCode::INTERNAL_SERVER_ERROR,
+ error.prefixed_message(),
+ )
+ .into_response(),
+ }
+ }
Err(_) => base_relay_info_response(state.info, headers),
}
}
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -2,6 +2,7 @@
use crate::{
errors::BaseRelayError,
+ event_bus::{TangleEventReceiveError, TangleEventReceiver},
relay::{
auth::{BaseAuthState, generate_auth_challenge},
live::LiveSubscriptionSet,
@@ -25,6 +26,7 @@ pub struct TangleWebSocketSession {
runtime: TangleRuntimeHandle,
auth: BaseAuthState,
subscriptions: LiveSubscriptionSet,
+ events: TangleEventReceiver,
}
impl TangleWebSocketSession {
@@ -33,6 +35,7 @@ impl TangleWebSocketSession {
shutdown: watch::Receiver<bool>,
runtime: TangleRuntimeHandle,
auth: BaseAuthState,
+ events: TangleEventReceiver,
) -> Result<Self, BaseRelayError> {
if outbound_queue_capacity == 0 {
return Err(BaseRelayError::invalid(
@@ -52,6 +55,7 @@ impl TangleWebSocketSession {
runtime,
auth,
subscriptions,
+ events,
})
}
@@ -68,7 +72,6 @@ impl TangleWebSocketSession {
}
#[cfg(test)]
- #[cfg(test)]
fn active_subscription_count(&self) -> usize {
self.subscriptions.active_count()
}
@@ -101,6 +104,17 @@ impl TangleWebSocketSession {
break;
}
}
+ event_offset = self.events.recv() => {
+ match event_offset {
+ Ok(offset) => {
+ if !self.handle_event_offset(offset).await {
+ break;
+ }
+ }
+ Err(TangleEventReceiveError::Closed | TangleEventReceiveError::Lagged(_)) => break,
+ Err(TangleEventReceiveError::Empty) => {}
+ }
+ }
changed = self.shutdown.changed() => {
if changed.is_err() || self.shutdown_requested() {
let _ = socket.send(Message::Close(None)).await;
@@ -112,6 +126,23 @@ impl TangleWebSocketSession {
self.subscriptions.close_all();
}
+ async fn handle_event_offset(&mut self, offset: tangle_groups::StoreOffset) -> bool {
+ let runtime = self.runtime.clone();
+ let replies = match runtime
+ .fanout_event_offset(offset, &mut self.subscriptions)
+ .await
+ {
+ Ok(replies) => replies,
+ Err(error) => vec![RelayMessage::Notice(error.prefixed_message())],
+ };
+ for reply in replies {
+ if self.send_relay_message(reply).is_err() {
+ return false;
+ }
+ }
+ true
+ }
+
async fn handle_incoming_message(&mut self, message: Message) -> bool {
match message {
Message::Text(raw) => self.dispatch_text(raw.as_str()).await,
@@ -246,6 +277,7 @@ mod tests {
use super::{TangleOutboundQueueError, TangleWebSocketSession};
use crate::{
config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
+ event_bus::TangleEventReceiver,
runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal},
};
use axum::extract::ws::Message;
@@ -257,9 +289,9 @@ mod tests {
fn websocket_session_records_connection_time() {
let before = std::time::Instant::now();
let shutdown = TangleShutdownSignal::new();
- let (runtime, auth) = session_runtime("records-connection-time");
- let session =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth).expect("session");
+ let (runtime, auth, events) = session_runtime("records-connection-time");
+ let session = TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth, events)
+ .expect("session");
assert!(session.connected_at() >= before);
}
@@ -267,17 +299,19 @@ mod tests {
#[test]
fn websocket_session_rejects_zero_outbound_capacity() {
let shutdown = TangleShutdownSignal::new();
- let (runtime, auth) = session_runtime("zero-outbound-capacity");
+ let (runtime, auth, events) = session_runtime("zero-outbound-capacity");
- assert!(TangleWebSocketSession::new(0, shutdown.subscribe(), runtime, auth).is_err());
+ assert!(
+ TangleWebSocketSession::new(0, shutdown.subscribe(), runtime, auth, events).is_err()
+ );
}
#[test]
fn websocket_session_observes_shutdown_request() {
let shutdown = TangleShutdownSignal::new();
- let (runtime, auth) = session_runtime("observes-shutdown");
- let session =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth).expect("session");
+ let (runtime, auth, events) = session_runtime("observes-shutdown");
+ let session = TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth, events)
+ .expect("session");
assert!(!session.shutdown_requested());
@@ -295,11 +329,14 @@ mod tests {
TangleRuntimeHandle::new(TangleRuntime::open(runtime_config(&root)).expect("runtime"));
let auth_a = runtime.auth_state().await.expect("auth a");
let auth_b = runtime.auth_state().await.expect("auth b");
+ let events_a = runtime.subscribe_events().await;
+ let events_b = runtime.subscribe_events().await;
let mut first =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime.clone(), auth_a)
+ TangleWebSocketSession::new(8, shutdown.subscribe(), runtime.clone(), auth_a, events_a)
.expect("first");
let mut second =
- TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth_b).expect("second");
+ TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth_b, events_b)
+ .expect("second");
let subscription_id = SubscriptionId::new("shared").expect("subscription");
assert_eq!(
@@ -354,9 +391,9 @@ mod tests {
#[test]
fn outbound_queue_is_bounded() {
let shutdown = TangleShutdownSignal::new();
- let (runtime, auth) = session_runtime("outbound-queue");
- let session =
- TangleWebSocketSession::new(1, shutdown.subscribe(), runtime, auth).expect("session");
+ let (runtime, auth, events) = session_runtime("outbound-queue");
+ let session = TangleWebSocketSession::new(1, shutdown.subscribe(), runtime, auth, events)
+ .expect("session");
let outbound = session.outbound();
assert_eq!(outbound.capacity(), 1);
@@ -371,12 +408,19 @@ mod tests {
);
}
- fn session_runtime(name: &str) -> (TangleRuntimeHandle, crate::relay::auth::BaseAuthState) {
+ fn session_runtime(
+ name: &str,
+ ) -> (
+ TangleRuntimeHandle,
+ crate::relay::auth::BaseAuthState,
+ TangleEventReceiver,
+ ) {
let root = temp_root(name);
let _ = std::fs::remove_dir_all(&root);
let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime");
let auth = runtime.auth_state().expect("auth");
- (TangleRuntimeHandle::new(runtime), auth)
+ let events = runtime.event_bus().subscribe();
+ (TangleRuntimeHandle::new(runtime), auth, events)
}
fn req(subscription_id: SubscriptionId) -> ClientMessage {