tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit c3d7fdaf03550772d1ea24a4a12624e0a169dfe9
parent 1fa6add245683783ef3e4be01698e64d08485a45
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 07:02:34 -0700

limits: apply auth and event rate checks

- Add required AUTH and EVENT rate-limit config parsing with deny-unknown validation.

- Enforce EVENT pubkey/kind, AUTH pubkey, and AUTH-failure buckets from parsed runtime config.

- Cover the config contract and runtime rejection paths with focused tests.

- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle/tests/version.rs | 10++++++++++
Mcrates/tangle_runtime/src/config.rs | 110+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/errors.rs | 11+++++++++++
Mcrates/tangle_runtime/src/rate_limits.rs | 185++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------
Mcrates/tangle_runtime/src/runtime.rs | 274+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/src/server.rs | 10++++++++++
Mcrates/tangle_runtime/src/session.rs | 10++++++++++
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 10++++++++++
8 files changed, 596 insertions(+), 24 deletions(-)

diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -136,6 +136,16 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { "max_content_length": 65536, "broadcast_channel_capacity": 4096, "per_connection_outbound_queue": 256 + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }) .to_string(), diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs @@ -2,6 +2,10 @@ use crate::{ errors::BaseRelayError, + rate_limits::{ + TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleRateLimitConfig, + TangleRateLimitRule, + }, relay::{ auth::BaseAuthState, core::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits}, @@ -22,6 +26,7 @@ pub struct BaseRelayRuntimeConfig { auth_ttl_seconds: u64, auth_created_at_skew_seconds: u64, limits: BaseRelayRuntimeLimitsConfig, + rate_limits: TangleRateLimitConfig, tracing: BaseRelayTracingConfig, } @@ -54,6 +59,10 @@ impl BaseRelayRuntimeConfig { self.limits } + pub fn rate_limits(&self) -> TangleRateLimitConfig { + self.rate_limits + } + pub fn tracing(&self) -> &BaseRelayTracingConfig { &self.tracing } @@ -280,6 +289,7 @@ struct BaseRelayRuntimeConfigDocument { groups: serde_json::Value, auth: BaseRelayAuthConfigDocument, limits: BaseRelayRuntimeLimitsDocument, + rate_limits: BaseRelayRateLimitsDocument, #[serde(default)] observability: BaseRelayObservabilityConfigDocument, } @@ -330,6 +340,34 @@ struct BaseRelayRuntimeLimitsDocument { per_connection_outbound_queue: usize, } +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct BaseRelayRateLimitsDocument { + auth: BaseRelayAuthRateLimitsDocument, + event: BaseRelayEventRateLimitsDocument, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct BaseRelayAuthRateLimitsDocument { + per_pubkey: BaseRelayRateLimitRuleDocument, + failures: BaseRelayRateLimitRuleDocument, +} + +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct BaseRelayEventRateLimitsDocument { + per_pubkey: BaseRelayRateLimitRuleDocument, + per_kind: BaseRelayRateLimitRuleDocument, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(deny_unknown_fields)] +struct BaseRelayRateLimitRuleDocument { + window_seconds: u64, + max_hits: u64, +} + #[derive(Debug, Default, Deserialize)] #[serde(deny_unknown_fields)] struct BaseRelayObservabilityConfigDocument { @@ -384,6 +422,7 @@ pub fn parse_base_relay_runtime_config_json( let groups = tangle_groups::parse_group_runtime_config_json(&groups_raw) .map_err(|error| BaseRelayError::invalid(error.to_string()))?; let limits = BaseRelayRuntimeLimitsConfig::from_document(document.limits)?; + let rate_limits = base_relay_rate_limits_from_document(document.rate_limits)?; if document.auth.created_at_skew_seconds == 0 { return Err(BaseRelayError::invalid( "auth.created_at_skew_seconds must be greater than zero", @@ -398,6 +437,7 @@ pub fn parse_base_relay_runtime_config_json( auth_ttl_seconds: document.auth.challenge_ttl_seconds, auth_created_at_skew_seconds: document.auth.created_at_skew_seconds, limits, + rate_limits, tracing, }) } @@ -420,6 +460,42 @@ fn require_positive_u64(field: &str, value: u64) -> Result<(), BaseRelayError> { Ok(()) } +fn base_relay_rate_limits_from_document( + document: BaseRelayRateLimitsDocument, +) -> Result<TangleRateLimitConfig, BaseRelayError> { + Ok(TangleRateLimitConfig::new( + TangleAuthRateLimitConfig::new( + base_relay_rate_limit_rule_from_document( + "rate_limits.auth.per_pubkey", + document.auth.per_pubkey, + )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.auth.failures", + document.auth.failures, + )?, + ), + TangleEventRateLimitConfig::new( + base_relay_rate_limit_rule_from_document( + "rate_limits.event.per_pubkey", + document.event.per_pubkey, + )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.event.per_kind", + document.event.per_kind, + )?, + ), + )) +} + +fn base_relay_rate_limit_rule_from_document( + field: &str, + document: BaseRelayRateLimitRuleDocument, +) -> Result<TangleRateLimitRule, BaseRelayError> { + require_positive_u64(&format!("{field}.window_seconds"), document.window_seconds)?; + require_positive_u64(&format!("{field}.max_hits"), document.max_hits)?; + TangleRateLimitRule::new(document.window_seconds, document.max_hits) +} + fn base_relay_tracing_config_from_document( document: BaseRelayTracingConfigDocument, ) -> Result<BaseRelayTracingConfig, BaseRelayError> { @@ -478,6 +554,10 @@ mod tests { assert_eq!(config.limits().max_content_length(), 65_536); assert_eq!(config.limits().broadcast_channel_capacity(), 4_096); assert_eq!(config.limits().per_connection_outbound_queue(), 256); + assert_eq!(config.rate_limits().auth().per_pubkey().max_hits(), 30); + assert_eq!(config.rate_limits().auth().failures().max_hits(), 5); + assert_eq!(config.rate_limits().event().per_pubkey().max_hits(), 120); + assert_eq!(config.rate_limits().event().per_kind().max_hits(), 1_000); assert!(config.tracing().enabled()); assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json); config.auth_state().expect("auth"); @@ -515,6 +595,16 @@ mod tests { "max_content_length": 65536, "broadcast_channel_capacity": 4096, "per_connection_outbound_queue": 256 + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }"#; @@ -559,6 +649,16 @@ mod tests { "broadcast_channel_capacity": 4096, "per_connection_outbound_queue": 256 }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } + }, "ignored": true }"#; assert!( @@ -599,6 +699,16 @@ mod tests { "broadcast_channel_capacity": 4096, "per_connection_outbound_queue": 256, "max_unimplemented_limit": 99 + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }"#; assert!( diff --git a/crates/tangle_runtime/src/errors.rs b/crates/tangle_runtime/src/errors.rs @@ -25,6 +25,13 @@ impl BaseRelayError { } } + pub fn rate_limited(message: impl Into<String>) -> Self { + Self { + prefix: "rate-limited", + message: message.into(), + } + } + pub fn error(message: impl Into<String>) -> Self { Self { prefix: "error", @@ -92,6 +99,10 @@ mod tests { "auth-required: login" ); assert_eq!( + BaseRelayError::rate_limited("slow down").prefixed_message(), + "rate-limited: slow down" + ); + assert_eq!( BaseRelayError::error("store").prefixed_message(), "error: store" ); diff --git a/crates/tangle_runtime/src/rate_limits.rs b/crates/tangle_runtime/src/rate_limits.rs @@ -9,12 +9,33 @@ use std::{ use tangle_groups::GroupId; use tangle_protocol::{Kind, PublicKeyHex, UnixTimestamp}; +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum TangleRateLimitScope { + Auth, + Event, + GroupWrite, + Req, + Count, +} + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum TangleRateLimitKey { - Ip(IpAddr), - Pubkey(PublicKeyHex), - Group(GroupId), - Kind(Kind), + Ip { + scope: TangleRateLimitScope, + ip: IpAddr, + }, + Pubkey { + scope: TangleRateLimitScope, + pubkey: PublicKeyHex, + }, + Group { + scope: TangleRateLimitScope, + group_id: GroupId, + }, + Kind { + scope: TangleRateLimitScope, + kind: Kind, + }, AuthFailure { ip: Option<IpAddr>, pubkey: Option<PublicKeyHex>, @@ -26,20 +47,20 @@ pub enum TangleRateLimitKey { } impl TangleRateLimitKey { - pub fn ip(ip: IpAddr) -> Self { - Self::Ip(ip) + pub fn ip(scope: TangleRateLimitScope, ip: IpAddr) -> Self { + Self::Ip { scope, ip } } - pub fn pubkey(pubkey: PublicKeyHex) -> Self { - Self::Pubkey(pubkey) + pub fn pubkey(scope: TangleRateLimitScope, pubkey: PublicKeyHex) -> Self { + Self::Pubkey { scope, pubkey } } - pub fn group(group_id: GroupId) -> Self { - Self::Group(group_id) + pub fn group(scope: TangleRateLimitScope, group_id: GroupId) -> Self { + Self::Group { scope, group_id } } - pub fn kind(kind: Kind) -> Self { - Self::Kind(kind) + pub fn kind(scope: TangleRateLimitScope, kind: Kind) -> Self { + Self::Kind { scope, kind } } pub fn auth_failure(ip: Option<IpAddr>, pubkey: Option<PublicKeyHex>) -> Self { @@ -52,6 +73,72 @@ impl TangleRateLimitKey { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleRateLimitConfig { + auth: TangleAuthRateLimitConfig, + event: TangleEventRateLimitConfig, +} + +impl TangleRateLimitConfig { + pub fn new(auth: TangleAuthRateLimitConfig, event: TangleEventRateLimitConfig) -> Self { + Self { auth, event } + } + + pub fn auth(self) -> TangleAuthRateLimitConfig { + self.auth + } + + pub fn event(self) -> TangleEventRateLimitConfig { + self.event + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleAuthRateLimitConfig { + per_pubkey: TangleRateLimitRule, + failures: TangleRateLimitRule, +} + +impl TangleAuthRateLimitConfig { + pub fn new(per_pubkey: TangleRateLimitRule, failures: TangleRateLimitRule) -> Self { + Self { + per_pubkey, + failures, + } + } + + pub fn per_pubkey(self) -> TangleRateLimitRule { + self.per_pubkey + } + + pub fn failures(self) -> TangleRateLimitRule { + self.failures + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleEventRateLimitConfig { + per_pubkey: TangleRateLimitRule, + per_kind: TangleRateLimitRule, +} + +impl TangleEventRateLimitConfig { + pub fn new(per_pubkey: TangleRateLimitRule, per_kind: TangleRateLimitRule) -> Self { + Self { + per_pubkey, + per_kind, + } + } + + pub fn per_pubkey(self) -> TangleRateLimitRule { + self.per_pubkey + } + + pub fn per_kind(self) -> TangleRateLimitRule { + self.per_kind + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleRateLimitRule { window_seconds: u64, max_hits: u64, @@ -202,7 +289,9 @@ fn reset_at(rule: TangleRateLimitRule, now: UnixTimestamp) -> UnixTimestamp { #[cfg(test)] mod tests { use super::{ - TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, TangleRateLimiter, + TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleRateLimitConfig, + TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, TangleRateLimitScope, + TangleRateLimiter, }; use std::net::{IpAddr, Ipv4Addr}; use tangle_groups::GroupId; @@ -233,10 +322,10 @@ mod tests { let group_id = GroupId::new("farm").expect("group"); let kind = Kind::new(1).expect("kind"); let keys = [ - TangleRateLimitKey::ip(ip), - TangleRateLimitKey::pubkey(pubkey.clone()), - TangleRateLimitKey::group(group_id.clone()), - TangleRateLimitKey::kind(kind), + TangleRateLimitKey::ip(TangleRateLimitScope::Event, ip), + TangleRateLimitKey::pubkey(TangleRateLimitScope::Event, pubkey.clone()), + TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id.clone()), + TangleRateLimitKey::kind(TangleRateLimitScope::Event, kind), TangleRateLimitKey::auth_failure(Some(ip), Some(pubkey.clone())), TangleRateLimitKey::join_flow(group_id, pubkey), ]; @@ -262,7 +351,10 @@ mod tests { fn rate_limiter_counts_per_key_and_resets_after_window() { let limiter = TangleRateLimiter::new(); let rule = TangleRateLimitRule::new(10, 2).expect("rule"); - let key = TangleRateLimitKey::ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); + let key = TangleRateLimitKey::ip( + TangleRateLimitScope::Event, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + ); let first = limiter.record(key.clone(), rule, UnixTimestamp::from(100)); let second = limiter.record(key.clone(), rule, UnixTimestamp::from(101)); @@ -303,12 +395,18 @@ mod tests { let limiter = TangleRateLimiter::new(); let rule = TangleRateLimitRule::new(5, 1).expect("rule"); limiter.record( - TangleRateLimitKey::ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))), + TangleRateLimitKey::ip( + TangleRateLimitScope::Event, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + ), rule, UnixTimestamp::from(1), ); limiter.record( - TangleRateLimitKey::ip(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2))), + TangleRateLimitKey::ip( + TangleRateLimitScope::Event, + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 2)), + ), rule, UnixTimestamp::from(5), ); @@ -317,4 +415,51 @@ mod tests { assert_eq!(limiter.tracked_key_count(), 1); } + + #[test] + fn scoped_pubkey_keys_do_not_share_buckets() { + let limiter = TangleRateLimiter::new(); + let rule = TangleRateLimitRule::new(60, 1).expect("rule"); + let pubkey = PublicKeyHex::new(&"1".repeat(64)).expect("pubkey"); + let auth_key = TangleRateLimitKey::pubkey(TangleRateLimitScope::Auth, pubkey.clone()); + let event_key = TangleRateLimitKey::pubkey(TangleRateLimitScope::Event, pubkey); + + assert!( + limiter + .record(auth_key.clone(), rule, UnixTimestamp::from(1)) + .is_allowed() + ); + assert!( + limiter + .record(event_key.clone(), rule, UnixTimestamp::from(1)) + .is_allowed() + ); + assert!( + !limiter + .record(auth_key, rule, UnixTimestamp::from(2)) + .is_allowed() + ); + assert!( + !limiter + .record(event_key, rule, UnixTimestamp::from(2)) + .is_allowed() + ); + } + + #[test] + fn rate_limit_config_exposes_auth_and_event_rules() { + let auth_pubkey = TangleRateLimitRule::new(60, 2).expect("auth pubkey"); + let auth_failures = TangleRateLimitRule::new(300, 3).expect("auth failures"); + let event_pubkey = TangleRateLimitRule::new(60, 4).expect("event pubkey"); + let event_kind = TangleRateLimitRule::new(60, 5).expect("event kind"); + let config = TangleRateLimitConfig::new( + TangleAuthRateLimitConfig::new(auth_pubkey, auth_failures), + TangleEventRateLimitConfig::new(event_pubkey, event_kind), + ); + + assert_eq!(config.auth().per_pubkey(), auth_pubkey); + assert_eq!(config.auth().failures(), auth_failures); + assert_eq!(config.event().per_pubkey(), event_pubkey); + assert_eq!(config.event().per_kind(), event_kind); + } } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -5,7 +5,10 @@ use crate::{ errors::BaseRelayError, event_bus::{TangleEventBus, TangleEventReceiver}, ops::BaseRelayReadinessState, - rate_limits::TangleRateLimiter, + rate_limits::{ + TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, TangleRateLimitScope, + TangleRateLimiter, + }, relay::{ auth::BaseAuthState, core::{BaseRelay, BaseRelayLimits, BaseRelayShutdownReport}, @@ -97,6 +100,75 @@ impl TangleRuntime { self.shutdown.request_shutdown(); self.relay.shutdown() } + + fn rate_limit_event(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + let rules = self.config.rate_limits().event(); + self.rate_limit_ok( + event, + TangleRateLimitKey::pubkey( + TangleRateLimitScope::Event, + event.unsigned().pubkey().clone(), + ), + rules.per_pubkey(), + "event pubkey", + now, + ) + .or_else(|| { + self.rate_limit_ok( + event, + TangleRateLimitKey::kind(TangleRateLimitScope::Event, event.unsigned().kind()), + rules.per_kind(), + "event kind", + now, + ) + }) + } + + fn rate_limit_auth_attempt(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + let rules = self.config.rate_limits().auth(); + self.rate_limit_ok( + event, + TangleRateLimitKey::pubkey( + TangleRateLimitScope::Auth, + event.unsigned().pubkey().clone(), + ), + rules.per_pubkey(), + "auth pubkey", + now, + ) + } + + fn rate_limit_auth_failure(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + let rules = self.config.rate_limits().auth(); + self.rate_limit_ok( + event, + TangleRateLimitKey::auth_failure(None, Some(event.unsigned().pubkey().clone())), + rules.failures(), + "auth failure", + now, + ) + } + + fn rate_limit_ok( + &self, + event: &Event, + key: TangleRateLimitKey, + rule: TangleRateLimitRule, + label: &'static str, + now: UnixTimestamp, + ) -> Option<RelayMessage> { + match self.rate_limiter.record(key, rule, now) { + TangleRateLimitDecision::Allowed { .. } => None, + TangleRateLimitDecision::Rejected { reset_at } => Some(RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: BaseRelayError::rate_limited(format!( + "{label} rate limit exceeded until {reset_at}" + )) + .prefixed_message(), + }), + } + } } #[derive(Clone)] @@ -124,6 +196,9 @@ impl TangleRuntimeHandle { let mut runtime = self.inner.lock().await; match message { ClientMessage::Event(event) => { + if let Some(message) = runtime.rate_limit_event(&event, now) { + return Ok(vec![message]); + } let result = runtime .relay_mut() .handle_event_with_auth_report(event, auth)?; @@ -141,9 +216,21 @@ impl TangleRuntimeHandle { message: error.prefixed_message(), }]); } - runtime - .relay_mut() - .handle_client_message(ClientMessage::Auth(event), auth, now) + if let Some(message) = runtime.rate_limit_auth_attempt(&event, now) { + return Ok(vec![message]); + } + let event_for_failure = event.clone(); + let replies = runtime.relay_mut().handle_client_message( + ClientMessage::Auth(event), + auth, + now, + )?; + if auth_response_failed(&replies) + && let Some(message) = runtime.rate_limit_auth_failure(&event_for_failure, now) + { + return Ok(vec![message]); + } + Ok(replies) } message => runtime .relay_mut() @@ -201,6 +288,18 @@ impl TangleRuntimeHandle { } } +fn auth_response_failed(replies: &[RelayMessage]) -> bool { + replies.iter().any(|reply| { + matches!( + reply, + RelayMessage::Ok { + accepted: false, + .. + } + ) + }) +} + impl fmt::Debug for TangleRuntimeHandle { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter.write_str("TangleRuntimeHandle") @@ -368,6 +467,7 @@ mod tests { use super::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits}; use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; use crate::event_bus::{TangleEventBus, TangleEventReceiveError}; + use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitScope}; use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits}; use crate::relay::live::LiveSubscriptionSet; use serde_json::json; @@ -517,6 +617,162 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_event_pubkeys_before_storage() { + let root = temp_root("runtime-event-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 1, Vec::new(), "limited") + .expect("event"); + let rule = runtime.config().rate_limits().event().per_pubkey(); + let key = TangleRateLimitKey::pubkey( + TangleRateLimitScope::Event, + event.unsigned().pubkey().clone(), + ); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: event pubkey rate limit exceeded until 1714124493" + .to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_event_kinds_before_storage() { + let root = temp_root("runtime-event-kind-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let event = tangle_v2_event(FixtureKey::Admin, 1_714_124_433, 1, Vec::new(), "limited") + .expect("event"); + let rule = runtime.config().rate_limits().event().per_kind(); + let key = TangleRateLimitKey::kind(TangleRateLimitScope::Event, event.unsigned().kind()); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: event kind rate limit exceeded until 1714124493".to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_auth_pubkeys_before_authentication() { + let root = temp_root("runtime-auth-pubkey-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let auth_event = + tangle_v2_auth_event(FixtureKey::Member, "challenge-a", 120).expect("auth event"); + let rule = runtime.config().rate_limits().auth().per_pubkey(); + let key = TangleRateLimitKey::pubkey( + TangleRateLimitScope::Auth, + auth_event.unsigned().pubkey().clone(), + ); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(120)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) + .expect("challenge"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Auth(auth_event.clone()), + &mut auth, + UnixTimestamp::new(120) + ) + .await + .expect("auth"), + vec![RelayMessage::Ok { + event_id: auth_event.id().clone(), + accepted: false, + message: "rate-limited: auth pubkey rate limit exceeded until 180".to_owned() + }] + ); + assert!(auth.authenticated_pubkeys().is_empty()); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_auth_failures() { + let root = temp_root("runtime-auth-failure-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let auth_event = tangle_v2_event(FixtureKey::Member, 1_714_124_433, 22_242, Vec::new(), "") + .expect("auth event"); + let key = + TangleRateLimitKey::auth_failure(None, Some(auth_event.unsigned().pubkey().clone())); + let rule = runtime.config().rate_limits().auth().failures(); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Auth(auth_event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("auth"), + vec![RelayMessage::Ok { + event_id: auth_event.id().clone(), + accepted: false, + message: "rate-limited: auth failure rate limit exceeded until 1714124733" + .to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() { let root = temp_root("runtime-generated-offset-fanout"); let _ = std::fs::remove_dir_all(&root); @@ -638,6 +894,16 @@ mod tests { "max_content_length": 65536, "broadcast_channel_capacity": 16, "per_connection_outbound_queue": per_connection_outbound_queue + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }) .to_string(); diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -491,6 +491,16 @@ mod tests { "max_content_length": 65536, "broadcast_channel_capacity": 8, "per_connection_outbound_queue": 8 + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }) .to_string(); diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -599,6 +599,16 @@ mod tests { "max_content_length": 65536, "broadcast_channel_capacity": per_connection_outbound_queue, "per_connection_outbound_queue": per_connection_outbound_queue + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }) .to_string(); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -569,6 +569,16 @@ fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfi "max_content_length": 65536, "broadcast_channel_capacity": 8, "per_connection_outbound_queue": 8 + }, + "rate_limits": { + "auth": { + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5} + }, + "event": { + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + } } }) .to_string();