tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 9af337383db8459657d2ac771520abbcabeb59f8
parent 3d07345798a581db2a60dd358e39a835737e2964
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 03:39:57 -0700

runtime: close lagged websocket sessions

- Treat lagged runtime event receivers as stale WebSocket sessions.

- Send an explicit policy close frame when the offset stream skips events.

- Cover broadcast lag handling with a session-level regression test.

- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle_runtime/src/session.rs | 95+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 84 insertions(+), 11 deletions(-)

diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -9,7 +9,7 @@ use crate::{ }, runtime::TangleRuntimeHandle, }; -use axum::extract::ws::{Message, WebSocket}; +use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket}; use std::time::{Instant, SystemTime, UNIX_EPOCH}; use tangle_groups::GroupAuthContext; use tangle_protocol::{ @@ -104,15 +104,14 @@ impl TangleWebSocketSession { break; } } - event_offset = self.events.recv() => { - match event_offset { - Ok(offset) => { - if !self.handle_event_offset(offset).await { - break; - } + event = self.events.recv() => { + match self.handle_event_receive_result(event).await { + TangleSessionControl::Continue => {} + TangleSessionControl::Close(message) => { + let _ = socket.send(message).await; + break; } - Err(TangleEventReceiveError::Closed | TangleEventReceiveError::Lagged(_)) => break, - Err(TangleEventReceiveError::Empty) => {} + TangleSessionControl::Stop => break, } } changed = self.shutdown.changed() => { @@ -126,6 +125,26 @@ impl TangleWebSocketSession { self.subscriptions.close_all(); } + async fn handle_event_receive_result( + &mut self, + result: Result<tangle_groups::StoreOffset, TangleEventReceiveError>, + ) -> TangleSessionControl { + match result { + Ok(offset) => { + if self.handle_event_offset(offset).await { + TangleSessionControl::Continue + } else { + TangleSessionControl::Stop + } + } + Err(TangleEventReceiveError::Lagged(_)) => { + TangleSessionControl::Close(event_stream_lag_close_message()) + } + Err(TangleEventReceiveError::Closed) => TangleSessionControl::Stop, + Err(TangleEventReceiveError::Empty) => TangleSessionControl::Continue, + } + } + async fn handle_event_offset(&mut self, offset: tangle_groups::StoreOffset) -> bool { let runtime = self.runtime.clone(); let replies = match runtime @@ -232,6 +251,20 @@ impl TangleWebSocketSession { } } +#[derive(Debug, Clone, PartialEq, Eq)] +enum TangleSessionControl { + Continue, + Close(Message), + Stop, +} + +fn event_stream_lag_close_message() -> Message { + Message::Close(Some(CloseFrame { + code: 1008, + reason: Utf8Bytes::from_static("event stream lagged; reconnect required"), + })) +} + #[derive(Debug, Clone)] pub struct TangleOutboundSender { sender: mpsc::Sender<Message>, @@ -274,7 +307,10 @@ fn current_unix_timestamp() -> UnixTimestamp { #[cfg(test)] mod tests { - use super::{TangleOutboundQueueError, TangleWebSocketSession}; + use super::{ + TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession, + event_stream_lag_close_message, + }; use crate::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, event_bus::TangleEventReceiver, @@ -283,6 +319,7 @@ mod tests { use axum::extract::ws::Message; use serde_json::json; use std::path::{Path, PathBuf}; + use tangle_groups::StoreOffset; use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId}; #[test] @@ -388,6 +425,35 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + #[tokio::test] + async fn websocket_session_closes_when_event_receiver_lags() { + let shutdown = TangleShutdownSignal::new(); + let root = temp_root("event-receiver-lag"); + let _ = std::fs::remove_dir_all(&root); + let runtime = + TangleRuntime::open(runtime_config_with_max_pending_events(&root, 1)).expect("runtime"); + let auth = runtime.auth_state().expect("auth"); + let events = runtime.event_bus().subscribe(); + assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1); + assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1); + let mut session = TangleWebSocketSession::new( + 1, + shutdown.subscribe(), + TangleRuntimeHandle::new(runtime), + auth, + events, + ) + .expect("session"); + let event = session.events.recv().await; + + assert_eq!( + session.handle_event_receive_result(event).await, + TangleSessionControl::Close(event_stream_lag_close_message()) + ); + + let _ = std::fs::remove_dir_all(root); + } + #[test] fn outbound_queue_is_bounded() { let shutdown = TangleShutdownSignal::new(); @@ -431,6 +497,13 @@ mod tests { } fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { + runtime_config_with_max_pending_events(root, 8) + } + + fn runtime_config_with_max_pending_events( + root: &Path, + max_pending_events: usize, + ) -> BaseRelayRuntimeConfig { let raw = json!({ "server": { "listen_addr": "127.0.0.1:0", @@ -450,7 +523,7 @@ mod tests { "created_at_skew_seconds": 600 }, "limits": { - "max_pending_events": 8 + "max_pending_events": max_pending_events } }) .to_string();