tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 16a0af31a3a49caee23c41b3add4e98c75ff9359
parent e7135aec4465f94152729da628ab01149769f28c
Author: triesap <tyson@radroots.org>
Date:   Mon, 15 Jun 2026 19:19:40 -0700

runtime: write groups from Pocket events

- add a Pocket-native group service write entry point
- store inbound group source events without protocol reserialization
- read group delete-target and audit fields through GroupEventView
- keep protocol event storage helpers limited to tests and generated state

Diffstat:
Mcrates/tangle_runtime/src/groups.rs | 100++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mcrates/tangle_runtime/src/logging.rs | 52++++++++++++++++++++++++++++++++--------------------
Mcrates/tangle_runtime/src/relay/core.rs | 31++++++++++++-------------------
3 files changed, 118 insertions(+), 65 deletions(-)

diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs @@ -25,7 +25,7 @@ use tangle_groups::{ }; use tangle_protocol::{Event, EventId, PublicKeyHex, UnixTimestamp}; use tangle_store_pocket::{ - PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, + PocketEvent, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, }; @@ -119,6 +119,20 @@ impl GroupServiceHandle { .event_visible_to_auth(event, auth) } + pub(crate) fn store_group_pocket_event( + &self, + store: &PocketStoreHandle, + event: &PocketEvent, + class: &GroupEventClass, + auth: &GroupAuthContext, + ) -> Result<GroupEventWrite, GroupEventWriteError> { + self.state + .write() + .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))? + .store_group_pocket_event(store, event, class, auth) + } + + #[cfg(test)] pub(crate) fn store_group_event( &self, store: &PocketStoreHandle, @@ -176,7 +190,7 @@ impl GroupServiceState { fn check_event( &self, store: &PocketStoreHandle, - event: &Event, + event: &(impl GroupEventView + ?Sized), class: &GroupEventClass, auth: &GroupAuthContext, ) -> Result<(), GroupError> { @@ -189,7 +203,7 @@ impl GroupServiceState { fn check_runtime_write_constraints( &self, store: &PocketStoreHandle, - event: &Event, + event: &(impl GroupEventView + ?Sized), class: &GroupEventClass, ) -> Result<(), GroupError> { if let GroupEventClass::Moderation { kind, group_id } = class @@ -203,7 +217,7 @@ impl GroupServiceState { fn check_delete_event_target( &self, store: &PocketStoreHandle, - event: &Event, + event: &(impl GroupEventView + ?Sized), group_id: &GroupId, ) -> Result<(), GroupError> { let target_id = delete_target_event_id(event)?; @@ -229,6 +243,36 @@ impl GroupServiceState { Ok(()) } + fn store_group_pocket_event( + &mut self, + store: &PocketStoreHandle, + event: &PocketEvent, + class: &GroupEventClass, + auth: &GroupAuthContext, + ) -> Result<GroupEventWrite, GroupEventWriteError> { + self.check_event(store, event, class, auth) + .map_err(GroupEventWriteError::Rejected)?; + if store + .event_by_id(event.id()) + .map_err(BaseRelayError::from)? + .is_some() + { + return Ok(GroupEventWrite::Duplicate); + } + let projection_event = pocket_event_to_tangle(event)?; + let store_offset = + StoreOffset::new(store.store_event(event).map_err(BaseRelayError::from)?); + let mut stored_offsets = vec![store_offset]; + stored_offsets.extend(self.after_source_event_stored( + store, + &projection_event, + class, + store_offset, + )?); + Ok(GroupEventWrite::Stored(stored_offsets)) + } + + #[cfg(test)] fn store_group_event( &mut self, store: &PocketStoreHandle, @@ -695,10 +739,13 @@ fn member_is_relay_override_admin( }) } -fn membership_target_pubkey(event: &Event) -> Result<PublicKeyHex, GroupError> { - for tag in event.unsigned().tags() { - if tag.values().first().is_none_or(|name| name != "p") { - continue; +fn membership_target_pubkey( + event: &(impl GroupEventView + ?Sized), +) -> Result<PublicKeyHex, GroupError> { + let mut target = None; + event.visit_tags(|tag| { + if tag.first_value().is_none_or(|name| name != "p") { + return Ok(()); } let Some((_, value)) = tag.indexed_pair() else { return Err(GroupError::invalid( @@ -706,17 +753,17 @@ fn membership_target_pubkey(event: &Event) -> Result<PublicKeyHex, GroupError> { "malformed p target tag", )); }; - return PublicKeyHex::new(value).map_err(|reason| { + target = Some(PublicKeyHex::new(value).map_err(|reason| { GroupError::invalid( GroupErrorKind::MalformedTargetTag, format!("malformed p target tag: {reason}"), ) - }); - } - Err(GroupError::invalid( - GroupErrorKind::MissingTargetTag, - "missing p target tag", - )) + })?); + Ok(()) + })?; + target.ok_or_else(|| { + GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing p target tag") + }) } fn pending_record( @@ -1137,10 +1184,11 @@ fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> { } } -fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> { - for tag in event.unsigned().tags() { - if tag.values().first().is_none_or(|name| name != "e") { - continue; +fn delete_target_event_id(event: &(impl GroupEventView + ?Sized)) -> Result<EventId, GroupError> { + let mut target = None; + event.visit_tags(|tag| { + if tag.first_value().is_none_or(|name| name != "e") { + return Ok(()); } let Some((_, value)) = tag.indexed_pair() else { return Err(GroupError::invalid( @@ -1148,17 +1196,17 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> { "malformed e target tag", )); }; - return EventId::new(value).map_err(|reason| { + target = Some(EventId::new(value).map_err(|reason| { GroupError::invalid( GroupErrorKind::MalformedTargetTag, format!("malformed e target tag: {reason}"), ) - }); - } - Err(GroupError::invalid( - GroupErrorKind::MissingTargetTag, - "missing e target tag", - )) + })?); + Ok(()) + })?; + target.ok_or_else(|| { + GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing e target tag") + }) } #[cfg(test)] diff --git a/crates/tangle_runtime/src/logging.rs b/crates/tangle_runtime/src/logging.rs @@ -6,12 +6,12 @@ use crate::{ }; use std::{fmt, net::IpAddr, net::SocketAddr}; use tangle_groups::{ - GroupEventClass, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT, - KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, - KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, - KIND_GROUP_REMOVE_USER, + GroupEventClass, GroupEventView, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, + KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, + KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, + KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, }; -use tangle_protocol::{Event, EventId, SubscriptionId, UnixTimestamp}; +use tangle_protocol::{EventId, SubscriptionId, UnixTimestamp}; use tracing_subscriber::EnvFilter; pub const TANGLE_LOG_REDACTED: &str = "<redacted>"; @@ -269,11 +269,13 @@ pub(crate) struct TangleModerationAuditEntry { impl TangleModerationAuditEntry { pub(crate) fn new( - event: &Event, + event: &(impl GroupEventView + ?Sized), class: &GroupEventClass, result: TangleModerationAuditResult, ) -> Option<Self> { let action_family = moderation_audit_action_family(event, class)?; + let event_id = event.id().ok()?; + let actor_pubkey = event.pubkey().ok()?; let generated_state_rejection = matches!( (class, result), ( @@ -284,9 +286,9 @@ impl TangleModerationAuditEntry { Some(Self { action_family, result: result.as_str(), - event_id: event.id().as_str().to_owned(), - actor_pubkey: event.unsigned().pubkey().as_str().to_owned(), - event_kind: event.unsigned().kind().as_u32(), + event_id: event_id.as_str().to_owned(), + actor_pubkey: actor_pubkey.as_str().to_owned(), + event_kind: event.kind_u32(), target_count: moderation_target_count(event, action_family), generated_state_rejection, }) @@ -294,7 +296,7 @@ impl TangleModerationAuditEntry { } pub(crate) fn log_group_moderation_audit( - event: &Event, + event: &(impl GroupEventView + ?Sized), class: &GroupEventClass, result: TangleModerationAuditResult, ) { @@ -320,7 +322,10 @@ pub fn sanitize_error_message(config: &BaseRelayRuntimeConfig, message: impl AsR TangleLogRedactor::from_runtime_config(config).redact(message) } -fn moderation_audit_action_family(event: &Event, class: &GroupEventClass) -> Option<&'static str> { +fn moderation_audit_action_family( + event: &(impl GroupEventView + ?Sized), + class: &GroupEventClass, +) -> Option<&'static str> { match class { GroupEventClass::Moderation { kind, .. } => match kind.as_u32() { KIND_GROUP_CREATE_GROUP => Some("group_create"), @@ -331,7 +336,7 @@ fn moderation_audit_action_family(event: &Event, class: &GroupEventClass) -> Opt KIND_GROUP_REMOVE_USER => Some("remove_user"), _ => None, }, - GroupEventClass::Normal { .. } => match event.unsigned().kind().as_u32() { + GroupEventClass::Normal { .. } => match event.kind_u32() { KIND_GROUP_JOIN_REQUEST => Some("join"), KIND_GROUP_LEAVE_REQUEST => Some("leave"), _ => None, @@ -346,7 +351,7 @@ fn moderation_audit_action_family(event: &Event, class: &GroupEventClass) -> Opt } } -fn moderation_target_count(event: &Event, action_family: &str) -> usize { +fn moderation_target_count(event: &(impl GroupEventView + ?Sized), action_family: &str) -> usize { let target_tag = match action_family { "put_user" | "remove_user" | "members" => Some("p"), "delete_event" => Some("e"), @@ -355,15 +360,22 @@ fn moderation_target_count(event: &Event, action_family: &str) -> usize { let Some(target_tag) = target_tag else { return 0; }; - event - .unsigned() - .tags() - .iter() - .filter(|tag| { - tag.indexed_pair() + let mut count = 0; + if event + .visit_tags(|tag| { + if tag + .indexed_pair() .is_some_and(|(name, _)| name == target_tag) + { + count += 1; + } + Ok(()) }) - .count() + .is_err() + { + return 0; + } + count } fn relay_secret_log_value(config: &BaseRelayRuntimeConfig) -> &'static str { diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -1072,11 +1072,11 @@ impl BaseRelay { Ok(class) => class, Err(error) => { if let Some(class) = audit_class.as_ref() { - log_pocket_group_moderation_audit( + logging::log_group_moderation_audit( event, class, TangleModerationAuditResult::Rejected, - )?; + ); } return Ok(BaseRelayEventWrite::unstored(ok_rejected( event_id, @@ -1085,10 +1085,9 @@ impl BaseRelay { } }; if !matches!(class, GroupEventClass::NonGroup) { - let tangle_event = pocket_event_to_tangle(event)?; let Some(groups) = groups else { logging::log_group_moderation_audit( - &tangle_event, + event, &class, TangleModerationAuditResult::Rejected, ); @@ -1097,10 +1096,10 @@ impl BaseRelay { "blocked: NIP-29 group events are not accepted before group service".to_owned(), ))); }; - match groups.store_group_event(store, &tangle_event, &class, auth) { + match groups.store_group_pocket_event(store, event, &class, auth) { Ok(GroupEventWrite::Stored(stored_offsets)) => { logging::log_group_moderation_audit( - &tangle_event, + event, &class, TangleModerationAuditResult::Accepted, ); @@ -1111,7 +1110,7 @@ impl BaseRelay { } Ok(GroupEventWrite::Duplicate) => { logging::log_group_moderation_audit( - &tangle_event, + event, &class, TangleModerationAuditResult::Accepted, ); @@ -1122,7 +1121,7 @@ impl BaseRelay { } Err(GroupEventWriteError::Rejected(error)) => { logging::log_group_moderation_audit( - &tangle_event, + event, &class, TangleModerationAuditResult::Rejected, ); @@ -1634,16 +1633,6 @@ fn filters_are_complete(filters: &[Filter]) -> bool { !filters.is_empty() && filters.iter().all(Filter::is_complete) } -fn log_pocket_group_moderation_audit( - event: &PocketEvent, - class: &GroupEventClass, - result: TangleModerationAuditResult, -) -> Result<(), BaseRelayError> { - let event = pocket_event_to_tangle(event)?; - logging::log_group_moderation_audit(&event, class, result); - Ok(()) -} - #[cfg(test)] mod tests { use super::{BaseRelay, BaseRelayLimitSettings, BaseRelayLimits, NEGENTROPY_DISABLED_MESSAGE}; @@ -2680,9 +2669,13 @@ mod tests { let core_source = include_str!("core.rs"); let group_source = include_str!("../groups.rs"); - assert!(core_source.contains("groups.store_group_event")); + assert!(core_source.contains("groups.store_group_pocket_event")); assert!(!core_source.contains(concat!("groups.", "check_event"))); assert!(!core_source.contains(concat!("groups.", "after_source_event_stored"))); + assert!(!core_source.contains(concat!( + "let tangle_event = ", + "pocket_event_to_tangle(event)?;" + ))); assert!(!group_source.contains("pub(crate) fn check_event(")); assert!(!group_source.contains("pub(crate) fn after_source_event_stored(")); }