tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit ec2cbe828fea3aeeda6cc8c49eb102ca3aef08af
parent dcb86ddb87f94fd7cbf13266186c67049407f118
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 18:10:41 -0700

feat: wire group runtime generated events

- initialize enabled group service from config

- gate group EVENT writes with AUTH policy

- persist projection and durable outbox records

- materialize relay-signed group side effects

Diffstat:
MCargo.lock | 1+
Mcrates/tangle_groups/Cargo.toml | 1+
Mcrates/tangle_groups/src/lib.rs | 1+
Mcrates/tangle_groups/src/policy.rs | 8++++++++
Mcrates/tangle_groups/src/projection.rs | 11+++++++++++
Mcrates/tangle_groups/src/signing.rs | 404++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/tangle_runtime/src/base_relay.rs | 676++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
7 files changed, 1081 insertions(+), 21 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4358,6 +4358,7 @@ version = "0.1.0" dependencies = [ "serde", "serde_json", + "tangle_crypto", "tangle_protocol", ] diff --git a/crates/tangle_groups/Cargo.toml b/crates/tangle_groups/Cargo.toml @@ -10,6 +10,7 @@ description = "NIP-29 group domain and configuration types for tangle" [dependencies] serde = { version = "1", features = ["derive"] } serde_json = "1" +tangle_crypto = { path = "../tangle_crypto" } tangle_protocol = { path = "../tangle_protocol" } [lints] diff --git a/crates/tangle_groups/src/lib.rs b/crates/tangle_groups/src/lib.rs @@ -51,6 +51,7 @@ pub use roles::{ Capability, CapabilitySet, PERMANENT_RELAY_OVERRIDE_ROLE, RoleDefinition, RoleName, resolve_capabilities, }; +pub use signing::GroupGeneratedEventBuilder; pub use tags::{GroupTag, GroupTagName, extract_group_tag, has_group_identity_tag}; pub use write_gate::{ GroupAuthContext, require_group_auth_as_author, validate_client_group_event_structure, diff --git a/crates/tangle_groups/src/policy.rs b/crates/tangle_groups/src/policy.rs @@ -35,6 +35,14 @@ impl GroupAuthority { self.owner_pubkeys.contains(pubkey) } + pub fn owner_pubkeys(&self) -> &BTreeSet<PublicKeyHex> { + &self.owner_pubkeys + } + + pub fn admin_pubkeys(&self) -> &BTreeSet<PublicKeyHex> { + &self.admin_pubkeys + } + pub fn is_admin(&self, pubkey: &PublicKeyHex) -> bool { self.admin_pubkeys.contains(pubkey) || self.is_owner(pubkey) } diff --git a/crates/tangle_groups/src/projection.rs b/crates/tangle_groups/src/projection.rs @@ -560,6 +560,17 @@ impl GroupProjection { } } + pub fn put_tombstone(&mut self, tombstone: GroupTombstone) { + if self + .tombstones + .get(tombstone.group_id()) + .is_none_or(|current| tombstone.last_tuple() >= current.last_tuple()) + { + self.tombstones + .insert(tombstone.group_id().clone(), tombstone); + } + } + pub fn apply_canonical_event( &mut self, event: &Event, diff --git a/crates/tangle_groups/src/signing.rs b/crates/tangle_groups/src/signing.rs @@ -1,2 +1,402 @@ -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct GroupSigningBoundary; +use std::collections::BTreeSet; + +use crate::{ + GroupAuthority, GroupError, GroupId, GroupMetadata, GroupOutboxPayload, GroupProjection, + GroupState, KIND_GROUP_ADMINS, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, + KIND_GROUP_REMOVE_USER, MemberStatus, RoleName, SupportedKinds, +}; +use tangle_crypto::RelaySigner; +use tangle_protocol::{Event, Kind, PublicKeyHex, Tag, UnixTimestamp, UnsignedEvent}; + +pub struct GroupGeneratedEventBuilder { + signer: RelaySigner, +} + +impl GroupGeneratedEventBuilder { + pub fn new(signer: RelaySigner) -> Self { + Self { signer } + } + + pub fn relay_pubkey(&self) -> &PublicKeyHex { + self.signer.public_key() + } + + pub fn metadata_snapshot_payload( + group: &GroupState, + created_at: UnixTimestamp, + ) -> Result<GroupOutboxPayload, GroupError> { + Ok(GroupOutboxPayload::new( + KIND_GROUP_METADATA, + created_at, + metadata_tags(group.id(), group.metadata())?, + "", + )) + } + + pub fn admin_list_snapshot_payload( + group_id: &GroupId, + projection: &GroupProjection, + authority: &GroupAuthority, + created_at: UnixTimestamp, + ) -> Result<GroupOutboxPayload, GroupError> { + let mut admins = BTreeSet::new(); + admins.extend(authority.owner_pubkeys().iter().cloned()); + admins.extend(authority.admin_pubkeys().iter().cloned()); + for ((candidate_group, pubkey), member) in projection.members() { + if candidate_group == group_id + && member.status() == MemberStatus::Member + && member + .roles() + .contains(&RoleName::permanent_relay_override()) + { + admins.insert(pubkey.clone()); + } + } + let mut tags = vec![tag_values(["d".to_owned(), group_id.as_str().to_owned()])]; + tags.extend( + admins + .into_iter() + .map(|pubkey| tag_values(["p".to_owned(), pubkey.as_str().to_owned()])), + ); + Ok(GroupOutboxPayload::new( + KIND_GROUP_ADMINS, + created_at, + tags, + "", + )) + } + + pub fn member_list_snapshot_payload( + group_id: &GroupId, + projection: &GroupProjection, + created_at: UnixTimestamp, + cap: u32, + ) -> Result<Option<GroupOutboxPayload>, GroupError> { + let mut members = projection + .members() + .iter() + .filter(|((candidate_group, _), member)| { + candidate_group == group_id && member.status() == MemberStatus::Member + }) + .map(|((_, pubkey), _)| pubkey.clone()) + .collect::<Vec<_>>(); + members.sort(); + if members.len() > usize::try_from(cap).expect("u32 fits in usize on supported targets") { + return Ok(None); + } + let mut tags = vec![tag_values(["d".to_owned(), group_id.as_str().to_owned()])]; + tags.extend( + members + .into_iter() + .map(|pubkey| tag_values(["p".to_owned(), pubkey.as_str().to_owned()])), + ); + Ok(Some(GroupOutboxPayload::new( + KIND_GROUP_MEMBERS, + created_at, + tags, + "", + ))) + } + + pub fn join_accepted_payload( + group_id: &GroupId, + target_pubkey: &PublicKeyHex, + created_at: UnixTimestamp, + ) -> GroupOutboxPayload { + membership_payload(KIND_GROUP_PUT_USER, group_id, target_pubkey, created_at) + } + + pub fn leave_accepted_payload( + group_id: &GroupId, + target_pubkey: &PublicKeyHex, + created_at: UnixTimestamp, + ) -> GroupOutboxPayload { + membership_payload(KIND_GROUP_REMOVE_USER, group_id, target_pubkey, created_at) + } + + pub fn sign_payload(&self, payload: &GroupOutboxPayload) -> Result<Event, GroupError> { + let tags = payload + .tags() + .iter() + .cloned() + .map(Tag::new) + .collect::<Result<Vec<_>, _>>() + .map_err(GroupError::internal)?; + let unsigned = UnsignedEvent::new( + self.signer.public_key().clone(), + payload.generated_created_at(), + Kind::new(payload.generated_kind().into()).map_err(GroupError::internal)?, + tags, + payload.content(), + ); + Ok(self.signer.sign_unsigned_event(unsigned)) + } + + pub fn build_metadata_snapshot( + &self, + group: &GroupState, + created_at: UnixTimestamp, + ) -> Result<Event, GroupError> { + self.sign_payload(&Self::metadata_snapshot_payload(group, created_at)?) + } + + pub fn build_admin_list_snapshot( + &self, + group_id: &GroupId, + projection: &GroupProjection, + authority: &GroupAuthority, + created_at: UnixTimestamp, + ) -> Result<Event, GroupError> { + self.sign_payload(&Self::admin_list_snapshot_payload( + group_id, projection, authority, created_at, + )?) + } + + pub fn build_join_accepted( + &self, + group_id: &GroupId, + target_pubkey: &PublicKeyHex, + created_at: UnixTimestamp, + ) -> Result<Event, GroupError> { + self.sign_payload(&Self::join_accepted_payload( + group_id, + target_pubkey, + created_at, + )) + } + + pub fn build_leave_accepted( + &self, + group_id: &GroupId, + target_pubkey: &PublicKeyHex, + created_at: UnixTimestamp, + ) -> Result<Event, GroupError> { + self.sign_payload(&Self::leave_accepted_payload( + group_id, + target_pubkey, + created_at, + )) + } +} + +fn metadata_tags( + group_id: &GroupId, + metadata: &GroupMetadata, +) -> Result<Vec<Vec<String>>, GroupError> { + let mut tags = vec![tag_values(["d".to_owned(), group_id.as_str().to_owned()])]; + if let Some(name) = metadata.name() { + tags.push(tag_values(["name".to_owned(), name.to_owned()])); + } + if let Some(picture) = metadata.picture() { + tags.push(tag_values(["picture".to_owned(), picture.to_owned()])); + } + if let Some(about) = metadata.about() { + tags.push(tag_values(["about".to_owned(), about.to_owned()])); + } + if metadata.private() { + tags.push(tag_values(["private".to_owned()])); + } + if metadata.restricted() { + tags.push(tag_values(["restricted".to_owned()])); + } + if metadata.hidden() { + tags.push(tag_values(["hidden".to_owned()])); + } + if metadata.closed() { + tags.push(tag_values(["closed".to_owned()])); + } + match metadata.supported_kinds() { + SupportedKinds::UnspecifiedAll => {} + SupportedKinds::None => tags.push(tag_values(["supported_kinds".to_owned()])), + SupportedKinds::Only(kinds) => { + let mut tag = vec!["supported_kinds".to_owned()]; + tag.extend(kinds.iter().map(|kind| kind.as_u32().to_string())); + tags.push(tag); + } + } + for tag in &tags { + Tag::new(tag.clone()).map_err(GroupError::internal)?; + } + Ok(tags) +} + +fn membership_payload( + kind: u32, + group_id: &GroupId, + target_pubkey: &PublicKeyHex, + created_at: UnixTimestamp, +) -> GroupOutboxPayload { + GroupOutboxPayload::new( + kind, + created_at, + vec![ + tag_values(["h".to_owned(), group_id.as_str().to_owned()]), + tag_values(["p".to_owned(), target_pubkey.as_str().to_owned()]), + ], + "", + ) +} + +fn tag_values<const N: usize>(values: [String; N]) -> Vec<String> { + values.into_iter().collect() +} + +#[cfg(test)] +mod tests { + use super::GroupGeneratedEventBuilder; + use crate::{ + GroupAuthority, GroupId, GroupMetadata, GroupProjection, GroupState, KIND_GROUP_ADMINS, + KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, + MemberState, MemberStatus, ProjectionOrderTuple, StoreOffset, + }; + use tangle_crypto::{RelaySigner, verify_event_signature}; + use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp}; + + #[test] + fn generated_metadata_event_is_relay_signed() { + let builder = builder(); + let group = group_state("Farm", GroupMetadata::empty()); + let event = builder + .build_metadata_snapshot(&group, UnixTimestamp::new(20)) + .expect("event"); + + assert_eq!(event.unsigned().kind().as_u32(), KIND_GROUP_METADATA); + assert_eq!(event.unsigned().pubkey(), builder.relay_pubkey()); + assert!(has_tag(&event, &["d", "Farm"])); + verify_event_signature(&event).expect("signature"); + } + + #[test] + fn generated_admin_event_includes_configured_and_override_admins() { + let builder = builder(); + let group_id = GroupId::new("Farm").expect("group"); + let owner = pubkey("1"); + let admin = pubkey("2"); + let override_member = pubkey("3"); + let mut projection = GroupProjection::new(); + projection.put_member( + group_id.clone(), + MemberState::new( + override_member.clone(), + MemberStatus::Member, + [crate::RoleName::permanent_relay_override()] + .into_iter() + .collect(), + event_id("30"), + tuple(30, "30", 3), + ), + ); + let event = builder + .build_admin_list_snapshot( + &group_id, + &projection, + &GroupAuthority::new([owner.clone()], [admin.clone()]), + UnixTimestamp::new(20), + ) + .expect("event"); + + assert_eq!(event.unsigned().kind().as_u32(), KIND_GROUP_ADMINS); + for pubkey in [owner, admin, override_member] { + assert!(has_tag(&event, &["p", pubkey.as_str()])); + } + verify_event_signature(&event).expect("signature"); + } + + #[test] + fn generated_member_snapshot_is_capped() { + let group_id = GroupId::new("Farm").expect("group"); + let mut projection = GroupProjection::new(); + projection.put_member( + group_id.clone(), + MemberState::new( + pubkey("1"), + MemberStatus::Member, + Default::default(), + event_id("10"), + tuple(10, "10", 1), + ), + ); + + let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload( + &group_id, + &projection, + UnixTimestamp::new(20), + 1, + ) + .expect("payload") + .expect("under cap"); + assert_eq!(payload.generated_kind(), KIND_GROUP_MEMBERS); + assert!( + GroupGeneratedEventBuilder::member_list_snapshot_payload( + &group_id, + &projection, + UnixTimestamp::new(20), + 0 + ) + .expect("payload") + .is_none() + ); + } + + #[test] + fn generated_membership_events_use_group_and_target_tags() { + let builder = builder(); + let group_id = GroupId::new("Farm").expect("group"); + let member = pubkey("4"); + let join = builder + .build_join_accepted(&group_id, &member, UnixTimestamp::new(20)) + .expect("join"); + let leave = builder + .build_leave_accepted(&group_id, &member, UnixTimestamp::new(21)) + .expect("leave"); + + assert_eq!(join.unsigned().kind().as_u32(), KIND_GROUP_PUT_USER); + assert_eq!(leave.unsigned().kind().as_u32(), KIND_GROUP_REMOVE_USER); + for event in [join, leave] { + assert!(has_tag(&event, &["h", "Farm"])); + assert!(has_tag(&event, &["p", member.as_str()])); + verify_event_signature(&event).expect("signature"); + } + } + + fn has_tag(event: &tangle_protocol::Event, expected: &[&str]) -> bool { + event.unsigned().tags().iter().any(|tag| { + tag.values() + .iter() + .map(String::as_str) + .eq(expected.iter().copied()) + }) + } + + fn builder() -> GroupGeneratedEventBuilder { + GroupGeneratedEventBuilder::new(RelaySigner::from_secret_hex(&"7".repeat(64)).expect("key")) + } + + fn group_state(group_id: &str, metadata: GroupMetadata) -> GroupState { + GroupState::new( + GroupId::new(group_id).expect("group"), + metadata, + pubkey("9"), + event_id("10"), + tuple(10, "10", 1), + ) + } + + fn pubkey(suffix: &str) -> PublicKeyHex { + PublicKeyHex::new(&suffix.repeat(64)).expect("pubkey") + } + + fn tuple(created_at: u64, suffix: &str, offset: u64) -> ProjectionOrderTuple { + ProjectionOrderTuple::new( + UnixTimestamp::new(created_at), + event_id(suffix), + StoreOffset::new(offset), + ) + } + + fn event_id(suffix: &str) -> EventId { + let mut value = "0".repeat(64 - suffix.len()); + value.push_str(suffix); + EventId::new(&value).expect("event") + } +} diff --git a/crates/tangle_runtime/src/base_relay.rs b/crates/tangle_runtime/src/base_relay.rs @@ -10,7 +10,14 @@ use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, collections::BTreeSet, str}; use tangle_crypto::{RelaySigner, verify_event_signature}; use tangle_groups::{ - GroupEventClass, GroupLimitsConfig, GroupRuntimeConfig, validate_client_group_event_structure, + GroupAuthContext, GroupAuthority, GroupError, GroupEventClass, GroupGeneratedEventBuilder, + GroupId, GroupLimitsConfig, GroupOutbox, GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, + GroupOutboxRecord, GroupProjection, GroupRuntimeConfig, GroupState, GroupTombstone, + KIND_GROUP_CREATE_GROUP, KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, + KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, + MemberState, ProjectedRoleDefinition, ProjectionCheckpoint, StoreOffset, group_current_key, + member_current_key, projection_checkpoint_key, role_current_key, tombstone_key, + validate_client_group_event_structure, }; use tangle_nips::parse_relay_auth_event; use tangle_protocol::{ @@ -19,7 +26,8 @@ use tangle_protocol::{ }; use tangle_store_pocket::{ PocketEvent, PocketEventId, PocketOwnedEvent, PocketOwnedFilter, PocketStoreConfig, - PocketStoreHandle, parse_pocket_event_json, parse_pocket_filter_json, + PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, + TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, }; pub const BASE_RELAY_SUPPORTED_NIPS: [u16; 5] = [1, 11, 42, 45, 70]; @@ -248,6 +256,7 @@ struct BaseAuthChallenge { pub struct BaseRelay { store: PocketStoreHandle, subscriptions: LiveSubscriptionSet, + groups: Option<GroupService>, } impl BaseRelay { @@ -259,13 +268,32 @@ impl BaseRelay { Self::new(store, max_pending_events) } + pub fn open_with_groups( + config: &PocketStoreConfig, + max_pending_events: usize, + groups: &GroupRuntimeConfig, + ) -> Result<Self, BaseRelayError> { + let store = PocketStoreHandle::open(config).map_err(BaseRelayError::from)?; + Self::new_with_groups(store, max_pending_events, groups) + } + pub fn new( store: PocketStoreHandle, max_pending_events: usize, ) -> Result<Self, BaseRelayError> { + Self::new_with_groups(store, max_pending_events, &GroupRuntimeConfig::disabled()) + } + + pub fn new_with_groups( + store: PocketStoreHandle, + max_pending_events: usize, + groups: &GroupRuntimeConfig, + ) -> Result<Self, BaseRelayError> { + let groups = GroupService::from_config(&store, groups)?; Ok(Self { store, subscriptions: LiveSubscriptionSet::new(max_pending_events)?, + groups, }) } @@ -276,7 +304,9 @@ impl BaseRelay { now: UnixTimestamp, ) -> Result<Vec<RelayMessage>, BaseRelayError> { match message { - ClientMessage::Event(event) => self.handle_event(event).map(|message| vec![message]), + ClientMessage::Event(event) => self + .handle_event_with_auth(event, auth) + .map(|message| vec![message]), ClientMessage::Req { subscription_id, filters, @@ -310,20 +340,57 @@ impl BaseRelay { } } - pub fn handle_event(&self, event: Event) -> Result<RelayMessage, BaseRelayError> { + pub fn handle_event(&mut self, event: Event) -> Result<RelayMessage, BaseRelayError> { + self.handle_event_with_group_auth(event, &GroupAuthContext::unauthenticated()) + } + + pub fn handle_event_with_auth( + &mut self, + event: Event, + auth: &BaseAuthState, + ) -> Result<RelayMessage, BaseRelayError> { + self.handle_event_with_group_auth( + event, + &GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()), + ) + } + + pub fn groups_enabled(&self) -> bool { + self.groups.is_some() + } + + pub fn group_projection(&self) -> Option<&GroupProjection> { + self.groups.as_ref().map(|groups| groups.projection()) + } + + fn handle_event_with_group_auth( + &mut self, + event: Event, + auth: &GroupAuthContext, + ) -> Result<RelayMessage, BaseRelayError> { let event_id = event.id().clone(); if let Err(error) = verify_event_signature(&event) { return Ok(ok_rejected(event_id, format!("invalid: {error}"))); } - match validate_client_group_event_structure(&event, GroupLimitsConfig::default()) { - Ok(GroupEventClass::NonGroup) => {} - Ok(_) => { + let group_limits = self + .groups + .as_ref() + .map(GroupService::limits) + .unwrap_or_default(); + let class = match validate_client_group_event_structure(&event, group_limits) { + Ok(class) => class, + Err(error) => return Ok(ok_rejected(event_id, error.prefixed_message())), + }; + if !matches!(class, GroupEventClass::NonGroup) { + let Some(groups) = self.groups.as_ref() else { return Ok(ok_rejected( event_id, "blocked: NIP-29 group events are not accepted before group service".to_owned(), )); + }; + if let Err(error) = groups.check_event(&event, &class, auth) { + return Ok(ok_rejected(event_id, error.prefixed_message())); } - Err(error) => return Ok(ok_rejected(event_id, error.prefixed_message())), } if event.unsigned().kind().is_ephemeral() { return Ok(ok_accepted(event_id, String::new())); @@ -339,7 +406,12 @@ impl BaseRelay { )); } let pocket_event = tangle_event_to_pocket(&event)?; - self.store.store_event(&pocket_event)?; + let store_offset = StoreOffset::new(self.store.store_event(&pocket_event)?); + if !matches!(class, GroupEventClass::NonGroup) + && let Some(groups) = self.groups.as_mut() + { + groups.after_source_event_stored(&self.store, &event, &class, store_offset)?; + } self.store.sync()?; Ok(ok_accepted(event_id, String::new())) } @@ -406,6 +478,372 @@ impl BaseRelay { } } +struct GroupService { + builder: GroupGeneratedEventBuilder, + authority: GroupAuthority, + projection: GroupProjection, + outbox: GroupOutbox, + limits: GroupLimitsConfig, + member_snapshot_cap: u32, +} + +impl GroupService { + fn from_config( + store: &PocketStoreHandle, + config: &GroupRuntimeConfig, + ) -> Result<Option<Self>, BaseRelayError> { + if !config.enabled() { + return Ok(None); + } + let relay_secret = config + .relay_secret() + .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?; + let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing()) + .map_err(BaseRelayError::invalid)?; + Ok(Some(Self { + builder: GroupGeneratedEventBuilder::new(signer), + authority: GroupAuthority::new( + config.owner_pubkeys().iter().cloned(), + config.admin_pubkeys().iter().cloned(), + ), + projection: load_group_projection(store)?, + outbox: load_group_outbox(store)?, + limits: config.limits(), + member_snapshot_cap: config.limits().max_member_list_pubkeys(), + })) + } + + fn projection(&self) -> &GroupProjection { + &self.projection + } + + fn limits(&self) -> GroupLimitsConfig { + self.limits + } + + fn check_event( + &self, + event: &Event, + class: &GroupEventClass, + auth: &GroupAuthContext, + ) -> Result<(), GroupError> { + tangle_groups::GroupWritePolicy::new(&self.projection, &self.authority) + .check_event(event, class, auth) + .map(|_| ()) + } + + fn after_source_event_stored( + &mut self, + store: &PocketStoreHandle, + event: &Event, + class: &GroupEventClass, + store_offset: StoreOffset, + ) -> Result<(), BaseRelayError> { + self.projection + .apply_canonical_event(event, store_offset, self.limits)?; + if let Some(group_id) = class_group_id(class) { + self.persist_group_projection(store, group_id)?; + } + for record in self.plan_outbox_records(event, class)? { + let inserted = self.outbox.insert_idempotent(record.clone())?; + if inserted { + persist_outbox_record(store, &record)?; + } + } + self.materialize_outbox(store) + } + + fn plan_outbox_records( + &self, + event: &Event, + class: &GroupEventClass, + ) -> Result<Vec<GroupOutboxRecord>, GroupError> { + let created_at = event.unsigned().created_at(); + match class { + GroupEventClass::Moderation { kind, group_id } => match kind.as_u32() { + KIND_GROUP_CREATE_GROUP => { + let group = self.require_group(group_id)?; + Ok(vec![ + self.pending_record( + event, + GroupOutboxEffect::MetadataSnapshot, + group_id, + None, + GroupGeneratedEventBuilder::metadata_snapshot_payload( + group, created_at, + )?, + ), + self.pending_record( + event, + GroupOutboxEffect::AdminListSnapshot, + group_id, + None, + GroupGeneratedEventBuilder::admin_list_snapshot_payload( + group_id, + &self.projection, + &self.authority, + created_at, + )?, + ), + ]) + } + KIND_GROUP_EDIT_METADATA => { + let group = self.require_group(group_id)?; + Ok(vec![self.pending_record( + event, + GroupOutboxEffect::MetadataSnapshot, + group_id, + None, + GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?, + )]) + } + KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER => { + Ok(self.member_snapshot_record(event, group_id, created_at)?) + } + _ => Ok(Vec::new()), + }, + GroupEventClass::Normal { group_id } => match event.unsigned().kind().as_u32() { + KIND_GROUP_JOIN_REQUEST => Ok(vec![self.pending_record( + event, + GroupOutboxEffect::JoinAccepted, + group_id, + Some(event.unsigned().pubkey().clone()), + GroupGeneratedEventBuilder::join_accepted_payload( + group_id, + event.unsigned().pubkey(), + created_at, + ), + )]), + KIND_GROUP_LEAVE_REQUEST => Ok(vec![self.pending_record( + event, + GroupOutboxEffect::LeaveAccepted, + group_id, + Some(event.unsigned().pubkey().clone()), + GroupGeneratedEventBuilder::leave_accepted_payload( + group_id, + event.unsigned().pubkey(), + created_at, + ), + )]), + _ => Ok(Vec::new()), + }, + GroupEventClass::NonGroup | GroupEventClass::RelayGeneratedSnapshot { .. } => { + Ok(Vec::new()) + } + } + } + + fn member_snapshot_record( + &self, + event: &Event, + group_id: &GroupId, + created_at: UnixTimestamp, + ) -> Result<Vec<GroupOutboxRecord>, GroupError> { + let key = GroupOutboxKey::new( + event.id().clone(), + GroupOutboxEffect::MemberListSnapshot, + group_id.clone(), + None, + ); + let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload( + group_id, + &self.projection, + created_at, + self.member_snapshot_cap, + )?; + Ok(vec![match payload { + Some(payload) => GroupOutboxRecord::pending(key, payload), + None => { + let mut record = GroupOutboxRecord::pending( + key, + GroupOutboxPayload::new( + KIND_GROUP_MEMBERS, + created_at, + vec![vec!["d".to_owned(), group_id.as_str().to_owned()]], + "", + ), + ); + record.mark_skipped("member snapshot exceeds configured cap"); + record + } + }]) + } + + fn pending_record( + &self, + event: &Event, + effect: GroupOutboxEffect, + group_id: &GroupId, + target_pubkey: Option<PublicKeyHex>, + payload: GroupOutboxPayload, + ) -> GroupOutboxRecord { + GroupOutboxRecord::pending( + GroupOutboxKey::new(event.id().clone(), effect, group_id.clone(), target_pubkey), + payload, + ) + } + + fn materialize_outbox(&mut self, store: &PocketStoreHandle) -> Result<(), BaseRelayError> { + let records = self.outbox.replay_plan().records().to_vec(); + for record in records { + self.materialize_record(store, record)?; + } + Ok(()) + } + + fn materialize_record( + &mut self, + store: &PocketStoreHandle, + mut record: GroupOutboxRecord, + ) -> Result<(), BaseRelayError> { + if matches!( + record.key().effect(), + GroupOutboxEffect::RoleListSnapshot | GroupOutboxEffect::State39004Snapshot + ) { + record.mark_skipped("generated group effect is not supported"); + self.outbox.update(record.clone()); + persist_outbox_record(store, &record)?; + return Ok(()); + } + match self.store_generated_event(store, &record) { + Ok(generated_event_id) => { + record.mark_stored(generated_event_id); + self.outbox.update(record.clone()); + persist_outbox_record(store, &record)?; + Ok(()) + } + Err(error) => { + record.mark_failed(true, error.prefixed_message()); + self.outbox.update(record.clone()); + persist_outbox_record(store, &record)?; + Err(error) + } + } + } + + fn store_generated_event( + &mut self, + store: &PocketStoreHandle, + record: &GroupOutboxRecord, + ) -> Result<EventId, BaseRelayError> { + let event = self.builder.sign_payload(record.payload())?; + if store.event_by_id(pocket_event_id(event.id())?)?.is_some() { + return Ok(event.id().clone()); + } + let pocket_event = tangle_event_to_pocket(&event)?; + let offset = StoreOffset::new(store.store_event(&pocket_event)?); + self.projection + .apply_canonical_event(&event, offset, self.limits)?; + self.persist_group_projection(store, record.key().group_id())?; + Ok(event.id().clone()) + } + + fn persist_group_projection( + &self, + store: &PocketStoreHandle, + group_id: &GroupId, + ) -> Result<(), BaseRelayError> { + if let Some(group) = self.projection.group(group_id) { + store.put_extra_record( + TANGLE_GROUP_PROJECTION_TABLE, + &group_current_key(group_id), + &group.to_json_bytes()?, + )?; + } + for ((candidate_group, pubkey), member) in self.projection.members() { + if candidate_group == group_id { + store.put_extra_record( + TANGLE_GROUP_PROJECTION_TABLE, + &member_current_key(group_id, pubkey), + &member.to_json_bytes()?, + )?; + } + } + for ((candidate_group, role_name), role) in self.projection.roles() { + if candidate_group == group_id { + store.put_extra_record( + TANGLE_GROUP_PROJECTION_TABLE, + &role_current_key(group_id, role_name), + &role.to_json_bytes()?, + )?; + } + } + if let Some(tombstone) = self.projection.tombstone(group_id) { + store.put_extra_record( + TANGLE_GROUP_PROJECTION_TABLE, + &tombstone_key(group_id), + &tombstone.to_json_bytes()?, + )?; + } + Ok(()) + } + + fn require_group(&self, group_id: &GroupId) -> Result<&GroupState, GroupError> { + self.projection + .group(group_id) + .ok_or_else(|| GroupError::internal("group projection is missing after accepted write")) + } +} + +fn load_group_projection(store: &PocketStoreHandle) -> Result<GroupProjection, BaseRelayError> { + let mut projection = GroupProjection::new(); + for (key, value) in store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)? { + match projection_key_parts(&key)?.as_slice() { + ["group", _] => projection.put_group(GroupState::from_json_bytes(&value)?), + ["member", group_id, _] => projection.put_member( + GroupId::new(group_id)?, + MemberState::from_json_bytes(&value)?, + ), + ["role", group_id, _] => projection.put_role( + GroupId::new(group_id)?, + ProjectedRoleDefinition::from_json_bytes(&value)?, + ), + ["tombstone", _] => projection.put_tombstone(GroupTombstone::from_json_bytes(&value)?), + _ => {} + } + } + if let Some(raw) = + store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())? + { + projection.set_checkpoint(ProjectionCheckpoint::from_json_bytes(&raw)?); + } + Ok(projection) +} + +fn load_group_outbox(store: &PocketStoreHandle) -> Result<GroupOutbox, BaseRelayError> { + let mut outbox = GroupOutbox::new(); + for (_, value) in store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)? { + outbox.update(GroupOutboxRecord::from_json_bytes(&value)?); + } + Ok(outbox) +} + +fn projection_key_parts(key: &[u8]) -> Result<Vec<&str>, BaseRelayError> { + let key = str::from_utf8(key).map_err(|error| BaseRelayError::error(error.to_string()))?; + Ok(key.split('\0').collect()) +} + +fn persist_outbox_record( + store: &PocketStoreHandle, + record: &GroupOutboxRecord, +) -> Result<(), BaseRelayError> { + store.put_extra_record( + TANGLE_GROUP_OUTBOX_TABLE, + &record.key().storage_key(), + &record.to_json_bytes()?, + )?; + Ok(()) +} + +fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> { + match class { + GroupEventClass::Moderation { group_id, .. } + | GroupEventClass::Normal { group_id } + | GroupEventClass::RelayGeneratedSnapshot { group_id, .. } => Some(group_id), + GroupEventClass::NonGroup => None, + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct LiveSubscriptionSet { subscriptions: BTreeMap<SubscriptionId, Vec<Filter>>, @@ -550,6 +988,12 @@ impl From<tangle_store_pocket::PocketStoreError> for BaseRelayError { } } +impl From<GroupError> for BaseRelayError { + fn from(error: GroupError) -> Self { + Self::error(error.prefixed_message()) + } +} + fn relay_self_from_groups( groups: &GroupRuntimeConfig, ) -> Result<Option<PublicKeyHex>, BaseRelayError> { @@ -621,10 +1065,13 @@ mod tests { use axum::body::to_bytes; use http::{Request, StatusCode, header}; use tangle_crypto::RelaySigner; - use tangle_groups::parse_group_runtime_config_json; + use tangle_groups::{ + GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_JOIN_REQUEST, + KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, MemberStatus, parse_group_runtime_config_json, + }; use tangle_protocol::{ - ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, Tag, UnixTimestamp, - UnsignedEvent, filter_from_value, + ClientMessage, Event, Filter, Kind, PublicKeyHex, RelayMessage, SubscriptionId, Tag, + UnixTimestamp, UnsignedEvent, filter_from_value, }; use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy}; use tower::ServiceExt; @@ -777,7 +1224,7 @@ mod tests { #[test] fn base_relay_rejects_group_marked_events_before_group_service() { - let relay = test_relay("base-relay-group-reject", 4); + let mut relay = test_relay("base-relay-group-reject", 4); let event = signed_public_event( 7, 1, @@ -798,7 +1245,7 @@ mod tests { #[test] fn base_relay_rejects_client_submitted_relay_generated_group_state() { - let relay = test_relay("base-relay-generated-group-reject", 4); + let mut relay = test_relay("base-relay-generated-group-reject", 4); let event = signed_public_event( 7, 39_000, @@ -819,6 +1266,138 @@ mod tests { } #[test] + fn base_relay_initializes_group_service_from_config() { + let owner = signer(7).public_key().clone(); + let relay = test_relay_with_groups( + "base-relay-groups-enabled", + 4, + &enabled_groups_for_owner(&owner), + ); + let disabled = test_relay_with_groups("base-relay-groups-disabled", 4, &disabled_groups()); + + assert!(relay.groups_enabled()); + assert!( + relay + .group_projection() + .expect("projection") + .groups() + .is_empty() + ); + assert!(!disabled.groups_enabled()); + assert!(disabled.group_projection().is_none()); + } + + #[test] + fn group_event_write_requires_auth_before_storage() { + let owner = signer(7).public_key().clone(); + let mut relay = test_relay_with_groups( + "base-relay-group-auth-required", + 4, + &enabled_groups_for_owner(&owner), + ); + let auth = BaseAuthState::new("wss://relay.radroots.test", 60).expect("auth"); + let event = signed_group_create_event(7, "Farm"); + + assert_eq!( + relay + .handle_event_with_auth(event.clone(), &auth) + .expect("event"), + RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: "auth-required: group event author must authenticate with AUTH".to_owned() + } + ); + assert!( + relay + .group_projection() + .expect("projection") + .group(&GroupId::new("Farm").expect("group")) + .is_none() + ); + assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 0); + } + + #[test] + fn group_create_updates_projection_and_stores_generated_snapshots() { + let owner = signer(7).public_key().clone(); + let mut relay = test_relay_with_groups( + "base-relay-group-create", + 4, + &enabled_groups_for_owner(&owner), + ); + let auth = authenticated_state(7); + let event = signed_group_create_event(7, "Farm"); + + assert_eq!( + relay + .handle_event_with_auth(event.clone(), &auth) + .expect("event"), + RelayMessage::Ok { + event_id: event.id().clone(), + accepted: true, + message: String::new() + } + ); + + let group_id = GroupId::new("Farm").expect("group"); + assert!( + relay + .group_projection() + .expect("projection") + .group(&group_id) + .is_some() + ); + assert_eq!(count_kind(&relay, KIND_GROUP_CREATE_GROUP), 1); + assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); + assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); + } + + #[test] + fn group_join_materializes_relay_membership_event() { + let owner = signer(7).public_key().clone(); + let joiner = signer(8).public_key().clone(); + let mut relay = test_relay_with_groups( + "base-relay-group-join", + 4, + &enabled_groups_for_owner(&owner), + ); + let create = signed_group_create_event(7, "Farm"); + relay + .handle_event_with_auth(create, &authenticated_state(7)) + .expect("create"); + let join = signed_event_at( + 8, + KIND_GROUP_JOIN_REQUEST.into(), + vec![Tag::from_parts("h", &["Farm"]).expect("h")], + "", + 1_714_124_434, + ); + + assert_eq!( + relay + .handle_event_with_auth(join.clone(), &authenticated_state(8)) + .expect("join"), + RelayMessage::Ok { + event_id: join.id().clone(), + accepted: true, + message: String::new() + } + ); + + assert_eq!(count_kind(&relay, KIND_GROUP_PUT_USER), 1); + assert_eq!( + relay + .group_projection() + .expect("projection") + .member(&GroupId::new("Farm").expect("group"), &joiner) + .expect("member") + .status(), + MemberStatus::Member + ); + } + + #[test] fn live_subscription_lag_closes_subscription_for_resync() { let mut relay = test_relay("base-relay-lag", 1); let subscription_id = SubscriptionId::new("sub-lag").expect("sub"); @@ -885,26 +1464,46 @@ mod tests { } fn test_relay(name: &str, max_pending_events: usize) -> BaseRelay { + let config = test_store_config(name); + BaseRelay::open(&config, max_pending_events).expect("relay") + } + + fn test_relay_with_groups( + name: &str, + max_pending_events: usize, + groups: &tangle_groups::GroupRuntimeConfig, + ) -> BaseRelay { + let config = test_store_config(name); + BaseRelay::open_with_groups(&config, max_pending_events, groups).expect("relay") + } + + fn test_store_config(name: &str) -> PocketStoreConfig { let root = std::env::temp_dir().join(format!("tangle-{name}-{}", std::process::id())); let _ = std::fs::remove_dir_all(&root); - let config = PocketStoreConfig::new( + PocketStoreConfig::new( root.join("pocket"), 1024 * 1024 * 1024, 128, PocketSyncPolicy::FlushOnShutdown, ) - .expect("config"); - BaseRelay::open(&config, max_pending_events).expect("relay") + .expect("config") } fn enabled_groups() -> tangle_groups::GroupRuntimeConfig { + let owner = signer(7).public_key().clone(); + enabled_groups_for_owner(&owner) + } + + fn enabled_groups_for_owner(owner: &PublicKeyHex) -> tangle_groups::GroupRuntimeConfig { parse_group_runtime_config_json(&format!( r#"{{ "enabled": true, "canonical_relay_url": "wss://relay.radroots.test", - "relay_secret": "{}" + "relay_secret": "{}", + "owner_pubkeys": ["{}"] }}"#, - "7".repeat(64) + "7".repeat(64), + owner.as_str() )) .expect("groups") } @@ -930,6 +1529,19 @@ mod tests { signed_event_at(secret_byte, kind, tags, content, 1_714_124_433) } + fn signed_group_create_event(secret_byte: u8, group_id: &str) -> Event { + signed_event_at( + secret_byte, + KIND_GROUP_CREATE_GROUP.into(), + vec![ + Tag::from_parts("h", &[group_id]).expect("h"), + Tag::from_parts("name", &[group_id]).expect("name"), + ], + "", + 1_714_124_433, + ) + } + fn signed_event_at( secret_byte: u8, kind: u64, @@ -948,4 +1560,30 @@ mod tests { ); signer.sign_unsigned_event(unsigned) } + + fn authenticated_state(secret_byte: u8) -> BaseAuthState { + let mut auth = BaseAuthState::new("wss://relay.radroots.test", 60).expect("auth state"); + auth.issue_challenge("challenge-a", UnixTimestamp::new(100)) + .expect("challenge"); + let event = signed_auth_event(secret_byte, "challenge-a", 120); + auth.authenticate(&event, UnixTimestamp::new(120)) + .expect("authenticate"); + auth + } + + fn count_kind(relay: &BaseRelay, kind: u32) -> u64 { + let subscription_id = SubscriptionId::new(&format!("count-{kind}")).expect("sub"); + let filter = filter_from_value(&serde_json::json!({"kinds":[kind]})).expect("filter"); + match relay + .handle_count(subscription_id, vec![filter]) + .expect("count") + { + RelayMessage::Count { count, .. } => count, + _ => panic!("count response expected"), + } + } + + fn signer(secret_byte: u8) -> RelaySigner { + RelaySigner::from_secret_hex(&format!("{:02x}", secret_byte).repeat(32)).expect("signer") + } }