outbox.rs (11702B)
1 use std::fmt; 2 use std::str::FromStr; 3 use std::time::{SystemTime, UNIX_EPOCH}; 4 5 use radroots_nostr::prelude::{RadrootsNostrEvent, RadrootsNostrRelayUrl}; 6 use radroots_nostr_signer::prelude::{ 7 RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, 8 }; 9 use serde::{Deserialize, Serialize}; 10 use uuid::Uuid; 11 12 use crate::error::MycError; 13 14 #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] 15 pub struct MycDeliveryOutboxJobId(String); 16 17 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 18 #[serde(rename_all = "snake_case")] 19 pub enum MycDeliveryOutboxKind { 20 ListenerResponsePublish, 21 ConnectAcceptPublish, 22 AuthReplayPublish, 23 DiscoveryHandlerPublish, 24 } 25 26 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 27 #[serde(rename_all = "snake_case")] 28 pub enum MycDeliveryOutboxStatus { 29 Queued, 30 PublishedPendingFinalize, 31 Finalized, 32 Failed, 33 } 34 35 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 36 pub struct MycDeliveryOutboxRecord { 37 pub job_id: MycDeliveryOutboxJobId, 38 pub kind: MycDeliveryOutboxKind, 39 pub status: MycDeliveryOutboxStatus, 40 pub event: RadrootsNostrEvent, 41 pub relay_urls: Vec<RadrootsNostrRelayUrl>, 42 #[serde(default, skip_serializing_if = "Option::is_none")] 43 pub connection_id: Option<RadrootsNostrSignerConnectionId>, 44 #[serde(default, skip_serializing_if = "Option::is_none")] 45 pub request_id: Option<String>, 46 #[serde(default, skip_serializing_if = "Option::is_none")] 47 pub attempt_id: Option<String>, 48 #[serde(default, skip_serializing_if = "Option::is_none")] 49 pub signer_publish_workflow_id: Option<RadrootsNostrSignerWorkflowId>, 50 pub publish_attempt_count: usize, 51 #[serde(default, skip_serializing_if = "Option::is_none")] 52 pub last_error: Option<String>, 53 pub created_at_unix: u64, 54 pub updated_at_unix: u64, 55 #[serde(default, skip_serializing_if = "Option::is_none")] 56 pub published_at_unix: Option<u64>, 57 #[serde(default, skip_serializing_if = "Option::is_none")] 58 pub finalized_at_unix: Option<u64>, 59 } 60 61 pub trait MycDeliveryOutboxStore: Send + Sync { 62 fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError>; 63 fn get( 64 &self, 65 job_id: &MycDeliveryOutboxJobId, 66 ) -> Result<Option<MycDeliveryOutboxRecord>, MycError>; 67 fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>; 68 fn list_by_status( 69 &self, 70 status: MycDeliveryOutboxStatus, 71 ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>; 72 fn mark_published_pending_finalize( 73 &self, 74 job_id: &MycDeliveryOutboxJobId, 75 publish_attempt_count: usize, 76 ) -> Result<MycDeliveryOutboxRecord, MycError>; 77 fn mark_failed( 78 &self, 79 job_id: &MycDeliveryOutboxJobId, 80 publish_attempt_count: usize, 81 error: &str, 82 ) -> Result<MycDeliveryOutboxRecord, MycError>; 83 fn mark_finalized( 84 &self, 85 job_id: &MycDeliveryOutboxJobId, 86 ) -> Result<MycDeliveryOutboxRecord, MycError>; 87 } 88 89 impl MycDeliveryOutboxJobId { 90 pub fn new_v7() -> Self { 91 Self(Uuid::now_v7().to_string()) 92 } 93 94 pub fn parse(value: &str) -> Result<Self, MycError> { 95 let trimmed = value.trim(); 96 if trimmed.is_empty() { 97 return Err(MycError::InvalidDeliveryOutboxJobId(value.to_owned())); 98 } 99 Ok(Self(trimmed.to_owned())) 100 } 101 102 pub fn as_str(&self) -> &str { 103 self.0.as_str() 104 } 105 } 106 107 impl fmt::Display for MycDeliveryOutboxJobId { 108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 109 f.write_str(self.as_str()) 110 } 111 } 112 113 impl AsRef<str> for MycDeliveryOutboxJobId { 114 fn as_ref(&self) -> &str { 115 self.as_str() 116 } 117 } 118 119 impl FromStr for MycDeliveryOutboxJobId { 120 type Err = MycError; 121 122 fn from_str(value: &str) -> Result<Self, Self::Err> { 123 Self::parse(value) 124 } 125 } 126 127 impl MycDeliveryOutboxRecord { 128 pub fn new( 129 kind: MycDeliveryOutboxKind, 130 event: RadrootsNostrEvent, 131 relay_urls: Vec<RadrootsNostrRelayUrl>, 132 ) -> Result<Self, MycError> { 133 if relay_urls.is_empty() { 134 return Err(MycError::InvalidOperation( 135 "delivery outbox job requires at least one relay".to_owned(), 136 )); 137 } 138 let created_at_unix = now_unix_secs(); 139 Ok(Self { 140 job_id: MycDeliveryOutboxJobId::new_v7(), 141 kind, 142 status: MycDeliveryOutboxStatus::Queued, 143 event, 144 relay_urls, 145 connection_id: None, 146 request_id: None, 147 attempt_id: None, 148 signer_publish_workflow_id: None, 149 publish_attempt_count: 0, 150 last_error: None, 151 created_at_unix, 152 updated_at_unix: created_at_unix, 153 published_at_unix: None, 154 finalized_at_unix: None, 155 }) 156 } 157 158 pub fn with_connection_id(mut self, connection_id: &RadrootsNostrSignerConnectionId) -> Self { 159 self.connection_id = Some(connection_id.clone()); 160 self 161 } 162 163 pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self { 164 self.request_id = Some(request_id.into()); 165 self 166 } 167 168 pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self { 169 self.attempt_id = Some(attempt_id.into()); 170 self 171 } 172 173 pub fn with_signer_publish_workflow_id( 174 mut self, 175 workflow_id: &RadrootsNostrSignerWorkflowId, 176 ) -> Self { 177 self.signer_publish_workflow_id = Some(workflow_id.clone()); 178 self 179 } 180 181 pub fn mark_published_pending_finalize( 182 &mut self, 183 publish_attempt_count: usize, 184 updated_at_unix: u64, 185 ) -> Result<(), MycError> { 186 match self.status { 187 MycDeliveryOutboxStatus::Queued | MycDeliveryOutboxStatus::Failed => { 188 self.status = MycDeliveryOutboxStatus::PublishedPendingFinalize; 189 self.publish_attempt_count = publish_attempt_count; 190 self.last_error = None; 191 self.published_at_unix = Some(updated_at_unix); 192 self.updated_at_unix = updated_at_unix; 193 Ok(()) 194 } 195 MycDeliveryOutboxStatus::PublishedPendingFinalize => Ok(()), 196 MycDeliveryOutboxStatus::Finalized => Err(MycError::InvalidOperation( 197 "cannot mark a finalized delivery outbox job as published".to_owned(), 198 )), 199 } 200 } 201 202 pub fn mark_failed( 203 &mut self, 204 publish_attempt_count: usize, 205 error: impl AsRef<str>, 206 updated_at_unix: u64, 207 ) -> Result<(), MycError> { 208 if self.status == MycDeliveryOutboxStatus::Finalized { 209 return Err(MycError::InvalidOperation( 210 "cannot fail a finalized delivery outbox job".to_owned(), 211 )); 212 } 213 let error = error.as_ref().trim(); 214 if error.is_empty() { 215 return Err(MycError::InvalidOperation( 216 "delivery outbox failure reason must not be empty".to_owned(), 217 )); 218 } 219 220 self.status = MycDeliveryOutboxStatus::Failed; 221 self.publish_attempt_count = publish_attempt_count; 222 self.last_error = Some(error.to_owned()); 223 self.updated_at_unix = updated_at_unix; 224 Ok(()) 225 } 226 227 pub fn mark_finalized(&mut self, updated_at_unix: u64) -> Result<(), MycError> { 228 if self.status != MycDeliveryOutboxStatus::PublishedPendingFinalize { 229 return Err(MycError::InvalidOperation( 230 "cannot finalize a delivery outbox job before publish confirmation".to_owned(), 231 )); 232 } 233 234 self.status = MycDeliveryOutboxStatus::Finalized; 235 self.finalized_at_unix = Some(updated_at_unix); 236 self.updated_at_unix = updated_at_unix; 237 Ok(()) 238 } 239 } 240 241 pub(crate) fn now_unix_secs() -> u64 { 242 SystemTime::now() 243 .duration_since(UNIX_EPOCH) 244 .map(|duration| duration.as_secs()) 245 .unwrap_or(0) 246 } 247 248 #[cfg(test)] 249 mod tests { 250 use radroots_identity::RadrootsIdentity; 251 use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind}; 252 use radroots_nostr_signer::prelude::{ 253 RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, 254 }; 255 256 use super::{ 257 MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, 258 MycDeliveryOutboxStatus, 259 }; 260 261 fn signed_event() -> nostr::Event { 262 let identity = RadrootsIdentity::from_secret_key_str( 263 "1111111111111111111111111111111111111111111111111111111111111111", 264 ) 265 .expect("identity"); 266 RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello") 267 .sign_with_keys(identity.keys()) 268 .expect("sign event") 269 } 270 271 #[test] 272 fn delivery_outbox_job_ids_parse_and_display() { 273 let job_id = MycDeliveryOutboxJobId::parse("job-1").expect("job id"); 274 assert_eq!(job_id.as_str(), "job-1"); 275 assert_eq!(job_id.to_string(), "job-1"); 276 assert_eq!(job_id.as_ref(), "job-1"); 277 assert!(MycDeliveryOutboxJobId::parse(" ").is_err()); 278 assert!(!MycDeliveryOutboxJobId::new_v7().as_str().is_empty()); 279 } 280 281 #[test] 282 fn delivery_outbox_record_covers_state_transitions() { 283 let connection_id = RadrootsNostrSignerConnectionId::parse("conn-outbox").expect("id"); 284 let workflow_id = RadrootsNostrSignerWorkflowId::parse("wf-outbox").expect("id"); 285 let mut record = MycDeliveryOutboxRecord::new( 286 MycDeliveryOutboxKind::AuthReplayPublish, 287 signed_event(), 288 vec!["wss://relay.example.com".parse().expect("relay")], 289 ) 290 .expect("record") 291 .with_connection_id(&connection_id) 292 .with_request_id("req-1") 293 .with_attempt_id("attempt-1") 294 .with_signer_publish_workflow_id(&workflow_id); 295 296 assert_eq!(record.status, MycDeliveryOutboxStatus::Queued); 297 assert_eq!(record.connection_id.as_ref(), Some(&connection_id)); 298 assert_eq!(record.request_id.as_deref(), Some("req-1")); 299 assert_eq!(record.attempt_id.as_deref(), Some("attempt-1")); 300 assert_eq!( 301 record.signer_publish_workflow_id.as_ref(), 302 Some(&workflow_id) 303 ); 304 305 record 306 .mark_published_pending_finalize(1, 100) 307 .expect("mark published"); 308 assert_eq!( 309 record.status, 310 MycDeliveryOutboxStatus::PublishedPendingFinalize 311 ); 312 assert_eq!(record.publish_attempt_count, 1); 313 assert_eq!(record.published_at_unix, Some(100)); 314 315 record 316 .mark_failed(2, "relay rejected", 101) 317 .expect("mark failed"); 318 assert_eq!(record.status, MycDeliveryOutboxStatus::Failed); 319 assert_eq!(record.last_error.as_deref(), Some("relay rejected")); 320 321 record 322 .mark_published_pending_finalize(3, 102) 323 .expect("republish"); 324 record.mark_finalized(103).expect("finalize"); 325 assert_eq!(record.status, MycDeliveryOutboxStatus::Finalized); 326 assert_eq!(record.finalized_at_unix, Some(103)); 327 assert!(record.mark_failed(4, "late failure", 104).is_err()); 328 } 329 330 #[test] 331 fn delivery_outbox_record_requires_relays() { 332 let err = MycDeliveryOutboxRecord::new( 333 MycDeliveryOutboxKind::ListenerResponsePublish, 334 signed_event(), 335 Vec::new(), 336 ) 337 .expect_err("missing relays"); 338 assert!( 339 err.to_string() 340 .contains("delivery outbox job requires at least one relay") 341 ); 342 } 343 }