commit 94a60f76952af42db453ba2d127c103b871b73ac
parent 726cc097ddcf2e6ff304c38a85374828f9ddb0cc
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 17:43:49 -0700
runtime: prove close chorus parity
Diffstat:
1 file changed, 118 insertions(+), 0 deletions(-)
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -694,6 +694,124 @@ mod tests {
}
#[tokio::test]
+ async fn websocket_session_preserves_chorus_close_scope_parity() {
+ let shutdown = TangleShutdownSignal::new();
+ let root = temp_root("chorus-close-scope-parity");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime =
+ TangleRuntimeHandle::new(TangleRuntime::open(runtime_config(&root)).expect("runtime"));
+ let metrics = runtime.metrics();
+ let auth_a = runtime.auth_state().await.expect("auth a");
+ let auth_b = runtime.auth_state().await.expect("auth b");
+ let events_a = runtime.subscribe_events().await;
+ let events_b = runtime.subscribe_events().await;
+ let mut first = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime.clone(),
+ auth_a,
+ events_a,
+ )
+ .expect("first");
+ let mut second = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime,
+ auth_b,
+ events_b,
+ )
+ .expect("second");
+ let subscription_id = SubscriptionId::new("shared-close").expect("subscription");
+ let req_text = json!(["REQ", subscription_id.as_str(), {"kinds":[1]}]).to_string();
+
+ assert_eq!(
+ first.dispatch_text(&req_text).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut first),
+ RelayMessage::Eose(subscription_id.clone()).encode()
+ );
+ assert_eq!(
+ second.dispatch_text(&req_text).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut second),
+ RelayMessage::Eose(subscription_id.clone()).encode()
+ );
+ assert_eq!(first.active_subscription_count(), 1);
+ assert_eq!(second.active_subscription_count(), 1);
+
+ let close_text = json!(["CLOSE", subscription_id.as_str()]).to_string();
+ assert_eq!(
+ first.dispatch_text(&close_text).await,
+ TangleSessionControl::Continue
+ );
+ assert!(first.outbound_receiver.try_recv().is_err());
+ assert_eq!(
+ first.dispatch_text(&close_text).await,
+ TangleSessionControl::Continue
+ );
+ assert!(first.outbound_receiver.try_recv().is_err());
+ assert_eq!(first.active_subscription_count(), 0);
+ assert_eq!(second.active_subscription_count(), 1);
+
+ let event = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_433,
+ 1,
+ Vec::new(),
+ "close scope parity",
+ )
+ .expect("event");
+ assert_eq!(
+ first
+ .dispatch_text(&json!(["EVENT", event_to_value(&event)]).to_string())
+ .await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut first),
+ RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }
+ .encode()
+ );
+
+ let first_offset = first.events.recv().await;
+ let second_offset = second.events.recv().await;
+ assert_eq!(
+ first.handle_event_receive_result(first_offset).await,
+ TangleSessionControl::Continue
+ );
+ assert!(first.outbound_receiver.try_recv().is_err());
+ assert_eq!(
+ second.handle_event_receive_result(second_offset).await,
+ TangleSessionControl::Continue
+ );
+ assert_eq!(
+ take_outbound_text(&mut second),
+ RelayMessage::Event {
+ subscription_id: subscription_id.clone(),
+ event
+ }
+ .encode()
+ );
+ let snapshot = metrics.snapshot();
+ assert_eq!(snapshot.client_messages(), 5);
+ assert_eq!(snapshot.event_messages(), 1);
+ assert_eq!(snapshot.req_messages(), 2);
+ assert_eq!(snapshot.close_messages(), 2);
+ assert_eq!(snapshot.opened_subscriptions(), 2);
+ assert_eq!(snapshot.closed_subscriptions(), 1);
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn websocket_session_rate_limited_req_does_not_subscribe() {
let shutdown = TangleShutdownSignal::new();
let root = temp_root("rate-limited-req");