app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit af42bf80c47d7c40ec38d04fd9f4e319a50c2acd
parent df0b3a84a4b7c33b43879d6c9f70ec8c3751d7d2
Author: triesap <tyson@radroots.org>
Date:   Thu, 18 Jun 2026 21:35:56 -0700

store: add sdk migration receipts

Diffstat:
Acrates/store/migrations/0028_sdk_migration_receipts.sql | 38++++++++++++++++++++++++++++++++++++++
Mcrates/store/src/error.rs | 12++++++++++++
Mcrates/store/src/lib.rs | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/store/src/migration_audit.rs | 303++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/store/src/migrations.rs | 4++++
Acrates/store/src/sdk_migration_receipts.rs | 316+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 719 insertions(+), 9 deletions(-)

diff --git a/crates/store/migrations/0028_sdk_migration_receipts.sql b/crates/store/migrations/0028_sdk_migration_receipts.sql @@ -0,0 +1,38 @@ +CREATE TABLE app_sdk_migration_receipts ( + id TEXT PRIMARY KEY NOT NULL, + source_kind TEXT NOT NULL CHECK ( + source_kind IN ('local_outbox', 'shared_local_event') + ), + source_record_id TEXT NOT NULL, + sdk_operation_kind TEXT NOT NULL, + sdk_outbox_event_ids_json TEXT NOT NULL, + expected_event_id TEXT, + actor_pubkey TEXT, + idempotency_digest_prefix TEXT, + migration_state TEXT NOT NULL CHECK ( + migration_state IN ( + 'pending', + 'prepared', + 'enqueued', + 'pushed', + 'failed', + 'blocked', + 'skipped', + 'unsupported', + 'manual_review', + 'unknown' + ) + ), + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + detail_json TEXT NOT NULL, + UNIQUE(source_kind, source_record_id) +); + +CREATE INDEX idx_app_sdk_migration_receipts_source_record ON app_sdk_migration_receipts( + source_record_id +); +CREATE INDEX idx_app_sdk_migration_receipts_state ON app_sdk_migration_receipts( + migration_state, + updated_at +); diff --git a/crates/store/src/error.rs b/crates/store/src/error.rs @@ -73,6 +73,18 @@ pub enum AppSqliteError { #[source] source: rusqlite::Error, }, + #[error("failed to encode sqlite JSON column `{field}`")] + EncodeJson { + field: &'static str, + #[source] + source: serde_json::Error, + }, + #[error("failed to decode sqlite JSON column `{field}`")] + DecodeJson { + field: &'static str, + #[source] + source: serde_json::Error, + }, #[error("invalid sqlite id in `{field}`: `{value}`")] DecodeId { field: &'static str, value: String }, #[error("missing required sqlite column `{field}`")] diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs @@ -5,6 +5,7 @@ mod interop; mod migration_audit; mod migrations; mod repo; +mod sdk_migration_receipts; mod sync; use std::{collections::BTreeSet, fs, path::PathBuf, time::Duration}; @@ -48,6 +49,10 @@ pub use repo::{ SellerOrderDecisionExport, SellerOrderDecisionLineExport, TODAY_AGENDA_LIST_LIMIT, TODAY_AGENDA_LOW_STOCK_THRESHOLD, derive_farm_rules_readiness, }; +pub use sdk_migration_receipts::{ + AppSdkMigrationReceipt, AppSdkMigrationReceiptInput, AppSdkMigrationReceiptRepository, + AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, +}; pub use sync::{ AppSyncRepository, StoredPendingSyncOperation, StoredRelayIngestCursor, StoredSyncConflict, }; @@ -120,6 +125,10 @@ impl AppSqliteStore { AppSyncRepository::new(&self.connection) } + pub fn sdk_migration_receipt_repository(&self) -> AppSdkMigrationReceiptRepository<'_> { + AppSdkMigrationReceiptRepository::new(&self.connection) + } + pub fn reminders_repository(&self) -> AppRemindersRepository<'_> { AppRemindersRepository::new(&self.connection) } @@ -859,6 +868,7 @@ mod tests { assert!(table_exists(connection, "order_recovery_records")); assert!(table_exists(connection, "buyer_order_coordination_records")); assert!(table_exists(connection, "order_validation_receipts")); + assert!(table_exists(connection, "app_sdk_migration_receipts")); assert!(column_exists(connection, "farms", "timezone")); assert!(column_exists(connection, "farms", "currency_code")); assert!(column_exists(connection, "local_outbox", "account_id")); @@ -1034,6 +1044,51 @@ mod tests { "order_recovery_records", "recovery_state" )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "source_record_id" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "source_kind" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "sdk_operation_kind" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "sdk_outbox_event_ids_json" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "expected_event_id" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "actor_pubkey" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "idempotency_digest_prefix" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "migration_state" + )); + assert!(column_exists( + connection, + "app_sdk_migration_receipts", + "detail_json" + )); connection .execute( "INSERT INTO local_interop_imports ( diff --git a/crates/store/src/migration_audit.rs b/crates/store/src/migration_audit.rs @@ -14,7 +14,10 @@ use radroots_sql_core::SqlExecutor; use rusqlite::params; use serde_json::Value; -use crate::{AppSqliteError, AppSqliteStore}; +use crate::{ + AppSdkMigrationReceipt, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, AppSqliteError, + AppSqliteStore, +}; pub const APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE: u32 = 500; pub const APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE: u32 = 1_000; @@ -94,6 +97,9 @@ pub enum AppSdkMigrationAuditSource { pub enum AppSdkMigrationAuditClassification { PublishableCandidate, AlreadyRepresentedCandidate, + RepresentedRecord, + SkippedRecord, + FailedRecord, LocalWorkDeferred, ManualReviewRequired, PaymentDeferred, @@ -108,6 +114,9 @@ impl AppSdkMigrationAuditClassification { match self { Self::PublishableCandidate => "publishable_candidate", Self::AlreadyRepresentedCandidate => "already_represented_candidate", + Self::RepresentedRecord => "represented_record", + Self::SkippedRecord => "skipped_record", + Self::FailedRecord => "failed_record", Self::LocalWorkDeferred => "local_work_deferred", Self::ManualReviewRequired => "manual_review_required", Self::PaymentDeferred => "payment_deferred", @@ -130,7 +139,7 @@ impl AppSqliteStore { { let local_outbox = self.audit_sdk_migration_local_outbox(request)?; let shared_local_events = - audit_sdk_migration_shared_local_events(shared_local_events, request)?; + self.audit_sdk_migration_shared_local_events(shared_local_events, request)?; let issues = local_outbox .issues .iter() @@ -164,7 +173,11 @@ impl AppSqliteStore { report.batch_count += 1; for row in &rows { last_rowid = row.rowid; - audit_local_outbox_row(row, &mut report); + let receipt = self.sdk_migration_receipt_repository().load_receipt( + AppSdkMigrationReceiptSourceKind::LocalOutbox, + row.id.as_str(), + )?; + audit_local_outbox_row(row, receipt.as_ref(), &mut report); } if rows.len() < batch_size as usize { break; @@ -173,11 +186,28 @@ impl AppSqliteStore { Ok(report.finish()) } + + pub fn audit_sdk_migration_shared_local_events<E>( + &self, + store: &LocalEventsStore<E>, + request: AppSdkMigrationAuditRequest, + ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError> + where + E: SqlExecutor, + { + audit_sdk_migration_shared_local_events_with_receipts(store, request, |record_id| { + self.sdk_migration_receipt_repository().load_receipt( + AppSdkMigrationReceiptSourceKind::SharedLocalEvent, + record_id, + ) + }) + } } -pub fn audit_sdk_migration_shared_local_events<E>( +fn audit_sdk_migration_shared_local_events_with_receipts<E>( store: &LocalEventsStore<E>, request: AppSdkMigrationAuditRequest, + mut load_receipt: impl FnMut(&str) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError>, ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError> where E: SqlExecutor, @@ -202,7 +232,8 @@ where report.batch_count += 1; for record in &records { after_change_seq = record.change_seq; - audit_shared_local_event_record(record, &mut report); + let receipt = load_receipt(record.record_id.as_str())?; + audit_shared_local_event_record(record, receipt.as_ref(), &mut report); } if records.len() < batch_size as usize { break; @@ -364,10 +395,11 @@ struct DuplicateIdentity { fn audit_local_outbox_row( row: &LocalOutboxAuditRow, + receipt: Option<&AppSdkMigrationReceipt>, report: &mut AppSdkMigrationAuditSourceBuilder, ) { let payload = serde_json::from_str::<AppPublishPayload>(row.payload_json.as_str()); - let (kind, classification) = match payload { + let (kind, source_classification) = match payload { Ok(payload) => { if row.operation_kind == SyncOperationKind::Delete.storage_key() { report.issue( @@ -404,6 +436,8 @@ fn audit_local_outbox_row( ) } }; + let classification = + classify_receipt_overlay(row.id.as_str(), source_classification, receipt, report); let identities = vec![ DuplicateIdentity { kind: "operation".to_owned(), @@ -462,10 +496,17 @@ fn classify_local_outbox_state( fn audit_shared_local_event_record( record: &LocalEventRecord, + receipt: Option<&AppSdkMigrationReceipt>, report: &mut AppSdkMigrationAuditSourceBuilder, ) { let kind = shared_local_event_kind(record); - let classification = shared_local_event_classification(record, report); + let source_classification = shared_local_event_classification(record, report); + let classification = classify_receipt_overlay( + record.record_id.as_str(), + source_classification, + receipt, + report, + ); report.record( record.record_id.as_str(), kind, @@ -479,6 +520,92 @@ fn audit_shared_local_event_record( ); } +fn classify_receipt_overlay( + record_id: &str, + source_classification: AppSdkMigrationAuditClassification, + receipt: Option<&AppSdkMigrationReceipt>, + report: &mut AppSdkMigrationAuditSourceBuilder, +) -> AppSdkMigrationAuditClassification { + let Some(receipt) = receipt else { + return source_classification; + }; + if !receipt_allowed_for_source_classification(source_classification) { + report.issue( + "sdk_migration_receipt_for_non_migratable_source", + Some(record_id), + format!( + "SDK migration receipt `{}` for operation `{}` cannot override source classification `{}`", + receipt.id, + receipt.sdk_operation_kind, + source_classification.storage_key() + ), + ); + return source_classification; + } + + match receipt.migration_state { + AppSdkMigrationState::Pending | AppSdkMigrationState::Prepared => source_classification, + AppSdkMigrationState::Enqueued | AppSdkMigrationState::Pushed => { + AppSdkMigrationAuditClassification::RepresentedRecord + } + AppSdkMigrationState::Skipped => AppSdkMigrationAuditClassification::SkippedRecord, + AppSdkMigrationState::Failed => { + report.issue( + "sdk_migration_receipt_failed", + Some(record_id), + format!( + "SDK migration receipt `{}` for operation `{}` is failed", + receipt.id, receipt.sdk_operation_kind + ), + ); + AppSdkMigrationAuditClassification::FailedRecord + } + AppSdkMigrationState::Blocked | AppSdkMigrationState::ManualReview => { + report.issue( + "sdk_migration_receipt_manual_review", + Some(record_id), + format!( + "SDK migration receipt `{}` for operation `{}` requires manual review", + receipt.id, receipt.sdk_operation_kind + ), + ); + AppSdkMigrationAuditClassification::ManualReviewRequired + } + AppSdkMigrationState::Unsupported => { + report.issue( + "sdk_migration_receipt_unsupported", + Some(record_id), + format!( + "SDK migration receipt `{}` for operation `{}` is unsupported", + receipt.id, receipt.sdk_operation_kind + ), + ); + AppSdkMigrationAuditClassification::Unsupported + } + AppSdkMigrationState::Unknown => { + report.issue( + "sdk_migration_receipt_unknown", + Some(record_id), + format!( + "SDK migration receipt `{}` for operation `{}` is unknown", + receipt.id, receipt.sdk_operation_kind + ), + ); + AppSdkMigrationAuditClassification::Unknown + } + } +} + +fn receipt_allowed_for_source_classification( + classification: AppSdkMigrationAuditClassification, +) -> bool { + matches!( + classification, + AppSdkMigrationAuditClassification::PublishableCandidate + | AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate + ) +} + fn shared_local_event_kind(record: &LocalEventRecord) -> String { match record.family { LocalRecordFamily::LocalWork => record @@ -746,8 +873,9 @@ mod tests { use serde_json::json; use crate::{ - AppSdkMigrationAuditClassification, AppSdkMigrationAuditRequest, AppSqliteStore, - DatabaseTarget, + AppSdkMigrationAuditClassification, AppSdkMigrationAuditRequest, + AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, + AppSqliteStore, DatabaseTarget, }; fn local_events_store() -> LocalEventsStore<SqliteExecutor> { @@ -933,6 +1061,141 @@ mod tests { } #[test] + fn local_outbox_audit_uses_migration_receipts_for_migratable_records() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); + let shared_events = local_events_store(); + let operation = farm_profile_operation("acct_seed", "receipt_matrix"); + + for (id, state) in [ + ("represented-source", AppSdkMigrationState::Enqueued), + ("skipped-source", AppSdkMigrationState::Skipped), + ("failed-source", AppSdkMigrationState::Failed), + ] { + insert_local_outbox_audit_row(&store, id, id, "pending", &operation); + record_local_outbox_receipt(&store, id, state); + } + + let report = store + .audit_sdk_migration( + &shared_events, + AppSdkMigrationAuditRequest { batch_size: 10 }, + ) + .expect("audit should run"); + + assert_eq!(report.local_outbox.scanned_records, 3); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::RepresentedRecord.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::SkippedRecord.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::FailedRecord.storage_key() + ), + 1 + ); + assert!( + report + .local_outbox + .issues + .iter() + .any(|issue| issue.code == "sdk_migration_receipt_failed") + ); + } + + #[test] + fn local_outbox_audit_does_not_let_receipts_hide_non_migratable_rows() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); + let shared_events = local_events_store(); + let operation = farm_profile_operation("acct_seed", "non_migratable"); + + insert_local_outbox_audit_row(&store, "failed-source", "acct_failed", "failed", &operation); + record_local_outbox_receipt(&store, "failed-source", AppSdkMigrationState::Enqueued); + insert_local_outbox_audit_row( + &store, + "unsupported-source", + "acct_unsupported", + "pending", + &PendingSyncOperation { + operation: radroots_app_sync::SyncOperationKind::Delete, + ..operation.clone() + }, + ); + record_local_outbox_receipt(&store, "unsupported-source", AppSdkMigrationState::Enqueued); + store + .connection() + .execute_batch("PRAGMA ignore_check_constraints = ON") + .expect("disable sqlite checks for defensive unknown state row"); + insert_local_outbox_audit_row( + &store, + "unknown-source", + "acct_unknown", + "mystery", + &operation, + ); + store + .connection() + .execute_batch("PRAGMA ignore_check_constraints = OFF") + .expect("restore sqlite checks"); + record_local_outbox_receipt(&store, "unknown-source", AppSdkMigrationState::Enqueued); + + let report = store + .audit_sdk_migration( + &shared_events, + AppSdkMigrationAuditRequest { batch_size: 10 }, + ) + .expect("audit should run"); + + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::RepresentedRecord.storage_key() + ), + 0 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::Unsupported.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::Unknown.storage_key() + ), + 1 + ); + assert_eq!( + report + .local_outbox + .issues + .iter() + .filter(|issue| issue.code == "sdk_migration_receipt_for_non_migratable_source") + .count(), + 3 + ); + } + + #[test] fn shared_local_events_audit_defers_payment_and_settlement() { let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); let shared_events = local_events_store(); @@ -1206,6 +1469,28 @@ mod tests { .expect("insert local outbox audit row"); } + fn record_local_outbox_receipt( + store: &AppSqliteStore, + source_record_id: &str, + migration_state: AppSdkMigrationState, + ) { + store + .sdk_migration_receipt_repository() + .record_receipt(&AppSdkMigrationReceiptInput { + source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox, + source_record_id: source_record_id.to_owned(), + sdk_operation_kind: "farm.publish".to_owned(), + sdk_outbox_event_ids: vec![format!("sdk-outbox-{source_record_id}")], + expected_event_id: Some(format!("event-{source_record_id}")), + actor_pubkey: Some("actor-pubkey".to_owned()), + idempotency_digest_prefix: Some("digest-prefix".to_owned()), + migration_state, + recorded_at: "2026-06-18T12:00:00Z".to_owned(), + detail_json: json!({"source": source_record_id}), + }) + .expect("record local outbox receipt"); + } + fn local_work_record( record_id: &str, record_kind: &str, diff --git a/crates/store/src/migrations.rs b/crates/store/src/migrations.rs @@ -114,6 +114,10 @@ const MIGRATIONS: &[Migration] = &[ "../migrations/0027_local_interop_validation_receipt_projection_kind.sql" ), }, + Migration { + version: 28, + sql: include_str!("../migrations/0028_sdk_migration_receipts.sql"), + }, ]; pub fn latest_schema_version() -> u32 { diff --git a/crates/store/src/sdk_migration_receipts.rs b/crates/store/src/sdk_migration_receipts.rs @@ -0,0 +1,316 @@ +use rusqlite::{Connection, OptionalExtension, params}; +use serde_json::Value; +use uuid::Uuid; + +use crate::AppSqliteError; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum AppSdkMigrationReceiptSourceKind { + LocalOutbox, + SharedLocalEvent, +} + +impl AppSdkMigrationReceiptSourceKind { + pub const fn storage_key(self) -> &'static str { + match self { + Self::LocalOutbox => "local_outbox", + Self::SharedLocalEvent => "shared_local_event", + } + } + + pub fn parse(value: &str) -> Result<Self, AppSqliteError> { + match value { + "local_outbox" => Ok(Self::LocalOutbox), + "shared_local_event" => Ok(Self::SharedLocalEvent), + _ => Err(AppSqliteError::DecodeEnum { + field: "app_sdk_migration_receipts.source_kind", + value: value.to_owned(), + }), + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum AppSdkMigrationState { + Pending, + Prepared, + Enqueued, + Pushed, + Failed, + Blocked, + Skipped, + Unsupported, + ManualReview, + Unknown, +} + +impl AppSdkMigrationState { + pub const fn storage_key(self) -> &'static str { + match self { + Self::Pending => "pending", + Self::Prepared => "prepared", + Self::Enqueued => "enqueued", + Self::Pushed => "pushed", + Self::Failed => "failed", + Self::Blocked => "blocked", + Self::Skipped => "skipped", + Self::Unsupported => "unsupported", + Self::ManualReview => "manual_review", + Self::Unknown => "unknown", + } + } + + pub fn parse(value: &str) -> Result<Self, AppSqliteError> { + match value { + "pending" => Ok(Self::Pending), + "prepared" => Ok(Self::Prepared), + "enqueued" => Ok(Self::Enqueued), + "pushed" => Ok(Self::Pushed), + "failed" => Ok(Self::Failed), + "blocked" => Ok(Self::Blocked), + "skipped" => Ok(Self::Skipped), + "unsupported" => Ok(Self::Unsupported), + "manual_review" => Ok(Self::ManualReview), + "unknown" => Ok(Self::Unknown), + _ => Err(AppSqliteError::DecodeEnum { + field: "app_sdk_migration_receipts.migration_state", + value: value.to_owned(), + }), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct AppSdkMigrationReceiptInput { + pub source_kind: AppSdkMigrationReceiptSourceKind, + pub source_record_id: String, + pub sdk_operation_kind: String, + pub sdk_outbox_event_ids: Vec<String>, + pub expected_event_id: Option<String>, + pub actor_pubkey: Option<String>, + pub idempotency_digest_prefix: Option<String>, + pub migration_state: AppSdkMigrationState, + pub recorded_at: String, + pub detail_json: Value, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct AppSdkMigrationReceipt { + pub id: String, + pub source_kind: AppSdkMigrationReceiptSourceKind, + pub source_record_id: String, + pub sdk_operation_kind: String, + pub sdk_outbox_event_ids: Vec<String>, + pub expected_event_id: Option<String>, + pub actor_pubkey: Option<String>, + pub idempotency_digest_prefix: Option<String>, + pub migration_state: AppSdkMigrationState, + pub created_at: String, + pub updated_at: String, + pub detail_json: Value, +} + +pub struct AppSdkMigrationReceiptRepository<'a> { + connection: &'a Connection, +} + +impl<'a> AppSdkMigrationReceiptRepository<'a> { + pub const fn new(connection: &'a Connection) -> Self { + Self { connection } + } + + pub fn record_receipt( + &self, + input: &AppSdkMigrationReceiptInput, + ) -> Result<AppSdkMigrationReceipt, AppSqliteError> { + let receipt_id = Uuid::now_v7().to_string(); + let outbox_ids_json = + serde_json::to_string(&input.sdk_outbox_event_ids).map_err(|source| { + AppSqliteError::EncodeJson { + field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json", + source, + } + })?; + let detail_json = serde_json::to_string(&input.detail_json).map_err(|source| { + AppSqliteError::EncodeJson { + field: "app_sdk_migration_receipts.detail_json", + source, + } + })?; + + self.connection + .execute( + "INSERT INTO app_sdk_migration_receipts ( + id, + source_kind, + source_record_id, + sdk_operation_kind, + sdk_outbox_event_ids_json, + expected_event_id, + actor_pubkey, + idempotency_digest_prefix, + migration_state, + created_at, + updated_at, + detail_json + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10, ?11) + ON CONFLICT(source_kind, source_record_id) + DO UPDATE SET + sdk_operation_kind = excluded.sdk_operation_kind, + sdk_outbox_event_ids_json = excluded.sdk_outbox_event_ids_json, + expected_event_id = excluded.expected_event_id, + actor_pubkey = excluded.actor_pubkey, + idempotency_digest_prefix = excluded.idempotency_digest_prefix, + migration_state = excluded.migration_state, + updated_at = excluded.updated_at, + detail_json = excluded.detail_json", + params![ + receipt_id, + input.source_kind.storage_key(), + input.source_record_id.as_str(), + input.sdk_operation_kind.as_str(), + outbox_ids_json.as_str(), + input.expected_event_id.as_deref(), + input.actor_pubkey.as_deref(), + input.idempotency_digest_prefix.as_deref(), + input.migration_state.storage_key(), + input.recorded_at.as_str(), + detail_json.as_str(), + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "record app SDK migration receipt", + source, + })?; + + self.load_receipt(input.source_kind, input.source_record_id.as_str())? + .ok_or(AppSqliteError::MissingColumn { + field: "app_sdk_migration_receipts.id", + }) + } + + pub fn load_receipt( + &self, + source_kind: AppSdkMigrationReceiptSourceKind, + source_record_id: &str, + ) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError> { + self.connection + .query_row( + "SELECT + id, + source_kind, + source_record_id, + sdk_operation_kind, + sdk_outbox_event_ids_json, + expected_event_id, + actor_pubkey, + idempotency_digest_prefix, + migration_state, + created_at, + updated_at, + detail_json + FROM app_sdk_migration_receipts + WHERE source_kind = ?1 + AND source_record_id = ?2 + LIMIT 1", + params![source_kind.storage_key(), source_record_id], + decode_receipt_row, + ) + .optional() + .map_err(|source| AppSqliteError::Query { + operation: "load app SDK migration receipt", + source, + }) + } +} + +fn decode_receipt_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<AppSdkMigrationReceipt> { + let source_kind: String = row.get(1)?; + let outbox_ids_json: String = row.get(4)?; + let migration_state: String = row.get(8)?; + let detail_json: String = row.get(11)?; + Ok(AppSdkMigrationReceipt { + id: row.get(0)?, + source_kind: AppSdkMigrationReceiptSourceKind::parse(source_kind.as_str()) + .map_err(decode_app_error)?, + source_record_id: row.get(2)?, + sdk_operation_kind: row.get(3)?, + sdk_outbox_event_ids: serde_json::from_str(outbox_ids_json.as_str()).map_err(|source| { + decode_app_error(AppSqliteError::DecodeJson { + field: "app_sdk_migration_receipts.sdk_outbox_event_ids_json", + source, + }) + })?, + expected_event_id: row.get(5)?, + actor_pubkey: row.get(6)?, + idempotency_digest_prefix: row.get(7)?, + migration_state: AppSdkMigrationState::parse(migration_state.as_str()) + .map_err(decode_app_error)?, + created_at: row.get(9)?, + updated_at: row.get(10)?, + detail_json: serde_json::from_str(detail_json.as_str()).map_err(|source| { + decode_app_error(AppSqliteError::DecodeJson { + field: "app_sdk_migration_receipts.detail_json", + source, + }) + })?, + }) +} + +fn decode_app_error(error: AppSqliteError) -> rusqlite::Error { + rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error)) +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use crate::{ + AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, + AppSqliteStore, DatabaseTarget, + }; + + #[test] + fn migration_receipts_are_idempotent_by_source_record() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); + let first = store + .sdk_migration_receipt_repository() + .record_receipt(&AppSdkMigrationReceiptInput { + source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox, + source_record_id: "source-record-a".to_owned(), + sdk_operation_kind: "farm.publish".to_owned(), + sdk_outbox_event_ids: vec!["outbox-a".to_owned()], + expected_event_id: Some("expected-a".to_owned()), + actor_pubkey: Some("actor-a".to_owned()), + idempotency_digest_prefix: Some("digest-a".to_owned()), + migration_state: AppSdkMigrationState::Enqueued, + recorded_at: "2026-06-18T12:00:00Z".to_owned(), + detail_json: json!({"attempt": 1}), + }) + .expect("record first receipt"); + let second = store + .sdk_migration_receipt_repository() + .record_receipt(&AppSdkMigrationReceiptInput { + source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox, + source_record_id: "source-record-a".to_owned(), + sdk_operation_kind: "farm.publish".to_owned(), + sdk_outbox_event_ids: vec!["outbox-b".to_owned()], + expected_event_id: Some("expected-b".to_owned()), + actor_pubkey: Some("actor-b".to_owned()), + idempotency_digest_prefix: Some("digest-b".to_owned()), + migration_state: AppSdkMigrationState::Pushed, + recorded_at: "2026-06-18T12:05:00Z".to_owned(), + detail_json: json!({"attempt": 2}), + }) + .expect("record second receipt"); + + assert_eq!(first.id, second.id); + assert_eq!(second.created_at, "2026-06-18T12:00:00Z"); + assert_eq!(second.updated_at, "2026-06-18T12:05:00Z"); + assert_eq!(second.sdk_outbox_event_ids, vec!["outbox-b".to_owned()]); + assert_eq!(second.expected_event_id.as_deref(), Some("expected-b")); + assert_eq!(second.actor_pubkey.as_deref(), Some("actor-b")); + assert_eq!(second.migration_state, AppSdkMigrationState::Pushed); + assert_eq!(second.detail_json, json!({"attempt": 2})); + } +}