commit 561fa31180e332da0796f7008d5e1467d1b50711
parent 304238cd7f9f1a458bd8fce25843bc3344a5f0f5
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 17:15:01 -0700
runtime: harden live backpressure
Diffstat:
7 files changed, 201 insertions(+), 83 deletions(-)
diff --git a/crates/tangle_bench/src/lib.rs b/crates/tangle_bench/src/lib.rs
@@ -1060,13 +1060,13 @@ fn run_broadcast_lag_benchmark(dataset: &BenchDataset) -> Result<ScenarioReport,
.iter()
.filter(|message| matches!(message, RelayMessage::Event { .. }))
.count();
- let closed = second_messages
+ let second_events = second_messages
.iter()
- .filter(|message| matches!(message, RelayMessage::Closed { .. }))
+ .filter(|message| matches!(message, RelayMessage::Event { .. }))
.count();
let accepted = if first_events == subscriber_count
- && closed == subscriber_count
- && materialized.relay.active_subscription_count() == 0
+ && second_events == subscriber_count
+ && materialized.relay.active_subscription_count() == subscriber_count
{
subscriber_count
} else {
@@ -1833,7 +1833,7 @@ mod tests {
}
#[test]
- fn broadcast_lag_scenario_closes_slow_subscriptions() {
+ fn broadcast_lag_scenario_keeps_healthy_subscriptions_open() {
let dataset =
BenchDataset::generate(BenchDatasetConfig::new(3, 1, 1, 0, 1)).expect("dataset");
let scenario = super::run_broadcast_lag_benchmark(&dataset).expect("lag");
diff --git a/crates/tangle_runtime/src/ops.rs b/crates/tangle_runtime/src/ops.rs
@@ -257,6 +257,7 @@ mod tests {
metrics.record_event_bus_receivers(2);
metrics.record_event_bus_publish(2);
metrics.record_event_bus_lagged(3);
+ metrics.record_outbound_queue_full_close();
metrics.record_outbox_pending_events(5);
metrics.record_outbox_replayed_event();
metrics.record_disk_used_bytes(89);
@@ -322,6 +323,7 @@ mod tests {
"tangle_event_rejected_total",
"tangle_group_read_denied_total",
"tangle_group_write_denied_total",
+ "tangle_outbound_queue_full_closes_total",
"tangle_outbox_pending_events",
"tangle_outbox_replayed_events_total",
"tangle_query_latency_count",
@@ -356,6 +358,7 @@ mod tests {
assert_eq!(metrics_value["tangle_event_bus_published_offsets_total"], 1);
assert_eq!(metrics_value["tangle_event_bus_lagged_receivers_total"], 1);
assert_eq!(metrics_value["tangle_event_bus_lagged_offsets_total"], 3);
+ assert_eq!(metrics_value["tangle_outbound_queue_full_closes_total"], 1);
assert_eq!(metrics_value["tangle_outbox_pending_events"], 5);
assert_eq!(metrics_value["tangle_outbox_replayed_events_total"], 1);
assert_eq!(metrics_value["tangle_disk_used_bytes"], 89);
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -1010,10 +1010,6 @@ impl BaseRelay {
})
}
- pub fn mark_delivered(&mut self, subscription_id: &SubscriptionId) {
- self.subscriptions.mark_delivered(subscription_id);
- }
-
pub fn active_subscription_count(&self) -> usize {
self.subscriptions.active_count()
}
@@ -2456,9 +2452,9 @@ mod tests {
}
#[test]
- fn live_subscription_lag_closes_subscription_for_resync() {
- let mut relay = test_relay("base-relay-lag", 1);
- let subscription_id = SubscriptionId::new("sub-lag").expect("sub");
+ fn live_subscription_delivery_volume_does_not_close_subscription() {
+ let mut relay = test_relay("base-relay-delivery-volume", 1);
+ let subscription_id = SubscriptionId::new("sub-volume").expect("sub");
let filter = filter_from_value(&serde_json::json!({"kinds":[1]})).expect("filter");
relay
.handle_req(subscription_id.clone(), vec![filter])
@@ -2470,14 +2466,11 @@ mod tests {
relay.fanout(&first).as_slice(),
[RelayMessage::Event { .. }]
));
- assert_eq!(
- relay.fanout(&second),
- vec![RelayMessage::Closed {
- subscription_id: subscription_id.clone(),
- message: "error: subscription lagged; resync required".to_owned()
- }]
- );
- assert_eq!(relay.active_subscription_count(), 0);
+ assert!(matches!(
+ relay.fanout(&second).as_slice(),
+ [RelayMessage::Event { .. }]
+ ));
+ assert_eq!(relay.active_subscription_count(), 1);
}
#[test]
diff --git a/crates/tangle_runtime/src/relay/live.rs b/crates/tangle_runtime/src/relay/live.rs
@@ -8,8 +8,6 @@ use tangle_protocol::{Event, Filter, RelayMessage, SubscriptionId};
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct LiveSubscriptionSet {
subscriptions: BTreeMap<SubscriptionId, LiveSubscription>,
- pending: BTreeMap<SubscriptionId, usize>,
- max_pending_events: usize,
max_subscriptions: usize,
}
@@ -36,8 +34,6 @@ impl LiveSubscriptionSet {
}
Ok(Self {
subscriptions: BTreeMap::new(),
- pending: BTreeMap::new(),
- max_pending_events,
max_subscriptions,
})
}
@@ -61,13 +57,11 @@ impl LiveSubscriptionSet {
));
}
self.subscriptions
- .insert(subscription_id.clone(), LiveSubscription { filters, auth });
- self.pending.insert(subscription_id, 0);
+ .insert(subscription_id, LiveSubscription { filters, auth });
Ok(())
}
pub(crate) fn close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
- self.pending.remove(subscription_id);
if self.subscriptions.remove(subscription_id).is_some() {
CloseResult::Closed
} else {
@@ -78,7 +72,6 @@ impl LiveSubscriptionSet {
pub(crate) fn close_all(&mut self) -> usize {
let closed = self.subscriptions.len();
self.subscriptions.clear();
- self.pending.clear();
closed
}
@@ -107,30 +100,14 @@ impl LiveSubscriptionSet {
.collect::<Vec<_>>();
let mut messages = Vec::new();
for subscription_id in matched {
- let pending = self.pending.entry(subscription_id.clone()).or_insert(0);
- *pending += 1;
- if *pending > self.max_pending_events {
- self.close(&subscription_id);
- messages.push(RelayMessage::Closed {
- subscription_id,
- message: "error: subscription lagged; resync required".to_owned(),
- });
- } else {
- messages.push(RelayMessage::Event {
- subscription_id,
- event: event.clone(),
- });
- }
+ messages.push(RelayMessage::Event {
+ subscription_id,
+ event: event.clone(),
+ });
}
messages
}
- pub(crate) fn mark_delivered(&mut self, subscription_id: &SubscriptionId) {
- if let Some(pending) = self.pending.get_mut(subscription_id) {
- *pending = 0;
- }
- }
-
pub(crate) fn active_count(&self) -> usize {
self.subscriptions.len()
}
@@ -150,7 +127,7 @@ mod tests {
use tangle_test_support::{FixtureKey, tangle_v2_event};
#[test]
- fn live_subscription_fanout_closes_lagged_subscriptions() {
+ fn live_subscription_fanout_keeps_healthy_subscriptions_open() {
let mut subscriptions = LiveSubscriptionSet::new(1, 1).expect("subscriptions");
let subscription_id = SubscriptionId::new("live").expect("subscription");
subscriptions
@@ -164,19 +141,24 @@ mod tests {
.expect("first");
let second = tangle_v2_event(FixtureKey::Member, 1_714_124_434, 1, Vec::new(), "second")
.expect("second");
+ let third = tangle_v2_event(FixtureKey::Member, 1_714_124_435, 1, Vec::new(), "third")
+ .expect("third");
assert!(matches!(
subscriptions.fanout(&first, |_, _| true).as_slice(),
[RelayMessage::Event { subscription_id: delivered, event }]
if delivered == &subscription_id && event.id() == first.id()
));
- assert_eq!(
- subscriptions.fanout(&second, |_, _| true),
- vec![RelayMessage::Closed {
- subscription_id: subscription_id.clone(),
- message: "error: subscription lagged; resync required".to_owned()
- }]
- );
- assert_eq!(subscriptions.close(&subscription_id), CloseResult::NotFound);
+ assert!(matches!(
+ subscriptions.fanout(&second, |_, _| true).as_slice(),
+ [RelayMessage::Event { subscription_id: delivered, event }]
+ if delivered == &subscription_id && event.id() == second.id()
+ ));
+ assert!(matches!(
+ subscriptions.fanout(&third, |_, _| true).as_slice(),
+ [RelayMessage::Event { subscription_id: delivered, event }]
+ if delivered == &subscription_id && event.id() == third.id()
+ ));
+ assert_eq!(subscriptions.close(&subscription_id), CloseResult::Closed);
}
}
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -1107,6 +1107,7 @@ struct TangleRuntimeMetricsInner {
event_bus_published_offsets: AtomicU64,
event_bus_lagged_receivers: AtomicU64,
event_bus_lagged_offsets: AtomicU64,
+ outbound_queue_full_closes: AtomicU64,
outbox_pending_events: AtomicUsize,
outbox_replayed_events: AtomicU64,
disk_used_bytes: AtomicU64,
@@ -1151,6 +1152,7 @@ pub struct TangleRuntimeMetricsSnapshot {
tangle_event_bus_published_offsets_total: u64,
tangle_event_bus_lagged_receivers_total: u64,
tangle_event_bus_lagged_offsets_total: u64,
+ tangle_outbound_queue_full_closes_total: u64,
tangle_outbox_pending_events: usize,
tangle_outbox_replayed_events_total: u64,
tangle_disk_used_bytes: u64,
@@ -1237,6 +1239,7 @@ impl TangleRuntimeMetrics {
event_bus_published_offsets: AtomicU64::new(0),
event_bus_lagged_receivers: AtomicU64::new(0),
event_bus_lagged_offsets: AtomicU64::new(0),
+ outbound_queue_full_closes: AtomicU64::new(0),
outbox_pending_events: AtomicUsize::new(0),
outbox_replayed_events: AtomicU64::new(0),
disk_used_bytes: AtomicU64::new(0),
@@ -1278,6 +1281,7 @@ impl TangleRuntimeMetrics {
tangle_event_bus_published_offsets_total: self.event_bus_published_offsets(),
tangle_event_bus_lagged_receivers_total: self.event_bus_lagged_receivers(),
tangle_event_bus_lagged_offsets_total: self.event_bus_lagged_offsets(),
+ tangle_outbound_queue_full_closes_total: self.outbound_queue_full_closes(),
tangle_outbox_pending_events: self.outbox_pending_events(),
tangle_outbox_replayed_events_total: self.outbox_replayed_events(),
tangle_disk_used_bytes: self.disk_used_bytes(),
@@ -1387,6 +1391,12 @@ impl TangleRuntimeMetrics {
self.inner.event_bus_lagged_offsets.load(Ordering::Relaxed)
}
+ pub fn outbound_queue_full_closes(&self) -> u64 {
+ self.inner
+ .outbound_queue_full_closes
+ .load(Ordering::Relaxed)
+ }
+
pub fn outbox_pending_events(&self) -> usize {
self.inner.outbox_pending_events.load(Ordering::Relaxed)
}
@@ -1547,6 +1557,13 @@ impl TangleRuntimeMetrics {
.fetch_add(skipped, Ordering::Relaxed);
}
+ pub fn record_outbound_queue_full_close(&self) -> u64 {
+ self.inner
+ .outbound_queue_full_closes
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
pub fn record_outbox_pending_events(&self, count: usize) {
self.inner
.outbox_pending_events
@@ -1735,6 +1752,7 @@ mod tests {
runtime.metrics().record_event_bus_receivers(3);
assert_eq!(runtime.metrics().record_event_bus_publish(3), 1);
runtime.metrics().record_event_bus_lagged(4);
+ assert_eq!(runtime.metrics().record_outbound_queue_full_close(), 1);
runtime.metrics().record_outbox_pending_events(2);
assert_eq!(runtime.metrics().record_outbox_replayed_event(), 1);
runtime.metrics().record_disk_used_bytes(5);
@@ -1764,6 +1782,7 @@ mod tests {
);
assert_eq!(snapshot_value["tangle_event_bus_lagged_receivers_total"], 1);
assert_eq!(snapshot_value["tangle_event_bus_lagged_offsets_total"], 4);
+ assert_eq!(snapshot_value["tangle_outbound_queue_full_closes_total"], 1);
assert_eq!(snapshot_value["tangle_outbox_pending_events"], 2);
assert_eq!(snapshot_value["tangle_outbox_replayed_events_total"], 1);
assert_eq!(snapshot_value["tangle_disk_used_bytes"], 5);
@@ -1801,6 +1820,7 @@ mod tests {
assert_eq!(value["tangle_event_admitted_total"], 1);
assert_eq!(value["tangle_event_bus_published_offsets_total"], 1);
assert_eq!(value["tangle_disk_used_bytes"], 42);
+ assert_eq!(value["tangle_outbound_queue_full_closes_total"], 0);
assert!(value.get("active_sessions").is_none());
assert!(value.get("stored_event_offsets").is_none());
}
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -131,8 +131,13 @@ impl TangleWebSocketSession {
match incoming {
Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break,
Some(Ok(message)) => {
- if !self.handle_incoming_message(message).await {
- break;
+ match self.handle_incoming_message(message).await {
+ TangleSessionControl::Continue => {}
+ TangleSessionControl::Close(message) => {
+ let _ = socket.send(message).await;
+ break;
+ }
+ TangleSessionControl::Stop => break,
}
}
}
@@ -179,13 +184,7 @@ impl TangleWebSocketSession {
result: Result<tangle_groups::StoreOffset, TangleEventReceiveError>,
) -> TangleSessionControl {
match result {
- Ok(offset) => {
- if self.handle_event_offset(offset).await {
- TangleSessionControl::Continue
- } else {
- TangleSessionControl::Stop
- }
- }
+ Ok(offset) => self.handle_event_offset(offset).await,
Err(TangleEventReceiveError::Lagged(skipped)) => {
self.runtime.metrics().record_event_bus_lagged(skipped);
TangleSessionControl::Close(event_stream_lag_close_message())
@@ -195,7 +194,10 @@ impl TangleWebSocketSession {
}
}
- async fn handle_event_offset(&mut self, offset: tangle_groups::StoreOffset) -> bool {
+ async fn handle_event_offset(
+ &mut self,
+ offset: tangle_groups::StoreOffset,
+ ) -> TangleSessionControl {
let runtime = self.runtime.clone();
let replies = match runtime
.fanout_event_offset(offset, &mut self.subscriptions)
@@ -205,23 +207,24 @@ impl TangleWebSocketSession {
Err(error) => vec![RelayMessage::Notice(error.prefixed_message())],
};
for reply in replies {
- if self.send_relay_message(reply).is_err() {
- return false;
+ if let Err(control) = self.enqueue_relay_message(reply) {
+ return control;
}
}
- true
+ TangleSessionControl::Continue
}
- async fn handle_incoming_message(&mut self, message: Message) -> bool {
+ async fn handle_incoming_message(&mut self, message: Message) -> TangleSessionControl {
match message {
Message::Text(raw) => self.dispatch_text(raw.as_str()).await,
Message::Binary(_) => self
- .send_relay_message(RelayMessage::Notice(
+ .enqueue_relay_message(RelayMessage::Notice(
"invalid: client message must be a text frame".to_owned(),
))
- .is_ok(),
- Message::Ping(_) | Message::Pong(_) => true,
- Message::Close(_) => false,
+ .map(|_| TangleSessionControl::Continue)
+ .unwrap_or_else(|control| control),
+ Message::Ping(_) | Message::Pong(_) => TangleSessionControl::Continue,
+ Message::Close(_) => TangleSessionControl::Stop,
}
}
@@ -235,14 +238,15 @@ impl TangleWebSocketSession {
self.send_relay_message(message).is_ok()
}
- async fn dispatch_text(&mut self, raw: &str) -> bool {
+ async fn dispatch_text(&mut self, raw: &str) -> TangleSessionControl {
if raw.len() > self.limits.max_message_length() {
return self
- .send_relay_message(RelayMessage::Notice(format!(
+ .enqueue_relay_message(RelayMessage::Notice(format!(
"invalid: client message length exceeds runtime max_message_length {}",
self.limits.max_message_length()
)))
- .is_ok();
+ .map(|_| TangleSessionControl::Continue)
+ .unwrap_or_else(|control| control);
}
let replies = match parse_client_message(raw) {
Ok(message) => match self.handle_client_message(message).await {
@@ -252,11 +256,11 @@ impl TangleWebSocketSession {
Err(error) => vec![RelayMessage::Notice(format!("invalid: {error}"))],
};
for reply in replies {
- if self.send_relay_message(reply).is_err() {
- return false;
+ if let Err(control) = self.enqueue_relay_message(reply) {
+ return control;
}
}
- true
+ TangleSessionControl::Continue
}
async fn handle_client_message(
@@ -365,6 +369,24 @@ impl TangleWebSocketSession {
self.outbound
.try_send(Message::Text(message.encode().into()))
}
+
+ fn enqueue_relay_message(&self, message: RelayMessage) -> Result<(), TangleSessionControl> {
+ self.send_relay_message(message)
+ .map_err(|error| self.outbound_queue_error_control(error))
+ }
+
+ fn outbound_queue_error_control(
+ &self,
+ error: TangleOutboundQueueError,
+ ) -> TangleSessionControl {
+ match error {
+ TangleOutboundQueueError::Full => {
+ self.runtime.metrics().record_outbound_queue_full_close();
+ TangleSessionControl::Close(outbound_queue_full_close_message())
+ }
+ TangleOutboundQueueError::Closed => TangleSessionControl::Stop,
+ }
+ }
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -381,6 +403,13 @@ fn event_stream_lag_close_message() -> Message {
}))
}
+fn outbound_queue_full_close_message() -> Message {
+ Message::Close(Some(CloseFrame {
+ code: 1013,
+ reason: Utf8Bytes::from_static("outbound queue full; reconnect required"),
+ }))
+}
+
#[derive(Debug, Clone)]
pub struct TangleOutboundSender {
sender: mpsc::Sender<Message>,
@@ -425,7 +454,7 @@ fn current_unix_timestamp() -> UnixTimestamp {
mod tests {
use super::{
TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession,
- current_unix_timestamp, event_stream_lag_close_message,
+ current_unix_timestamp, event_stream_lag_close_message, outbound_queue_full_close_message,
};
use crate::{
config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
@@ -496,7 +525,10 @@ mod tests {
)
.expect("session");
- assert!(session.dispatch_text("123456789").await);
+ assert_eq!(
+ session.dispatch_text("123456789").await,
+ TangleSessionControl::Continue
+ );
let message = session.outbound_receiver.try_recv().expect("notice");
let Message::Text(text) = message else {
panic!("expected text notice")
@@ -705,6 +737,31 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn websocket_session_closes_when_outbound_queue_is_full() {
+ let shutdown = TangleShutdownSignal::new();
+ let (runtime, auth, events) = session_runtime("outbound-queue-full-close");
+ let metrics = runtime.metrics();
+ let mut session = TangleWebSocketSession::new(
+ session_limits(1),
+ shutdown.subscribe(),
+ runtime,
+ auth,
+ events,
+ )
+ .expect("session");
+ session
+ .outbound()
+ .try_send(Message::Text("blocked".into()))
+ .expect("fill queue");
+
+ assert_eq!(
+ session.dispatch_text("{").await,
+ TangleSessionControl::Close(outbound_queue_full_close_message())
+ );
+ assert_eq!(metrics.outbound_queue_full_closes(), 1);
+ }
+
fn session_runtime(
name: &str,
) -> (
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -452,6 +452,69 @@ async fn websocket_public_relay_covers_query_count_ephemeral_and_rejection_flows
}
#[tokio::test]
+async fn websocket_healthy_subscriber_receives_more_than_outbound_capacity() {
+ let root = temp_root("acceptance-healthy-live-volume");
+ let _ = std::fs::remove_dir_all(&root);
+ let listener = TcpListener::bind("127.0.0.1:0").await.expect("listener");
+ let address = listener.local_addr().expect("address");
+ let runtime = TangleRuntime::open(runtime_config(&root, address)).expect("runtime");
+ let shutdown = runtime.shutdown_signal().clone();
+ let task = tokio::spawn(serve_listener_until_shutdown(runtime, listener));
+ let mut publisher = connect_nostr_socket(address).await;
+ let mut subscriber = connect_nostr_socket(address).await;
+ let _ = read_auth_challenge(&mut publisher).await;
+ let _ = read_auth_challenge(&mut subscriber).await;
+ send_client_value(
+ &mut subscriber,
+ json!(["REQ", "healthy-live", {"kinds":[1]}]),
+ )
+ .await;
+ assert_eq!(
+ read_relay_value(&mut subscriber).await,
+ json!(["EOSE", "healthy-live"])
+ );
+ let delivered_count = 10_u64;
+ for index in 0..delivered_count {
+ let event = tangle_v2_event(
+ FixtureKey::Member,
+ 1_714_124_500 + index,
+ 1,
+ Vec::new(),
+ &format!("healthy live {index}"),
+ )
+ .expect("event");
+ send_client_value(&mut publisher, json!(["EVENT", event_to_value(&event)])).await;
+ assert_ok(read_relay_value(&mut publisher).await, &event, true, "");
+ assert_live_event(
+ read_relay_value(&mut subscriber).await,
+ "healthy-live",
+ &event,
+ );
+ }
+ send_client_value(
+ &mut subscriber,
+ json!(["COUNT", "healthy-count", {"kinds":[1]}]),
+ )
+ .await;
+ assert_eq!(
+ read_relay_value(&mut subscriber).await,
+ json!(["COUNT", "healthy-count", {"count": delivered_count}])
+ );
+
+ shutdown.request_shutdown();
+ read_websocket_close(&mut publisher).await;
+ read_websocket_close(&mut subscriber).await;
+ let report = timeout(Duration::from_secs(2), task)
+ .await
+ .expect("shutdown timeout")
+ .expect("task")
+ .expect("serve");
+ assert_eq!(report.listen_addr(), address);
+
+ let _ = std::fs::remove_dir_all(root);
+}
+
+#[tokio::test]
async fn websocket_nip29_group_lifecycle_state_and_live_paths_are_integrated() {
let root = temp_root("acceptance-nip29-websocket");
let _ = std::fs::remove_dir_all(&root);