lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

store.rs (34416B)


      1 #![forbid(unsafe_code)]
      2 
      3 use radroots_sql_core::SqlExecutor;
      4 use radroots_sql_core::error::SqlError;
      5 use serde::Deserialize;
      6 use serde_json::{Value, json};
      7 
      8 use crate::migrations;
      9 use crate::models::validate_non_empty;
     10 use crate::{
     11     LocalEventRecord, LocalEventRecordInput, LocalEventRecordUpdate, LocalEventsCursor,
     12     LocalEventsError, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime,
     13 };
     14 
     15 pub struct LocalEventsStore<E: SqlExecutor> {
     16     executor: E,
     17 }
     18 
     19 impl<E: SqlExecutor> LocalEventsStore<E> {
     20     pub fn new(executor: E) -> Self {
     21         Self { executor }
     22     }
     23 
     24     pub fn executor(&self) -> &E {
     25         &self.executor
     26     }
     27 
     28     pub fn migrate_up(&self) -> Result<(), SqlError> {
     29         migrations::run_all_up(self.executor())
     30     }
     31 
     32     pub fn migrate_down(&self) -> Result<(), SqlError> {
     33         migrations::run_all_down(self.executor())
     34     }
     35 
     36     pub fn append_record(
     37         &self,
     38         input: &LocalEventRecordInput,
     39     ) -> Result<LocalEventRecord, LocalEventsError> {
     40         input.validate()?;
     41         self.executor.begin()?;
     42         let result = (|| -> Result<(), LocalEventsError> {
     43             let change_seq = self.next_change_seq()?;
     44             let params = json!([
     45                 change_seq,
     46                 input.record_id,
     47                 input.family.as_str(),
     48                 input.status.as_str(),
     49                 input.source_runtime.as_str(),
     50                 input.created_at_ms,
     51                 input.inserted_at_ms,
     52                 input.inserted_at_ms,
     53                 input.owner_account_id,
     54                 input.owner_pubkey,
     55                 input.farm_id,
     56                 input.listing_addr,
     57                 encode_json(input.local_work_json.as_ref()),
     58                 input.event_id,
     59                 input.event_kind,
     60                 input.event_pubkey,
     61                 input.event_created_at,
     62                 encode_json(input.event_tags_json.as_ref()),
     63                 input.event_content,
     64                 input.event_sig,
     65                 encode_json(input.raw_event_json.as_ref()),
     66                 input.outbox_status.as_str(),
     67                 input.relay_set_fingerprint,
     68                 encode_json(input.relay_delivery_json.as_ref())
     69             ])
     70             .to_string();
     71             let sql = "insert or ignore into local_event_record(
     72                 change_seq,
     73                 record_id,
     74                 family,
     75                 status,
     76                 source_runtime,
     77                 created_at_ms,
     78                 inserted_at_ms,
     79                 updated_at_ms,
     80                 owner_account_id,
     81                 owner_pubkey,
     82                 farm_id,
     83                 listing_addr,
     84                 local_work_json,
     85                 event_id,
     86                 event_kind,
     87                 event_pubkey,
     88                 event_created_at,
     89                 event_tags_json,
     90                 event_content,
     91                 event_sig,
     92                 raw_event_json,
     93                 outbox_status,
     94                 relay_set_fingerprint,
     95                 relay_delivery_json
     96             ) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
     97             let _ = self.executor.exec(sql, &params)?;
     98             Ok(())
     99         })();
    100         match result {
    101             Ok(()) => self.executor.commit()?,
    102             Err(err) => {
    103                 let _ = self.executor.rollback();
    104                 return Err(err);
    105             }
    106         }
    107         self.get_record(&input.record_id)?
    108             .ok_or_else(|| LocalEventsError::InvalidRecord("record append failed".to_owned()))
    109     }
    110 
    111     pub fn get_record(
    112         &self,
    113         record_id: &str,
    114     ) -> Result<Option<LocalEventRecord>, LocalEventsError> {
    115         validate_non_empty("record_id", record_id)?;
    116         let params = json!([record_id]).to_string();
    117         let rows = self.query_records(
    118             "select * from local_event_record where record_id = ? limit 1",
    119             &params,
    120         )?;
    121         Ok(rows.into_iter().next())
    122     }
    123 
    124     pub fn list_records_after_seq(
    125         &self,
    126         after_seq: i64,
    127         limit: u32,
    128     ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
    129         let params = json!([after_seq, i64::from(limit)]).to_string();
    130         self.query_records(
    131             "select * from local_event_record where seq > ? order by seq asc limit ?",
    132             &params,
    133         )
    134     }
    135 
    136     pub fn list_records_changed_after(
    137         &self,
    138         after_change_seq: i64,
    139         limit: u32,
    140     ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
    141         let params = json!([after_change_seq, i64::from(limit)]).to_string();
    142         self.query_records(
    143             "select * from local_event_record where change_seq > ? order by change_seq asc, seq asc limit ?",
    144             &params,
    145         )
    146     }
    147 
    148     pub fn list_records_changed_latest(
    149         &self,
    150         limit: u32,
    151     ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
    152         let params = json!([i64::from(limit)]).to_string();
    153         self.query_records(
    154             "select * from local_event_record order by change_seq desc, seq desc, record_id asc limit ?",
    155             &params,
    156         )
    157     }
    158 
    159     pub fn list_records_changed_before(
    160         &self,
    161         before_change_seq: i64,
    162         before_seq: i64,
    163         limit: u32,
    164     ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
    165         let params = json!([
    166             before_change_seq,
    167             before_change_seq,
    168             before_seq,
    169             i64::from(limit)
    170         ])
    171         .to_string();
    172         self.query_records(
    173             "select * from local_event_record
    174              where change_seq < ? or (change_seq = ? and seq < ?)
    175              order by change_seq desc, seq desc, record_id asc
    176              limit ?",
    177             &params,
    178         )
    179     }
    180 
    181     pub fn update_outbox(
    182         &self,
    183         update: &LocalEventRecordUpdate,
    184     ) -> Result<LocalEventRecord, LocalEventsError> {
    185         validate_non_empty("record_id", &update.record_id)?;
    186         self.executor.begin()?;
    187         let result = (|| -> Result<i64, LocalEventsError> {
    188             let change_seq = self.next_change_seq()?;
    189             let params = json!([
    190                 change_seq,
    191                 update.status.as_str(),
    192                 update.outbox_status.as_str(),
    193                 update.relay_set_fingerprint,
    194                 encode_json(update.relay_delivery_json.as_ref()),
    195                 update.updated_at_ms,
    196                 update.record_id
    197             ])
    198             .to_string();
    199             let outcome = self.executor.exec(
    200                 "update local_event_record
    201                  set change_seq = ?,
    202                      status = ?,
    203                      outbox_status = ?,
    204                      relay_set_fingerprint = ?,
    205                      relay_delivery_json = ?,
    206                      updated_at_ms = ?
    207                  where record_id = ?",
    208                 &params,
    209             )?;
    210             Ok(outcome.changes)
    211         })();
    212         let changes = match result {
    213             Ok(changes) => {
    214                 self.executor.commit()?;
    215                 changes
    216             }
    217             Err(err) => {
    218                 let _ = self.executor.rollback();
    219                 return Err(err);
    220             }
    221         };
    222         if changes == 0 {
    223             return Err(LocalEventsError::Sql(SqlError::NotFound(
    224                 update.record_id.clone(),
    225             )));
    226         }
    227         self.get_record(&update.record_id)?
    228             .ok_or_else(|| LocalEventsError::Sql(SqlError::NotFound(update.record_id.clone())))
    229     }
    230 
    231     pub fn get_cursor(
    232         &self,
    233         consumer_id: &str,
    234     ) -> Result<Option<LocalEventsCursor>, LocalEventsError> {
    235         validate_non_empty("consumer_id", consumer_id)?;
    236         let params = json!([consumer_id]).to_string();
    237         let raw = self.executor.query_raw(
    238             "select consumer_id, last_change_seq, updated_at_ms from local_event_projection_cursor where consumer_id = ? limit 1",
    239             &params,
    240         )?;
    241         let rows: Vec<CursorRow> = serde_json::from_str(&raw)?;
    242         Ok(rows.into_iter().next().map(Into::into))
    243     }
    244 
    245     pub fn advance_cursor(
    246         &self,
    247         consumer_id: &str,
    248         last_change_seq: i64,
    249         updated_at_ms: i64,
    250     ) -> Result<LocalEventsCursor, LocalEventsError> {
    251         validate_non_empty("consumer_id", consumer_id)?;
    252         let params = json!([consumer_id, last_change_seq, updated_at_ms]).to_string();
    253         self.executor.exec(
    254             "insert into local_event_projection_cursor(consumer_id, last_change_seq, updated_at_ms)
    255              values(?,?,?)
    256              on conflict(consumer_id) do update set
    257                  last_change_seq = max(local_event_projection_cursor.last_change_seq, excluded.last_change_seq),
    258                  updated_at_ms = excluded.updated_at_ms",
    259             &params,
    260         )?;
    261         self.get_cursor(consumer_id)?
    262             .ok_or_else(|| LocalEventsError::InvalidRecord("cursor advance failed".to_owned()))
    263     }
    264 
    265     fn query_records(
    266         &self,
    267         sql: &str,
    268         params: &str,
    269     ) -> Result<Vec<LocalEventRecord>, LocalEventsError> {
    270         let raw = self.executor.query_raw(sql, params)?;
    271         let rows: Vec<RecordRow> = serde_json::from_str(&raw)?;
    272         rows.into_iter().map(TryInto::try_into).collect()
    273     }
    274 
    275     fn next_change_seq(&self) -> Result<i64, LocalEventsError> {
    276         let raw = self.executor.query_raw(
    277             "select coalesce(max(change_seq), 0) + 1 as change_seq from local_event_record",
    278             "[]",
    279         )?;
    280         let rows: Vec<ChangeSeqRow> = serde_json::from_str(&raw)?;
    281         rows.into_iter()
    282             .next()
    283             .map(|row| row.change_seq)
    284             .ok_or_else(|| {
    285                 LocalEventsError::InvalidRecord("change sequence unavailable".to_owned())
    286             })
    287     }
    288 }
    289 
    290 #[derive(Debug, Deserialize)]
    291 struct RecordRow {
    292     seq: i64,
    293     change_seq: i64,
    294     record_id: String,
    295     family: String,
    296     status: String,
    297     source_runtime: String,
    298     created_at_ms: i64,
    299     inserted_at_ms: i64,
    300     updated_at_ms: i64,
    301     owner_account_id: Option<String>,
    302     owner_pubkey: Option<String>,
    303     farm_id: Option<String>,
    304     listing_addr: Option<String>,
    305     local_work_json: Option<String>,
    306     event_id: Option<String>,
    307     event_kind: Option<i64>,
    308     event_pubkey: Option<String>,
    309     event_created_at: Option<i64>,
    310     event_tags_json: Option<String>,
    311     event_content: Option<String>,
    312     event_sig: Option<String>,
    313     raw_event_json: Option<String>,
    314     outbox_status: String,
    315     relay_set_fingerprint: Option<String>,
    316     relay_delivery_json: Option<String>,
    317 }
    318 
    319 impl TryFrom<RecordRow> for LocalEventRecord {
    320     type Error = LocalEventsError;
    321 
    322     fn try_from(row: RecordRow) -> Result<Self, Self::Error> {
    323         Ok(Self {
    324             seq: row.seq,
    325             change_seq: row.change_seq,
    326             record_id: row.record_id,
    327             family: LocalRecordFamily::parse(&row.family)?,
    328             status: LocalRecordStatus::parse(&row.status)?,
    329             source_runtime: SourceRuntime::parse(&row.source_runtime)?,
    330             created_at_ms: row.created_at_ms,
    331             inserted_at_ms: row.inserted_at_ms,
    332             updated_at_ms: row.updated_at_ms,
    333             owner_account_id: row.owner_account_id,
    334             owner_pubkey: row.owner_pubkey,
    335             farm_id: row.farm_id,
    336             listing_addr: row.listing_addr,
    337             local_work_json: decode_json(row.local_work_json)?,
    338             event_id: row.event_id,
    339             event_kind: row.event_kind,
    340             event_pubkey: row.event_pubkey,
    341             event_created_at: row.event_created_at,
    342             event_tags_json: decode_json(row.event_tags_json)?,
    343             event_content: row.event_content,
    344             event_sig: row.event_sig,
    345             raw_event_json: decode_json(row.raw_event_json)?,
    346             outbox_status: PublishOutboxStatus::parse(&row.outbox_status)?,
    347             relay_set_fingerprint: row.relay_set_fingerprint,
    348             relay_delivery_json: decode_json(row.relay_delivery_json)?,
    349         })
    350     }
    351 }
    352 
    353 #[derive(Debug, Deserialize)]
    354 struct CursorRow {
    355     consumer_id: String,
    356     last_change_seq: i64,
    357     updated_at_ms: i64,
    358 }
    359 
    360 impl From<CursorRow> for LocalEventsCursor {
    361     fn from(row: CursorRow) -> Self {
    362         Self {
    363             consumer_id: row.consumer_id,
    364             last_change_seq: row.last_change_seq,
    365             updated_at_ms: row.updated_at_ms,
    366         }
    367     }
    368 }
    369 
    370 #[derive(Debug, Deserialize)]
    371 struct ChangeSeqRow {
    372     change_seq: i64,
    373 }
    374 
    375 fn encode_json(value: Option<&Value>) -> Option<String> {
    376     value.map(Value::to_string)
    377 }
    378 
    379 fn decode_json(value: Option<String>) -> Result<Option<Value>, LocalEventsError> {
    380     value
    381         .map(|value| serde_json::from_str(&value))
    382         .transpose()
    383         .map_err(Into::into)
    384 }
    385 
    386 #[cfg(test)]
    387 mod tests {
    388     use std::collections::VecDeque;
    389     use std::sync::Mutex;
    390     use std::sync::atomic::{AtomicUsize, Ordering};
    391 
    392     use radroots_sql_core::{ExecOutcome, SqlExecutor, SqliteExecutor};
    393     use serde_json::json;
    394 
    395     use super::*;
    396 
    397     fn store() -> LocalEventsStore<SqliteExecutor> {
    398         let executor = SqliteExecutor::open_memory().expect("open memory sqlite");
    399         let store = LocalEventsStore::new(executor);
    400         store.migrate_up().expect("migrate up");
    401         store
    402     }
    403 
    404     fn local_work(record_id: &str) -> LocalEventRecordInput {
    405         LocalEventRecordInput {
    406             record_id: record_id.to_owned(),
    407             family: LocalRecordFamily::LocalWork,
    408             status: LocalRecordStatus::LocalSaved,
    409             source_runtime: SourceRuntime::Cli,
    410             created_at_ms: 1000,
    411             inserted_at_ms: 1001,
    412             owner_account_id: Some("seller-account".to_owned()),
    413             owner_pubkey: Some("seller-pubkey".to_owned()),
    414             farm_id: Some("farm-a".to_owned()),
    415             listing_addr: Some("listing-a".to_owned()),
    416             local_work_json: Some(json!({"kind":"listing","title":"Eggs"})),
    417             event_id: None,
    418             event_kind: None,
    419             event_pubkey: None,
    420             event_created_at: None,
    421             event_tags_json: None,
    422             event_content: None,
    423             event_sig: None,
    424             raw_event_json: None,
    425             outbox_status: PublishOutboxStatus::None,
    426             relay_set_fingerprint: None,
    427             relay_delivery_json: None,
    428         }
    429     }
    430 
    431     fn signed_event(record_id: &str) -> LocalEventRecordInput {
    432         LocalEventRecordInput {
    433             record_id: record_id.to_owned(),
    434             family: LocalRecordFamily::SignedEvent,
    435             status: LocalRecordStatus::PendingPublish,
    436             source_runtime: SourceRuntime::Cli,
    437             created_at_ms: 2000,
    438             inserted_at_ms: 2001,
    439             owner_account_id: Some("seller-account".to_owned()),
    440             owner_pubkey: Some("seller-pubkey".to_owned()),
    441             farm_id: Some("farm-a".to_owned()),
    442             listing_addr: Some("listing-a".to_owned()),
    443             local_work_json: None,
    444             event_id: Some(record_id.to_owned()),
    445             event_kind: Some(3421),
    446             event_pubkey: Some("seller-pubkey".to_owned()),
    447             event_created_at: Some(2000),
    448             event_tags_json: Some(json!([["d", "listing-a"]])),
    449             event_content: Some("{\"title\":\"Eggs\"}".to_owned()),
    450             event_sig: Some("sig-a".to_owned()),
    451             raw_event_json: Some(json!({"id":record_id,"kind":3421})),
    452             outbox_status: PublishOutboxStatus::Pending,
    453             relay_set_fingerprint: Some("relay-set-a".to_owned()),
    454             relay_delivery_json: Some(json!({
    455                 "state": "pending",
    456                 "target_relays": ["ws://127.0.0.1:8080"],
    457                 "connected_relays": [],
    458                 "acknowledged_relays": [],
    459                 "failed_relays": []
    460             })),
    461         }
    462     }
    463 
    464     #[derive(Debug)]
    465     struct ScriptedExecutor {
    466         begin_result: Mutex<Result<(), SqlError>>,
    467         commit_result: Mutex<Result<(), SqlError>>,
    468         exec_results: Mutex<VecDeque<Result<ExecOutcome, SqlError>>>,
    469         query_results: Mutex<VecDeque<Result<String, SqlError>>>,
    470         rollbacks: AtomicUsize,
    471     }
    472 
    473     impl ScriptedExecutor {
    474         fn new(
    475             exec_results: Vec<Result<ExecOutcome, SqlError>>,
    476             query_results: Vec<Result<String, SqlError>>,
    477         ) -> Self {
    478             Self {
    479                 begin_result: Mutex::new(Ok(())),
    480                 commit_result: Mutex::new(Ok(())),
    481                 exec_results: Mutex::new(exec_results.into()),
    482                 query_results: Mutex::new(query_results.into()),
    483                 rollbacks: AtomicUsize::new(0),
    484             }
    485         }
    486 
    487         fn with_begin_error(error: SqlError) -> Self {
    488             let executor = Self::new(Vec::new(), Vec::new());
    489             *executor.begin_result.lock().expect("begin result") = Err(error);
    490             executor
    491         }
    492 
    493         fn with_commit_error(error: SqlError) -> Self {
    494             let executor = Self::new(
    495                 vec![Ok(ExecOutcome {
    496                     changes: 1,
    497                     last_insert_id: 0,
    498                 })],
    499                 vec![Ok(r#"[{"change_seq":1}]"#.to_owned())],
    500             );
    501             *executor.commit_result.lock().expect("commit result") = Err(error);
    502             executor
    503         }
    504     }
    505 
    506     impl SqlExecutor for ScriptedExecutor {
    507         fn exec(&self, _sql: &str, _params_json: &str) -> Result<ExecOutcome, SqlError> {
    508             self.exec_results
    509                 .lock()
    510                 .expect("exec results")
    511                 .pop_front()
    512                 .unwrap_or(Ok(ExecOutcome {
    513                     changes: 1,
    514                     last_insert_id: 0,
    515                 }))
    516         }
    517 
    518         fn query_raw(&self, _sql: &str, _params_json: &str) -> Result<String, SqlError> {
    519             self.query_results
    520                 .lock()
    521                 .expect("query results")
    522                 .pop_front()
    523                 .unwrap_or_else(|| Ok("[]".to_owned()))
    524         }
    525 
    526         fn begin(&self) -> Result<(), SqlError> {
    527             self.begin_result.lock().expect("begin result").clone()
    528         }
    529 
    530         fn commit(&self) -> Result<(), SqlError> {
    531             self.commit_result.lock().expect("commit result").clone()
    532         }
    533 
    534         fn rollback(&self) -> Result<(), SqlError> {
    535             self.rollbacks.fetch_add(1, Ordering::SeqCst);
    536             Ok(())
    537         }
    538     }
    539 
    540     fn record_row_with(field: &str, value: serde_json::Value) -> String {
    541         let mut row = json!({
    542             "seq": 1,
    543             "change_seq": 1,
    544             "record_id": "record-a",
    545             "family": "signed_event",
    546             "status": "pending_publish",
    547             "source_runtime": "cli",
    548             "created_at_ms": 1000,
    549             "inserted_at_ms": 1001,
    550             "updated_at_ms": 1001,
    551             "owner_account_id": "seller-account",
    552             "owner_pubkey": "seller-pubkey",
    553             "farm_id": "farm-a",
    554             "listing_addr": "listing-a",
    555             "local_work_json": null,
    556             "event_id": "event-a",
    557             "event_kind": 3421,
    558             "event_pubkey": "seller-pubkey",
    559             "event_created_at": 1000,
    560             "event_tags_json": "[[\"d\",\"listing-a\"]]",
    561             "event_content": "{}",
    562             "event_sig": "sig-a",
    563             "raw_event_json": "{\"id\":\"event-a\",\"kind\":3421}",
    564             "outbox_status": "pending",
    565             "relay_set_fingerprint": "relay-set-a",
    566             "relay_delivery_json": "{\"state\":\"pending\",\"target_relays\":[\"ws://127.0.0.1:8080\"],\"connected_relays\":[],\"acknowledged_relays\":[],\"failed_relays\":[]}"
    567         });
    568         row[field] = value;
    569         json!([row]).to_string()
    570     }
    571 
    572     #[test]
    573     fn store_methods_round_trip_records_and_cursors() {
    574         let store = store();
    575 
    576         assert!(
    577             store
    578                 .executor()
    579                 .query_raw("select 1 as value", "[]")
    580                 .is_ok()
    581         );
    582         assert!(store.get_record("missing").expect("get missing").is_none());
    583         assert!(store.get_cursor("app").expect("cursor missing").is_none());
    584 
    585         let local = store
    586             .append_record(&local_work("local-a"))
    587             .expect("append local work");
    588         let event = store
    589             .append_record(&signed_event("event-a"))
    590             .expect("append signed event");
    591 
    592         assert_eq!(
    593             store
    594                 .get_record("local-a")
    595                 .expect("get local")
    596                 .expect("local record")
    597                 .record_id,
    598             local.record_id
    599         );
    600         assert_eq!(
    601             store
    602                 .list_records_after_seq(0, 10)
    603                 .expect("list after seq")
    604                 .len(),
    605             2
    606         );
    607         assert_eq!(
    608             store
    609                 .list_records_changed_after(local.change_seq, 10)
    610                 .expect("list changed after")[0]
    611                 .record_id,
    612             event.record_id
    613         );
    614         assert_eq!(
    615             store.list_records_changed_latest(1).expect("list latest")[0].record_id,
    616             event.record_id
    617         );
    618         assert_eq!(
    619             store
    620                 .list_records_changed_before(event.change_seq, event.seq, 10)
    621                 .expect("list before")[0]
    622                 .record_id,
    623             local.record_id
    624         );
    625 
    626         let cursor = store
    627             .advance_cursor("app", event.change_seq, 3000)
    628             .expect("advance cursor");
    629         assert_eq!(cursor.consumer_id, "app");
    630         assert_eq!(
    631             store
    632                 .get_cursor("app")
    633                 .expect("get cursor")
    634                 .expect("cursor")
    635                 .last_change_seq,
    636             event.change_seq
    637         );
    638 
    639         let updated = store
    640             .update_outbox(&LocalEventRecordUpdate {
    641                 record_id: "event-a".to_owned(),
    642                 status: LocalRecordStatus::Published,
    643                 outbox_status: PublishOutboxStatus::Acknowledged,
    644                 relay_set_fingerprint: Some("relay-set-a".to_owned()),
    645                 relay_delivery_json: Some(json!({
    646                     "state": "acknowledged",
    647                     "target_relays": ["ws://127.0.0.1:8080"],
    648                     "connected_relays": ["ws://127.0.0.1:8080"],
    649                     "acknowledged_relays": ["ws://127.0.0.1:8080"],
    650                     "failed_relays": []
    651                 })),
    652                 updated_at_ms: 4000,
    653             })
    654             .expect("update outbox");
    655 
    656         assert_eq!(updated.status, LocalRecordStatus::Published);
    657         assert_eq!(updated.outbox_status, PublishOutboxStatus::Acknowledged);
    658         store.migrate_down().expect("migrate down");
    659     }
    660 
    661     #[test]
    662     fn store_reports_missing_updates_and_decode_errors() {
    663         let store = store();
    664         assert!(
    665             store
    666                 .get_record(" ")
    667                 .expect_err("empty record id")
    668                 .to_string()
    669                 .contains("record_id")
    670         );
    671         assert!(
    672             store
    673                 .get_cursor(" ")
    674                 .expect_err("empty consumer id")
    675                 .to_string()
    676                 .contains("consumer_id")
    677         );
    678         assert!(
    679             store
    680                 .advance_cursor(" ", 1, 1000)
    681                 .expect_err("empty cursor consumer")
    682                 .to_string()
    683                 .contains("consumer_id")
    684         );
    685         assert!(
    686             store
    687                 .update_outbox(&LocalEventRecordUpdate {
    688                     record_id: " ".to_owned(),
    689                     status: LocalRecordStatus::Published,
    690                     outbox_status: PublishOutboxStatus::Acknowledged,
    691                     relay_set_fingerprint: None,
    692                     relay_delivery_json: None,
    693                     updated_at_ms: 4000,
    694                 })
    695                 .expect_err("empty update record id")
    696                 .to_string()
    697                 .contains("record_id")
    698         );
    699 
    700         let missing_update = store
    701             .update_outbox(&LocalEventRecordUpdate {
    702                 record_id: "missing-event".to_owned(),
    703                 status: LocalRecordStatus::Published,
    704                 outbox_status: PublishOutboxStatus::Acknowledged,
    705                 relay_set_fingerprint: None,
    706                 relay_delivery_json: None,
    707                 updated_at_ms: 4000,
    708             })
    709             .expect_err("missing record update");
    710 
    711         assert!(missing_update.to_string().contains("missing-event"));
    712 
    713         store
    714             .append_record(&local_work("local-a"))
    715             .expect("append local");
    716         let params = json!(["{", "local-a"]).to_string();
    717         store
    718             .executor()
    719             .exec(
    720                 "update local_event_record set local_work_json = ? where record_id = ?",
    721                 &params,
    722             )
    723             .expect("corrupt local work json");
    724         let decode_error = store.get_record("local-a").expect_err("decode error");
    725 
    726         assert!(decode_error.to_string().contains("EOF"));
    727     }
    728 
    729     #[test]
    730     fn store_rolls_back_when_change_sequence_is_unavailable() {
    731         let append_store =
    732             LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())]));
    733         let append_error = append_store
    734             .append_record(&local_work("local-a"))
    735             .expect_err("append error");
    736 
    737         assert!(append_error.to_string().contains("change sequence"));
    738         assert_eq!(append_store.executor().rollbacks.load(Ordering::SeqCst), 1);
    739 
    740         let update_store =
    741             LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("[]".to_owned())]));
    742         let update_error = update_store
    743             .update_outbox(&LocalEventRecordUpdate {
    744                 record_id: "event-a".to_owned(),
    745                 status: LocalRecordStatus::Published,
    746                 outbox_status: PublishOutboxStatus::Acknowledged,
    747                 relay_set_fingerprint: None,
    748                 relay_delivery_json: None,
    749                 updated_at_ms: 4000,
    750             })
    751             .expect_err("update error");
    752 
    753         assert!(update_error.to_string().contains("change sequence"));
    754         assert_eq!(update_store.executor().rollbacks.load(Ordering::SeqCst), 1);
    755     }
    756 
    757     #[test]
    758     fn store_reports_cursor_advance_without_returned_cursor() {
    759         let store = LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), Vec::new()));
    760 
    761         assert!(store.get_cursor("app").expect("missing cursor").is_none());
    762         let cursor_error = store
    763             .advance_cursor("app", 1, 1000)
    764             .expect_err("cursor advance error");
    765 
    766         assert!(cursor_error.to_string().contains("cursor advance failed"));
    767     }
    768 
    769     #[test]
    770     fn store_reports_executor_and_decode_failures() {
    771         let begin_store = LocalEventsStore::new(ScriptedExecutor::with_begin_error(
    772             SqlError::InvalidQuery("begin failed".to_owned()),
    773         ));
    774         assert!(
    775             begin_store
    776                 .append_record(&local_work("local-a"))
    777                 .expect_err("begin failure")
    778                 .to_string()
    779                 .contains("begin failed")
    780         );
    781 
    782         let exec_store = LocalEventsStore::new(ScriptedExecutor::new(
    783             vec![Err(SqlError::InvalidQuery("insert failed".to_owned()))],
    784             vec![Ok(r#"[{"change_seq":1}]"#.to_owned())],
    785         ));
    786         assert!(
    787             exec_store
    788                 .append_record(&local_work("local-a"))
    789                 .expect_err("exec failure")
    790                 .to_string()
    791                 .contains("insert failed")
    792         );
    793         assert_eq!(exec_store.executor().rollbacks.load(Ordering::SeqCst), 1);
    794 
    795         let commit_store = LocalEventsStore::new(ScriptedExecutor::with_commit_error(
    796             SqlError::InvalidQuery("commit failed".to_owned()),
    797         ));
    798         assert!(
    799             commit_store
    800                 .append_record(&local_work("local-a"))
    801                 .expect_err("commit failure")
    802                 .to_string()
    803                 .contains("commit failed")
    804         );
    805 
    806         let query_error_store = LocalEventsStore::new(ScriptedExecutor::new(
    807             Vec::new(),
    808             vec![Err(SqlError::InvalidQuery("query failed".to_owned()))],
    809         ));
    810         assert!(
    811             query_error_store
    812                 .get_record("record-a")
    813                 .expect_err("query failure")
    814                 .to_string()
    815                 .contains("query failed")
    816         );
    817 
    818         let invalid_rows_store =
    819             LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
    820         let _ = invalid_rows_store
    821             .get_record("record-a")
    822             .expect_err("invalid rows");
    823 
    824         let cursor_rows_store =
    825             LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
    826         let _ = cursor_rows_store
    827             .get_cursor("app")
    828             .expect_err("invalid cursor rows");
    829 
    830         let change_rows_store =
    831             LocalEventsStore::new(ScriptedExecutor::new(Vec::new(), vec![Ok("{".to_owned())]));
    832         let _ = change_rows_store
    833             .append_record(&local_work("local-a"))
    834             .expect_err("invalid change rows");
    835 
    836         let cursor_exec_store = LocalEventsStore::new(ScriptedExecutor::new(
    837             vec![Err(SqlError::InvalidQuery("cursor failed".to_owned()))],
    838             Vec::new(),
    839         ));
    840         assert!(
    841             cursor_exec_store
    842                 .advance_cursor("app", 1, 1000)
    843                 .expect_err("cursor exec failure")
    844                 .to_string()
    845                 .contains("cursor failed")
    846         );
    847 
    848         let append_lookup_store = LocalEventsStore::new(ScriptedExecutor::new(
    849             vec![Ok(ExecOutcome {
    850                 changes: 1,
    851                 last_insert_id: 0,
    852             })],
    853             vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())],
    854         ));
    855         assert!(
    856             append_lookup_store
    857                 .append_record(&local_work("local-a"))
    858                 .expect_err("append lookup failure")
    859                 .to_string()
    860                 .contains("record append failed")
    861         );
    862 
    863         let update_lookup_store = LocalEventsStore::new(ScriptedExecutor::new(
    864             vec![Ok(ExecOutcome {
    865                 changes: 1,
    866                 last_insert_id: 0,
    867             })],
    868             vec![Ok(r#"[{"change_seq":1}]"#.to_owned()), Ok("[]".to_owned())],
    869         ));
    870         assert!(
    871             update_lookup_store
    872                 .update_outbox(&LocalEventRecordUpdate {
    873                     record_id: "event-a".to_owned(),
    874                     status: LocalRecordStatus::Published,
    875                     outbox_status: PublishOutboxStatus::Acknowledged,
    876                     relay_set_fingerprint: None,
    877                     relay_delivery_json: None,
    878                     updated_at_ms: 4000,
    879                 })
    880                 .expect_err("update lookup failure")
    881                 .to_string()
    882                 .contains("event-a")
    883         );
    884 
    885         let cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new(
    886             Vec::new(),
    887             vec![Err(SqlError::InvalidQuery(
    888                 "cursor query failed".to_owned(),
    889             ))],
    890         ));
    891         assert!(
    892             cursor_query_store
    893                 .get_cursor("app")
    894                 .expect_err("cursor query failure")
    895                 .to_string()
    896                 .contains("cursor query failed")
    897         );
    898 
    899         let advance_cursor_query_store = LocalEventsStore::new(ScriptedExecutor::new(
    900             vec![Ok(ExecOutcome {
    901                 changes: 1,
    902                 last_insert_id: 0,
    903             })],
    904             vec![Err(SqlError::InvalidQuery(
    905                 "advanced cursor query failed".to_owned(),
    906             ))],
    907         ));
    908         assert!(
    909             advance_cursor_query_store
    910                 .advance_cursor("app", 1, 1000)
    911                 .expect_err("advance cursor query failure")
    912                 .to_string()
    913                 .contains("advanced cursor query failed")
    914         );
    915 
    916         let change_query_store = LocalEventsStore::new(ScriptedExecutor::new(
    917             Vec::new(),
    918             vec![Err(SqlError::InvalidQuery(
    919                 "change query failed".to_owned(),
    920             ))],
    921         ));
    922         assert!(
    923             change_query_store
    924                 .append_record(&local_work("local-a"))
    925                 .expect_err("change query failure")
    926                 .to_string()
    927                 .contains("change query failed")
    928         );
    929     }
    930 
    931     #[test]
    932     fn store_reports_record_row_conversion_failures() {
    933         for (field, value, expected) in [
    934             ("family", json!("bad_family"), "family"),
    935             ("status", json!("bad_status"), "status"),
    936             ("source_runtime", json!("bad_runtime"), "runtime"),
    937             ("event_tags_json", json!("{"), "EOF"),
    938             ("raw_event_json", json!("{"), "EOF"),
    939             ("outbox_status", json!("bad_outbox"), "outbox"),
    940             ("relay_delivery_json", json!("{"), "EOF"),
    941         ] {
    942             let store = LocalEventsStore::new(ScriptedExecutor::new(
    943                 Vec::new(),
    944                 vec![Ok(record_row_with(field, value))],
    945             ));
    946             let error = store.get_record("record-a").expect_err("conversion error");
    947 
    948             assert!(
    949                 error.to_string().contains(expected),
    950                 "expected error to contain {expected}, got {error}"
    951             );
    952         }
    953 
    954         let store = LocalEventsStore::new(ScriptedExecutor::new(
    955             vec![Ok(ExecOutcome {
    956                 changes: 1,
    957                 last_insert_id: 0,
    958             })],
    959             vec![
    960                 Ok(r#"[{"change_seq":1}]"#.to_owned()),
    961                 Ok(record_row_with("status", json!("published"))),
    962             ],
    963         ));
    964         let updated = store
    965             .update_outbox(&LocalEventRecordUpdate {
    966                 record_id: "record-a".to_owned(),
    967                 status: LocalRecordStatus::Published,
    968                 outbox_status: PublishOutboxStatus::Acknowledged,
    969                 relay_set_fingerprint: None,
    970                 relay_delivery_json: None,
    971                 updated_at_ms: 4000,
    972             })
    973             .expect("scripted update");
    974         assert_eq!(updated.status, LocalRecordStatus::Published);
    975     }
    976 }