commit 6c39cceaef7f291da955931f69893b52fab17491
parent 50f0cae3b64a903e6a8fc23e82e5335b2490fc4b
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 05:24:10 -0700
groups: persist rebuilt projection checkpoint
- Persist rebuilt group projection records and the current checkpoint after canonical replay.
- Clear stale projection side-table records before writing the rebuilt projection snapshot.
- Validate rebuilt group extra tables immediately after checkpoint persistence.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 86 insertions(+), 2 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -429,6 +429,8 @@ fn load_group_storage(
let scan = scan_canonical_group_events(store, limits)?;
let report =
rebuild_group_projection(scan.into_events(), limits, projection_rebuilt_at()?)?;
+ persist_group_projection_snapshot(store, report.projection())?;
+ validate_rebuilt_group_projection(store)?;
return Ok(GroupStorageState {
projection: report.into_projection(),
outbox: load_group_outbox(outbox_records)?,
@@ -484,6 +486,74 @@ fn load_group_outbox(records: Vec<(Vec<u8>, Vec<u8>)>) -> Result<GroupOutbox, Ba
Ok(outbox)
}
+fn persist_group_projection_snapshot(
+ store: &PocketStoreHandle,
+ projection: &GroupProjection,
+) -> Result<(), BaseRelayError> {
+ clear_extra_table(store, TANGLE_GROUP_PROJECTION_TABLE)?;
+ for (group_id, group) in projection.groups() {
+ store.put_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &group_current_key(group_id),
+ &group.to_json_bytes()?,
+ )?;
+ }
+ for ((group_id, pubkey), member) in projection.members() {
+ store.put_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &member_current_key(group_id, pubkey),
+ &member.to_json_bytes()?,
+ )?;
+ }
+ for ((group_id, role_name), role) in projection.roles() {
+ store.put_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &role_current_key(group_id, role_name),
+ &role.to_json_bytes()?,
+ )?;
+ }
+ for (group_id, tombstone) in projection.tombstones() {
+ store.put_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &tombstone_key(group_id),
+ &tombstone.to_json_bytes()?,
+ )?;
+ }
+ for (target_event_id, deletion) in projection.event_deletions() {
+ store.put_extra_record(
+ TANGLE_GROUP_PROJECTION_TABLE,
+ &event_deletion_key(target_event_id),
+ &deletion.to_json_bytes()?,
+ )?;
+ }
+ let checkpoint = projection
+ .checkpoint()
+ .ok_or_else(|| BaseRelayError::error("group projection rebuild checkpoint is missing"))?;
+ store.put_extra_record(
+ TANGLE_GROUP_CHECKPOINT_TABLE,
+ &projection_checkpoint_key(),
+ &checkpoint.to_json_bytes()?,
+ )?;
+ Ok(())
+}
+
+fn clear_extra_table(store: &PocketStoreHandle, table: &'static str) -> Result<(), BaseRelayError> {
+ for (key, _) in store.scan_extra_records(table)? {
+ store.delete_extra_record(table, &key)?;
+ }
+ Ok(())
+}
+
+fn validate_rebuilt_group_projection(store: &PocketStoreHandle) -> Result<(), BaseRelayError> {
+ let validation = validate_group_extra_tables(store)?;
+ if validation.checkpoint_status().requires_rebuild() {
+ return Err(BaseRelayError::error(
+ "group projection checkpoint is not current after rebuild",
+ ));
+ }
+ Ok(())
+}
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupExtraTableValidation {
projection_records: usize,
@@ -723,12 +793,12 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> {
#[cfg(test)]
mod tests {
use super::{
- GroupCheckpointStatus, GroupService, load_group_storage, scan_canonical_group_events,
+ GroupCheckpointStatus, GroupService, scan_canonical_group_events,
scan_canonical_group_events_after, validate_group_extra_tables,
};
use crate::pocket_conversion::tangle_event_to_pocket;
use tangle_groups::{
- GroupId, GroupRuntimeConfig, KIND_GROUP_METADATA, ProjectionCheckpoint, StoreOffset,
+ GroupRuntimeConfig, KIND_GROUP_METADATA, ProjectionCheckpoint, StoreOffset,
projection_checkpoint_key,
};
use tangle_protocol::{Tag, UnixTimestamp};
diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs
@@ -13,6 +13,7 @@ use tangle_protocol::{
filter_from_value, parse_client_message, parse_event_json,
};
use tangle_runtime::{
+ groups::{GroupCheckpointStatus, validate_group_extra_tables},
nip11::{BASE_RELAY_SUPPORTED_NIPS, BaseRelayInfoConfig},
relay::{auth::BaseAuthState, core::BaseRelay, live::CloseResult},
};
@@ -980,6 +981,12 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent(
assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1);
assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1);
+ let validation = group_extra_table_validation(&config);
+ assert!(validation.projection_records() > 0);
+ assert!(matches!(
+ validation.checkpoint_status(),
+ &GroupCheckpointStatus::Current { .. }
+ ));
let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("second reopen");
assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1);
@@ -1126,6 +1133,13 @@ fn delete_group_extra_records(config: &PocketStoreConfig) {
store.sync().expect("sync");
}
+fn group_extra_table_validation(
+ config: &PocketStoreConfig,
+) -> tangle_runtime::groups::GroupExtraTableValidation {
+ let store = PocketStoreHandle::open(config).expect("store");
+ validate_group_extra_tables(&store).expect("validation")
+}
+
fn temp_root(name: &str) -> PathBuf {
std::env::temp_dir().join(format!("tangle-rcld12-{name}-{}", std::process::id()))
}