commit a84211124edb5e6c2b6d2206b43b0406e1958b23
parent afaabd1b486f9266f3579e1403c08a5de488bc1f
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 17:55:53 -0700
runtime: prove live chorus parity
Diffstat:
1 file changed, 124 insertions(+), 0 deletions(-)
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -898,6 +898,130 @@ mod tests {
let _ = std::fs::remove_dir_all(root);
}
+ #[tokio::test]
+ async fn websocket_session_preserves_chorus_live_fanout_backpressure_parity() {
+ let shutdown = TangleShutdownSignal::new();
+ let live_root = temp_root("chorus-live-fanout-parity");
+ let _ = std::fs::remove_dir_all(&live_root);
+ let runtime = TangleRuntimeHandle::new(
+ TangleRuntime::open(runtime_config_with_outbound_queue(&live_root, 1))
+ .expect("runtime"),
+ );
+ let metrics = runtime.metrics();
+ let auth = runtime.auth_state().await.expect("auth");
+ let events = runtime.subscribe_events().await;
+ let mut session = TangleWebSocketSession::new(
+ session_limits(1),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
+ let subscription_id = SubscriptionId::new("chorus-live").expect("subscription");
+ let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string();
+
+ assert_eq!(
+ session.dispatch_text(&req_text).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ RelayMessage::Eose(subscription_id.clone()).encode()
+ );
+ for index in 0..3 {
+ let content = format!("chorus live {index}");
+ let event = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_433 + index,
+ 1,
+ Vec::new(),
+ &content,
+ )
+ .expect("event");
+ assert_eq!(
+ session
+ .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string())
+ .await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ .encode()
+ );
+ let offset = session.events.recv().await;
+ assert_eq!(
+ session.handle_event_receive_result(offset).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut session),
+ RelayMessage::Event {
+ subscription_id: subscription_id.clone(),
+ event
+ }
+ .encode()
+ );
+ assert_eq!(session.active_subscription_count(), 1);
+ }
+ assert_eq!(metrics.outbound_queue_full_closes(), 0);
+ assert_eq!(metrics.event_bus_lagged_receivers(), 0);
+ assert_eq!(metrics.event_bus_lagged_offsets(), 0);
+ let _ = std::fs::remove_dir_all(live_root);
+
+ let lag_root = temp_root("chorus-live-lag-parity");
+ let _ = std::fs::remove_dir_all(&lag_root);
+ let runtime =
+ TangleRuntime::open(runtime_config_with_outbound_queue(&lag_root, 1)).expect("runtime");
+ let auth = runtime.auth_state().expect("auth");
+ let events = runtime.event_bus().subscribe();
+ assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1);
+ assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1);
+ let runtime = TangleRuntimeHandle::new(runtime);
+ let metrics = runtime.metrics();
+ let mut lagged = TangleWebSocketSession::new(
+ session_limits(1),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("lagged");
+ let event = lagged.events.recv().await;
+ assert_eq!(
+ lagged.handle_event_receive_result(event).await,
+ TangleSessionControl::Close(event_stream_lag_close_message())
+ );
+ assert_eq!(metrics.event_bus_lagged_receivers(), 1);
+ assert_eq!(metrics.event_bus_lagged_offsets(), 1);
+ let _ = std::fs::remove_dir_all(lag_root);
+
+ let (runtime, auth, events) = session_runtime("chorus-outbound-full-parity");
+ let metrics = runtime.metrics();
+ let mut blocked = TangleWebSocketSession::new(
+ session_limits(1),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("blocked");
+ blocked
+ .outbound()
+ .try_send(Message::Text("blocked".into()))
+ .expect("fill queue");
+ assert_eq!(
+ blocked.dispatch_text("{").await,
+ TangleSessionControl::Close(outbound_queue_full_close_message())
+ );
+ assert_eq!(metrics.outbound_queue_full_closes(), 1);
+ }
+
#[test]
fn outbound_queue_is_bounded() {
let shutdown = TangleShutdownSignal::new();