tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 66805d65a6c30d059378d58555c0c8fd471b205d
parent c3d7fdaf03550772d1ea24a4a12624e0a169dfe9
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 07:10:12 -0700

limits: enforce group write rate checks

- Add required group-write rate-limit config for pubkey, group id, event kind, and join-flow buckets.
- Apply group write and join rate checks before group storage/projection side effects.
- Update runtime fixtures, production example config, and focused tests for group rate-limit enforcement.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle/tests/version.rs | 6++++++
Mcrates/tangle_runtime/src/config.rs | 63+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle_runtime/src/rate_limits.rs | 68+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Mcrates/tangle_runtime/src/runtime.rs | 243++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle_runtime/src/server.rs | 6++++++
Mcrates/tangle_runtime/src/session.rs | 6++++++
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 6++++++
7 files changed, 388 insertions(+), 10 deletions(-)

diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -145,6 +145,12 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }) diff --git a/crates/tangle_runtime/src/config.rs b/crates/tangle_runtime/src/config.rs @@ -3,8 +3,8 @@ use crate::{ errors::BaseRelayError, rate_limits::{ - TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleRateLimitConfig, - TangleRateLimitRule, + TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleGroupRateLimitConfig, + TangleRateLimitConfig, TangleRateLimitRule, }, relay::{ auth::BaseAuthState, @@ -345,6 +345,7 @@ struct BaseRelayRuntimeLimitsDocument { struct BaseRelayRateLimitsDocument { auth: BaseRelayAuthRateLimitsDocument, event: BaseRelayEventRateLimitsDocument, + group: BaseRelayGroupRateLimitsDocument, } #[derive(Debug, Deserialize)] @@ -361,6 +362,15 @@ struct BaseRelayEventRateLimitsDocument { per_kind: BaseRelayRateLimitRuleDocument, } +#[derive(Debug, Deserialize)] +#[serde(deny_unknown_fields)] +struct BaseRelayGroupRateLimitsDocument { + write_per_pubkey: BaseRelayRateLimitRuleDocument, + write_per_group: BaseRelayRateLimitRuleDocument, + write_per_kind: BaseRelayRateLimitRuleDocument, + join_flow: BaseRelayRateLimitRuleDocument, +} + #[derive(Debug, Clone, Copy, Deserialize)] #[serde(deny_unknown_fields)] struct BaseRelayRateLimitRuleDocument { @@ -484,6 +494,24 @@ fn base_relay_rate_limits_from_document( document.event.per_kind, )?, ), + TangleGroupRateLimitConfig::new( + base_relay_rate_limit_rule_from_document( + "rate_limits.group.write_per_pubkey", + document.group.write_per_pubkey, + )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.group.write_per_group", + document.group.write_per_group, + )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.group.write_per_kind", + document.group.write_per_kind, + )?, + base_relay_rate_limit_rule_from_document( + "rate_limits.group.join_flow", + document.group.join_flow, + )?, + ), )) } @@ -558,6 +586,19 @@ mod tests { assert_eq!(config.rate_limits().auth().failures().max_hits(), 5); assert_eq!(config.rate_limits().event().per_pubkey().max_hits(), 120); assert_eq!(config.rate_limits().event().per_kind().max_hits(), 1_000); + assert_eq!( + config.rate_limits().group().write_per_pubkey().max_hits(), + 60 + ); + assert_eq!( + config.rate_limits().group().write_per_group().max_hits(), + 90 + ); + assert_eq!( + config.rate_limits().group().write_per_kind().max_hits(), + 300 + ); + assert_eq!(config.rate_limits().group().join_flow().max_hits(), 10); assert!(config.tracing().enabled()); assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json); config.auth_state().expect("auth"); @@ -604,6 +645,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }"#; @@ -657,6 +704,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } }, "ignored": true @@ -708,6 +761,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }"#; diff --git a/crates/tangle_runtime/src/rate_limits.rs b/crates/tangle_runtime/src/rate_limits.rs @@ -76,11 +76,16 @@ impl TangleRateLimitKey { pub struct TangleRateLimitConfig { auth: TangleAuthRateLimitConfig, event: TangleEventRateLimitConfig, + group: TangleGroupRateLimitConfig, } impl TangleRateLimitConfig { - pub fn new(auth: TangleAuthRateLimitConfig, event: TangleEventRateLimitConfig) -> Self { - Self { auth, event } + pub fn new( + auth: TangleAuthRateLimitConfig, + event: TangleEventRateLimitConfig, + group: TangleGroupRateLimitConfig, + ) -> Self { + Self { auth, event, group } } pub fn auth(self) -> TangleAuthRateLimitConfig { @@ -90,6 +95,10 @@ impl TangleRateLimitConfig { pub fn event(self) -> TangleEventRateLimitConfig { self.event } + + pub fn group(self) -> TangleGroupRateLimitConfig { + self.group + } } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -139,6 +148,46 @@ impl TangleEventRateLimitConfig { } #[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TangleGroupRateLimitConfig { + write_per_pubkey: TangleRateLimitRule, + write_per_group: TangleRateLimitRule, + write_per_kind: TangleRateLimitRule, + join_flow: TangleRateLimitRule, +} + +impl TangleGroupRateLimitConfig { + pub fn new( + write_per_pubkey: TangleRateLimitRule, + write_per_group: TangleRateLimitRule, + write_per_kind: TangleRateLimitRule, + join_flow: TangleRateLimitRule, + ) -> Self { + Self { + write_per_pubkey, + write_per_group, + write_per_kind, + join_flow, + } + } + + pub fn write_per_pubkey(self) -> TangleRateLimitRule { + self.write_per_pubkey + } + + pub fn write_per_group(self) -> TangleRateLimitRule { + self.write_per_group + } + + pub fn write_per_kind(self) -> TangleRateLimitRule { + self.write_per_kind + } + + pub fn join_flow(self) -> TangleRateLimitRule { + self.join_flow + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct TangleRateLimitRule { window_seconds: u64, max_hits: u64, @@ -289,9 +338,9 @@ fn reset_at(rule: TangleRateLimitRule, now: UnixTimestamp) -> UnixTimestamp { #[cfg(test)] mod tests { use super::{ - TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleRateLimitConfig, - TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, TangleRateLimitScope, - TangleRateLimiter, + TangleAuthRateLimitConfig, TangleEventRateLimitConfig, TangleGroupRateLimitConfig, + TangleRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitRule, + TangleRateLimitScope, TangleRateLimiter, }; use std::net::{IpAddr, Ipv4Addr}; use tangle_groups::GroupId; @@ -452,14 +501,23 @@ mod tests { let auth_failures = TangleRateLimitRule::new(300, 3).expect("auth failures"); let event_pubkey = TangleRateLimitRule::new(60, 4).expect("event pubkey"); let event_kind = TangleRateLimitRule::new(60, 5).expect("event kind"); + let group_pubkey = TangleRateLimitRule::new(60, 6).expect("group pubkey"); + let group_write = TangleRateLimitRule::new(60, 7).expect("group write"); + let group_kind = TangleRateLimitRule::new(60, 8).expect("group kind"); + let group_join = TangleRateLimitRule::new(300, 9).expect("group join"); let config = TangleRateLimitConfig::new( TangleAuthRateLimitConfig::new(auth_pubkey, auth_failures), TangleEventRateLimitConfig::new(event_pubkey, event_kind), + TangleGroupRateLimitConfig::new(group_pubkey, group_write, group_kind, group_join), ); assert_eq!(config.auth().per_pubkey(), auth_pubkey); assert_eq!(config.auth().failures(), auth_failures); assert_eq!(config.event().per_pubkey(), event_pubkey); assert_eq!(config.event().per_kind(), event_kind); + assert_eq!(config.group().write_per_pubkey(), group_pubkey); + assert_eq!(config.group().write_per_group(), group_write); + assert_eq!(config.group().write_per_kind(), group_kind); + assert_eq!(config.group().join_flow(), group_join); } } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -23,7 +23,7 @@ use std::{ }, time::Instant, }; -use tangle_groups::StoreOffset; +use tangle_groups::{KIND_GROUP_JOIN_REQUEST, StoreOffset, validate_client_group_event_structure}; use tangle_protocol::{ClientMessage, Event, Filter, RelayMessage, SubscriptionId, UnixTimestamp}; use tokio::sync::{Mutex, watch}; @@ -149,6 +149,55 @@ impl TangleRuntime { ) } + fn rate_limit_group_write(&self, event: &Event, now: UnixTimestamp) -> Option<RelayMessage> { + if !self.config.groups().enabled() { + return None; + } + let class = + validate_client_group_event_structure(event, self.config.groups().limits()).ok()?; + let group_id = class.group_id()?.clone(); + let rules = self.config.rate_limits().group(); + if event.unsigned().kind().as_u32() == KIND_GROUP_JOIN_REQUEST + && let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::join_flow(group_id.clone(), event.unsigned().pubkey().clone()), + rules.join_flow(), + "group join", + now, + ) + { + return Some(message); + } + if let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::pubkey( + TangleRateLimitScope::GroupWrite, + event.unsigned().pubkey().clone(), + ), + rules.write_per_pubkey(), + "group pubkey", + now, + ) { + return Some(message); + } + if let Some(message) = self.rate_limit_ok( + event, + TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id), + rules.write_per_group(), + "group write", + now, + ) { + return Some(message); + } + self.rate_limit_ok( + event, + TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, event.unsigned().kind()), + rules.write_per_kind(), + "group kind", + now, + ) + } + fn rate_limit_ok( &self, event: &Event, @@ -199,6 +248,9 @@ impl TangleRuntimeHandle { if let Some(message) = runtime.rate_limit_event(&event, now) { return Ok(vec![message]); } + if let Some(message) = runtime.rate_limit_group_write(&event, now) { + return Ok(vec![message]); + } let result = runtime .relay_mut() .handle_event_with_auth_report(event, auth)?; @@ -472,9 +524,12 @@ mod tests { use crate::relay::live::LiveSubscriptionSet; use serde_json::json; use std::path::{Path, PathBuf}; - use tangle_groups::{GroupAuthContext, KIND_GROUP_ADMINS, KIND_GROUP_METADATA, StoreOffset}; + use tangle_groups::{ + GroupAuthContext, GroupId, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_METADATA, + StoreOffset, + }; use tangle_protocol::{ - ClientMessage, RelayMessage, SubscriptionId, UnixTimestamp, filter_from_value, + ClientMessage, RelayMessage, SubscriptionId, Tag, UnixTimestamp, filter_from_value, }; use tangle_test_support::{ FixtureKey, tangle_v2_auth_event, tangle_v2_event, tangle_v2_group_create_event, @@ -773,6 +828,182 @@ mod tests { } #[tokio::test] + async fn runtime_rate_limits_group_writes_by_pubkey() { + let root = temp_root("runtime-group-pubkey-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "limited", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().write_per_pubkey(); + let key = TangleRateLimitKey::pubkey( + TangleRateLimitScope::GroupWrite, + event.unsigned().pubkey().clone(), + ); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group pubkey rate limit exceeded until 1714124493" + .to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_group_writes_by_group_id() { + let root = temp_root("runtime-group-write-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let group_id = GroupId::new("Farm").expect("group"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], + "limited", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().write_per_group(); + let key = TangleRateLimitKey::group(TangleRateLimitScope::GroupWrite, group_id); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group write rate limit exceeded until 1714124493" + .to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_group_writes_by_kind() { + let root = temp_root("runtime-group-kind-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + 1, + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "limited", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().write_per_kind(); + let key = + TangleRateLimitKey::kind(TangleRateLimitScope::GroupWrite, event.unsigned().kind()); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group kind rate limit exceeded until 1714124493".to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] + async fn runtime_rate_limits_group_join_flows() { + let root = temp_root("runtime-group-join-rate-limit"); + let _ = std::fs::remove_dir_all(&root); + let runtime = TangleRuntime::open(runtime_config(&root, 8)).expect("runtime"); + let group_id = GroupId::new("Farm").expect("group"); + let event = tangle_v2_event( + FixtureKey::Member, + 1_714_124_433, + KIND_GROUP_JOIN_REQUEST.into(), + vec![Tag::from_parts("h", &[group_id.as_str()]).expect("h")], + "", + ) + .expect("event"); + let rule = runtime.config().rate_limits().group().join_flow(); + let key = TangleRateLimitKey::join_flow(group_id, event.unsigned().pubkey().clone()); + for _ in 0..rule.max_hits() { + runtime + .rate_limiter() + .record(key.clone(), rule, UnixTimestamp::new(1_714_124_433)); + } + let handle = TangleRuntimeHandle::new(runtime); + let mut auth = handle.auth_state().await.expect("auth"); + + assert_eq!( + handle + .handle_client_message( + ClientMessage::Event(event.clone()), + &mut auth, + UnixTimestamp::new(1_714_124_433) + ) + .await + .expect("event"), + vec![RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "rate-limited: group join rate limit exceeded until 1714124733".to_owned() + }] + ); + + let _ = std::fs::remove_dir_all(root); + } + + #[tokio::test] async fn runtime_publishes_generated_group_event_offsets_for_live_fanout() { let root = temp_root("runtime-generated-offset-fanout"); let _ = std::fs::remove_dir_all(&root); @@ -903,6 +1134,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }) diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -500,6 +500,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }) diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -608,6 +608,12 @@ mod tests { "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } }) diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -578,6 +578,12 @@ fn runtime_config(root: &Path, listen_addr: SocketAddr) -> BaseRelayRuntimeConfi "event": { "per_pubkey": {"window_seconds": 60, "max_hits": 120}, "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10} } } })