tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 984db3829367f27c7c58d27f20779af04e041f48
parent 64b773033204987f559b582946b9561699f83c38
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 05:07:29 -0700

groups: validate extra table schema versions

- Add a group extra-table validation report for projection, outbox, and checkpoint state.
- Classify projection checkpoints as missing, current, or stale without making rebuild decisions yet.
- Validate projection and outbox records before group service startup trusts persisted side tables.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
Mcrates/tangle_runtime/src/groups.rs | 282++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 267 insertions(+), 15 deletions(-)

diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs @@ -47,14 +47,15 @@ impl GroupService { .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?; let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing()) .map_err(BaseRelayError::invalid)?; + let storage = load_group_storage(store)?; let mut service = Self { builder: GroupGeneratedEventBuilder::new(signer), authority: GroupAuthority::new( config.owner_pubkeys().iter().cloned(), config.admin_pubkeys().iter().cloned(), ), - projection: load_group_projection(store)?, - outbox: load_group_outbox(store)?, + projection: storage.projection, + outbox: storage.outbox, policy: config.policy(), limits: config.limits(), member_snapshot_cap: config.limits().max_member_list_pubkeys(), @@ -409,9 +410,40 @@ impl GroupService { } } -fn load_group_projection(store: &PocketStoreHandle) -> Result<GroupProjection, BaseRelayError> { +#[derive(Debug, Clone, PartialEq, Eq)] +struct GroupStorageState { + projection: GroupProjection, + outbox: GroupOutbox, +} + +fn load_group_storage(store: &PocketStoreHandle) -> Result<GroupStorageState, BaseRelayError> { + validate_group_extra_tables(store)?; + let checkpoint = load_projection_checkpoint(store)?; + let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?; + let outbox_records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?; + Ok(GroupStorageState { + projection: load_group_projection(projection_records, checkpoint)?, + outbox: load_group_outbox(outbox_records)?, + }) +} + +fn load_projection_checkpoint( + store: &PocketStoreHandle, +) -> Result<Option<ProjectionCheckpoint>, BaseRelayError> { + let Some(raw) = + store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())? + else { + return Ok(None); + }; + Ok(Some(ProjectionCheckpoint::from_json_bytes(&raw)?)) +} + +fn load_group_projection( + records: Vec<(Vec<u8>, Vec<u8>)>, + checkpoint: Option<ProjectionCheckpoint>, +) -> Result<GroupProjection, BaseRelayError> { let mut projection = GroupProjection::new(); - for (key, value) in store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)? { + for (key, value) in records { match projection_key_parts(&key)?.as_slice() { ["group", _] => projection.put_group(GroupState::from_json_bytes(&value)?), ["member", group_id, _] => projection.put_member( @@ -426,26 +458,139 @@ fn load_group_projection(store: &PocketStoreHandle) -> Result<GroupProjection, B ["event_deletion", _] => { projection.put_event_deletion(GroupEventDeletion::from_json_bytes(&value)?) } - _ => {} + _ => { + return Err(BaseRelayError::error(format!( + "unknown group projection extra-table key: {}", + projection_key_label(&key) + ))); + } } } - if let Some(raw) = - store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())? - { - projection.set_checkpoint(ProjectionCheckpoint::from_json_bytes(&raw)?); + if let Some(checkpoint) = checkpoint { + projection.set_checkpoint(checkpoint); } Ok(projection) } -fn load_group_outbox(store: &PocketStoreHandle) -> Result<GroupOutbox, BaseRelayError> { +fn load_group_outbox(records: Vec<(Vec<u8>, Vec<u8>)>) -> Result<GroupOutbox, BaseRelayError> { let mut outbox = GroupOutbox::new(); - for (_, value) in store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)? { + for (_, value) in records { outbox.update(GroupOutboxRecord::from_json_bytes(&value)?); } Ok(outbox) } #[derive(Debug, Clone, PartialEq, Eq)] +pub struct GroupExtraTableValidation { + projection_records: usize, + outbox_records: usize, + checkpoint_status: GroupCheckpointStatus, +} + +impl GroupExtraTableValidation { + pub fn projection_records(&self) -> usize { + self.projection_records + } + + pub fn outbox_records(&self) -> usize { + self.outbox_records + } + + pub fn checkpoint_status(&self) -> &GroupCheckpointStatus { + &self.checkpoint_status + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum GroupCheckpointStatus { + Missing, + Current { checkpoint: ProjectionCheckpoint }, + Stale { checkpoint: ProjectionCheckpoint }, +} + +impl GroupCheckpointStatus { + pub fn requires_rebuild(&self) -> bool { + !matches!(self, Self::Current { .. }) + } + + pub fn checkpoint(&self) -> Option<&ProjectionCheckpoint> { + match self { + Self::Missing => None, + Self::Current { checkpoint } | Self::Stale { checkpoint } => Some(checkpoint), + } + } +} + +pub fn validate_group_extra_tables( + store: &PocketStoreHandle, +) -> Result<GroupExtraTableValidation, BaseRelayError> { + let projection_records = validate_group_projection_records(store)?; + let outbox_records = validate_group_outbox_records(store)?; + let checkpoint_status = validate_group_checkpoint(store)?; + Ok(GroupExtraTableValidation { + projection_records, + outbox_records, + checkpoint_status, + }) +} + +fn validate_group_projection_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> { + let records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?; + let count = records.len(); + for (key, value) in records { + match projection_key_parts(&key)?.as_slice() { + ["group", _] => { + GroupState::from_json_bytes(&value)?; + } + ["member", _, _] => { + MemberState::from_json_bytes(&value)?; + } + ["role", _, _] => { + ProjectedRoleDefinition::from_json_bytes(&value)?; + } + ["tombstone", _] => { + GroupTombstone::from_json_bytes(&value)?; + } + ["event_deletion", _] => { + GroupEventDeletion::from_json_bytes(&value)?; + } + _ => { + return Err(BaseRelayError::error(format!( + "unknown group projection extra-table key: {}", + projection_key_label(&key) + ))); + } + } + } + Ok(count) +} + +fn validate_group_outbox_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> { + let records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?; + let count = records.len(); + for (_, value) in records { + GroupOutboxRecord::from_json_bytes(&value)?; + } + Ok(count) +} + +fn validate_group_checkpoint( + store: &PocketStoreHandle, +) -> Result<GroupCheckpointStatus, BaseRelayError> { + let Some(raw) = + store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())? + else { + return Ok(GroupCheckpointStatus::Missing); + }; + let checkpoint = ProjectionCheckpoint::from_json_bytes(&raw)?; + if checkpoint.matches_current_versions() { + Ok(GroupCheckpointStatus::Current { checkpoint }) + } else { + Ok(GroupCheckpointStatus::Stale { checkpoint }) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] pub struct CanonicalGroupEventScan { events: Vec<CanonicalGroupEvent>, scanned_events: usize, @@ -511,6 +656,10 @@ fn projection_key_parts(key: &[u8]) -> Result<Vec<&str>, BaseRelayError> { Ok(key.split('\0').collect()) } +fn projection_key_label(key: &[u8]) -> String { + String::from_utf8_lossy(key).replace('\0', "\\0") +} + fn persist_outbox_record( store: &PocketStoreHandle, record: &GroupOutboxRecord, @@ -558,11 +707,20 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> { #[cfg(test)] mod tests { - use super::{GroupService, scan_canonical_group_events, scan_canonical_group_events_after}; + use super::{ + GroupCheckpointStatus, GroupService, scan_canonical_group_events, + scan_canonical_group_events_after, validate_group_extra_tables, + }; use crate::pocket_conversion::tangle_event_to_pocket; - use tangle_groups::{GroupRuntimeConfig, KIND_GROUP_METADATA, StoreOffset}; - use tangle_protocol::Tag; - use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy}; + use tangle_groups::{ + GroupRuntimeConfig, KIND_GROUP_METADATA, ProjectionCheckpoint, StoreOffset, + projection_checkpoint_key, + }; + use tangle_protocol::{Tag, UnixTimestamp}; + use tangle_store_pocket::{ + PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE, + TANGLE_GROUP_PROJECTION_TABLE, + }; use tangle_test_support::{ FixtureKey, tangle_v2_event, tangle_v2_group_create_event, tangle_v2_group_event, }; @@ -673,4 +831,98 @@ mod tests { let _ = std::fs::remove_dir_all(root); } + + #[test] + fn group_extra_table_validation_reports_checkpoint_version_status() { + let (root, store) = test_store("tangle-group-extra-version"); + let missing = validate_group_extra_tables(&store).expect("missing"); + + assert_eq!(missing.projection_records(), 0); + assert_eq!(missing.outbox_records(), 0); + assert_eq!(missing.checkpoint_status(), &GroupCheckpointStatus::Missing); + assert!(missing.checkpoint_status().requires_rebuild()); + + let current = + ProjectionCheckpoint::current(Some(StoreOffset::new(42)), UnixTimestamp::new(100)); + store + .put_extra_record( + TANGLE_GROUP_CHECKPOINT_TABLE, + &projection_checkpoint_key(), + &current.to_json_bytes().expect("current bytes"), + ) + .expect("put current"); + let current_validation = validate_group_extra_tables(&store).expect("current"); + assert_eq!( + current_validation.checkpoint_status(), + &GroupCheckpointStatus::Current { + checkpoint: current.clone() + } + ); + assert!(!current_validation.checkpoint_status().requires_rebuild()); + assert_eq!( + current_validation.checkpoint_status().checkpoint(), + Some(&current) + ); + + let stale = + ProjectionCheckpoint::new(0, 0, Some(StoreOffset::new(42)), UnixTimestamp::new(101)); + store + .put_extra_record( + TANGLE_GROUP_CHECKPOINT_TABLE, + &projection_checkpoint_key(), + &stale.to_json_bytes().expect("stale bytes"), + ) + .expect("put stale"); + let stale_validation = validate_group_extra_tables(&store).expect("stale"); + assert_eq!( + stale_validation.checkpoint_status(), + &GroupCheckpointStatus::Stale { + checkpoint: stale.clone() + } + ); + assert!(stale_validation.checkpoint_status().requires_rebuild()); + + let _ = std::fs::remove_dir_all(root); + } + + #[test] + fn group_extra_table_validation_rejects_bad_projection_schema() { + let (unknown_root, unknown_store) = test_store("tangle-group-extra-unknown"); + unknown_store + .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"unknown\0Farm", b"{}") + .expect("put unknown"); + assert_eq!( + validate_group_extra_tables(&unknown_store) + .expect_err("unknown") + .prefixed_message(), + "error: unknown group projection extra-table key: unknown\\0Farm" + ); + let _ = std::fs::remove_dir_all(unknown_root); + + let (corrupt_root, corrupt_store) = test_store("tangle-group-extra-corrupt"); + corrupt_store + .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"not-json") + .expect("put corrupt"); + assert!( + validate_group_extra_tables(&corrupt_store) + .expect_err("corrupt") + .prefixed_message() + .contains("group state JSON decode failed") + ); + let _ = std::fs::remove_dir_all(corrupt_root); + } + + fn test_store(name: &str) -> (std::path::PathBuf, PocketStoreHandle) { + let root = std::env::temp_dir().join(format!("{}-{}", name, std::process::id())); + let _ = std::fs::remove_dir_all(&root); + let config = PocketStoreConfig::new( + root.join("pocket"), + 1024 * 1024 * 1024, + 128, + PocketSyncPolicy::FlushOnShutdown, + ) + .expect("config"); + let store = PocketStoreHandle::open(&config).expect("store"); + (root, store) + } }