myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

commit 3c9df8754e1901ee2d6e7c39b0df3126bbc5b3c7
parent 0a3bad41f032d7d1df833a63130375f7de7878e2
Author: triesap <tyson@radroots.org>
Date:   Thu, 26 Mar 2026 18:29:41 +0000

delivery: add sqlite outbox boundary

Diffstat:
MCargo.lock | 2++
MCargo.toml | 1+
Amigrations/0000_delivery_outbox_init.down.sql | 6++++++
Amigrations/0000_delivery_outbox_init.up.sql | 32++++++++++++++++++++++++++++++++
Msrc/app/runtime.rs | 34++++++++++++++++++++++++++++++++--
Msrc/error.rs | 22++++++++++++++++++++++
Msrc/lib.rs | 7+++++++
Asrc/outbox.rs | 343+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/outbox_sqlite.rs | 598+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/operability_e2e.rs | 2+-
10 files changed, 1044 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1366,6 +1366,7 @@ dependencies = [ "tracing", "tracing-subscriber", "url", + "uuid", ] [[package]] @@ -2794,6 +2795,7 @@ checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ "getrandom 0.4.2", "js-sys", + "serde_core", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml @@ -31,6 +31,7 @@ tokio = { version = "1.48", features = ["macros", "net", "rt-multi-thread", "syn tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = "2.5" +uuid = { version = "1.18", features = ["serde", "v7"] } [dev-dependencies] futures-util = "0.3.32" diff --git a/migrations/0000_delivery_outbox_init.down.sql b/migrations/0000_delivery_outbox_init.down.sql @@ -0,0 +1,6 @@ +DROP INDEX IF EXISTS idx_myc_delivery_outbox_signer_workflow_id; +DROP INDEX IF EXISTS idx_myc_delivery_outbox_attempt_id; +DROP INDEX IF EXISTS idx_myc_delivery_outbox_request_id; +DROP INDEX IF EXISTS idx_myc_delivery_outbox_connection_id; +DROP INDEX IF EXISTS idx_myc_delivery_outbox_status; +DROP TABLE IF EXISTS myc_delivery_outbox; diff --git a/migrations/0000_delivery_outbox_init.up.sql b/migrations/0000_delivery_outbox_init.up.sql @@ -0,0 +1,32 @@ +CREATE TABLE myc_delivery_outbox ( + job_id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + status TEXT NOT NULL, + event_json TEXT NOT NULL, + relay_urls_json TEXT NOT NULL, + connection_id TEXT, + request_id TEXT, + attempt_id TEXT, + signer_publish_workflow_id TEXT, + publish_attempt_count INTEGER NOT NULL, + last_error TEXT, + created_at_unix INTEGER NOT NULL, + updated_at_unix INTEGER NOT NULL, + published_at_unix INTEGER, + finalized_at_unix INTEGER +); + +CREATE INDEX idx_myc_delivery_outbox_status + ON myc_delivery_outbox(status, created_at_unix, job_id); + +CREATE INDEX idx_myc_delivery_outbox_connection_id + ON myc_delivery_outbox(connection_id, created_at_unix, job_id); + +CREATE INDEX idx_myc_delivery_outbox_request_id + ON myc_delivery_outbox(request_id, created_at_unix, job_id); + +CREATE INDEX idx_myc_delivery_outbox_attempt_id + ON myc_delivery_outbox(attempt_id, created_at_unix, job_id); + +CREATE INDEX idx_myc_delivery_outbox_signer_workflow_id + ON myc_delivery_outbox(signer_publish_workflow_id, created_at_unix, job_id); diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -13,6 +13,8 @@ use crate::config::{ use crate::custody::MycIdentityProvider; use crate::error::MycError; use crate::operability::server::run_observability_server; +use crate::outbox::MycDeliveryOutboxStore; +use crate::outbox_sqlite::MycSqliteDeliveryOutboxStore; use crate::policy::MycPolicyContext; use crate::transport::{MycNip46Service, MycNostrTransport, MycTransportSnapshot}; use radroots_identity::{RadrootsIdentity, RadrootsIdentityPublic}; @@ -30,6 +32,7 @@ pub struct MycRuntimePaths { pub user_identity_path: PathBuf, pub signer_state_path: PathBuf, pub runtime_audit_path: PathBuf, + pub delivery_outbox_path: PathBuf, } #[derive(Debug, Clone, PartialEq, Eq, Serialize)] @@ -73,6 +76,7 @@ pub struct MycRuntime { paths: MycRuntimePaths, signer: MycSignerContext, transport: Option<MycNostrTransport>, + delivery_outbox_store: Arc<dyn MycDeliveryOutboxStore>, } impl MycRuntime { @@ -90,11 +94,13 @@ impl MycRuntime { config.paths.user_identity_source(), )?; let transport = MycNostrTransport::bootstrap(&config.transport, &signer.signer_identity)?; + let delivery_outbox_store = Arc::new(MycSqliteDeliveryOutboxStore::open(&paths.state_dir)?); let runtime = Self { paths, config, signer, transport, + delivery_outbox_store, }; Ok(runtime) } @@ -135,6 +141,10 @@ impl MycRuntime { self.signer.operation_audit_store() } + pub fn delivery_outbox_store(&self) -> Arc<dyn MycDeliveryOutboxStore> { + self.delivery_outbox_store.clone() + } + pub fn record_operation_audit(&self, record: &MycOperationAuditRecord) { self.signer.record_operation_audit(record); } @@ -322,6 +332,10 @@ impl MycRuntimePaths { }) } + pub(crate) fn delivery_outbox_path_for_state_dir(state_dir: &Path) -> PathBuf { + state_dir.join("delivery-outbox.sqlite") + } + fn from_config(config: &MycConfig) -> Self { let state_dir = config.paths.state_dir.clone(); let audit_dir = Self::audit_dir_for_state_dir(&state_dir); @@ -336,6 +350,7 @@ impl MycRuntimePaths { &audit_dir, config.persistence.runtime_audit_backend, ), + delivery_outbox_path: Self::delivery_outbox_path_for_state_dir(&state_dir), audit_dir, state_dir, } @@ -548,12 +563,11 @@ mod tests { RadrootsNostrFileSignerStore, RadrootsNostrSignerManager, RadrootsNostrSqliteSignerStore, }; + use super::MycRuntime; use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord}; use crate::config::{MycConfig, MycRuntimeAuditBackend, MycSignerStateBackend}; use crate::error::MycError; - use super::MycRuntime; - fn write_test_identity(path: &std::path::Path, secret_key: &str) { RadrootsIdentity::from_secret_key_str(secret_key) .expect("identity from secret") @@ -595,6 +609,20 @@ mod tests { .ends_with("signer-state.json") ); assert!(runtime.paths().signer_state_path.is_file()); + assert!( + runtime + .paths() + .delivery_outbox_path + .ends_with("delivery-outbox.sqlite") + ); + assert!(runtime.paths().delivery_outbox_path.is_file()); + assert!( + runtime + .delivery_outbox_store() + .list_all() + .expect("list outbox jobs") + .is_empty() + ); assert_eq!( runtime .signer_manager() @@ -744,6 +772,7 @@ mod tests { .ends_with("signer-state.sqlite") ); assert!(runtime.paths().signer_state_path.is_file()); + assert!(runtime.paths().delivery_outbox_path.is_file()); } #[test] @@ -828,5 +857,6 @@ mod tests { .join("operations.sqlite") .is_file() ); + assert!(runtime.paths().delivery_outbox_path.is_file()); } } diff --git a/src/error.rs b/src/error.rs @@ -86,6 +86,28 @@ pub enum MycError { #[source] source: serde_json::Error, }, + #[error("delivery outbox sqlite error at {path}: {source}")] + DeliveryOutboxSql { + path: PathBuf, + #[source] + source: SqlError, + }, + #[error("delivery outbox sqlite decode error at {path}: {source}")] + DeliveryOutboxSqlDecode { + path: PathBuf, + #[source] + source: serde_json::Error, + }, + #[error("failed to serialize delivery outbox record at {path}: {source}")] + DeliveryOutboxSerialize { + path: PathBuf, + #[source] + source: serde_json::Error, + }, + #[error("invalid delivery outbox job id `{0}`")] + InvalidDeliveryOutboxJobId(String), + #[error("delivery outbox job not found: {0}")] + DeliveryOutboxJobNotFound(String), #[error("discovery io error at {path}: {source}")] DiscoveryIo { path: PathBuf, diff --git a/src/lib.rs b/src/lib.rs @@ -11,6 +11,8 @@ pub mod discovery; pub mod error; pub mod logging; pub mod operability; +pub mod outbox; +mod outbox_sqlite; pub mod persistence; pub mod policy; pub mod transport; @@ -50,6 +52,11 @@ pub use operability::{ MycStatusSummaryOutput, MycTransportStatusOutput, collect_metrics, collect_status_full, collect_status_summary, render_metrics_text, }; +pub use outbox::{ + MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, + MycDeliveryOutboxStatus, MycDeliveryOutboxStore, +}; +pub use outbox_sqlite::MycSqliteDeliveryOutboxStore; pub use persistence::{ MycPersistenceImportJsonToSqliteOutput, MycPersistenceImportSelection, MycRuntimeAuditImportOutput, MycSignerStateImportOutput, import_json_to_sqlite, diff --git a/src/outbox.rs b/src/outbox.rs @@ -0,0 +1,343 @@ +use std::fmt; +use std::str::FromStr; +use std::time::{SystemTime, UNIX_EPOCH}; + +use radroots_nostr::prelude::{RadrootsNostrEvent, RadrootsNostrRelayUrl}; +use radroots_nostr_signer::prelude::{ + RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, +}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::error::MycError; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct MycDeliveryOutboxJobId(String); + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MycDeliveryOutboxKind { + ListenerResponsePublish, + ConnectAcceptPublish, + AuthReplayPublish, + DiscoveryHandlerPublish, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum MycDeliveryOutboxStatus { + Queued, + PublishedPendingFinalize, + Finalized, + Failed, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct MycDeliveryOutboxRecord { + pub job_id: MycDeliveryOutboxJobId, + pub kind: MycDeliveryOutboxKind, + pub status: MycDeliveryOutboxStatus, + pub event: RadrootsNostrEvent, + pub relay_urls: Vec<RadrootsNostrRelayUrl>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub connection_id: Option<RadrootsNostrSignerConnectionId>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub request_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub attempt_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub signer_publish_workflow_id: Option<RadrootsNostrSignerWorkflowId>, + pub publish_attempt_count: usize, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_error: Option<String>, + pub created_at_unix: u64, + pub updated_at_unix: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub published_at_unix: Option<u64>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub finalized_at_unix: Option<u64>, +} + +pub trait MycDeliveryOutboxStore: Send + Sync { + fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError>; + fn get( + &self, + job_id: &MycDeliveryOutboxJobId, + ) -> Result<Option<MycDeliveryOutboxRecord>, MycError>; + fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>; + fn list_by_status( + &self, + status: MycDeliveryOutboxStatus, + ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError>; + fn mark_published_pending_finalize( + &self, + job_id: &MycDeliveryOutboxJobId, + publish_attempt_count: usize, + ) -> Result<MycDeliveryOutboxRecord, MycError>; + fn mark_failed( + &self, + job_id: &MycDeliveryOutboxJobId, + publish_attempt_count: usize, + error: &str, + ) -> Result<MycDeliveryOutboxRecord, MycError>; + fn mark_finalized( + &self, + job_id: &MycDeliveryOutboxJobId, + ) -> Result<MycDeliveryOutboxRecord, MycError>; +} + +impl MycDeliveryOutboxJobId { + pub fn new_v7() -> Self { + Self(Uuid::now_v7().to_string()) + } + + pub fn parse(value: &str) -> Result<Self, MycError> { + let trimmed = value.trim(); + if trimmed.is_empty() { + return Err(MycError::InvalidDeliveryOutboxJobId(value.to_owned())); + } + Ok(Self(trimmed.to_owned())) + } + + pub fn as_str(&self) -> &str { + self.0.as_str() + } +} + +impl fmt::Display for MycDeliveryOutboxJobId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +impl AsRef<str> for MycDeliveryOutboxJobId { + fn as_ref(&self) -> &str { + self.as_str() + } +} + +impl FromStr for MycDeliveryOutboxJobId { + type Err = MycError; + + fn from_str(value: &str) -> Result<Self, Self::Err> { + Self::parse(value) + } +} + +impl MycDeliveryOutboxRecord { + pub fn new( + kind: MycDeliveryOutboxKind, + event: RadrootsNostrEvent, + relay_urls: Vec<RadrootsNostrRelayUrl>, + ) -> Result<Self, MycError> { + if relay_urls.is_empty() { + return Err(MycError::InvalidOperation( + "delivery outbox job requires at least one relay".to_owned(), + )); + } + let created_at_unix = now_unix_secs(); + Ok(Self { + job_id: MycDeliveryOutboxJobId::new_v7(), + kind, + status: MycDeliveryOutboxStatus::Queued, + event, + relay_urls, + connection_id: None, + request_id: None, + attempt_id: None, + signer_publish_workflow_id: None, + publish_attempt_count: 0, + last_error: None, + created_at_unix, + updated_at_unix: created_at_unix, + published_at_unix: None, + finalized_at_unix: None, + }) + } + + pub fn with_connection_id(mut self, connection_id: &RadrootsNostrSignerConnectionId) -> Self { + self.connection_id = Some(connection_id.clone()); + self + } + + pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self { + self.request_id = Some(request_id.into()); + self + } + + pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self { + self.attempt_id = Some(attempt_id.into()); + self + } + + pub fn with_signer_publish_workflow_id( + mut self, + workflow_id: &RadrootsNostrSignerWorkflowId, + ) -> Self { + self.signer_publish_workflow_id = Some(workflow_id.clone()); + self + } + + pub fn mark_published_pending_finalize( + &mut self, + publish_attempt_count: usize, + updated_at_unix: u64, + ) -> Result<(), MycError> { + match self.status { + MycDeliveryOutboxStatus::Queued | MycDeliveryOutboxStatus::Failed => { + self.status = MycDeliveryOutboxStatus::PublishedPendingFinalize; + self.publish_attempt_count = publish_attempt_count; + self.last_error = None; + self.published_at_unix = Some(updated_at_unix); + self.updated_at_unix = updated_at_unix; + Ok(()) + } + MycDeliveryOutboxStatus::PublishedPendingFinalize => Ok(()), + MycDeliveryOutboxStatus::Finalized => Err(MycError::InvalidOperation( + "cannot mark a finalized delivery outbox job as published".to_owned(), + )), + } + } + + pub fn mark_failed( + &mut self, + publish_attempt_count: usize, + error: impl AsRef<str>, + updated_at_unix: u64, + ) -> Result<(), MycError> { + if self.status == MycDeliveryOutboxStatus::Finalized { + return Err(MycError::InvalidOperation( + "cannot fail a finalized delivery outbox job".to_owned(), + )); + } + let error = error.as_ref().trim(); + if error.is_empty() { + return Err(MycError::InvalidOperation( + "delivery outbox failure reason must not be empty".to_owned(), + )); + } + + self.status = MycDeliveryOutboxStatus::Failed; + self.publish_attempt_count = publish_attempt_count; + self.last_error = Some(error.to_owned()); + self.updated_at_unix = updated_at_unix; + Ok(()) + } + + pub fn mark_finalized(&mut self, updated_at_unix: u64) -> Result<(), MycError> { + if self.status != MycDeliveryOutboxStatus::PublishedPendingFinalize { + return Err(MycError::InvalidOperation( + "cannot finalize a delivery outbox job before publish confirmation".to_owned(), + )); + } + + self.status = MycDeliveryOutboxStatus::Finalized; + self.finalized_at_unix = Some(updated_at_unix); + self.updated_at_unix = updated_at_unix; + Ok(()) + } +} + +pub(crate) fn now_unix_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind}; + use radroots_nostr_signer::prelude::{ + RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, + }; + + use super::{ + MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, + MycDeliveryOutboxStatus, + }; + + fn signed_event() -> nostr::Event { + let identity = RadrootsIdentity::from_secret_key_str( + "1111111111111111111111111111111111111111111111111111111111111111", + ) + .expect("identity"); + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello") + .sign_with_keys(identity.keys()) + .expect("sign event") + } + + #[test] + fn delivery_outbox_job_ids_parse_and_display() { + let job_id = MycDeliveryOutboxJobId::parse("job-1").expect("job id"); + assert_eq!(job_id.as_str(), "job-1"); + assert_eq!(job_id.to_string(), "job-1"); + assert_eq!(job_id.as_ref(), "job-1"); + assert!(MycDeliveryOutboxJobId::parse(" ").is_err()); + assert!(!MycDeliveryOutboxJobId::new_v7().as_str().is_empty()); + } + + #[test] + fn delivery_outbox_record_covers_state_transitions() { + let connection_id = RadrootsNostrSignerConnectionId::parse("conn-outbox").expect("id"); + let workflow_id = RadrootsNostrSignerWorkflowId::parse("wf-outbox").expect("id"); + let mut record = MycDeliveryOutboxRecord::new( + MycDeliveryOutboxKind::AuthReplayPublish, + signed_event(), + vec!["wss://relay.example.com".parse().expect("relay")], + ) + .expect("record") + .with_connection_id(&connection_id) + .with_request_id("req-1") + .with_attempt_id("attempt-1") + .with_signer_publish_workflow_id(&workflow_id); + + assert_eq!(record.status, MycDeliveryOutboxStatus::Queued); + assert_eq!(record.connection_id.as_ref(), Some(&connection_id)); + assert_eq!(record.request_id.as_deref(), Some("req-1")); + assert_eq!(record.attempt_id.as_deref(), Some("attempt-1")); + assert_eq!( + record.signer_publish_workflow_id.as_ref(), + Some(&workflow_id) + ); + + record + .mark_published_pending_finalize(1, 100) + .expect("mark published"); + assert_eq!( + record.status, + MycDeliveryOutboxStatus::PublishedPendingFinalize + ); + assert_eq!(record.publish_attempt_count, 1); + assert_eq!(record.published_at_unix, Some(100)); + + record + .mark_failed(2, "relay rejected", 101) + .expect("mark failed"); + assert_eq!(record.status, MycDeliveryOutboxStatus::Failed); + assert_eq!(record.last_error.as_deref(), Some("relay rejected")); + + record + .mark_published_pending_finalize(3, 102) + .expect("republish"); + record.mark_finalized(103).expect("finalize"); + assert_eq!(record.status, MycDeliveryOutboxStatus::Finalized); + assert_eq!(record.finalized_at_unix, Some(103)); + assert!(record.mark_failed(4, "late failure", 104).is_err()); + } + + #[test] + fn delivery_outbox_record_requires_relays() { + let err = MycDeliveryOutboxRecord::new( + MycDeliveryOutboxKind::ListenerResponsePublish, + signed_event(), + Vec::new(), + ) + .expect_err("missing relays"); + assert!( + err.to_string() + .contains("delivery outbox job requires at least one relay") + ); + } +} diff --git a/src/outbox_sqlite.rs b/src/outbox_sqlite.rs @@ -0,0 +1,598 @@ +use std::path::{Path, PathBuf}; + +use radroots_sql_core::migrations::{Migration, migrations_run_all_up}; +use radroots_sql_core::{SqlExecutor, SqliteExecutor}; +use serde::Deserialize; +use serde::de::DeserializeOwned; +use serde_json::{Value, json}; + +use crate::error::MycError; +use crate::outbox::{ + MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, + MycDeliveryOutboxStatus, MycDeliveryOutboxStore, now_unix_secs, +}; + +const MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME: &str = "delivery-outbox.sqlite"; +#[cfg(test)] +const MYC_DELIVERY_OUTBOX_MEMORY_PATH: &str = ":memory:"; + +static MYC_DELIVERY_OUTBOX_MIGRATIONS: &[Migration] = &[Migration { + name: "0000_delivery_outbox_init", + up_sql: include_str!("../migrations/0000_delivery_outbox_init.up.sql"), + down_sql: include_str!("../migrations/0000_delivery_outbox_init.down.sql"), +}]; + +pub struct MycSqliteDeliveryOutboxStore { + db: MycDeliveryOutboxSqliteDb, +} + +struct MycDeliveryOutboxSqliteDb { + path: PathBuf, + executor: SqliteExecutor, + file_backed: bool, +} + +#[derive(Debug, Deserialize)] +struct MycDeliveryOutboxRow { + job_id: String, + kind: String, + status: String, + event_json: String, + relay_urls_json: String, + connection_id: Option<String>, + request_id: Option<String>, + attempt_id: Option<String>, + signer_publish_workflow_id: Option<String>, + publish_attempt_count: i64, + last_error: Option<String>, + created_at_unix: u64, + updated_at_unix: u64, + published_at_unix: Option<u64>, + finalized_at_unix: Option<u64>, +} + +impl MycSqliteDeliveryOutboxStore { + pub fn open(state_dir: impl AsRef<Path>) -> Result<Self, MycError> { + let db = MycDeliveryOutboxSqliteDb::open( + state_dir + .as_ref() + .join(MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME), + )?; + Ok(Self { db }) + } + + #[cfg(test)] + pub fn open_memory() -> Result<Self, MycError> { + Ok(Self { + db: MycDeliveryOutboxSqliteDb::open_memory()?, + }) + } + + pub fn path(&self) -> &Path { + self.db.path() + } + + fn update_record( + &self, + job_id: &MycDeliveryOutboxJobId, + update: impl FnOnce(&mut MycDeliveryOutboxRecord) -> Result<(), MycError>, + ) -> Result<MycDeliveryOutboxRecord, MycError> { + let mut record = self + .get(job_id)? + .ok_or_else(|| MycError::DeliveryOutboxJobNotFound(job_id.to_string()))?; + update(&mut record)?; + exec_json( + self.db.path(), + self.db.executor(), + "UPDATE myc_delivery_outbox SET kind = ?, status = ?, event_json = ?, relay_urls_json = ?, connection_id = ?, request_id = ?, attempt_id = ?, signer_publish_workflow_id = ?, publish_attempt_count = ?, last_error = ?, created_at_unix = ?, updated_at_unix = ?, published_at_unix = ?, finalized_at_unix = ? WHERE job_id = ?", + serialize_record_update_params(self.db.path(), &record, job_id.as_str())?, + )?; + Ok(record) + } +} + +impl MycDeliveryOutboxStore for MycSqliteDeliveryOutboxStore { + fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError> { + exec_json( + self.db.path(), + self.db.executor(), + "INSERT INTO myc_delivery_outbox(job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + serialize_record_params(self.db.path(), record)?, + ) + } + + fn get( + &self, + job_id: &MycDeliveryOutboxJobId, + ) -> Result<Option<MycDeliveryOutboxRecord>, MycError> { + let rows: Vec<MycDeliveryOutboxRow> = query_rows( + self.db.path(), + self.db.executor(), + "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE job_id = ? LIMIT 1", + json!([job_id.as_str()]), + )?; + rows.into_iter() + .next() + .map(|row| row.into_record(self.db.path())) + .transpose() + } + + fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> { + let rows: Vec<MycDeliveryOutboxRow> = query_rows( + self.db.path(), + self.db.executor(), + "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox ORDER BY created_at_unix ASC, job_id ASC", + json!([]), + )?; + rows.into_iter() + .map(|row| row.into_record(self.db.path())) + .collect() + } + + fn list_by_status( + &self, + status: MycDeliveryOutboxStatus, + ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> { + let rows: Vec<MycDeliveryOutboxRow> = query_rows( + self.db.path(), + self.db.executor(), + "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE status = ? ORDER BY created_at_unix ASC, job_id ASC", + json!([status_label(status)]), + )?; + rows.into_iter() + .map(|row| row.into_record(self.db.path())) + .collect() + } + + fn mark_published_pending_finalize( + &self, + job_id: &MycDeliveryOutboxJobId, + publish_attempt_count: usize, + ) -> Result<MycDeliveryOutboxRecord, MycError> { + self.update_record(job_id, |record| { + record.mark_published_pending_finalize(publish_attempt_count, now_unix_secs()) + }) + } + + fn mark_failed( + &self, + job_id: &MycDeliveryOutboxJobId, + publish_attempt_count: usize, + error: &str, + ) -> Result<MycDeliveryOutboxRecord, MycError> { + self.update_record(job_id, |record| { + record.mark_failed(publish_attempt_count, error, now_unix_secs()) + }) + } + + fn mark_finalized( + &self, + job_id: &MycDeliveryOutboxJobId, + ) -> Result<MycDeliveryOutboxRecord, MycError> { + self.update_record(job_id, |record| record.mark_finalized(now_unix_secs())) + } +} + +impl MycDeliveryOutboxRow { + fn into_record(self, path: &Path) -> Result<MycDeliveryOutboxRecord, MycError> { + Ok(MycDeliveryOutboxRecord { + job_id: self.job_id.parse()?, + kind: parse_kind(self.kind.as_str())?, + status: parse_status(self.status.as_str())?, + event: parse_json_field(path, self.event_json.as_str(), "event_json")?, + relay_urls: parse_json_field(path, self.relay_urls_json.as_str(), "relay_urls_json")?, + connection_id: self.connection_id.as_deref().map(str::parse).transpose()?, + request_id: self.request_id, + attempt_id: self.attempt_id, + signer_publish_workflow_id: self + .signer_publish_workflow_id + .as_deref() + .map(str::parse) + .transpose()?, + publish_attempt_count: usize_from_i64( + path, + self.publish_attempt_count, + "publish_attempt_count", + )?, + last_error: self.last_error, + created_at_unix: self.created_at_unix, + updated_at_unix: self.updated_at_unix, + published_at_unix: self.published_at_unix, + finalized_at_unix: self.finalized_at_unix, + }) + } +} + +impl MycDeliveryOutboxSqliteDb { + fn open(path: PathBuf) -> Result<Self, MycError> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir { + path: parent.to_path_buf(), + source, + })?; + } + let executor = + SqliteExecutor::open(path.as_path()).map_err(|source| MycError::DeliveryOutboxSql { + path: path.clone(), + source, + })?; + let db = Self { + path, + executor, + file_backed: true, + }; + db.configure()?; + db.run_migrations()?; + Ok(db) + } + + #[cfg(test)] + fn open_memory() -> Result<Self, MycError> { + let executor = + SqliteExecutor::open_memory().map_err(|source| MycError::DeliveryOutboxSql { + path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH), + source, + })?; + let db = Self { + path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH), + executor, + file_backed: false, + }; + db.configure()?; + db.run_migrations()?; + Ok(db) + } + + fn path(&self) -> &Path { + self.path.as_path() + } + + fn executor(&self) -> &SqliteExecutor { + &self.executor + } + + fn configure(&self) -> Result<(), MycError> { + exec_json( + self.path(), + self.executor(), + "PRAGMA foreign_keys = ON", + json!([]), + )?; + if self.file_backed { + exec_json( + self.path(), + self.executor(), + "PRAGMA journal_mode = WAL", + json!([]), + )?; + } + Ok(()) + } + + fn run_migrations(&self) -> Result<(), MycError> { + migrations_run_all_up(self.executor(), MYC_DELIVERY_OUTBOX_MIGRATIONS).map_err(|source| { + MycError::DeliveryOutboxSql { + path: self.path.clone(), + source, + } + }) + } +} + +fn serialize_record_params( + path: &Path, + record: &MycDeliveryOutboxRecord, +) -> Result<Value, MycError> { + Ok(Value::Array(vec![ + Value::from(record.job_id.as_str()), + Value::from(kind_label(record.kind)), + Value::from(status_label(record.status)), + Value::from(serialize_json_field(path, &record.event)?), + Value::from(serialize_json_field(path, &record.relay_urls)?), + record + .connection_id + .as_ref() + .map(|value| Value::from(value.as_str())) + .unwrap_or(Value::Null), + record + .request_id + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + record + .attempt_id + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + record + .signer_publish_workflow_id + .as_ref() + .map(|value| Value::from(value.as_str())) + .unwrap_or(Value::Null), + Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| { + MycError::InvalidOperation( + "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(), + ) + })?), + record + .last_error + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + Value::from(record.created_at_unix), + Value::from(record.updated_at_unix), + record + .published_at_unix + .map(Value::from) + .unwrap_or(Value::Null), + record + .finalized_at_unix + .map(Value::from) + .unwrap_or(Value::Null), + ])) +} + +fn serialize_record_update_params( + path: &Path, + record: &MycDeliveryOutboxRecord, + trailing_job_id: &str, +) -> Result<Value, MycError> { + Ok(Value::Array(vec![ + Value::from(kind_label(record.kind)), + Value::from(status_label(record.status)), + Value::from(serialize_json_field(path, &record.event)?), + Value::from(serialize_json_field(path, &record.relay_urls)?), + record + .connection_id + .as_ref() + .map(|value| Value::from(value.as_str())) + .unwrap_or(Value::Null), + record + .request_id + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + record + .attempt_id + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + record + .signer_publish_workflow_id + .as_ref() + .map(|value| Value::from(value.as_str())) + .unwrap_or(Value::Null), + Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| { + MycError::InvalidOperation( + "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(), + ) + })?), + record + .last_error + .clone() + .map(Value::from) + .unwrap_or(Value::Null), + Value::from(record.created_at_unix), + Value::from(record.updated_at_unix), + record + .published_at_unix + .map(Value::from) + .unwrap_or(Value::Null), + record + .finalized_at_unix + .map(Value::from) + .unwrap_or(Value::Null), + Value::from(trailing_job_id), + ])) +} + +fn exec_json( + path: &Path, + executor: &impl SqlExecutor, + sql: &str, + params: Value, +) -> Result<(), MycError> { + executor + .exec(sql, params.to_string().as_str()) + .map_err(|source| MycError::DeliveryOutboxSql { + path: path.to_path_buf(), + source, + })?; + Ok(()) +} + +fn query_rows<T: DeserializeOwned>( + path: &Path, + executor: &impl SqlExecutor, + sql: &str, + params: Value, +) -> Result<Vec<T>, MycError> { + let raw = executor + .query_raw(sql, params.to_string().as_str()) + .map_err(|source| MycError::DeliveryOutboxSql { + path: path.to_path_buf(), + source, + })?; + serde_json::from_str(&raw).map_err(|source| MycError::DeliveryOutboxSqlDecode { + path: path.to_path_buf(), + source, + }) +} + +fn serialize_json_field(path: &Path, value: &impl serde::Serialize) -> Result<String, MycError> { + serde_json::to_string(value).map_err(|source| MycError::DeliveryOutboxSerialize { + path: path.to_path_buf(), + source, + }) +} + +fn parse_json_field<T: DeserializeOwned>( + path: &Path, + value: &str, + _field: &str, +) -> Result<T, MycError> { + serde_json::from_str(value).map_err(|source| MycError::DeliveryOutboxSqlDecode { + path: path.to_path_buf(), + source, + }) +} + +fn kind_label(kind: MycDeliveryOutboxKind) -> &'static str { + match kind { + MycDeliveryOutboxKind::ListenerResponsePublish => "listener_response_publish", + MycDeliveryOutboxKind::ConnectAcceptPublish => "connect_accept_publish", + MycDeliveryOutboxKind::AuthReplayPublish => "auth_replay_publish", + MycDeliveryOutboxKind::DiscoveryHandlerPublish => "discovery_handler_publish", + } +} + +fn parse_kind(value: &str) -> Result<MycDeliveryOutboxKind, MycError> { + match value { + "listener_response_publish" => Ok(MycDeliveryOutboxKind::ListenerResponsePublish), + "connect_accept_publish" => Ok(MycDeliveryOutboxKind::ConnectAcceptPublish), + "auth_replay_publish" => Ok(MycDeliveryOutboxKind::AuthReplayPublish), + "discovery_handler_publish" => Ok(MycDeliveryOutboxKind::DiscoveryHandlerPublish), + other => Err(MycError::InvalidOperation(format!( + "unknown delivery outbox kind `{other}`" + ))), + } +} + +fn status_label(status: MycDeliveryOutboxStatus) -> &'static str { + match status { + MycDeliveryOutboxStatus::Queued => "queued", + MycDeliveryOutboxStatus::PublishedPendingFinalize => "published_pending_finalize", + MycDeliveryOutboxStatus::Finalized => "finalized", + MycDeliveryOutboxStatus::Failed => "failed", + } +} + +fn parse_status(value: &str) -> Result<MycDeliveryOutboxStatus, MycError> { + match value { + "queued" => Ok(MycDeliveryOutboxStatus::Queued), + "published_pending_finalize" => Ok(MycDeliveryOutboxStatus::PublishedPendingFinalize), + "finalized" => Ok(MycDeliveryOutboxStatus::Finalized), + "failed" => Ok(MycDeliveryOutboxStatus::Failed), + other => Err(MycError::InvalidOperation(format!( + "unknown delivery outbox status `{other}`" + ))), + } +} + +fn usize_from_i64(path: &Path, value: i64, field: &str) -> Result<usize, MycError> { + usize::try_from(value).map_err(|_| { + MycError::InvalidOperation(format!( + "delivery outbox field `{field}` at {} is out of range for usize", + path.display() + )) + }) +} + +#[cfg(test)] +mod tests { + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind}; + use radroots_nostr_signer::prelude::{ + RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId, + }; + + use crate::outbox::{ + MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, + MycDeliveryOutboxStore, + }; + + use super::MycSqliteDeliveryOutboxStore; + + fn sample_record() -> MycDeliveryOutboxRecord { + let identity = RadrootsIdentity::from_secret_key_str( + "1111111111111111111111111111111111111111111111111111111111111111", + ) + .expect("identity"); + let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello") + .sign_with_keys(identity.keys()) + .expect("sign event"); + MycDeliveryOutboxRecord::new( + MycDeliveryOutboxKind::AuthReplayPublish, + event, + vec!["wss://relay.example.com".parse().expect("relay")], + ) + .expect("record") + .with_connection_id( + &RadrootsNostrSignerConnectionId::parse("conn-sqlite-outbox").expect("id"), + ) + .with_request_id("req-sqlite-outbox") + .with_attempt_id("attempt-sqlite-outbox") + .with_signer_publish_workflow_id( + &RadrootsNostrSignerWorkflowId::parse("wf-sqlite-outbox").expect("id"), + ) + } + + #[test] + fn sqlite_outbox_store_round_trips_and_updates_status() { + let store = MycSqliteDeliveryOutboxStore::open_memory().expect("open store"); + let record = sample_record(); + + store.enqueue(&record).expect("enqueue"); + assert_eq!( + store.get(&record.job_id).expect("get"), + Some(record.clone()) + ); + assert_eq!(store.list_all().expect("list all"), vec![record.clone()]); + assert_eq!( + store + .list_by_status(MycDeliveryOutboxStatus::Queued) + .expect("list queued"), + vec![record.clone()] + ); + + let published = store + .mark_published_pending_finalize(&record.job_id, 1) + .expect("mark published"); + assert_eq!( + published.status, + MycDeliveryOutboxStatus::PublishedPendingFinalize + ); + assert_eq!(published.publish_attempt_count, 1); + + let failed = store + .mark_failed(&record.job_id, 2, "relay rejected") + .expect("mark failed"); + assert_eq!(failed.status, MycDeliveryOutboxStatus::Failed); + assert_eq!(failed.last_error.as_deref(), Some("relay rejected")); + + let republished = store + .mark_published_pending_finalize(&record.job_id, 3) + .expect("republish"); + assert_eq!( + republished.status, + MycDeliveryOutboxStatus::PublishedPendingFinalize + ); + + let finalized = store + .mark_finalized(&record.job_id) + .expect("mark finalized"); + assert_eq!(finalized.status, MycDeliveryOutboxStatus::Finalized); + assert_eq!( + store + .list_by_status(MycDeliveryOutboxStatus::Finalized) + .expect("list finalized"), + vec![finalized] + ); + } + + #[test] + fn sqlite_outbox_store_reopens_file_backed_state() { + let temp = tempfile::tempdir().expect("tempdir"); + let record = sample_record(); + + let store = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("open store"); + store.enqueue(&record).expect("enqueue"); + + let reopened = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("reopen store"); + assert_eq!( + reopened.get(&record.job_id).expect("get reopened"), + Some(record) + ); + assert!(reopened.path().ends_with("delivery-outbox.sqlite")); + } +} diff --git a/tests/operability_e2e.rs b/tests/operability_e2e.rs @@ -216,7 +216,7 @@ async fn status_reports_sqlite_persistence_schema_state() -> TestResult<()> { .as_ref() .expect("signer sqlite schema") .applied_migration_count, - Some(1) + Some(2) ); assert_eq!( status