commit 4adb17b4bde08afde5a97457ad3d66759ac608d8
parent 6c39cceaef7f291da955931f69893b52fab17491
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:28:39 -0700
groups: apply checkpointed canonical replay
- Replay canonical group events after the current projection checkpoint offset during startup.
- Persist the refreshed projection snapshot and advanced max store offset checkpoint after replay.
- Keep rebuild checkpoints anchored to the maximum canonical group store offset under projection-order sorting.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
3 files changed, 133 insertions(+), 5 deletions(-)
diff --git a/crates/tangle_groups/src/projection.rs b/crates/tangle_groups/src/projection.rs
@@ -920,7 +920,11 @@ pub fn rebuild_group_projection(
let mut skipped_events = 0;
let mut last_offset = None;
for item in events {
- last_offset = Some(item.store_offset());
+ last_offset = Some(
+ last_offset
+ .map(|current: StoreOffset| current.max(item.store_offset()))
+ .unwrap_or_else(|| item.store_offset()),
+ );
match projection.apply_canonical_event(item.event(), item.store_offset(), limits)? {
ProjectionApplyOutcome::Applied => applied_events += 1,
ProjectionApplyOutcome::Ignored => ignored_events += 1,
@@ -1578,6 +1582,15 @@ mod tests {
),
CanonicalGroupEvent::new(
event(
+ 1,
+ "40",
+ 5,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ ),
+ StoreOffset::new(99),
+ ),
+ CanonicalGroupEvent::new(
+ event(
KIND_GROUP_CREATE_GROUP,
"10",
10,
@@ -1610,7 +1623,8 @@ mod tests {
assert_eq!(group.metadata().name(), Some("New"));
assert_eq!(report.applied_events(), 3);
- assert_eq!(report.last_offset(), Some(StoreOffset::new(3)));
+ assert_eq!(report.ignored_events(), 1);
+ assert_eq!(report.last_offset(), Some(StoreOffset::new(99)));
assert!(
report
.projection()
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -438,8 +438,10 @@ fn load_group_storage(
}
let checkpoint = checkpoint_status.checkpoint().cloned();
let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?;
+ let mut projection = load_group_projection(projection_records, checkpoint)?;
+ apply_canonical_events_after_checkpoint(store, &mut projection, limits)?;
Ok(GroupStorageState {
- projection: load_group_projection(projection_records, checkpoint)?,
+ projection,
outbox: load_group_outbox(outbox_records)?,
})
}
@@ -486,6 +488,32 @@ fn load_group_outbox(records: Vec<(Vec<u8>, Vec<u8>)>) -> Result<GroupOutbox, Ba
Ok(outbox)
}
+fn apply_canonical_events_after_checkpoint(
+ store: &PocketStoreHandle,
+ projection: &mut GroupProjection,
+ limits: GroupLimitsConfig,
+) -> Result<(), BaseRelayError> {
+ let last_offset = projection
+ .checkpoint()
+ .and_then(ProjectionCheckpoint::last_offset);
+ let scan = scan_canonical_group_events_after(store, last_offset, limits)?;
+ if scan.events().is_empty() {
+ return Ok(());
+ }
+ let mut events = scan.into_events();
+ let next_offset = events.iter().map(CanonicalGroupEvent::store_offset).max();
+ events.sort_by_key(CanonicalGroupEvent::tuple);
+ for item in events {
+ projection.apply_canonical_event(item.event(), item.store_offset(), limits)?;
+ }
+ projection.set_checkpoint(ProjectionCheckpoint::current(
+ next_offset,
+ projection_rebuilt_at()?,
+ ));
+ persist_group_projection_snapshot(store, projection)?;
+ validate_rebuilt_group_projection(store)
+}
+
fn persist_group_projection_snapshot(
store: &PocketStoreHandle,
projection: &GroupProjection,
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -5,8 +5,8 @@ use tangle_crypto::{event_id_matches, verify_event_signature};
use tangle_groups::{
GroupId, GroupRuntimeConfig, KIND_GROUP_ADMINS, KIND_GROUP_DELETE_GROUP,
KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA,
- KIND_GROUP_PUT_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES,
- parse_group_runtime_config_json,
+ KIND_GROUP_PUT_USER, MemberStatus, NIP29_RELAY_GENERATED_KIND_VALUES, ProjectionCheckpoint,
+ StoreOffset, member_current_key, parse_group_runtime_config_json, projection_checkpoint_key,
};
use tangle_protocol::{
Event, Filter, RawEventJson, RelayMessage, SubscriptionId, Tag, UnixTimestamp,
@@ -995,6 +995,54 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
}
#[test]
+fn projection_applies_canonical_events_after_checkpoint_on_restart() {
+ let config = test_store_config("projection-incremental");
+ let owner_auth = authenticated(FixtureKey::Owner);
+ let admin_auth = authenticated(FixtureKey::Admin);
+ let create =
+ tangle_v2_group_create_event(FixtureKey::Owner, "IncrementalFarm", 1, &[]).expect("create");
+ let put = tangle_v2_put_user_event(FixtureKey::Admin, "IncrementalFarm", FixtureKey::Member, 2)
+ .expect("put");
+ {
+ let mut relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("relay");
+ assert_accepted(
+ relay
+ .handle_event_with_auth(create.clone(), &owner_auth)
+ .expect("create"),
+ &create,
+ );
+ assert_accepted(
+ relay
+ .handle_event_with_auth(put.clone(), &admin_auth)
+ .expect("put"),
+ &put,
+ );
+ relay.shutdown().expect("shutdown");
+ }
+ let create_offset = stored_event_offset(&config, &create);
+ regress_member_projection_to_checkpoint(&config, create_offset, "IncrementalFarm");
+
+ let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen");
+ assert_eq!(
+ relay
+ .group_projection()
+ .expect("projection")
+ .member(&group("IncrementalFarm"), &FixtureKey::Member.public_key())
+ .expect("member")
+ .status(),
+ MemberStatus::Member
+ );
+ let validation = group_extra_table_validation(&config);
+ assert!(matches!(
+ validation.checkpoint_status(),
+ GroupCheckpointStatus::Current { checkpoint }
+ if checkpoint
+ .last_offset()
+ .is_some_and(|offset| offset.as_u64() > create_offset)
+ ));
+}
+
+#[test]
fn same_timestamp_conflicts_are_deterministic_across_ingest_order() {
let first = tangle_v2_group_metadata_event(FixtureKey::Owner, "ClockFarm", "Alpha", 100, &[])
.expect("first");
@@ -1140,6 +1188,44 @@ fn group_extra_table_validation(
validate_group_extra_tables(&store).expect("validation")
}
+fn stored_event_offset(config: &PocketStoreConfig, event: &Event) -> u64 {
+ let store = PocketStoreHandle::open(config).expect("store");
+ store
+ .scan_events()
+ .expect("events")
+ .into_iter()
+ .find(|stored| stored.event().id().as_hex_string() == event.id().as_str())
+ .expect("stored event")
+ .store_offset()
+}
+
+fn regress_member_projection_to_checkpoint(
+ config: &PocketStoreConfig,
+ checkpoint_offset: u64,
+ group_id: &str,
+) {
+ let store = PocketStoreHandle::open(config).expect("store");
+ let group_id = GroupId::new(group_id).expect("group");
+ let checkpoint = ProjectionCheckpoint::current(
+ Some(StoreOffset::new(checkpoint_offset)),
+ UnixTimestamp::new(1_714_999_999),
+ );
+ store
+ .put_extra_record(
+ TANGLE_GROUP_CHECKPOINT_TABLE,
+ &projection_checkpoint_key(),
+ &checkpoint.to_json_bytes().expect("checkpoint"),
+ )
+ .expect("checkpoint");
+ store
+ .delete_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &member_current_key(&group_id, &FixtureKey::Member.public_key()),
+ )
+ .expect("delete member");
+ store.sync().expect("sync");
+}
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-rcld12-{name}-{}", std::process::id()))
}