tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 80f7feb411ef94c9f08cac71f0c3813576fbce20
parent 32b02ea45824890518f482f77e6b87e4d39bd984
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 13:26:26 -0700

rate-limits: add ip-scoped client buckets

- Add required auth, event, group write, and group join IP rate-limit config rules.
- Carry peer context through the general client-message handler and WebSocket session path.
- Enforce IP buckets before narrower pubkey, kind, group, and join-flow buckets.
- Cover IP partitioning, rotation resistance, required config fields, and updated fixture configs.

Diffstat:
Mcrates/tangle/tests/version.rs | 9+++++++--
Mcrates/tangle_runtime/src/config.rs | 118+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Mcrates/tangle_runtime/src/nip11.rs | 9+++++++--
Mcrates/tangle_runtime/src/rate_limits.rs | 115++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Mcrates/tangle_runtime/src/runtime.rs | 431++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Mcrates/tangle_runtime/src/server.rs | 9+++++++--
Mcrates/tangle_runtime/src/session.rs | 29++++++++++++++++++++---------
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 9+++++++--
Mcrates/tangle_runtime/tests/ops_truthfulness.rs | 9+++++++--
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 9+++++++--
10 files changed, 659 insertions(+), 88 deletions(-)

diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -140,18 +140,23 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs @@ -354,13 +354,16 @@ struct BaseRelayRateLimitsDocument { #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct BaseRelayAuthRateLimitsDocument { + per_ip: BaseRelayRateLimitRuleDocument, per_pubkey: BaseRelayRateLimitRuleDocument, failures: BaseRelayRateLimitRuleDocument, + failures_per_ip: BaseRelayRateLimitRuleDocument, } #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct BaseRelayEventRateLimitsDocument { + per_ip: BaseRelayRateLimitRuleDocument, per_pubkey: BaseRelayRateLimitRuleDocument, per_kind: BaseRelayRateLimitRuleDocument, } @@ -368,10 +371,12 @@ struct BaseRelayEventRateLimitsDocument { #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] struct BaseRelayGroupRateLimitsDocument { + write_per_ip: BaseRelayRateLimitRuleDocument, write_per_pubkey: BaseRelayRateLimitRuleDocument, write_per_group: BaseRelayRateLimitRuleDocument, write_per_kind: BaseRelayRateLimitRuleDocument, join_flow: BaseRelayRateLimitRuleDocument, + join_flow_per_ip: BaseRelayRateLimitRuleDocument, } #[derive(Debug, Deserialize)] @@ -490,6 +495,10 @@ fn base_relay_rate_limits_from_document( Ok(TangleRateLimitConfig::new( TangleAuthRateLimitConfig::new( base_relay_rate_limit_rule_from_document( + "rate_limits.auth.per_ip", + document.auth.per_ip, + )?, + base_relay_rate_limit_rule_from_document( "rate_limits.auth.per_pubkey", document.auth.per_pubkey, )?, @@ -497,9 +506,17 @@ fn base_relay_rate_limits_from_document( "rate_limits.auth.failures", document.auth.failures, )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.auth.failures_per_ip", + document.auth.failures_per_ip, + )?, ), TangleEventRateLimitConfig::new( base_relay_rate_limit_rule_from_document( + "rate_limits.event.per_ip", + document.event.per_ip, + )?, + base_relay_rate_limit_rule_from_document( "rate_limits.event.per_pubkey", document.event.per_pubkey, )?, @@ -510,6 +527,10 @@ fn base_relay_rate_limits_from_document( ), TangleGroupRateLimitConfig::new( base_relay_rate_limit_rule_from_document( + "rate_limits.group.write_per_ip", + document.group.write_per_ip, + )?, + base_relay_rate_limit_rule_from_document( "rate_limits.group.write_per_pubkey", document.group.write_per_pubkey, )?, @@ -525,6 +546,10 @@ fn base_relay_rate_limits_from_document( "rate_limits.group.join_flow", document.group.join_flow, )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.group.join_flow_per_ip", + document.group.join_flow_per_ip, + )?, ), base_relay_query_rate_limits_from_document("rate_limits.req", document.req)?, base_relay_query_rate_limits_from_document("rate_limits.count", document.count)?, @@ -624,10 +649,14 @@ mod tests { assert_eq!(config.limits().max_content_length(), 65_536); assert_eq!(config.limits().broadcast_channel_capacity(), 4_096); assert_eq!(config.limits().per_connection_outbound_queue(), 256); + assert_eq!(config.rate_limits().auth().per_ip().max_hits(), 120); assert_eq!(config.rate_limits().auth().per_pubkey().max_hits(), 30); assert_eq!(config.rate_limits().auth().failures().max_hits(), 5); + assert_eq!(config.rate_limits().auth().failures_per_ip().max_hits(), 20); + assert_eq!(config.rate_limits().event().per_ip().max_hits(), 600); assert_eq!(config.rate_limits().event().per_pubkey().max_hits(), 120); assert_eq!(config.rate_limits().event().per_kind().max_hits(), 1_000); + assert_eq!(config.rate_limits().group().write_per_ip().max_hits(), 300); assert_eq!( config.rate_limits().group().write_per_pubkey().max_hits(), 60 @@ -641,6 +670,10 @@ mod tests { 300 ); assert_eq!(config.rate_limits().group().join_flow().max_hits(), 10); + assert_eq!( + config.rate_limits().group().join_flow_per_ip().max_hits(), + 30 + ); assert_eq!(config.rate_limits().req().per_ip().max_hits(), 600); assert_eq!(config.rate_limits().req().per_connection().max_hits(), 120); assert_eq!(config.rate_limits().req().per_pubkey().max_hits(), 240); @@ -694,18 +727,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, @@ -770,18 +808,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, @@ -844,18 +887,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, @@ -894,4 +942,62 @@ mod tests { .contains("missing field `max_query_complexity`") ); } + + #[test] + fn base_relay_runtime_config_requires_ip_scoped_rate_limits() { + let raw = include_str!("../../../ops/production/tangle-v2.example.json").replace( + " \"per_ip\": {\n \"window_seconds\": 60,\n \"max_hits\": 120\n },\n", + "", + ); + assert!( + parse_base_relay_runtime_config_json(&raw) + .expect_err("missing auth ip") + .prefixed_message() + .contains("missing field `per_ip`") + ); + + let raw = include_str!("../../../ops/production/tangle-v2.example.json").replace( + " \"failures\": {\n \"window_seconds\": 300,\n \"max_hits\": 5\n },\n \"failures_per_ip\": {\n \"window_seconds\": 300,\n \"max_hits\": 20\n }\n", + " \"failures\": {\n \"window_seconds\": 300,\n \"max_hits\": 5\n }\n", + ); + assert!( + parse_base_relay_runtime_config_json(&raw) + .expect_err("missing auth failure ip") + .prefixed_message() + .contains("missing field `failures_per_ip`") + ); + + let raw = include_str!("../../../ops/production/tangle-v2.example.json").replace( + " \"per_ip\": {\n \"window_seconds\": 60,\n \"max_hits\": 600\n },\n", + "", + ); + assert!( + parse_base_relay_runtime_config_json(&raw) + .expect_err("missing event ip") + .prefixed_message() + .contains("missing field `per_ip`") + ); + + let raw = include_str!("../../../ops/production/tangle-v2.example.json").replace( + " \"write_per_ip\": {\n \"window_seconds\": 60,\n \"max_hits\": 300\n },\n", + "", + ); + assert!( + parse_base_relay_runtime_config_json(&raw) + .expect_err("missing group write ip") + .prefixed_message() + .contains("missing field `write_per_ip`") + ); + + let raw = include_str!("../../../ops/production/tangle-v2.example.json").replace( + " \"join_flow\": {\n \"window_seconds\": 300,\n \"max_hits\": 10\n },\n \"join_flow_per_ip\": {\n \"window_seconds\": 300,\n \"max_hits\": 30\n }\n", + " \"join_flow\": {\n \"window_seconds\": 300,\n \"max_hits\": 10\n }\n", + ); + assert!( + parse_base_relay_runtime_config_json(&raw) + .expect_err("missing group join ip") + .prefixed_message() + .contains("missing field `join_flow_per_ip`") + ); + } } diff --git a/crates/tangle_runtime/src/nip11.rs b/crates/tangle_runtime/src/nip11.rs @@ -415,18 +415,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/src/rate_limits.rs b/crates/tangle_runtime/src/rate_limits.rs @@ -40,6 +40,10 @@ pub enum TangleRateLimitKey { ip: Option<IpAddr>, pubkey: Option<PublicKeyHex>, }, + JoinFlowIp { + group_id: GroupId, + ip: IpAddr, + }, JoinFlow { group_id: GroupId, pubkey: PublicKeyHex, @@ -84,6 +88,10 @@ impl TangleRateLimitKey { Self::JoinFlow { group_id, pubkey } } + pub fn join_flow_ip(group_id: GroupId, ip: IpAddr) -> Self { + Self::JoinFlowIp { group_id, ip } + } + pub fn connection(scope: TangleRateLimitScope, connection_id: u64) -> Self { Self::Connection { scope, @@ -145,18 +153,31 @@ impl TangleRateLimitConfig { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleAuthRateLimitConfig { + per_ip: TangleRateLimitRule, per_pubkey: TangleRateLimitRule, failures: TangleRateLimitRule, + failures_per_ip: TangleRateLimitRule, } impl TangleAuthRateLimitConfig { - pub fn new(per_pubkey: TangleRateLimitRule, failures: TangleRateLimitRule) -> Self { + pub fn new( + per_ip: TangleRateLimitRule, + per_pubkey: TangleRateLimitRule, + failures: TangleRateLimitRule, + failures_per_ip: TangleRateLimitRule, + ) -> Self { Self { + per_ip, per_pubkey, failures, + failures_per_ip, } } + pub fn per_ip(self) -> TangleRateLimitRule { + self.per_ip + } + pub fn per_pubkey(self) -> TangleRateLimitRule { self.per_pubkey } @@ -164,22 +185,36 @@ impl TangleAuthRateLimitConfig { pub fn failures(self) -> TangleRateLimitRule { self.failures } + + pub fn failures_per_ip(self) -> TangleRateLimitRule { + self.failures_per_ip + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleEventRateLimitConfig { + per_ip: TangleRateLimitRule, per_pubkey: TangleRateLimitRule, per_kind: TangleRateLimitRule, } impl TangleEventRateLimitConfig { - pub fn new(per_pubkey: TangleRateLimitRule, per_kind: TangleRateLimitRule) -> Self { + pub fn new( + per_ip: TangleRateLimitRule, + per_pubkey: TangleRateLimitRule, + per_kind: TangleRateLimitRule, + ) -> Self { Self { + per_ip, per_pubkey, per_kind, } } + pub fn per_ip(self) -> TangleRateLimitRule { + self.per_ip + } + pub fn per_pubkey(self) -> TangleRateLimitRule { self.per_pubkey } @@ -191,27 +226,37 @@ impl TangleEventRateLimitConfig { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleGroupRateLimitConfig { + write_per_ip: TangleRateLimitRule, write_per_pubkey: TangleRateLimitRule, write_per_group: TangleRateLimitRule, write_per_kind: TangleRateLimitRule, join_flow: TangleRateLimitRule, + join_flow_per_ip: TangleRateLimitRule, } impl TangleGroupRateLimitConfig { pub fn new( + write_per_ip: TangleRateLimitRule, write_per_pubkey: TangleRateLimitRule, write_per_group: TangleRateLimitRule, write_per_kind: TangleRateLimitRule, join_flow: TangleRateLimitRule, + join_flow_per_ip: TangleRateLimitRule, ) -> Self { Self { + write_per_ip, write_per_pubkey, write_per_group, write_per_kind, join_flow, + join_flow_per_ip, } } + pub fn write_per_ip(self) -> TangleRateLimitRule { + self.write_per_ip + } + pub fn write_per_pubkey(self) -> TangleRateLimitRule { self.write_per_pubkey } @@ -227,6 +272,10 @@ impl TangleGroupRateLimitConfig { pub fn join_flow(self) -> TangleRateLimitRule { self.join_flow } + + pub fn join_flow_per_ip(self) -> TangleRateLimitRule { + self.join_flow_per_ip + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -473,6 +522,7 @@ mod tests { TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id.clone()), TangleRateLimitKey::kind(TangleRateLimitScope::Event, kind), TangleRateLimitKey::auth_failure(Some(ip), Some(pubkey.clone())), + TangleRateLimitKey::join_flow_ip(group_id.clone(), ip), TangleRateLimitKey::join_flow(group_id, pubkey), TangleRateLimitKey::connection(TangleRateLimitScope::Req, 42), TangleRateLimitKey::query_class( @@ -495,7 +545,7 @@ mod tests { ); assert_eq!(limiter.hits(&key), 1); } - assert_eq!(limiter.tracked_key_count(), 8); + assert_eq!(limiter.tracked_key_count(), 9); } #[test] @@ -601,28 +651,40 @@ mod tests { fn rate_limit_config_exposes_auth_and_event_rules() { let auth_pubkey = TangleRateLimitRule::new(60, 2).expect("auth pubkey"); let auth_failures = TangleRateLimitRule::new(300, 3).expect("auth failures"); - let event_pubkey = TangleRateLimitRule::new(60, 4).expect("event pubkey"); - let event_kind = TangleRateLimitRule::new(60, 5).expect("event kind"); - let group_pubkey = TangleRateLimitRule::new(60, 6).expect("group pubkey"); - let group_write = TangleRateLimitRule::new(60, 7).expect("group write"); - let group_kind = TangleRateLimitRule::new(60, 8).expect("group kind"); - let group_join = TangleRateLimitRule::new(300, 9).expect("group join"); - let req_ip = TangleRateLimitRule::new(60, 10).expect("req ip"); - let req_connection = TangleRateLimitRule::new(60, 11).expect("req connection"); - let req_pubkey = TangleRateLimitRule::new(60, 12).expect("req pubkey"); - let req_group = TangleRateLimitRule::new(60, 13).expect("req group"); - let req_kind = TangleRateLimitRule::new(60, 14).expect("req kind"); - let req_broad = TangleRateLimitRule::new(60, 15).expect("req broad"); - let count_ip = TangleRateLimitRule::new(60, 16).expect("count ip"); - let count_connection = TangleRateLimitRule::new(60, 17).expect("count connection"); - let count_pubkey = TangleRateLimitRule::new(60, 18).expect("count pubkey"); - let count_group = TangleRateLimitRule::new(60, 19).expect("count group"); - let count_kind = TangleRateLimitRule::new(60, 20).expect("count kind"); - let count_broad = TangleRateLimitRule::new(60, 21).expect("count broad"); + let auth_ip = TangleRateLimitRule::new(60, 4).expect("auth ip"); + let auth_failures_ip = TangleRateLimitRule::new(300, 5).expect("auth failures ip"); + let event_ip = TangleRateLimitRule::new(60, 6).expect("event ip"); + let event_pubkey = TangleRateLimitRule::new(60, 7).expect("event pubkey"); + let event_kind = TangleRateLimitRule::new(60, 8).expect("event kind"); + let group_ip = TangleRateLimitRule::new(60, 9).expect("group ip"); + let group_pubkey = TangleRateLimitRule::new(60, 10).expect("group pubkey"); + let group_write = TangleRateLimitRule::new(60, 11).expect("group write"); + let group_kind = TangleRateLimitRule::new(60, 12).expect("group kind"); + let group_join = TangleRateLimitRule::new(300, 13).expect("group join"); + let group_join_ip = TangleRateLimitRule::new(300, 14).expect("group join ip"); + let req_ip = TangleRateLimitRule::new(60, 15).expect("req ip"); + let req_connection = TangleRateLimitRule::new(60, 16).expect("req connection"); + let req_pubkey = TangleRateLimitRule::new(60, 17).expect("req pubkey"); + let req_group = TangleRateLimitRule::new(60, 18).expect("req group"); + let req_kind = TangleRateLimitRule::new(60, 19).expect("req kind"); + let req_broad = TangleRateLimitRule::new(60, 20).expect("req broad"); + let count_ip = TangleRateLimitRule::new(60, 21).expect("count ip"); + let count_connection = TangleRateLimitRule::new(60, 22).expect("count connection"); + let count_pubkey = TangleRateLimitRule::new(60, 23).expect("count pubkey"); + let count_group = TangleRateLimitRule::new(60, 24).expect("count group"); + let count_kind = TangleRateLimitRule::new(60, 25).expect("count kind"); + let count_broad = TangleRateLimitRule::new(60, 26).expect("count broad"); let config = TangleRateLimitConfig::new( - TangleAuthRateLimitConfig::new(auth_pubkey, auth_failures), - TangleEventRateLimitConfig::new(event_pubkey, event_kind), - TangleGroupRateLimitConfig::new(group_pubkey, group_write, group_kind, group_join), + TangleAuthRateLimitConfig::new(auth_ip, auth_pubkey, auth_failures, auth_failures_ip), + TangleEventRateLimitConfig::new(event_ip, event_pubkey, event_kind), + TangleGroupRateLimitConfig::new( + group_ip, + group_pubkey, + group_write, + group_kind, + group_join, + group_join_ip, + ), TangleQueryRateLimitConfig::new( req_ip, req_connection, @@ -641,14 +703,19 @@ mod tests { ), ); + assert_eq!(config.auth().per_ip(), auth_ip); assert_eq!(config.auth().per_pubkey(), auth_pubkey); assert_eq!(config.auth().failures(), auth_failures); + assert_eq!(config.auth().failures_per_ip(), auth_failures_ip); + assert_eq!(config.event().per_ip(), event_ip); assert_eq!(config.event().per_pubkey(), event_pubkey); assert_eq!(config.event().per_kind(), event_kind); + assert_eq!(config.group().write_per_ip(), group_ip); assert_eq!(config.group().write_per_pubkey(), group_pubkey); assert_eq!(config.group().write_per_group(), group_write); assert_eq!(config.group().write_per_kind(), group_kind); assert_eq!(config.group().join_flow(), group_join); + assert_eq!(config.group().join_flow_per_ip(), group_join_ip); assert_eq!(config.req().per_ip(), req_ip); assert_eq!(config.req().per_connection(), req_connection); assert_eq!(config.req().per_pubkey(), req_pubkey); diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -49,12 +49,12 @@ pub struct TangleRuntime { } #[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] -pub struct TangleQueryRateLimitContext { +pub struct TangleClientRateLimitContext { peer_ip: Option<IpAddr>, connection_id: Option<u64>, } -impl TangleQueryRateLimitContext { +impl TangleClientRateLimitContext { pub fn new(peer_ip: Option<IpAddr>, connection_id: Option<u64>) -> Self { Self { peer_ip, @@ -70,7 +70,7 @@ struct TangleQueryRateLimitRequest<'a> { subscription_id: &'a SubscriptionId, filters: &'a [Filter], auth: &'a BaseAuthState, - context: TangleQueryRateLimitContext, + context: TangleClientRateLimitContext, now: UnixTimestamp, } @@ -149,8 +149,24 @@ impl TangleRuntime { self.relay.shutdown() } - fn rate_limit_event(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + fn rate_limit_event( + &self, + event: &Event, + context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { let rules = self.config.rate_limits().event(); + if let Some(peer_ip) = context.peer_ip + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::ip(TangleRateLimitScope::Event, peer_ip), + rules.per_ip(), + "event ip", + now, + ) + { + return Some(message); + } self.rate_limit_ok( event, TangleRateLimitKey::pubkey( @@ -172,8 +188,24 @@ impl TangleRuntime { }) } - fn rate_limit_auth_attempt(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + fn rate_limit_auth_attempt( + &self, + event: &Event, + context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { let rules = self.config.rate_limits().auth(); + if let Some(peer_ip) = context.peer_ip + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip), + rules.per_ip(), + "auth ip", + now, + ) + { + return Some(message); + } self.rate_limit_ok( event, TangleRateLimitKey::pubkey( @@ -186,8 +218,24 @@ impl TangleRuntime { ) } - fn rate_limit_auth_failure(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + fn rate_limit_auth_failure( + &self, + event: &Event, + context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { let rules = self.config.rate_limits().auth(); + if let Some(peer_ip) = context.peer_ip + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::auth_failure(Some(peer_ip), None), + rules.failures_per_ip(), + "auth failure ip", + now, + ) + { + return Some(message); + } self.rate_limit_ok( event, TangleRateLimitKey::auth_failure(None, Some(event.unsigned().pubkey().clone())), @@ -197,7 +245,12 @@ impl TangleRuntime { ) } - fn rate_limit_group_write(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + fn rate_limit_group_write( + &self, + event: &Event, + context: TangleClientRateLimitContext, + now: UnixTimestamp, + ) -> Option<RelayMessage> { if !self.config.groups().enabled() { return None; } @@ -205,13 +258,35 @@ impl TangleRuntime { validate_client_group_event_structure(event, self.config.groups().limits()).ok()?; let group_id = class.group_id()?.clone(); let rules = self.config.rate_limits().group(); - if event.unsigned().kind().as_u32() == KIND_GROUP_JOIN_REQUEST - && let Some(message) = self.rate_limit_ok( + if event.unsigned().kind().as_u32() == KIND_GROUP_JOIN_REQUEST { + if let Some(peer_ip) = context.peer_ip + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::join_flow_ip(group_id.clone(), peer_ip), + rules.join_flow_per_ip(), + "group join ip", + now, + ) + { + return Some(message); + } + if let Some(message) = self.rate_limit_ok( event, TangleRateLimitKey::join_flow(group_id.clone(), event.unsigned().pubkey().clone()), rules.join_flow(), "group join", now, + ) { + return Some(message); + } + } + if let Some(peer_ip) = context.peer_ip + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip), + rules.write_per_ip(), + "group ip", + now, ) { return Some(message); @@ -257,7 +332,7 @@ impl TangleRuntime { subscription_id: &SubscriptionId, filters: &[Filter], auth: &BaseAuthState, - context: TangleQueryRateLimitContext, + context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Option<RelayMessage> { self.rate_limit_query(TangleQueryRateLimitRequest { @@ -277,7 +352,7 @@ impl TangleRuntime { subscription_id: &SubscriptionId, filters: &[Filter], auth: &BaseAuthState, - context: TangleQueryRateLimitContext, + context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Option<RelayMessage> { self.rate_limit_query(TangleQueryRateLimitRequest { @@ -448,20 +523,20 @@ impl TangleRuntimeHandle { auth: &mut BaseAuthState, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { - self.handle_client_message_with_query_context( + self.handle_client_message_with_rate_limit_context( message, auth, - TangleQueryRateLimitContext::default(), + TangleClientRateLimitContext::default(), now, ) .await } - pub async fn handle_client_message_with_query_context( + pub async fn handle_client_message_with_rate_limit_context( &self, message: ClientMessage, auth: &mut BaseAuthState, - query_context: TangleQueryRateLimitContext, + rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { self.metrics @@ -472,11 +547,13 @@ impl TangleRuntimeHandle { let started_at = Instant::now(); let event_id = event.id().clone(); let is_group_event = runtime.is_group_event(&event); - if let Some(message) = runtime.rate_limit_event(&event, now) { + if let Some(message) = runtime.rate_limit_event(&event, rate_limit_context, now) { record_event_metrics(runtime.metrics(), &message, is_group_event, started_at); return Ok(vec![message]); } - if let Some(message) = runtime.rate_limit_group_write(&event, now) { + if let Some(message) = + runtime.rate_limit_group_write(&event, rate_limit_context, now) + { record_event_metrics(runtime.metrics(), &message, is_group_event, started_at); return Ok(vec![message]); } @@ -520,9 +597,13 @@ impl TangleRuntimeHandle { .limits() .base_relay_limits() .validate_filters(&filters)?; - if let Some(message) = - runtime.rate_limit_req(&subscription_id, &filters, auth, query_context, now) - { + if let Some(message) = runtime.rate_limit_req( + &subscription_id, + &filters, + auth, + rate_limit_context, + now, + ) { runtime .metrics() .record_query_latency(elapsed_micros(started_at)); @@ -554,9 +635,13 @@ impl TangleRuntimeHandle { .limits() .base_relay_limits() .validate_filters(&filters)?; - if let Some(message) = - runtime.rate_limit_count(&subscription_id, &filters, auth, query_context, now) - { + if let Some(message) = runtime.rate_limit_count( + &subscription_id, + &filters, + auth, + rate_limit_context, + now, + ) { runtime .metrics() .record_query_latency(elapsed_micros(started_at)); @@ -584,7 +669,9 @@ impl TangleRuntimeHandle { message: error.prefixed_message(), }]); } - if let Some(message) = runtime.rate_limit_auth_attempt(&event, now) { + if let Some(message) = + runtime.rate_limit_auth_attempt(&event, rate_limit_context, now) + { runtime.metrics().record_auth_failure(); return Ok(vec![message]); } @@ -596,7 +683,8 @@ impl TangleRuntimeHandle { )?; if auth_response_failed(&replies) { runtime.metrics().record_auth_failure(); - if let Some(message) = runtime.rate_limit_auth_failure(&event_for_failure, now) + if let Some(message) = + runtime.rate_limit_auth_failure(&event_for_failure, rate_limit_context, now) { return Ok(vec![message]); } @@ -629,13 +717,16 @@ impl TangleRuntimeHandle { subscription_id: &SubscriptionId, filters: &[Filter], auth: &BaseAuthState, - query_context: TangleQueryRateLimitContext, + rate_limit_context: TangleClientRateLimitContext, now: UnixTimestamp, ) -> Option<RelayMessage> { - self.inner - .lock() - .await - .rate_limit_req(subscription_id, filters, auth, query_context, now) + self.inner.lock().await.rate_limit_req( + subscription_id, + filters, + auth, + rate_limit_context, + now, + ) } pub(crate) async fn query_req_with_auth( @@ -1405,7 +1496,7 @@ impl Default for TangleShutdownSignal { #[cfg(test)] mod tests { use super::{ - TangleQueryRateLimitContext, TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, + TangleClientRateLimitContext, TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, }; use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; use crate::event_bus::{TangleEventBus, TangleEventReceiveError}; @@ -1750,6 +1841,87 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_event_peer_ips_partition_peers_and_precede_identity_keys() { + let root = temp_root("runtime-event-ip-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let rule = runtime.config().rate_limits().event().per_ip(); + let saturated_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 20)); + let other_peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 21)); + let key = TangleRateLimitKey::ip(TangleRateLimitScope::Event, saturated_peer_ip); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let limited_event = + tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited") + .expect("limited event"); + let rotated_event = + tangle_v2_event(FixtureKey::Admin, 1_714_124_434, 2, Vec::new(), "rotated") + .expect("rotated event"); + let allowed_event = + tangle_v2_event(FixtureKey::Owner, 1_714_124_435, 2, Vec::new(), "allowed") + .expect("allowed event"); + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Event(limited_event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(saturated_peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: limited_event.id().clone(), + accepted: false, + message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned() + }] + ); + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Event(rotated_event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(saturated_peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: rotated_event.id().clone(), + accepted: false, + message: "rate-limited: event ip rate limit exceeded until 1714124493".to_owned() + }] + ); + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Event(allowed_event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(other_peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: allowed_event.id().clone(), + accepted: true, + message: String::new() + }] + ); + assert_eq!(handle.metrics().rate_limit_rejections(), 2); + assert_eq!(handle.metrics().event_rejections(), 2); + assert_eq!(handle.metrics().event_admissions(), 1); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_rate_limits_auth_pubkeys_before_authentication() { let root = temp_root("runtime-auth-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); @@ -1792,6 +1964,49 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_auth_peer_ips_before_authentication() { + let root = temp_root("runtime-auth-ip-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let auth_event = + tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); + let rule = runtime.config().rate_limits().auth().per_ip(); + let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 30)); + let key = TangleRateLimitKey::ip(TangleRateLimitScope::Auth, peer_ip); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(120)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) + .expect("challenge"); + + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Auth(auth_event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(peer_ip), None), + UnixTimestamp::new(120) + ) + .await + .expect("auth"), + vec![RelayMessage::Ok { + event_id: auth_event.id().clone(), + accepted: false, + message: "rate-limited: auth ip rate limit exceeded until 180".to_owned() + }] + ); + assert!(auth.authenticated_pubkeys().is_empty()); + assert_eq!(handle.metrics().rate_limit_rejections(), 1); + assert_eq!(handle.metrics().auth_failures(), 1); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_rate_limits_auth_failures() { let root = temp_root("runtime-auth-failure-rate-limit"); let _ = std::fs::remove_dir_all(&root); @@ -1830,6 +2045,47 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_auth_failures_by_peer_ip() { + let root = temp_root("runtime-auth-failure-ip-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let auth_event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 22_242, Vec::new(), "") + .expect("auth event"); + let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 31)); + let key = TangleRateLimitKey::auth_failure(Some(peer_ip), None); + let rule = runtime.config().rate_limits().auth().failures_per_ip(); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Auth(auth_event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("auth"), + vec![RelayMessage::Ok { + event_id: auth_event.id().clone(), + accepted: false, + message: "rate-limited: auth failure ip rate limit exceeded until 1714124733" + .to_owned() + }] + ); + assert_eq!(handle.metrics().rate_limit_rejections(), 1); + assert_eq!(handle.metrics().auth_failures(), 1); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_rate_limits_group_writes_by_pubkey() { let root = temp_root("runtime-group-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); @@ -1876,6 +2132,53 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_group_writes_by_peer_ip() { + let root = temp_root("runtime-group-ip-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "limited", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().write_per_ip(); + let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 40)); + let key = TangleRateLimitKey::ip(TangleRateLimitScope::GroupWrite, peer_ip); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Event(event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group ip rate limit exceeded until 1714124493".to_owned() + }] + ); + assert_eq!(handle.metrics().rate_limit_rejections(), 1); + assert_eq!(handle.metrics().event_rejections(), 1); + assert_eq!(handle.metrics().group_write_denials(), 1); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_rate_limits_group_writes_by_group_id() { let root = temp_root("runtime-group-write-rate-limit"); let _ = std::fs::remove_dir_all(&root); @@ -2006,6 +2309,55 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_group_join_flows_by_peer_ip() { + let root = temp_root("runtime-group-join-ip-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let group_id = GroupId::new("Farm").expect("group"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + KIND_GROUP_JOIN_REQUEST.into(), + vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], + "", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().join_flow_per_ip(); + let peer_ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 41)); + let key = TangleRateLimitKey::join_flow_ip(group_id, peer_ip); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message_with_rate_limit_context( + ClientMessage::Event(event.clone()), + &mut auth, + TangleClientRateLimitContext::new(Some(peer_ip), None), + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group join ip rate limit exceeded until 1714124733" + .to_owned() + }] + ); + assert_eq!(handle.metrics().rate_limit_rejections(), 1); + assert_eq!(handle.metrics().event_rejections(), 1); + assert_eq!(handle.metrics().group_write_denials(), 1); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_rate_limits_req_authenticated_pubkeys() { let root = temp_root("runtime-req-pubkey-rate-limit"); let _ = std::fs::remove_dir_all(&root); @@ -2082,13 +2434,13 @@ mod tests { assert_eq!( handle - .handle_client_message_with_query_context( + .handle_client_message_with_rate_limit_context( ClientMessage::Req { subscription_id: subscription_id.clone(), filters }, &mut auth, - TangleQueryRateLimitContext::new(None, Some(77)), + TangleClientRateLimitContext::new(None, Some(77)), UnixTimestamp::new(1_714_124_433) ) .await @@ -2163,13 +2515,13 @@ mod tests { assert_eq!( handle - .handle_client_message_with_query_context( + .handle_client_message_with_rate_limit_context( ClientMessage::Count { subscription_id: subscription_id.clone(), filters }, &mut auth, - TangleQueryRateLimitContext::new(Some(peer_ip), None), + TangleClientRateLimitContext::new(Some(peer_ip), None), UnixTimestamp::new(1_714_124_433) ) .await @@ -2393,18 +2745,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -554,18 +554,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -9,7 +9,7 @@ use crate::{ live::{CloseResult, LiveSubscriptionSet}, }, runtime::{ - TangleClientMessageMetricKind, TangleQueryRateLimitContext, TangleRuntimeHandle, + TangleClientMessageMetricKind, TangleClientRateLimitContext, TangleRuntimeHandle, TangleRuntimeLimits, }, }; @@ -271,9 +271,9 @@ impl TangleWebSocketSession { subscription_id, filters, } => { - let context = self.query_rate_limit_context(); + let context = self.client_rate_limit_context(); self.runtime - .handle_client_message_with_query_context( + .handle_client_message_with_rate_limit_context( ClientMessage::Count { subscription_id, filters, @@ -296,8 +296,14 @@ impl TangleWebSocketSession { Ok(Vec::new()) } message => { + let context = self.client_rate_limit_context(); self.runtime - .handle_client_message(message, &mut self.auth, current_unix_timestamp()) + .handle_client_message_with_rate_limit_context( + message, + &mut self.auth, + context, + current_unix_timestamp(), + ) .await } } @@ -320,7 +326,7 @@ impl TangleWebSocketSession { &subscription_id, &filters, &self.auth, - self.query_rate_limit_context(), + self.client_rate_limit_context(), current_unix_timestamp(), ) .await @@ -347,8 +353,8 @@ impl TangleWebSocketSession { } } - fn query_rate_limit_context(&self) -> TangleQueryRateLimitContext { - TangleQueryRateLimitContext::new(self.peer_ip, Some(self.connection_id)) + fn client_rate_limit_context(&self) -> TangleClientRateLimitContext { + TangleClientRateLimitContext::new(self.peer_ip, Some(self.connection_id)) } fn send_relay_message(&self, message: RelayMessage) -> Result<(), TangleOutboundQueueError> { @@ -759,18 +765,23 @@ mod tests { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -2012,18 +2012,23 @@ fn runtime_config(groups_enabled: bool) -> BaseRelayRuntimeConfig { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/tests/ops_truthfulness.rs b/crates/tangle_runtime/tests/ops_truthfulness.rs @@ -167,18 +167,23 @@ fn runtime_config(root: &Path) -> BaseRelayRuntimeConfig { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 60, "max_hits": 1} + "failures": {"window_seconds": 60, "max_hits": 1}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600}, diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1635,18 +1635,23 @@ fn runtime_config_value(root: &Path, listen_addr: SocketAddr) -> Value { }, "rate_limits": { "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, "per_pubkey": {"window_seconds": 60, "max_hits": 30}, - "failures": {"window_seconds": 300, "max_hits": 5} + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} }, "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} }, "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, "write_per_group": {"window_seconds": 60, "max_hits": 90}, "write_per_kind": {"window_seconds": 60, "max_hits": 300}, - "join_flow": {"window_seconds": 300, "max_hits": 10} + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} }, "req": { "per_ip": {"window_seconds": 60, "max_hits": 600},