commit 39b8b42d3268f8f1c35ecf0406af9979023c9be5
parent a1a0beb4c97e8e1e8c9331308c2eccad2a0db283
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 21:16:17 -0700
runtime: make group writes atomic
- replace split group write handle methods with one locked write boundary
- keep group rejection and storage errors distinct for relay responses
- route group event storage and generated offsets through the group service
- add a source-shape guard for the atomic production path
Diffstat:
2 files changed, 98 insertions(+), 45 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -34,6 +34,22 @@ pub(crate) struct GroupServiceHandle {
state: Arc<RwLock<GroupServiceState>>,
}
+pub(crate) enum GroupEventWrite {
+ Stored(Vec<StoreOffset>),
+ Duplicate,
+}
+
+pub(crate) enum GroupEventWriteError {
+ Rejected(GroupError),
+ Storage(BaseRelayError),
+}
+
+impl From<BaseRelayError> for GroupEventWriteError {
+ fn from(error: BaseRelayError) -> Self {
+ Self::Storage(error)
+ }
+}
+
pub struct GroupProjectionReadGuard<'a> {
state: RwLockReadGuard<'a, GroupServiceState>,
}
@@ -92,19 +108,6 @@ impl GroupServiceHandle {
.outbox_pending_events()
}
- pub(crate) fn check_event(
- &self,
- store: &PocketStoreHandle,
- event: &Event,
- class: &GroupEventClass,
- auth: &GroupAuthContext,
- ) -> Result<(), GroupError> {
- self.state
- .read()
- .map_err(|_| GroupError::internal("group service state lock is poisoned"))?
- .check_event(store, event, class, auth)
- }
-
pub(crate) fn event_visible_to_auth(
&self,
event: &(impl GroupEventView + ?Sized),
@@ -116,17 +119,17 @@ impl GroupServiceHandle {
.event_visible_to_auth(event, auth)
}
- pub(crate) fn after_source_event_stored(
+ pub(crate) fn store_group_event(
&self,
store: &PocketStoreHandle,
event: &Event,
class: &GroupEventClass,
- store_offset: StoreOffset,
- ) -> Result<Vec<StoreOffset>, BaseRelayError> {
+ auth: &GroupAuthContext,
+ ) -> Result<GroupEventWrite, GroupEventWriteError> {
self.state
.write()
.map_err(|_| BaseRelayError::error("group service state lock is poisoned"))?
- .after_source_event_stored(store, event, class, store_offset)
+ .store_group_event(store, event, class, auth)
}
}
@@ -226,6 +229,33 @@ impl GroupServiceState {
Ok(())
}
+ fn store_group_event(
+ &mut self,
+ store: &PocketStoreHandle,
+ event: &Event,
+ class: &GroupEventClass,
+ auth: &GroupAuthContext,
+ ) -> Result<GroupEventWrite, GroupEventWriteError> {
+ self.check_event(store, event, class, auth)
+ .map_err(GroupEventWriteError::Rejected)?;
+ if store
+ .event_by_id(pocket_event_id(event.id())?)
+ .map_err(BaseRelayError::from)?
+ .is_some()
+ {
+ return Ok(GroupEventWrite::Duplicate);
+ }
+ let pocket_event = tangle_event_to_pocket(event)?;
+ let store_offset = StoreOffset::new(
+ store
+ .store_event(&pocket_event)
+ .map_err(BaseRelayError::from)?,
+ );
+ let mut stored_offsets = vec![store_offset];
+ stored_offsets.extend(self.after_source_event_stored(store, event, class, store_offset)?);
+ Ok(GroupEventWrite::Stored(stored_offsets))
+ }
+
fn event_visible_to_auth(
&self,
event: &(impl GroupEventView + ?Sized),
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -1,5 +1,7 @@
use crate::errors::{BaseRelayError, ok_accepted, ok_rejected};
-use crate::groups::{GroupProjectionReadGuard, GroupServiceHandle};
+use crate::groups::{
+ GroupEventWrite, GroupEventWriteError, GroupProjectionReadGuard, GroupServiceHandle,
+};
use crate::logging::{self, TangleModerationAuditResult};
use crate::ops::BaseRelayReadinessState;
use crate::pocket_conversion::{
@@ -820,22 +822,42 @@ impl BaseRelay {
"blocked: NIP-29 group events are not accepted before group service".to_owned(),
)));
};
- if let Err(error) = groups.check_event(store, &event, &class, auth) {
- logging::log_group_moderation_audit(
- &event,
- &class,
- TangleModerationAuditResult::Rejected,
- );
- return Ok(BaseRelayEventWrite::unstored(ok_rejected(
- event_id,
- error.prefixed_message(),
- )));
+ match groups.store_group_event(store, &event, &class, auth) {
+ Ok(GroupEventWrite::Stored(stored_offsets)) => {
+ logging::log_group_moderation_audit(
+ &event,
+ &class,
+ TangleModerationAuditResult::Accepted,
+ );
+ return Ok(BaseRelayEventWrite::stored(
+ ok_accepted(event_id, String::new()),
+ stored_offsets,
+ ));
+ }
+ Ok(GroupEventWrite::Duplicate) => {
+ logging::log_group_moderation_audit(
+ &event,
+ &class,
+ TangleModerationAuditResult::Accepted,
+ );
+ return Ok(BaseRelayEventWrite::unstored(ok_accepted(
+ event_id,
+ "duplicate: already have this event".to_owned(),
+ )));
+ }
+ Err(GroupEventWriteError::Rejected(error)) => {
+ logging::log_group_moderation_audit(
+ &event,
+ &class,
+ TangleModerationAuditResult::Rejected,
+ );
+ return Ok(BaseRelayEventWrite::unstored(ok_rejected(
+ event_id,
+ error.prefixed_message(),
+ )));
+ }
+ Err(GroupEventWriteError::Storage(error)) => return Err(error),
}
- logging::log_group_moderation_audit(
- &event,
- &class,
- TangleModerationAuditResult::Accepted,
- );
}
if event.unsigned().kind().is_ephemeral() {
return Ok(BaseRelayEventWrite::unstored(ok_accepted(
@@ -851,20 +873,9 @@ impl BaseRelay {
}
let pocket_event = tangle_event_to_pocket(&event)?;
let store_offset = StoreOffset::new(store.store_event(&pocket_event)?);
- let mut stored_offsets = vec![store_offset];
- if !matches!(class, GroupEventClass::NonGroup)
- && let Some(groups) = groups
- {
- stored_offsets.extend(groups.after_source_event_stored(
- store,
- &event,
- &class,
- store_offset,
- )?);
- }
Ok(BaseRelayEventWrite::stored(
ok_accepted(event_id, String::new()),
- stored_offsets,
+ vec![store_offset],
))
}
@@ -1966,6 +1977,18 @@ mod tests {
}
#[test]
+ fn group_write_source_uses_atomic_service_boundary() {
+ let core_source = include_str!("core.rs");
+ let group_source = include_str!("../groups.rs");
+
+ assert!(core_source.contains("groups.store_group_event"));
+ assert!(!core_source.contains(concat!("groups.", "check_event")));
+ assert!(!core_source.contains(concat!("groups.", "after_source_event_stored")));
+ assert!(!group_source.contains("pub(crate) fn check_event("));
+ assert!(!group_source.contains("pub(crate) fn after_source_event_stored("));
+ }
+
+ #[test]
fn base_relay_event_path_preserves_chorus_parity() {
let owner = signer(7).public_key().clone();
let relay = test_relay_with_groups(