commit bda28d8893f1f3384956901d5e02562b9d7fb201
parent 4adb17b4bde08afde5a97457ad3d66759ac608d8
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:33:48 -0700
groups: derive missing outbox records
- Rebuild missing group outbox records from canonical source events during startup recovery.
- Share one deterministic outbox planning path between live writes and recovery derivation.
- Skip relay-authored generated events during source-side derivation for this recovery slice.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 170 insertions(+), 123 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -63,6 +63,7 @@ impl GroupService {
limits: config.limits(),
member_snapshot_cap: config.limits().max_member_list_pubkeys(),
};
+ service.derive_missing_outbox_records(store)?;
service.materialize_outbox(store)?;
store.sync()?;
Ok(Some(service))
@@ -177,129 +178,44 @@ impl GroupService {
event: &Event,
class: &GroupEventClass,
) -> Result<Vec<GroupOutboxRecord>, GroupError> {
- let created_at = event.unsigned().created_at();
- match class {
- GroupEventClass::Moderation { kind, group_id } => match kind.as_u32() {
- KIND_GROUP_CREATE_GROUP => {
- let group = self.require_group(group_id)?;
- Ok(vec![
- self.pending_record(
- event,
- GroupOutboxEffect::MetadataSnapshot,
- group_id,
- None,
- GroupGeneratedEventBuilder::metadata_snapshot_payload(
- group, created_at,
- )?,
- ),
- self.pending_record(
- event,
- GroupOutboxEffect::AdminListSnapshot,
- group_id,
- None,
- GroupGeneratedEventBuilder::admin_list_snapshot_payload(
- group_id,
- &self.projection,
- &self.authority,
- created_at,
- )?,
- ),
- ])
- }
- KIND_GROUP_EDIT_METADATA => {
- let group = self.require_group(group_id)?;
- Ok(vec![self.pending_record(
- event,
- GroupOutboxEffect::MetadataSnapshot,
- group_id,
- None,
- GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?,
- )])
- }
- KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER => {
- Ok(self.member_snapshot_record(event, group_id, created_at)?)
- }
- _ => Ok(Vec::new()),
- },
- GroupEventClass::Normal { group_id } => match event.unsigned().kind().as_u32() {
- KIND_GROUP_JOIN_REQUEST => Ok(vec![self.pending_record(
- event,
- GroupOutboxEffect::JoinAccepted,
- group_id,
- Some(event.unsigned().pubkey().clone()),
- GroupGeneratedEventBuilder::join_accepted_payload(
- group_id,
- event.unsigned().pubkey(),
- created_at,
- ),
- )]),
- KIND_GROUP_LEAVE_REQUEST => Ok(vec![self.pending_record(
- event,
- GroupOutboxEffect::LeaveAccepted,
- group_id,
- Some(event.unsigned().pubkey().clone()),
- GroupGeneratedEventBuilder::leave_accepted_payload(
- group_id,
- event.unsigned().pubkey(),
- created_at,
- ),
- )]),
- _ => Ok(Vec::new()),
- },
- GroupEventClass::NonGroup | GroupEventClass::RelayGeneratedSnapshot { .. } => {
- Ok(Vec::new())
- }
- }
- }
-
- fn member_snapshot_record(
- &self,
- event: &Event,
- group_id: &GroupId,
- created_at: UnixTimestamp,
- ) -> Result<Vec<GroupOutboxRecord>, GroupError> {
- let key = GroupOutboxKey::new(
- event.id().clone(),
- GroupOutboxEffect::MemberListSnapshot,
- group_id.clone(),
- None,
- );
- let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload(
- group_id,
+ plan_group_outbox_records(
+ event,
+ class,
&self.projection,
- created_at,
+ &self.authority,
self.member_snapshot_cap,
- )?;
- Ok(vec![match payload {
- Some(payload) => GroupOutboxRecord::pending(key, payload),
- None => {
- let mut record = GroupOutboxRecord::pending(
- key,
- GroupOutboxPayload::new(
- KIND_GROUP_MEMBERS,
- created_at,
- vec![vec!["d".to_owned(), group_id.as_str().to_owned()]],
- "",
- ),
- );
- record.mark_skipped("member snapshot exceeds configured cap");
- record
- }
- }])
+ )
}
- fn pending_record(
- &self,
- event: &Event,
- effect: GroupOutboxEffect,
- group_id: &GroupId,
- target_pubkey: Option<PublicKeyHex>,
- payload: GroupOutboxPayload,
- ) -> GroupOutboxRecord {
- GroupOutboxRecord::pending(
- GroupOutboxKey::new(event.id().clone(), effect, group_id.clone(), target_pubkey),
- payload,
- )
+ fn derive_missing_outbox_records(
+ &mut self,
+ store: &PocketStoreHandle,
+ ) -> Result<(), BaseRelayError> {
+ let relay_pubkey = self.builder.relay_pubkey().clone();
+ let scan = scan_canonical_group_events(store, self.limits)?;
+ let mut projection = GroupProjection::new();
+ let mut events = scan.into_events();
+ events.sort_by_key(CanonicalGroupEvent::tuple);
+ for item in events {
+ let class = tangle_groups::classify_group_event(item.event(), self.limits)?;
+ projection.apply_canonical_event(item.event(), item.store_offset(), self.limits)?;
+ if item.event().unsigned().pubkey() == &relay_pubkey {
+ continue;
+ }
+ for record in plan_group_outbox_records(
+ item.event(),
+ &class,
+ &projection,
+ &self.authority,
+ self.member_snapshot_cap,
+ )? {
+ let inserted = self.outbox.insert_idempotent(record.clone())?;
+ if inserted {
+ persist_outbox_record(store, &record)?;
+ }
+ }
+ }
+ Ok(())
}
fn materialize_outbox(&mut self, store: &PocketStoreHandle) -> Result<(), BaseRelayError> {
@@ -405,14 +321,144 @@ impl GroupService {
}
Ok(())
}
+}
- fn require_group(&self, group_id: &GroupId) -> Result<&GroupState, GroupError> {
- self.projection
- .group(group_id)
- .ok_or_else(|| GroupError::internal("group projection is missing after accepted write"))
+fn plan_group_outbox_records(
+ event: &Event,
+ class: &GroupEventClass,
+ projection: &GroupProjection,
+ authority: &GroupAuthority,
+ member_snapshot_cap: u32,
+) -> Result<Vec<GroupOutboxRecord>, GroupError> {
+ let created_at = event.unsigned().created_at();
+ match class {
+ GroupEventClass::Moderation { kind, group_id } => match kind.as_u32() {
+ KIND_GROUP_CREATE_GROUP => {
+ let group = require_projected_group(projection, group_id)?;
+ Ok(vec![
+ pending_record(
+ event,
+ GroupOutboxEffect::MetadataSnapshot,
+ group_id,
+ None,
+ GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?,
+ ),
+ pending_record(
+ event,
+ GroupOutboxEffect::AdminListSnapshot,
+ group_id,
+ None,
+ GroupGeneratedEventBuilder::admin_list_snapshot_payload(
+ group_id, projection, authority, created_at,
+ )?,
+ ),
+ ])
+ }
+ KIND_GROUP_EDIT_METADATA => {
+ let group = require_projected_group(projection, group_id)?;
+ Ok(vec![pending_record(
+ event,
+ GroupOutboxEffect::MetadataSnapshot,
+ group_id,
+ None,
+ GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?,
+ )])
+ }
+ KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER => {
+ member_snapshot_record(event, group_id, projection, created_at, member_snapshot_cap)
+ }
+ _ => Ok(Vec::new()),
+ },
+ GroupEventClass::Normal { group_id } => match event.unsigned().kind().as_u32() {
+ KIND_GROUP_JOIN_REQUEST => Ok(vec![pending_record(
+ event,
+ GroupOutboxEffect::JoinAccepted,
+ group_id,
+ Some(event.unsigned().pubkey().clone()),
+ GroupGeneratedEventBuilder::join_accepted_payload(
+ group_id,
+ event.unsigned().pubkey(),
+ created_at,
+ ),
+ )]),
+ KIND_GROUP_LEAVE_REQUEST => Ok(vec![pending_record(
+ event,
+ GroupOutboxEffect::LeaveAccepted,
+ group_id,
+ Some(event.unsigned().pubkey().clone()),
+ GroupGeneratedEventBuilder::leave_accepted_payload(
+ group_id,
+ event.unsigned().pubkey(),
+ created_at,
+ ),
+ )]),
+ _ => Ok(Vec::new()),
+ },
+ GroupEventClass::NonGroup | GroupEventClass::RelayGeneratedSnapshot { .. } => {
+ Ok(Vec::new())
+ }
}
}
+fn member_snapshot_record(
+ event: &Event,
+ group_id: &GroupId,
+ projection: &GroupProjection,
+ created_at: UnixTimestamp,
+ member_snapshot_cap: u32,
+) -> Result<Vec<GroupOutboxRecord>, GroupError> {
+ let key = GroupOutboxKey::new(
+ event.id().clone(),
+ GroupOutboxEffect::MemberListSnapshot,
+ group_id.clone(),
+ None,
+ );
+ let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload(
+ group_id,
+ projection,
+ created_at,
+ member_snapshot_cap,
+ )?;
+ Ok(vec![match payload {
+ Some(payload) => GroupOutboxRecord::pending(key, payload),
+ None => {
+ let mut record = GroupOutboxRecord::pending(
+ key,
+ GroupOutboxPayload::new(
+ KIND_GROUP_MEMBERS,
+ created_at,
+ vec![vec!["d".to_owned(), group_id.as_str().to_owned()]],
+ "",
+ ),
+ );
+ record.mark_skipped("member snapshot exceeds configured cap");
+ record
+ }
+ }])
+}
+
+fn pending_record(
+ event: &Event,
+ effect: GroupOutboxEffect,
+ group_id: &GroupId,
+ target_pubkey: Option<PublicKeyHex>,
+ payload: GroupOutboxPayload,
+) -> GroupOutboxRecord {
+ GroupOutboxRecord::pending(
+ GroupOutboxKey::new(event.id().clone(), effect, group_id.clone(), target_pubkey),
+ payload,
+ )
+}
+
+fn require_projected_group<'a>(
+ projection: &'a GroupProjection,
+ group_id: &GroupId,
+) -> Result<&'a GroupState, GroupError> {
+ projection
+ .group(group_id)
+ .ok_or_else(|| GroupError::internal("group projection is missing after accepted write"))
+}
+
#[derive(Debug, Clone, PartialEq, Eq)]
struct GroupStorageState {
projection: GroupProjection,
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -983,6 +983,7 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1);
let validation = group_extra_table_validation(&config);
assert!(validation.projection_records() > 0);
+ assert_eq!(validation.outbox_records(), 3);
assert!(matches!(
validation.checkpoint_status(),
&GroupCheckpointStatus::Current { .. }