tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 579e7060290f99c6515a884d8dca0b808dd8c01a
parent aad1d6d864bf97cf6180f37983f2288e3acfd2a3
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 01:29:32 -0700

ws: add graceful shutdown

Diffstat:
Mcrates/tangle_runtime/Cargo.toml | 1+
Mcrates/tangle_runtime/src/lib.rs | 104+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
2 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -17,6 +17,7 @@ tangle_nips = { path = "../tangle_nips" } tangle_protocol = { path = "../tangle_protocol" } tangle_store = { path = "../tangle_store" } tangle_store_surreal = { path = "../tangle_store_surreal" } +tokio = { version = "1", features = ["sync"] } url = "2" [dev-dependencies] diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -177,19 +177,35 @@ impl RelayConnection { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct WebSocketHttpState { connection_config: RelayConnectionConfig, + shutdown_signal: GracefulShutdownSignal, } impl WebSocketHttpState { pub fn new(connection_config: RelayConnectionConfig) -> Self { - Self { connection_config } + let (shutdown_signal, _) = GracefulShutdownSignal::new(); + Self::with_shutdown(connection_config, shutdown_signal) + } + + pub fn with_shutdown( + connection_config: RelayConnectionConfig, + shutdown_signal: GracefulShutdownSignal, + ) -> Self { + Self { + connection_config, + shutdown_signal, + } } pub fn connection_config(&self) -> &RelayConnectionConfig { &self.connection_config } + + pub fn shutdown_signal(&self) -> &GracefulShutdownSignal { + &self.shutdown_signal + } } impl Default for WebSocketHttpState { @@ -198,6 +214,54 @@ impl Default for WebSocketHttpState { } } +#[derive(Debug, Clone)] +pub struct GracefulShutdownSignal { + sender: tokio::sync::watch::Sender<bool>, +} + +impl GracefulShutdownSignal { + pub fn new() -> (Self, GracefulShutdownListener) { + let (sender, receiver) = tokio::sync::watch::channel(false); + (Self { sender }, GracefulShutdownListener { receiver }) + } + + pub fn subscribe(&self) -> GracefulShutdownListener { + GracefulShutdownListener { + receiver: self.sender.subscribe(), + } + } + + pub fn request_shutdown(&self) -> bool { + self.sender.send(true).is_ok() + } + + pub fn is_shutdown_requested(&self) -> bool { + *self.sender.borrow() + } +} + +#[derive(Debug, Clone)] +pub struct GracefulShutdownListener { + receiver: tokio::sync::watch::Receiver<bool>, +} + +impl GracefulShutdownListener { + pub fn is_shutdown_requested(&self) -> bool { + *self.receiver.borrow() + } + + pub async fn wait_for_shutdown(&mut self) { + if self.is_shutdown_requested() { + return; + } + while self.receiver.changed().await.is_ok() { + if self.is_shutdown_requested() { + return; + } + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum ClientFrame { Text(String), @@ -1179,6 +1243,7 @@ async fn websocket_upgrade( websocket .on_upgrade(move |_socket| async move { let _connection_config = state.connection_config; + let _shutdown = state.shutdown_signal.subscribe(); }) .into_response() } @@ -1771,12 +1836,12 @@ mod tests { use super::{ ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, AuthMessageHandler, ClientFrame, ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome, - EventMessageHandler, ListingsHttpState, LiveEventFanout, ReadinessCheckStatus, - ReadinessState, RelayConnection, RelayConnectionConfig, RelayConnectionId, - RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, - WebSocketHttpState, health_router, listing_item_document, listing_projection_query, - listings_router, parse_listing_query, parse_marketplace_search_query, relay_info_router, - search_document_query, websocket_router, + EventMessageHandler, GracefulShutdownSignal, ListingsHttpState, LiveEventFanout, + ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig, + RelayConnectionId, RelayInfoDocument, ReqMessageHandler, TANGLE_RELAY_SOFTWARE, + TANGLE_SUPPORTED_NIPS, WebSocketHttpState, health_router, listing_item_document, + listing_projection_query, listings_router, parse_listing_query, + parse_marketplace_search_query, relay_info_router, search_document_query, websocket_router, }; use axum::{body::Body, response::IntoResponse}; use http::{HeaderValue, Request, StatusCode, header}; @@ -1971,6 +2036,29 @@ mod tests { default_state.connection_config().relay_url(), "wss://relay.radroots.test" ); + assert_eq!(state.shutdown_signal().is_shutdown_requested(), false); + assert_eq!( + default_state.shutdown_signal().is_shutdown_requested(), + false + ); + } + + #[tokio::test] + async fn graceful_shutdown_signal_notifies_subscribers() { + let (shutdown, mut first) = GracefulShutdownSignal::new(); + let mut second = shutdown.subscribe(); + + assert_eq!(shutdown.is_shutdown_requested(), false); + assert_eq!(first.is_shutdown_requested(), false); + assert_eq!(second.is_shutdown_requested(), false); + + assert_eq!(shutdown.request_shutdown(), true); + first.wait_for_shutdown().await; + second.wait_for_shutdown().await; + + assert_eq!(shutdown.is_shutdown_requested(), true); + assert_eq!(first.is_shutdown_requested(), true); + assert_eq!(second.is_shutdown_requested(), true); } #[tokio::test]