tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 403764654d62fdd51e7f6d3a9135c47ce11d9055
parent 65dc08c1f9068277f31892d1d7caa1c27572a711
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 02:52:30 -0700

runtime: dispatch websocket client messages

- add a shared async runtime handle for upgraded WebSocket sessions
- parse text frames with the existing client-message parser and encode relay replies
- dispatch EVENT, REQ, COUNT, CLOSE, AUTH, and malformed input through the relay path
- verify formatting, focused websocket tests, runtime tests, workspace checks, and clippy

Diffstat:
Mcrates/tangle_runtime/Cargo.toml | 2+-
Mcrates/tangle_runtime/src/runtime.rs | 44+++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle_runtime/src/server.rs | 155++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Mcrates/tangle_runtime/src/session.rs | 126++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
4 files changed, 304 insertions(+), 23 deletions(-)

diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -20,7 +20,7 @@ tokio = { version = "1", features = ["net", "sync"] } tracing = "0.1" [dev-dependencies] -futures-util = { version = "0.3", default-features = false, features = ["std"] } +futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } tangle_test_support = { path = "../tangle_test_support" } tokio = { version = "1", features = ["macros", "rt", "time"] } tokio-tungstenite = "0.29" diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -11,13 +11,15 @@ use crate::{ }, }; use std::{ + fmt, sync::{ Arc, atomic::{AtomicU64, AtomicUsize, Ordering}, }, time::Instant, }; -use tokio::sync::watch; +use tangle_protocol::{ClientMessage, RelayMessage, UnixTimestamp}; +use tokio::sync::{Mutex, watch}; pub struct TangleRuntime { config: BaseRelayRuntimeConfig, @@ -87,6 +89,46 @@ impl TangleRuntime { } } +#[derive(Clone)] +pub struct TangleRuntimeHandle { + inner: Arc<Mutex<TangleRuntime>>, +} + +impl TangleRuntimeHandle { + pub fn new(runtime: TangleRuntime) -> Self { + Self { + inner: Arc::new(Mutex::new(runtime)), + } + } + + pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { + self.inner.lock().await.auth_state() + } + + pub async fn handle_client_message( + &self, + message: ClientMessage, + auth: &mut BaseAuthState, + now: UnixTimestamp, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.inner + .lock() + .await + .relay_mut() + .handle_client_message(message, auth, now) + } + + pub async fn shutdown(&self) -> Result<BaseRelayShutdownReport, BaseRelayError> { + self.inner.lock().await.shutdown() + } +} + +impl fmt::Debug for TangleRuntimeHandle { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("TangleRuntimeHandle") + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleRuntimeLimits { max_pending_events: usize, diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -4,7 +4,7 @@ use crate::{ errors::BaseRelayError, nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response}, ops::{BaseRelayReadinessState, base_relay_ops_router}, - runtime::{TangleRuntime, TangleShutdownSignal}, + runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal}, session::TangleWebSocketSession, }; use axum::{ @@ -53,7 +53,7 @@ pub async fn serve_until_shutdown( } pub async fn serve_listener_until_shutdown( - mut runtime: TangleRuntime, + runtime: TangleRuntime, listener: TcpListener, ) -> Result<TangleServeReport, BaseRelayError> { let listen_addr = listener @@ -61,13 +61,18 @@ pub async fn serve_listener_until_shutdown( .map_err(|error| BaseRelayError::error(error.to_string()))?; let info = BaseRelayInfoConfig::new("tangle", runtime.config().groups().clone())?.build_document()?; + let readiness = runtime.readiness_state().clone(); + let outbound_queue_capacity = runtime.limits().outbound_queue_capacity(); + let shutdown_signal = runtime.shutdown_signal().clone(); + let runtime = TangleRuntimeHandle::new(runtime); let router = tangle_http_router( - runtime.readiness_state().clone(), + readiness, info, - runtime.limits().outbound_queue_capacity(), - runtime.shutdown_signal().clone(), + outbound_queue_capacity, + shutdown_signal.clone(), + runtime.clone(), ); - let mut shutdown = runtime.shutdown_signal().subscribe(); + let mut shutdown = shutdown_signal.subscribe(); axum::serve(listener, router) .with_graceful_shutdown(async move { loop { @@ -81,7 +86,7 @@ pub async fn serve_listener_until_shutdown( }) .await .map_err(|error| BaseRelayError::error(error.to_string()))?; - let shutdown = runtime.shutdown()?; + let shutdown = runtime.shutdown().await?; Ok(TangleServeReport::new( listen_addr, shutdown.closed_subscriptions(), @@ -93,6 +98,7 @@ pub fn tangle_http_router( info: BaseRelayInfoDocument, outbound_queue_capacity: usize, shutdown: TangleShutdownSignal, + runtime: TangleRuntimeHandle, ) -> Router { Router::new() .route("/", get(tangle_root)) @@ -100,6 +106,7 @@ pub fn tangle_http_router( info, outbound_queue_capacity, shutdown, + runtime, }) .merge(base_relay_ops_router(readiness)) } @@ -109,6 +116,7 @@ struct TangleHttpState { info: BaseRelayInfoDocument, outbound_queue_capacity: usize, shutdown: TangleShutdownSignal, + runtime: TangleRuntimeHandle, } async fn tangle_root( @@ -117,10 +125,14 @@ async fn tangle_root( headers: HeaderMap, ) -> Response { match websocket { - Ok(websocket) => match TangleWebSocketSession::new( - state.outbound_queue_capacity, - state.shutdown.subscribe(), - ) { + Ok(websocket) => match state.runtime.auth_state().await.and_then(|auth| { + TangleWebSocketSession::new( + state.outbound_queue_capacity, + state.shutdown.subscribe(), + state.runtime.clone(), + auth, + ) + }) { Ok(session) => websocket .protocols(["nostr"]) .on_upgrade(move |socket| session.run(socket)) @@ -142,13 +154,15 @@ mod tests { config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, nip11::BaseRelayInfoConfig, ops::BaseRelayReadinessState, - runtime::{TangleRuntime, TangleShutdownSignal}, + runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal}, }; use axum::body::to_bytes; - use futures_util::StreamExt; + use futures_util::{SinkExt, StreamExt}; use http::{Request, header}; use serde_json::json; use std::path::{Path, PathBuf}; + use tangle_protocol::event_to_value; + use tangle_test_support::{FixtureKey, tangle_v2_auth_event, tangle_v2_event}; use tokio::net::TcpListener; use tokio::time::{Duration, timeout}; use tokio_tungstenite::tungstenite::{ @@ -252,6 +266,93 @@ mod tests { } #[tokio::test] + async fn websocket_session_dispatches_base_client_messages() { + let root = temp_root("websocket-dispatch"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); + let mut request = format!("ws://{address}/") + .into_client_request() + .expect("request"); + request.headers_mut().insert( + header::SEC_WEBSOCKET_PROTOCOL, + http::HeaderValue::from_static("nostr"), + ); + let (mut socket, response) = tokio_tungstenite::connect_async(request) + .await + .expect("websocket"); + let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "hello") + .expect("event"); + let auth = tangle_v2_auth_event(FixtureKey::Owner, "missing-challenge", 1_714_124_434) + .expect("auth"); + + assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); + + send_client_text(&mut socket, "{").await; + let notice = read_relay_value(&mut socket).await; + assert_eq!(notice[0], "NOTICE"); + assert!( + notice[1] + .as_str() + .expect("notice") + .starts_with("invalid: client message JSON is invalid:") + ); + + send_client_value(&mut socket, json!(["EVENT", event_to_value(&event)])).await; + assert_eq!( + read_relay_value(&mut socket).await, + json!(["OK", event.id().as_str(), true, ""]) + ); + + send_client_value(&mut socket, json!(["COUNT", "count-a", {}])).await; + assert_eq!( + read_relay_value(&mut socket).await, + json!(["COUNT", "count-a", {"count": 1}]) + ); + + send_client_value(&mut socket, json!(["REQ", "sub-a", {}])).await; + let req_event = read_relay_value(&mut socket).await; + assert_eq!(req_event[0], "EVENT"); + assert_eq!(req_event[1], "sub-a"); + assert_eq!(req_event[2]["id"], event.id().as_str()); + assert_eq!( + read_relay_value(&mut socket).await, + json!(["EOSE", "sub-a"]) + ); + + send_client_value(&mut socket, json!(["CLOSE", "sub-a"])).await; + assert!( + timeout(Duration::from_millis(50), socket.next()) + .await + .is_err() + ); + + send_client_value(&mut socket, json!(["AUTH", event_to_value(&auth)])).await; + let auth_reply = read_relay_value(&mut socket).await; + assert_eq!(auth_reply[0], "OK"); + assert_eq!(auth_reply[1], auth.id().as_str()); + assert_eq!(auth_reply[2], false); + assert!( + auth_reply[3] + .as_str() + .expect("auth message") + .starts_with("auth-required:") + ); + + shutdown.request_shutdown(); + let report = timeout(Duration::from_secs(1), task) + .await + .expect("server shutdown") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn tangle_http_router_serves_nip11_health_and_ready_routes() { let root = temp_root("http-router"); let config = runtime_config(&root); @@ -264,6 +365,7 @@ mod tests { info, 8, TangleShutdownSignal::new(), + TangleRuntimeHandle::new(TangleRuntime::open(config).expect("runtime")), ); let nip11 = router .clone() @@ -365,4 +467,31 @@ mod tests { fn temp_root(name: &str) -> PathBuf { std::env::temp_dir().join(format!("tangle-server-{name}-{}", std::process::id())) } + + type TestWebSocket = tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>, + >; + + async fn send_client_value(socket: &mut TestWebSocket, value: serde_json::Value) { + send_client_text(socket, &value.to_string()).await; + } + + async fn send_client_text(socket: &mut TestWebSocket, value: &str) { + socket + .send(TungsteniteMessage::Text(value.to_owned().into())) + .await + .expect("send client message"); + } + + async fn read_relay_value(socket: &mut TestWebSocket) -> serde_json::Value { + let message = timeout(Duration::from_secs(1), socket.next()) + .await + .expect("relay message timeout") + .expect("relay message") + .expect("relay message result"); + let TungsteniteMessage::Text(text) = message else { + panic!("expected relay text message, got {message:?}"); + }; + serde_json::from_str(text.as_str()).expect("relay json") + } } diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -1,8 +1,9 @@ #![forbid(unsafe_code)] -use crate::errors::BaseRelayError; +use crate::{errors::BaseRelayError, relay::auth::BaseAuthState, runtime::TangleRuntimeHandle}; use axum::extract::ws::{Message, WebSocket}; -use std::time::Instant; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use tangle_protocol::{RelayMessage, UnixTimestamp, parse_client_message}; use tokio::sync::{mpsc, watch}; #[derive(Debug)] @@ -11,12 +12,16 @@ pub struct TangleWebSocketSession { outbound: TangleOutboundSender, outbound_receiver: mpsc::Receiver<Message>, shutdown: watch::Receiver<bool>, + runtime: TangleRuntimeHandle, + auth: BaseAuthState, } impl TangleWebSocketSession { pub fn new( outbound_queue_capacity: usize, shutdown: watch::Receiver<bool>, + runtime: TangleRuntimeHandle, + auth: BaseAuthState, ) -> Result<Self, BaseRelayError> { if outbound_queue_capacity == 0 { return Err(BaseRelayError::invalid( @@ -32,6 +37,8 @@ impl TangleWebSocketSession { }, outbound_receiver: receiver, shutdown, + runtime, + auth, }) } @@ -57,7 +64,11 @@ impl TangleWebSocketSession { incoming = socket.recv() => { match incoming { Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break, - Some(Ok(_)) => {} + Some(Ok(message)) => { + if !self.handle_incoming_message(message).await { + break; + } + } } } outgoing = self.outbound_receiver.recv() => { @@ -77,6 +88,46 @@ impl TangleWebSocketSession { } } } + + async fn handle_incoming_message(&mut self, message: Message) -> bool { + match message { + Message::Text(raw) => self.dispatch_text(raw.as_str()).await, + Message::Binary(_) => self + .send_relay_message(RelayMessage::Notice( + "invalid: client message must be a text frame".to_owned(), + )) + .is_ok(), + Message::Ping(_) | Message::Pong(_) => true, + Message::Close(_) => false, + } + } + + async fn dispatch_text(&mut self, raw: &str) -> bool { + let replies = match parse_client_message(raw) { + Ok(message) => { + match self + .runtime + .handle_client_message(message, &mut self.auth, current_unix_timestamp()) + .await + { + Ok(replies) => replies, + Err(error) => vec![RelayMessage::Notice(error.prefixed_message())], + } + } + Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}"))], + }; + for reply in replies { + if self.send_relay_message(reply).is_err() { + return false; + } + } + true + } + + fn send_relay_message(&self, message: RelayMessage) -> Result<(), TangleOutboundQueueError> { + self.outbound + .try_send(Message::Text(message.encode().into())) + } } #[derive(Debug, Clone)] @@ -110,17 +161,33 @@ impl From<mpsc::error::TrySendError<Message>> for TangleOutboundQueueError { } } +fn current_unix_timestamp() -> UnixTimestamp { + UnixTimestamp::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .unwrap_or(0), + ) +} + #[cfg(test)] mod tests { use super::{TangleOutboundQueueError, TangleWebSocketSession}; - use crate::runtime::TangleShutdownSignal; + use crate::{ + config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, + runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal}, + }; use axum::extract::ws::Message; + use serde_json::json; + use std::path::{Path, PathBuf}; #[test] fn websocket_session_records_connection_time() { let before = std::time::Instant::now(); let shutdown = TangleShutdownSignal::new(); - let session = TangleWebSocketSession::new(8, shutdown.subscribe()).expect("session"); + let (runtime, auth) = session_runtime("records-connection-time"); + let session = + TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth).expect("session"); assert!(session.connected_at() >= before); } @@ -128,14 +195,17 @@ mod tests { #[test] fn websocket_session_rejects_zero_outbound_capacity() { let shutdown = TangleShutdownSignal::new(); + let (runtime, auth) = session_runtime("zero-outbound-capacity"); - assert!(TangleWebSocketSession::new(0, shutdown.subscribe()).is_err()); + assert!(TangleWebSocketSession::new(0, shutdown.subscribe(), runtime, auth).is_err()); } #[test] fn websocket_session_observes_shutdown_request() { let shutdown = TangleShutdownSignal::new(); - let session = TangleWebSocketSession::new(8, shutdown.subscribe()).expect("session"); + let (runtime, auth) = session_runtime("observes-shutdown"); + let session = + TangleWebSocketSession::new(8, shutdown.subscribe(), runtime, auth).expect("session"); assert!(!session.shutdown_requested()); @@ -147,7 +217,9 @@ mod tests { #[test] fn outbound_queue_is_bounded() { let shutdown = TangleShutdownSignal::new(); - let session = TangleWebSocketSession::new(1, shutdown.subscribe()).expect("session"); + let (runtime, auth) = session_runtime("outbound-queue"); + let session = + TangleWebSocketSession::new(1, shutdown.subscribe(), runtime, auth).expect("session"); let outbound = session.outbound(); assert_eq!(outbound.capacity(), 1); @@ -161,4 +233,42 @@ mod tests { TangleOutboundQueueError::Full ); } + + fn session_runtime(name: &str) -> (TangleRuntimeHandle, crate::relay::auth::BaseAuthState) { + let root = temp_root(name); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime"); + let auth = runtime.auth_state().expect("auth"); + (TangleRuntimeHandle::new(runtime), auth) + } + + fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { + let raw = json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "wss://relay.radroots.test" + }, + "pocket": { + "data_directory": root.join("pocket"), + "map_size_bytes": 1073741824_u64, + "reader_slots": 128, + "sync_policy": "flush_on_shutdown" + }, + "groups": { + "enabled": false + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "max_pending_events": 8 + } + }) + .to_string(); + parse_base_relay_runtime_config_json(&raw).expect("config") + } + + fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-session-{name}-{}", std::process::id())) + } }