app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit b911eab104386fdd9cd85747c24373f1afc3be25
parent a7dcb129b031356c9450accc425869cab7594bd8
Author: triesap <tyson@radroots.org>
Date:   Thu, 18 Jun 2026 14:12:58 -0700

app: harden sdk migration audit statuses

- classify local outbox failed and blocked rows as manual review
- distinguish deferred local work from publishable migration candidates
- preserve already represented semantics for published and acknowledged records
- add audit status matrix regression coverage

Diffstat:
Mcrates/store/src/migration_audit.rs | 368++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 357 insertions(+), 11 deletions(-)

diff --git a/crates/store/src/migration_audit.rs b/crates/store/src/migration_audit.rs @@ -94,6 +94,8 @@ pub enum AppSdkMigrationAuditSource { pub enum AppSdkMigrationAuditClassification { PublishableCandidate, AlreadyRepresentedCandidate, + LocalWorkDeferred, + ManualReviewRequired, PaymentDeferred, SettlementDeferred, Unsupported, @@ -105,6 +107,8 @@ impl AppSdkMigrationAuditClassification { match self { Self::PublishableCandidate => "publishable_candidate", Self::AlreadyRepresentedCandidate => "already_represented_candidate", + Self::LocalWorkDeferred => "local_work_deferred", + Self::ManualReviewRequired => "manual_review_required", Self::PaymentDeferred => "payment_deferred", Self::SettlementDeferred => "settlement_deferred", Self::Unsupported => "unsupported", @@ -376,15 +380,10 @@ fn audit_local_outbox_row( payload.work_kind().storage_key().to_owned(), AppSdkMigrationAuditClassification::Unsupported, ) - } else if row.state == "succeeded" { - ( - payload.work_kind().storage_key().to_owned(), - AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate, - ) } else { ( payload.work_kind().storage_key().to_owned(), - AppSdkMigrationAuditClassification::PublishableCandidate, + classify_local_outbox_state(row, report), ) } } @@ -425,6 +424,40 @@ fn audit_local_outbox_row( ); } +fn classify_local_outbox_state( + row: &LocalOutboxAuditRow, + report: &mut AppSdkMigrationAuditSourceBuilder, +) -> AppSdkMigrationAuditClassification { + match row.state.as_str() { + "pending" | "in_progress" | "retryable" => { + AppSdkMigrationAuditClassification::PublishableCandidate + } + "succeeded" => AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate, + "failed" | "blocked" => { + report.issue( + "manual_review_local_outbox_state", + Some(row.id.as_str()), + format!( + "local outbox operation `{}` is in `{}` state and requires migration review", + row.operation_key, row.state + ), + ); + AppSdkMigrationAuditClassification::ManualReviewRequired + } + _ => { + report.issue( + "unknown_local_outbox_state", + Some(row.id.as_str()), + format!( + "local outbox operation `{}` has unknown state `{}`", + row.operation_key, row.state + ), + ); + AppSdkMigrationAuditClassification::Unknown + } + } +} + fn audit_shared_local_event_record( record: &LocalEventRecord, report: &mut AppSdkMigrationAuditSourceBuilder, @@ -478,9 +511,7 @@ fn classify_shared_local_work( .as_ref() .and_then(local_work_record_kind) { - Some("farm_config_v1" | "listing_draft_v1") => { - AppSdkMigrationAuditClassification::PublishableCandidate - } + Some("farm_config_v1" | "listing_draft_v1") => classify_shared_local_work_status(record), Some(record_kind) => { report.issue( "unsupported_shared_local_work_kind", @@ -500,6 +531,27 @@ fn classify_shared_local_work( } } +fn classify_shared_local_work_status( + record: &LocalEventRecord, +) -> AppSdkMigrationAuditClassification { + if matches!(record.outbox_status, PublishOutboxStatus::Acknowledged) + || matches!(record.status, LocalRecordStatus::Published) + { + AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate + } else if matches!(record.outbox_status, PublishOutboxStatus::Failed) + || matches!( + record.status, + LocalRecordStatus::Failed | LocalRecordStatus::Conflict + ) + { + AppSdkMigrationAuditClassification::ManualReviewRequired + } else if matches!(record.status, LocalRecordStatus::PendingPublish) { + AppSdkMigrationAuditClassification::PublishableCandidate + } else { + AppSdkMigrationAuditClassification::LocalWorkDeferred + } +} + fn classify_shared_signed_event( record: &LocalEventRecord, report: &mut AppSdkMigrationAuditSourceBuilder, @@ -681,8 +733,8 @@ mod tests { KIND_LISTING, KIND_ORDER_PAYMENT_RECORD, KIND_ORDER_SETTLEMENT_DECISION, }; use radroots_local_events::{ - LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, - PublishOutboxStatus, SourceRuntime, + LocalEventRecord, LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, + LocalRecordStatus, PublishOutboxStatus, SourceRuntime, }; use radroots_sql_core::SqliteExecutor; use rusqlite::params; @@ -810,6 +862,72 @@ mod tests { } #[test] + fn local_outbox_audit_classifies_status_matrix() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); + let shared_events = local_events_store(); + let operation = farm_profile_operation("acct_seed", "status_matrix"); + + for (index, state) in [ + "pending", + "in_progress", + "retryable", + "failed", + "blocked", + "succeeded", + ] + .iter() + .enumerate() + { + insert_local_outbox_audit_row( + &store, + &format!("local-outbox-{state}"), + &format!("acct_{index}"), + state, + &operation, + ); + } + + let report = store + .audit_sdk_migration( + &shared_events, + AppSdkMigrationAuditRequest { batch_size: 2 }, + ) + .expect("audit should run"); + + assert_eq!(report.local_outbox.scanned_records, 6); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::PublishableCandidate.storage_key() + ), + 3 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key() + ), + 2 + ); + assert_eq!( + count_named( + &report.local_outbox.classification_counts, + AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key() + ), + 1 + ); + assert_eq!( + report + .local_outbox + .issues + .iter() + .filter(|issue| issue.code == "manual_review_local_outbox_state") + .count(), + 2 + ); + } + + #[test] fn shared_local_events_audit_defers_payment_and_settlement() { let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); let shared_events = local_events_store(); @@ -892,6 +1010,115 @@ mod tests { ); } + #[test] + fn shared_local_work_audit_classifies_status_matrix() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); + let shared_events = local_events_store(); + + for (record_id, record_kind, status, outbox_status) in [ + ( + "local-draft", + "farm_config_v1", + LocalRecordStatus::LocalDraft, + PublishOutboxStatus::None, + ), + ( + "local-saved", + "listing_draft_v1", + LocalRecordStatus::LocalSaved, + PublishOutboxStatus::None, + ), + ( + "pending-publish", + "listing_draft_v1", + LocalRecordStatus::PendingPublish, + PublishOutboxStatus::None, + ), + ( + "published", + "farm_config_v1", + LocalRecordStatus::Published, + PublishOutboxStatus::None, + ), + ( + "failed", + "farm_config_v1", + LocalRecordStatus::Failed, + PublishOutboxStatus::None, + ), + ( + "conflict", + "listing_draft_v1", + LocalRecordStatus::Conflict, + PublishOutboxStatus::None, + ), + ] { + shared_events + .append_record(&local_work_record( + record_id, + record_kind, + status, + outbox_status, + )) + .expect("append local work record"); + } + + let report = store + .audit_sdk_migration( + &shared_events, + AppSdkMigrationAuditRequest { batch_size: 3 }, + ) + .expect("audit should run"); + + assert_eq!(report.shared_local_events.scanned_records, 6); + assert_eq!( + count_named( + &report.shared_local_events.classification_counts, + AppSdkMigrationAuditClassification::LocalWorkDeferred.storage_key() + ), + 2 + ); + assert_eq!( + count_named( + &report.shared_local_events.classification_counts, + AppSdkMigrationAuditClassification::PublishableCandidate.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.shared_local_events.classification_counts, + AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key() + ), + 1 + ); + assert_eq!( + count_named( + &report.shared_local_events.classification_counts, + AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key() + ), + 2 + ); + } + + #[test] + fn shared_local_work_status_classifier_handles_defensive_outbox_states() { + assert_eq!( + super::classify_shared_local_work_status(&local_work_model_record( + LocalRecordStatus::PendingPublish, + PublishOutboxStatus::Acknowledged, + )), + AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate + ); + assert_eq!( + super::classify_shared_local_work_status(&local_work_model_record( + LocalRecordStatus::PendingPublish, + PublishOutboxStatus::Failed, + )), + AppSdkMigrationAuditClassification::ManualReviewRequired + ); + } + fn local_outbox_row_count(store: &AppSqliteStore) -> i64 { store .connection() @@ -899,6 +1126,125 @@ mod tests { .expect("count local outbox rows") } + fn farm_profile_operation(account_id: &str, source: &str) -> PendingSyncOperation { + PendingSyncOperation::from_publish_payload( + AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload { + context: AppPublishContext::new(account_id, source), + farm_id: FarmId::new(), + display_name: "Green Loop Farm".to_owned(), + readiness: Some(FarmReadiness::Ready), + }), + "2026-06-18T12:00:00Z", + ) + .expect("build publish operation") + } + + fn insert_local_outbox_audit_row( + store: &AppSqliteStore, + id: &str, + account_id: &str, + state: &str, + operation: &PendingSyncOperation, + ) { + store + .connection() + .execute( + "INSERT INTO local_outbox ( + id, + account_id, + operation_key, + aggregate_kind, + aggregate_id, + operation_kind, + payload_json, + created_at, + available_at, + attempt_count, + state, + last_error_message + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)", + params![ + id, + account_id, + operation.operation_key.as_str(), + operation.aggregate.aggregate_kind(), + operation.aggregate.aggregate_id(), + operation.operation.storage_key(), + operation.payload_json.as_str(), + operation.created_at.as_str(), + operation.available_at.as_str(), + i64::from(operation.attempt_count), + state, + ], + ) + .expect("insert local outbox audit row"); + } + + fn local_work_record( + record_id: &str, + record_kind: &str, + status: LocalRecordStatus, + outbox_status: PublishOutboxStatus, + ) -> LocalEventRecordInput { + LocalEventRecordInput { + record_id: record_id.to_owned(), + family: LocalRecordFamily::LocalWork, + status, + source_runtime: SourceRuntime::App, + created_at_ms: 1000, + inserted_at_ms: 1001, + owner_account_id: Some("acct_a".to_owned()), + owner_pubkey: Some("seller-pubkey".to_owned()), + farm_id: Some("farm-key".to_owned()), + listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()), + local_work_json: Some(json!({"record_kind": record_kind})), + event_id: None, + event_kind: None, + event_pubkey: None, + event_created_at: None, + event_tags_json: None, + event_content: None, + event_sig: None, + raw_event_json: None, + outbox_status, + relay_set_fingerprint: None, + relay_delivery_json: None, + } + } + + fn local_work_model_record( + status: LocalRecordStatus, + outbox_status: PublishOutboxStatus, + ) -> LocalEventRecord { + LocalEventRecord { + seq: 1, + change_seq: 1, + record_id: "defensive-local-work".to_owned(), + family: LocalRecordFamily::LocalWork, + status, + source_runtime: SourceRuntime::App, + created_at_ms: 1000, + inserted_at_ms: 1001, + updated_at_ms: 1002, + owner_account_id: Some("acct_a".to_owned()), + owner_pubkey: Some("seller-pubkey".to_owned()), + farm_id: Some("farm-key".to_owned()), + listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()), + local_work_json: Some(json!({"record_kind": "listing_draft_v1"})), + event_id: None, + event_kind: None, + event_pubkey: None, + event_created_at: None, + event_tags_json: None, + event_content: None, + event_sig: None, + raw_event_json: None, + outbox_status, + relay_set_fingerprint: None, + relay_delivery_json: None, + } + } + fn signed_event_record( record_id: &str, event_id: &str,