commit f4bbc80895292b97aa04f494a0128489447c2391
parent 1c358170ac329f299e2e8f28804588e50bcfde0c
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 06:05:55 -0700
groups: cover crash recovery equivalence
- Treat generated events already present anywhere in canonical Pocket storage as stored during outbox replay.
- Add source-store and pending-outbox crash recovery tests for rebuilt projection and generated state.
- Add live-versus-rebuilt projection equivalence coverage across metadata, role, delete-event, member removal, and group deletion flows.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 394 insertions(+), 17 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -299,7 +299,7 @@ impl GroupService {
record: &GroupOutboxRecord,
) -> Result<(EventId, Option<StoreOffset>), BaseRelayError> {
let event = self.builder.sign_payload(record.payload())?;
- if store.event_by_id(pocket_event_id(event.id())?)?.is_some() {
+ if generated_event_already_stored(store, event.id())? {
return Ok((event.id().clone(), None));
}
let pocket_event = tangle_event_to_pocket(&event)?;
@@ -961,6 +961,21 @@ fn persist_outbox_record(
Ok(())
}
+fn generated_event_already_stored(
+ store: &PocketStoreHandle,
+ event_id: &EventId,
+) -> Result<bool, BaseRelayError> {
+ if store.event_by_id(pocket_event_id(event_id)?)?.is_some() {
+ return Ok(true);
+ }
+ for stored in store.scan_events()? {
+ if stored.event().id().as_hex_string() == event_id.as_str() {
+ return Ok(true);
+ }
+ }
+ Ok(false)
+}
+
fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> {
match class {
GroupEventClass::Moderation { group_id, .. }
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -3,14 +3,15 @@
use std::{fs, panic, path::PathBuf};
use tangle_crypto::{event_id_matches, verify_event_signature};
use tangle_groups::{
- GroupId, GroupOutboxRecord, GroupOutboxStatus, GroupRuntimeConfig, KIND_GROUP_ADMINS,
- KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS,
- KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES,
- PERMANENT_RELAY_OVERRIDE_ROLE, ProjectionCheckpoint, StoreOffset, member_current_key,
- parse_group_runtime_config_json, projection_checkpoint_key,
+ GroupAuthority, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, GroupOutboxEffect,
+ GroupOutboxKey, GroupOutboxRecord, GroupOutboxStatus, GroupProjection, GroupRuntimeConfig,
+ KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
+ KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, KIND_GROUP_PUT_USER, MemberStatus,
+ NIP29_RELAY_GENERATED_KIND_VALUES, PERMANENT_RELAY_OVERRIDE_ROLE, ProjectionCheckpoint,
+ StoreOffset, member_current_key, parse_group_runtime_config_json, projection_checkpoint_key,
};
use tangle_protocol::{
- Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp,
+ Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp, event_to_value,
filter_from_value, parse_client_message, parse_event_json,
};
use tangle_runtime::{
@@ -20,14 +21,15 @@ use tangle_runtime::{
};
use tangle_store_pocket::{
PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE,
- TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE,
+ TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json,
};
use tangle_test_support::{
FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event,
- tangle_v2_delete_group_event, tangle_v2_event, tangle_v2_group_config,
- tangle_v2_group_create_event, tangle_v2_group_event, tangle_v2_group_metadata_event,
- tangle_v2_group_tag, tangle_v2_join_event, tangle_v2_leave_event, tangle_v2_pubkey_tag,
- tangle_v2_put_user_event, tangle_v2_remove_user_event, tangle_v2_tag,
+ tangle_v2_delete_event_event, tangle_v2_delete_group_event, tangle_v2_event,
+ tangle_v2_group_config, tangle_v2_group_create_event, tangle_v2_group_event,
+ tangle_v2_group_metadata_event, tangle_v2_group_tag, tangle_v2_join_event,
+ tangle_v2_leave_event, tangle_v2_pubkey_tag, tangle_v2_put_user_event,
+ tangle_v2_remove_user_event, tangle_v2_tag,
};
#[test]
@@ -1115,13 +1117,177 @@ fn projection_applies_canonical_events_after_checkpoint_on_restart() {
MemberStatus::Member
);
let validation = group_extra_table_validation(&config);
- assert!(matches!(
- validation.checkpoint_status(),
- GroupCheckpointStatus::Current { checkpoint }
- if checkpoint
+ match validation.checkpoint_status() {
+ GroupCheckpointStatus::Current { checkpoint } => assert!(
+ checkpoint
.last_offset()
.is_some_and(|offset| offset.as_u64() > create_offset)
- ));
+ ),
+ status => panic!("expected current checkpoint, got {status:?}"),
+ }
+}
+
+#[test]
+fn source_store_crash_recovery_rebuilds_projection_outbox_and_generated_events() {
+ let config = test_store_config("source-store-crash-recovery");
+ let create =
+ tangle_v2_group_create_event(FixtureKey::Owner, "CrashFarm", 1, &[]).expect("create");
+ let put = tangle_v2_put_user_event(FixtureKey::Admin, "CrashFarm", FixtureKey::Member, 2)
+ .expect("put");
+
+ store_source_events(&config, &[create, put]);
+
+ let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ assert_eq!(
+ relay
+ .readiness_state()
+ .response()
+ .checks
+ .group_outbox_replay,
+ "ready"
+ );
+ assert!(
+ relay
+ .group_projection()
+ .expect("projection")
+ .group(&group("CrashFarm"))
+ .is_some()
+ );
+ assert_eq!(
+ relay
+ .group_projection()
+ .expect("projection")
+ .member(&group("CrashFarm"), &FixtureKey::Member.public_key())
+ .expect("member")
+ .status(),
+ MemberStatus::Member
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_METADATA).len(),
+ 1
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS).len(),
+ 1
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS).len(),
+ 1
+ );
+ let counts = outbox_status_counts(&config);
+ assert_eq!(counts.pending, 0);
+ assert_eq!(counts.retryable, 0);
+ assert_eq!(counts.stored, 3);
+}
+
+#[test]
+fn rebuilt_projection_matches_live_projection_for_moderation_stream() {
+ let config = test_store_config("projection-equivalence");
+ let owner_auth = authenticated(FixtureKey::Owner);
+ let admin_auth = authenticated(FixtureKey::Admin);
+ let member_auth = authenticated(FixtureKey::Member);
+ let live_projection;
+ let metadata_before;
+ let admins_before;
+ let members_before;
+
+ {
+ let mut relay = BaseRelay::open_with_groups(&config, 16, &group_config()).expect("relay");
+ accept_group_create(&mut relay, "EquivFarm", &[], 1, &owner_auth);
+ let metadata =
+ tangle_v2_group_metadata_event(FixtureKey::Admin, "EquivFarm", "Market", 2, &[])
+ .expect("metadata");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(metadata.clone(), &admin_auth)
+ .expect("metadata"),
+ &metadata,
+ );
+ let promote = tangle_v2_put_user_event_with_roles(
+ FixtureKey::Admin,
+ "EquivFarm",
+ FixtureKey::Member,
+ 3,
+ &[PERMANENT_RELAY_OVERRIDE_ROLE],
+ );
+ assert_accepted(
+ relay
+ .handle_event_with_auth(promote.clone(), &admin_auth)
+ .expect("promote"),
+ &promote,
+ );
+ let normal = tangle_v2_group_event(FixtureKey::Member, "EquivFarm", 4, 1, "harvest")
+ .expect("normal");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(normal.clone(), &member_auth)
+ .expect("normal"),
+ &normal,
+ );
+ let delete_event = tangle_v2_delete_event_event(FixtureKey::Admin, "EquivFarm", &normal, 5)
+ .expect("delete event");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(delete_event.clone(), &admin_auth)
+ .expect("delete event"),
+ &delete_event,
+ );
+ let demote = tangle_v2_put_user_event_with_roles(
+ FixtureKey::Admin,
+ "EquivFarm",
+ FixtureKey::Member,
+ 6,
+ &[],
+ );
+ assert_accepted(
+ relay
+ .handle_event_with_auth(demote.clone(), &admin_auth)
+ .expect("demote"),
+ &demote,
+ );
+ let remove =
+ tangle_v2_remove_user_event(FixtureKey::Admin, "EquivFarm", FixtureKey::Member, 7)
+ .expect("remove");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(remove.clone(), &admin_auth)
+ .expect("remove"),
+ &remove,
+ );
+ let delete_group =
+ tangle_v2_delete_group_event(FixtureKey::Owner, "EquivFarm", 8).expect("delete group");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(delete_group.clone(), &owner_auth)
+ .expect("delete group"),
+ &delete_group,
+ );
+ live_projection = relay.group_projection().expect("projection").clone();
+ metadata_before = stored_event_ids_for_kind(&config, KIND_GROUP_METADATA);
+ admins_before = stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS);
+ members_before = stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS);
+ relay.shutdown().expect("shutdown");
+ }
+
+ delete_group_extra_records(&config);
+
+ let relay = BaseRelay::open_with_groups(&config, 16, &group_config()).expect("reopen");
+ assert_projection_without_checkpoint_eq(
+ &live_projection,
+ relay.group_projection().expect("projection"),
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_METADATA),
+ metadata_before
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_ADMINS),
+ admins_before
+ );
+ assert_eq!(
+ stored_event_ids_for_kind(&config, KIND_GROUP_MEMBERS),
+ members_before
+ );
}
#[test]
@@ -1188,6 +1354,52 @@ fn already_stored_generated_events_mark_outbox_stored_without_duplication_on_res
}
#[test]
+fn crash_point_recovery_states_match_live_projection_and_generated_events() {
+ let live_config = test_store_config("crash-equivalence-live");
+ let source_only_config = test_store_config("crash-equivalence-source-only");
+ let pending_outbox_config = test_store_config("crash-equivalence-pending-outbox");
+ let events = recovery_equivalence_events();
+ let expected = {
+ let mut relay =
+ BaseRelay::open_with_groups(&live_config, 8, &group_config()).expect("live");
+ let owner_auth = authenticated(FixtureKey::Owner);
+ let admin_auth = authenticated(FixtureKey::Admin);
+ assert_accepted(
+ relay
+ .handle_event_with_auth(events[0].clone(), &owner_auth)
+ .expect("create"),
+ &events[0],
+ );
+ for event in events.iter().skip(1) {
+ assert_accepted(
+ relay
+ .handle_event_with_auth(event.clone(), &admin_auth)
+ .expect("event"),
+ event,
+ );
+ }
+ recovery_summary(&mut relay, "CrashFarm")
+ };
+
+ store_source_events(&source_only_config, &events);
+ let mut source_only =
+ BaseRelay::open_with_groups(&source_only_config, 8, &group_config()).expect("source only");
+ assert_eq!(recovery_summary(&mut source_only, "CrashFarm"), expected);
+ assert_eq!(outbox_status_counts(&source_only_config).stored, 5);
+
+ let offsets = store_source_events(&pending_outbox_config, &events);
+ seed_pending_create_outbox_records(&pending_outbox_config, &events[0], offsets[0]);
+ let mut pending_outbox =
+ BaseRelay::open_with_groups(&pending_outbox_config, 8, &group_config())
+ .expect("pending outbox");
+ assert_eq!(recovery_summary(&mut pending_outbox, "CrashFarm"), expected);
+ let counts = outbox_status_counts(&pending_outbox_config);
+ assert_eq!(counts.pending, 0);
+ assert_eq!(counts.retryable, 0);
+ assert_eq!(counts.stored, 5);
+}
+
+#[test]
fn same_timestamp_conflicts_are_deterministic_across_ingest_order() {
let first = tangle_v2_group_metadata_event(FixtureKey::Owner, "ClockFarm", "Alpha", 100, &[])
.expect("first");
@@ -1276,6 +1488,156 @@ fn accept_group_create(
);
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct RecoverySummary {
+ group_name: Option<String>,
+ member_status: Option<MemberStatus>,
+ member_roles: Vec<String>,
+ metadata_event_ids: Vec<String>,
+ admin_event_ids: Vec<String>,
+ member_event_ids: Vec<String>,
+ latest_admin_pubkeys: Vec<String>,
+}
+
+fn recovery_equivalence_events() -> Vec<Event> {
+ vec![
+ tangle_v2_group_create_event(FixtureKey::Owner, "CrashFarm", 1, &[]).expect("create"),
+ tangle_v2_put_user_event_with_roles(
+ FixtureKey::Admin,
+ "CrashFarm",
+ FixtureKey::Member,
+ 2,
+ &[PERMANENT_RELAY_OVERRIDE_ROLE],
+ ),
+ tangle_v2_group_metadata_event(FixtureKey::Admin, "CrashFarm", "Crash Market", 3, &[])
+ .expect("metadata"),
+ ]
+}
+
+fn recovery_summary(relay: &mut BaseRelay, group_id: &str) -> RecoverySummary {
+ let group_id_model = group(group_id);
+ let (group_name, member_status, member_roles) = {
+ let projection = relay.group_projection().expect("projection");
+ let group_state = projection.group(&group_id_model).expect("group");
+ let member = projection.member(&group_id_model, &FixtureKey::Member.public_key());
+ let mut roles = member
+ .map(|member| {
+ member
+ .roles()
+ .iter()
+ .map(|role| role.as_str().to_owned())
+ .collect::<Vec<_>>()
+ })
+ .unwrap_or_default();
+ roles.sort();
+ (
+ group_state.metadata().name().map(str::to_owned),
+ member.map(|member| member.status()),
+ roles,
+ )
+ };
+ RecoverySummary {
+ group_name,
+ member_status,
+ member_roles,
+ metadata_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_METADATA),
+ admin_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_ADMINS),
+ member_event_ids: event_ids_for_group_kind(relay, group_id, KIND_GROUP_MEMBERS),
+ latest_admin_pubkeys: latest_admin_snapshot_pubkeys(relay, group_id),
+ }
+}
+
+fn assert_projection_without_checkpoint_eq(left: &GroupProjection, right: &GroupProjection) {
+ assert_eq!(left.groups(), right.groups());
+ assert_eq!(left.members(), right.members());
+ assert_eq!(left.roles(), right.roles());
+ assert_eq!(left.tombstones(), right.tombstones());
+ assert_eq!(left.event_deletions(), right.event_deletions());
+}
+
+fn event_ids_for_group_kind(relay: &mut BaseRelay, group_id: &str, kind: u32) -> Vec<String> {
+ let mut ids = query_events(
+ relay,
+ &format!("summary-{group_id}-{kind}"),
+ vec![filter_group_tag(kind, "d", group_id)],
+ )
+ .into_iter()
+ .map(|event| event.id().as_str().to_owned())
+ .collect::<Vec<_>>();
+ ids.sort();
+ ids
+}
+
+fn store_source_events(config: &PocketStoreConfig, events: &[Event]) -> Vec<StoreOffset> {
+ let store = PocketStoreHandle::open(config).expect("store");
+ let mut offsets = Vec::new();
+ for event in events {
+ let raw = serde_json::to_vec(&event_to_value(event)).expect("event JSON");
+ let pocket = parse_pocket_event_json(&raw).expect("pocket event");
+ offsets.push(StoreOffset::new(store.store_event(&pocket).expect("store")));
+ }
+ store.sync().expect("sync");
+ offsets
+}
+
+fn seed_pending_create_outbox_records(
+ config: &PocketStoreConfig,
+ create: &Event,
+ create_offset: StoreOffset,
+) {
+ let store = PocketStoreHandle::open(config).expect("store");
+ let group_id = group("CrashFarm");
+ let mut projection = GroupProjection::new();
+ projection
+ .apply_canonical_event(create, create_offset, GroupLimitsConfig::default())
+ .expect("projection");
+ let authority = GroupAuthority::new(
+ [FixtureKey::Owner.public_key()],
+ [FixtureKey::Admin.public_key()],
+ );
+ let group_state = projection.group(&group_id).expect("group");
+ let records = [
+ GroupOutboxRecord::pending(
+ GroupOutboxKey::new(
+ create.id().clone(),
+ GroupOutboxEffect::MetadataSnapshot,
+ group_id.clone(),
+ None,
+ ),
+ GroupGeneratedEventBuilder::metadata_snapshot_payload(
+ group_state,
+ create.unsigned().created_at(),
+ )
+ .expect("metadata payload"),
+ ),
+ GroupOutboxRecord::pending(
+ GroupOutboxKey::new(
+ create.id().clone(),
+ GroupOutboxEffect::AdminListSnapshot,
+ group_id.clone(),
+ None,
+ ),
+ GroupGeneratedEventBuilder::admin_list_snapshot_payload(
+ &group_id,
+ &projection,
+ &authority,
+ create.unsigned().created_at(),
+ )
+ .expect("admin payload"),
+ ),
+ ];
+ for record in records {
+ store
+ .put_extra_record(
+ TANGLE_GROUP_OUTBOX_TABLE,
+ &record.key().storage_key(),
+ &record.to_json_bytes().expect("record bytes"),
+ )
+ .expect("outbox");
+ }
+ store.sync().expect("sync");
+}
+
fn tangle_v2_put_user_event_with_roles(
actor: FixtureKey,
group_id: &str,