app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

migration_audit.rs (54265B)


      1 use std::collections::{BTreeMap, BTreeSet};
      2 
      3 use radroots_app_sync::{AppPublishPayload, SyncOperationKind};
      4 use radroots_events::kinds::{
      5     KIND_FARM, KIND_LISTING, KIND_LISTING_DRAFT, KIND_ORDER_CANCELLATION, KIND_ORDER_DECISION,
      6     KIND_ORDER_REQUEST, KIND_ORDER_REVISION_DECISION, KIND_ORDER_REVISION_PROPOSAL,
      7     KIND_TRADE_VALIDATION_RECEIPT,
      8 };
      9 use radroots_local_events::{
     10     LocalEventRecord, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus,
     11 };
     12 use radroots_sql_core::SqlExecutor;
     13 use rusqlite::params;
     14 use serde_json::Value;
     15 
     16 use crate::{
     17     AppSdkMigrationReceipt, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, AppSqliteError,
     18     AppSqliteStore,
     19 };
     20 
     21 pub const APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE: u32 = 500;
     22 pub const APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE: u32 = 1_000;
     23 
     24 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
     25 pub struct AppSdkMigrationAuditRequest {
     26     pub batch_size: u32,
     27 }
     28 
     29 impl Default for AppSdkMigrationAuditRequest {
     30     fn default() -> Self {
     31         Self {
     32             batch_size: APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE,
     33         }
     34     }
     35 }
     36 
     37 impl AppSdkMigrationAuditRequest {
     38     pub fn normalized_batch_size(self) -> u32 {
     39         if self.batch_size == 0 {
     40             APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE
     41         } else {
     42             self.batch_size.min(APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE)
     43         }
     44     }
     45 }
     46 
/// Combined result of auditing both SDK migration sources.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditReport {
    /// Findings from scanning the local outbox.
    pub local_outbox: AppSdkMigrationAuditSourceReport,
    /// Findings from scanning the shared local events store.
    pub shared_local_events: AppSdkMigrationAuditSourceReport,
    /// Copies of the issues from both source reports: local outbox issues
    /// first, followed by shared local event issues.
    pub issues: Vec<AppSdkMigrationAuditIssue>,
}
     53 
/// Audit findings for a single migration source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditSourceReport {
    /// Source that was scanned to produce this report.
    pub source: AppSdkMigrationAuditSource,
    /// Normalized batch size used while paging through the source.
    pub batch_size: u32,
    /// Number of non-empty batches that were loaded.
    pub batch_count: u64,
    /// Total number of records that were audited.
    pub scanned_records: u64,
    /// Record tallies grouped by kind key.
    pub kind_counts: Vec<AppSdkMigrationAuditCount>,
    /// Record tallies grouped by status key.
    pub status_counts: Vec<AppSdkMigrationAuditCount>,
    /// Record tallies grouped by classification storage key.
    pub classification_counts: Vec<AppSdkMigrationAuditCount>,
    /// Groups of records that share a duplicate identity.
    /// NOTE(review): the grouping threshold is decided by a helper outside this
    /// view — confirm whether single-record groups are filtered out.
    pub duplicate_candidates: Vec<AppSdkMigrationAuditDuplicateCandidate>,
    /// Issues raised while auditing this source, in the order they were found.
    pub issues: Vec<AppSdkMigrationAuditIssue>,
}
     66 
/// A single tally entry: how many audited records fell under `key`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditCount {
    /// Kind, status, or classification key being counted.
    pub key: String,
    /// Number of records counted under `key`.
    pub count: u64,
}
     72 
/// A set of audited records that resolve to the same duplicate identity.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditDuplicateCandidate {
    /// Category of the shared identity.
    /// NOTE(review): presumably one of the identity kinds built during the
    /// audit ("operation", "aggregate", "event") — confirm against the helper
    /// that builds these candidates.
    pub identity_kind: String,
    /// Identity value shared by the records.
    pub identity_key: String,
    /// Number of records sharing the identity.
    pub record_count: u64,
    /// Identifiers of the records sharing the identity.
    pub record_ids: Vec<String>,
}
     80 
/// A problem noticed while auditing a migration source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditIssue {
    /// Source in which the issue was found.
    pub source: AppSdkMigrationAuditSource,
    /// Machine-readable issue code, e.g. `unknown_local_outbox_payload`.
    pub code: String,
    /// Identifier of the affected record, when the issue concerns one record.
    pub record_id: Option<String>,
    /// Human-readable description of the issue.
    pub message: String,
}
     88 
/// Identifies which data source an audit report or issue refers to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppSdkMigrationAuditSource {
    /// Rows of the app's `local_outbox` table.
    LocalOutbox,
    /// Records read from the shared local events store.
    SharedLocalEvents,
}
     94 
/// Migration outcome assigned to each audited record.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppSdkMigrationAuditClassification {
    /// Source record still awaits publishing (e.g. outbox state `pending`,
    /// `in_progress`, or `retryable`).
    PublishableCandidate,
    /// Source record was already published or acknowledged (e.g. outbox state
    /// `succeeded`).
    AlreadyRepresentedCandidate,
    /// A migration receipt in the enqueued or pushed state covers the record.
    RepresentedRecord,
    /// A migration receipt marks the record as skipped.
    SkippedRecord,
    /// A migration receipt marks the record as failed.
    FailedRecord,
    /// Supported shared local work that is neither published, failed, nor
    /// pending publish.
    LocalWorkDeferred,
    /// The source state or the migration receipt requires a manual review.
    ManualReviewRequired,
    /// Shared signed event whose kind is the trade validation receipt kind.
    ValidationReceiptDeferred,
    /// Record is not a SDK migration candidate (e.g. delete operations or
    /// unsupported kinds).
    Unsupported,
    /// Record could not be interpreted (e.g. undecodable payload, unknown
    /// state, or missing kind).
    Unknown,
}
    108 
impl AppSdkMigrationAuditClassification {
    /// Returns the snake_case key for this classification.
    ///
    /// The audit uses this key when tallying classification counts and when
    /// naming a classification inside issue messages.
    pub const fn storage_key(self) -> &'static str {
        match self {
            Self::PublishableCandidate => "publishable_candidate",
            Self::AlreadyRepresentedCandidate => "already_represented_candidate",
            Self::RepresentedRecord => "represented_record",
            Self::SkippedRecord => "skipped_record",
            Self::FailedRecord => "failed_record",
            Self::LocalWorkDeferred => "local_work_deferred",
            Self::ManualReviewRequired => "manual_review_required",
            Self::ValidationReceiptDeferred => "validation_receipt_deferred",
            Self::Unsupported => "unsupported",
            Self::Unknown => "unknown",
        }
    }
}
    125 
    126 impl AppSqliteStore {
    127     pub fn audit_sdk_migration<E>(
    128         &self,
    129         shared_local_events: &LocalEventsStore<E>,
    130         request: AppSdkMigrationAuditRequest,
    131     ) -> Result<AppSdkMigrationAuditReport, AppSqliteError>
    132     where
    133         E: SqlExecutor,
    134     {
    135         let local_outbox = self.audit_sdk_migration_local_outbox(request)?;
    136         let shared_local_events =
    137             self.audit_sdk_migration_shared_local_events(shared_local_events, request)?;
    138         let issues = local_outbox
    139             .issues
    140             .iter()
    141             .chain(shared_local_events.issues.iter())
    142             .cloned()
    143             .collect();
    144 
    145         Ok(AppSdkMigrationAuditReport {
    146             local_outbox,
    147             shared_local_events,
    148             issues,
    149         })
    150     }
    151 
    152     pub fn audit_sdk_migration_local_outbox(
    153         &self,
    154         request: AppSdkMigrationAuditRequest,
    155     ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError> {
    156         let batch_size = request.normalized_batch_size();
    157         let mut report = AppSdkMigrationAuditSourceBuilder::new(
    158             AppSdkMigrationAuditSource::LocalOutbox,
    159             batch_size,
    160         );
    161         let mut last_rowid = 0_i64;
    162 
    163         loop {
    164             let rows = self.load_local_outbox_audit_batch(last_rowid, batch_size)?;
    165             if rows.is_empty() {
    166                 break;
    167             }
    168             report.batch_count += 1;
    169             for row in &rows {
    170                 last_rowid = row.rowid;
    171                 let receipt = self.sdk_migration_receipt_repository().load_receipt(
    172                     AppSdkMigrationReceiptSourceKind::LocalOutbox,
    173                     row.id.as_str(),
    174                 )?;
    175                 audit_local_outbox_row(row, receipt.as_ref(), &mut report);
    176             }
    177             if rows.len() < batch_size as usize {
    178                 break;
    179             }
    180         }
    181 
    182         Ok(report.finish())
    183     }
    184 
    185     pub fn audit_sdk_migration_shared_local_events<E>(
    186         &self,
    187         store: &LocalEventsStore<E>,
    188         request: AppSdkMigrationAuditRequest,
    189     ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError>
    190     where
    191         E: SqlExecutor,
    192     {
    193         audit_sdk_migration_shared_local_events_with_receipts(store, request, |record_id| {
    194             self.sdk_migration_receipt_repository().load_receipt(
    195                 AppSdkMigrationReceiptSourceKind::SharedLocalEvent,
    196                 record_id,
    197             )
    198         })
    199     }
    200 }
    201 
    202 fn audit_sdk_migration_shared_local_events_with_receipts<E>(
    203     store: &LocalEventsStore<E>,
    204     request: AppSdkMigrationAuditRequest,
    205     mut load_receipt: impl FnMut(&str) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError>,
    206 ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError>
    207 where
    208     E: SqlExecutor,
    209 {
    210     let batch_size = request.normalized_batch_size();
    211     let mut report = AppSdkMigrationAuditSourceBuilder::new(
    212         AppSdkMigrationAuditSource::SharedLocalEvents,
    213         batch_size,
    214     );
    215     let mut after_change_seq = 0_i64;
    216 
    217     loop {
    218         let records = store
    219             .list_records_changed_after(after_change_seq, batch_size)
    220             .map_err(|source| AppSqliteError::LocalEvents {
    221                 operation: "audit shared local event records",
    222                 source,
    223             })?;
    224         if records.is_empty() {
    225             break;
    226         }
    227         report.batch_count += 1;
    228         for record in &records {
    229             after_change_seq = record.change_seq;
    230             let receipt = load_receipt(record.record_id.as_str())?;
    231             audit_shared_local_event_record(record, receipt.as_ref(), &mut report);
    232         }
    233         if records.len() < batch_size as usize {
    234             break;
    235         }
    236     }
    237 
    238     Ok(report.finish())
    239 }
    240 
    241 impl AppSqliteStore {
    242     fn load_local_outbox_audit_batch(
    243         &self,
    244         after_rowid: i64,
    245         limit: u32,
    246     ) -> Result<Vec<LocalOutboxAuditRow>, AppSqliteError> {
    247         let mut statement = self
    248             .connection()
    249             .prepare(
    250                 "SELECT
    251                     rowid,
    252                     id,
    253                     account_id,
    254                     operation_key,
    255                     aggregate_kind,
    256                     aggregate_id,
    257                     operation_kind,
    258                     payload_json,
    259                     state
    260                  FROM local_outbox
    261                  WHERE rowid > ?1
    262                  ORDER BY rowid ASC
    263                  LIMIT ?2",
    264             )
    265             .map_err(|source| AppSqliteError::Query {
    266                 operation: "prepare SDK migration local outbox audit query",
    267                 source,
    268             })?;
    269         let rows = statement
    270             .query_map(params![after_rowid, i64::from(limit)], |row| {
    271                 Ok(LocalOutboxAuditRow {
    272                     rowid: row.get(0)?,
    273                     id: row.get(1)?,
    274                     account_id: row.get(2)?,
    275                     operation_key: row.get(3)?,
    276                     aggregate_kind: row.get(4)?,
    277                     aggregate_id: row.get(5)?,
    278                     operation_kind: row.get(6)?,
    279                     payload_json: row.get(7)?,
    280                     state: row.get(8)?,
    281                 })
    282             })
    283             .map_err(|source| AppSqliteError::Query {
    284                 operation: "query SDK migration local outbox audit rows",
    285                 source,
    286             })?;
    287 
    288         rows.map(|row| {
    289             row.map_err(|source| AppSqliteError::Query {
    290                 operation: "read SDK migration local outbox audit row",
    291                 source,
    292             })
    293         })
    294         .collect()
    295     }
    296 }
    297 
/// The subset of `local_outbox` columns read by the migration audit.
#[derive(Clone, Debug, Eq, PartialEq)]
struct LocalOutboxAuditRow {
    /// SQLite `rowid`, used as the paging cursor.
    rowid: i64,
    /// Row identifier; also the key used to look up migration receipts.
    id: String,
    /// Value of the `account_id` column.
    account_id: String,
    /// Value of the `operation_key` column; quoted in issue messages.
    operation_key: String,
    /// Value of the `aggregate_kind` column.
    aggregate_kind: String,
    /// Value of the `aggregate_id` column.
    aggregate_id: String,
    /// Value of the `operation_kind` column; compared against sync operation
    /// storage keys.
    operation_kind: String,
    /// Raw JSON payload, decoded as an `AppPublishPayload` during the audit.
    payload_json: String,
    /// Value of the `state` column (e.g. `pending`, `succeeded`, `failed`).
    state: String,
}
    310 
/// Mutable accumulator used while scanning one migration source; turned into
/// an `AppSdkMigrationAuditSourceReport` by `finish`.
struct AppSdkMigrationAuditSourceBuilder {
    /// Source being scanned; stamped onto every issue.
    source: AppSdkMigrationAuditSource,
    /// Normalized batch size used for the scan.
    batch_size: u32,
    /// Number of non-empty batches processed so far.
    batch_count: u64,
    /// Number of records recorded so far.
    scanned_records: u64,
    /// Tally of records per kind key.
    kind_counts: BTreeMap<String, u64>,
    /// Tally of records per status key.
    status_counts: BTreeMap<String, u64>,
    /// Tally of records per classification storage key.
    classification_counts: BTreeMap<String, u64>,
    /// Record ids grouped by the duplicate identity they resolve to.
    duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
    /// Issues raised so far, in discovery order.
    issues: Vec<AppSdkMigrationAuditIssue>,
}
    322 
    323 impl AppSdkMigrationAuditSourceBuilder {
    324     fn new(source: AppSdkMigrationAuditSource, batch_size: u32) -> Self {
    325         Self {
    326             source,
    327             batch_size,
    328             batch_count: 0,
    329             scanned_records: 0,
    330             kind_counts: BTreeMap::new(),
    331             status_counts: BTreeMap::new(),
    332             classification_counts: BTreeMap::new(),
    333             duplicate_records: BTreeMap::new(),
    334             issues: Vec::new(),
    335         }
    336     }
    337 
    338     fn record(
    339         &mut self,
    340         record_id: &str,
    341         kind: String,
    342         status: String,
    343         classification: AppSdkMigrationAuditClassification,
    344         duplicate_identities: Vec<DuplicateIdentity>,
    345     ) {
    346         self.scanned_records += 1;
    347         increment_count(&mut self.kind_counts, kind);
    348         increment_count(&mut self.status_counts, status);
    349         increment_count(
    350             &mut self.classification_counts,
    351             classification.storage_key().to_owned(),
    352         );
    353         for identity in duplicate_identities {
    354             self.duplicate_records
    355                 .entry(identity)
    356                 .or_default()
    357                 .insert(record_id.to_owned());
    358         }
    359     }
    360 
    361     fn issue(&mut self, code: &str, record_id: Option<&str>, message: impl Into<String>) {
    362         self.issues.push(AppSdkMigrationAuditIssue {
    363             source: self.source,
    364             code: code.to_owned(),
    365             record_id: record_id.map(ToOwned::to_owned),
    366             message: message.into(),
    367         });
    368     }
    369 
    370     fn finish(self) -> AppSdkMigrationAuditSourceReport {
    371         AppSdkMigrationAuditSourceReport {
    372             source: self.source,
    373             batch_size: self.batch_size,
    374             batch_count: self.batch_count,
    375             scanned_records: self.scanned_records,
    376             kind_counts: counts_from_map(self.kind_counts),
    377             status_counts: counts_from_map(self.status_counts),
    378             classification_counts: counts_from_map(self.classification_counts),
    379             duplicate_candidates: duplicate_candidates_from_map(self.duplicate_records),
    380             issues: self.issues,
    381         }
    382     }
    383 }
    384 
/// Key under which audited records are grouped to detect potential duplicates.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct DuplicateIdentity {
    /// Identity category, e.g. `operation`, `aggregate`, or `event`.
    kind: String,
    /// Identity value within the category.
    key: String,
}
    390 
    391 fn audit_local_outbox_row(
    392     row: &LocalOutboxAuditRow,
    393     receipt: Option<&AppSdkMigrationReceipt>,
    394     report: &mut AppSdkMigrationAuditSourceBuilder,
    395 ) {
    396     let payload = serde_json::from_str::<AppPublishPayload>(row.payload_json.as_str());
    397     let (kind, source_classification) = match payload {
    398         Ok(payload) => {
    399             if row.operation_kind == SyncOperationKind::Delete.storage_key() {
    400                 report.issue(
    401                     "unsupported_local_outbox_operation",
    402                     Some(row.id.as_str()),
    403                     format!(
    404                         "local outbox delete operation `{}` is not a SDK publish migration candidate",
    405                         row.operation_key
    406                     ),
    407                 );
    408                 (
    409                     payload.work_kind().storage_key().to_owned(),
    410                     AppSdkMigrationAuditClassification::Unsupported,
    411                 )
    412             } else {
    413                 (
    414                     payload.work_kind().storage_key().to_owned(),
    415                     classify_local_outbox_state(row, report),
    416                 )
    417             }
    418         }
    419         Err(source) => {
    420             report.issue(
    421                 "unknown_local_outbox_payload",
    422                 Some(row.id.as_str()),
    423                 format!(
    424                     "local outbox payload for operation `{}` could not be decoded: {source}",
    425                     row.operation_key
    426                 ),
    427             );
    428             (
    429                 format!("{}:{}", row.aggregate_kind, row.operation_kind),
    430                 AppSdkMigrationAuditClassification::Unknown,
    431             )
    432         }
    433     };
    434     let classification =
    435         classify_receipt_overlay(row.id.as_str(), source_classification, receipt, report);
    436     let identities = vec![
    437         DuplicateIdentity {
    438             kind: "operation".to_owned(),
    439             key: format!("{}:{}", row.account_id, row.operation_key),
    440         },
    441         DuplicateIdentity {
    442             kind: "aggregate".to_owned(),
    443             key: format!(
    444                 "{}:{}:{}:{}",
    445                 row.account_id, row.aggregate_kind, row.aggregate_id, row.operation_kind
    446             ),
    447         },
    448     ];
    449     report.record(
    450         row.id.as_str(),
    451         kind,
    452         row.state.clone(),
    453         classification,
    454         identities,
    455     );
    456 }
    457 
    458 fn classify_local_outbox_state(
    459     row: &LocalOutboxAuditRow,
    460     report: &mut AppSdkMigrationAuditSourceBuilder,
    461 ) -> AppSdkMigrationAuditClassification {
    462     match row.state.as_str() {
    463         "pending" | "in_progress" | "retryable" => {
    464             AppSdkMigrationAuditClassification::PublishableCandidate
    465         }
    466         "succeeded" => AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate,
    467         "failed" | "blocked" => {
    468             report.issue(
    469                 "manual_review_local_outbox_state",
    470                 Some(row.id.as_str()),
    471                 format!(
    472                     "local outbox operation `{}` is in `{}` state and requires migration review",
    473                     row.operation_key, row.state
    474                 ),
    475             );
    476             AppSdkMigrationAuditClassification::ManualReviewRequired
    477         }
    478         _ => {
    479             report.issue(
    480                 "unknown_local_outbox_state",
    481                 Some(row.id.as_str()),
    482                 format!(
    483                     "local outbox operation `{}` has unknown state `{}`",
    484                     row.operation_key, row.state
    485                 ),
    486             );
    487             AppSdkMigrationAuditClassification::Unknown
    488         }
    489     }
    490 }
    491 
    492 fn audit_shared_local_event_record(
    493     record: &LocalEventRecord,
    494     receipt: Option<&AppSdkMigrationReceipt>,
    495     report: &mut AppSdkMigrationAuditSourceBuilder,
    496 ) {
    497     let kind = shared_local_event_kind(record);
    498     let source_classification = shared_local_event_classification(record, report);
    499     let classification = classify_receipt_overlay(
    500         record.record_id.as_str(),
    501         source_classification,
    502         receipt,
    503         report,
    504     );
    505     report.record(
    506         record.record_id.as_str(),
    507         kind,
    508         format!(
    509             "{}:{}",
    510             record.status.as_str(),
    511             record.outbox_status.as_str()
    512         ),
    513         classification,
    514         shared_local_event_duplicate_identities(record),
    515     );
    516 }
    517 
    518 fn classify_receipt_overlay(
    519     record_id: &str,
    520     source_classification: AppSdkMigrationAuditClassification,
    521     receipt: Option<&AppSdkMigrationReceipt>,
    522     report: &mut AppSdkMigrationAuditSourceBuilder,
    523 ) -> AppSdkMigrationAuditClassification {
    524     let Some(receipt) = receipt else {
    525         return source_classification;
    526     };
    527     if !receipt_allowed_for_source_classification(source_classification) {
    528         report.issue(
    529             "sdk_migration_receipt_for_non_migratable_source",
    530             Some(record_id),
    531             format!(
    532                 "SDK migration receipt `{}` for operation `{}` cannot override source classification `{}`",
    533                 receipt.id,
    534                 receipt.sdk_operation_kind,
    535                 source_classification.storage_key()
    536             ),
    537         );
    538         return source_classification;
    539     }
    540 
    541     match receipt.migration_state {
    542         AppSdkMigrationState::Pending | AppSdkMigrationState::Prepared => source_classification,
    543         AppSdkMigrationState::Enqueued | AppSdkMigrationState::Pushed => {
    544             AppSdkMigrationAuditClassification::RepresentedRecord
    545         }
    546         AppSdkMigrationState::Skipped => AppSdkMigrationAuditClassification::SkippedRecord,
    547         AppSdkMigrationState::Failed => {
    548             report.issue(
    549                 "sdk_migration_receipt_failed",
    550                 Some(record_id),
    551                 format!(
    552                     "SDK migration receipt `{}` for operation `{}` is failed",
    553                     receipt.id, receipt.sdk_operation_kind
    554                 ),
    555             );
    556             AppSdkMigrationAuditClassification::FailedRecord
    557         }
    558         AppSdkMigrationState::Blocked | AppSdkMigrationState::ManualReview => {
    559             report.issue(
    560                 "sdk_migration_receipt_manual_review",
    561                 Some(record_id),
    562                 format!(
    563                     "SDK migration receipt `{}` for operation `{}` requires manual review",
    564                     receipt.id, receipt.sdk_operation_kind
    565                 ),
    566             );
    567             AppSdkMigrationAuditClassification::ManualReviewRequired
    568         }
    569         AppSdkMigrationState::Unsupported => {
    570             report.issue(
    571                 "sdk_migration_receipt_unsupported",
    572                 Some(record_id),
    573                 format!(
    574                     "SDK migration receipt `{}` for operation `{}` is unsupported",
    575                     receipt.id, receipt.sdk_operation_kind
    576                 ),
    577             );
    578             AppSdkMigrationAuditClassification::Unsupported
    579         }
    580         AppSdkMigrationState::Unknown => {
    581             report.issue(
    582                 "sdk_migration_receipt_unknown",
    583                 Some(record_id),
    584                 format!(
    585                     "SDK migration receipt `{}` for operation `{}` is unknown",
    586                     receipt.id, receipt.sdk_operation_kind
    587                 ),
    588             );
    589             AppSdkMigrationAuditClassification::Unknown
    590         }
    591     }
    592 }
    593 
    594 fn receipt_allowed_for_source_classification(
    595     classification: AppSdkMigrationAuditClassification,
    596 ) -> bool {
    597     matches!(
    598         classification,
    599         AppSdkMigrationAuditClassification::PublishableCandidate
    600             | AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
    601     )
    602 }
    603 
    604 fn shared_local_event_kind(record: &LocalEventRecord) -> String {
    605     match record.family {
    606         LocalRecordFamily::LocalWork => record
    607             .local_work_json
    608             .as_ref()
    609             .and_then(local_work_record_kind)
    610             .map(|kind| format!("local_work:{kind}"))
    611             .unwrap_or_else(|| "local_work:unknown".to_owned()),
    612         LocalRecordFamily::SignedEvent => record
    613             .event_kind
    614             .map(shared_signed_event_kind)
    615             .unwrap_or_else(|| "signed_event:unknown".to_owned()),
    616     }
    617 }
    618 
    619 fn shared_local_event_classification(
    620     record: &LocalEventRecord,
    621     report: &mut AppSdkMigrationAuditSourceBuilder,
    622 ) -> AppSdkMigrationAuditClassification {
    623     match record.family {
    624         LocalRecordFamily::LocalWork => classify_shared_local_work(record, report),
    625         LocalRecordFamily::SignedEvent => classify_shared_signed_event(record, report),
    626     }
    627 }
    628 
    629 fn classify_shared_local_work(
    630     record: &LocalEventRecord,
    631     report: &mut AppSdkMigrationAuditSourceBuilder,
    632 ) -> AppSdkMigrationAuditClassification {
    633     match record
    634         .local_work_json
    635         .as_ref()
    636         .and_then(local_work_record_kind)
    637     {
    638         Some("farm_config_v1" | "listing_draft_v1") => classify_shared_local_work_status(record),
    639         Some(record_kind) => {
    640             report.issue(
    641                 "unsupported_shared_local_work_kind",
    642                 Some(record.record_id.as_str()),
    643                 format!("shared local work kind `{record_kind}` is not a SDK migration candidate"),
    644             );
    645             AppSdkMigrationAuditClassification::Unsupported
    646         }
    647         None => {
    648             report.issue(
    649                 "unknown_shared_local_work_kind",
    650                 Some(record.record_id.as_str()),
    651                 "shared local work record does not expose a record_kind",
    652             );
    653             AppSdkMigrationAuditClassification::Unknown
    654         }
    655     }
    656 }
    657 
    658 fn classify_shared_local_work_status(
    659     record: &LocalEventRecord,
    660 ) -> AppSdkMigrationAuditClassification {
    661     if matches!(record.outbox_status, PublishOutboxStatus::Acknowledged)
    662         || matches!(record.status, LocalRecordStatus::Published)
    663     {
    664         AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
    665     } else if matches!(record.outbox_status, PublishOutboxStatus::Failed)
    666         || matches!(
    667             record.status,
    668             LocalRecordStatus::Failed | LocalRecordStatus::Conflict
    669         )
    670     {
    671         AppSdkMigrationAuditClassification::ManualReviewRequired
    672     } else if matches!(record.status, LocalRecordStatus::PendingPublish) {
    673         AppSdkMigrationAuditClassification::PublishableCandidate
    674     } else {
    675         AppSdkMigrationAuditClassification::LocalWorkDeferred
    676     }
    677 }
    678 
    679 fn classify_shared_signed_event(
    680     record: &LocalEventRecord,
    681     report: &mut AppSdkMigrationAuditSourceBuilder,
    682 ) -> AppSdkMigrationAuditClassification {
    683     match record.event_kind {
    684         Some(kind) if kind == KIND_TRADE_VALIDATION_RECEIPT as i64 => {
    685             AppSdkMigrationAuditClassification::ValidationReceiptDeferred
    686         }
    687         Some(kind) if supported_signed_event_kind(kind) => {
    688             if signed_event_is_already_represented(record.status, record.outbox_status) {
    689                 AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
    690             } else {
    691                 AppSdkMigrationAuditClassification::PublishableCandidate
    692             }
    693         }
    694         Some(kind) => {
    695             report.issue(
    696                 "unsupported_shared_signed_event_kind",
    697                 Some(record.record_id.as_str()),
    698                 format!("shared signed event kind `{kind}` is not a SDK migration candidate"),
    699             );
    700             AppSdkMigrationAuditClassification::Unsupported
    701         }
    702         None => {
    703             report.issue(
    704                 "unknown_shared_signed_event_kind",
    705                 Some(record.record_id.as_str()),
    706                 "shared signed event record does not expose an event_kind",
    707             );
    708             AppSdkMigrationAuditClassification::Unknown
    709         }
    710     }
    711 }
    712 
    713 fn signed_event_is_already_represented(
    714     status: LocalRecordStatus,
    715     outbox_status: PublishOutboxStatus,
    716 ) -> bool {
    717     matches!(status, LocalRecordStatus::Published)
    718         || matches!(outbox_status, PublishOutboxStatus::Acknowledged)
    719 }
    720 
    721 fn shared_local_event_duplicate_identities(record: &LocalEventRecord) -> Vec<DuplicateIdentity> {
    722     let mut identities = Vec::new();
    723     if let (Some(event_kind), Some(event_id)) = (
    724         record.event_kind,
    725         non_empty_value(record.event_id.as_deref()),
    726     ) {
    727         identities.push(DuplicateIdentity {
    728             kind: "event".to_owned(),
    729             key: format!("{event_kind}:{event_id}"),
    730         });
    731     }
    732     if let Some(key) = shared_local_event_aggregate_key(record) {
    733         identities.push(DuplicateIdentity {
    734             kind: "aggregate".to_owned(),
    735             key,
    736         });
    737     }
    738     identities
    739 }
    740 
    741 fn shared_local_event_aggregate_key(record: &LocalEventRecord) -> Option<String> {
    742     match record.family {
    743         LocalRecordFamily::LocalWork => {
    744             let record_kind = record
    745                 .local_work_json
    746                 .as_ref()
    747                 .and_then(local_work_record_kind)?;
    748             non_empty_value(record.farm_id.as_deref())
    749                 .map(|farm_id| format!("local_work:{record_kind}:farm:{farm_id}"))
    750                 .or_else(|| {
    751                     non_empty_value(record.listing_addr.as_deref()).map(|listing_addr| {
    752                         format!("local_work:{record_kind}:listing:{listing_addr}")
    753                     })
    754                 })
    755         }
    756         LocalRecordFamily::SignedEvent => {
    757             let event_kind = record.event_kind?;
    758             non_empty_value(record.listing_addr.as_deref())
    759                 .map(|listing_addr| format!("signed_event:{event_kind}:listing:{listing_addr}"))
    760                 .or_else(|| {
    761                     non_empty_value(record.farm_id.as_deref())
    762                         .map(|farm_id| format!("signed_event:{event_kind}:farm:{farm_id}"))
    763                 })
    764         }
    765     }
    766 }
    767 
    768 fn local_work_record_kind(payload: &Value) -> Option<&str> {
    769     payload
    770         .get("record_kind")
    771         .and_then(Value::as_str)
    772         .map(str::trim)
    773         .filter(|value| !value.is_empty())
    774 }
    775 
    776 fn supported_signed_event_kind(kind: i64) -> bool {
    777     matches!(
    778         kind,
    779         value if value == KIND_FARM as i64
    780             || value == KIND_LISTING as i64
    781             || value == KIND_LISTING_DRAFT as i64
    782             || value == KIND_ORDER_REQUEST as i64
    783             || value == KIND_ORDER_DECISION as i64
    784             || value == KIND_ORDER_REVISION_PROPOSAL as i64
    785             || value == KIND_ORDER_REVISION_DECISION as i64
    786             || value == KIND_ORDER_CANCELLATION as i64
    787     )
    788 }
    789 
    790 fn shared_signed_event_kind(kind: i64) -> String {
    791     let name = match kind {
    792         value if value == KIND_FARM as i64 => "farm",
    793         value if value == KIND_LISTING as i64 => "listing",
    794         value if value == KIND_LISTING_DRAFT as i64 => "listing_draft",
    795         value if value == KIND_ORDER_REQUEST as i64 => "order_request",
    796         value if value == KIND_ORDER_DECISION as i64 => "order_decision",
    797         value if value == KIND_ORDER_REVISION_PROPOSAL as i64 => "order_revision_proposal",
    798         value if value == KIND_ORDER_REVISION_DECISION as i64 => "order_revision_decision",
    799         value if value == KIND_ORDER_CANCELLATION as i64 => "order_cancellation",
    800         value if value == KIND_TRADE_VALIDATION_RECEIPT as i64 => "trade_validation_receipt",
    801         _ => "unsupported",
    802     };
    803     format!("signed_event:{name}:{kind}")
    804 }
    805 
    806 fn non_empty_value(value: Option<&str>) -> Option<&str> {
    807     value.map(str::trim).filter(|value| !value.is_empty())
    808 }
    809 
    810 fn increment_count(counts: &mut BTreeMap<String, u64>, key: String) {
    811     *counts.entry(key).or_default() += 1;
    812 }
    813 
    814 fn counts_from_map(counts: BTreeMap<String, u64>) -> Vec<AppSdkMigrationAuditCount> {
    815     counts
    816         .into_iter()
    817         .map(|(key, count)| AppSdkMigrationAuditCount { key, count })
    818         .collect()
    819 }
    820 
    821 fn duplicate_candidates_from_map(
    822     duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
    823 ) -> Vec<AppSdkMigrationAuditDuplicateCandidate> {
    824     duplicate_records
    825         .into_iter()
    826         .filter_map(|(identity, records)| {
    827             if records.len() < 2 {
    828                 return None;
    829             }
    830             Some(AppSdkMigrationAuditDuplicateCandidate {
    831                 identity_kind: identity.kind,
    832                 identity_key: identity.key,
    833                 record_count: records.len() as u64,
    834                 record_ids: records.into_iter().collect(),
    835             })
    836         })
    837         .collect()
    838 }
    839 
    840 #[cfg(test)]
    841 mod tests {
    842     use radroots_app_sync::{
    843         AppFarmProfilePublishPayload, AppPublishContext, AppPublishPayload, PendingSyncOperation,
    844     };
    845     use radroots_app_view::{FarmId, FarmReadiness};
    846     use radroots_events::kinds::{KIND_LISTING, KIND_ORDER_REQUEST, KIND_TRADE_VALIDATION_RECEIPT};
    847     use radroots_local_events::{
    848         LocalEventRecord, LocalEventRecordInput, LocalEventsStore, LocalRecordFamily,
    849         LocalRecordStatus, PublishOutboxStatus, SourceRuntime,
    850     };
    851     use radroots_sql_core::SqliteExecutor;
    852     use rusqlite::params;
    853     use serde_json::json;
    854 
    855     use crate::{
    856         AppSdkMigrationAuditClassification, AppSdkMigrationAuditRequest,
    857         AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState,
    858         AppSqliteStore, DatabaseTarget,
    859     };
    860 
    861     fn local_events_store() -> LocalEventsStore<SqliteExecutor> {
    862         let executor = SqliteExecutor::open_memory().expect("open local events memory db");
    863         let store = LocalEventsStore::new(executor);
    864         store.migrate_up().expect("migrate local events store");
    865         store
    866     }
    867 
    868     fn count_named(counts: &[crate::AppSdkMigrationAuditCount], key: &str) -> u64 {
    869         counts
    870             .iter()
    871             .find(|count| count.key == key)
    872             .map(|count| count.count)
    873             .unwrap_or_default()
    874     }
    875 
    /// Audits an outbox holding an enqueued row plus a hand-inserted `succeeded` row that reuses
    /// the same operation key, with a batch size of one. Checks that the row count is unchanged
    /// and that both rows, their statuses, classifications and shared identity are reported.
    #[test]
    fn local_outbox_audit_reads_batches_without_mutating_rows() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();
        let operation = farm_profile_operation("acct_a", "farm_setup");
        store
            .sync_repository()
            .enqueue_pending_operation("acct_a", &operation)
            .expect("enqueue operation");

        // Second row: same operation key and payload, inserted directly as already succeeded.
        let duplicate_timestamp = "2026-06-18T11:00:00Z";
        store
            .connection()
            .execute(
                "INSERT INTO local_outbox (
                    id,
                    account_id,
                    operation_key,
                    aggregate_kind,
                    aggregate_id,
                    operation_kind,
                    payload_json,
                    created_at,
                    available_at,
                    attempt_count,
                    state,
                    last_error_message
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)",
                params![
                    "succeeded-duplicate",
                    "acct_a",
                    operation.operation_key,
                    operation.aggregate.aggregate_kind(),
                    operation.aggregate.aggregate_id(),
                    operation.operation.storage_key(),
                    operation.payload_json,
                    duplicate_timestamp,
                    duplicate_timestamp,
                    0_i64,
                    "succeeded",
                ],
            )
            .expect("insert succeeded duplicate");
        let rows_before_audit = local_outbox_row_count(&store);

        let request = AppSdkMigrationAuditRequest { batch_size: 1 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");

        let outbox = &report.local_outbox;
        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(&outbox.classification_counts, classification.storage_key())
        };

        assert_eq!(local_outbox_row_count(&store), rows_before_audit);
        assert_eq!(outbox.batch_size, 1);
        assert_eq!(outbox.batch_count, 2);
        assert_eq!(outbox.scanned_records, 2);
        assert_eq!(count_named(&outbox.kind_counts, "farm_profile"), 2);
        assert_eq!(count_named(&outbox.status_counts, "pending"), 1);
        assert_eq!(count_named(&outbox.status_counts, "succeeded"), 1);
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::PublishableCandidate),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate),
            1
        );
        let has_operation_duplicate = outbox.duplicate_candidates.iter().any(|candidate| {
            candidate.identity_kind == "operation" && candidate.record_count == 2
        });
        assert!(has_operation_duplicate);
        assert_eq!(report.shared_local_events.scanned_records, 0);
    }
    976 
    /// Seeds one outbox row per state in the list below and checks how many rows land in each
    /// classification, plus the number of manual-review issues raised.
    #[test]
    fn local_outbox_audit_classifies_status_matrix() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();
        let operation = farm_profile_operation("acct_seed", "status_matrix");

        let states = [
            "pending",
            "in_progress",
            "retryable",
            "failed",
            "blocked",
            "succeeded",
        ];
        for (index, &state) in states.iter().enumerate() {
            let row_id = format!("local-outbox-{state}");
            let account_id = format!("acct_{index}");
            insert_local_outbox_audit_row(&store, &row_id, &account_id, state, &operation);
        }

        let request = AppSdkMigrationAuditRequest { batch_size: 2 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");

        let outbox = &report.local_outbox;
        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(&outbox.classification_counts, classification.storage_key())
        };

        assert_eq!(outbox.scanned_records, 6);
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::PublishableCandidate),
            3
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::ManualReviewRequired),
            2
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate),
            1
        );
        let manual_review_issues = outbox
            .issues
            .iter()
            .filter(|issue| issue.code == "manual_review_local_outbox_state")
            .count();
        assert_eq!(manual_review_issues, 2);
    }
   1042 
   1043     #[test]
   1044     fn local_outbox_audit_uses_migration_receipts_for_migratable_records() {
   1045         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
   1046         let shared_events = local_events_store();
   1047         let operation = farm_profile_operation("acct_seed", "receipt_matrix");
   1048 
   1049         for (id, state) in [
   1050             ("represented-source", AppSdkMigrationState::Enqueued),
   1051             ("skipped-source", AppSdkMigrationState::Skipped),
   1052             ("failed-source", AppSdkMigrationState::Failed),
   1053         ] {
   1054             insert_local_outbox_audit_row(&store, id, id, "pending", &operation);
   1055             record_local_outbox_receipt(&store, id, state);
   1056         }
   1057 
   1058         let report = store
   1059             .audit_sdk_migration(
   1060                 &shared_events,
   1061                 AppSdkMigrationAuditRequest { batch_size: 10 },
   1062             )
   1063             .expect("audit should run");
   1064 
   1065         assert_eq!(report.local_outbox.scanned_records, 3);
   1066         assert_eq!(
   1067             count_named(
   1068                 &report.local_outbox.classification_counts,
   1069                 AppSdkMigrationAuditClassification::RepresentedRecord.storage_key()
   1070             ),
   1071             1
   1072         );
   1073         assert_eq!(
   1074             count_named(
   1075                 &report.local_outbox.classification_counts,
   1076                 AppSdkMigrationAuditClassification::SkippedRecord.storage_key()
   1077             ),
   1078             1
   1079         );
   1080         assert_eq!(
   1081             count_named(
   1082                 &report.local_outbox.classification_counts,
   1083                 AppSdkMigrationAuditClassification::FailedRecord.storage_key()
   1084             ),
   1085             1
   1086         );
   1087         assert!(
   1088             report
   1089                 .local_outbox
   1090                 .issues
   1091                 .iter()
   1092                 .any(|issue| issue.code == "sdk_migration_receipt_failed")
   1093         );
   1094     }
   1095 
   1096     #[test]
   1097     fn local_outbox_audit_does_not_let_receipts_hide_non_migratable_rows() {
   1098         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
   1099         let shared_events = local_events_store();
   1100         let operation = farm_profile_operation("acct_seed", "non_migratable");
   1101 
   1102         insert_local_outbox_audit_row(&store, "failed-source", "acct_failed", "failed", &operation);
   1103         record_local_outbox_receipt(&store, "failed-source", AppSdkMigrationState::Enqueued);
   1104         insert_local_outbox_audit_row(
   1105             &store,
   1106             "unsupported-source",
   1107             "acct_unsupported",
   1108             "pending",
   1109             &PendingSyncOperation {
   1110                 operation: radroots_app_sync::SyncOperationKind::Delete,
   1111                 ..operation.clone()
   1112             },
   1113         );
   1114         record_local_outbox_receipt(&store, "unsupported-source", AppSdkMigrationState::Enqueued);
   1115         store
   1116             .connection()
   1117             .execute_batch("PRAGMA ignore_check_constraints = ON")
   1118             .expect("disable sqlite checks for defensive unknown state row");
   1119         insert_local_outbox_audit_row(
   1120             &store,
   1121             "unknown-source",
   1122             "acct_unknown",
   1123             "mystery",
   1124             &operation,
   1125         );
   1126         store
   1127             .connection()
   1128             .execute_batch("PRAGMA ignore_check_constraints = OFF")
   1129             .expect("restore sqlite checks");
   1130         record_local_outbox_receipt(&store, "unknown-source", AppSdkMigrationState::Enqueued);
   1131 
   1132         let report = store
   1133             .audit_sdk_migration(
   1134                 &shared_events,
   1135                 AppSdkMigrationAuditRequest { batch_size: 10 },
   1136             )
   1137             .expect("audit should run");
   1138 
   1139         assert_eq!(
   1140             count_named(
   1141                 &report.local_outbox.classification_counts,
   1142                 AppSdkMigrationAuditClassification::RepresentedRecord.storage_key()
   1143             ),
   1144             0
   1145         );
   1146         assert_eq!(
   1147             count_named(
   1148                 &report.local_outbox.classification_counts,
   1149                 AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key()
   1150             ),
   1151             1
   1152         );
   1153         assert_eq!(
   1154             count_named(
   1155                 &report.local_outbox.classification_counts,
   1156                 AppSdkMigrationAuditClassification::Unsupported.storage_key()
   1157             ),
   1158             1
   1159         );
   1160         assert_eq!(
   1161             count_named(
   1162                 &report.local_outbox.classification_counts,
   1163                 AppSdkMigrationAuditClassification::Unknown.storage_key()
   1164             ),
   1165             1
   1166         );
   1167         assert_eq!(
   1168             report
   1169                 .local_outbox
   1170                 .issues
   1171                 .iter()
   1172                 .filter(|issue| issue.code == "sdk_migration_receipt_for_non_migratable_source")
   1173                 .count(),
   1174             3
   1175         );
   1176     }
   1177 
   1178     #[test]
   1179     fn shared_local_events_audit_classifies_supported_events_and_validation_receipts() {
   1180         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
   1181         let shared_events = local_events_store();
   1182         shared_events
   1183             .append_record(&signed_event_record(
   1184                 "listing-a",
   1185                 "duplicate-listing-event",
   1186                 KIND_LISTING as i64,
   1187             ))
   1188             .expect("append listing a");
   1189         shared_events
   1190             .append_record(&signed_event_record(
   1191                 "listing-b",
   1192                 "duplicate-listing-event",
   1193                 KIND_LISTING as i64,
   1194             ))
   1195             .expect("append listing b");
   1196         shared_events
   1197             .append_record(&signed_event_record(
   1198                 "request",
   1199                 "request-event",
   1200                 KIND_ORDER_REQUEST as i64,
   1201             ))
   1202             .expect("append request");
   1203         shared_events
   1204             .append_record(&signed_event_record(
   1205                 "validation-receipt",
   1206                 "validation-receipt-event",
   1207                 KIND_TRADE_VALIDATION_RECEIPT as i64,
   1208             ))
   1209             .expect("append validation receipt");
   1210         let before_records = shared_events
   1211             .list_records_changed_after(0, 10)
   1212             .expect("list records before audit")
   1213             .len();
   1214 
   1215         let report = store
   1216             .audit_sdk_migration(
   1217                 &shared_events,
   1218                 AppSdkMigrationAuditRequest { batch_size: 1 },
   1219             )
   1220             .expect("audit should run");
   1221 
   1222         assert_eq!(
   1223             shared_events
   1224                 .list_records_changed_after(0, 10)
   1225                 .expect("list records after audit")
   1226                 .len(),
   1227             before_records
   1228         );
   1229         assert_eq!(report.shared_local_events.batch_count, 4);
   1230         assert_eq!(report.shared_local_events.scanned_records, 4);
   1231         assert_eq!(
   1232             count_named(
   1233                 &report.shared_local_events.classification_counts,
   1234                 AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key()
   1235             ),
   1236             3
   1237         );
   1238         assert_eq!(
   1239             count_named(
   1240                 &report.shared_local_events.classification_counts,
   1241                 AppSdkMigrationAuditClassification::ValidationReceiptDeferred.storage_key()
   1242             ),
   1243             1
   1244         );
   1245         assert_eq!(
   1246             count_named(
   1247                 &report.shared_local_events.classification_counts,
   1248                 AppSdkMigrationAuditClassification::PublishableCandidate.storage_key()
   1249             ),
   1250             0
   1251         );
   1252         assert!(
   1253             report
   1254                 .shared_local_events
   1255                 .duplicate_candidates
   1256                 .iter()
   1257                 .any(|candidate| candidate.identity_kind == "event" && candidate.record_count == 2)
   1258         );
   1259     }
   1260 
   1261     #[test]
   1262     fn shared_local_work_audit_classifies_status_matrix() {
   1263         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
   1264         let shared_events = local_events_store();
   1265 
   1266         for (record_id, record_kind, status, outbox_status) in [
   1267             (
   1268                 "local-draft",
   1269                 "farm_config_v1",
   1270                 LocalRecordStatus::LocalDraft,
   1271                 PublishOutboxStatus::None,
   1272             ),
   1273             (
   1274                 "local-saved",
   1275                 "listing_draft_v1",
   1276                 LocalRecordStatus::LocalSaved,
   1277                 PublishOutboxStatus::None,
   1278             ),
   1279             (
   1280                 "pending-publish",
   1281                 "listing_draft_v1",
   1282                 LocalRecordStatus::PendingPublish,
   1283                 PublishOutboxStatus::None,
   1284             ),
   1285             (
   1286                 "published",
   1287                 "farm_config_v1",
   1288                 LocalRecordStatus::Published,
   1289                 PublishOutboxStatus::None,
   1290             ),
   1291             (
   1292                 "failed",
   1293                 "farm_config_v1",
   1294                 LocalRecordStatus::Failed,
   1295                 PublishOutboxStatus::None,
   1296             ),
   1297             (
   1298                 "conflict",
   1299                 "listing_draft_v1",
   1300                 LocalRecordStatus::Conflict,
   1301                 PublishOutboxStatus::None,
   1302             ),
   1303         ] {
   1304             shared_events
   1305                 .append_record(&local_work_record(
   1306                     record_id,
   1307                     record_kind,
   1308                     status,
   1309                     outbox_status,
   1310                 ))
   1311                 .expect("append local work record");
   1312         }
   1313 
   1314         let report = store
   1315             .audit_sdk_migration(
   1316                 &shared_events,
   1317                 AppSdkMigrationAuditRequest { batch_size: 3 },
   1318             )
   1319             .expect("audit should run");
   1320 
   1321         assert_eq!(report.shared_local_events.scanned_records, 6);
   1322         assert_eq!(
   1323             count_named(
   1324                 &report.shared_local_events.classification_counts,
   1325                 AppSdkMigrationAuditClassification::LocalWorkDeferred.storage_key()
   1326             ),
   1327             2
   1328         );
   1329         assert_eq!(
   1330             count_named(
   1331                 &report.shared_local_events.classification_counts,
   1332                 AppSdkMigrationAuditClassification::PublishableCandidate.storage_key()
   1333             ),
   1334             1
   1335         );
   1336         assert_eq!(
   1337             count_named(
   1338                 &report.shared_local_events.classification_counts,
   1339                 AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key()
   1340             ),
   1341             1
   1342         );
   1343         assert_eq!(
   1344             count_named(
   1345                 &report.shared_local_events.classification_counts,
   1346                 AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key()
   1347             ),
   1348             2
   1349         );
   1350     }
   1351 
   1352     #[test]
   1353     fn shared_local_work_status_classifier_handles_defensive_outbox_states() {
   1354         assert_eq!(
   1355             super::classify_shared_local_work_status(&local_work_model_record(
   1356                 LocalRecordStatus::PendingPublish,
   1357                 PublishOutboxStatus::Acknowledged,
   1358             )),
   1359             AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
   1360         );
   1361         assert_eq!(
   1362             super::classify_shared_local_work_status(&local_work_model_record(
   1363                 LocalRecordStatus::PendingPublish,
   1364                 PublishOutboxStatus::Failed,
   1365             )),
   1366             AppSdkMigrationAuditClassification::ManualReviewRequired
   1367         );
   1368     }
   1369 
   1370     fn local_outbox_row_count(store: &AppSqliteStore) -> i64 {
   1371         store
   1372             .connection()
   1373             .query_row("SELECT count(*) FROM local_outbox", [], |row| row.get(0))
   1374             .expect("count local outbox rows")
   1375     }
   1376 
   1377     fn farm_profile_operation(account_id: &str, source: &str) -> PendingSyncOperation {
   1378         PendingSyncOperation::from_publish_payload(
   1379             AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload {
   1380                 context: AppPublishContext::new(account_id, source),
   1381                 farm_id: FarmId::new(),
   1382                 display_name: "Green Loop Farm".to_owned(),
   1383                 readiness: Some(FarmReadiness::Ready),
   1384             }),
   1385             "2026-06-18T12:00:00Z",
   1386         )
   1387         .expect("build publish operation")
   1388     }
   1389 
   1390     fn insert_local_outbox_audit_row(
   1391         store: &AppSqliteStore,
   1392         id: &str,
   1393         account_id: &str,
   1394         state: &str,
   1395         operation: &PendingSyncOperation,
   1396     ) {
   1397         store
   1398             .connection()
   1399             .execute(
   1400                 "INSERT INTO local_outbox (
   1401                     id,
   1402                     account_id,
   1403                     operation_key,
   1404                     aggregate_kind,
   1405                     aggregate_id,
   1406                     operation_kind,
   1407                     payload_json,
   1408                     created_at,
   1409                     available_at,
   1410                     attempt_count,
   1411                     state,
   1412                     last_error_message
   1413                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)",
   1414                 params![
   1415                     id,
   1416                     account_id,
   1417                     operation.operation_key.as_str(),
   1418                     operation.aggregate.aggregate_kind(),
   1419                     operation.aggregate.aggregate_id(),
   1420                     operation.operation.storage_key(),
   1421                     operation.payload_json.as_str(),
   1422                     operation.created_at.as_str(),
   1423                     operation.available_at.as_str(),
   1424                     i64::from(operation.attempt_count),
   1425                     state,
   1426                 ],
   1427             )
   1428             .expect("insert local outbox audit row");
   1429     }
   1430 
   1431     fn record_local_outbox_receipt(
   1432         store: &AppSqliteStore,
   1433         source_record_id: &str,
   1434         migration_state: AppSdkMigrationState,
   1435     ) {
   1436         store
   1437             .sdk_migration_receipt_repository()
   1438             .record_receipt(&AppSdkMigrationReceiptInput {
   1439                 source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox,
   1440                 source_record_id: source_record_id.to_owned(),
   1441                 sdk_operation_kind: "farm.publish".to_owned(),
   1442                 sdk_outbox_event_ids: vec![format!("sdk-outbox-{source_record_id}")],
   1443                 expected_event_id: Some(format!("event-{source_record_id}")),
   1444                 actor_pubkey: Some("actor-pubkey".to_owned()),
   1445                 idempotency_digest_prefix: Some("digest-prefix".to_owned()),
   1446                 migration_state,
   1447                 recorded_at: "2026-06-18T12:00:00Z".to_owned(),
   1448                 detail_json: json!({"source": source_record_id}),
   1449             })
   1450             .expect("record local outbox receipt");
   1451     }
   1452 
   1453     fn local_work_record(
   1454         record_id: &str,
   1455         record_kind: &str,
   1456         status: LocalRecordStatus,
   1457         outbox_status: PublishOutboxStatus,
   1458     ) -> LocalEventRecordInput {
   1459         LocalEventRecordInput {
   1460             record_id: record_id.to_owned(),
   1461             family: LocalRecordFamily::LocalWork,
   1462             status,
   1463             source_runtime: SourceRuntime::App,
   1464             created_at_ms: 1000,
   1465             inserted_at_ms: 1001,
   1466             owner_account_id: Some("acct_a".to_owned()),
   1467             owner_pubkey: Some("seller-pubkey".to_owned()),
   1468             farm_id: Some("farm-key".to_owned()),
   1469             listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()),
   1470             local_work_json: Some(json!({"record_kind": record_kind})),
   1471             event_id: None,
   1472             event_kind: None,
   1473             event_pubkey: None,
   1474             event_created_at: None,
   1475             event_tags_json: None,
   1476             event_content: None,
   1477             event_sig: None,
   1478             raw_event_json: None,
   1479             outbox_status,
   1480             relay_set_fingerprint: None,
   1481             relay_delivery_json: None,
   1482         }
   1483     }
   1484 
   1485     fn local_work_model_record(
   1486         status: LocalRecordStatus,
   1487         outbox_status: PublishOutboxStatus,
   1488     ) -> LocalEventRecord {
   1489         LocalEventRecord {
   1490             seq: 1,
   1491             change_seq: 1,
   1492             record_id: "defensive-local-work".to_owned(),
   1493             family: LocalRecordFamily::LocalWork,
   1494             status,
   1495             source_runtime: SourceRuntime::App,
   1496             created_at_ms: 1000,
   1497             inserted_at_ms: 1001,
   1498             updated_at_ms: 1002,
   1499             owner_account_id: Some("acct_a".to_owned()),
   1500             owner_pubkey: Some("seller-pubkey".to_owned()),
   1501             farm_id: Some("farm-key".to_owned()),
   1502             listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()),
   1503             local_work_json: Some(json!({"record_kind": "listing_draft_v1"})),
   1504             event_id: None,
   1505             event_kind: None,
   1506             event_pubkey: None,
   1507             event_created_at: None,
   1508             event_tags_json: None,
   1509             event_content: None,
   1510             event_sig: None,
   1511             raw_event_json: None,
   1512             outbox_status,
   1513             relay_set_fingerprint: None,
   1514             relay_delivery_json: None,
   1515         }
   1516     }
   1517 
   1518     fn signed_event_record(
   1519         record_id: &str,
   1520         event_id: &str,
   1521         event_kind: i64,
   1522     ) -> LocalEventRecordInput {
   1523         LocalEventRecordInput {
   1524             record_id: record_id.to_owned(),
   1525             family: LocalRecordFamily::SignedEvent,
   1526             status: LocalRecordStatus::Published,
   1527             source_runtime: SourceRuntime::App,
   1528             created_at_ms: 1000,
   1529             inserted_at_ms: 1001,
   1530             owner_account_id: Some("acct_a".to_owned()),
   1531             owner_pubkey: Some("seller-pubkey".to_owned()),
   1532             farm_id: Some("farm-key".to_owned()),
   1533             listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()),
   1534             local_work_json: None,
   1535             event_id: Some(event_id.to_owned()),
   1536             event_kind: Some(event_kind),
   1537             event_pubkey: Some("seller-pubkey".to_owned()),
   1538             event_created_at: Some(1000),
   1539             event_tags_json: Some(json!([["d", "listing-key"]])),
   1540             event_content: Some("{}".to_owned()),
   1541             event_sig: Some("signature".to_owned()),
   1542             raw_event_json: Some(json!({
   1543                 "id": event_id,
   1544                 "kind": event_kind,
   1545                 "pubkey": "seller-pubkey"
   1546             })),
   1547             outbox_status: PublishOutboxStatus::Acknowledged,
   1548             relay_set_fingerprint: Some("relay-set".to_owned()),
   1549             relay_delivery_json: Some(json!({
   1550                 "state": "acknowledged",
   1551                 "acknowledged_relays": ["wss://relay.example"]
   1552             })),
   1553         }
   1554     }
   1555 }