tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

groups.rs (49951B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{errors::BaseRelayError, pocket_conversion::pocket_event_id};
      4 use std::{
      5     ops::Deref,
      6     str,
      7     sync::{Arc, RwLock, RwLockReadGuard},
      8     time::{SystemTime, UNIX_EPOCH},
      9 };
     10 use tangle_crypto::RelaySigner;
     11 use tangle_groups::{
     12     CanonicalGroupEvent, GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind,
     13     GroupEventClass, GroupEventDeletion, GroupGeneratedEventBuilder, GroupId, GroupLimitsConfig,
     14     GroupOutbox, GroupOutboxEffect, GroupOutboxKey, GroupOutboxPayload, GroupOutboxRecord,
     15     GroupPolicyConfig, GroupProjection, GroupReadDecision, GroupReadGate, GroupRuntimeConfig,
     16     GroupState, GroupTombstone, KIND_GROUP_CREATE_GROUP, KIND_GROUP_DELETE_EVENT,
     17     KIND_GROUP_EDIT_METADATA, KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST,
     18     KIND_GROUP_MEMBERS, KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, MemberState, MemberStatus,
     19     ProjectedRoleDefinition, ProjectionCheckpoint, RoleName, StoreOffset, event_deletion_key,
     20     event_view::GroupEventView, group_current_key, member_current_key, projection_checkpoint_key,
     21     rebuild_group_projection, role_current_key, tombstone_key,
     22 };
     23 use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp};
     24 use tangle_store_pocket::{
     25     PocketEvent, PocketEventId, PocketOwnedEvent, PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE,
     26     TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE,
     27 };
     28 
     29 #[derive(Clone)]
     30 pub(crate) struct GroupServiceHandle {
     31     state: Arc<RwLock<GroupServiceState>>,
     32 }
     33 
     34 pub(crate) enum GroupEventWrite {
     35     Stored(Vec<StoreOffset>),
     36     Duplicate,
     37 }
     38 
     39 pub(crate) enum GroupEventWriteError {
     40     Rejected(GroupError),
     41     Storage(BaseRelayError),
     42 }
     43 
     44 struct GeneratedGroupStorageEvent {
     45     event: PocketOwnedEvent,
     46 }
     47 
     48 impl GeneratedGroupStorageEvent {
     49     fn build(
     50         builder: &GroupGeneratedEventBuilder,
     51         payload: &GroupOutboxPayload,
     52     ) -> Result<Self, BaseRelayError> {
     53         let event = builder.sign_payload_pocket(payload)?;
     54         Ok(Self { event })
     55     }
     56 
     57     fn event(&self) -> &PocketEvent {
     58         &self.event
     59     }
     60 
     61     fn event_id(&self) -> Result<EventId, BaseRelayError> {
     62         EventId::new(&self.event().id().as_hex_string()).map_err(BaseRelayError::error)
     63     }
     64 }
     65 
     66 impl From<BaseRelayError> for GroupEventWriteError {
     67     fn from(error: BaseRelayError) -> Self {
     68         Self::Storage(error)
     69     }
     70 }
     71 
     72 pub struct GroupProjectionReadGuard<'a> {
     73     state: RwLockReadGuard<'a, GroupServiceState>,
     74 }
     75 
     76 impl Deref for GroupProjectionReadGuard<'_> {
     77     type Target = GroupProjection;
     78 
     79     fn deref(&self) -> &Self::Target {
     80         &self.state.projection
     81     }
     82 }
     83 
     84 pub(crate) struct GroupServiceState {
     85     builder: GroupGeneratedEventBuilder,
     86     authority: GroupAuthority,
     87     projection: GroupProjection,
     88     outbox: GroupOutbox,
     89     policy: GroupPolicyConfig,
     90     limits: GroupLimitsConfig,
     91     member_snapshot_cap: u32,
     92     outbox_replay_batch_cap: u32,
     93 }
     94 
     95 impl GroupServiceHandle {
     96     pub(crate) fn from_config(
     97         store: &PocketStoreHandle,
     98         config: &GroupRuntimeConfig,
     99     ) -> Result<Option<Self>, BaseRelayError> {
    100         GroupServiceState::from_config(store, config).map(|state| {
    101             state.map(|state| Self {
    102                 state: Arc::new(RwLock::new(state)),
    103             })
    104         })
    105     }
    106 
    107     pub(crate) fn projection(&self) -> GroupProjectionReadGuard<'_> {
    108         GroupProjectionReadGuard {
    109             state: self
    110                 .state
    111                 .read()
    112                 .expect("group service state lock is not poisoned"),
    113         }
    114     }
    115 
    116     pub(crate) fn limits(&self) -> GroupLimitsConfig {
    117         self.state
    118             .read()
    119             .expect("group service state lock is not poisoned")
    120             .limits()
    121     }
    122 
    123     pub(crate) fn outbox_pending_events(&self) -> usize {
    124         self.state
    125             .read()
    126             .expect("group service state lock is not poisoned")
    127             .outbox_pending_events()
    128     }
    129 
    130     pub(crate) fn event_visible_to_auth(
    131         &self,
    132         event: &(impl GroupEventView + ?Sized),
    133         auth: &GroupAuthContext,
    134     ) -> Result<bool, GroupError> {
    135         self.state
    136             .read()
    137             .map_err(|_| GroupError::internal("group service state lock is poisoned"))?
    138             .event_visible_to_auth(event, auth)
    139     }
    140 
    141     pub(crate) fn store_group_pocket_event(
    142         &self,
    143         store: &PocketStoreHandle,
    144         event: &PocketEvent,
    145         class: &GroupEventClass,
    146         auth: &GroupAuthContext,
    147     ) -> Result<GroupEventWrite, GroupEventWriteError> {
    148         self.state
    149             .write()
    150             .map_err(|_| BaseRelayError::error("group service state lock is poisoned"))?
    151             .store_group_pocket_event(store, event, class, auth)
    152     }
    153 }
    154 
    155 impl GroupServiceState {
    156     fn from_config(
    157         store: &PocketStoreHandle,
    158         config: &GroupRuntimeConfig,
    159     ) -> Result<Option<Self>, BaseRelayError> {
    160         if !config.enabled() {
    161             return Ok(None);
    162         }
    163         let relay_secret = config
    164             .relay_secret()
    165             .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?;
    166         let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing())
    167             .map_err(BaseRelayError::invalid)?;
    168         let storage = load_group_storage(store, config.limits())?;
    169         let mut state = Self {
    170             builder: GroupGeneratedEventBuilder::new(signer),
    171             authority: GroupAuthority::new(
    172                 config.owner_pubkeys().iter().cloned(),
    173                 config.admin_pubkeys().iter().cloned(),
    174             ),
    175             projection: storage.projection,
    176             outbox: storage.outbox,
    177             policy: config.policy(),
    178             limits: config.limits(),
    179             member_snapshot_cap: config.limits().max_member_list_pubkeys(),
    180             outbox_replay_batch_cap: config.limits().max_outbox_replay_batch(),
    181         };
    182         state.derive_missing_outbox_records(store)?;
    183         state.materialize_outbox(store)?;
    184         Ok(Some(state))
    185     }
    186 
    187     fn limits(&self) -> GroupLimitsConfig {
    188         self.limits
    189     }
    190 
    191     fn outbox_pending_events(&self) -> usize {
    192         self.outbox.replay_plan().records().len()
    193     }
    194 
    195     fn check_event(
    196         &self,
    197         store: &PocketStoreHandle,
    198         event: &(impl GroupEventView + ?Sized),
    199         class: &GroupEventClass,
    200         auth: &GroupAuthContext,
    201     ) -> Result<(), GroupError> {
    202         tangle_groups::GroupWritePolicy::new(&self.projection, &self.authority, self.policy)
    203             .check_event(event, class, auth)
    204             .map(|_| ())?;
    205         self.check_runtime_write_constraints(store, event, class)
    206     }
    207 
    208     fn check_runtime_write_constraints(
    209         &self,
    210         store: &PocketStoreHandle,
    211         event: &(impl GroupEventView + ?Sized),
    212         class: &GroupEventClass,
    213     ) -> Result<(), GroupError> {
    214         if let GroupEventClass::Moderation { kind, group_id } = class
    215             && kind.as_u32() == KIND_GROUP_DELETE_EVENT
    216         {
    217             self.check_delete_event_target(store, event, group_id)?;
    218         }
    219         Ok(())
    220     }
    221 
    222     fn check_delete_event_target(
    223         &self,
    224         store: &PocketStoreHandle,
    225         event: &(impl GroupEventView + ?Sized),
    226         group_id: &GroupId,
    227     ) -> Result<(), GroupError> {
    228         let target_id = delete_target_event_id(event)?;
    229         let Some(target) = store
    230             .event_by_id(
    231                 pocket_event_id(&target_id)
    232                     .map_err(|error| GroupError::internal(error.prefixed_message()))?,
    233             )
    234             .map_err(|error| GroupError::internal(error.to_string()))?
    235         else {
    236             return Err(GroupError::invalid(
    237                 GroupErrorKind::MalformedTargetTag,
    238                 "delete target event is unavailable",
    239             ));
    240         };
    241         let target_class = tangle_groups::classify_group_event(&target, self.limits)?;
    242         if target_class.group_id() != Some(group_id) {
    243             return Err(GroupError::invalid(
    244                 GroupErrorKind::MalformedTargetTag,
    245                 "delete target event is not in group",
    246             ));
    247         }
    248         Ok(())
    249     }
    250 
    251     fn store_group_pocket_event(
    252         &mut self,
    253         store: &PocketStoreHandle,
    254         event: &PocketEvent,
    255         class: &GroupEventClass,
    256         auth: &GroupAuthContext,
    257     ) -> Result<GroupEventWrite, GroupEventWriteError> {
    258         self.check_event(store, event, class, auth)
    259             .map_err(GroupEventWriteError::Rejected)?;
    260         if store
    261             .event_by_id(event.id())
    262             .map_err(BaseRelayError::from)?
    263             .is_some()
    264         {
    265             return Ok(GroupEventWrite::Duplicate);
    266         }
    267         let store_offset =
    268             StoreOffset::new(store.store_event(event).map_err(BaseRelayError::from)?);
    269         let mut stored_offsets = vec![store_offset];
    270         stored_offsets.extend(self.after_source_event_stored(store, event, class, store_offset)?);
    271         Ok(GroupEventWrite::Stored(stored_offsets))
    272     }
    273 
    274     fn event_visible_to_auth(
    275         &self,
    276         event: &(impl GroupEventView + ?Sized),
    277         auth: &GroupAuthContext,
    278     ) -> Result<bool, GroupError> {
    279         let gate = GroupReadGate::new(&self.projection, &self.authority);
    280         if auth.authenticated_pubkeys().is_empty() {
    281             return gate
    282                 .screen_event(event, None, self.limits)
    283                 .map(|decision| decision == GroupReadDecision::Visible);
    284         }
    285         for pubkey in auth.authenticated_pubkeys() {
    286             if gate.screen_event(event, Some(pubkey), self.limits)? == GroupReadDecision::Visible {
    287                 return Ok(true);
    288             }
    289         }
    290         Ok(false)
    291     }
    292 
    293     fn after_source_event_stored(
    294         &mut self,
    295         store: &PocketStoreHandle,
    296         event: &(impl GroupEventView + ?Sized),
    297         class: &GroupEventClass,
    298         store_offset: StoreOffset,
    299     ) -> Result<Vec<StoreOffset>, BaseRelayError> {
    300         let before_membership_admin =
    301             membership_admin_snapshot_state(&self.projection, event, class)?;
    302         self.projection
    303             .apply_canonical_event(event, store_offset, self.limits)?;
    304         if let Some(group_id) = class_group_id(class) {
    305             self.persist_group_projection(store, group_id)?;
    306         }
    307         for record in self.plan_outbox_records(event, class, before_membership_admin)? {
    308             let inserted = self.outbox.merge_idempotent(record.clone())?;
    309             if inserted {
    310                 persist_outbox_record(store, &record)?;
    311             }
    312         }
    313         if let Some(group_id) = class_group_id(class) {
    314             return self.materialize_outbox_for_group(store, group_id);
    315         }
    316         Ok(Vec::new())
    317     }
    318 
    319     fn plan_outbox_records(
    320         &self,
    321         event: &(impl GroupEventView + ?Sized),
    322         class: &GroupEventClass,
    323         before_membership_admin: Option<bool>,
    324     ) -> Result<Vec<GroupOutboxRecord>, GroupError> {
    325         plan_group_outbox_records(
    326             event,
    327             class,
    328             &self.projection,
    329             &self.authority,
    330             self.member_snapshot_cap,
    331             before_membership_admin,
    332         )
    333     }
    334 
    335     fn derive_missing_outbox_records(
    336         &mut self,
    337         store: &PocketStoreHandle,
    338     ) -> Result<(), BaseRelayError> {
    339         let relay_pubkey = self.builder.relay_pubkey().clone();
    340         let scan = scan_canonical_group_events(store, self.limits)?;
    341         let mut projection = GroupProjection::new();
    342         let mut events = scan.into_events();
    343         events.sort_by_key(CanonicalGroupEvent::tuple);
    344         for item in events {
    345             let class = tangle_groups::classify_group_event(item.event(), self.limits)?;
    346             let before_membership_admin =
    347                 membership_admin_snapshot_state(&projection, item.event(), &class)?;
    348             projection.apply_canonical_event(item.event(), item.store_offset(), self.limits)?;
    349             if item.event().pubkey().as_hex_string() == relay_pubkey.as_str() {
    350                 continue;
    351             }
    352             for record in plan_group_outbox_records(
    353                 item.event(),
    354                 &class,
    355                 &projection,
    356                 &self.authority,
    357                 self.member_snapshot_cap,
    358                 before_membership_admin,
    359             )? {
    360                 let inserted = self.outbox.merge_idempotent(record.clone())?;
    361                 if inserted {
    362                     persist_outbox_record(store, &record)?;
    363                 }
    364             }
    365         }
    366         Ok(())
    367     }
    368 
    369     fn materialize_outbox(
    370         &mut self,
    371         store: &PocketStoreHandle,
    372     ) -> Result<Vec<StoreOffset>, BaseRelayError> {
    373         let mut stored_offsets = Vec::new();
    374         loop {
    375             let records = self
    376                 .outbox
    377                 .replay_plan()
    378                 .records()
    379                 .iter()
    380                 .take(self.outbox_replay_batch_cap())
    381                 .cloned()
    382                 .collect::<Vec<_>>();
    383             if records.is_empty() {
    384                 break;
    385             }
    386             stored_offsets.extend(self.materialize_records(store, records)?);
    387         }
    388         Ok(stored_offsets)
    389     }
    390 
    391     fn materialize_outbox_for_group(
    392         &mut self,
    393         store: &PocketStoreHandle,
    394         group_id: &GroupId,
    395     ) -> Result<Vec<StoreOffset>, BaseRelayError> {
    396         let mut stored_offsets = Vec::new();
    397         loop {
    398             let records = self
    399                 .outbox
    400                 .replay_plan_for_group(group_id)
    401                 .records()
    402                 .iter()
    403                 .take(self.outbox_replay_batch_cap())
    404                 .cloned()
    405                 .collect::<Vec<_>>();
    406             if records.is_empty() {
    407                 break;
    408             }
    409             stored_offsets.extend(self.materialize_records(store, records)?);
    410         }
    411         Ok(stored_offsets)
    412     }
    413 
    414     fn outbox_replay_batch_cap(&self) -> usize {
    415         usize::try_from(self.outbox_replay_batch_cap)
    416             .expect("u32 outbox replay batch cap fits usize")
    417     }
    418 
    419     fn materialize_records(
    420         &mut self,
    421         store: &PocketStoreHandle,
    422         records: Vec<GroupOutboxRecord>,
    423     ) -> Result<Vec<StoreOffset>, BaseRelayError> {
    424         let mut stored_offsets = Vec::new();
    425         for record in records {
    426             if let Some(offset) = self.materialize_record(store, record)? {
    427                 stored_offsets.push(offset);
    428             }
    429         }
    430         Ok(stored_offsets)
    431     }
    432 
    433     fn materialize_record(
    434         &mut self,
    435         store: &PocketStoreHandle,
    436         mut record: GroupOutboxRecord,
    437     ) -> Result<Option<StoreOffset>, BaseRelayError> {
    438         if matches!(
    439             record.key().effect(),
    440             GroupOutboxEffect::RoleListSnapshot | GroupOutboxEffect::State39004Snapshot
    441         ) {
    442             record.mark_skipped("generated group effect is not supported");
    443             self.outbox.update(record.clone());
    444             persist_outbox_record(store, &record)?;
    445             return Ok(None);
    446         }
    447         match self.store_generated_event(store, &record) {
    448             Ok((generated_event_id, stored_offset)) => {
    449                 record.mark_stored(generated_event_id);
    450                 self.outbox.update(record.clone());
    451                 persist_outbox_record(store, &record)?;
    452                 Ok(stored_offset)
    453             }
    454             Err(error) => {
    455                 record.mark_failed(true, error.prefixed_message());
    456                 self.outbox.update(record.clone());
    457                 persist_outbox_record(store, &record)?;
    458                 Err(error)
    459             }
    460         }
    461     }
    462 
    463     fn store_generated_event(
    464         &mut self,
    465         store: &PocketStoreHandle,
    466         record: &GroupOutboxRecord,
    467     ) -> Result<(EventId, Option<StoreOffset>), BaseRelayError> {
    468         let generated = GeneratedGroupStorageEvent::build(&self.builder, record.payload())?;
    469         let event_id = generated.event_id()?;
    470         if generated_event_already_stored(store, generated.event().id())? {
    471             return Ok((event_id, None));
    472         }
    473         let offset = StoreOffset::new(store.store_event(generated.event())?);
    474         self.projection
    475             .apply_canonical_event(generated.event(), offset, self.limits)?;
    476         self.persist_group_projection(store, record.key().group_id())?;
    477         Ok((event_id, Some(offset)))
    478     }
    479 
    480     fn persist_group_projection(
    481         &self,
    482         store: &PocketStoreHandle,
    483         group_id: &GroupId,
    484     ) -> Result<(), BaseRelayError> {
    485         if let Some(group) = self.projection.group(group_id) {
    486             store.put_extra_record(
    487                 TANGLE_GROUP_PROJECTION_TABLE,
    488                 &group_current_key(group_id),
    489                 &group.to_json_bytes()?,
    490             )?;
    491         }
    492         for ((candidate_group, pubkey), member) in self.projection.members() {
    493             if candidate_group == group_id {
    494                 store.put_extra_record(
    495                     TANGLE_GROUP_PROJECTION_TABLE,
    496                     &member_current_key(group_id, pubkey),
    497                     &member.to_json_bytes()?,
    498                 )?;
    499             }
    500         }
    501         for ((candidate_group, role_name), role) in self.projection.roles() {
    502             if candidate_group == group_id {
    503                 store.put_extra_record(
    504                     TANGLE_GROUP_PROJECTION_TABLE,
    505                     &role_current_key(group_id, role_name),
    506                     &role.to_json_bytes()?,
    507                 )?;
    508             }
    509         }
    510         if let Some(tombstone) = self.projection.tombstone(group_id) {
    511             store.put_extra_record(
    512                 TANGLE_GROUP_PROJECTION_TABLE,
    513                 &tombstone_key(group_id),
    514                 &tombstone.to_json_bytes()?,
    515             )?;
    516         }
    517         for (target_event_id, deletion) in self.projection.event_deletions() {
    518             if deletion.group_id() == group_id {
    519                 store.put_extra_record(
    520                     TANGLE_GROUP_PROJECTION_TABLE,
    521                     &event_deletion_key(target_event_id),
    522                     &deletion.to_json_bytes()?,
    523                 )?;
    524             }
    525         }
    526         Ok(())
    527     }
    528 }
    529 
    530 fn plan_group_outbox_records(
    531     event: &(impl GroupEventView + ?Sized),
    532     class: &GroupEventClass,
    533     projection: &GroupProjection,
    534     authority: &GroupAuthority,
    535     member_snapshot_cap: u32,
    536     before_membership_admin: Option<bool>,
    537 ) -> Result<Vec<GroupOutboxRecord>, GroupError> {
    538     let created_at = event.created_at();
    539     match class {
    540         GroupEventClass::Moderation { kind, group_id } => match kind.as_u32() {
    541             KIND_GROUP_CREATE_GROUP => {
    542                 let group = require_projected_group(projection, group_id)?;
    543                 Ok(vec![
    544                     pending_record(
    545                         event,
    546                         GroupOutboxEffect::MetadataSnapshot,
    547                         group_id,
    548                         None,
    549                         GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?,
    550                     )?,
    551                     pending_record(
    552                         event,
    553                         GroupOutboxEffect::AdminListSnapshot,
    554                         group_id,
    555                         None,
    556                         GroupGeneratedEventBuilder::admin_list_snapshot_payload(
    557                             group_id, projection, authority, created_at,
    558                         )?,
    559                     )?,
    560                 ])
    561             }
    562             KIND_GROUP_EDIT_METADATA => {
    563                 let group = require_projected_group(projection, group_id)?;
    564                 Ok(vec![pending_record(
    565                     event,
    566                     GroupOutboxEffect::MetadataSnapshot,
    567                     group_id,
    568                     None,
    569                     GroupGeneratedEventBuilder::metadata_snapshot_payload(group, created_at)?,
    570                 )?])
    571             }
    572             KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER => member_snapshot_records(
    573                 event,
    574                 group_id,
    575                 projection,
    576                 authority,
    577                 created_at,
    578                 member_snapshot_cap,
    579                 before_membership_admin,
    580             ),
    581             _ => Ok(Vec::new()),
    582         },
    583         GroupEventClass::Normal { group_id } => match event.kind_u32() {
    584             KIND_GROUP_JOIN_REQUEST => Ok(vec![pending_record(
    585                 event,
    586                 GroupOutboxEffect::JoinAccepted,
    587                 group_id,
    588                 Some(event.pubkey()?),
    589                 GroupGeneratedEventBuilder::join_accepted_payload(
    590                     group_id,
    591                     &event.pubkey()?,
    592                     created_at,
    593                 ),
    594             )?]),
    595             KIND_GROUP_LEAVE_REQUEST => Ok(vec![pending_record(
    596                 event,
    597                 GroupOutboxEffect::LeaveAccepted,
    598                 group_id,
    599                 Some(event.pubkey()?),
    600                 GroupGeneratedEventBuilder::leave_accepted_payload(
    601                     group_id,
    602                     &event.pubkey()?,
    603                     created_at,
    604                 ),
    605             )?]),
    606             _ => Ok(Vec::new()),
    607         },
    608         GroupEventClass::NonGroup | GroupEventClass::RelayGeneratedSnapshot { .. } => {
    609             Ok(Vec::new())
    610         }
    611     }
    612 }
    613 
    614 fn member_snapshot_records(
    615     event: &(impl GroupEventView + ?Sized),
    616     group_id: &GroupId,
    617     projection: &GroupProjection,
    618     authority: &GroupAuthority,
    619     created_at: UnixTimestamp,
    620     member_snapshot_cap: u32,
    621     before_membership_admin: Option<bool>,
    622 ) -> Result<Vec<GroupOutboxRecord>, GroupError> {
    623     let mut records =
    624         member_snapshot_record(event, group_id, projection, created_at, member_snapshot_cap)?;
    625     if let Some(before) = before_membership_admin {
    626         let target = membership_target_pubkey(event)?;
    627         let after = member_is_relay_override_admin(projection, group_id, &target);
    628         if before != after {
    629             records.push(pending_record(
    630                 event,
    631                 GroupOutboxEffect::AdminListSnapshot,
    632                 group_id,
    633                 None,
    634                 GroupGeneratedEventBuilder::admin_list_snapshot_payload(
    635                     group_id, projection, authority, created_at,
    636                 )?,
    637             )?);
    638         }
    639     }
    640     Ok(records)
    641 }
    642 
    643 fn member_snapshot_record(
    644     event: &(impl GroupEventView + ?Sized),
    645     group_id: &GroupId,
    646     projection: &GroupProjection,
    647     created_at: UnixTimestamp,
    648     member_snapshot_cap: u32,
    649 ) -> Result<Vec<GroupOutboxRecord>, GroupError> {
    650     let key = GroupOutboxKey::new(
    651         event.id()?,
    652         GroupOutboxEffect::MemberListSnapshot,
    653         group_id.clone(),
    654         None,
    655     );
    656     let payload = GroupGeneratedEventBuilder::member_list_snapshot_payload(
    657         group_id,
    658         projection,
    659         created_at,
    660         member_snapshot_cap,
    661     )?;
    662     Ok(vec![match payload {
    663         Some(payload) => GroupOutboxRecord::pending(key, payload),
    664         None => {
    665             let mut record = GroupOutboxRecord::pending(
    666                 key,
    667                 GroupOutboxPayload::new(
    668                     KIND_GROUP_MEMBERS,
    669                     created_at,
    670                     vec![vec!["d".to_owned(), group_id.as_str().to_owned()]],
    671                     "",
    672                 ),
    673             );
    674             record.mark_skipped("member snapshot exceeds configured cap");
    675             record
    676         }
    677     }])
    678 }
    679 
    680 fn membership_admin_snapshot_state(
    681     projection: &GroupProjection,
    682     event: &(impl GroupEventView + ?Sized),
    683     class: &GroupEventClass,
    684 ) -> Result<Option<bool>, GroupError> {
    685     match class {
    686         GroupEventClass::Moderation { kind, group_id }
    687             if matches!(kind.as_u32(), KIND_GROUP_PUT_USER | KIND_GROUP_REMOVE_USER) =>
    688         {
    689             let target = membership_target_pubkey(event)?;
    690             Ok(Some(member_is_relay_override_admin(
    691                 projection, group_id, &target,
    692             )))
    693         }
    694         _ => Ok(None),
    695     }
    696 }
    697 
    698 fn member_is_relay_override_admin(
    699     projection: &GroupProjection,
    700     group_id: &GroupId,
    701     pubkey: &PublicKeyHex,
    702 ) -> bool {
    703     projection
    704         .member(group_id, pubkey)
    705         .filter(|member| member.status() == MemberStatus::Member)
    706         .is_some_and(|member| {
    707             member
    708                 .roles()
    709                 .contains(&RoleName::permanent_relay_override())
    710         })
    711 }
    712 
    713 fn membership_target_pubkey(
    714     event: &(impl GroupEventView + ?Sized),
    715 ) -> Result<PublicKeyHex, GroupError> {
    716     let mut target = None;
    717     event.visit_tags(|tag| {
    718         if tag.first_value().is_none_or(|name| name != "p") {
    719             return Ok(());
    720         }
    721         let Some((_, value)) = tag.indexed_pair() else {
    722             return Err(GroupError::invalid(
    723                 GroupErrorKind::MalformedTargetTag,
    724                 "malformed p target tag",
    725             ));
    726         };
    727         target = Some(PublicKeyHex::new(value).map_err(|reason| {
    728             GroupError::invalid(
    729                 GroupErrorKind::MalformedTargetTag,
    730                 format!("malformed p target tag: {reason}"),
    731             )
    732         })?);
    733         Ok(())
    734     })?;
    735     target.ok_or_else(|| {
    736         GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing p target tag")
    737     })
    738 }
    739 
    740 fn pending_record(
    741     event: &(impl GroupEventView + ?Sized),
    742     effect: GroupOutboxEffect,
    743     group_id: &GroupId,
    744     target_pubkey: Option<PublicKeyHex>,
    745     payload: GroupOutboxPayload,
    746 ) -> Result<GroupOutboxRecord, GroupError> {
    747     Ok(GroupOutboxRecord::pending(
    748         GroupOutboxKey::new(event.id()?, effect, group_id.clone(), target_pubkey),
    749         payload,
    750     ))
    751 }
    752 
    753 fn require_projected_group<'a>(
    754     projection: &'a GroupProjection,
    755     group_id: &GroupId,
    756 ) -> Result<&'a GroupState, GroupError> {
    757     projection
    758         .group(group_id)
    759         .ok_or_else(|| GroupError::internal("group projection is missing after accepted write"))
    760 }
    761 
    762 #[derive(Debug, Clone, PartialEq, Eq)]
    763 struct GroupStorageState {
    764     projection: GroupProjection,
    765     outbox: GroupOutbox,
    766 }
    767 
    768 fn load_group_storage(
    769     store: &PocketStoreHandle,
    770     limits: GroupLimitsConfig,
    771 ) -> Result<GroupStorageState, BaseRelayError> {
    772     let checkpoint_status = validate_group_checkpoint(store)?;
    773     let outbox_records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?;
    774     if checkpoint_status.requires_rebuild() {
    775         let scan = scan_canonical_group_events(store, limits)?;
    776         let report =
    777             rebuild_group_projection(scan.into_events(), limits, projection_rebuilt_at()?)?;
    778         persist_group_projection_snapshot(store, report.projection())?;
    779         validate_rebuilt_group_projection(store)?;
    780         return Ok(GroupStorageState {
    781             projection: report.into_projection(),
    782             outbox: load_group_outbox(outbox_records)?,
    783         });
    784     }
    785     let checkpoint = checkpoint_status.checkpoint().cloned();
    786     let projection_records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?;
    787     let mut projection = load_group_projection(projection_records, checkpoint)?;
    788     apply_canonical_events_after_checkpoint(store, &mut projection, limits)?;
    789     Ok(GroupStorageState {
    790         projection,
    791         outbox: load_group_outbox(outbox_records)?,
    792     })
    793 }
    794 
    795 fn load_group_projection(
    796     records: Vec<(Vec<u8>, Vec<u8>)>,
    797     checkpoint: Option<ProjectionCheckpoint>,
    798 ) -> Result<GroupProjection, BaseRelayError> {
    799     let mut projection = GroupProjection::new();
    800     for (key, value) in records {
    801         match projection_key_parts(&key)?.as_slice() {
    802             ["group", _] => projection.put_group(GroupState::from_json_bytes(&value)?),
    803             ["member", group_id, _] => projection.put_member(
    804                 GroupId::new(group_id)?,
    805                 MemberState::from_json_bytes(&value)?,
    806             ),
    807             ["role", group_id, _] => projection.put_role(
    808                 GroupId::new(group_id)?,
    809                 ProjectedRoleDefinition::from_json_bytes(&value)?,
    810             ),
    811             ["tombstone", _] => projection.put_tombstone(GroupTombstone::from_json_bytes(&value)?),
    812             ["event_deletion", _] => {
    813                 projection.put_event_deletion(GroupEventDeletion::from_json_bytes(&value)?)
    814             }
    815             _ => {
    816                 return Err(BaseRelayError::error(format!(
    817                     "unknown group projection extra-table key: {}",
    818                     projection_key_label(&key)
    819                 )));
    820             }
    821         }
    822     }
    823     if let Some(checkpoint) = checkpoint {
    824         projection.set_checkpoint(checkpoint);
    825     }
    826     Ok(projection)
    827 }
    828 
    829 fn load_group_outbox(records: Vec<(Vec<u8>, Vec<u8>)>) -> Result<GroupOutbox, BaseRelayError> {
    830     let mut outbox = GroupOutbox::new();
    831     for (_, value) in records {
    832         outbox.update(GroupOutboxRecord::from_json_bytes(&value)?);
    833     }
    834     Ok(outbox)
    835 }
    836 
    837 fn apply_canonical_events_after_checkpoint(
    838     store: &PocketStoreHandle,
    839     projection: &mut GroupProjection,
    840     limits: GroupLimitsConfig,
    841 ) -> Result<(), BaseRelayError> {
    842     let last_offset = projection
    843         .checkpoint()
    844         .and_then(ProjectionCheckpoint::last_offset);
    845     let scan = scan_canonical_group_events_after(store, last_offset, limits)?;
    846     if scan.events().is_empty() {
    847         return Ok(());
    848     }
    849     let mut events = scan.into_events();
    850     let next_offset = events.iter().map(CanonicalGroupEvent::store_offset).max();
    851     events.sort_by_key(CanonicalGroupEvent::tuple);
    852     for item in events {
    853         projection.apply_canonical_event(item.event(), item.store_offset(), limits)?;
    854     }
    855     projection.set_checkpoint(ProjectionCheckpoint::current(
    856         next_offset,
    857         projection_rebuilt_at()?,
    858     ));
    859     persist_group_projection_snapshot(store, projection)?;
    860     validate_rebuilt_group_projection(store)
    861 }
    862 
    863 fn persist_group_projection_snapshot(
    864     store: &PocketStoreHandle,
    865     projection: &GroupProjection,
    866 ) -> Result<(), BaseRelayError> {
    867     clear_extra_table(store, TANGLE_GROUP_PROJECTION_TABLE)?;
    868     for (group_id, group) in projection.groups() {
    869         store.put_extra_record(
    870             TANGLE_GROUP_PROJECTION_TABLE,
    871             &group_current_key(group_id),
    872             &group.to_json_bytes()?,
    873         )?;
    874     }
    875     for ((group_id, pubkey), member) in projection.members() {
    876         store.put_extra_record(
    877             TANGLE_GROUP_PROJECTION_TABLE,
    878             &member_current_key(group_id, pubkey),
    879             &member.to_json_bytes()?,
    880         )?;
    881     }
    882     for ((group_id, role_name), role) in projection.roles() {
    883         store.put_extra_record(
    884             TANGLE_GROUP_PROJECTION_TABLE,
    885             &role_current_key(group_id, role_name),
    886             &role.to_json_bytes()?,
    887         )?;
    888     }
    889     for (group_id, tombstone) in projection.tombstones() {
    890         store.put_extra_record(
    891             TANGLE_GROUP_PROJECTION_TABLE,
    892             &tombstone_key(group_id),
    893             &tombstone.to_json_bytes()?,
    894         )?;
    895     }
    896     for (target_event_id, deletion) in projection.event_deletions() {
    897         store.put_extra_record(
    898             TANGLE_GROUP_PROJECTION_TABLE,
    899             &event_deletion_key(target_event_id),
    900             &deletion.to_json_bytes()?,
    901         )?;
    902     }
    903     let checkpoint = projection
    904         .checkpoint()
    905         .ok_or_else(|| BaseRelayError::error("group projection rebuild checkpoint is missing"))?;
    906     store.put_extra_record(
    907         TANGLE_GROUP_CHECKPOINT_TABLE,
    908         &projection_checkpoint_key(),
    909         &checkpoint.to_json_bytes()?,
    910     )?;
    911     Ok(())
    912 }
    913 
    914 fn clear_extra_table(store: &PocketStoreHandle, table: &'static str) -> Result<(), BaseRelayError> {
    915     for (key, _) in store.scan_extra_records(table)? {
    916         store.delete_extra_record(table, &key)?;
    917     }
    918     Ok(())
    919 }
    920 
    921 fn validate_rebuilt_group_projection(store: &PocketStoreHandle) -> Result<(), BaseRelayError> {
    922     let validation = validate_group_extra_tables(store)?;
    923     if validation.checkpoint_status().requires_rebuild() {
    924         return Err(BaseRelayError::error(
    925             "group projection checkpoint is not current after rebuild",
    926         ));
    927     }
    928     Ok(())
    929 }
    930 
    931 #[derive(Debug, Clone, PartialEq, Eq)]
    932 pub struct GroupExtraTableValidation {
    933     projection_records: usize,
    934     outbox_records: usize,
    935     checkpoint_status: GroupCheckpointStatus,
    936 }
    937 
    938 impl GroupExtraTableValidation {
    939     pub fn projection_records(&self) -> usize {
    940         self.projection_records
    941     }
    942 
    943     pub fn outbox_records(&self) -> usize {
    944         self.outbox_records
    945     }
    946 
    947     pub fn checkpoint_status(&self) -> &GroupCheckpointStatus {
    948         &self.checkpoint_status
    949     }
    950 }
    951 
    952 #[derive(Debug, Clone, PartialEq, Eq)]
    953 pub enum GroupCheckpointStatus {
    954     Missing,
    955     Current { checkpoint: ProjectionCheckpoint },
    956     Stale { checkpoint: ProjectionCheckpoint },
    957 }
    958 
    959 impl GroupCheckpointStatus {
    960     pub fn requires_rebuild(&self) -> bool {
    961         !matches!(self, Self::Current { .. })
    962     }
    963 
    964     pub fn checkpoint(&self) -> Option<&ProjectionCheckpoint> {
    965         match self {
    966             Self::Missing => None,
    967             Self::Current { checkpoint } | Self::Stale { checkpoint } => Some(checkpoint),
    968         }
    969     }
    970 }
    971 
    972 pub fn validate_group_extra_tables(
    973     store: &PocketStoreHandle,
    974 ) -> Result<GroupExtraTableValidation, BaseRelayError> {
    975     let projection_records = validate_group_projection_records(store)?;
    976     let outbox_records = validate_group_outbox_records(store)?;
    977     let checkpoint_status = validate_group_checkpoint(store)?;
    978     Ok(GroupExtraTableValidation {
    979         projection_records,
    980         outbox_records,
    981         checkpoint_status,
    982     })
    983 }
    984 
    985 fn validate_group_projection_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> {
    986     let records = store.scan_extra_records(TANGLE_GROUP_PROJECTION_TABLE)?;
    987     let count = records.len();
    988     for (key, value) in records {
    989         match projection_key_parts(&key)?.as_slice() {
    990             ["group", _] => {
    991                 GroupState::from_json_bytes(&value)?;
    992             }
    993             ["member", _, _] => {
    994                 MemberState::from_json_bytes(&value)?;
    995             }
    996             ["role", _, _] => {
    997                 ProjectedRoleDefinition::from_json_bytes(&value)?;
    998             }
    999             ["tombstone", _] => {
   1000                 GroupTombstone::from_json_bytes(&value)?;
   1001             }
   1002             ["event_deletion", _] => {
   1003                 GroupEventDeletion::from_json_bytes(&value)?;
   1004             }
   1005             _ => {
   1006                 return Err(BaseRelayError::error(format!(
   1007                     "unknown group projection extra-table key: {}",
   1008                     projection_key_label(&key)
   1009                 )));
   1010             }
   1011         }
   1012     }
   1013     Ok(count)
   1014 }
   1015 
   1016 fn validate_group_outbox_records(store: &PocketStoreHandle) -> Result<usize, BaseRelayError> {
   1017     let records = store.scan_extra_records(TANGLE_GROUP_OUTBOX_TABLE)?;
   1018     let count = records.len();
   1019     for (_, value) in records {
   1020         GroupOutboxRecord::from_json_bytes(&value)?;
   1021     }
   1022     Ok(count)
   1023 }
   1024 
   1025 fn validate_group_checkpoint(
   1026     store: &PocketStoreHandle,
   1027 ) -> Result<GroupCheckpointStatus, BaseRelayError> {
   1028     let Some(raw) =
   1029         store.get_extra_record(TANGLE_GROUP_CHECKPOINT_TABLE, &projection_checkpoint_key())?
   1030     else {
   1031         return Ok(GroupCheckpointStatus::Missing);
   1032     };
   1033     let checkpoint = ProjectionCheckpoint::from_json_bytes(&raw)?;
   1034     if checkpoint.matches_current_versions() {
   1035         Ok(GroupCheckpointStatus::Current { checkpoint })
   1036     } else {
   1037         Ok(GroupCheckpointStatus::Stale { checkpoint })
   1038     }
   1039 }
   1040 
   1041 fn projection_rebuilt_at() -> Result<UnixTimestamp, BaseRelayError> {
   1042     Ok(UnixTimestamp::new(
   1043         SystemTime::now()
   1044             .duration_since(UNIX_EPOCH)
   1045             .map_err(|error| {
   1046                 BaseRelayError::error(format!("system clock is before UNIX epoch: {error}"))
   1047             })?
   1048             .as_secs(),
   1049     ))
   1050 }
   1051 
   1052 #[derive(Debug, Clone, PartialEq, Eq)]
   1053 pub struct CanonicalGroupEventScan {
   1054     events: Vec<CanonicalGroupEvent>,
   1055     scanned_events: usize,
   1056     skipped_events: usize,
   1057 }
   1058 
   1059 impl CanonicalGroupEventScan {
   1060     pub fn events(&self) -> &[CanonicalGroupEvent] {
   1061         &self.events
   1062     }
   1063 
   1064     pub fn into_events(self) -> Vec<CanonicalGroupEvent> {
   1065         self.events
   1066     }
   1067 
   1068     pub fn scanned_events(&self) -> usize {
   1069         self.scanned_events
   1070     }
   1071 
   1072     pub fn skipped_events(&self) -> usize {
   1073         self.skipped_events
   1074     }
   1075 }
   1076 
   1077 pub fn scan_canonical_group_events(
   1078     store: &PocketStoreHandle,
   1079     limits: GroupLimitsConfig,
   1080 ) -> Result<CanonicalGroupEventScan, BaseRelayError> {
   1081     scan_canonical_group_events_after(store, None, limits)
   1082 }
   1083 
   1084 pub fn scan_canonical_group_events_after(
   1085     store: &PocketStoreHandle,
   1086     last_offset: Option<StoreOffset>,
   1087     limits: GroupLimitsConfig,
   1088 ) -> Result<CanonicalGroupEventScan, BaseRelayError> {
   1089     let stored_events = store.scan_events_after(last_offset.map(StoreOffset::as_u64))?;
   1090     let scanned_events = stored_events.len();
   1091     let mut events = Vec::new();
   1092     let mut skipped_events = 0;
   1093     for stored in stored_events {
   1094         match tangle_groups::classify_group_event(stored.event(), limits)? {
   1095             GroupEventClass::NonGroup => skipped_events += 1,
   1096             GroupEventClass::Normal { .. }
   1097             | GroupEventClass::Moderation { .. }
   1098             | GroupEventClass::RelayGeneratedSnapshot { .. } => {
   1099                 let store_offset = StoreOffset::new(stored.store_offset());
   1100                 events.push(CanonicalGroupEvent::new(stored.into_event(), store_offset));
   1101             }
   1102         }
   1103     }
   1104     Ok(CanonicalGroupEventScan {
   1105         events,
   1106         scanned_events,
   1107         skipped_events,
   1108     })
   1109 }
   1110 
   1111 fn projection_key_parts(key: &[u8]) -> Result<Vec<&str>, BaseRelayError> {
   1112     let key = str::from_utf8(key).map_err(|error| BaseRelayError::error(error.to_string()))?;
   1113     Ok(key.split('\0').collect())
   1114 }
   1115 
   1116 fn projection_key_label(key: &[u8]) -> String {
   1117     String::from_utf8_lossy(key).replace('\0', "\\0")
   1118 }
   1119 
   1120 fn persist_outbox_record(
   1121     store: &PocketStoreHandle,
   1122     record: &GroupOutboxRecord,
   1123 ) -> Result<(), BaseRelayError> {
   1124     store.put_extra_record(
   1125         TANGLE_GROUP_OUTBOX_TABLE,
   1126         &record.key().storage_key(),
   1127         &record.to_json_bytes()?,
   1128     )?;
   1129     Ok(())
   1130 }
   1131 
   1132 fn generated_event_already_stored(
   1133     store: &PocketStoreHandle,
   1134     event_id: PocketEventId,
   1135 ) -> Result<bool, BaseRelayError> {
   1136     if store.event_by_id(event_id)?.is_some() {
   1137         return Ok(true);
   1138     }
   1139     for stored in store.scan_events()? {
   1140         if stored.event().id() == event_id {
   1141             return Ok(true);
   1142         }
   1143     }
   1144     Ok(false)
   1145 }
   1146 
   1147 fn class_group_id(class: &GroupEventClass) -> Option<&GroupId> {
   1148     match class {
   1149         GroupEventClass::Moderation { group_id, .. }
   1150         | GroupEventClass::Normal { group_id }
   1151         | GroupEventClass::RelayGeneratedSnapshot { group_id, .. } => Some(group_id),
   1152         GroupEventClass::NonGroup => None,
   1153     }
   1154 }
   1155 
   1156 fn delete_target_event_id(event: &(impl GroupEventView + ?Sized)) -> Result<EventId, GroupError> {
   1157     let mut target = None;
   1158     event.visit_tags(|tag| {
   1159         if tag.first_value().is_none_or(|name| name != "e") {
   1160             return Ok(());
   1161         }
   1162         let Some((_, value)) = tag.indexed_pair() else {
   1163             return Err(GroupError::invalid(
   1164                 GroupErrorKind::MalformedTargetTag,
   1165                 "malformed e target tag",
   1166             ));
   1167         };
   1168         target = Some(EventId::new(value).map_err(|reason| {
   1169             GroupError::invalid(
   1170                 GroupErrorKind::MalformedTargetTag,
   1171                 format!("malformed e target tag: {reason}"),
   1172             )
   1173         })?);
   1174         Ok(())
   1175     })?;
   1176     target.ok_or_else(|| {
   1177         GroupError::invalid(GroupErrorKind::MissingTargetTag, "missing e target tag")
   1178     })
   1179 }
   1180 
   1181 #[cfg(test)]
   1182 mod tests {
   1183     use super::{
   1184         GeneratedGroupStorageEvent, GroupCheckpointStatus, GroupServiceHandle,
   1185         scan_canonical_group_events, scan_canonical_group_events_after,
   1186         validate_group_extra_tables,
   1187     };
   1188     use crate::pocket_conversion::tangle_event_to_pocket;
   1189     use tangle_crypto::RelaySigner;
   1190     use tangle_groups::{
   1191         GroupGeneratedEventBuilder, GroupId, GroupRuntimeConfig, KIND_GROUP_METADATA,
   1192         KIND_GROUP_PUT_USER, ProjectionCheckpoint, StoreOffset, projection_checkpoint_key,
   1193     };
   1194     use tangle_protocol::{Tag, UnixTimestamp};
   1195     use tangle_store_pocket::{
   1196         PocketEvent, PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy,
   1197         TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_PROJECTION_TABLE,
   1198     };
   1199     use tangle_test_support::{
   1200         FixtureKey, tangle_v2_event, tangle_v2_group_create_event, tangle_v2_group_event,
   1201     };
   1202 
   1203     #[test]
   1204     fn generated_group_storage_event_adapter_preserves_pocket_id_signature_and_tags() {
   1205         let builder = GroupGeneratedEventBuilder::new(
   1206             RelaySigner::from_secret_hex(&"7".repeat(64)).expect("key"),
   1207         );
   1208         let group_id = GroupId::new("PocketFarm").expect("group");
   1209         let member = FixtureKey::Member.public_key();
   1210         let payload = GroupGeneratedEventBuilder::join_accepted_payload(
   1211             &group_id,
   1212             &member,
   1213             UnixTimestamp::new(1_714_124_433),
   1214         );
   1215         let generated = GeneratedGroupStorageEvent::build(&builder, &payload).expect("generated");
   1216 
   1217         assert_eq!(
   1218             generated.event().id().as_hex_string(),
   1219             generated.event_id().expect("event id").as_str()
   1220         );
   1221         assert_eq!(
   1222             generated.event().pubkey().as_hex_string(),
   1223             builder.relay_pubkey().as_str()
   1224         );
   1225         assert_eq!(
   1226             u32::from(generated.event().kind().as_u16()),
   1227             KIND_GROUP_PUT_USER
   1228         );
   1229         assert!(has_pocket_tag(generated.event(), &["h", "PocketFarm"]));
   1230         assert!(has_pocket_tag(generated.event(), &["p", member.as_str()]));
   1231         generated.event().verify().expect("signature");
   1232     }
   1233 
   1234     #[test]
   1235     fn group_service_from_disabled_config_is_absent() {
   1236         let root = std::env::temp_dir().join(format!(
   1237             "tangle-group-service-disabled-{}",
   1238             std::process::id()
   1239         ));
   1240         let _ = std::fs::remove_dir_all(&root);
   1241         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
   1242             .expect("config");
   1243         let store = PocketStoreHandle::open(&config).expect("store");
   1244 
   1245         assert!(
   1246             GroupServiceHandle::from_config(&store, &GroupRuntimeConfig::disabled())
   1247                 .expect("service")
   1248                 .is_none()
   1249         );
   1250     }
   1251 
   1252     #[test]
   1253     fn canonical_group_event_scanner_returns_group_events_with_offsets() {
   1254         let root = std::env::temp_dir().join(format!(
   1255             "tangle-canonical-group-scan-{}",
   1256             std::process::id()
   1257         ));
   1258         let _ = std::fs::remove_dir_all(&root);
   1259         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
   1260             .expect("config");
   1261         let store = PocketStoreHandle::open(&config).expect("store");
   1262         let public =
   1263             tangle_v2_event(FixtureKey::Member, 1, 1, Vec::new(), "public").expect("public");
   1264         let normal =
   1265             tangle_v2_group_event(FixtureKey::Member, "ScanFarm", 2, 1, "normal").expect("normal");
   1266         let group =
   1267             tangle_v2_group_create_event(FixtureKey::Owner, "ScanFarm", 3, &[]).expect("group");
   1268         let generated = tangle_v2_event(
   1269             FixtureKey::Owner,
   1270             4,
   1271             KIND_GROUP_METADATA.into(),
   1272             vec![Tag::from_parts("d", &["ScanFarm"]).expect("d")],
   1273             "",
   1274         )
   1275         .expect("generated");
   1276         let public_offset = store
   1277             .store_event(&tangle_event_to_pocket(&public).expect("public pocket"))
   1278             .expect("store public");
   1279         let normal_offset = store
   1280             .store_event(&tangle_event_to_pocket(&normal).expect("normal pocket"))
   1281             .expect("store normal");
   1282         let group_offset = store
   1283             .store_event(&tangle_event_to_pocket(&group).expect("group pocket"))
   1284             .expect("store group");
   1285         let generated_offset = store
   1286             .store_event(&tangle_event_to_pocket(&generated).expect("generated pocket"))
   1287             .expect("store generated");
   1288 
   1289         let scan = scan_canonical_group_events(&store, Default::default()).expect("scan");
   1290         let after_public = scan_canonical_group_events_after(
   1291             &store,
   1292             Some(StoreOffset::new(public_offset)),
   1293             Default::default(),
   1294         )
   1295         .expect("after public");
   1296 
   1297         assert_eq!(scan.scanned_events(), 4);
   1298         assert_eq!(scan.skipped_events(), 1);
   1299         assert_eq!(
   1300             scan.events()
   1301                 .iter()
   1302                 .map(|event| event.event().id().as_hex_string())
   1303                 .collect::<Vec<_>>(),
   1304             vec![
   1305                 normal.id().as_str().to_owned(),
   1306                 group.id().as_str().to_owned(),
   1307                 generated.id().as_str().to_owned(),
   1308             ]
   1309         );
   1310         assert_eq!(
   1311             scan.events()
   1312                 .iter()
   1313                 .map(|event| event.store_offset())
   1314                 .collect::<Vec<_>>(),
   1315             vec![
   1316                 StoreOffset::new(normal_offset),
   1317                 StoreOffset::new(group_offset),
   1318                 StoreOffset::new(generated_offset),
   1319             ]
   1320         );
   1321         assert_eq!(after_public.scanned_events(), 3);
   1322         assert_eq!(after_public.skipped_events(), 0);
   1323         assert_eq!(
   1324             after_public
   1325                 .events()
   1326                 .iter()
   1327                 .map(|event| event.event().id().as_hex_string())
   1328                 .collect::<Vec<_>>(),
   1329             vec![
   1330                 normal.id().as_str().to_owned(),
   1331                 group.id().as_str().to_owned(),
   1332                 generated.id().as_str().to_owned(),
   1333             ]
   1334         );
   1335 
   1336         let _ = std::fs::remove_dir_all(root);
   1337     }
   1338 
   1339     #[test]
   1340     fn group_extra_table_validation_reports_checkpoint_version_status() {
   1341         let (root, store) = test_store("tangle-group-extra-version");
   1342         let missing = validate_group_extra_tables(&store).expect("missing");
   1343 
   1344         assert_eq!(missing.projection_records(), 0);
   1345         assert_eq!(missing.outbox_records(), 0);
   1346         assert_eq!(missing.checkpoint_status(), &GroupCheckpointStatus::Missing);
   1347         assert!(missing.checkpoint_status().requires_rebuild());
   1348 
   1349         let current =
   1350             ProjectionCheckpoint::current(Some(StoreOffset::new(42)), UnixTimestamp::new(100));
   1351         store
   1352             .put_extra_record(
   1353                 TANGLE_GROUP_CHECKPOINT_TABLE,
   1354                 &projection_checkpoint_key(),
   1355                 &current.to_json_bytes().expect("current bytes"),
   1356             )
   1357             .expect("put current");
   1358         let current_validation = validate_group_extra_tables(&store).expect("current");
   1359         assert_eq!(
   1360             current_validation.checkpoint_status(),
   1361             &GroupCheckpointStatus::Current {
   1362                 checkpoint: current.clone()
   1363             }
   1364         );
   1365         assert!(!current_validation.checkpoint_status().requires_rebuild());
   1366         assert_eq!(
   1367             current_validation.checkpoint_status().checkpoint(),
   1368             Some(&current)
   1369         );
   1370 
   1371         let stale =
   1372             ProjectionCheckpoint::new(0, 0, Some(StoreOffset::new(42)), UnixTimestamp::new(101));
   1373         store
   1374             .put_extra_record(
   1375                 TANGLE_GROUP_CHECKPOINT_TABLE,
   1376                 &projection_checkpoint_key(),
   1377                 &stale.to_json_bytes().expect("stale bytes"),
   1378             )
   1379             .expect("put stale");
   1380         let stale_validation = validate_group_extra_tables(&store).expect("stale");
   1381         assert_eq!(
   1382             stale_validation.checkpoint_status(),
   1383             &GroupCheckpointStatus::Stale {
   1384                 checkpoint: stale.clone()
   1385             }
   1386         );
   1387         assert!(stale_validation.checkpoint_status().requires_rebuild());
   1388 
   1389         let _ = std::fs::remove_dir_all(root);
   1390     }
   1391 
   1392     #[test]
   1393     fn group_extra_table_validation_rejects_bad_projection_schema() {
   1394         let (unknown_root, unknown_store) = test_store("tangle-group-extra-unknown");
   1395         unknown_store
   1396             .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"unknown\0Farm", b"{}")
   1397             .expect("put unknown");
   1398         assert_eq!(
   1399             validate_group_extra_tables(&unknown_store)
   1400                 .expect_err("unknown")
   1401                 .prefixed_message(),
   1402             "error: unknown group projection extra-table key: unknown\\0Farm"
   1403         );
   1404         let _ = std::fs::remove_dir_all(unknown_root);
   1405 
   1406         let (corrupt_root, corrupt_store) = test_store("tangle-group-extra-corrupt");
   1407         corrupt_store
   1408             .put_extra_record(TANGLE_GROUP_PROJECTION_TABLE, b"group\0Farm", b"not-json")
   1409             .expect("put corrupt");
   1410         assert!(
   1411             validate_group_extra_tables(&corrupt_store)
   1412                 .expect_err("corrupt")
   1413                 .prefixed_message()
   1414                 .contains("group state JSON decode failed")
   1415         );
   1416         let _ = std::fs::remove_dir_all(corrupt_root);
   1417     }
   1418 
   1419     fn test_store(name: &str) -> (std::path::PathBuf, PocketStoreHandle) {
   1420         let root = std::env::temp_dir().join(format!("{}-{}", name, std::process::id()));
   1421         let _ = std::fs::remove_dir_all(&root);
   1422         let config = PocketStoreConfig::new(root.join("pocket"), PocketSyncPolicy::FlushOnShutdown)
   1423             .expect("config");
   1424         let store = PocketStoreHandle::open(&config).expect("store");
   1425         (root, store)
   1426     }
   1427 
   1428     fn has_pocket_tag(event: &PocketEvent, expected: &[&str]) -> bool {
   1429         event.tags().expect("tags").iter().any(|tag| {
   1430             tag.map(|value| std::str::from_utf8(value).expect("utf8"))
   1431                 .eq(expected.iter().copied())
   1432         })
   1433     }
   1434 }