outbox_sqlite.rs (20875B)
1 use std::path::{Path, PathBuf}; 2 3 use radroots_sql_core::migrations::{Migration, migrations_run_all_up}; 4 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 5 use serde::Deserialize; 6 use serde::de::DeserializeOwned; 7 use serde_json::{Value, json}; 8 9 use crate::error::MycError; 10 use crate::outbox::{ 11 MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, 12 MycDeliveryOutboxStatus, MycDeliveryOutboxStore, now_unix_secs, 13 }; 14 15 const MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME: &str = "delivery-outbox.sqlite"; 16 #[cfg(test)] 17 const MYC_DELIVERY_OUTBOX_MEMORY_PATH: &str = ":memory:"; 18 19 static MYC_DELIVERY_OUTBOX_MIGRATIONS: &[Migration] = &[Migration { 20 name: "0000_delivery_outbox_init", 21 up_sql: include_str!("../migrations/0000_delivery_outbox_init.up.sql"), 22 down_sql: include_str!("../migrations/0000_delivery_outbox_init.down.sql"), 23 }]; 24 25 /// Myc keeps its delivery outbox store local to the service boundary. 26 pub struct MycSqliteDeliveryOutboxStore { 27 db: MycDeliveryOutboxSqliteDb, 28 } 29 30 struct MycDeliveryOutboxSqliteDb { 31 path: PathBuf, 32 executor: SqliteExecutor, 33 file_backed: bool, 34 } 35 36 #[derive(Debug, Deserialize)] 37 struct MycDeliveryOutboxRow { 38 job_id: String, 39 kind: String, 40 status: String, 41 event_json: String, 42 relay_urls_json: String, 43 connection_id: Option<String>, 44 request_id: Option<String>, 45 attempt_id: Option<String>, 46 signer_publish_workflow_id: Option<String>, 47 publish_attempt_count: i64, 48 last_error: Option<String>, 49 created_at_unix: u64, 50 updated_at_unix: u64, 51 published_at_unix: Option<u64>, 52 finalized_at_unix: Option<u64>, 53 } 54 55 impl MycSqliteDeliveryOutboxStore { 56 pub fn open(state_dir: impl AsRef<Path>) -> Result<Self, MycError> { 57 let db = MycDeliveryOutboxSqliteDb::open( 58 state_dir 59 .as_ref() 60 .join(MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME), 61 )?; 62 Ok(Self { db }) 63 } 64 65 #[cfg(test)] 66 pub fn open_memory() -> Result<Self, MycError> { 67 Ok(Self { 68 db: MycDeliveryOutboxSqliteDb::open_memory()?, 69 }) 70 } 71 72 pub fn path(&self) -> &Path { 73 self.db.path() 74 } 75 76 fn update_record( 77 &self, 78 job_id: &MycDeliveryOutboxJobId, 79 update: impl FnOnce(&mut MycDeliveryOutboxRecord) -> Result<(), MycError>, 80 ) -> Result<MycDeliveryOutboxRecord, MycError> { 81 let mut record = self 82 .get(job_id)? 83 .ok_or_else(|| MycError::DeliveryOutboxJobNotFound(job_id.to_string()))?; 84 update(&mut record)?; 85 exec_json( 86 self.db.path(), 87 self.db.executor(), 88 "UPDATE myc_delivery_outbox SET kind = ?, status = ?, event_json = ?, relay_urls_json = ?, connection_id = ?, request_id = ?, attempt_id = ?, signer_publish_workflow_id = ?, publish_attempt_count = ?, last_error = ?, created_at_unix = ?, updated_at_unix = ?, published_at_unix = ?, finalized_at_unix = ? WHERE job_id = ?", 89 serialize_record_update_params(self.db.path(), &record, job_id.as_str())?, 90 )?; 91 Ok(record) 92 } 93 } 94 95 impl MycDeliveryOutboxStore for MycSqliteDeliveryOutboxStore { 96 fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError> { 97 exec_json( 98 self.db.path(), 99 self.db.executor(), 100 "INSERT INTO myc_delivery_outbox(job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 101 serialize_record_params(self.db.path(), record)?, 102 ) 103 } 104 105 fn get( 106 &self, 107 job_id: &MycDeliveryOutboxJobId, 108 ) -> Result<Option<MycDeliveryOutboxRecord>, MycError> { 109 let rows: Vec<MycDeliveryOutboxRow> = query_rows( 110 self.db.path(), 111 self.db.executor(), 112 "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE job_id = ? LIMIT 1", 113 json!([job_id.as_str()]), 114 )?; 115 rows.into_iter() 116 .next() 117 .map(|row| row.into_record(self.db.path())) 118 .transpose() 119 } 120 121 fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> { 122 let rows: Vec<MycDeliveryOutboxRow> = query_rows( 123 self.db.path(), 124 self.db.executor(), 125 "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox ORDER BY created_at_unix ASC, job_id ASC", 126 json!([]), 127 )?; 128 rows.into_iter() 129 .map(|row| row.into_record(self.db.path())) 130 .collect() 131 } 132 133 fn list_by_status( 134 &self, 135 status: MycDeliveryOutboxStatus, 136 ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> { 137 let rows: Vec<MycDeliveryOutboxRow> = query_rows( 138 self.db.path(), 139 self.db.executor(), 140 "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE status = ? ORDER BY created_at_unix ASC, job_id ASC", 141 json!([status_label(status)]), 142 )?; 143 rows.into_iter() 144 .map(|row| row.into_record(self.db.path())) 145 .collect() 146 } 147 148 fn mark_published_pending_finalize( 149 &self, 150 job_id: &MycDeliveryOutboxJobId, 151 publish_attempt_count: usize, 152 ) -> Result<MycDeliveryOutboxRecord, MycError> { 153 self.update_record(job_id, |record| { 154 record.mark_published_pending_finalize(publish_attempt_count, now_unix_secs()) 155 }) 156 } 157 158 fn mark_failed( 159 &self, 160 job_id: &MycDeliveryOutboxJobId, 161 publish_attempt_count: usize, 162 error: &str, 163 ) -> Result<MycDeliveryOutboxRecord, MycError> { 164 self.update_record(job_id, |record| { 165 record.mark_failed(publish_attempt_count, error, now_unix_secs()) 166 }) 167 } 168 169 fn mark_finalized( 170 &self, 171 job_id: &MycDeliveryOutboxJobId, 172 ) -> Result<MycDeliveryOutboxRecord, MycError> { 173 self.update_record(job_id, |record| record.mark_finalized(now_unix_secs())) 174 } 175 } 176 177 impl MycDeliveryOutboxRow { 178 fn into_record(self, path: &Path) -> Result<MycDeliveryOutboxRecord, MycError> { 179 Ok(MycDeliveryOutboxRecord { 180 job_id: self.job_id.parse()?, 181 kind: parse_kind(self.kind.as_str())?, 182 status: parse_status(self.status.as_str())?, 183 event: parse_json_field(path, self.event_json.as_str(), "event_json")?, 184 relay_urls: parse_json_field(path, self.relay_urls_json.as_str(), "relay_urls_json")?, 185 connection_id: self.connection_id.as_deref().map(str::parse).transpose()?, 186 request_id: self.request_id, 187 attempt_id: self.attempt_id, 188 signer_publish_workflow_id: self 189 .signer_publish_workflow_id 190 .as_deref() 191 .map(str::parse) 192 .transpose()?, 193 publish_attempt_count: usize_from_i64( 194 path, 195 self.publish_attempt_count, 196 "publish_attempt_count", 197 )?, 198 last_error: self.last_error, 199 created_at_unix: self.created_at_unix, 200 updated_at_unix: self.updated_at_unix, 201 published_at_unix: self.published_at_unix, 202 finalized_at_unix: self.finalized_at_unix, 203 }) 204 } 205 } 206 207 impl MycDeliveryOutboxSqliteDb { 208 fn open(path: PathBuf) -> Result<Self, MycError> { 209 if let Some(parent) = path.parent() { 210 std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir { 211 path: parent.to_path_buf(), 212 source, 213 })?; 214 } 215 let executor = 216 SqliteExecutor::open(path.as_path()).map_err(|source| MycError::DeliveryOutboxSql { 217 path: path.clone(), 218 source, 219 })?; 220 let db = Self { 221 path, 222 executor, 223 file_backed: true, 224 }; 225 db.configure()?; 226 db.run_migrations()?; 227 Ok(db) 228 } 229 230 #[cfg(test)] 231 fn open_memory() -> Result<Self, MycError> { 232 let executor = 233 SqliteExecutor::open_memory().map_err(|source| MycError::DeliveryOutboxSql { 234 path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH), 235 source, 236 })?; 237 let db = Self { 238 path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH), 239 executor, 240 file_backed: false, 241 }; 242 db.configure()?; 243 db.run_migrations()?; 244 Ok(db) 245 } 246 247 fn path(&self) -> &Path { 248 self.path.as_path() 249 } 250 251 fn executor(&self) -> &SqliteExecutor { 252 &self.executor 253 } 254 255 fn configure(&self) -> Result<(), MycError> { 256 exec_json( 257 self.path(), 258 self.executor(), 259 "PRAGMA foreign_keys = ON", 260 json!([]), 261 )?; 262 if self.file_backed { 263 exec_json( 264 self.path(), 265 self.executor(), 266 "PRAGMA journal_mode = WAL", 267 json!([]), 268 )?; 269 } 270 Ok(()) 271 } 272 273 fn run_migrations(&self) -> Result<(), MycError> { 274 migrations_run_all_up(self.executor(), MYC_DELIVERY_OUTBOX_MIGRATIONS).map_err(|source| { 275 MycError::DeliveryOutboxSql { 276 path: self.path.clone(), 277 source, 278 } 279 }) 280 } 281 } 282 283 fn serialize_record_params( 284 path: &Path, 285 record: &MycDeliveryOutboxRecord, 286 ) -> Result<Value, MycError> { 287 Ok(Value::Array(vec![ 288 Value::from(record.job_id.as_str()), 289 Value::from(kind_label(record.kind)), 290 Value::from(status_label(record.status)), 291 Value::from(serialize_json_field(path, &record.event)?), 292 Value::from(serialize_json_field(path, &record.relay_urls)?), 293 record 294 .connection_id 295 .as_ref() 296 .map(|value| Value::from(value.as_str())) 297 .unwrap_or(Value::Null), 298 record 299 .request_id 300 .clone() 301 .map(Value::from) 302 .unwrap_or(Value::Null), 303 record 304 .attempt_id 305 .clone() 306 .map(Value::from) 307 .unwrap_or(Value::Null), 308 record 309 .signer_publish_workflow_id 310 .as_ref() 311 .map(|value| Value::from(value.as_str())) 312 .unwrap_or(Value::Null), 313 Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| { 314 MycError::InvalidOperation( 315 "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(), 316 ) 317 })?), 318 record 319 .last_error 320 .clone() 321 .map(Value::from) 322 .unwrap_or(Value::Null), 323 Value::from(record.created_at_unix), 324 Value::from(record.updated_at_unix), 325 record 326 .published_at_unix 327 .map(Value::from) 328 .unwrap_or(Value::Null), 329 record 330 .finalized_at_unix 331 .map(Value::from) 332 .unwrap_or(Value::Null), 333 ])) 334 } 335 336 fn serialize_record_update_params( 337 path: &Path, 338 record: &MycDeliveryOutboxRecord, 339 trailing_job_id: &str, 340 ) -> Result<Value, MycError> { 341 Ok(Value::Array(vec![ 342 Value::from(kind_label(record.kind)), 343 Value::from(status_label(record.status)), 344 Value::from(serialize_json_field(path, &record.event)?), 345 Value::from(serialize_json_field(path, &record.relay_urls)?), 346 record 347 .connection_id 348 .as_ref() 349 .map(|value| Value::from(value.as_str())) 350 .unwrap_or(Value::Null), 351 record 352 .request_id 353 .clone() 354 .map(Value::from) 355 .unwrap_or(Value::Null), 356 record 357 .attempt_id 358 .clone() 359 .map(Value::from) 360 .unwrap_or(Value::Null), 361 record 362 .signer_publish_workflow_id 363 .as_ref() 364 .map(|value| Value::from(value.as_str())) 365 .unwrap_or(Value::Null), 366 Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| { 367 MycError::InvalidOperation( 368 "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(), 369 ) 370 })?), 371 record 372 .last_error 373 .clone() 374 .map(Value::from) 375 .unwrap_or(Value::Null), 376 Value::from(record.created_at_unix), 377 Value::from(record.updated_at_unix), 378 record 379 .published_at_unix 380 .map(Value::from) 381 .unwrap_or(Value::Null), 382 record 383 .finalized_at_unix 384 .map(Value::from) 385 .unwrap_or(Value::Null), 386 Value::from(trailing_job_id), 387 ])) 388 } 389 390 fn exec_json( 391 path: &Path, 392 executor: &impl SqlExecutor, 393 sql: &str, 394 params: Value, 395 ) -> Result<(), MycError> { 396 executor 397 .exec(sql, params.to_string().as_str()) 398 .map_err(|source| MycError::DeliveryOutboxSql { 399 path: path.to_path_buf(), 400 source, 401 })?; 402 Ok(()) 403 } 404 405 fn query_rows<T: DeserializeOwned>( 406 path: &Path, 407 executor: &impl SqlExecutor, 408 sql: &str, 409 params: Value, 410 ) -> Result<Vec<T>, MycError> { 411 let raw = executor 412 .query_raw(sql, params.to_string().as_str()) 413 .map_err(|source| MycError::DeliveryOutboxSql { 414 path: path.to_path_buf(), 415 source, 416 })?; 417 serde_json::from_str(&raw).map_err(|source| MycError::DeliveryOutboxSqlDecode { 418 path: path.to_path_buf(), 419 source, 420 }) 421 } 422 423 fn serialize_json_field(path: &Path, value: &impl serde::Serialize) -> Result<String, MycError> { 424 serde_json::to_string(value).map_err(|source| MycError::DeliveryOutboxSerialize { 425 path: path.to_path_buf(), 426 source, 427 }) 428 } 429 430 fn parse_json_field<T: DeserializeOwned>( 431 path: &Path, 432 value: &str, 433 _field: &str, 434 ) -> Result<T, MycError> { 435 serde_json::from_str(value).map_err(|source| MycError::DeliveryOutboxSqlDecode { 436 path: path.to_path_buf(), 437 source, 438 }) 439 } 440 441 fn kind_label(kind: MycDeliveryOutboxKind) -> &'static str { 442 match kind { 443 MycDeliveryOutboxKind::ListenerResponsePublish => "listener_response_publish", 444 MycDeliveryOutboxKind::ConnectAcceptPublish => "connect_accept_publish", 445 MycDeliveryOutboxKind::AuthReplayPublish => "auth_replay_publish", 446 MycDeliveryOutboxKind::DiscoveryHandlerPublish => "discovery_handler_publish", 447 } 448 } 449 450 fn parse_kind(value: &str) -> Result<MycDeliveryOutboxKind, MycError> { 451 match value { 452 "listener_response_publish" => Ok(MycDeliveryOutboxKind::ListenerResponsePublish), 453 "connect_accept_publish" => Ok(MycDeliveryOutboxKind::ConnectAcceptPublish), 454 "auth_replay_publish" => Ok(MycDeliveryOutboxKind::AuthReplayPublish), 455 "discovery_handler_publish" => Ok(MycDeliveryOutboxKind::DiscoveryHandlerPublish), 456 other => Err(MycError::InvalidOperation(format!( 457 "unknown delivery outbox kind `{other}`" 458 ))), 459 } 460 } 461 462 fn status_label(status: MycDeliveryOutboxStatus) -> &'static str { 463 match status { 464 MycDeliveryOutboxStatus::Queued => "queued", 465 MycDeliveryOutboxStatus::PublishedPendingFinalize => "published_pending_finalize", 466 MycDeliveryOutboxStatus::Finalized => "finalized", 467 MycDeliveryOutboxStatus::Failed => "failed", 468 } 469 } 470 471 fn parse_status(value: &str) -> Result<MycDeliveryOutboxStatus, MycError> { 472 match value { 473 "queued" => Ok(MycDeliveryOutboxStatus::Queued), 474 "published_pending_finalize" => Ok(MycDeliveryOutboxStatus::PublishedPendingFinalize), 475 "finalized" => Ok(MycDeliveryOutboxStatus::Finalized), 476 "failed" => Ok(MycDeliveryOutboxStatus::Failed), 477 other => Err(MycError::InvalidOperation(format!( 478 "unknown delivery outbox status `{other}`" 479 ))), 480 } 481 } 482 483 fn usize_from_i64(path: &Path, value: i64, field: &str) -> Result<usize, MycError> { 484 usize::try_from(value).map_err(|_| { 485 MycError::InvalidOperation(format!( 486 "delivery outbox field `{field}` at {} is out of range for usize", 487 path.display() 488 )) 489 }) 490 } 491 492 #[cfg(test)] 493 mod tests { 494 use radroots_identity::RadrootsIdentity; 495 use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind}; 496 use radroots_nostr_signer::prelude::{ 497 RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, 498 }; 499 500 use crate::outbox::{ 501 MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, 502 MycDeliveryOutboxStore, 503 }; 504 505 use super::MycSqliteDeliveryOutboxStore; 506 507 fn sample_record() -> MycDeliveryOutboxRecord { 508 let identity = RadrootsIdentity::from_secret_key_str( 509 "1111111111111111111111111111111111111111111111111111111111111111", 510 ) 511 .expect("identity"); 512 let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello") 513 .sign_with_keys(identity.keys()) 514 .expect("sign event"); 515 MycDeliveryOutboxRecord::new( 516 MycDeliveryOutboxKind::AuthReplayPublish, 517 event, 518 vec!["wss://relay.example.com".parse().expect("relay")], 519 ) 520 .expect("record") 521 .with_connection_id( 522 &RadrootsNostrSignerConnectionId::parse("conn-sqlite-outbox").expect("id"), 523 ) 524 .with_request_id("req-sqlite-outbox") 525 .with_attempt_id("attempt-sqlite-outbox") 526 .with_signer_publish_workflow_id( 527 &RadrootsNostrSignerWorkflowId::parse("wf-sqlite-outbox").expect("id"), 528 ) 529 } 530 531 #[test] 532 fn sqlite_outbox_store_round_trips_and_updates_status() { 533 let store = MycSqliteDeliveryOutboxStore::open_memory().expect("open store"); 534 let record = sample_record(); 535 536 store.enqueue(&record).expect("enqueue"); 537 assert_eq!( 538 store.get(&record.job_id).expect("get"), 539 Some(record.clone()) 540 ); 541 assert_eq!(store.list_all().expect("list all"), vec![record.clone()]); 542 assert_eq!( 543 store 544 .list_by_status(MycDeliveryOutboxStatus::Queued) 545 .expect("list queued"), 546 vec![record.clone()] 547 ); 548 549 let published = store 550 .mark_published_pending_finalize(&record.job_id, 1) 551 .expect("mark published"); 552 assert_eq!( 553 published.status, 554 MycDeliveryOutboxStatus::PublishedPendingFinalize 555 ); 556 assert_eq!(published.publish_attempt_count, 1); 557 558 let failed = store 559 .mark_failed(&record.job_id, 2, "relay rejected") 560 .expect("mark failed"); 561 assert_eq!(failed.status, MycDeliveryOutboxStatus::Failed); 562 assert_eq!(failed.last_error.as_deref(), Some("relay rejected")); 563 564 let republished = store 565 .mark_published_pending_finalize(&record.job_id, 3) 566 .expect("republish"); 567 assert_eq!( 568 republished.status, 569 MycDeliveryOutboxStatus::PublishedPendingFinalize 570 ); 571 572 let finalized = store 573 .mark_finalized(&record.job_id) 574 .expect("mark finalized"); 575 assert_eq!(finalized.status, MycDeliveryOutboxStatus::Finalized); 576 assert_eq!( 577 store 578 .list_by_status(MycDeliveryOutboxStatus::Finalized) 579 .expect("list finalized"), 580 vec![finalized] 581 ); 582 } 583 584 #[test] 585 fn sqlite_outbox_store_reopens_file_backed_state() { 586 let temp = tempfile::tempdir().expect("tempdir"); 587 let record = sample_record(); 588 589 let store = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("open store"); 590 store.enqueue(&record).expect("enqueue"); 591 592 let reopened = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("reopen store"); 593 assert_eq!( 594 reopened.get(&record.job_id).expect("get reopened"), 595 Some(record) 596 ); 597 assert!(reopened.path().ends_with("delivery-outbox.sqlite")); 598 } 599 }