app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

sdk_migration_receipts.rs (11926B)


      1 use rusqlite::{Connection, OptionalExtension, params};
      2 use serde_json::Value;
      3 use uuid::Uuid;
      4 
      5 use crate::AppSqliteError;
      6 
      7 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
      8 pub enum AppSdkMigrationReceiptSourceKind {
      9     LocalOutbox,
     10     SharedLocalEvent,
     11 }
     12 
     13 impl AppSdkMigrationReceiptSourceKind {
     14     pub const fn storage_key(self) -> &'static str {
     15         match self {
     16             Self::LocalOutbox => "local_outbox",
     17             Self::SharedLocalEvent => "shared_local_event",
     18         }
     19     }
     20 
     21     pub fn parse(value: &str) -> Result<Self, AppSqliteError> {
     22         match value {
     23             "local_outbox" => Ok(Self::LocalOutbox),
     24             "shared_local_event" => Ok(Self::SharedLocalEvent),
     25             _ => Err(AppSqliteError::DecodeEnum {
     26                 field: "app_sdk_migration_receipts.source_kind",
     27                 value: value.to_owned(),
     28             }),
     29         }
     30     }
     31 }
     32 
     33 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
     34 pub enum AppSdkMigrationState {
     35     Pending,
     36     Prepared,
     37     Enqueued,
     38     Pushed,
     39     Failed,
     40     Blocked,
     41     Skipped,
     42     Unsupported,
     43     ManualReview,
     44     Unknown,
     45 }
     46 
     47 impl AppSdkMigrationState {
     48     pub const fn storage_key(self) -> &'static str {
     49         match self {
     50             Self::Pending => "pending",
     51             Self::Prepared => "prepared",
     52             Self::Enqueued => "enqueued",
     53             Self::Pushed => "pushed",
     54             Self::Failed => "failed",
     55             Self::Blocked => "blocked",
     56             Self::Skipped => "skipped",
     57             Self::Unsupported => "unsupported",
     58             Self::ManualReview => "manual_review",
     59             Self::Unknown => "unknown",
     60         }
     61     }
     62 
     63     pub fn parse(value: &str) -> Result<Self, AppSqliteError> {
     64         match value {
     65             "pending" => Ok(Self::Pending),
     66             "prepared" => Ok(Self::Prepared),
     67             "enqueued" => Ok(Self::Enqueued),
     68             "pushed" => Ok(Self::Pushed),
     69             "failed" => Ok(Self::Failed),
     70             "blocked" => Ok(Self::Blocked),
     71             "skipped" => Ok(Self::Skipped),
     72             "unsupported" => Ok(Self::Unsupported),
     73             "manual_review" => Ok(Self::ManualReview),
     74             "unknown" => Ok(Self::Unknown),
     75             _ => Err(AppSqliteError::DecodeEnum {
     76                 field: "app_sdk_migration_receipts.migration_state",
     77                 value: value.to_owned(),
     78             }),
     79         }
     80     }
     81 }
     82 
/// Caller-supplied values for
/// `AppSdkMigrationReceiptRepository::record_receipt`.
///
/// The row id is not part of the input; `record_receipt` generates it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationReceiptInput {
    /// Kind of the source record. Together with `source_record_id` it forms
    /// the conflict key that `record_receipt` upserts on.
    pub source_kind: AppSdkMigrationReceiptSourceKind,
    /// Identifier of the source record; second half of the conflict key.
    pub source_record_id: String,
    /// Written as-is to the `sdk_operation_kind` column (the test in this file
    /// uses `"farm.publish"`).
    pub sdk_operation_kind: String,
    /// Serialized as JSON into the `sdk_outbox_event_ids_json` column.
    pub sdk_outbox_event_ids: Vec<String>,
    /// Optional value bound to the `expected_event_id` column.
    pub expected_event_id: Option<String>,
    /// Optional value bound to the `actor_pubkey` column.
    pub actor_pubkey: Option<String>,
    /// Optional value bound to the `idempotency_digest_prefix` column.
    pub idempotency_digest_prefix: Option<String>,
    /// Stored through `AppSdkMigrationState::storage_key`.
    pub migration_state: AppSdkMigrationState,
    /// Timestamp text. `record_receipt` writes it to both `created_at` and
    /// `updated_at` when the row is first inserted, and to `updated_at` only
    /// when an existing row is updated.
    ///
    /// NOTE(review): the format is not validated in this file; the test uses
    /// RFC 3339 UTC strings — confirm the expected format with callers.
    pub recorded_at: String,
    /// JSON value serialized into the `detail_json` column.
    pub detail_json: Value,
}
     96 
/// One decoded row of the `app_sdk_migration_receipts` table, as returned by
/// `AppSdkMigrationReceiptRepository::record_receipt` and `load_receipt`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationReceipt {
    /// Row id. `record_receipt` generates a UUIDv7 string for it; the upsert's
    /// update branch does not assign `id`, so it stays the same across repeated
    /// records for one source record.
    pub id: String,
    /// Decoded from the `source_kind` column.
    pub source_kind: AppSdkMigrationReceiptSourceKind,
    /// Identifier of the source record this receipt belongs to.
    pub source_record_id: String,
    /// Value of the `sdk_operation_kind` column.
    pub sdk_operation_kind: String,
    /// Decoded from the JSON text in `sdk_outbox_event_ids_json`.
    pub sdk_outbox_event_ids: Vec<String>,
    /// Value of the `expected_event_id` column, if any.
    pub expected_event_id: Option<String>,
    /// Value of the `actor_pubkey` column, if any.
    pub actor_pubkey: Option<String>,
    /// Value of the `idempotency_digest_prefix` column, if any.
    pub idempotency_digest_prefix: Option<String>,
    /// Decoded from the `migration_state` column.
    pub migration_state: AppSdkMigrationState,
    /// `recorded_at` of the input that first inserted the row; the upsert's
    /// update branch does not assign it.
    pub created_at: String,
    /// `recorded_at` of the most recent input recorded for the row.
    pub updated_at: String,
    /// Decoded from the JSON text in the `detail_json` column.
    pub detail_json: Value,
}
    112 
    113 pub struct AppSdkMigrationReceiptRepository<'a> {
    114     connection: &'a Connection,
    115 }
    116 
    117 impl<'a> AppSdkMigrationReceiptRepository<'a> {
    118     pub const fn new(connection: &'a Connection) -> Self {
    119         Self { connection }
    120     }
    121 
    122     pub fn record_receipt(
    123         &self,
    124         input: &AppSdkMigrationReceiptInput,
    125     ) -> Result<AppSdkMigrationReceipt, AppSqliteError> {
    126         let receipt_id = Uuid::now_v7().to_string();
    127         let outbox_ids_json =
    128             serde_json::to_string(&input.sdk_outbox_event_ids).map_err(|source| {
    129                 AppSqliteError::EncodeJson {
    130                     field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json",
    131                     source,
    132                 }
    133             })?;
    134         let detail_json = serde_json::to_string(&input.detail_json).map_err(|source| {
    135             AppSqliteError::EncodeJson {
    136                 field: "app_sdk_migration_receipts.detail_json",
    137                 source,
    138             }
    139         })?;
    140 
    141         self.connection
    142             .execute(
    143                 "INSERT INTO app_sdk_migration_receipts (
    144                     id,
    145                     source_kind,
    146                     source_record_id,
    147                     sdk_operation_kind,
    148                     sdk_outbox_event_ids_json,
    149                     expected_event_id,
    150                     actor_pubkey,
    151                     idempotency_digest_prefix,
    152                     migration_state,
    153                     created_at,
    154                     updated_at,
    155                     detail_json
    156                  ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10, ?11)
    157                  ON CONFLICT(source_kind, source_record_id)
    158                  DO UPDATE SET
    159                     sdk_operation_kind = excluded.sdk_operation_kind,
    160                     sdk_outbox_event_ids_json = excluded.sdk_outbox_event_ids_json,
    161                     expected_event_id = excluded.expected_event_id,
    162                     actor_pubkey = excluded.actor_pubkey,
    163                     idempotency_digest_prefix = excluded.idempotency_digest_prefix,
    164                     migration_state = excluded.migration_state,
    165                     updated_at = excluded.updated_at,
    166                     detail_json = excluded.detail_json",
    167                 params![
    168                     receipt_id,
    169                     input.source_kind.storage_key(),
    170                     input.source_record_id.as_str(),
    171                     input.sdk_operation_kind.as_str(),
    172                     outbox_ids_json.as_str(),
    173                     input.expected_event_id.as_deref(),
    174                     input.actor_pubkey.as_deref(),
    175                     input.idempotency_digest_prefix.as_deref(),
    176                     input.migration_state.storage_key(),
    177                     input.recorded_at.as_str(),
    178                     detail_json.as_str(),
    179                 ],
    180             )
    181             .map_err(|source| AppSqliteError::Query {
    182                 operation: "record app SDK migration receipt",
    183                 source,
    184             })?;
    185 
    186         self.load_receipt(input.source_kind, input.source_record_id.as_str())?
    187             .ok_or(AppSqliteError::MissingColumn {
    188                 field: "app_sdk_migration_receipts.id",
    189             })
    190     }
    191 
    192     pub fn load_receipt(
    193         &self,
    194         source_kind: AppSdkMigrationReceiptSourceKind,
    195         source_record_id: &str,
    196     ) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError> {
    197         self.connection
    198             .query_row(
    199                 "SELECT
    200                     id,
    201                     source_kind,
    202                     source_record_id,
    203                     sdk_operation_kind,
    204                     sdk_outbox_event_ids_json,
    205                     expected_event_id,
    206                     actor_pubkey,
    207                     idempotency_digest_prefix,
    208                     migration_state,
    209                     created_at,
    210                     updated_at,
    211                     detail_json
    212                  FROM app_sdk_migration_receipts
    213                  WHERE source_kind = ?1
    214                     AND source_record_id = ?2
    215                  LIMIT 1",
    216                 params![source_kind.storage_key(), source_record_id],
    217                 decode_receipt_row,
    218             )
    219             .optional()
    220             .map_err(|source| AppSqliteError::Query {
    221                 operation: "load app SDK migration receipt",
    222                 source,
    223             })
    224     }
    225 }
    226 
    227 fn decode_receipt_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<AppSdkMigrationReceipt> {
    228     let source_kind: String = row.get(1)?;
    229     let outbox_ids_json: String = row.get(4)?;
    230     let migration_state: String = row.get(8)?;
    231     let detail_json: String = row.get(11)?;
    232     Ok(AppSdkMigrationReceipt {
    233         id: row.get(0)?,
    234         source_kind: AppSdkMigrationReceiptSourceKind::parse(source_kind.as_str())
    235             .map_err(decode_app_error)?,
    236         source_record_id: row.get(2)?,
    237         sdk_operation_kind: row.get(3)?,
    238         sdk_outbox_event_ids: serde_json::from_str(outbox_ids_json.as_str()).map_err(|source| {
    239             decode_app_error(AppSqliteError::DecodeJson {
    240                 field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json",
    241                 source,
    242             })
    243         })?,
    244         expected_event_id: row.get(5)?,
    245         actor_pubkey: row.get(6)?,
    246         idempotency_digest_prefix: row.get(7)?,
    247         migration_state: AppSdkMigrationState::parse(migration_state.as_str())
    248             .map_err(decode_app_error)?,
    249         created_at: row.get(9)?,
    250         updated_at: row.get(10)?,
    251         detail_json: serde_json::from_str(detail_json.as_str()).map_err(|source| {
    252             decode_app_error(AppSqliteError::DecodeJson {
    253                 field: "app_sdk_migration_receipts.detail_json",
    254                 source,
    255             })
    256         })?,
    257     })
    258 }
    259 
    260 fn decode_app_error(error: AppSqliteError) -> rusqlite::Error {
    261     rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error))
    262 }
    263 
    264 #[cfg(test)]
    265 mod tests {
    266     use serde_json::json;
    267 
    268     use crate::{
    269         AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState,
    270         AppSqliteStore, DatabaseTarget,
    271     };
    272 
    273     #[test]
    274     fn migration_receipts_are_idempotent_by_source_record() {
    275         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
    276         let first = store
    277             .sdk_migration_receipt_repository()
    278             .record_receipt(&AppSdkMigrationReceiptInput {
    279                 source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox,
    280                 source_record_id: "source-record-a".to_owned(),
    281                 sdk_operation_kind: "farm.publish".to_owned(),
    282                 sdk_outbox_event_ids: vec!["outbox-a".to_owned()],
    283                 expected_event_id: Some("expected-a".to_owned()),
    284                 actor_pubkey: Some("actor-a".to_owned()),
    285                 idempotency_digest_prefix: Some("digest-a".to_owned()),
    286                 migration_state: AppSdkMigrationState::Enqueued,
    287                 recorded_at: "2026-06-18T12:00:00Z".to_owned(),
    288                 detail_json: json!({"attempt": 1}),
    289             })
    290             .expect("record first receipt");
    291         let second = store
    292             .sdk_migration_receipt_repository()
    293             .record_receipt(&AppSdkMigrationReceiptInput {
    294                 source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox,
    295                 source_record_id: "source-record-a".to_owned(),
    296                 sdk_operation_kind: "farm.publish".to_owned(),
    297                 sdk_outbox_event_ids: vec!["outbox-b".to_owned()],
    298                 expected_event_id: Some("expected-b".to_owned()),
    299                 actor_pubkey: Some("actor-b".to_owned()),
    300                 idempotency_digest_prefix: Some("digest-b".to_owned()),
    301                 migration_state: AppSdkMigrationState::Pushed,
    302                 recorded_at: "2026-06-18T12:05:00Z".to_owned(),
    303                 detail_json: json!({"attempt": 2}),
    304             })
    305             .expect("record second receipt");
    306 
    307         assert_eq!(first.id, second.id);
    308         assert_eq!(second.created_at, "2026-06-18T12:00:00Z");
    309         assert_eq!(second.updated_at, "2026-06-18T12:05:00Z");
    310         assert_eq!(second.sdk_outbox_event_ids, vec!["outbox-b".to_owned()]);
    311         assert_eq!(second.expected_event_id.as_deref(), Some("expected-b"));
    312         assert_eq!(second.actor_pubkey.as_deref(), Some("actor-b"));
    313         assert_eq!(second.migration_state, AppSdkMigrationState::Pushed);
    314         assert_eq!(second.detail_json, json!({"attempt": 2}));
    315     }
    316 }