myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

outbox.rs (11702B)


      1 use std::fmt;
      2 use std::str::FromStr;
      3 use std::time::{SystemTime, UNIX_EPOCH};
      4 
      5 use radroots_nostr::prelude::{RadrootsNostrEvent, RadrootsNostrRelayUrl};
      6 use radroots_nostr_signer::prelude::{
      7     RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId,
      8 };
      9 use serde::{Deserialize, Serialize};
     10 use uuid::Uuid;
     11 
     12 use crate::error::MycError;
     13 
     14 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
     15 pub struct MycDeliveryOutboxJobId(String);
     16 
     17 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
     18 #[serde(rename_all = "snake_case")]
     19 pub enum MycDeliveryOutboxKind {
     20     ListenerResponsePublish,
     21     ConnectAcceptPublish,
     22     AuthReplayPublish,
     23     DiscoveryHandlerPublish,
     24 }
     25 
     26 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
     27 #[serde(rename_all = "snake_case")]
     28 pub enum MycDeliveryOutboxStatus {
     29     Queued,
     30     PublishedPendingFinalize,
     31     Finalized,
     32     Failed,
     33 }
     34 
     35 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     36 pub struct MycDeliveryOutboxRecord {
     37     pub job_id: MycDeliveryOutboxJobId,
     38     pub kind: MycDeliveryOutboxKind,
     39     pub status: MycDeliveryOutboxStatus,
     40     pub event: RadrootsNostrEvent,
     41     pub relay_urls: Vec<RadrootsNostrRelayUrl>,
     42     #[serde(default, skip_serializing_if = "Option::is_none")]
     43     pub connection_id: Option<RadrootsNostrSignerConnectionId>,
     44     #[serde(default, skip_serializing_if = "Option::is_none")]
     45     pub request_id: Option<String>,
     46     #[serde(default, skip_serializing_if = "Option::is_none")]
     47     pub attempt_id: Option<String>,
     48     #[serde(default, skip_serializing_if = "Option::is_none")]
     49     pub signer_publish_workflow_id: Option<RadrootsNostrSignerWorkflowId>,
     50     pub publish_attempt_count: usize,
     51     #[serde(default, skip_serializing_if = "Option::is_none")]
     52     pub last_error: Option<String>,
     53     pub created_at_unix: u64,
     54     pub updated_at_unix: u64,
     55     #[serde(default, skip_serializing_if = "Option::is_none")]
     56     pub published_at_unix: Option<u64>,
     57     #[serde(default, skip_serializing_if = "Option::is_none")]
     58     pub finalized_at_unix: Option<u64>,
     59 }
     60 
     61 pub trait MycDeliveryOutboxStore: Send + Sync {
     62     fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError>;
     63     fn get(
     64         &self,
     65         job_id: &MycDeliveryOutboxJobId,
     66     ) -> Result<Option<MycDeliveryOutboxRecord>, MycError>;
     67     fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>;
     68     fn list_by_status(
     69         &self,
     70         status: MycDeliveryOutboxStatus,
     71     ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>;
     72     fn mark_published_pending_finalize(
     73         &self,
     74         job_id: &MycDeliveryOutboxJobId,
     75         publish_attempt_count: usize,
     76     ) -> Result<MycDeliveryOutboxRecord, MycError>;
     77     fn mark_failed(
     78         &self,
     79         job_id: &MycDeliveryOutboxJobId,
     80         publish_attempt_count: usize,
     81         error: &str,
     82     ) -> Result<MycDeliveryOutboxRecord, MycError>;
     83     fn mark_finalized(
     84         &self,
     85         job_id: &MycDeliveryOutboxJobId,
     86     ) -> Result<MycDeliveryOutboxRecord, MycError>;
     87 }
     88 
     89 impl MycDeliveryOutboxJobId {
     90     pub fn new_v7() -> Self {
     91         Self(Uuid::now_v7().to_string())
     92     }
     93 
     94     pub fn parse(value: &str) -> Result<Self, MycError> {
     95         let trimmed = value.trim();
     96         if trimmed.is_empty() {
     97             return Err(MycError::InvalidDeliveryOutboxJobId(value.to_owned()));
     98         }
     99         Ok(Self(trimmed.to_owned()))
    100     }
    101 
    102     pub fn as_str(&self) -> &str {
    103         self.0.as_str()
    104     }
    105 }
    106 
    107 impl fmt::Display for MycDeliveryOutboxJobId {
    108     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    109         f.write_str(self.as_str())
    110     }
    111 }
    112 
    113 impl AsRef<str> for MycDeliveryOutboxJobId {
    114     fn as_ref(&self) -> &str {
    115         self.as_str()
    116     }
    117 }
    118 
    119 impl FromStr for MycDeliveryOutboxJobId {
    120     type Err = MycError;
    121 
    122     fn from_str(value: &str) -> Result<Self, Self::Err> {
    123         Self::parse(value)
    124     }
    125 }
    126 
    127 impl MycDeliveryOutboxRecord {
    128     pub fn new(
    129         kind: MycDeliveryOutboxKind,
    130         event: RadrootsNostrEvent,
    131         relay_urls: Vec<RadrootsNostrRelayUrl>,
    132     ) -> Result<Self, MycError> {
    133         if relay_urls.is_empty() {
    134             return Err(MycError::InvalidOperation(
    135                 "delivery outbox job requires at least one relay".to_owned(),
    136             ));
    137         }
    138         let created_at_unix = now_unix_secs();
    139         Ok(Self {
    140             job_id: MycDeliveryOutboxJobId::new_v7(),
    141             kind,
    142             status: MycDeliveryOutboxStatus::Queued,
    143             event,
    144             relay_urls,
    145             connection_id: None,
    146             request_id: None,
    147             attempt_id: None,
    148             signer_publish_workflow_id: None,
    149             publish_attempt_count: 0,
    150             last_error: None,
    151             created_at_unix,
    152             updated_at_unix: created_at_unix,
    153             published_at_unix: None,
    154             finalized_at_unix: None,
    155         })
    156     }
    157 
    158     pub fn with_connection_id(mut self, connection_id: &RadrootsNostrSignerConnectionId) -> Self {
    159         self.connection_id = Some(connection_id.clone());
    160         self
    161     }
    162 
    163     pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
    164         self.request_id = Some(request_id.into());
    165         self
    166     }
    167 
    168     pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self {
    169         self.attempt_id = Some(attempt_id.into());
    170         self
    171     }
    172 
    173     pub fn with_signer_publish_workflow_id(
    174         mut self,
    175         workflow_id: &RadrootsNostrSignerWorkflowId,
    176     ) -> Self {
    177         self.signer_publish_workflow_id = Some(workflow_id.clone());
    178         self
    179     }
    180 
    181     pub fn mark_published_pending_finalize(
    182         &mut self,
    183         publish_attempt_count: usize,
    184         updated_at_unix: u64,
    185     ) -> Result<(), MycError> {
    186         match self.status {
    187             MycDeliveryOutboxStatus::Queued | MycDeliveryOutboxStatus::Failed => {
    188                 self.status = MycDeliveryOutboxStatus::PublishedPendingFinalize;
    189                 self.publish_attempt_count = publish_attempt_count;
    190                 self.last_error = None;
    191                 self.published_at_unix = Some(updated_at_unix);
    192                 self.updated_at_unix = updated_at_unix;
    193                 Ok(())
    194             }
    195             MycDeliveryOutboxStatus::PublishedPendingFinalize => Ok(()),
    196             MycDeliveryOutboxStatus::Finalized => Err(MycError::InvalidOperation(
    197                 "cannot mark a finalized delivery outbox job as published".to_owned(),
    198             )),
    199         }
    200     }
    201 
    202     pub fn mark_failed(
    203         &mut self,
    204         publish_attempt_count: usize,
    205         error: impl AsRef<str>,
    206         updated_at_unix: u64,
    207     ) -> Result<(), MycError> {
    208         if self.status == MycDeliveryOutboxStatus::Finalized {
    209             return Err(MycError::InvalidOperation(
    210                 "cannot fail a finalized delivery outbox job".to_owned(),
    211             ));
    212         }
    213         let error = error.as_ref().trim();
    214         if error.is_empty() {
    215             return Err(MycError::InvalidOperation(
    216                 "delivery outbox failure reason must not be empty".to_owned(),
    217             ));
    218         }
    219 
    220         self.status = MycDeliveryOutboxStatus::Failed;
    221         self.publish_attempt_count = publish_attempt_count;
    222         self.last_error = Some(error.to_owned());
    223         self.updated_at_unix = updated_at_unix;
    224         Ok(())
    225     }
    226 
    227     pub fn mark_finalized(&mut self, updated_at_unix: u64) -> Result<(), MycError> {
    228         if self.status != MycDeliveryOutboxStatus::PublishedPendingFinalize {
    229             return Err(MycError::InvalidOperation(
    230                 "cannot finalize a delivery outbox job before publish confirmation".to_owned(),
    231             ));
    232         }
    233 
    234         self.status = MycDeliveryOutboxStatus::Finalized;
    235         self.finalized_at_unix = Some(updated_at_unix);
    236         self.updated_at_unix = updated_at_unix;
    237         Ok(())
    238     }
    239 }
    240 
    241 pub(crate) fn now_unix_secs() -> u64 {
    242     SystemTime::now()
    243         .duration_since(UNIX_EPOCH)
    244         .map(|duration| duration.as_secs())
    245         .unwrap_or(0)
    246 }
    247 
    248 #[cfg(test)]
    249 mod tests {
    250     use radroots_identity::RadrootsIdentity;
    251     use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind};
    252     use radroots_nostr_signer::prelude::{
    253         RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId,
    254     };
    255 
    256     use super::{
    257         MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord,
    258         MycDeliveryOutboxStatus,
    259     };
    260 
    261     fn signed_event() -> nostr::Event {
    262         let identity = RadrootsIdentity::from_secret_key_str(
    263             "1111111111111111111111111111111111111111111111111111111111111111",
    264         )
    265         .expect("identity");
    266         RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello")
    267             .sign_with_keys(identity.keys())
    268             .expect("sign event")
    269     }
    270 
    271     #[test]
    272     fn delivery_outbox_job_ids_parse_and_display() {
    273         let job_id = MycDeliveryOutboxJobId::parse("job-1").expect("job id");
    274         assert_eq!(job_id.as_str(), "job-1");
    275         assert_eq!(job_id.to_string(), "job-1");
    276         assert_eq!(job_id.as_ref(), "job-1");
    277         assert!(MycDeliveryOutboxJobId::parse("   ").is_err());
    278         assert!(!MycDeliveryOutboxJobId::new_v7().as_str().is_empty());
    279     }
    280 
    281     #[test]
    282     fn delivery_outbox_record_covers_state_transitions() {
    283         let connection_id = RadrootsNostrSignerConnectionId::parse("conn-outbox").expect("id");
    284         let workflow_id = RadrootsNostrSignerWorkflowId::parse("wf-outbox").expect("id");
    285         let mut record = MycDeliveryOutboxRecord::new(
    286             MycDeliveryOutboxKind::AuthReplayPublish,
    287             signed_event(),
    288             vec!["wss://relay.example.com".parse().expect("relay")],
    289         )
    290         .expect("record")
    291         .with_connection_id(&connection_id)
    292         .with_request_id("req-1")
    293         .with_attempt_id("attempt-1")
    294         .with_signer_publish_workflow_id(&workflow_id);
    295 
    296         assert_eq!(record.status, MycDeliveryOutboxStatus::Queued);
    297         assert_eq!(record.connection_id.as_ref(), Some(&connection_id));
    298         assert_eq!(record.request_id.as_deref(), Some("req-1"));
    299         assert_eq!(record.attempt_id.as_deref(), Some("attempt-1"));
    300         assert_eq!(
    301             record.signer_publish_workflow_id.as_ref(),
    302             Some(&workflow_id)
    303         );
    304 
    305         record
    306             .mark_published_pending_finalize(1, 100)
    307             .expect("mark published");
    308         assert_eq!(
    309             record.status,
    310             MycDeliveryOutboxStatus::PublishedPendingFinalize
    311         );
    312         assert_eq!(record.publish_attempt_count, 1);
    313         assert_eq!(record.published_at_unix, Some(100));
    314 
    315         record
    316             .mark_failed(2, "relay rejected", 101)
    317             .expect("mark failed");
    318         assert_eq!(record.status, MycDeliveryOutboxStatus::Failed);
    319         assert_eq!(record.last_error.as_deref(), Some("relay rejected"));
    320 
    321         record
    322             .mark_published_pending_finalize(3, 102)
    323             .expect("republish");
    324         record.mark_finalized(103).expect("finalize");
    325         assert_eq!(record.status, MycDeliveryOutboxStatus::Finalized);
    326         assert_eq!(record.finalized_at_unix, Some(103));
    327         assert!(record.mark_failed(4, "late failure", 104).is_err());
    328     }
    329 
    330     #[test]
    331     fn delivery_outbox_record_requires_relays() {
    332         let err = MycDeliveryOutboxRecord::new(
    333             MycDeliveryOutboxKind::ListenerResponsePublish,
    334             signed_event(),
    335             Vec::new(),
    336         )
    337         .expect_err("missing relays");
    338         assert!(
    339             err.to_string()
    340                 .contains("delivery outbox job requires at least one relay")
    341         );
    342     }
    343 }