tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

outbox.rs (21985B)


      1 use std::collections::{BTreeMap, BTreeSet};
      2 
      3 use crate::{GroupError, GroupId};
      4 use serde::{Deserialize, Serialize};
      5 use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp};
      6 
      7 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
      8 pub enum GroupOutboxEffect {
      9     MetadataSnapshot,
     10     AdminListSnapshot,
     11     MemberListSnapshot,
     12     RoleListSnapshot,
     13     State39004Snapshot,
     14     JoinAccepted,
     15     LeaveAccepted,
     16 }
     17 
     18 impl GroupOutboxEffect {
     19     pub fn as_str(self) -> &'static str {
     20         match self {
     21             Self::MetadataSnapshot => "metadata_snapshot",
     22             Self::AdminListSnapshot => "admin_list_snapshot",
     23             Self::MemberListSnapshot => "member_list_snapshot",
     24             Self::RoleListSnapshot => "role_list_snapshot",
     25             Self::State39004Snapshot => "state_39004_snapshot",
     26             Self::JoinAccepted => "join_accepted",
     27             Self::LeaveAccepted => "leave_accepted",
     28         }
     29     }
     30 
     31     pub fn from_label(value: &str) -> Result<Self, GroupError> {
     32         match value {
     33             "metadata_snapshot" => Ok(Self::MetadataSnapshot),
     34             "admin_list_snapshot" => Ok(Self::AdminListSnapshot),
     35             "member_list_snapshot" => Ok(Self::MemberListSnapshot),
     36             "role_list_snapshot" => Ok(Self::RoleListSnapshot),
     37             "state_39004_snapshot" => Ok(Self::State39004Snapshot),
     38             "join_accepted" => Ok(Self::JoinAccepted),
     39             "leave_accepted" => Ok(Self::LeaveAccepted),
     40             _ => Err(GroupError::internal(format!(
     41                 "unknown outbox effect {value}"
     42             ))),
     43         }
     44     }
     45 }
     46 
     47 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
     48 pub struct GroupOutboxKey {
     49     source_event_id: EventId,
     50     effect: GroupOutboxEffect,
     51     group_id: GroupId,
     52     target_pubkey: Option<PublicKeyHex>,
     53 }
     54 
     55 impl GroupOutboxKey {
     56     pub fn new(
     57         source_event_id: EventId,
     58         effect: GroupOutboxEffect,
     59         group_id: GroupId,
     60         target_pubkey: Option<PublicKeyHex>,
     61     ) -> Self {
     62         Self {
     63             source_event_id,
     64             effect,
     65             group_id,
     66             target_pubkey,
     67         }
     68     }
     69 
     70     pub fn source_event_id(&self) -> &EventId {
     71         &self.source_event_id
     72     }
     73 
     74     pub fn effect(&self) -> GroupOutboxEffect {
     75         self.effect
     76     }
     77 
     78     pub fn group_id(&self) -> &GroupId {
     79         &self.group_id
     80     }
     81 
     82     pub fn target_pubkey(&self) -> Option<&PublicKeyHex> {
     83         self.target_pubkey.as_ref()
     84     }
     85 
     86     pub fn storage_key(&self) -> Vec<u8> {
     87         let mut key = Vec::new();
     88         key.extend_from_slice(self.source_event_id.as_str().as_bytes());
     89         key.push(0);
     90         key.extend_from_slice(self.effect.as_str().as_bytes());
     91         key.push(0);
     92         key.extend_from_slice(self.group_id.as_str().as_bytes());
     93         key.push(0);
     94         if let Some(pubkey) = &self.target_pubkey {
     95             key.extend_from_slice(pubkey.as_str().as_bytes());
     96         }
     97         key
     98     }
     99 }
    100 
/// Processing state of a [`GroupOutboxRecord`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupOutboxStatus {
    /// Initial state of a record created with `GroupOutboxRecord::pending`.
    Pending,
    /// Set by `GroupOutboxRecord::mark_stored`; carries the generated event id.
    Stored { generated_event_id: EventId },
    /// Set by `GroupOutboxRecord::mark_skipped`; carries the supplied reason.
    Skipped { reason: String },
    /// Set by `GroupOutboxRecord::mark_failed`; `retryable` decides whether
    /// the record is still reported by `GroupOutboxRecord::is_retryable`.
    Failed { retryable: bool },
}
    108 
    109 #[derive(Debug, Clone, PartialEq, Eq)]
    110 pub struct GroupOutboxPayload {
    111     generated_kind: u32,
    112     generated_created_at: UnixTimestamp,
    113     tags: Vec<Vec<String>>,
    114     content: String,
    115 }
    116 
    117 impl GroupOutboxPayload {
    118     pub fn new(
    119         generated_kind: u32,
    120         generated_created_at: UnixTimestamp,
    121         tags: Vec<Vec<String>>,
    122         content: impl Into<String>,
    123     ) -> Self {
    124         Self {
    125             generated_kind,
    126             generated_created_at,
    127             tags,
    128             content: content.into(),
    129         }
    130     }
    131 
    132     pub fn generated_kind(&self) -> u32 {
    133         self.generated_kind
    134     }
    135 
    136     pub fn generated_created_at(&self) -> UnixTimestamp {
    137         self.generated_created_at
    138     }
    139 
    140     pub fn tags(&self) -> &[Vec<String>] {
    141         &self.tags
    142     }
    143 
    144     pub fn content(&self) -> &str {
    145         &self.content
    146     }
    147 }
    148 
    149 #[derive(Debug, Clone, PartialEq, Eq)]
    150 pub struct GroupOutboxRecord {
    151     key: GroupOutboxKey,
    152     status: GroupOutboxStatus,
    153     payload: GroupOutboxPayload,
    154     attempts: u32,
    155     last_error: Option<String>,
    156 }
    157 
    158 impl GroupOutboxRecord {
    159     pub fn pending(key: GroupOutboxKey, payload: GroupOutboxPayload) -> Self {
    160         Self {
    161             key,
    162             status: GroupOutboxStatus::Pending,
    163             payload,
    164             attempts: 0,
    165             last_error: None,
    166         }
    167     }
    168 
    169     pub fn key(&self) -> &GroupOutboxKey {
    170         &self.key
    171     }
    172 
    173     pub fn status(&self) -> &GroupOutboxStatus {
    174         &self.status
    175     }
    176 
    177     pub fn payload(&self) -> &GroupOutboxPayload {
    178         &self.payload
    179     }
    180 
    181     pub fn attempts(&self) -> u32 {
    182         self.attempts
    183     }
    184 
    185     pub fn last_error(&self) -> Option<&str> {
    186         self.last_error.as_deref()
    187     }
    188 
    189     pub fn mark_stored(&mut self, generated_event_id: EventId) {
    190         self.status = GroupOutboxStatus::Stored { generated_event_id };
    191         self.last_error = None;
    192     }
    193 
    194     pub fn mark_skipped(&mut self, reason: impl Into<String>) {
    195         self.status = GroupOutboxStatus::Skipped {
    196             reason: reason.into(),
    197         };
    198     }
    199 
    200     pub fn mark_failed(&mut self, retryable: bool, error: impl Into<String>) {
    201         self.status = GroupOutboxStatus::Failed { retryable };
    202         self.attempts = self.attempts.saturating_add(1);
    203         self.last_error = Some(error.into());
    204     }
    205 
    206     pub fn is_retryable(&self) -> bool {
    207         matches!(
    208             self.status,
    209             GroupOutboxStatus::Pending | GroupOutboxStatus::Failed { retryable: true }
    210         )
    211     }
    212 
    213     pub fn to_json_bytes(&self) -> Result<Vec<u8>, GroupError> {
    214         serde_json::to_vec(&GroupOutboxRecordDocument::from_record(self)).map_err(|error| {
    215             GroupError::internal(format!("outbox record JSON encode failed: {error}"))
    216         })
    217     }
    218 
    219     pub fn from_json_bytes(raw: &[u8]) -> Result<Self, GroupError> {
    220         let document =
    221             serde_json::from_slice::<GroupOutboxRecordDocument>(raw).map_err(|error| {
    222                 GroupError::internal(format!("outbox record JSON decode failed: {error}"))
    223             })?;
    224         document.into_record()
    225     }
    226 }
    227 
    228 #[derive(Debug, Clone, PartialEq, Eq, Default)]
    229 pub struct GroupOutbox {
    230     records: BTreeMap<GroupOutboxKey, GroupOutboxRecord>,
    231 }
    232 
    233 impl GroupOutbox {
    234     pub fn new() -> Self {
    235         Self::default()
    236     }
    237 
    238     pub fn merge_idempotent(&mut self, record: GroupOutboxRecord) -> Result<bool, GroupError> {
    239         if let Some(existing) = self.records.get(record.key()) {
    240             if existing.payload() == record.payload() {
    241                 return Ok(false);
    242             }
    243             return Err(GroupError::internal(
    244                 "outbox record key already exists with different payload",
    245             ));
    246         }
    247         self.records.insert(record.key().clone(), record);
    248         Ok(true)
    249     }
    250 
    251     pub fn get(&self, key: &GroupOutboxKey) -> Option<&GroupOutboxRecord> {
    252         self.records.get(key)
    253     }
    254 
    255     pub fn update(&mut self, record: GroupOutboxRecord) {
    256         self.records.insert(record.key().clone(), record);
    257     }
    258 
    259     pub fn replay_plan(&self) -> OutboxReplayPlan {
    260         self.replay_plan_matching(|_| true)
    261     }
    262 
    263     pub fn replay_plan_for_group(&self, group_id: &GroupId) -> OutboxReplayPlan {
    264         self.replay_plan_matching(|record| record.key().group_id() == group_id)
    265     }
    266 
    267     fn replay_plan_matching(
    268         &self,
    269         include: impl Fn(&GroupOutboxRecord) -> bool,
    270     ) -> OutboxReplayPlan {
    271         let mut records = self
    272             .records
    273             .values()
    274             .filter(|record| record.is_retryable() && include(record))
    275             .cloned()
    276             .collect::<Vec<_>>();
    277         records.sort_by(|left, right| {
    278             left.key()
    279                 .group_id()
    280                 .cmp(right.key().group_id())
    281                 .then_with(|| {
    282                     left.payload()
    283                         .generated_created_at()
    284                         .cmp(&right.payload().generated_created_at())
    285                 })
    286                 .then_with(|| {
    287                     left.key()
    288                         .source_event_id()
    289                         .cmp(right.key().source_event_id())
    290                 })
    291                 .then_with(|| left.key().effect().cmp(&right.key().effect()))
    292                 .then_with(|| left.key().target_pubkey().cmp(&right.key().target_pubkey()))
    293         });
    294         OutboxReplayPlan { records }
    295     }
    296 }
    297 
    298 #[derive(Debug, Clone, PartialEq, Eq)]
    299 pub struct OutboxReplayPlan {
    300     records: Vec<GroupOutboxRecord>,
    301 }
    302 
    303 impl OutboxReplayPlan {
    304     pub fn records(&self) -> &[GroupOutboxRecord] {
    305         &self.records
    306     }
    307 }
    308 
/// Named points at which [`GroupCrashHooks`] can inject a failure.
///
/// The `Debug` name of the variant appears in the injected error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GroupCrashPoint {
    SourceParsedBeforeStore,
    SourceStoreBeforeProjection,
    ProjectionUpdateBeforeOutboxPersist,
    OutboxPersistBeforeGeneratedStore,
    GeneratedStoreBeforeOutboxMark,
    OutboxMarkBeforeBroadcast,
    ProjectionRebuild,
}
    319 
    320 #[derive(Debug, Clone, PartialEq, Eq, Default)]
    321 pub struct GroupCrashHooks {
    322     fail_points: BTreeSet<GroupCrashPoint>,
    323 }
    324 
    325 impl GroupCrashHooks {
    326     pub fn disabled() -> Self {
    327         Self::default()
    328     }
    329 
    330     pub fn failing_at(points: impl IntoIterator<Item = GroupCrashPoint>) -> Self {
    331         Self {
    332             fail_points: points.into_iter().collect(),
    333         }
    334     }
    335 
    336     pub fn check(&self, point: GroupCrashPoint) -> Result<(), GroupError> {
    337         if self.fail_points.contains(&point) {
    338             return Err(GroupError::internal(format!(
    339                 "injected group crash at {point:?}"
    340             )));
    341         }
    342         Ok(())
    343     }
    344 }
    345 
    346 #[derive(Debug, Clone, PartialEq, Eq)]
    347 pub enum OutboxRecoveryReadiness {
    348     Ready,
    349     FailedClosed { reason: String },
    350 }
    351 
    352 impl OutboxRecoveryReadiness {
    353     pub fn from_replay_result<T>(result: &Result<T, GroupError>) -> Self {
    354         match result {
    355             Ok(_) => Self::Ready,
    356             Err(error) => Self::FailedClosed {
    357                 reason: error.prefixed_message(),
    358             },
    359         }
    360     }
    361 }
    362 
    363 #[derive(Debug, Clone, Serialize, Deserialize)]
    364 struct GroupOutboxRecordDocument {
    365     key: GroupOutboxKeyDocument,
    366     status: GroupOutboxStatusDocument,
    367     payload: GroupOutboxPayloadDocument,
    368     attempts: u32,
    369     last_error: Option<String>,
    370 }
    371 
    372 impl GroupOutboxRecordDocument {
    373     fn from_record(record: &GroupOutboxRecord) -> Self {
    374         Self {
    375             key: GroupOutboxKeyDocument::from_key(record.key()),
    376             status: GroupOutboxStatusDocument::from_status(record.status()),
    377             payload: GroupOutboxPayloadDocument::from_payload(record.payload()),
    378             attempts: record.attempts(),
    379             last_error: record.last_error().map(str::to_owned),
    380         }
    381     }
    382 
    383     fn into_record(self) -> Result<GroupOutboxRecord, GroupError> {
    384         Ok(GroupOutboxRecord {
    385             key: self.key.into_key()?,
    386             status: self.status.into_status()?,
    387             payload: self.payload.into_payload(),
    388             attempts: self.attempts,
    389             last_error: self.last_error,
    390         })
    391     }
    392 }
    393 
    394 #[derive(Debug, Clone, Serialize, Deserialize)]
    395 struct GroupOutboxKeyDocument {
    396     source_event_id: String,
    397     effect: String,
    398     group_id: String,
    399     target_pubkey: Option<String>,
    400 }
    401 
    402 impl GroupOutboxKeyDocument {
    403     fn from_key(key: &GroupOutboxKey) -> Self {
    404         Self {
    405             source_event_id: key.source_event_id().as_str().to_owned(),
    406             effect: key.effect().as_str().to_owned(),
    407             group_id: key.group_id().as_str().to_owned(),
    408             target_pubkey: key.target_pubkey().map(|pubkey| pubkey.as_str().to_owned()),
    409         }
    410     }
    411 
    412     fn into_key(self) -> Result<GroupOutboxKey, GroupError> {
    413         Ok(GroupOutboxKey::new(
    414             EventId::new(&self.source_event_id).map_err(GroupError::internal)?,
    415             GroupOutboxEffect::from_label(&self.effect)?,
    416             GroupId::new(&self.group_id)?,
    417             self.target_pubkey
    418                 .as_deref()
    419                 .map(PublicKeyHex::new)
    420                 .transpose()
    421                 .map_err(GroupError::internal)?,
    422         ))
    423     }
    424 }
    425 
    426 #[derive(Debug, Clone, Serialize, Deserialize)]
    427 #[serde(tag = "state")]
    428 enum GroupOutboxStatusDocument {
    429     Pending,
    430     Stored { generated_event_id: String },
    431     Skipped { reason: String },
    432     Failed { retryable: bool },
    433 }
    434 
    435 impl GroupOutboxStatusDocument {
    436     fn from_status(status: &GroupOutboxStatus) -> Self {
    437         match status {
    438             GroupOutboxStatus::Pending => Self::Pending,
    439             GroupOutboxStatus::Stored { generated_event_id } => Self::Stored {
    440                 generated_event_id: generated_event_id.as_str().to_owned(),
    441             },
    442             GroupOutboxStatus::Skipped { reason } => Self::Skipped {
    443                 reason: reason.clone(),
    444             },
    445             GroupOutboxStatus::Failed { retryable } => Self::Failed {
    446                 retryable: *retryable,
    447             },
    448         }
    449     }
    450 
    451     fn into_status(self) -> Result<GroupOutboxStatus, GroupError> {
    452         match self {
    453             Self::Pending => Ok(GroupOutboxStatus::Pending),
    454             Self::Stored { generated_event_id } => Ok(GroupOutboxStatus::Stored {
    455                 generated_event_id: EventId::new(&generated_event_id)
    456                     .map_err(GroupError::internal)?,
    457             }),
    458             Self::Skipped { reason } => Ok(GroupOutboxStatus::Skipped { reason }),
    459             Self::Failed { retryable } => Ok(GroupOutboxStatus::Failed { retryable }),
    460         }
    461     }
    462 }
    463 
    464 #[derive(Debug, Clone, Serialize, Deserialize)]
    465 struct GroupOutboxPayloadDocument {
    466     generated_kind: u32,
    467     generated_created_at: u64,
    468     tags: Vec<Vec<String>>,
    469     content: String,
    470 }
    471 
    472 impl GroupOutboxPayloadDocument {
    473     fn from_payload(payload: &GroupOutboxPayload) -> Self {
    474         Self {
    475             generated_kind: payload.generated_kind(),
    476             generated_created_at: payload.generated_created_at().as_u64(),
    477             tags: payload.tags().to_vec(),
    478             content: payload.content().to_owned(),
    479         }
    480     }
    481 
    482     fn into_payload(self) -> GroupOutboxPayload {
    483         GroupOutboxPayload::new(
    484             self.generated_kind,
    485             UnixTimestamp::new(self.generated_created_at),
    486             self.tags,
    487             self.content,
    488         )
    489     }
    490 }
    491 
    492 #[cfg(test)]
    493 mod tests {
    494     use super::{
    495         GroupCrashHooks, GroupCrashPoint, GroupOutbox, GroupOutboxEffect, GroupOutboxKey,
    496         GroupOutboxPayload, GroupOutboxRecord, GroupOutboxStatus,
    497     };
    498     use crate::GroupId;
    499     use tangle_protocol::{EventId, PublicKeyHex, UnixTimestamp};
    500 
    501     #[test]
    502     fn outbox_keys_are_deterministic() {
    503         let key = key(Some(PublicKeyHex::new(&"2".repeat(64)).expect("pubkey")));
    504 
    505         assert_eq!(
    506             key.storage_key(),
    507             format!(
    508                 "{}\0join_accepted\0Farm\0{}",
    509                 "1".repeat(64),
    510                 "2".repeat(64)
    511             )
    512             .into_bytes()
    513         );
    514     }
    515 
    516     #[test]
    517     fn outbox_merge_is_idempotent_for_same_payload() {
    518         let mut outbox = GroupOutbox::new();
    519         let record = GroupOutboxRecord::pending(key(None), payload(9_000));
    520 
    521         assert!(outbox.merge_idempotent(record.clone()).expect("insert"));
    522         assert!(!outbox.merge_idempotent(record).expect("same"));
    523         assert!(
    524             outbox
    525                 .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_001)))
    526                 .is_err()
    527         );
    528     }
    529 
    530     #[test]
    531     fn outbox_merge_preserves_persisted_status_for_same_payload() {
    532         let mut outbox = GroupOutbox::new();
    533         let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000));
    534         let generated_event_id = EventId::new(&"9".repeat(64)).expect("event");
    535         stored.mark_stored(generated_event_id.clone());
    536 
    537         assert!(outbox.merge_idempotent(stored.clone()).expect("stored"));
    538         assert!(
    539             !outbox
    540                 .merge_idempotent(GroupOutboxRecord::pending(key(None), payload(9_000)))
    541                 .expect("derived")
    542         );
    543         assert_eq!(
    544             outbox.get(stored.key()).expect("record").status(),
    545             &GroupOutboxStatus::Stored { generated_event_id }
    546         );
    547     }
    548 
    549     #[test]
    550     fn outbox_replay_plan_is_sorted_and_retryable_only() {
    551         let mut outbox = GroupOutbox::new();
    552         let mut stored = GroupOutboxRecord::pending(key(None), payload(9_000));
    553         stored.mark_stored(EventId::new(&"9".repeat(64)).expect("event"));
    554         let mut retryable = GroupOutboxRecord::pending(
    555             GroupOutboxKey::new(
    556                 EventId::new(&"0".repeat(64)).expect("event"),
    557                 GroupOutboxEffect::MetadataSnapshot,
    558                 GroupId::new("Farm").expect("group"),
    559                 None,
    560             ),
    561             payload(39_000),
    562         );
    563         retryable.mark_failed(true, "store failed");
    564 
    565         outbox.merge_idempotent(stored).expect("stored");
    566         outbox.merge_idempotent(retryable).expect("retryable");
    567         let plan = outbox.replay_plan();
    568 
    569         assert_eq!(plan.records().len(), 1);
    570         assert_eq!(plan.records()[0].payload().generated_kind(), 39_000);
    571         assert_eq!(plan.records()[0].attempts(), 1);
    572     }
    573 
    574     #[test]
    575     fn outbox_replay_plan_orders_retryable_records_by_group_and_source_time() {
    576         let mut outbox = GroupOutbox::new();
    577         let farm_early = replay_record(&"f".repeat(64), "Farm", 1);
    578         let farm_late = replay_record(&"0".repeat(64), "Farm", 2);
    579         let market_early = replay_record(&"1".repeat(64), "Market", 1);
    580 
    581         outbox
    582             .merge_idempotent(market_early.clone())
    583             .expect("market");
    584         outbox
    585             .merge_idempotent(farm_late.clone())
    586             .expect("farm late");
    587         outbox
    588             .merge_idempotent(farm_early.clone())
    589             .expect("farm early");
    590         let plan = outbox.replay_plan();
    591 
    592         assert_eq!(
    593             plan.records()
    594                 .iter()
    595                 .map(|record| record.key().source_event_id())
    596                 .collect::<Vec<_>>(),
    597             vec![
    598                 farm_early.key().source_event_id(),
    599                 farm_late.key().source_event_id(),
    600                 market_early.key().source_event_id()
    601             ]
    602         );
    603     }
    604 
    605     #[test]
    606     fn outbox_replay_plan_can_scope_retryable_records_to_one_group() {
    607         let mut outbox = GroupOutbox::new();
    608         let farm_early = replay_record(&"f".repeat(64), "Farm", 1);
    609         let farm_late = replay_record(&"0".repeat(64), "Farm", 2);
    610         let market_early = replay_record(&"1".repeat(64), "Market", 1);
    611 
    612         outbox
    613             .merge_idempotent(market_early.clone())
    614             .expect("market");
    615         outbox
    616             .merge_idempotent(farm_late.clone())
    617             .expect("farm late");
    618         outbox
    619             .merge_idempotent(farm_early.clone())
    620             .expect("farm early");
    621         let plan = outbox.replay_plan_for_group(&GroupId::new("Farm").expect("group"));
    622 
    623         assert_eq!(
    624             plan.records()
    625                 .iter()
    626                 .map(|record| record.key().source_event_id())
    627                 .collect::<Vec<_>>(),
    628             vec![
    629                 farm_early.key().source_event_id(),
    630                 farm_late.key().source_event_id()
    631             ]
    632         );
    633     }
    634 
    635     #[test]
    636     fn outbox_records_round_trip_for_persistence() {
    637         let mut record = GroupOutboxRecord::pending(key(None), payload(39_000));
    638         record.mark_failed(true, "pending retry");
    639 
    640         let decoded = GroupOutboxRecord::from_json_bytes(&record.to_json_bytes().expect("bytes"))
    641             .expect("record");
    642         assert_eq!(decoded.payload().generated_kind(), 39_000);
    643         assert_eq!(
    644             decoded.payload().generated_created_at(),
    645             UnixTimestamp::new(1)
    646         );
    647         assert_eq!(
    648             decoded.payload().tags(),
    649             &[vec!["h".to_owned(), "Farm".to_owned()]]
    650         );
    651         assert_eq!(decoded.payload().content(), "");
    652         assert_eq!(decoded, record);
    653     }
    654 
    655     #[test]
    656     fn crash_hooks_fail_only_at_configured_points() {
    657         let hooks =
    658             GroupCrashHooks::failing_at([GroupCrashPoint::OutboxPersistBeforeGeneratedStore]);
    659 
    660         assert!(
    661             hooks
    662                 .check(GroupCrashPoint::GeneratedStoreBeforeOutboxMark)
    663                 .is_ok()
    664         );
    665         assert_eq!(
    666             hooks
    667                 .check(GroupCrashPoint::OutboxPersistBeforeGeneratedStore)
    668                 .expect_err("injected")
    669                 .prefixed_message(),
    670             "error: injected group crash at OutboxPersistBeforeGeneratedStore"
    671         );
    672     }
    673 
    674     fn key(target_pubkey: Option<PublicKeyHex>) -> GroupOutboxKey {
    675         GroupOutboxKey::new(
    676             EventId::new(&"1".repeat(64)).expect("event"),
    677             GroupOutboxEffect::JoinAccepted,
    678             GroupId::new("Farm").expect("group"),
    679             target_pubkey,
    680         )
    681     }
    682 
    683     fn payload(kind: u32) -> GroupOutboxPayload {
    684         GroupOutboxPayload::new(
    685             kind,
    686             UnixTimestamp::new(1),
    687             vec![vec!["h".to_owned(), "Farm".to_owned()]],
    688             "",
    689         )
    690     }
    691 
    692     fn replay_record(source_event_id: &str, group_id: &str, created_at: u64) -> GroupOutboxRecord {
    693         let group_id = GroupId::new(group_id).expect("group");
    694         GroupOutboxRecord::pending(
    695             GroupOutboxKey::new(
    696                 EventId::new(source_event_id).expect("event"),
    697                 GroupOutboxEffect::MetadataSnapshot,
    698                 group_id.clone(),
    699                 None,
    700             ),
    701             GroupOutboxPayload::new(
    702                 39_000,
    703                 UnixTimestamp::new(created_at),
    704                 vec![vec!["h".to_owned(), group_id.as_str().to_owned()]],
    705                 "",
    706             ),
    707         )
    708     }
    709 }