tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit cbdf1221f373efe8af72f2ae98d2bf1cb0fe2882
parent ec2cbe828fea3aeeda6cc8c49eb102ca3aef08af
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 18:19:46 -0700

feat: gate group reads in base relay

- apply group read gate to REQ queries

- apply group read gate to COUNT queries

- store subscription auth for live fanout

- cover private group read isolation

Diffstat:
Mcrates/tangle_runtime/src/base_relay.rs | 264++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 243 insertions(+), 21 deletions(-)

diff --git a/crates/tangle_runtime/src/base_relay.rs b/crates/tangle_runtime/src/base_relay.rs @@ -12,12 +12,12 @@ use tangle_crypto::{RelaySigner, verify_event_signature}; use tangle_groups::{ GroupAuthContext, GroupAuthority, GroupError, GroupEventClass, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, GroupOutbox, GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, - GroupOutboxRecord, GroupProjection, GroupRuntimeConfig, GroupState, GroupTombstone, - KIND_GROUP_CREATE_GROUP, KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, - KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, - MemberState, ProjectedRoleDefinition, ProjectionCheckpoint, StoreOffset, group_current_key, - member_current_key, projection_checkpoint_key, role_current_key, tombstone_key, - validate_client_group_event_structure, + GroupOutboxRecord, GroupProjection, GroupReadDecision, GroupReadGate, GroupRuntimeConfig, + GroupState, GroupTombstone, KIND_GROUP_CREATE_GROUP, KIND_GROUP_EDIT_METADATA, + KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, + KIND_GROUP_REMOVE_USER, MemberState, ProjectedRoleDefinition, ProjectionCheckpoint, + StoreOffset, group_current_key, member_current_key, projection_checkpoint_key, + role_current_key, tombstone_key, validate_client_group_event_structure, }; use tangle_nips::parse_relay_auth_event; use tangle_protocol::{ @@ -310,12 +310,12 @@ impl BaseRelay { ClientMessage::Req { subscription_id, filters, - } => self.handle_req(subscription_id, filters), + } => self.handle_req_with_auth(subscription_id, filters, auth), ClientMessage::Count { subscription_id, filters, } => self - .handle_count(subscription_id, filters) + .handle_count_with_auth(subscription_id, filters, auth) .map(|message| vec![message]), ClientMessage::Close(subscription_id) => { self.handle_close(&subscription_id); @@ -421,10 +421,36 @@ impl BaseRelay { subscription_id: SubscriptionId, filters: Vec<Filter>, ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_req_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::unauthenticated(), + ) + } + + pub fn handle_req_with_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { + self.handle_req_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + } + + fn handle_req_with_group_auth( + &mut self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &GroupAuthContext, + ) -> Result<Vec<RelayMessage>, BaseRelayError> { self.subscriptions - .subscribe(subscription_id.clone(), filters.clone())?; + .subscribe(subscription_id.clone(), filters.clone(), auth.clone())?; let mut messages = self - .query_events(&filters)? + .query_events(&filters, auth)? .into_iter() .map(|event| RelayMessage::Event { subscription_id: subscription_id.clone(), @@ -440,9 +466,35 @@ impl BaseRelay { subscription_id: SubscriptionId, filters: Vec<Filter>, ) -> Result<RelayMessage, BaseRelayError> { + self.handle_count_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::unauthenticated(), + ) + } + + pub fn handle_count_with_auth( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + self.handle_count_with_group_auth( + subscription_id, + filters, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + } + + fn handle_count_with_group_auth( + &self, + subscription_id: SubscriptionId, + filters: Vec<Filter>, + auth: &GroupAuthContext, + ) -> Result<RelayMessage, BaseRelayError> { Ok(RelayMessage::Count { subscription_id, - count: self.query_events(&filters)?.len() as u64, + count: self.query_events(&filters, auth)?.len() as u64, }) } @@ -451,7 +503,7 @@ impl BaseRelay { } pub fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { - self.subscriptions.fanout(event) + self.subscriptions.fanout(event, self.groups.as_ref()) } pub fn mark_delivered(&mut self, subscription_id: &SubscriptionId) { @@ -462,20 +514,36 @@ impl BaseRelay { self.subscriptions.active_count() } - fn query_events(&self, filters: &[Filter]) -> Result<Vec<Event>, BaseRelayError> { + fn query_events( + &self, + filters: &[Filter], + auth: &GroupAuthContext, + ) -> Result<Vec<Event>, BaseRelayError> { let mut seen = BTreeSet::new(); let mut output = Vec::new(); for filter in filters { let pocket_filter = tangle_filter_to_pocket(filter)?; for pocket_event in self.store.find_events(&pocket_filter)? { let event = pocket_event_to_tangle(&pocket_event)?; - if seen.insert(event.id().clone()) { + if seen.insert(event.id().clone()) && self.event_visible_to_auth(&event, auth)? { output.push(event); } } } Ok(output) } + + fn event_visible_to_auth( + &self, + event: &Event, + auth: &GroupAuthContext, + ) -> Result<bool, BaseRelayError> { + self.groups + .as_ref() + .map(|groups| groups.event_visible_to_auth(event, auth)) + .unwrap_or(Ok(true)) + .map_err(BaseRelayError::from) + } } struct GroupService { @@ -532,6 +600,25 @@ impl GroupService { .map(|_| ()) } + fn event_visible_to_auth( + &self, + event: &Event, + auth: &GroupAuthContext, + ) -> Result<bool, GroupError> { + let gate = GroupReadGate::new(&self.projection, &self.authority); + if auth.authenticated_pubkeys().is_empty() { + return gate + .screen_event(event, None, self.limits) + .map(|decision| decision == GroupReadDecision::Visible); + } + for pubkey in auth.authenticated_pubkeys() { + if gate.screen_event(event, Some(pubkey), self.limits)? == GroupReadDecision::Visible { + return Ok(true); + } + } + Ok(false) + } + fn after_source_event_stored( &mut self, store: &PocketStoreHandle, @@ -846,11 +933,17 @@ fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> { #[derive(Debug, Clone, PartialEq, Eq)] pub struct LiveSubscriptionSet { - subscriptions: BTreeMap<SubscriptionId, Vec<Filter>>, + subscriptions: BTreeMap<SubscriptionId, LiveSubscription>, pending: BTreeMap<SubscriptionId, usize>, max_pending_events: usize, } +#[derive(Debug, Clone, PartialEq, Eq)] +struct LiveSubscription { + filters: Vec<Filter>, + auth: GroupAuthContext, +} + impl LiveSubscriptionSet { pub fn new(max_pending_events: usize) -> Result<Self, BaseRelayError> { if max_pending_events == 0 { @@ -869,13 +962,15 @@ impl LiveSubscriptionSet { &mut self, subscription_id: SubscriptionId, filters: Vec<Filter>, + auth: GroupAuthContext, ) -> Result<(), BaseRelayError> { if filters.is_empty() { return Err(BaseRelayError::invalid( "subscription must include at least one filter", )); } - self.subscriptions.insert(subscription_id.clone(), filters); + self.subscriptions + .insert(subscription_id.clone(), LiveSubscription { filters, auth }); self.pending.insert(subscription_id, 0); Ok(()) } @@ -889,15 +984,30 @@ impl LiveSubscriptionSet { } } - pub fn fanout(&mut self, event: &Event) -> Vec<RelayMessage> { + fn fanout(&mut self, event: &Event, groups: Option<&GroupService>) -> Vec<RelayMessage> { let matched = self .subscriptions .iter() - .filter_map(|(subscription_id, filters)| { - filters + .filter_map(|(subscription_id, subscription)| { + if !subscription + .filters .iter() .any(|filter| filter.matches(event)) - .then(|| subscription_id.clone()) + { + return None; + } + if groups + .map(|groups| { + groups + .event_visible_to_auth(event, &subscription.auth) + .unwrap_or(false) + }) + .unwrap_or(true) + { + Some(subscription_id.clone()) + } else { + None + } }) .collect::<Vec<_>>(); let mut messages = Vec::new(); @@ -1398,6 +1508,89 @@ mod tests { } #[test] + fn private_group_req_and_count_use_reader_auth() { + let owner = signer(7).public_key().clone(); + let auth = authenticated_state(7); + let mut relay = test_relay_with_groups( + "base-relay-private-read", + 4, + &enabled_groups_for_owner(&owner), + ); + relay + .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth) + .expect("create"); + let private_event = signed_event_at( + 7, + 1, + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "private harvest", + 1_714_124_435, + ); + relay + .handle_event_with_auth(private_event.clone(), &auth) + .expect("private event"); + + let unauth_sub = SubscriptionId::new("private-unauth").expect("sub"); + let auth_sub = SubscriptionId::new("private-auth").expect("sub"); + assert_eq!( + relay + .handle_req(unauth_sub.clone(), vec![filter_kind(1)]) + .expect("unauth req"), + vec![RelayMessage::Eose(unauth_sub)] + ); + assert!(matches!( + relay + .handle_req_with_auth(auth_sub.clone(), vec![filter_kind(1)], &auth) + .expect("auth req") + .as_slice(), + [RelayMessage::Event { subscription_id, event }, RelayMessage::Eose(eose)] + if subscription_id == &auth_sub && event.id() == private_event.id() && eose == &auth_sub + )); + assert_eq!(count_kind(&relay, 1), 0); + assert_eq!(count_kind_with_auth(&relay, 1, &auth), 1); + assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 0); + assert_eq!(count_kind_with_auth(&relay, KIND_GROUP_METADATA, &auth), 1); + } + + #[test] + fn private_group_live_fanout_uses_subscription_auth() { + let owner = signer(7).public_key().clone(); + let auth = authenticated_state(7); + let mut relay = test_relay_with_groups( + "base-relay-private-fanout", + 4, + &enabled_groups_for_owner(&owner), + ); + relay + .handle_event_with_auth(signed_private_group_create_event(7, "Farm"), &auth) + .expect("create"); + let unauth_sub = SubscriptionId::new("fanout-unauth").expect("sub"); + let auth_sub = SubscriptionId::new("fanout-auth").expect("sub"); + relay + .handle_req(unauth_sub, vec![filter_kind(1)]) + .expect("unauth sub"); + relay + .handle_req_with_auth(auth_sub.clone(), vec![filter_kind(1)], &auth) + .expect("auth sub"); + let private_event = signed_event_at( + 7, + 1, + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "private harvest", + 1_714_124_435, + ); + relay + .handle_event_with_auth(private_event.clone(), &auth) + .expect("private event"); + + assert!(matches!( + relay.fanout(&private_event).as_slice(), + [RelayMessage::Event { subscription_id, event }] + if subscription_id == &auth_sub && event.id() == private_event.id() + )); + } + + #[test] fn live_subscription_lag_closes_subscription_for_resync() { let mut relay = test_relay("base-relay-lag", 1); let subscription_id = SubscriptionId::new("sub-lag").expect("sub"); @@ -1542,6 +1735,20 @@ mod tests { ) } + fn signed_private_group_create_event(secret_byte: u8, group_id: &str) -> Event { + signed_event_at( + secret_byte, + KIND_GROUP_CREATE_GROUP.into(), + vec![ + Tag::from_parts("h", &[group_id]).expect("h"), + Tag::from_parts("name", &[group_id]).expect("name"), + Tag::from_parts("private", &[]).expect("private"), + ], + "", + 1_714_124_433, + ) + } + fn signed_event_at( secret_byte: u8, kind: u64, @@ -1573,7 +1780,7 @@ mod tests { fn count_kind(relay: &BaseRelay, kind: u32) -> u64 { let subscription_id = SubscriptionId::new(&format!("count-{kind}")).expect("sub"); - let filter = filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter"); + let filter = filter_kind(kind); match relay .handle_count(subscription_id, vec![filter]) .expect("count") @@ -1583,6 +1790,21 @@ mod tests { } } + fn count_kind_with_auth(relay: &BaseRelay, kind: u32, auth: &BaseAuthState) -> u64 { + let subscription_id = SubscriptionId::new(&format!("count-auth-{kind}")).expect("sub"); + match relay + .handle_count_with_auth(subscription_id, vec![filter_kind(kind)], auth) + .expect("count") + { + RelayMessage::Count { count, .. } => count, + _ => panic!("count response expected"), + } + } + + fn filter_kind(kind: u32) -> Filter { + filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter") + } + fn signer(secret_byte: u8) -> RelaySigner { RelaySigner::from_secret_hex(&format!("{:02x}", secret_byte).repeat(32)).expect("signer") }