tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 50f0cae3b64a903e6a8fc23e82e5335b2490fc4b
parent 984db3829367f27c7c58d27f20779af04e041f48
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 05:20:23 -0700

groups: rebuild projection from canonical events

- Rebuild group projection from canonical Pocket events when checkpoint state is missing or stale.
- Keep current checkpoint loads on the derived-table path while rebuilds use the canonical scanner.
- Exercise restart recovery by deleting derived group extra-table records before reopening the relay.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle_runtime/src/groups.rs | 55+++++++++++++++++++++++++++++++++++--------------------
Mcrates/tangle_runtime/tests/base_relay_v2.rs | 27++++++++++++++++++++++++++-
2 files changed, 61 insertions(+), 21 deletions(-)

diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs @@ -4,7 +4,10 @@ use crate::{ errors::BaseRelayError, pocket_conversion::{pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket}, }; -use std::str; +use std::{ + str, + time::{SystemTime, UNIX_EPOCH}, +}; use tangle_crypto::RelaySigner; use tangle_groups::{ CanonicalGroupEvent, GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind, @@ -16,7 +19,7 @@ use tangle_groups::{ KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberState, ProjectedRoleDefinition, ProjectionCheckpoint, StoreOffset, event_deletion_key, event_view::GroupEventView, group_current_key, member_current_key, projection_checkpoint_key, - role_current_key, tombstone_key, + rebuild_group_projection, role_current_key, tombstone_key, }; use tangle_protocol::{Event, EventId, PublicKeyHex, UnixTimestamp}; use tangle_store_pocket::{ @@ -47,7 +50,7 @@ impl GroupService { .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?; let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing()) .map_err(BaseRelayError::invalid)?; - let storage = load_group_storage(store)?; + let storage = load_group_storage(store, config.limits())?; let mut service = Self { builder: GroupGeneratedEventBuilder::new(signer), authority: GroupAuthority::new( @@ -416,28 +419,29 @@ struct GroupStorageState { outbox: GroupOutbox, } -fn load_group_storage(store: &PocketStoreHandle) -> Result<GroupStorageState, BaseRelayError> { - validate_group_extra_tables(store)?; - let checkpoint = load_projection_checkpoint(store)?; - let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?; +fn load_group_storage( + store: &PocketStoreHandle, + limits: GroupLimitsConfig, +) -> Result<GroupStorageState, BaseRelayError> { + let checkpoint_status = validate_group_checkpoint(store)?; let outbox_records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?; + if checkpoint_status.requires_rebuild() { + let scan = scan_canonical_group_events(store, limits)?; + let report = + rebuild_group_projection(scan.into_events(), limits, projection_rebuilt_at()?)?; + return Ok(GroupStorageState { + projection: report.into_projection(), + outbox: load_group_outbox(outbox_records)?, + }); + } + let checkpoint = checkpoint_status.checkpoint().cloned(); + let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?; Ok(GroupStorageState { projection: load_group_projection(projection_records, checkpoint)?, outbox: load_group_outbox(outbox_records)?, }) } -fn load_projection_checkpoint( - store: &PocketStoreHandle, -) -> Result<Option<ProjectionCheckpoint>, BaseRelayError> { - let Some(raw) = - store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())? - else { - return Ok(None); - }; - Ok(Some(ProjectionCheckpoint::from_json_bytes(&raw)?)) -} - fn load_group_projection( records: Vec<(Vec<u8>, Vec<u8>)>, checkpoint: Option<ProjectionCheckpoint>, @@ -590,6 +594,17 @@ fn validate_group_checkpoint( } } +fn projection_rebuilt_at() -> Result<UnixTimestamp, BaseRelayError> { + Ok(UnixTimestamp::new( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|error| { + BaseRelayError::error(format!("system clock is before UNIX epoch: {error}")) + })? + .as_secs(), + )) +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct CanonicalGroupEventScan { events: Vec<CanonicalGroupEvent>, @@ -708,12 +723,12 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> { #[cfg(test)] mod tests { use super::{ - GroupCheckpointStatus, GroupService, scan_canonical_group_events, + GroupCheckpointStatus, GroupService, load_group_storage, scan_canonical_group_events, scan_canonical_group_events_after, validate_group_extra_tables, }; use crate::pocket_conversion::tangle_event_to_pocket; use tangle_groups::{ - GroupRuntimeConfig, KIND_GROUP_METADATA, ProjectionCheckpoint, StoreOffset, + GroupId, GroupRuntimeConfig, KIND_GROUP_METADATA, ProjectionCheckpoint, StoreOffset, projection_checkpoint_key, }; use tangle_protocol::{Tag, UnixTimestamp}; diff --git a/crates/tangle_runtime/tests/base_relay_v2.rs b/crates/tangle_runtime/tests/base_relay_v2.rs @@ -16,7 +16,10 @@ use tangle_runtime::{ nip11::{BASE_RELAY_SUPPORTED_NIPS, BaseRelayInfoConfig}, relay::{auth::BaseAuthState, core::BaseRelay, live::CloseResult}, }; -use tangle_store_pocket::{PocketStoreConfig, PocketSyncPolicy}; +use tangle_store_pocket::{ + PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE, + TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, +}; use tangle_test_support::{ FixtureKey, TANGLE_V2_RELAY_SECRET_HEX, TANGLE_V2_RELAY_URL, tangle_v2_auth_event, tangle_v2_delete_group_event, tangle_v2_event, tangle_v2_group_config, @@ -948,6 +951,7 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent( assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); relay.shutdown().expect("shutdown"); } + delete_group_extra_records(&config); let relay = BaseRelay::open_with_groups(&config, 8, &group_config()).expect("reopen"); assert!( @@ -966,6 +970,13 @@ fn projection_rebuild_after_restart_matches_live_state_and_outbox_is_idempotent( .status(), MemberStatus::Member ); + assert!( + relay + .group_projection() + .expect("projection") + .checkpoint() + .is_some() + ); assert_eq!(count_kind(&relay, KIND_GROUP_METADATA), 1); assert_eq!(count_kind(&relay, KIND_GROUP_ADMINS), 1); assert_eq!(count_kind(&relay, KIND_GROUP_MEMBERS), 1); @@ -1101,6 +1112,20 @@ fn test_store_config(name: &str) -> PocketStoreConfig { .expect("config") } +fn delete_group_extra_records(config: &PocketStoreConfig) { + let store = PocketStoreHandle::open(config).expect("store"); + for table in [ + TANGLE_GROUP_PROJECTION_TABLE, + TANGLE_GROUP_OUTBOX_TABLE, + TANGLE_GROUP_CHECKPOINT_TABLE, + ] { + for (key, _) in store.scan_extra_records(table).expect("scan") { + store.delete_extra_record(table, &key).expect("delete"); + } + } + store.sync().expect("sync"); +} + fn temp_root(name: &str) -> PathBuf { std::env::temp_dir().join(format!("tangle-rcld12-{name}-{}", std::process::id())) }