tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit cd6fbb8c06d26ba7fbe6ce81cfb78655bf9b366e
parent 9cab9dc5d63c4dc6a0a221cc81caf74d45bc8e18
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 02:40:25 -0700

runtime: close websocket sessions on shutdown

- send a close frame when session shutdown is requested
- add a server integration test for shutdown closing upgraded sockets
- add a session test for observing shutdown requests
- verify formatting and focused runtime shutdown tests

Diffstat:
MCargo.lock | 1+
Mcrates/tangle_runtime/Cargo.toml | 1+
Mcrates/tangle_runtime/src/server.rs | 46+++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle_runtime/src/session.rs | 23+++++++++++++++++++++--
4 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1258,6 +1258,7 @@ name = "tangle_runtime" version = "0.1.0" dependencies = [ "axum", + "futures-util", "http", "serde", "serde_json", diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -20,6 +20,7 @@ tokio = { version = "1", features = ["net", "sync"] } tracing = "0.1" [dev-dependencies] +futures-util = { version = "0.3", default-features = false, features = ["std"] } tangle_test_support = { path = "../tangle_test_support" } tokio = { version = "1", features = ["macros", "rt", "time"] } tokio-tungstenite = "0.29" diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -145,11 +145,15 @@ mod tests { runtime::{TangleRuntime, TangleShutdownSignal}, }; use axum::body::to_bytes; + use futures_util::StreamExt; use http::{Request, header}; use serde_json::json; use std::path::{Path, PathBuf}; use tokio::net::TcpListener; - use tokio_tungstenite::tungstenite::client::IntoClientRequest; + use tokio::time::{Duration, timeout}; + use tokio_tungstenite::tungstenite::{ + Message as TungsteniteMessage, client::IntoClientRequest, + }; use tower::ServiceExt; #[tokio::test] @@ -208,6 +212,46 @@ mod tests { } #[tokio::test] + async fn serve_until_shutdown_closes_websocket_sessions() { + let root = temp_root("websocket-shutdown"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime"); + let shutdown = runtime.shutdown_signal().clone(); + let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener"); + let address = listener.local_addr().expect("address"); + let task = tokio::spawn(super::serve_listener_until_shutdown(runtime, listener)); + let mut request = format!("ws://{address}/") + .into_client_request() + .expect("request"); + request.headers_mut().insert( + header::SEC_WEBSOCKET_PROTOCOL, + http::HeaderValue::from_static("nostr"), + ); + let (mut socket, response) = tokio_tungstenite::connect_async(request) + .await + .expect("websocket"); + + assert_eq!(response.status(), http::StatusCode::SWITCHING_PROTOCOLS); + + shutdown.request_shutdown(); + + let next = timeout(Duration::from_secs(1), socket.next()) + .await + .expect("websocket close"); + match next { + Some(Ok(TungsteniteMessage::Close(_))) | None => {} + other => panic!("expected websocket close, got {other:?}"), + } + let report = timeout(Duration::from_secs(1), task) + .await + .expect("server shutdown") + .expect("task") + .expect("serve"); + assert_eq!(report.listen_addr(), address); + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn tangle_http_router_serves_nip11_health_and_ready_routes() { let root = temp_root("http-router"); let config = runtime_config(&root); diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -43,9 +43,14 @@ impl TangleWebSocketSession { self.outbound.clone() } + pub fn shutdown_requested(&self) -> bool { + *self.shutdown.borrow() + } + pub async fn run(mut self, mut socket: WebSocket) { loop { - if *self.shutdown.borrow() { + if self.shutdown_requested() { + let _ = socket.send(Message::Close(None)).await; break; } tokio::select! { @@ -64,7 +69,8 @@ impl TangleWebSocketSession { } } changed = self.shutdown.changed() => { - if changed.is_err() || *self.shutdown.borrow() { + if changed.is_err() || self.shutdown_requested() { + let _ = socket.send(Message::Close(None)).await; break; } } @@ -122,10 +128,23 @@ mod tests { #[test] fn websocket_session_rejects_zero_outbound_capacity() { let shutdown = TangleShutdownSignal::new(); + assert!(TangleWebSocketSession::new(0, shutdown.subscribe()).is_err()); } #[test] + fn websocket_session_observes_shutdown_request() { + let shutdown = TangleShutdownSignal::new(); + let session = TangleWebSocketSession::new(8, shutdown.subscribe()).expect("session"); + + assert!(!session.shutdown_requested()); + + shutdown.request_shutdown(); + + assert!(session.shutdown_requested()); + } + + #[test] fn outbound_queue_is_bounded() { let shutdown = TangleShutdownSignal::new(); let session = TangleWebSocketSession::new(1, shutdown.subscribe()).expect("session");