commit 9cab9dc5d63c4dc6a0a221cc81caf74d45bc8e18
parent 34d7ebb3a80789f7fddc68aa9ad88390e6ffe054
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 02:35:24 -0700
runtime: propagate websocket shutdown
- pass the runtime shutdown signal into upgraded WebSocket sessions
- make session loops exit on shutdown, close frames, socket errors, or closed queues
- keep route construction explicit about outbound queue capacity and shutdown state
- verify formatting, focused session tests, runtime tests, workspace checks, and clippy
Diffstat:
2 files changed, 38 insertions(+), 9 deletions(-)
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -4,7 +4,7 @@ use crate::{
errors::BaseRelayError,
nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response},
ops::{BaseRelayReadinessState, base_relay_ops_router},
- runtime::TangleRuntime,
+ runtime::{TangleRuntime, TangleShutdownSignal},
session::TangleWebSocketSession,
};
use axum::{
@@ -65,6 +65,7 @@ pub async fn serve_listener_until_shutdown(
runtime.readiness_state().clone(),
info,
runtime.limits().outbound_queue_capacity(),
+ runtime.shutdown_signal().clone(),
);
let mut shutdown = runtime.shutdown_signal().subscribe();
axum::serve(listener, router)
@@ -91,12 +92,14 @@ pub fn tangle_http_router(
readiness: BaseRelayReadinessState,
info: BaseRelayInfoDocument,
outbound_queue_capacity: usize,
+ shutdown: TangleShutdownSignal,
) -> Router {
Router::new()
.route("/", get(tangle_root))
.with_state(TangleHttpState {
info,
outbound_queue_capacity,
+ shutdown,
})
.merge(base_relay_ops_router(readiness))
}
@@ -105,6 +108,7 @@ pub fn tangle_http_router(
struct TangleHttpState {
info: BaseRelayInfoDocument,
outbound_queue_capacity: usize,
+ shutdown: TangleShutdownSignal,
}
async fn tangle_root(
@@ -113,7 +117,10 @@ async fn tangle_root(
headers: HeaderMap,
) -> Response {
match websocket {
- Ok(websocket) => match TangleWebSocketSession::new(state.outbound_queue_capacity) {
+ Ok(websocket) => match TangleWebSocketSession::new(
+ state.outbound_queue_capacity,
+ state.shutdown.subscribe(),
+ ) {
Ok(session) => websocket
.protocols(["nostr"])
.on_upgrade(move |socket| session.run(socket))
@@ -135,7 +142,7 @@ mod tests {
config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
nip11::BaseRelayInfoConfig,
ops::BaseRelayReadinessState,
- runtime::TangleRuntime,
+ runtime::{TangleRuntime, TangleShutdownSignal},
};
use axum::body::to_bytes;
use http::{Request, header};
@@ -208,7 +215,12 @@ mod tests {
.expect("info config")
.build_document()
.expect("info");
- let router = tangle_http_router(BaseRelayReadinessState::ready(), info, 8);
+ let router = tangle_http_router(
+ BaseRelayReadinessState::ready(),
+ info,
+ 8,
+ TangleShutdownSignal::new(),
+ );
let nip11 = router
.clone()
.oneshot(
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -3,17 +3,21 @@
use crate::errors::BaseRelayError;
use axum::extract::ws::{Message, WebSocket};
use std::time::Instant;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, watch};
#[derive(Debug)]
pub struct TangleWebSocketSession {
connected_at: Instant,
outbound: TangleOutboundSender,
outbound_receiver: mpsc::Receiver<Message>,
+ shutdown: watch::Receiver<bool>,
}
impl TangleWebSocketSession {
- pub fn new(outbound_queue_capacity: usize) -> Result<Self, BaseRelayError> {
+ pub fn new(
+ outbound_queue_capacity: usize,
+ shutdown: watch::Receiver<bool>,
+ ) -> Result<Self, BaseRelayError> {
if outbound_queue_capacity == 0 {
return Err(BaseRelayError::invalid(
"runtime outbound queue capacity must be greater than zero",
@@ -27,6 +31,7 @@ impl TangleWebSocketSession {
capacity: outbound_queue_capacity,
},
outbound_receiver: receiver,
+ shutdown,
})
}
@@ -40,6 +45,9 @@ impl TangleWebSocketSession {
pub async fn run(mut self, mut socket: WebSocket) {
loop {
+ if *self.shutdown.borrow() {
+ break;
+ }
tokio::select! {
incoming = socket.recv() => {
match incoming {
@@ -55,6 +63,11 @@ impl TangleWebSocketSession {
break;
}
}
+ changed = self.shutdown.changed() => {
+ if changed.is_err() || *self.shutdown.borrow() {
+ break;
+ }
+ }
}
}
}
@@ -94,24 +107,28 @@ impl From<mpsc::error::TrySendError<Message>> for TangleOutboundQueueError {
#[cfg(test)]
mod tests {
use super::{TangleOutboundQueueError, TangleWebSocketSession};
+ use crate::runtime::TangleShutdownSignal;
use axum::extract::ws::Message;
#[test]
fn websocket_session_records_connection_time() {
let before = std::time::Instant::now();
- let session = TangleWebSocketSession::new(8).expect("session");
+ let shutdown = TangleShutdownSignal::new();
+ let session = TangleWebSocketSession::new(8, shutdown.subscribe()).expect("session");
assert!(session.connected_at() >= before);
}
#[test]
fn websocket_session_rejects_zero_outbound_capacity() {
- assert!(TangleWebSocketSession::new(0).is_err());
+ let shutdown = TangleShutdownSignal::new();
+ assert!(TangleWebSocketSession::new(0, shutdown.subscribe()).is_err());
}
#[test]
fn outbound_queue_is_bounded() {
- let session = TangleWebSocketSession::new(1).expect("session");
+ let shutdown = TangleShutdownSignal::new();
+ let session = TangleWebSocketSession::new(1, shutdown.subscribe()).expect("session");
let outbound = session.outbound();
assert_eq!(outbound.capacity(), 1);