commit e976ba37a9b0e80c9017fad951444ff6865e1330
parent 024c0c327c740b87778580bb06bccdfbeeca9cbf
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 03:05:23 -0700
runtime: scope websocket subscriptions
- move websocket REQ state into per-session live subscription scopes
- route session REQ handling through query-only relay snapshots without global subscription writes
- keep CLOSE local to the owning websocket connection and clear scopes on session exit
- add per-connection subscription isolation coverage while preserving base relay tests
Diffstat:
3 files changed, 199 insertions(+), 29 deletions(-)
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -102,25 +102,46 @@ impl BaseRelay {
self.handle_close(&subscription_id);
Ok(Vec::new())
}
- ClientMessage::Auth(event) => auth
- .authenticate(&event, now)
- .map(|_| {
- vec![RelayMessage::Ok {
- event_id: event.id().clone(),
- accepted: true,
- message: String::new(),
- }]
- })
- .or_else(|error| {
- Ok(vec![RelayMessage::Ok {
- event_id: event.id().clone(),
- accepted: false,
- message: error.prefixed_message(),
- }])
- }),
+ ClientMessage::Auth(event) => Ok(self.handle_auth_message(event, auth, now)),
}
}
+ pub(crate) fn query_req_with_auth(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.query_req_with_group_auth(
+ subscription_id,
+ filters,
+ &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
+ )
+ }
+
+ fn handle_auth_message(
+ &self,
+ event: Event,
+ auth: &mut BaseAuthState,
+ now: UnixTimestamp,
+ ) -> Vec<RelayMessage> {
+ auth.authenticate(&event, now)
+ .map(|_| {
+ vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: true,
+ message: String::new(),
+ }]
+ })
+ .unwrap_or_else(|error| {
+ vec![RelayMessage::Ok {
+ event_id: event.id().clone(),
+ accepted: false,
+ message: error.prefixed_message(),
+ }]
+ })
+ }
+
pub fn handle_event(&mut self, event: Event) -> Result<RelayMessage, BaseRelayError> {
self.handle_event_with_group_auth(event, &GroupAuthContext::unauthenticated())
}
@@ -240,6 +261,15 @@ impl BaseRelay {
) -> Result<Vec<RelayMessage>, BaseRelayError> {
self.subscriptions
.subscribe(subscription_id.clone(), filters.clone(), auth.clone())?;
+ self.query_req_with_group_auth(subscription_id, filters, auth)
+ }
+
+ fn query_req_with_group_auth(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &GroupAuthContext,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
let mut messages = self
.query_events(&filters, auth)?
.into_iter()
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -18,7 +18,7 @@ use std::{
},
time::Instant,
};
-use tangle_protocol::{ClientMessage, RelayMessage, UnixTimestamp};
+use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
use tokio::sync::{Mutex, watch};
pub struct TangleRuntime {
@@ -118,6 +118,19 @@ impl TangleRuntimeHandle {
.handle_client_message(message, auth, now)
}
+ pub(crate) async fn query_req_with_auth(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.inner
+ .lock()
+ .await
+ .relay()
+ .query_req_with_auth(subscription_id, filters, auth)
+ }
+
pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> {
self.inner.lock().await.shutdown()
}
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -2,12 +2,18 @@
use crate::{
errors::BaseRelayError,
- relay::auth::{BaseAuthState, generate_auth_challenge},
+ relay::{
+ auth::{BaseAuthState, generate_auth_challenge},
+ live::LiveSubscriptionSet,
+ },
runtime::TangleRuntimeHandle,
};
use axum::extract::ws::{Message, WebSocket};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
-use tangle_protocol::{RelayMessage, UnixTimestamp, parse_client_message};
+use tangle_groups::GroupAuthContext;
+use tangle_protocol::{
+ ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp, parse_client_message,
+};
use tokio::sync::{mpsc, watch};
#[derive(Debug)]
@@ -18,6 +24,7 @@ pub struct TangleWebSocketSession {
shutdown: watch::Receiver<bool>,
runtime: TangleRuntimeHandle,
auth: BaseAuthState,
+ subscriptions: LiveSubscriptionSet,
}
impl TangleWebSocketSession {
@@ -33,6 +40,7 @@ impl TangleWebSocketSession {
));
}
let (sender, receiver) = mpsc::channel(outbound_queue_capacity);
+ let subscriptions = LiveSubscriptionSet::new(outbound_queue_capacity)?;
Ok(Self {
connected_at: Instant::now(),
outbound: TangleOutboundSender {
@@ -43,6 +51,7 @@ impl TangleWebSocketSession {
shutdown,
runtime,
auth,
+ subscriptions,
})
}
@@ -58,6 +67,12 @@ impl TangleWebSocketSession {
*self.shutdown.borrow()
}
+ #[cfg(test)]
+ #[cfg(test)]
+ fn active_subscription_count(&self) -> usize {
+ self.subscriptions.active_count()
+ }
+
pub async fn run(mut self, mut socket: WebSocket) {
if !self.issue_auth_challenge() {
return;
@@ -94,6 +109,7 @@ impl TangleWebSocketSession {
}
}
}
+ self.subscriptions.close_all();
}
async fn handle_incoming_message(&mut self, message: Message) -> bool {
@@ -121,16 +137,10 @@ impl TangleWebSocketSession {
async fn dispatch_text(&mut self, raw: &str) -> bool {
let replies = match parse_client_message(raw) {
- Ok(message) => {
- match self
- .runtime
- .handle_client_message(message, &mut self.auth, current_unix_timestamp())
- .await
- {
- Ok(replies) => replies,
- Err(error) => vec![RelayMessage::Notice(error.prefixed_message())],
- }
- }
+ Ok(message) => match self.handle_client_message(message).await {
+ Ok(replies) => replies,
+ Err(error) => vec![RelayMessage::Notice(error.prefixed_message())],
+ },
Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}"))],
};
for reply in replies {
@@ -141,6 +151,50 @@ impl TangleWebSocketSession {
true
}
+ async fn handle_client_message(
+ &mut self,
+ message: ClientMessage,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ match message {
+ ClientMessage::Req {
+ subscription_id,
+ filters,
+ } => self.handle_req(subscription_id, filters).await,
+ ClientMessage::Close(subscription_id) => {
+ self.subscriptions.close(&subscription_id);
+ Ok(Vec::new())
+ }
+ message => {
+ self.runtime
+ .handle_client_message(message, &mut self.auth, current_unix_timestamp())
+ .await
+ }
+ }
+ }
+
+ async fn handle_req(
+ &mut self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.subscriptions.subscribe(
+ subscription_id.clone(),
+ filters.clone(),
+ GroupAuthContext::new(self.auth.authenticated_pubkeys().iter().cloned()),
+ )?;
+ match self
+ .runtime
+ .query_req_with_auth(subscription_id.clone(), filters, &self.auth)
+ .await
+ {
+ Ok(replies) => Ok(replies),
+ Err(error) => {
+ self.subscriptions.close(&subscription_id);
+ Err(error)
+ }
+ }
+ }
+
fn send_relay_message(&self, message: RelayMessage) -> Result<(), TangleOutboundQueueError> {
self.outbound
.try_send(Message::Text(message.encode().into()))
@@ -197,6 +251,7 @@ mod tests {
use axum::extract::ws::Message;
use serde_json::json;
use std::path::{Path, PathBuf};
+ use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId};
#[test]
fn websocket_session_records_connection_time() {
@@ -231,6 +286,71 @@ mod tests {
assert!(session.shutdown_requested());
}
+ #[tokio::test]
+ async fn websocket_session_scopes_subscriptions_per_connection() {
+ let shutdown = TangleShutdownSignal::new();
+ let root = temp_root("connection-scope");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime =
+ TangleRuntimeHandle::new(TangleRuntime::open(runtime_config(&root)).expect("runtime"));
+ let auth_a = runtime.auth_state().await.expect("auth a");
+ let auth_b = runtime.auth_state().await.expect("auth b");
+ let mut first =
+ TangleWebSocketSession::new(8, shutdown.subscribe(), runtime.clone(), auth_a)
+ .expect("first");
+ let mut second =
+ TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth_b).expect("second");
+ let subscription_id = SubscriptionId::new("shared").expect("subscription");
+
+ assert_eq!(
+ first
+ .handle_client_message(req(subscription_id.clone()))
+ .await
+ .expect("first req"),
+ vec![RelayMessage::Eose(subscription_id.clone())]
+ );
+ assert_eq!(
+ second
+ .handle_client_message(req(subscription_id.clone()))
+ .await
+ .expect("second req"),
+ vec![RelayMessage::Eose(subscription_id.clone())]
+ );
+ assert_eq!(first.active_subscription_count(), 1);
+ assert_eq!(second.active_subscription_count(), 1);
+
+ assert_eq!(
+ first
+ .handle_client_message(ClientMessage::Close(subscription_id.clone()))
+ .await
+ .expect("close first"),
+ Vec::<RelayMessage>::new()
+ );
+ assert_eq!(first.active_subscription_count(), 0);
+ assert_eq!(second.active_subscription_count(), 1);
+
+ assert_eq!(
+ second
+ .handle_client_message(req(subscription_id.clone()))
+ .await
+ .expect("replace second"),
+ vec![RelayMessage::Eose(subscription_id.clone())]
+ );
+ assert_eq!(first.active_subscription_count(), 0);
+ assert_eq!(second.active_subscription_count(), 1);
+
+ assert_eq!(
+ second
+ .handle_client_message(ClientMessage::Close(subscription_id))
+ .await
+ .expect("close second"),
+ Vec::<RelayMessage>::new()
+ );
+ assert_eq!(second.active_subscription_count(), 0);
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
#[test]
fn outbound_queue_is_bounded() {
let shutdown = TangleShutdownSignal::new();
@@ -259,6 +379,13 @@ mod tests {
(TangleRuntimeHandle::new(runtime), auth)
}
+ fn req(subscription_id: SubscriptionId) -> ClientMessage {
+ ClientMessage::Req {
+ subscription_id,
+ filters: vec![Filter::empty()],
+ }
+ }
+
fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig {
let raw = json!({
"server": {