commit 043977ca477ce6ac598f35885ad9b9a2588fab62
parent 66805d65a6c30d059378d58555c0c8fd471b205d
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 07:56:56 -0700
limits: apply query rate checks
- Add configured REQ and COUNT rate buckets for IP, connection, pubkey, group, kind, and broad queries.
- Carry peer IP and connection identity into WebSocket sessions and enforce query buckets before REQ subscription/query execution and COUNT handling.
- Cover query config parsing plus REQ/COUNT rejection paths with focused runtime/session tests.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
7 files changed, 1012 insertions(+), 32 deletions(-)
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -151,6 +151,22 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
})
diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs
@@ -4,7 +4,7 @@ use crate::{
errors::BaseRelayError,
rate_limits::{
TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleGroupRateLimitConfig,
- TangleRateLimitConfig, TangleRateLimitRule,
+ TangleQueryRateLimitConfig, TangleRateLimitConfig, TangleRateLimitRule,
},
relay::{
auth::BaseAuthState,
@@ -346,6 +346,8 @@ struct BaseRelayRateLimitsDocument {
auth: BaseRelayAuthRateLimitsDocument,
event: BaseRelayEventRateLimitsDocument,
group: BaseRelayGroupRateLimitsDocument,
+ req: BaseRelayQueryRateLimitsDocument,
+ count: BaseRelayQueryRateLimitsDocument,
}
#[derive(Debug, Deserialize)]
@@ -371,6 +373,17 @@ struct BaseRelayGroupRateLimitsDocument {
join_flow: BaseRelayRateLimitRuleDocument,
}
+#[derive(Debug, Deserialize)]
+#[serde(deny_unknown_fields)]
+struct BaseRelayQueryRateLimitsDocument {
+ per_ip: BaseRelayRateLimitRuleDocument,
+ per_connection: BaseRelayRateLimitRuleDocument,
+ per_pubkey: BaseRelayRateLimitRuleDocument,
+ per_group: BaseRelayRateLimitRuleDocument,
+ per_kind: BaseRelayRateLimitRuleDocument,
+ broad: BaseRelayRateLimitRuleDocument,
+}
+
#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
struct BaseRelayRateLimitRuleDocument {
@@ -512,6 +525,31 @@ fn base_relay_rate_limits_from_document(
document.group.join_flow,
)?,
),
+ base_relay_query_rate_limits_from_document("rate_limits.req", document.req)?,
+ base_relay_query_rate_limits_from_document("rate_limits.count", document.count)?,
+ ))
+}
+
+fn base_relay_query_rate_limits_from_document(
+ field: &str,
+ document: BaseRelayQueryRateLimitsDocument,
+) -> Result<TangleQueryRateLimitConfig, BaseRelayError> {
+ Ok(TangleQueryRateLimitConfig::new(
+ base_relay_rate_limit_rule_from_document(&format!("{field}.per_ip"), document.per_ip)?,
+ base_relay_rate_limit_rule_from_document(
+ &format!("{field}.per_connection"),
+ document.per_connection,
+ )?,
+ base_relay_rate_limit_rule_from_document(
+ &format!("{field}.per_pubkey"),
+ document.per_pubkey,
+ )?,
+ base_relay_rate_limit_rule_from_document(
+ &format!("{field}.per_group"),
+ document.per_group,
+ )?,
+ base_relay_rate_limit_rule_from_document(&format!("{field}.per_kind"), document.per_kind)?,
+ base_relay_rate_limit_rule_from_document(&format!("{field}.broad"), document.broad)?,
))
}
@@ -599,6 +637,18 @@ mod tests {
300
);
assert_eq!(config.rate_limits().group().join_flow().max_hits(), 10);
+ assert_eq!(config.rate_limits().req().per_ip().max_hits(), 600);
+ assert_eq!(config.rate_limits().req().per_connection().max_hits(), 120);
+ assert_eq!(config.rate_limits().req().per_pubkey().max_hits(), 240);
+ assert_eq!(config.rate_limits().req().per_group().max_hits(), 240);
+ assert_eq!(config.rate_limits().req().per_kind().max_hits(), 500);
+ assert_eq!(config.rate_limits().req().broad().max_hits(), 30);
+ assert_eq!(config.rate_limits().count().per_ip().max_hits(), 300);
+ assert_eq!(config.rate_limits().count().per_connection().max_hits(), 60);
+ assert_eq!(config.rate_limits().count().per_pubkey().max_hits(), 120);
+ assert_eq!(config.rate_limits().count().per_group().max_hits(), 120);
+ assert_eq!(config.rate_limits().count().per_kind().max_hits(), 240);
+ assert_eq!(config.rate_limits().count().broad().max_hits(), 20);
assert!(config.tracing().enabled());
assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json);
config.auth_state().expect("auth");
@@ -651,6 +701,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
}"#;
@@ -710,6 +776,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
},
"ignored": true
@@ -767,6 +849,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
}"#;
diff --git a/crates/tangle_runtime/src/rate_limits.rs b/crates/tangle_runtime/src/rate_limits.rs
@@ -44,6 +44,19 @@ pub enum TangleRateLimitKey {
group_id: GroupId,
pubkey: PublicKeyHex,
},
+ Connection {
+ scope: TangleRateLimitScope,
+ connection_id: u64,
+ },
+ QueryClass {
+ scope: TangleRateLimitScope,
+ class: TangleRateLimitQueryClass,
+ },
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub enum TangleRateLimitQueryClass {
+ Broad,
}
impl TangleRateLimitKey {
@@ -70,6 +83,17 @@ impl TangleRateLimitKey {
pub fn join_flow(group_id: GroupId, pubkey: PublicKeyHex) -> Self {
Self::JoinFlow { group_id, pubkey }
}
+
+ pub fn connection(scope: TangleRateLimitScope, connection_id: u64) -> Self {
+ Self::Connection {
+ scope,
+ connection_id,
+ }
+ }
+
+ pub fn query_class(scope: TangleRateLimitScope, class: TangleRateLimitQueryClass) -> Self {
+ Self::QueryClass { scope, class }
+ }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -77,6 +101,8 @@ pub struct TangleRateLimitConfig {
auth: TangleAuthRateLimitConfig,
event: TangleEventRateLimitConfig,
group: TangleGroupRateLimitConfig,
+ req: TangleQueryRateLimitConfig,
+ count: TangleQueryRateLimitConfig,
}
impl TangleRateLimitConfig {
@@ -84,8 +110,16 @@ impl TangleRateLimitConfig {
auth: TangleAuthRateLimitConfig,
event: TangleEventRateLimitConfig,
group: TangleGroupRateLimitConfig,
+ req: TangleQueryRateLimitConfig,
+ count: TangleQueryRateLimitConfig,
) -> Self {
- Self { auth, event, group }
+ Self {
+ auth,
+ event,
+ group,
+ req,
+ count,
+ }
}
pub fn auth(self) -> TangleAuthRateLimitConfig {
@@ -99,6 +133,14 @@ impl TangleRateLimitConfig {
pub fn group(self) -> TangleGroupRateLimitConfig {
self.group
}
+
+ pub fn req(self) -> TangleQueryRateLimitConfig {
+ self.req
+ }
+
+ pub fn count(self) -> TangleQueryRateLimitConfig {
+ self.count
+ }
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -188,6 +230,60 @@ impl TangleGroupRateLimitConfig {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct TangleQueryRateLimitConfig {
+ per_ip: TangleRateLimitRule,
+ per_connection: TangleRateLimitRule,
+ per_pubkey: TangleRateLimitRule,
+ per_group: TangleRateLimitRule,
+ per_kind: TangleRateLimitRule,
+ broad: TangleRateLimitRule,
+}
+
+impl TangleQueryRateLimitConfig {
+ pub fn new(
+ per_ip: TangleRateLimitRule,
+ per_connection: TangleRateLimitRule,
+ per_pubkey: TangleRateLimitRule,
+ per_group: TangleRateLimitRule,
+ per_kind: TangleRateLimitRule,
+ broad: TangleRateLimitRule,
+ ) -> Self {
+ Self {
+ per_ip,
+ per_connection,
+ per_pubkey,
+ per_group,
+ per_kind,
+ broad,
+ }
+ }
+
+ pub fn per_ip(self) -> TangleRateLimitRule {
+ self.per_ip
+ }
+
+ pub fn per_connection(self) -> TangleRateLimitRule {
+ self.per_connection
+ }
+
+ pub fn per_pubkey(self) -> TangleRateLimitRule {
+ self.per_pubkey
+ }
+
+ pub fn per_group(self) -> TangleRateLimitRule {
+ self.per_group
+ }
+
+ pub fn per_kind(self) -> TangleRateLimitRule {
+ self.per_kind
+ }
+
+ pub fn broad(self) -> TangleRateLimitRule {
+ self.broad
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TangleRateLimitRule {
window_seconds: u64,
max_hits: u64,
@@ -339,8 +435,9 @@ fn reset_at(rule: TangleRateLimitRule, now: UnixTimestamp) -> UnixTimestamp {
mod tests {
use super::{
TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleGroupRateLimitConfig,
- TangleRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule,
- TangleRateLimitScope, TangleRateLimiter,
+ TangleQueryRateLimitConfig, TangleRateLimitConfig, TangleRateLimitDecision,
+ TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope,
+ TangleRateLimiter,
};
use std::net::{IpAddr, Ipv4Addr};
use tangle_groups::GroupId;
@@ -377,6 +474,11 @@ mod tests {
TangleRateLimitKey::kind(TangleRateLimitScope::Event, kind),
TangleRateLimitKey::auth_failure(Some(ip), Some(pubkey.clone())),
TangleRateLimitKey::join_flow(group_id, pubkey),
+ TangleRateLimitKey::connection(TangleRateLimitScope::Req, 42),
+ TangleRateLimitKey::query_class(
+ TangleRateLimitScope::Count,
+ TangleRateLimitQueryClass::Broad,
+ ),
];
for key in keys {
@@ -393,7 +495,7 @@ mod tests {
);
assert_eq!(limiter.hits(&key), 1);
}
- assert_eq!(limiter.tracked_key_count(), 6);
+ assert_eq!(limiter.tracked_key_count(), 8);
}
#[test]
@@ -505,10 +607,38 @@ mod tests {
let group_write = TangleRateLimitRule::new(60, 7).expect("group write");
let group_kind = TangleRateLimitRule::new(60, 8).expect("group kind");
let group_join = TangleRateLimitRule::new(300, 9).expect("group join");
+ let req_ip = TangleRateLimitRule::new(60, 10).expect("req ip");
+ let req_connection = TangleRateLimitRule::new(60, 11).expect("req connection");
+ let req_pubkey = TangleRateLimitRule::new(60, 12).expect("req pubkey");
+ let req_group = TangleRateLimitRule::new(60, 13).expect("req group");
+ let req_kind = TangleRateLimitRule::new(60, 14).expect("req kind");
+ let req_broad = TangleRateLimitRule::new(60, 15).expect("req broad");
+ let count_ip = TangleRateLimitRule::new(60, 16).expect("count ip");
+ let count_connection = TangleRateLimitRule::new(60, 17).expect("count connection");
+ let count_pubkey = TangleRateLimitRule::new(60, 18).expect("count pubkey");
+ let count_group = TangleRateLimitRule::new(60, 19).expect("count group");
+ let count_kind = TangleRateLimitRule::new(60, 20).expect("count kind");
+ let count_broad = TangleRateLimitRule::new(60, 21).expect("count broad");
let config = TangleRateLimitConfig::new(
TangleAuthRateLimitConfig::new(auth_pubkey, auth_failures),
TangleEventRateLimitConfig::new(event_pubkey, event_kind),
TangleGroupRateLimitConfig::new(group_pubkey, group_write, group_kind, group_join),
+ TangleQueryRateLimitConfig::new(
+ req_ip,
+ req_connection,
+ req_pubkey,
+ req_group,
+ req_kind,
+ req_broad,
+ ),
+ TangleQueryRateLimitConfig::new(
+ count_ip,
+ count_connection,
+ count_pubkey,
+ count_group,
+ count_kind,
+ count_broad,
+ ),
);
assert_eq!(config.auth().per_pubkey(), auth_pubkey);
@@ -519,5 +649,17 @@ mod tests {
assert_eq!(config.group().write_per_group(), group_write);
assert_eq!(config.group().write_per_kind(), group_kind);
assert_eq!(config.group().join_flow(), group_join);
+ assert_eq!(config.req().per_ip(), req_ip);
+ assert_eq!(config.req().per_connection(), req_connection);
+ assert_eq!(config.req().per_pubkey(), req_pubkey);
+ assert_eq!(config.req().per_group(), req_group);
+ assert_eq!(config.req().per_kind(), req_kind);
+ assert_eq!(config.req().broad(), req_broad);
+ assert_eq!(config.count().per_ip(), count_ip);
+ assert_eq!(config.count().per_connection(), count_connection);
+ assert_eq!(config.count().per_pubkey(), count_pubkey);
+ assert_eq!(config.count().per_group(), count_group);
+ assert_eq!(config.count().per_kind(), count_kind);
+ assert_eq!(config.count().broad(), count_broad);
}
}
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -6,8 +6,8 @@ use crate::{
event_bus::{TangleEventBus, TangleEventReceiver},
ops::BaseRelayReadinessState,
rate_limits::{
- TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, TangleRateLimitScope,
- TangleRateLimiter,
+ TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey,
+ TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter,
},
relay::{
auth::BaseAuthState,
@@ -16,15 +16,21 @@ use crate::{
},
};
use std::{
+ collections::BTreeSet,
fmt,
+ net::IpAddr,
sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
},
time::Instant,
};
-use tangle_groups::{KIND_GROUP_JOIN_REQUEST, StoreOffset, validate_client_group_event_structure};
-use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp};
+use tangle_groups::{
+ GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset, validate_client_group_event_structure,
+};
+use tangle_protocol::{
+ ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, UnixTimestamp,
+};
use tokio::sync::{Mutex, watch};
pub struct TangleRuntime {
@@ -38,6 +44,32 @@ pub struct TangleRuntime {
shutdown: TangleShutdownSignal,
}
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct TangleQueryRateLimitContext {
+ peer_ip: Option<IpAddr>,
+ connection_id: Option<u64>,
+}
+
+impl TangleQueryRateLimitContext {
+ pub fn new(peer_ip: Option<IpAddr>, connection_id: Option<u64>) -> Self {
+ Self {
+ peer_ip,
+ connection_id,
+ }
+ }
+}
+
+struct TangleQueryRateLimitRequest<'a> {
+ scope: TangleRateLimitScope,
+ rules: TangleQueryRateLimitConfig,
+ label: &'static str,
+ subscription_id: &'a SubscriptionId,
+ filters: &'a [Filter],
+ auth: &'a BaseAuthState,
+ context: TangleQueryRateLimitContext,
+ now: UnixTimestamp,
+}
+
impl TangleRuntime {
pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> {
let limits = TangleRuntimeLimits::from_config(&config)?;
@@ -198,6 +230,143 @@ impl TangleRuntime {
)
}
+ fn rate_limit_req(
+ &self,
+ subscription_id: &SubscriptionId,
+ filters: &[Filter],
+ auth: &BaseAuthState,
+ context: TangleQueryRateLimitContext,
+ now: UnixTimestamp,
+ ) -> Option<RelayMessage> {
+ self.rate_limit_query(TangleQueryRateLimitRequest {
+ scope: TangleRateLimitScope::Req,
+ rules: self.config.rate_limits().req(),
+ label: "req",
+ subscription_id,
+ filters,
+ auth,
+ context,
+ now,
+ })
+ }
+
+ fn rate_limit_count(
+ &self,
+ subscription_id: &SubscriptionId,
+ filters: &[Filter],
+ auth: &BaseAuthState,
+ context: TangleQueryRateLimitContext,
+ now: UnixTimestamp,
+ ) -> Option<RelayMessage> {
+ self.rate_limit_query(TangleQueryRateLimitRequest {
+ scope: TangleRateLimitScope::Count,
+ rules: self.config.rate_limits().count(),
+ label: "count",
+ subscription_id,
+ filters,
+ auth,
+ context,
+ now,
+ })
+ }
+
+ fn rate_limit_query(&self, request: TangleQueryRateLimitRequest<'_>) -> Option<RelayMessage> {
+ if let Some(peer_ip) = request.context.peer_ip
+ && let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::ip(request.scope, peer_ip),
+ request.rules.per_ip(),
+ request.label,
+ "ip",
+ request.now,
+ )
+ {
+ return Some(message);
+ }
+ if let Some(connection_id) = request.context.connection_id
+ && let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::connection(request.scope, connection_id),
+ request.rules.per_connection(),
+ request.label,
+ "connection",
+ request.now,
+ )
+ {
+ return Some(message);
+ }
+ for pubkey in request.auth.authenticated_pubkeys() {
+ if let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::pubkey(request.scope, pubkey.clone()),
+ request.rules.per_pubkey(),
+ request.label,
+ "pubkey",
+ request.now,
+ ) {
+ return Some(message);
+ }
+ }
+ for group_id in filter_group_ids(request.filters) {
+ if let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::group(request.scope, group_id),
+ request.rules.per_group(),
+ request.label,
+ "group",
+ request.now,
+ ) {
+ return Some(message);
+ }
+ }
+ for kind in filter_kinds(request.filters) {
+ if let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::kind(request.scope, kind),
+ request.rules.per_kind(),
+ request.label,
+ "kind",
+ request.now,
+ ) {
+ return Some(message);
+ }
+ }
+ if query_is_broad(request.filters)
+ && let Some(message) = self.rate_limit_closed(
+ request.subscription_id,
+ TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad),
+ request.rules.broad(),
+ request.label,
+ "broad",
+ request.now,
+ )
+ {
+ return Some(message);
+ }
+ None
+ }
+
+ fn rate_limit_closed(
+ &self,
+ subscription_id: &SubscriptionId,
+ key: TangleRateLimitKey,
+ rule: TangleRateLimitRule,
+ label: &'static str,
+ dimension: &'static str,
+ now: UnixTimestamp,
+ ) -> Option<RelayMessage> {
+ match self.rate_limiter.record(key, rule, now) {
+ TangleRateLimitDecision::Allowed { .. } => None,
+ TangleRateLimitDecision::Rejected { reset_at } => Some(RelayMessage::Closed {
+ subscription_id: subscription_id.clone(),
+ message: BaseRelayError::rate_limited(format!(
+ "{label} {dimension} rate limit exceeded until {reset_at}"
+ ))
+ .prefixed_message(),
+ }),
+ }
+ }
+
fn rate_limit_ok(
&self,
event: &Event,
@@ -242,6 +411,22 @@ impl TangleRuntimeHandle {
auth: &mut BaseAuthState,
now: UnixTimestamp,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.handle_client_message_with_query_context(
+ message,
+ auth,
+ TangleQueryRateLimitContext::default(),
+ now,
+ )
+ .await
+ }
+
+ pub async fn handle_client_message_with_query_context(
+ &self,
+ message: ClientMessage,
+ auth: &mut BaseAuthState,
+ query_context: TangleQueryRateLimitContext,
+ now: UnixTimestamp,
+ ) -> Result<Vec<RelayMessage>, BaseRelayError> {
let mut runtime = self.inner.lock().await;
match message {
ClientMessage::Event(event) => {
@@ -260,6 +445,58 @@ impl TangleRuntimeHandle {
}
Ok(vec![result.into_message()])
}
+ ClientMessage::Req {
+ subscription_id,
+ filters,
+ } => {
+ runtime
+ .limits()
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ runtime
+ .limits()
+ .base_relay_limits()
+ .validate_filters(&filters)?;
+ if let Some(message) =
+ runtime.rate_limit_req(&subscription_id, &filters, auth, query_context, now)
+ {
+ return Ok(vec![message]);
+ }
+ runtime.relay_mut().handle_client_message(
+ ClientMessage::Req {
+ subscription_id,
+ filters,
+ },
+ auth,
+ now,
+ )
+ }
+ ClientMessage::Count {
+ subscription_id,
+ filters,
+ } => {
+ runtime
+ .limits()
+ .base_relay_limits()
+ .validate_subscription_id(&subscription_id)?;
+ runtime
+ .limits()
+ .base_relay_limits()
+ .validate_filters(&filters)?;
+ if let Some(message) =
+ runtime.rate_limit_count(&subscription_id, &filters, auth, query_context, now)
+ {
+ return Ok(vec![message]);
+ }
+ runtime.relay_mut().handle_client_message(
+ ClientMessage::Count {
+ subscription_id,
+ filters,
+ },
+ auth,
+ now,
+ )
+ }
ClientMessage::Auth(event) => {
if let Err(error) = runtime.limits().base_relay_limits().validate_event(&event) {
return Ok(vec![RelayMessage::Ok {
@@ -298,6 +535,20 @@ impl TangleRuntimeHandle {
self.inner.lock().await.rate_limiter().clone()
}
+ pub async fn rate_limit_req(
+ &self,
+ subscription_id: &SubscriptionId,
+ filters: &[Filter],
+ auth: &BaseAuthState,
+ query_context: TangleQueryRateLimitContext,
+ now: UnixTimestamp,
+ ) -> Option<RelayMessage> {
+ self.inner
+ .lock()
+ .await
+ .rate_limit_req(subscription_id, filters, auth, query_context, now)
+ }
+
pub(crate) async fn query_req_with_auth(
&self,
subscription_id: SubscriptionId,
@@ -352,6 +603,38 @@ fn auth_response_failed(replies: &[RelayMessage]) -> bool {
})
}
+fn filter_group_ids(filters: &[Filter]) -> Vec<GroupId> {
+ filters
+ .iter()
+ .flat_map(|filter| filter.tag_filters())
+ .filter(|(name, _)| matches!(name.as_str(), "h" | "d"))
+ .flat_map(|(_, values)| values)
+ .filter_map(|value| GroupId::new(value.as_str()).ok())
+ .collect::<BTreeSet<_>>()
+ .into_iter()
+ .collect()
+}
+
+fn filter_kinds(filters: &[Filter]) -> Vec<Kind> {
+ filters
+ .iter()
+ .flat_map(Filter::kinds)
+ .copied()
+ .collect::<BTreeSet<_>>()
+ .into_iter()
+ .collect()
+}
+
+fn query_is_broad(filters: &[Filter]) -> bool {
+ filters.iter().any(|filter| {
+ filter.ids().is_empty()
+ && filter.authors().is_empty()
+ && filter.kinds().is_empty()
+ && filter.tag_filters().is_empty()
+ && filter.search().is_none()
+ })
+}
+
impl fmt::Debug for TangleRuntimeHandle {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("TangleRuntimeHandle")
@@ -516,20 +799,25 @@ impl Default for TangleShutdownSignal {
#[cfg(test)]
mod tests {
- use super::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits};
+ use super::{
+ TangleQueryRateLimitContext, TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits,
+ };
use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
use crate::event_bus::{TangleEventBus, TangleEventReceiveError};
- use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitScope};
+ use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope};
use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits};
use crate::relay::live::LiveSubscriptionSet;
use serde_json::json;
- use std::path::{Path, PathBuf};
+ use std::{
+ net::{IpAddr, Ipv4Addr},
+ path::{Path, PathBuf},
+ };
use tangle_groups::{
GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_METADATA,
StoreOffset,
};
use tangle_protocol::{
- ClientMessage, RelayMessage, SubscriptionId, Tag, UnixTimestamp, filter_from_value,
+ ClientMessage, Kind, RelayMessage, SubscriptionId, Tag, UnixTimestamp, filter_from_value,
};
use tangle_test_support::{
FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event,
@@ -1004,6 +1292,265 @@ mod tests {
}
#[tokio::test]
+ async fn runtime_rate_limits_req_authenticated_pubkeys() {
+ let root = temp_root("runtime-req-pubkey-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let rule = runtime.config().rate_limits().req().per_pubkey();
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ auth.issue_challenge("challenge-a", UnixTimestamp::new(100))
+ .expect("challenge");
+ let auth_event =
+ tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event");
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Auth(auth_event.clone()),
+ &mut auth,
+ UnixTimestamp::new(120)
+ )
+ .await
+ .expect("auth"),
+ vec![RelayMessage::Ok {
+ event_id: auth_event.id().clone(),
+ accepted: true,
+ message: String::new()
+ }]
+ );
+ let key =
+ TangleRateLimitKey::pubkey(TangleRateLimitScope::Req, FixtureKey::Member.public_key());
+ let limiter = handle.rate_limiter().await;
+ for _ in 0..rule.max_hits() {
+ limiter.record(key.clone(), rule, UnixTimestamp::new(120));
+ }
+ let subscription_id = SubscriptionId::new("limited-req-pubkey").expect("subscription");
+ let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Req {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ UnixTimestamp::new(120)
+ )
+ .await
+ .expect("req"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: req pubkey rate limit exceeded until 180".to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn runtime_rate_limits_req_connections() {
+ let root = temp_root("runtime-req-connection-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let rule = runtime.config().rate_limits().req().per_connection();
+ let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, 77);
+ for _ in 0..rule.max_hits() {
+ runtime
+ .rate_limiter()
+ .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
+ }
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ let subscription_id = SubscriptionId::new("limited-req-connection").expect("subscription");
+ let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message_with_query_context(
+ ClientMessage::Req {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ TangleQueryRateLimitContext::new(None, Some(77)),
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("req"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: req connection rate limit exceeded until 1714124493"
+ .to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn runtime_rate_limits_req_filter_groups() {
+ let root = temp_root("runtime-req-group-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let group_id = GroupId::new("Farm").expect("group");
+ let rule = runtime.config().rate_limits().req().per_group();
+ let key = TangleRateLimitKey::group(TangleRateLimitScope::Req, group_id);
+ for _ in 0..rule.max_hits() {
+ runtime
+ .rate_limiter()
+ .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
+ }
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ let subscription_id = SubscriptionId::new("limited-req-group").expect("subscription");
+ let filters =
+ vec![filter_from_value(&json!({"#h": ["Farm"], "limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Req {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("req"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: req group rate limit exceeded until 1714124493".to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn runtime_rate_limits_count_peer_ips() {
+ let root = temp_root("runtime-count-ip-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let rule = runtime.config().rate_limits().count().per_ip();
+ let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 9));
+ let key = TangleRateLimitKey::ip(TangleRateLimitScope::Count, peer_ip);
+ for _ in 0..rule.max_hits() {
+ runtime
+ .rate_limiter()
+ .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
+ }
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ let subscription_id = SubscriptionId::new("limited-count-ip").expect("subscription");
+ let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message_with_query_context(
+ ClientMessage::Count {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ TangleQueryRateLimitContext::new(Some(peer_ip), None),
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("count"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: count ip rate limit exceeded until 1714124493".to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn runtime_rate_limits_count_filter_kinds() {
+ let root = temp_root("runtime-count-kind-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let kind = Kind::new(1).expect("kind");
+ let rule = runtime.config().rate_limits().count().per_kind();
+ let key = TangleRateLimitKey::kind(TangleRateLimitScope::Count, kind);
+ for _ in 0..rule.max_hits() {
+ runtime
+ .rate_limiter()
+ .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
+ }
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ let subscription_id = SubscriptionId::new("limited-count-kind").expect("subscription");
+ let filters = vec![filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Count {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("count"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: count kind rate limit exceeded until 1714124493".to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
+ async fn runtime_rate_limits_count_broad_queries() {
+ let root = temp_root("runtime-count-broad-rate-limit");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime");
+ let rule = runtime.config().rate_limits().count().broad();
+ let key = TangleRateLimitKey::query_class(
+ TangleRateLimitScope::Count,
+ TangleRateLimitQueryClass::Broad,
+ );
+ for _ in 0..rule.max_hits() {
+ runtime
+ .rate_limiter()
+ .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433));
+ }
+ let handle = TangleRuntimeHandle::new(runtime);
+ let mut auth = handle.auth_state().await.expect("auth");
+ let subscription_id = SubscriptionId::new("limited-count-broad").expect("subscription");
+ let filters = vec![filter_from_value(&json!({"limit": 1})).expect("filter")];
+
+ assert_eq!(
+ handle
+ .handle_client_message(
+ ClientMessage::Count {
+ subscription_id: subscription_id.clone(),
+ filters
+ },
+ &mut auth,
+ UnixTimestamp::new(1_714_124_433)
+ )
+ .await
+ .expect("count"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: "rate-limited: count broad rate limit exceeded until 1714124493"
+ .to_owned()
+ }]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() {
let root = temp_root("runtime-generated-offset-fanout");
let _ = std::fs::remove_dir_all(&root);
@@ -1140,6 +1687,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
})
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -10,7 +10,7 @@ use crate::{
use axum::{
Router,
extract::{
- State,
+ ConnectInfo, State,
ws::{WebSocketUpgrade, rejection::WebSocketUpgradeRejection},
},
response::{IntoResponse, Response},
@@ -73,19 +73,22 @@ pub async fn serve_listener_until_shutdown(
runtime.clone(),
);
let mut shutdown = shutdown_signal.subscribe();
- axum::serve(listener, router)
- .with_graceful_shutdown(async move {
- loop {
- if *shutdown.borrow() {
- break;
- }
- if shutdown.changed().await.is_err() {
- break;
- }
+ axum::serve(
+ listener,
+ router.into_make_service_with_connect_info::<SocketAddr>(),
+ )
+ .with_graceful_shutdown(async move {
+ loop {
+ if *shutdown.borrow() {
+ break;
}
- })
- .await
- .map_err(|error| BaseRelayError::error(error.to_string()))?;
+ if shutdown.changed().await.is_err() {
+ break;
+ }
+ }
+ })
+ .await
+ .map_err(|error| BaseRelayError::error(error.to_string()))?;
let shutdown = runtime.shutdown().await?;
Ok(TangleServeReport::new(
listen_addr,
@@ -121,18 +124,20 @@ struct TangleHttpState {
async fn tangle_root(
State(state): State<TangleHttpState>,
+ ConnectInfo(peer_addr): ConnectInfo<SocketAddr>,
websocket: Result<WebSocketUpgrade, WebSocketUpgradeRejection>,
headers: HeaderMap,
) -> Response {
match websocket {
Ok(websocket) => {
let session = match state.runtime.auth_state().await {
- Ok(auth) => TangleWebSocketSession::new(
+ Ok(auth) => TangleWebSocketSession::new_with_peer(
state.limits,
state.shutdown.subscribe(),
state.runtime.clone(),
auth,
state.runtime.subscribe_events().await,
+ Some(peer_addr.ip()),
),
Err(error) => Err(error),
};
@@ -161,11 +166,12 @@ mod tests {
ops::BaseRelayReadinessState,
runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal},
};
- use axum::body::to_bytes;
+ use axum::{body::to_bytes, extract::ConnectInfo};
use futures_util::{SinkExt, StreamExt};
use http::{Request, header};
use serde_json::json;
use std::{
+ net::SocketAddr,
path::{Path, PathBuf},
time::{SystemTime, UNIX_EPOCH},
};
@@ -395,6 +401,7 @@ mod tests {
Request::builder()
.uri("/")
.header(header::ACCEPT, "application/nostr+json")
+ .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_000))))
.body(axum::body::Body::empty())
.expect("request"),
)
@@ -405,6 +412,7 @@ mod tests {
.oneshot(
Request::builder()
.uri("/")
+ .extension(ConnectInfo(SocketAddr::from(([127, 0, 0, 1], 39_001))))
.body(axum::body::Body::empty())
.expect("request"),
)
@@ -506,6 +514,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
})
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -7,10 +7,14 @@ use crate::{
auth::{BaseAuthState, generate_auth_challenge},
live::LiveSubscriptionSet,
},
- runtime::{TangleRuntimeHandle, TangleRuntimeLimits},
+ runtime::{TangleQueryRateLimitContext, TangleRuntimeHandle, TangleRuntimeLimits},
};
use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket};
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
+use std::{
+ net::IpAddr,
+ sync::atomic::{AtomicU64, Ordering},
+ time::{Instant, SystemTime, UNIX_EPOCH},
+};
use tangle_groups::GroupAuthContext;
use tangle_protocol::{
ClientMessage, Filter, RelayMessage, SubscriptionId, UnixTimestamp, parse_client_message,
@@ -19,6 +23,8 @@ use tokio::sync::{mpsc, watch};
#[derive(Debug)]
pub struct TangleWebSocketSession {
+ connection_id: u64,
+ peer_ip: Option<IpAddr>,
connected_at: Instant,
outbound: TangleOutboundSender,
outbound_receiver: mpsc::Receiver<Message>,
@@ -30,6 +36,8 @@ pub struct TangleWebSocketSession {
events: TangleEventReceiver,
}
+static NEXT_TANGLE_CONNECTION_ID: AtomicU64 = AtomicU64::new(1);
+
impl TangleWebSocketSession {
pub fn new(
limits: TangleRuntimeLimits,
@@ -38,6 +46,17 @@ impl TangleWebSocketSession {
auth: BaseAuthState,
events: TangleEventReceiver,
) -> Result<Self, BaseRelayError> {
+ Self::new_with_peer(limits, shutdown, runtime, auth, events, None)
+ }
+
+ pub fn new_with_peer(
+ limits: TangleRuntimeLimits,
+ shutdown: watch::Receiver<bool>,
+ runtime: TangleRuntimeHandle,
+ auth: BaseAuthState,
+ events: TangleEventReceiver,
+ peer_ip: Option<IpAddr>,
+ ) -> Result<Self, BaseRelayError> {
let outbound_queue_capacity = limits.outbound_queue_capacity();
let (sender, receiver) = mpsc::channel(outbound_queue_capacity);
let subscriptions = LiveSubscriptionSet::new(
@@ -45,6 +64,8 @@ impl TangleWebSocketSession {
limits.base_relay_limits().max_subscriptions(),
)?;
Ok(Self {
+ connection_id: NEXT_TANGLE_CONNECTION_ID.fetch_add(1, Ordering::Relaxed),
+ peer_ip,
connected_at: Instant::now(),
outbound: TangleOutboundSender {
sender,
@@ -219,6 +240,23 @@ impl TangleWebSocketSession {
subscription_id,
filters,
} => self.handle_req(subscription_id, filters).await,
+ ClientMessage::Count {
+ subscription_id,
+ filters,
+ } => {
+ let context = self.query_rate_limit_context();
+ self.runtime
+ .handle_client_message_with_query_context(
+ ClientMessage::Count {
+ subscription_id,
+ filters,
+ },
+ &mut self.auth,
+ context,
+ current_unix_timestamp(),
+ )
+ .await
+ }
ClientMessage::Close(subscription_id) => {
self.limits
.base_relay_limits()
@@ -243,6 +281,19 @@ impl TangleWebSocketSession {
.base_relay_limits()
.validate_subscription_id(&subscription_id)?;
self.limits.base_relay_limits().validate_filters(&filters)?;
+ if let Some(message) = self
+ .runtime
+ .rate_limit_req(
+ &subscription_id,
+ &filters,
+ &self.auth,
+ self.query_rate_limit_context(),
+ current_unix_timestamp(),
+ )
+ .await
+ {
+ return Ok(vec![message]);
+ }
self.subscriptions.subscribe(
subscription_id.clone(),
filters.clone(),
@@ -261,6 +312,10 @@ impl TangleWebSocketSession {
}
}
+ fn query_rate_limit_context(&self) -> TangleQueryRateLimitContext {
+ TangleQueryRateLimitContext::new(self.peer_ip, Some(self.connection_id))
+ }
+
fn send_relay_message(&self, message: RelayMessage) -> Result<(), TangleOutboundQueueError> {
self.outbound
.try_send(Message::Text(message.encode().into()))
@@ -325,12 +380,13 @@ fn current_unix_timestamp() -> UnixTimestamp {
mod tests {
use super::{
TangleOutboundQueueError, TangleSessionControl, TangleWebSocketSession,
- event_stream_lag_close_message,
+ current_unix_timestamp, event_stream_lag_close_message,
};
use crate::{
config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json},
errors::BaseRelayError,
event_bus::TangleEventReceiver,
+ rate_limits::{TangleRateLimitKey, TangleRateLimitScope},
relay::core::{BaseRelayLimitSettings, BaseRelayLimits},
runtime::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal},
};
@@ -338,7 +394,7 @@ mod tests {
use serde_json::json;
use std::path::{Path, PathBuf};
use tangle_groups::StoreOffset;
- use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId};
+ use tangle_protocol::{ClientMessage, Filter, RelayMessage, SubscriptionId, filter_from_value};
#[test]
fn websocket_session_records_connection_time() {
@@ -485,6 +541,55 @@ mod tests {
}
#[tokio::test]
+ async fn websocket_session_rate_limited_req_does_not_subscribe() {
+ let shutdown = TangleShutdownSignal::new();
+ let root = temp_root("rate-limited-req");
+ let _ = std::fs::remove_dir_all(&root);
+ let runtime = TangleRuntime::open(runtime_config(&root)).expect("runtime");
+ let rule = runtime.config().rate_limits().req().per_connection();
+ let runtime = TangleRuntimeHandle::new(runtime);
+ let auth = runtime.auth_state().await.expect("auth");
+ let events = runtime.subscribe_events().await;
+ let now = current_unix_timestamp();
+ let mut session = TangleWebSocketSession::new(
+ session_limits(8),
+ shutdown.subscribe(),
+ runtime.clone(),
+ auth,
+ events,
+ )
+ .expect("session");
+ let key = TangleRateLimitKey::connection(TangleRateLimitScope::Req, session.connection_id);
+ let limiter = runtime.rate_limiter().await;
+ for _ in 0..rule.max_hits() {
+ limiter.record(key.clone(), rule, now);
+ }
+ let subscription_id = SubscriptionId::new("limited").expect("subscription");
+
+ assert_eq!(
+ session
+ .handle_client_message(ClientMessage::Req {
+ subscription_id: subscription_id.clone(),
+ filters: vec![
+ filter_from_value(&json!({"kinds": [1], "limit": 1})).expect("filter")
+ ]
+ })
+ .await
+ .expect("req"),
+ vec![RelayMessage::Closed {
+ subscription_id,
+ message: format!(
+ "rate-limited: req connection rate limit exceeded until {}",
+ now.as_u64() + 60
+ )
+ }]
+ );
+ assert_eq!(session.active_subscription_count(), 0);
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[tokio::test]
async fn websocket_session_closes_when_event_receiver_lags() {
let shutdown = TangleShutdownSignal::new();
let root = temp_root("event-receiver-lag");
@@ -614,6 +719,22 @@ mod tests {
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
})
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -584,6 +584,22 @@ fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfi
"write_per_group": {"window_seconds": 60, "max_hits": 90},
"write_per_kind": {"window_seconds": 60, "max_hits": 300},
"join_flow": {"window_seconds": 300, "max_hits": 10}
+ },
+ "req": {
+ "per_ip": {"window_seconds": 60, "max_hits": 600},
+ "per_connection": {"window_seconds": 60, "max_hits": 120},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 240},
+ "per_group": {"window_seconds": 60, "max_hits": 240},
+ "per_kind": {"window_seconds": 60, "max_hits": 500},
+ "broad": {"window_seconds": 60, "max_hits": 30}
+ },
+ "count": {
+ "per_ip": {"window_seconds": 60, "max_hits": 300},
+ "per_connection": {"window_seconds": 60, "max_hits": 60},
+ "per_pubkey": {"window_seconds": 60, "max_hits": 120},
+ "per_group": {"window_seconds": 60, "max_hits": 120},
+ "per_kind": {"window_seconds": 60, "max_hits": 240},
+ "broad": {"window_seconds": 60, "max_hits": 20}
}
}
})