sdk_migration_receipts.rs (11926B)
1 use rusqlite::{Connection, OptionalExtension, params}; 2 use serde_json::Value; 3 use uuid::Uuid; 4 5 use crate::AppSqliteError; 6 7 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 8 pub enum AppSdkMigrationReceiptSourceKind { 9 LocalOutbox, 10 SharedLocalEvent, 11 } 12 13 impl AppSdkMigrationReceiptSourceKind { 14 pub const fn storage_key(self) -> &'static str { 15 match self { 16 Self::LocalOutbox => "local_outbox", 17 Self::SharedLocalEvent => "shared_local_event", 18 } 19 } 20 21 pub fn parse(value: &str) -> Result<Self, AppSqliteError> { 22 match value { 23 "local_outbox" => Ok(Self::LocalOutbox), 24 "shared_local_event" => Ok(Self::SharedLocalEvent), 25 _ => Err(AppSqliteError::DecodeEnum { 26 field: "app_sdk_migration_receipts.source_kind", 27 value: value.to_owned(), 28 }), 29 } 30 } 31 } 32 33 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 34 pub enum AppSdkMigrationState { 35 Pending, 36 Prepared, 37 Enqueued, 38 Pushed, 39 Failed, 40 Blocked, 41 Skipped, 42 Unsupported, 43 ManualReview, 44 Unknown, 45 } 46 47 impl AppSdkMigrationState { 48 pub const fn storage_key(self) -> &'static str { 49 match self { 50 Self::Pending => "pending", 51 Self::Prepared => "prepared", 52 Self::Enqueued => "enqueued", 53 Self::Pushed => "pushed", 54 Self::Failed => "failed", 55 Self::Blocked => "blocked", 56 Self::Skipped => "skipped", 57 Self::Unsupported => "unsupported", 58 Self::ManualReview => "manual_review", 59 Self::Unknown => "unknown", 60 } 61 } 62 63 pub fn parse(value: &str) -> Result<Self, AppSqliteError> { 64 match value { 65 "pending" => Ok(Self::Pending), 66 "prepared" => Ok(Self::Prepared), 67 "enqueued" => Ok(Self::Enqueued), 68 "pushed" => Ok(Self::Pushed), 69 "failed" => Ok(Self::Failed), 70 "blocked" => Ok(Self::Blocked), 71 "skipped" => Ok(Self::Skipped), 72 "unsupported" => Ok(Self::Unsupported), 73 "manual_review" => Ok(Self::ManualReview), 74 "unknown" => Ok(Self::Unknown), 75 _ => Err(AppSqliteError::DecodeEnum { 76 field: "app_sdk_migration_receipts.migration_state", 77 value: value.to_owned(), 78 }), 79 } 80 } 81 } 82 83 #[derive(Clone, Debug, Eq, PartialEq)] 84 pub struct AppSdkMigrationReceiptInput { 85 pub source_kind: AppSdkMigrationReceiptSourceKind, 86 pub source_record_id: String, 87 pub sdk_operation_kind: String, 88 pub sdk_outbox_event_ids: Vec<String>, 89 pub expected_event_id: Option<String>, 90 pub actor_pubkey: Option<String>, 91 pub idempotency_digest_prefix: Option<String>, 92 pub migration_state: AppSdkMigrationState, 93 pub recorded_at: String, 94 pub detail_json: Value, 95 } 96 97 #[derive(Clone, Debug, Eq, PartialEq)] 98 pub struct AppSdkMigrationReceipt { 99 pub id: String, 100 pub source_kind: AppSdkMigrationReceiptSourceKind, 101 pub source_record_id: String, 102 pub sdk_operation_kind: String, 103 pub sdk_outbox_event_ids: Vec<String>, 104 pub expected_event_id: Option<String>, 105 pub actor_pubkey: Option<String>, 106 pub idempotency_digest_prefix: Option<String>, 107 pub migration_state: AppSdkMigrationState, 108 pub created_at: String, 109 pub updated_at: String, 110 pub detail_json: Value, 111 } 112 113 pub struct AppSdkMigrationReceiptRepository<'a> { 114 connection: &'a Connection, 115 } 116 117 impl<'a> AppSdkMigrationReceiptRepository<'a> { 118 pub const fn new(connection: &'a Connection) -> Self { 119 Self { connection } 120 } 121 122 pub fn record_receipt( 123 &self, 124 input: &AppSdkMigrationReceiptInput, 125 ) -> Result<AppSdkMigrationReceipt, AppSqliteError> { 126 let receipt_id = Uuid::now_v7().to_string(); 127 let outbox_ids_json = 128 serde_json::to_string(&input.sdk_outbox_event_ids).map_err(|source| { 129 AppSqliteError::EncodeJson { 130 field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json", 131 source, 132 } 133 })?; 134 let detail_json = serde_json::to_string(&input.detail_json).map_err(|source| { 135 AppSqliteError::EncodeJson { 136 field: "app_sdk_migration_receipts.detail_json", 137 source, 138 } 139 })?; 140 141 self.connection 142 .execute( 143 "INSERT INTO app_sdk_migration_receipts ( 144 id, 145 source_kind, 146 source_record_id, 147 sdk_operation_kind, 148 sdk_outbox_event_ids_json, 149 expected_event_id, 150 actor_pubkey, 151 idempotency_digest_prefix, 152 migration_state, 153 created_at, 154 updated_at, 155 detail_json 156 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10, ?11) 157 ON CONFLICT(source_kind, source_record_id) 158 DO UPDATE SET 159 sdk_operation_kind = excluded.sdk_operation_kind, 160 sdk_outbox_event_ids_json = excluded.sdk_outbox_event_ids_json, 161 expected_event_id = excluded.expected_event_id, 162 actor_pubkey = excluded.actor_pubkey, 163 idempotency_digest_prefix = excluded.idempotency_digest_prefix, 164 migration_state = excluded.migration_state, 165 updated_at = excluded.updated_at, 166 detail_json = excluded.detail_json", 167 params![ 168 receipt_id, 169 input.source_kind.storage_key(), 170 input.source_record_id.as_str(), 171 input.sdk_operation_kind.as_str(), 172 outbox_ids_json.as_str(), 173 input.expected_event_id.as_deref(), 174 input.actor_pubkey.as_deref(), 175 input.idempotency_digest_prefix.as_deref(), 176 input.migration_state.storage_key(), 177 input.recorded_at.as_str(), 178 detail_json.as_str(), 179 ], 180 ) 181 .map_err(|source| AppSqliteError::Query { 182 operation: "record app SDK migration receipt", 183 source, 184 })?; 185 186 self.load_receipt(input.source_kind, input.source_record_id.as_str())? 187 .ok_or(AppSqliteError::MissingColumn { 188 field: "app_sdk_migration_receipts.id", 189 }) 190 } 191 192 pub fn load_receipt( 193 &self, 194 source_kind: AppSdkMigrationReceiptSourceKind, 195 source_record_id: &str, 196 ) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError> { 197 self.connection 198 .query_row( 199 "SELECT 200 id, 201 source_kind, 202 source_record_id, 203 sdk_operation_kind, 204 sdk_outbox_event_ids_json, 205 expected_event_id, 206 actor_pubkey, 207 idempotency_digest_prefix, 208 migration_state, 209 created_at, 210 updated_at, 211 detail_json 212 FROM app_sdk_migration_receipts 213 WHERE source_kind = ?1 214 AND source_record_id = ?2 215 LIMIT 1", 216 params![source_kind.storage_key(), source_record_id], 217 decode_receipt_row, 218 ) 219 .optional() 220 .map_err(|source| AppSqliteError::Query { 221 operation: "load app SDK migration receipt", 222 source, 223 }) 224 } 225 } 226 227 fn decode_receipt_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<AppSdkMigrationReceipt> { 228 let source_kind: String = row.get(1)?; 229 let outbox_ids_json: String = row.get(4)?; 230 let migration_state: String = row.get(8)?; 231 let detail_json: String = row.get(11)?; 232 Ok(AppSdkMigrationReceipt { 233 id: row.get(0)?, 234 source_kind: AppSdkMigrationReceiptSourceKind::parse(source_kind.as_str()) 235 .map_err(decode_app_error)?, 236 source_record_id: row.get(2)?, 237 sdk_operation_kind: row.get(3)?, 238 sdk_outbox_event_ids: serde_json::from_str(outbox_ids_json.as_str()).map_err(|source| { 239 decode_app_error(AppSqliteError::DecodeJson { 240 field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json", 241 source, 242 }) 243 })?, 244 expected_event_id: row.get(5)?, 245 actor_pubkey: row.get(6)?, 246 idempotency_digest_prefix: row.get(7)?, 247 migration_state: AppSdkMigrationState::parse(migration_state.as_str()) 248 .map_err(decode_app_error)?, 249 created_at: row.get(9)?, 250 updated_at: row.get(10)?, 251 detail_json: serde_json::from_str(detail_json.as_str()).map_err(|source| { 252 decode_app_error(AppSqliteError::DecodeJson { 253 field: "app_sdk_migration_receipts.detail_json", 254 source, 255 }) 256 })?, 257 }) 258 } 259 260 fn decode_app_error(error: AppSqliteError) -> rusqlite::Error { 261 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error)) 262 } 263 264 #[cfg(test)] 265 mod tests { 266 use serde_json::json; 267 268 use crate::{ 269 AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, 270 AppSqliteStore, DatabaseTarget, 271 }; 272 273 #[test] 274 fn migration_receipts_are_idempotent_by_source_record() { 275 let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); 276 let first = store 277 .sdk_migration_receipt_repository() 278 .record_receipt(&AppSdkMigrationReceiptInput { 279 source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox, 280 source_record_id: "source-record-a".to_owned(), 281 sdk_operation_kind: "farm.publish".to_owned(), 282 sdk_outbox_event_ids: vec!["outbox-a".to_owned()], 283 expected_event_id: Some("expected-a".to_owned()), 284 actor_pubkey: Some("actor-a".to_owned()), 285 idempotency_digest_prefix: Some("digest-a".to_owned()), 286 migration_state: AppSdkMigrationState::Enqueued, 287 recorded_at: "2026-06-18T12:00:00Z".to_owned(), 288 detail_json: json!({"attempt": 1}), 289 }) 290 .expect("record first receipt"); 291 let second = store 292 .sdk_migration_receipt_repository() 293 .record_receipt(&AppSdkMigrationReceiptInput { 294 source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox, 295 source_record_id: "source-record-a".to_owned(), 296 sdk_operation_kind: "farm.publish".to_owned(), 297 sdk_outbox_event_ids: vec!["outbox-b".to_owned()], 298 expected_event_id: Some("expected-b".to_owned()), 299 actor_pubkey: Some("actor-b".to_owned()), 300 idempotency_digest_prefix: Some("digest-b".to_owned()), 301 migration_state: AppSdkMigrationState::Pushed, 302 recorded_at: "2026-06-18T12:05:00Z".to_owned(), 303 detail_json: json!({"attempt": 2}), 304 }) 305 .expect("record second receipt"); 306 307 assert_eq!(first.id, second.id); 308 assert_eq!(second.created_at, "2026-06-18T12:00:00Z"); 309 assert_eq!(second.updated_at, "2026-06-18T12:05:00Z"); 310 assert_eq!(second.sdk_outbox_event_ids, vec!["outbox-b".to_owned()]); 311 assert_eq!(second.expected_event_id.as_deref(), Some("expected-b")); 312 assert_eq!(second.actor_pubkey.as_deref(), Some("actor-b")); 313 assert_eq!(second.migration_state, AppSdkMigrationState::Pushed); 314 assert_eq!(second.detail_json, json!({"attempt": 2})); 315 } 316 }