commit 6b1ca6d8fb9c71194da3ff7268573b2e6edfb3be
parent d5ce401cfcfbc644bad990ab2e1f6570a30db71e
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 23:57:46 -0700
runtime: rebuild groups from Pocket events
- store canonical group rebuild records as Pocket events
- scan Pocket storage without protocol event conversion
- replay checkpoints and missing outbox records from Pocket views
- align rebuild tests with Pocket-backed canonical events
Diffstat:
3 files changed, 125 insertions(+), 132 deletions(-)
diff --git a/crates/tangle_groups/src/projection.rs b/crates/tangle_groups/src/projection.rs
@@ -5,8 +5,9 @@ use crate::{
GroupMetadata, GroupMetadataFlags, GroupMetadataText, RoleDefinition, RoleName, SupportedKinds,
classify_group_event, event_view::GroupEventView, parse_group_metadata,
};
+use pocket_types::{Event as PocketEvent, OwnedEvent as PocketOwnedEvent};
use serde::{Deserialize, Serialize};
-use tangle_protocol::{Event, EventId, Kind, PublicKeyHex, UnixTimestamp};
+use tangle_protocol::{EventId, Kind, PublicKeyHex, UnixTimestamp};
pub const GROUP_PROJECTION_SCHEMA_VERSION: u32 = 1;
pub const GROUP_POLICY_VERSION: u32 = 1;
@@ -823,19 +824,19 @@ pub enum ProjectionApplyOutcome {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CanonicalGroupEvent {
- event: Event,
+ event: PocketOwnedEvent,
store_offset: StoreOffset,
}
impl CanonicalGroupEvent {
- pub fn new(event: Event, store_offset: StoreOffset) -> Self {
+ pub fn new(event: PocketOwnedEvent, store_offset: StoreOffset) -> Self {
Self {
event,
store_offset,
}
}
- pub fn event(&self) -> &Event {
+ pub fn event(&self) -> &PocketEvent {
&self.event
}
@@ -1601,47 +1602,39 @@ mod tests {
fn projection_rebuild_sorts_before_applying_last_tuple_wins() {
let report = rebuild_group_projection(
[
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_EDIT_METADATA,
- "30",
- 30,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("name", &["New"]).expect("name"),
- ],
- ),
- StoreOffset::new(3),
+ canonical_event(
+ KIND_GROUP_EDIT_METADATA,
+ "30",
+ 30,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("name", &["New"]).expect("name"),
+ ],
+ 3,
),
- CanonicalGroupEvent::new(
- event(
- 1,
- "40",
- 5,
- vec![Tag::from_parts("h", &["Farm"]).expect("h")],
- ),
- StoreOffset::new(99),
+ canonical_event(
+ 1,
+ "40",
+ 5,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ 99,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_CREATE_GROUP,
- "10",
- 10,
- vec![Tag::from_parts("h", &["Farm"]).expect("h")],
- ),
- StoreOffset::new(1),
+ canonical_event(
+ KIND_GROUP_CREATE_GROUP,
+ "10",
+ 10,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ 1,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_EDIT_METADATA,
- "20",
- 20,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("name", &["Old"]).expect("name"),
- ],
- ),
- StoreOffset::new(2),
+ canonical_event(
+ KIND_GROUP_EDIT_METADATA,
+ "20",
+ 20,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("name", &["Old"]).expect("name"),
+ ],
+ 2,
),
],
GroupLimitsConfig::default(),
@@ -1671,85 +1664,71 @@ mod tests {
fn projection_rebuild_matches_incremental_projection_for_full_event_stream() {
let limits = GroupLimitsConfig::default();
let events = vec![
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_EDIT_METADATA,
- "20",
- 20,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("name", &["Market"]).expect("name"),
- ],
- ),
- StoreOffset::new(2),
+ canonical_event(
+ KIND_GROUP_EDIT_METADATA,
+ "20",
+ 20,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("name", &["Market"]).expect("name"),
+ ],
+ 2,
),
- CanonicalGroupEvent::new(
- event(
- 1,
- "b",
- 15,
- vec![Tag::from_parts("h", &["Farm"]).expect("h")],
- ),
- StoreOffset::new(7),
+ canonical_event(
+ 1,
+ "b",
+ 15,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ 7,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_DELETE_GROUP,
- "50",
- 50,
- vec![Tag::from_parts("h", &["Farm"]).expect("h")],
- ),
- StoreOffset::new(6),
+ canonical_event(
+ KIND_GROUP_DELETE_GROUP,
+ "50",
+ 50,
+ vec![Tag::from_parts("h", &["Farm"]).expect("h")],
+ 6,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_CREATE_GROUP,
- "10",
- 10,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("name", &["Farmers"]).expect("name"),
- ],
- ),
- StoreOffset::new(1),
+ canonical_event(
+ KIND_GROUP_CREATE_GROUP,
+ "10",
+ 10,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("name", &["Farmers"]).expect("name"),
+ ],
+ 1,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_PUT_USER,
- "30",
- 30,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("p", &[&"8".repeat(64)]).expect("p"),
- Tag::from_parts("role", &["moderator"]).expect("role"),
- ],
- ),
- StoreOffset::new(3),
+ canonical_event(
+ KIND_GROUP_PUT_USER,
+ "30",
+ 30,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("p", &[&"8".repeat(64)]).expect("p"),
+ Tag::from_parts("role", &["moderator"]).expect("role"),
+ ],
+ 3,
),
- CanonicalGroupEvent::new(event(1, "a", 5, Vec::new()), StoreOffset::new(8)),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_METADATA,
- "40",
- 40,
- vec![
- Tag::from_parts("d", &["Farm"]).expect("d"),
- Tag::from_parts("name", &["Snapshot"]).expect("name"),
- ],
- ),
- StoreOffset::new(4),
+ canonical_event(1, "a", 5, Vec::new(), 8),
+ canonical_event(
+ KIND_GROUP_METADATA,
+ "40",
+ 40,
+ vec![
+ Tag::from_parts("d", &["Farm"]).expect("d"),
+ Tag::from_parts("name", &["Snapshot"]).expect("name"),
+ ],
+ 4,
),
- CanonicalGroupEvent::new(
- event(
- KIND_GROUP_DELETE_EVENT,
- "45",
- 45,
- vec![
- Tag::from_parts("h", &["Farm"]).expect("h"),
- Tag::from_parts("e", &[id("30")]).expect("e"),
- ],
- ),
- StoreOffset::new(5),
+ canonical_event(
+ KIND_GROUP_DELETE_EVENT,
+ "45",
+ 45,
+ vec![
+ Tag::from_parts("h", &["Farm"]).expect("h"),
+ Tag::from_parts("e", &[id("30")]).expect("e"),
+ ],
+ 5,
),
];
let mut incremental_events = events.clone();
@@ -1983,6 +1962,17 @@ mod tests {
]
}
+ fn canonical_event(
+ kind: u32,
+ suffix: &str,
+ created_at: u64,
+ tags: Vec<Tag>,
+ offset: u64,
+ ) -> CanonicalGroupEvent {
+ let event = event(kind, suffix, created_at, tags);
+ CanonicalGroupEvent::new(pocket_event(&event), StoreOffset::new(offset))
+ }
+
fn pocket_event(event: &Event) -> PocketOwnedEvent {
let raw = event_to_value(event).to_string();
let mut buffer = vec![0; 4096];
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -2,7 +2,7 @@
use crate::{
errors::BaseRelayError,
- pocket_conversion::{pocket_event_id, pocket_event_to_tangle, tangle_event_to_pocket},
+ pocket_conversion::{pocket_event_id, tangle_event_to_pocket},
};
use std::{
ops::Deref,
@@ -396,7 +396,7 @@ impl GroupServiceState {
let before_membership_admin =
membership_admin_snapshot_state(&projection, item.event(), &class)?;
projection.apply_canonical_event(item.event(), item.store_offset(), self.limits)?;
- if item.event().unsigned().pubkey() == &relay_pubkey {
+ if item.event().pubkey().as_hex_string() == relay_pubkey.as_str() {
continue;
}
for record in plan_group_outbox_records(
@@ -1145,10 +1145,8 @@ pub fn scan_canonical_group_events_after(
GroupEventClass::Normal { .. }
| GroupEventClass::Moderation { .. }
| GroupEventClass::RelayGeneratedSnapshot { .. } => {
- events.push(CanonicalGroupEvent::new(
- pocket_event_to_tangle(stored.event())?,
- StoreOffset::new(stored.store_offset()),
- ));
+ let store_offset = StoreOffset::new(stored.store_offset());
+ events.push(CanonicalGroupEvent::new(stored.into_event(), store_offset));
}
}
}
@@ -1357,9 +1355,13 @@ mod tests {
assert_eq!(
scan.events()
.iter()
- .map(|event| event.event().id())
+ .map(|event| event.event().id().as_hex_string())
.collect::<Vec<_>>(),
- vec![normal.id(), group.id(), generated.id()]
+ vec![
+ normal.id().as_str().to_owned(),
+ group.id().as_str().to_owned(),
+ generated.id().as_str().to_owned(),
+ ]
);
assert_eq!(
scan.events()
@@ -1378,9 +1380,13 @@ mod tests {
after_public
.events()
.iter()
- .map(|event| event.event().id())
+ .map(|event| event.event().id().as_hex_string())
.collect::<Vec<_>>(),
- vec![normal.id(), group.id(), generated.id()]
+ vec![
+ normal.id().as_str().to_owned(),
+ group.id().as_str().to_owned(),
+ generated.id().as_str().to_owned(),
+ ]
);
let _ = std::fs::remove_dir_all(root);
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -2088,7 +2088,6 @@ mod tests {
};
use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json};
use crate::event_bus::{TangleEventBus, TangleEventReceiveError, TangleEventReceiver};
- use crate::pocket_conversion::pocket_event_to_tangle;
use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope};
use crate::relay::auth::BaseAuthState;
use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics};
@@ -4828,13 +4827,11 @@ mod tests {
.expect("scan")
.into_iter()
.filter_map(|stored| {
- let event = pocket_event_to_tangle(stored.event()).expect("event");
- match tangle_groups::classify_group_event(&event, limits).expect("classify") {
+ let store_offset = StoreOffset::new(stored.store_offset());
+ match tangle_groups::classify_group_event(stored.event(), limits).expect("classify")
+ {
GroupEventClass::NonGroup => None,
- _ => Some(CanonicalGroupEvent::new(
- event,
- StoreOffset::new(stored.store_offset()),
- )),
+ _ => Some(CanonicalGroupEvent::new(stored.into_event(), store_offset)),
}
})
.collect::<Vec<_>>();