commit 34d7ebb3a80789f7fddc68aa9ad88390e6ffe054
parent e7df9a96d223eccee9f212b16c5858628511ec47
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 02:34:17 -0700
runtime: add websocket queue skeleton
- add a bounded outbound queue to WebSocket sessions
- drain inbound frames and queued outbound frames in the session loop
- route session capacity from runtime limits into upgraded connections
- verify formatting, session tests, runtime tests, workspace checks, and clippy
Diffstat:
2 files changed, 119 insertions(+), 17 deletions(-)
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -61,7 +61,11 @@ pub async fn serve_listener_until_shutdown(
.map_err(|error| BaseRelayError::error(error.to_string()))?;
let info =
BaseRelayInfoConfig::new("tangle", runtime.config().groups().clone())?.build_document()?;
- let router = tangle_http_router(runtime.readiness_state().clone(), info);
+ let router = tangle_http_router(
+ runtime.readiness_state().clone(),
+ info,
+ runtime.limits().outbound_queue_capacity(),
+ );
let mut shutdown = runtime.shutdown_signal().subscribe();
axum::serve(listener, router)
.with_graceful_shutdown(async move {
@@ -86,16 +90,21 @@ pub async fn serve_listener_until_shutdown(
pub fn tangle_http_router(
readiness: BaseRelayReadinessState,
info: BaseRelayInfoDocument,
+ outbound_queue_capacity: usize,
) -> Router {
Router::new()
.route("/", get(tangle_root))
- .with_state(TangleHttpState { info })
+ .with_state(TangleHttpState {
+ info,
+ outbound_queue_capacity,
+ })
.merge(base_relay_ops_router(readiness))
}
#[derive(Debug, Clone)]
struct TangleHttpState {
info: BaseRelayInfoDocument,
+ outbound_queue_capacity: usize,
}
async fn tangle_root(
@@ -104,10 +113,17 @@ async fn tangle_root(
headers: HeaderMap,
) -> Response {
match websocket {
- Ok(websocket) => websocket
- .protocols(["nostr"])
- .on_upgrade(|socket| TangleWebSocketSession::new().run(socket))
- .into_response(),
+ Ok(websocket) => match TangleWebSocketSession::new(state.outbound_queue_capacity) {
+ Ok(session) => websocket
+ .protocols(["nostr"])
+ .on_upgrade(move |socket| session.run(socket))
+ .into_response(),
+ Err(error) => (
+ http::StatusCode::INTERNAL_SERVER_ERROR,
+ error.prefixed_message(),
+ )
+ .into_response(),
+ },
Err(_) => base_relay_info_response(state.info, headers),
}
}
@@ -192,7 +208,7 @@ mod tests {
.expect("info config")
.build_document()
.expect("info");
- let router = tangle_http_router(BaseRelayReadinessState::ready(), info);
+ let router = tangle_http_router(BaseRelayReadinessState::ready(), info, 8);
let nip11 = router
.clone()
.oneshot(
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -1,42 +1,128 @@
#![forbid(unsafe_code)]
-use axum::extract::ws::WebSocket;
+use crate::errors::BaseRelayError;
+use axum::extract::ws::{Message, WebSocket};
use std::time::Instant;
+use tokio::sync::mpsc;
#[derive(Debug)]
pub struct TangleWebSocketSession {
connected_at: Instant,
+ outbound: TangleOutboundSender,
+ outbound_receiver: mpsc::Receiver<Message>,
}
impl TangleWebSocketSession {
- pub fn new() -> Self {
- Self {
- connected_at: Instant::now(),
+ pub fn new(outbound_queue_capacity: usize) -> Result<Self, BaseRelayError> {
+ if outbound_queue_capacity == 0 {
+ return Err(BaseRelayError::invalid(
+ "runtime outbound queue capacity must be greater than zero",
+ ));
}
+ let (sender, receiver) = mpsc::channel(outbound_queue_capacity);
+ Ok(Self {
+ connected_at: Instant::now(),
+ outbound: TangleOutboundSender {
+ sender,
+ capacity: outbound_queue_capacity,
+ },
+ outbound_receiver: receiver,
+ })
}
pub fn connected_at(&self) -> Instant {
self.connected_at
}
- pub async fn run(self, _socket: WebSocket) {}
+ pub fn outbound(&self) -> TangleOutboundSender {
+ self.outbound.clone()
+ }
+
+ pub async fn run(mut self, mut socket: WebSocket) {
+ loop {
+ tokio::select! {
+ incoming = socket.recv() => {
+ match incoming {
+ Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break,
+ Some(Ok(_)) => {}
+ }
+ }
+ outgoing = self.outbound_receiver.recv() => {
+ let Some(message) = outgoing else {
+ break;
+ };
+ if socket.send(message).await.is_err() {
+ break;
+ }
+ }
+ }
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct TangleOutboundSender {
+ sender: mpsc::Sender<Message>,
+ capacity: usize,
}
-impl Default for TangleWebSocketSession {
- fn default() -> Self {
- Self::new()
+impl TangleOutboundSender {
+ pub fn capacity(&self) -> usize {
+ self.capacity
+ }
+
+ pub fn try_send(&self, message: Message) -> Result<(), TangleOutboundQueueError> {
+ self.sender.try_send(message).map_err(Into::into)
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TangleOutboundQueueError {
+ Full,
+ Closed,
+}
+
+impl From<mpsc::error::TrySendError<Message>> for TangleOutboundQueueError {
+ fn from(error: mpsc::error::TrySendError<Message>) -> Self {
+ match error {
+ mpsc::error::TrySendError::Full(_) => Self::Full,
+ mpsc::error::TrySendError::Closed(_) => Self::Closed,
+ }
}
}
#[cfg(test)]
mod tests {
- use super::TangleWebSocketSession;
+ use super::{TangleOutboundQueueError, TangleWebSocketSession};
+ use axum::extract::ws::Message;
#[test]
fn websocket_session_records_connection_time() {
let before = std::time::Instant::now();
- let session = TangleWebSocketSession::new();
+ let session = TangleWebSocketSession::new(8).expect("session");
assert!(session.connected_at() >= before);
}
+
+ #[test]
+ fn websocket_session_rejects_zero_outbound_capacity() {
+ assert!(TangleWebSocketSession::new(0).is_err());
+ }
+
+ #[test]
+ fn outbound_queue_is_bounded() {
+ let session = TangleWebSocketSession::new(1).expect("session");
+ let outbound = session.outbound();
+
+ assert_eq!(outbound.capacity(), 1);
+ outbound
+ .try_send(Message::Text("first".into()))
+ .expect("first");
+ assert_eq!(
+ outbound
+ .try_send(Message::Text("second".into()))
+ .expect_err("full"),
+ TangleOutboundQueueError::Full
+ );
+ }
}