tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit aad1d6d864bf97cf6180f37983f2288e3acfd2a3
parent 0894b5b2d6d16379508dcdfd28ad08e230ae68c9
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 01:27:41 -0700

ws: enforce backpressure limits

Diffstat:
Mcrates/tangle_runtime/src/lib.rs | 71+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 69 insertions(+), 2 deletions(-)

diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -14,8 +14,8 @@ use std::collections::BTreeSet; use tangle_core::{ AdmissionContext, AdmissionEffect, AuthChallengeState, EventValidator, FixedWindowRateLimiter, MarketplaceListingStatus, MarketplaceQuery, MarketplaceQueryError, MarketplaceQuerySpec, - MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RuntimeLimits, - SubscriptionCloseOutcome, SubscriptionManager, SubscriptionMatcher, + MarketplaceSort, NostrFilterCompiler, QueryExecutionMode, RateLimitConfig, RateLimitDecision, + RuntimeLimits, SubscriptionCloseOutcome, SubscriptionManager, SubscriptionMatcher, }; use tangle_nips::{FulfillmentMethod, ListingUnit, parse_relay_auth_event}; use tangle_protocol::{ @@ -234,6 +234,35 @@ impl ClientMessageLoop { } pub fn handle_frame(&mut self, frame: ClientFrame) -> ClientFrameOutcome { + self.handle_frame_at(frame, UnixTimestamp::new(0)) + } + + pub fn handle_frame_at( + &mut self, + frame: ClientFrame, + now: UnixTimestamp, + ) -> ClientFrameOutcome { + if matches!(frame, ClientFrame::Text(_) | ClientFrame::Binary(_)) { + let key = self.connection.id().as_str().to_owned(); + match self + .connection + .rate_limiter_mut() + .check(&key, now, 1) + .unwrap_or(RateLimitDecision::Rejected { + retry_after_seconds: 0, + reset_at: now, + }) { + RateLimitDecision::Accepted { .. } => {} + RateLimitDecision::Rejected { + retry_after_seconds, + .. + } => { + return ClientFrameOutcome::Reject(RelayMessage::Notice(format!( + "rate-limited: retry after {retry_after_seconds} seconds" + ))); + } + } + } match frame { ClientFrame::Text(raw) => parse_client_message(&raw) .map(ClientFrameOutcome::Message) @@ -2056,6 +2085,44 @@ mod tests { ); } + #[test] + fn client_message_loop_enforces_backpressure_limits() { + let config = RelayConnectionConfig::new( + "wss://relay.radroots.test", + 300, + RateLimitConfig::new(2, 60).expect("rate limit"), + RuntimeLimits::default(), + ) + .expect("config"); + let connection = + RelayConnection::new(RelayConnectionId::new("backpressure").expect("id"), config); + let mut loop_state = ClientMessageLoop::new(connection); + let frame = || ClientFrame::Text(r#"["REQ","sub-a",{"kinds":[30402]}]"#.to_owned()); + + assert!(matches!( + loop_state.handle_frame_at(frame(), UnixTimestamp::new(100)), + ClientFrameOutcome::Message(ClientMessage::Req { .. }) + )); + assert!(matches!( + loop_state.handle_frame_at(frame(), UnixTimestamp::new(100)), + ClientFrameOutcome::Message(ClientMessage::Req { .. }) + )); + assert_eq!( + loop_state.handle_frame_at(frame(), UnixTimestamp::new(100)), + ClientFrameOutcome::Reject(RelayMessage::Notice( + "rate-limited: retry after 60 seconds".to_owned() + )) + ); + assert_eq!( + loop_state.handle_frame_at(ClientFrame::Ping(vec![1]), UnixTimestamp::new(100)), + ClientFrameOutcome::Ignore + ); + assert!(matches!( + loop_state.handle_frame_at(frame(), UnixTimestamp::new(160)), + ClientFrameOutcome::Message(ClientMessage::Req { .. }) + )); + } + #[tokio::test] async fn event_message_handler_stores_and_projects_authenticated_listing() { let store = runtime_memory_store().await;