commit 64b773033204987f559b582946b9561699f83c38
parent 1aec981afdeac961306d0ce01f853864239f5145
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 04:58:07 -0700
groups: add canonical pocket event scanner
- Add a Pocket store scan API that walks canonical events with their store offsets.
- Add a group recovery scanner that filters canonical Pocket events into ordered CanonicalGroupEvent values.
- Cover raw Pocket scans, checkpoint-offset scans, and group classification scans for normal, moderation, and generated events.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
2 files changed, 282 insertions(+), 12 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -2,20 +2,21 @@
use crate::{
errors::BaseRelayError,
- pocket_conversion::{pocket_event_id, tangle_event_to_pocket},
+ pocket_conversion::{pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket},
};
use std::str;
use tangle_crypto::RelaySigner;
use tangle_groups::{
- GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind, GroupEventClass,
- GroupEventDeletion, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig, GroupOutbox,
- GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, GroupOutboxRecord, GroupPolicyConfig,
- GroupProjection, GroupReadDecision, GroupReadGate, GroupRuntimeConfig, GroupState,
- GroupTombstone, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT, KIND_GROUP_EDIT_METADATA,
- KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER,
- KIND_GROUP_REMOVE_USER, MemberState, ProjectedRoleDefinition, ProjectionCheckpoint,
- StoreOffset, event_deletion_key, event_view::GroupEventView, group_current_key,
- member_current_key, projection_checkpoint_key, role_current_key, tombstone_key,
+ CanonicalGroupEvent, GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind,
+ GroupEventClass, GroupEventDeletion, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig,
+ GroupOutbox, GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, GroupOutboxRecord,
+ GroupPolicyConfig, GroupProjection, GroupReadDecision, GroupReadGate, GroupRuntimeConfig,
+ GroupState, GroupTombstone, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT,
+ KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
+ KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberState,
+ ProjectedRoleDefinition, ProjectionCheckpoint, StoreOffset, event_deletion_key,
+ event_view::GroupEventView, group_current_key, member_current_key, projection_checkpoint_key,
+ role_current_key, tombstone_key,
};
use tangle_protocol::{Event, EventId, PublicKeyHex, UnixTimestamp};
use tangle_store_pocket::{
@@ -444,6 +445,67 @@ fn load_group_outbox(store: &PocketStoreHandle) -> Result<GroupOutbox, BaseRelay
Ok(outbox)
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CanonicalGroupEventScan {
+ events: Vec<CanonicalGroupEvent>,
+ scanned_events: usize,
+ skipped_events: usize,
+}
+
+impl CanonicalGroupEventScan {
+ pub fn events(&self) -> &[CanonicalGroupEvent] {
+ &self.events
+ }
+
+ pub fn into_events(self) -> Vec<CanonicalGroupEvent> {
+ self.events
+ }
+
+ pub fn scanned_events(&self) -> usize {
+ self.scanned_events
+ }
+
+ pub fn skipped_events(&self) -> usize {
+ self.skipped_events
+ }
+}
+
+pub fn scan_canonical_group_events(
+ store: &PocketStoreHandle,
+ limits: GroupLimitsConfig,
+) -> Result<CanonicalGroupEventScan, BaseRelayError> {
+ scan_canonical_group_events_after(store, None, limits)
+}
+
+pub fn scan_canonical_group_events_after(
+ store: &PocketStoreHandle,
+ last_offset: Option<StoreOffset>,
+ limits: GroupLimitsConfig,
+) -> Result<CanonicalGroupEventScan, BaseRelayError> {
+ let stored_events = store.scan_events_after(last_offset.map(StoreOffset::as_u64))?;
+ let scanned_events = stored_events.len();
+ let mut events = Vec::new();
+ let mut skipped_events = 0;
+ for stored in stored_events {
+ match tangle_groups::classify_group_event(stored.event(), limits)? {
+ GroupEventClass::NonGroup => skipped_events += 1,
+ GroupEventClass::Normal { .. }
+ | GroupEventClass::Moderation { .. }
+ | GroupEventClass::RelayGeneratedSnapshot { .. } => {
+ events.push(CanonicalGroupEvent::new(
+ pocket_event_to_tangle(stored.event())?,
+ StoreOffset::new(stored.store_offset()),
+ ));
+ }
+ }
+ }
+ Ok(CanonicalGroupEventScan {
+ events,
+ scanned_events,
+ skipped_events,
+ })
+}
+
fn projection_key_parts(key: &[u8]) -> Result<Vec<&str>, BaseRelayError> {
let key = str::from_utf8(key).map_err(|error| BaseRelayError::error(error.to_string()))?;
Ok(key.split('\0').collect())
@@ -496,9 +558,14 @@ fn delete_target_event_id(event: &Event) -> Result<EventId, GroupError> {
#[cfg(test)]
mod tests {
- use super::GroupService;
- use tangle_groups::GroupRuntimeConfig;
+ use super::{GroupService, scan_canonical_group_events, scan_canonical_group_events_after};
+ use crate::pocket_conversion::tangle_event_to_pocket;
+ use tangle_groups::{GroupRuntimeConfig, KIND_GROUP_METADATA, StoreOffset};
+ use tangle_protocol::Tag;
use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy};
+ use tangle_test_support::{
+ FixtureKey, tangle_v2_event, tangle_v2_group_create_event, tangle_v2_group_event,
+ };
#[test]
fn group_service_from_disabled_config_is_absent() {
@@ -522,4 +589,88 @@ mod tests {
.is_none()
);
}
+
+ #[test]
+ fn canonical_group_event_scanner_returns_group_events_with_offsets() {
+ let root = std::env::temp_dir().join(format!(
+ "tangle-canonical-group-scan-{}",
+ std::process::id()
+ ));
+ let _ = std::fs::remove_dir_all(&root);
+ let config = PocketStoreConfig::new(
+ root.join("pocket"),
+ 1024 * 1024 * 1024,
+ 128,
+ PocketSyncPolicy::FlushOnShutdown,
+ )
+ .expect("config");
+ let store = PocketStoreHandle::open(&config).expect("store");
+ let public =
+ tangle_v2_event(FixtureKey::Member, 1, 1, Vec::new(), "public").expect("public");
+ let normal =
+ tangle_v2_group_event(FixtureKey::Member, "ScanFarm", 2, 1, "normal").expect("normal");
+ let group =
+ tangle_v2_group_create_event(FixtureKey::Owner, "ScanFarm", 3, &[]).expect("group");
+ let generated = tangle_v2_event(
+ FixtureKey::Owner,
+ 4,
+ KIND_GROUP_METADATA.into(),
+ vec![Tag::from_parts("d", &["ScanFarm"]).expect("d")],
+ "",
+ )
+ .expect("generated");
+ let public_offset = store
+ .store_event(&tangle_event_to_pocket(&public).expect("public pocket"))
+ .expect("store public");
+ let normal_offset = store
+ .store_event(&tangle_event_to_pocket(&normal).expect("normal pocket"))
+ .expect("store normal");
+ let group_offset = store
+ .store_event(&tangle_event_to_pocket(&group).expect("group pocket"))
+ .expect("store group");
+ let generated_offset = store
+ .store_event(&tangle_event_to_pocket(&generated).expect("generated pocket"))
+ .expect("store generated");
+
+ let scan = scan_canonical_group_events(&store, Default::default()).expect("scan");
+ let after_public = scan_canonical_group_events_after(
+ &store,
+ Some(StoreOffset::new(public_offset)),
+ Default::default(),
+ )
+ .expect("after public");
+
+ assert_eq!(scan.scanned_events(), 4);
+ assert_eq!(scan.skipped_events(), 1);
+ assert_eq!(
+ scan.events()
+ .iter()
+ .map(|event| event.event().id())
+ .collect::<Vec<_>>(),
+ vec![normal.id(), group.id(), generated.id()]
+ );
+ assert_eq!(
+ scan.events()
+ .iter()
+ .map(|event| event.store_offset())
+ .collect::<Vec<_>>(),
+ vec![
+ StoreOffset::new(normal_offset),
+ StoreOffset::new(group_offset),
+ StoreOffset::new(generated_offset),
+ ]
+ );
+ assert_eq!(after_public.scanned_events(), 3);
+ assert_eq!(after_public.skipped_events(), 0);
+ assert_eq!(
+ after_public
+ .events()
+ .iter()
+ .map(|event| event.event().id())
+ .collect::<Vec<_>>(),
+ vec![normal.id(), group.id(), generated.id()]
+ );
+
+ let _ = std::fs::remove_dir_all(root);
+ }
}
diff --git a/crates/tangle_store_pocket/src/lib.rs b/crates/tangle_store_pocket/src/lib.rs
@@ -33,6 +33,33 @@ pub type PocketExtraRecord = (Vec<u8>, Vec<u8>);
pub type PocketExtraRecords = Vec<PocketExtraRecord>;
#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PocketStoredEvent {
+ store_offset: u64,
+ event: PocketOwnedEvent,
+}
+
+impl PocketStoredEvent {
+ pub fn new(store_offset: u64, event: PocketOwnedEvent) -> Self {
+ Self {
+ store_offset,
+ event,
+ }
+ }
+
+ pub fn store_offset(&self) -> u64 {
+ self.store_offset
+ }
+
+ pub fn event(&self) -> &PocketEvent {
+ &self.event
+ }
+
+ pub fn into_event(self) -> PocketOwnedEvent {
+ self.event
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PocketScreenedEvents {
events: Vec<PocketOwnedEvent>,
redacted: bool,
@@ -172,6 +199,39 @@ impl PocketStoreHandle {
.map(|events| u64::try_from(events.len()).expect("usize count fits in u64"))
}
+ pub fn scan_events(&self) -> Result<Vec<PocketStoredEvent>, PocketStoreError> {
+ self.scan_events_after(None)
+ }
+
+ pub fn scan_events_after(
+ &self,
+ last_offset: Option<u64>,
+ ) -> Result<Vec<PocketStoredEvent>, PocketStoreError> {
+ let stats = self.store.stats().map_err(PocketStoreError::from_pocket)?;
+ let end = u64::try_from(stats.event_bytes)
+ .map_err(|_| PocketStoreError::invalid("Pocket event map size exceeds u64"))?;
+ let mut offset = match last_offset {
+ Some(offset) => {
+ let event = self
+ .store
+ .get_event_by_offset(offset)
+ .map_err(PocketStoreError::from_pocket)?;
+ next_event_offset(offset, event)?
+ }
+ None => event_map_start_offset(),
+ };
+ let mut events = Vec::new();
+ while offset < end {
+ let event = self
+ .store
+ .get_event_by_offset(offset)
+ .map_err(PocketStoreError::from_pocket)?;
+ events.push(PocketStoredEvent::new(offset, event.to_owned()));
+ offset = next_event_offset(offset, event)?;
+ }
+ Ok(events)
+ }
+
pub fn put_extra_record(
&self,
table: &'static str,
@@ -433,6 +493,30 @@ fn pocket_json_buffer_len(raw_len: usize) -> usize {
raw_len.saturating_mul(2).max(4096)
}
+fn event_map_start_offset() -> u64 {
+ u64::try_from(std::mem::size_of::<usize>()).expect("usize header size fits u64")
+}
+
+fn align_event_offset(offset: u64) -> u64 {
+ if offset.is_multiple_of(8) {
+ offset
+ } else {
+ offset + (8 - offset % 8)
+ }
+}
+
+fn next_event_offset(offset: u64, event: &PocketEvent) -> Result<u64, PocketStoreError> {
+ let next = offset
+ .checked_add(event_len_u64(event)?)
+ .ok_or_else(|| PocketStoreError::invalid("Pocket event offset exceeds u64"))?;
+ Ok(align_event_offset(next))
+}
+
+fn event_len_u64(event: &PocketEvent) -> Result<u64, PocketStoreError> {
+ u64::try_from(event.len())
+ .map_err(|_| PocketStoreError::invalid("Pocket event size exceeds u64"))
+}
+
#[cfg(test)]
mod tests {
use super::{
@@ -516,6 +600,41 @@ mod tests {
}
#[test]
+ fn pocket_store_handle_scans_canonical_events_with_offsets() {
+ let root = temp_root("tangle-pocket-scan");
+ let config = PocketStoreConfig::new(
+ root.join("pocket"),
+ 1024 * 1024 * 1024,
+ 128,
+ PocketSyncPolicy::FlushOnShutdown,
+ )
+ .expect("config");
+ let handle = PocketStoreHandle::open(&config).expect("open");
+ let first =
+ parse_pocket_event_json(event_json_with("a", "1", "first").as_bytes()).expect("first");
+ let second = parse_pocket_event_json(event_json_with("c", "2", "second").as_bytes())
+ .expect("second");
+
+ let first_offset = handle.store_event(&first).expect("store first");
+ let second_offset = handle.store_event(&second).expect("store second");
+ let all = handle.scan_events().expect("scan");
+ let after_first = handle
+ .scan_events_after(Some(first_offset))
+ .expect("scan after first");
+
+ assert_eq!(all.len(), 2);
+ assert_eq!(all[0].store_offset(), first_offset);
+ assert_eq!(all[0].event().id(), first.id());
+ assert_eq!(all[1].store_offset(), second_offset);
+ assert_eq!(all[1].event().id(), second.id());
+ assert_eq!(after_first.len(), 1);
+ assert_eq!(after_first[0].store_offset(), second_offset);
+ assert_eq!(after_first[0].event().id(), second.id());
+
+ let _ = std::fs::remove_dir_all(root);
+ }
+
+ #[test]
fn pocket_store_handle_screens_events_before_materialization() {
let root = temp_root("tangle-pocket-screen");
let config = PocketStoreConfig::new(