myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

outbox_sqlite.rs (20875B)


      1 use std::path::{Path, PathBuf};
      2 
      3 use radroots_sql_core::migrations::{Migration, migrations_run_all_up};
      4 use radroots_sql_core::{SqlExecutor, SqliteExecutor};
      5 use serde::Deserialize;
      6 use serde::de::DeserializeOwned;
      7 use serde_json::{Value, json};
      8 
      9 use crate::error::MycError;
     10 use crate::outbox::{
     11     MycDeliveryOutboxJobId, MycDeliveryOutboxKind, MycDeliveryOutboxRecord,
     12     MycDeliveryOutboxStatus, MycDeliveryOutboxStore, now_unix_secs,
     13 };
     14 
     15 const MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME: &str = "delivery-outbox.sqlite";
     16 #[cfg(test)]
     17 const MYC_DELIVERY_OUTBOX_MEMORY_PATH: &str = ":memory:";
     18 
     19 static MYC_DELIVERY_OUTBOX_MIGRATIONS: &[Migration] = &[Migration {
     20     name: "0000_delivery_outbox_init",
     21     up_sql: include_str!("../migrations/0000_delivery_outbox_init.up.sql"),
     22     down_sql: include_str!("../migrations/0000_delivery_outbox_init.down.sql"),
     23 }];
     24 
     25 /// Myc keeps its delivery outbox store local to the service boundary.
     26 pub struct MycSqliteDeliveryOutboxStore {
     27     db: MycDeliveryOutboxSqliteDb,
     28 }
     29 
     30 struct MycDeliveryOutboxSqliteDb {
     31     path: PathBuf,
     32     executor: SqliteExecutor,
     33     file_backed: bool,
     34 }
     35 
     36 #[derive(Debug, Deserialize)]
     37 struct MycDeliveryOutboxRow {
     38     job_id: String,
     39     kind: String,
     40     status: String,
     41     event_json: String,
     42     relay_urls_json: String,
     43     connection_id: Option<String>,
     44     request_id: Option<String>,
     45     attempt_id: Option<String>,
     46     signer_publish_workflow_id: Option<String>,
     47     publish_attempt_count: i64,
     48     last_error: Option<String>,
     49     created_at_unix: u64,
     50     updated_at_unix: u64,
     51     published_at_unix: Option<u64>,
     52     finalized_at_unix: Option<u64>,
     53 }
     54 
     55 impl MycSqliteDeliveryOutboxStore {
     56     pub fn open(state_dir: impl AsRef<Path>) -> Result<Self, MycError> {
     57         let db = MycDeliveryOutboxSqliteDb::open(
     58             state_dir
     59                 .as_ref()
     60                 .join(MYC_DELIVERY_OUTBOX_SQLITE_FILE_NAME),
     61         )?;
     62         Ok(Self { db })
     63     }
     64 
     65     #[cfg(test)]
     66     pub fn open_memory() -> Result<Self, MycError> {
     67         Ok(Self {
     68             db: MycDeliveryOutboxSqliteDb::open_memory()?,
     69         })
     70     }
     71 
     72     pub fn path(&self) -> &Path {
     73         self.db.path()
     74     }
     75 
     76     fn update_record(
     77         &self,
     78         job_id: &MycDeliveryOutboxJobId,
     79         update: impl FnOnce(&mut MycDeliveryOutboxRecord) -> Result<(), MycError>,
     80     ) -> Result<MycDeliveryOutboxRecord, MycError> {
     81         let mut record = self
     82             .get(job_id)?
     83             .ok_or_else(|| MycError::DeliveryOutboxJobNotFound(job_id.to_string()))?;
     84         update(&mut record)?;
     85         exec_json(
     86             self.db.path(),
     87             self.db.executor(),
     88             "UPDATE myc_delivery_outbox SET kind = ?, status = ?, event_json = ?, relay_urls_json = ?, connection_id = ?, request_id = ?, attempt_id = ?, signer_publish_workflow_id = ?, publish_attempt_count = ?, last_error = ?, created_at_unix = ?, updated_at_unix = ?, published_at_unix = ?, finalized_at_unix = ? WHERE job_id = ?",
     89             serialize_record_update_params(self.db.path(), &record, job_id.as_str())?,
     90         )?;
     91         Ok(record)
     92     }
     93 }
     94 
     95 impl MycDeliveryOutboxStore for MycSqliteDeliveryOutboxStore {
     96     fn enqueue(&self, record: &MycDeliveryOutboxRecord) -> Result<(), MycError> {
     97         exec_json(
     98             self.db.path(),
     99             self.db.executor(),
    100             "INSERT INTO myc_delivery_outbox(job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
    101             serialize_record_params(self.db.path(), record)?,
    102         )
    103     }
    104 
    105     fn get(
    106         &self,
    107         job_id: &MycDeliveryOutboxJobId,
    108     ) -> Result<Option<MycDeliveryOutboxRecord>, MycError> {
    109         let rows: Vec<MycDeliveryOutboxRow> = query_rows(
    110             self.db.path(),
    111             self.db.executor(),
    112             "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE job_id = ? LIMIT 1",
    113             json!([job_id.as_str()]),
    114         )?;
    115         rows.into_iter()
    116             .next()
    117             .map(|row| row.into_record(self.db.path()))
    118             .transpose()
    119     }
    120 
    121     fn list_all(&self) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> {
    122         let rows: Vec<MycDeliveryOutboxRow> = query_rows(
    123             self.db.path(),
    124             self.db.executor(),
    125             "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox ORDER BY created_at_unix ASC, job_id ASC",
    126             json!([]),
    127         )?;
    128         rows.into_iter()
    129             .map(|row| row.into_record(self.db.path()))
    130             .collect()
    131     }
    132 
    133     fn list_by_status(
    134         &self,
    135         status: MycDeliveryOutboxStatus,
    136     ) -> Result<Vec<MycDeliveryOutboxRecord>, MycError> {
    137         let rows: Vec<MycDeliveryOutboxRow> = query_rows(
    138             self.db.path(),
    139             self.db.executor(),
    140             "SELECT job_id, kind, status, event_json, relay_urls_json, connection_id, request_id, attempt_id, signer_publish_workflow_id, publish_attempt_count, last_error, created_at_unix, updated_at_unix, published_at_unix, finalized_at_unix FROM myc_delivery_outbox WHERE status = ? ORDER BY created_at_unix ASC, job_id ASC",
    141             json!([status_label(status)]),
    142         )?;
    143         rows.into_iter()
    144             .map(|row| row.into_record(self.db.path()))
    145             .collect()
    146     }
    147 
    148     fn mark_published_pending_finalize(
    149         &self,
    150         job_id: &MycDeliveryOutboxJobId,
    151         publish_attempt_count: usize,
    152     ) -> Result<MycDeliveryOutboxRecord, MycError> {
    153         self.update_record(job_id, |record| {
    154             record.mark_published_pending_finalize(publish_attempt_count, now_unix_secs())
    155         })
    156     }
    157 
    158     fn mark_failed(
    159         &self,
    160         job_id: &MycDeliveryOutboxJobId,
    161         publish_attempt_count: usize,
    162         error: &str,
    163     ) -> Result<MycDeliveryOutboxRecord, MycError> {
    164         self.update_record(job_id, |record| {
    165             record.mark_failed(publish_attempt_count, error, now_unix_secs())
    166         })
    167     }
    168 
    169     fn mark_finalized(
    170         &self,
    171         job_id: &MycDeliveryOutboxJobId,
    172     ) -> Result<MycDeliveryOutboxRecord, MycError> {
    173         self.update_record(job_id, |record| record.mark_finalized(now_unix_secs()))
    174     }
    175 }
    176 
    177 impl MycDeliveryOutboxRow {
    178     fn into_record(self, path: &Path) -> Result<MycDeliveryOutboxRecord, MycError> {
    179         Ok(MycDeliveryOutboxRecord {
    180             job_id: self.job_id.parse()?,
    181             kind: parse_kind(self.kind.as_str())?,
    182             status: parse_status(self.status.as_str())?,
    183             event: parse_json_field(path, self.event_json.as_str(), "event_json")?,
    184             relay_urls: parse_json_field(path, self.relay_urls_json.as_str(), "relay_urls_json")?,
    185             connection_id: self.connection_id.as_deref().map(str::parse).transpose()?,
    186             request_id: self.request_id,
    187             attempt_id: self.attempt_id,
    188             signer_publish_workflow_id: self
    189                 .signer_publish_workflow_id
    190                 .as_deref()
    191                 .map(str::parse)
    192                 .transpose()?,
    193             publish_attempt_count: usize_from_i64(
    194                 path,
    195                 self.publish_attempt_count,
    196                 "publish_attempt_count",
    197             )?,
    198             last_error: self.last_error,
    199             created_at_unix: self.created_at_unix,
    200             updated_at_unix: self.updated_at_unix,
    201             published_at_unix: self.published_at_unix,
    202             finalized_at_unix: self.finalized_at_unix,
    203         })
    204     }
    205 }
    206 
    207 impl MycDeliveryOutboxSqliteDb {
    208     fn open(path: PathBuf) -> Result<Self, MycError> {
    209         if let Some(parent) = path.parent() {
    210             std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir {
    211                 path: parent.to_path_buf(),
    212                 source,
    213             })?;
    214         }
    215         let executor =
    216             SqliteExecutor::open(path.as_path()).map_err(|source| MycError::DeliveryOutboxSql {
    217                 path: path.clone(),
    218                 source,
    219             })?;
    220         let db = Self {
    221             path,
    222             executor,
    223             file_backed: true,
    224         };
    225         db.configure()?;
    226         db.run_migrations()?;
    227         Ok(db)
    228     }
    229 
    230     #[cfg(test)]
    231     fn open_memory() -> Result<Self, MycError> {
    232         let executor =
    233             SqliteExecutor::open_memory().map_err(|source| MycError::DeliveryOutboxSql {
    234                 path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH),
    235                 source,
    236             })?;
    237         let db = Self {
    238             path: PathBuf::from(MYC_DELIVERY_OUTBOX_MEMORY_PATH),
    239             executor,
    240             file_backed: false,
    241         };
    242         db.configure()?;
    243         db.run_migrations()?;
    244         Ok(db)
    245     }
    246 
    247     fn path(&self) -> &Path {
    248         self.path.as_path()
    249     }
    250 
    251     fn executor(&self) -> &SqliteExecutor {
    252         &self.executor
    253     }
    254 
    255     fn configure(&self) -> Result<(), MycError> {
    256         exec_json(
    257             self.path(),
    258             self.executor(),
    259             "PRAGMA foreign_keys = ON",
    260             json!([]),
    261         )?;
    262         if self.file_backed {
    263             exec_json(
    264                 self.path(),
    265                 self.executor(),
    266                 "PRAGMA journal_mode = WAL",
    267                 json!([]),
    268             )?;
    269         }
    270         Ok(())
    271     }
    272 
    273     fn run_migrations(&self) -> Result<(), MycError> {
    274         migrations_run_all_up(self.executor(), MYC_DELIVERY_OUTBOX_MIGRATIONS).map_err(|source| {
    275             MycError::DeliveryOutboxSql {
    276                 path: self.path.clone(),
    277                 source,
    278             }
    279         })
    280     }
    281 }
    282 
    283 fn serialize_record_params(
    284     path: &Path,
    285     record: &MycDeliveryOutboxRecord,
    286 ) -> Result<Value, MycError> {
    287     Ok(Value::Array(vec![
    288         Value::from(record.job_id.as_str()),
    289         Value::from(kind_label(record.kind)),
    290         Value::from(status_label(record.status)),
    291         Value::from(serialize_json_field(path, &record.event)?),
    292         Value::from(serialize_json_field(path, &record.relay_urls)?),
    293         record
    294             .connection_id
    295             .as_ref()
    296             .map(|value| Value::from(value.as_str()))
    297             .unwrap_or(Value::Null),
    298         record
    299             .request_id
    300             .clone()
    301             .map(Value::from)
    302             .unwrap_or(Value::Null),
    303         record
    304             .attempt_id
    305             .clone()
    306             .map(Value::from)
    307             .unwrap_or(Value::Null),
    308         record
    309             .signer_publish_workflow_id
    310             .as_ref()
    311             .map(|value| Value::from(value.as_str()))
    312             .unwrap_or(Value::Null),
    313         Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| {
    314             MycError::InvalidOperation(
    315                 "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(),
    316             )
    317         })?),
    318         record
    319             .last_error
    320             .clone()
    321             .map(Value::from)
    322             .unwrap_or(Value::Null),
    323         Value::from(record.created_at_unix),
    324         Value::from(record.updated_at_unix),
    325         record
    326             .published_at_unix
    327             .map(Value::from)
    328             .unwrap_or(Value::Null),
    329         record
    330             .finalized_at_unix
    331             .map(Value::from)
    332             .unwrap_or(Value::Null),
    333     ]))
    334 }
    335 
    336 fn serialize_record_update_params(
    337     path: &Path,
    338     record: &MycDeliveryOutboxRecord,
    339     trailing_job_id: &str,
    340 ) -> Result<Value, MycError> {
    341     Ok(Value::Array(vec![
    342         Value::from(kind_label(record.kind)),
    343         Value::from(status_label(record.status)),
    344         Value::from(serialize_json_field(path, &record.event)?),
    345         Value::from(serialize_json_field(path, &record.relay_urls)?),
    346         record
    347             .connection_id
    348             .as_ref()
    349             .map(|value| Value::from(value.as_str()))
    350             .unwrap_or(Value::Null),
    351         record
    352             .request_id
    353             .clone()
    354             .map(Value::from)
    355             .unwrap_or(Value::Null),
    356         record
    357             .attempt_id
    358             .clone()
    359             .map(Value::from)
    360             .unwrap_or(Value::Null),
    361         record
    362             .signer_publish_workflow_id
    363             .as_ref()
    364             .map(|value| Value::from(value.as_str()))
    365             .unwrap_or(Value::Null),
    366         Value::from(i64::try_from(record.publish_attempt_count).map_err(|_| {
    367             MycError::InvalidOperation(
    368                 "delivery outbox publish_attempt_count exceeds sqlite range".to_owned(),
    369             )
    370         })?),
    371         record
    372             .last_error
    373             .clone()
    374             .map(Value::from)
    375             .unwrap_or(Value::Null),
    376         Value::from(record.created_at_unix),
    377         Value::from(record.updated_at_unix),
    378         record
    379             .published_at_unix
    380             .map(Value::from)
    381             .unwrap_or(Value::Null),
    382         record
    383             .finalized_at_unix
    384             .map(Value::from)
    385             .unwrap_or(Value::Null),
    386         Value::from(trailing_job_id),
    387     ]))
    388 }
    389 
    390 fn exec_json(
    391     path: &Path,
    392     executor: &impl SqlExecutor,
    393     sql: &str,
    394     params: Value,
    395 ) -> Result<(), MycError> {
    396     executor
    397         .exec(sql, params.to_string().as_str())
    398         .map_err(|source| MycError::DeliveryOutboxSql {
    399             path: path.to_path_buf(),
    400             source,
    401         })?;
    402     Ok(())
    403 }
    404 
    405 fn query_rows<T: DeserializeOwned>(
    406     path: &Path,
    407     executor: &impl SqlExecutor,
    408     sql: &str,
    409     params: Value,
    410 ) -> Result<Vec<T>, MycError> {
    411     let raw = executor
    412         .query_raw(sql, params.to_string().as_str())
    413         .map_err(|source| MycError::DeliveryOutboxSql {
    414             path: path.to_path_buf(),
    415             source,
    416         })?;
    417     serde_json::from_str(&raw).map_err(|source| MycError::DeliveryOutboxSqlDecode {
    418         path: path.to_path_buf(),
    419         source,
    420     })
    421 }
    422 
    423 fn serialize_json_field(path: &Path, value: &impl serde::Serialize) -> Result<String, MycError> {
    424     serde_json::to_string(value).map_err(|source| MycError::DeliveryOutboxSerialize {
    425         path: path.to_path_buf(),
    426         source,
    427     })
    428 }
    429 
    430 fn parse_json_field<T: DeserializeOwned>(
    431     path: &Path,
    432     value: &str,
    433     _field: &str,
    434 ) -> Result<T, MycError> {
    435     serde_json::from_str(value).map_err(|source| MycError::DeliveryOutboxSqlDecode {
    436         path: path.to_path_buf(),
    437         source,
    438     })
    439 }
    440 
    441 fn kind_label(kind: MycDeliveryOutboxKind) -> &'static str {
    442     match kind {
    443         MycDeliveryOutboxKind::ListenerResponsePublish => "listener_response_publish",
    444         MycDeliveryOutboxKind::ConnectAcceptPublish => "connect_accept_publish",
    445         MycDeliveryOutboxKind::AuthReplayPublish => "auth_replay_publish",
    446         MycDeliveryOutboxKind::DiscoveryHandlerPublish => "discovery_handler_publish",
    447     }
    448 }
    449 
    450 fn parse_kind(value: &str) -> Result<MycDeliveryOutboxKind, MycError> {
    451     match value {
    452         "listener_response_publish" => Ok(MycDeliveryOutboxKind::ListenerResponsePublish),
    453         "connect_accept_publish" => Ok(MycDeliveryOutboxKind::ConnectAcceptPublish),
    454         "auth_replay_publish" => Ok(MycDeliveryOutboxKind::AuthReplayPublish),
    455         "discovery_handler_publish" => Ok(MycDeliveryOutboxKind::DiscoveryHandlerPublish),
    456         other => Err(MycError::InvalidOperation(format!(
    457             "unknown delivery outbox kind `{other}`"
    458         ))),
    459     }
    460 }
    461 
    462 fn status_label(status: MycDeliveryOutboxStatus) -> &'static str {
    463     match status {
    464         MycDeliveryOutboxStatus::Queued => "queued",
    465         MycDeliveryOutboxStatus::PublishedPendingFinalize => "published_pending_finalize",
    466         MycDeliveryOutboxStatus::Finalized => "finalized",
    467         MycDeliveryOutboxStatus::Failed => "failed",
    468     }
    469 }
    470 
    471 fn parse_status(value: &str) -> Result<MycDeliveryOutboxStatus, MycError> {
    472     match value {
    473         "queued" => Ok(MycDeliveryOutboxStatus::Queued),
    474         "published_pending_finalize" => Ok(MycDeliveryOutboxStatus::PublishedPendingFinalize),
    475         "finalized" => Ok(MycDeliveryOutboxStatus::Finalized),
    476         "failed" => Ok(MycDeliveryOutboxStatus::Failed),
    477         other => Err(MycError::InvalidOperation(format!(
    478             "unknown delivery outbox status `{other}`"
    479         ))),
    480     }
    481 }
    482 
    483 fn usize_from_i64(path: &Path, value: i64, field: &str) -> Result<usize, MycError> {
    484     usize::try_from(value).map_err(|_| {
    485         MycError::InvalidOperation(format!(
    486             "delivery outbox field `{field}` at {} is out of range for usize",
    487             path.display()
    488         ))
    489     })
    490 }
    491 
    492 #[cfg(test)]
    493 mod tests {
    494     use radroots_identity::RadrootsIdentity;
    495     use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind};
    496     use radroots_nostr_signer::prelude::{
    497         RadrootsNostrSignerConnectionId, RadrootsNostrSignerWorkflowId,
    498     };
    499 
    500     use crate::outbox::{
    501         MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus,
    502         MycDeliveryOutboxStore,
    503     };
    504 
    505     use super::MycSqliteDeliveryOutboxStore;
    506 
    507     fn sample_record() -> MycDeliveryOutboxRecord {
    508         let identity = RadrootsIdentity::from_secret_key_str(
    509             "1111111111111111111111111111111111111111111111111111111111111111",
    510         )
    511         .expect("identity");
    512         let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "hello")
    513             .sign_with_keys(identity.keys())
    514             .expect("sign event");
    515         MycDeliveryOutboxRecord::new(
    516             MycDeliveryOutboxKind::AuthReplayPublish,
    517             event,
    518             vec!["wss://relay.example.com".parse().expect("relay")],
    519         )
    520         .expect("record")
    521         .with_connection_id(
    522             &RadrootsNostrSignerConnectionId::parse("conn-sqlite-outbox").expect("id"),
    523         )
    524         .with_request_id("req-sqlite-outbox")
    525         .with_attempt_id("attempt-sqlite-outbox")
    526         .with_signer_publish_workflow_id(
    527             &RadrootsNostrSignerWorkflowId::parse("wf-sqlite-outbox").expect("id"),
    528         )
    529     }
    530 
    531     #[test]
    532     fn sqlite_outbox_store_round_trips_and_updates_status() {
    533         let store = MycSqliteDeliveryOutboxStore::open_memory().expect("open store");
    534         let record = sample_record();
    535 
    536         store.enqueue(&record).expect("enqueue");
    537         assert_eq!(
    538             store.get(&record.job_id).expect("get"),
    539             Some(record.clone())
    540         );
    541         assert_eq!(store.list_all().expect("list all"), vec![record.clone()]);
    542         assert_eq!(
    543             store
    544                 .list_by_status(MycDeliveryOutboxStatus::Queued)
    545                 .expect("list queued"),
    546             vec![record.clone()]
    547         );
    548 
    549         let published = store
    550             .mark_published_pending_finalize(&record.job_id, 1)
    551             .expect("mark published");
    552         assert_eq!(
    553             published.status,
    554             MycDeliveryOutboxStatus::PublishedPendingFinalize
    555         );
    556         assert_eq!(published.publish_attempt_count, 1);
    557 
    558         let failed = store
    559             .mark_failed(&record.job_id, 2, "relay rejected")
    560             .expect("mark failed");
    561         assert_eq!(failed.status, MycDeliveryOutboxStatus::Failed);
    562         assert_eq!(failed.last_error.as_deref(), Some("relay rejected"));
    563 
    564         let republished = store
    565             .mark_published_pending_finalize(&record.job_id, 3)
    566             .expect("republish");
    567         assert_eq!(
    568             republished.status,
    569             MycDeliveryOutboxStatus::PublishedPendingFinalize
    570         );
    571 
    572         let finalized = store
    573             .mark_finalized(&record.job_id)
    574             .expect("mark finalized");
    575         assert_eq!(finalized.status, MycDeliveryOutboxStatus::Finalized);
    576         assert_eq!(
    577             store
    578                 .list_by_status(MycDeliveryOutboxStatus::Finalized)
    579                 .expect("list finalized"),
    580             vec![finalized]
    581         );
    582     }
    583 
    584     #[test]
    585     fn sqlite_outbox_store_reopens_file_backed_state() {
    586         let temp = tempfile::tempdir().expect("tempdir");
    587         let record = sample_record();
    588 
    589         let store = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("open store");
    590         store.enqueue(&record).expect("enqueue");
    591 
    592         let reopened = MycSqliteDeliveryOutboxStore::open(temp.path()).expect("reopen store");
    593         assert_eq!(
    594             reopened.get(&record.job_id).expect("get reopened"),
    595             Some(record)
    596         );
    597         assert!(reopened.path().ends_with("delivery-outbox.sqlite"));
    598     }
    599 }