commit 79175bddaa9330a37dda261d5ffddd68016c838b
parent 5b645f44948951d943123efdb798d6d687e1c929
Author: triesap <tyson@radroots.org>
Date: Thu, 18 Jun 2026 13:22:13 -0700
app: add read-only sdk migration audit
- audit app outbox and shared local-events records
- classify deferred payment and settlement evidence
- report duplicate and unsupported migration candidates
- verify sqlite core and app package checks
Diffstat:
2 files changed, 946 insertions(+), 0 deletions(-)
diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs
@@ -2,6 +2,7 @@
mod error;
mod interop;
+mod migration_audit;
mod migrations;
mod repo;
mod sync;
@@ -31,6 +32,12 @@ pub use interop::{
AppLocalInteropImportReport, AppLocalInteropRepository, StoredLocalInteropRecord,
projected_order_id_from_trade_request,
};
+pub use migration_audit::{
+ APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE, APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE,
+ AppSdkMigrationAuditClassification, AppSdkMigrationAuditCount,
+ AppSdkMigrationAuditDuplicateCandidate, AppSdkMigrationAuditIssue, AppSdkMigrationAuditReport,
+ AppSdkMigrationAuditRequest, AppSdkMigrationAuditSource, AppSdkMigrationAuditSourceReport,
+};
pub use migrations::latest_schema_version;
pub use repo::{
APP_ACTIVITY_CONTEXT_LIMIT, APP_ACTIVITY_RETENTION_LIMIT, AppActivationRepository,
diff --git a/crates/store/src/migration_audit.rs b/crates/store/src/migration_audit.rs
@@ -0,0 +1,939 @@
+use std::collections::{BTreeMap, BTreeSet};
+
+use radroots_app_sync::{AppPublishPayload, SyncOperationKind};
+use radroots_events::kinds::{
+ KIND_FARM, KIND_LISTING, KIND_LISTING_DRAFT, KIND_ORDER_CANCELLATION, KIND_ORDER_DECISION,
+ KIND_ORDER_FULFILLMENT_UPDATE, KIND_ORDER_PAYMENT_RECORD, KIND_ORDER_RECEIPT,
+ KIND_ORDER_REQUEST, KIND_ORDER_REVISION_DECISION, KIND_ORDER_REVISION_PROPOSAL,
+ KIND_ORDER_SETTLEMENT_DECISION, KIND_TRADE_VALIDATION_RECEIPT,
+};
+use radroots_local_events::{
+ LocalEventRecord, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus,
+};
+use radroots_sql_core::SqlExecutor;
+use rusqlite::params;
+use serde_json::Value;
+
+use crate::{AppSqliteError, AppSqliteStore};
+
+pub const APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE: u32 = 500;
+pub const APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE: u32 = 1_000;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditRequest {
+ pub batch_size: u32,
+}
+
+impl Default for AppSdkMigrationAuditRequest {
+ fn default() -> Self {
+ Self {
+ batch_size: APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE,
+ }
+ }
+}
+
+impl AppSdkMigrationAuditRequest {
+ pub fn normalized_batch_size(self) -> u32 {
+ if self.batch_size == 0 {
+ APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE
+ } else {
+ self.batch_size.min(APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE)
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditReport {
+ pub local_outbox: AppSdkMigrationAuditSourceReport,
+ pub shared_local_events: AppSdkMigrationAuditSourceReport,
+ pub issues: Vec<AppSdkMigrationAuditIssue>,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditSourceReport {
+ pub source: AppSdkMigrationAuditSource,
+ pub batch_size: u32,
+ pub batch_count: u64,
+ pub scanned_records: u64,
+ pub kind_counts: Vec<AppSdkMigrationAuditCount>,
+ pub status_counts: Vec<AppSdkMigrationAuditCount>,
+ pub classification_counts: Vec<AppSdkMigrationAuditCount>,
+ pub duplicate_candidates: Vec<AppSdkMigrationAuditDuplicateCandidate>,
+ pub issues: Vec<AppSdkMigrationAuditIssue>,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditCount {
+ pub key: String,
+ pub count: u64,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditDuplicateCandidate {
+ pub identity_kind: String,
+ pub identity_key: String,
+ pub record_count: u64,
+ pub record_ids: Vec<String>,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct AppSdkMigrationAuditIssue {
+ pub source: AppSdkMigrationAuditSource,
+ pub code: String,
+ pub record_id: Option<String>,
+ pub message: String,
+}
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum AppSdkMigrationAuditSource {
+ LocalOutbox,
+ SharedLocalEvents,
+}
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum AppSdkMigrationAuditClassification {
+ PublishableCandidate,
+ AlreadyRepresentedCandidate,
+ PaymentDeferred,
+ SettlementDeferred,
+ Unsupported,
+ Unknown,
+}
+
+impl AppSdkMigrationAuditClassification {
+ pub const fn storage_key(self) -> &'static str {
+ match self {
+ Self::PublishableCandidate => "publishable_candidate",
+ Self::AlreadyRepresentedCandidate => "already_represented_candidate",
+ Self::PaymentDeferred => "payment_deferred",
+ Self::SettlementDeferred => "settlement_deferred",
+ Self::Unsupported => "unsupported",
+ Self::Unknown => "unknown",
+ }
+ }
+}
+
+impl AppSqliteStore {
+ pub fn audit_sdk_migration<E>(
+ &self,
+ shared_local_events: &LocalEventsStore<E>,
+ request: AppSdkMigrationAuditRequest,
+ ) -> Result<AppSdkMigrationAuditReport, AppSqliteError>
+ where
+ E: SqlExecutor,
+ {
+ let local_outbox = self.audit_sdk_migration_local_outbox(request)?;
+ let shared_local_events =
+ audit_sdk_migration_shared_local_events(shared_local_events, request)?;
+ let issues = local_outbox
+ .issues
+ .iter()
+ .chain(shared_local_events.issues.iter())
+ .cloned()
+ .collect();
+
+ Ok(AppSdkMigrationAuditReport {
+ local_outbox,
+ shared_local_events,
+ issues,
+ })
+ }
+
+ pub fn audit_sdk_migration_local_outbox(
+ &self,
+ request: AppSdkMigrationAuditRequest,
+ ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError> {
+ let batch_size = request.normalized_batch_size();
+ let mut report = AppSdkMigrationAuditSourceBuilder::new(
+ AppSdkMigrationAuditSource::LocalOutbox,
+ batch_size,
+ );
+ let mut last_rowid = 0_i64;
+
+ loop {
+ let rows = self.load_local_outbox_audit_batch(last_rowid, batch_size)?;
+ if rows.is_empty() {
+ break;
+ }
+ report.batch_count += 1;
+ for row in &rows {
+ last_rowid = row.rowid;
+ audit_local_outbox_row(row, &mut report);
+ }
+ if rows.len() < batch_size as usize {
+ break;
+ }
+ }
+
+ Ok(report.finish())
+ }
+}
+
+pub fn audit_sdk_migration_shared_local_events<E>(
+ store: &LocalEventsStore<E>,
+ request: AppSdkMigrationAuditRequest,
+) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError>
+where
+ E: SqlExecutor,
+{
+ let batch_size = request.normalized_batch_size();
+ let mut report = AppSdkMigrationAuditSourceBuilder::new(
+ AppSdkMigrationAuditSource::SharedLocalEvents,
+ batch_size,
+ );
+ let mut after_change_seq = 0_i64;
+
+ loop {
+ let records = store
+ .list_records_changed_after(after_change_seq, batch_size)
+ .map_err(|source| AppSqliteError::LocalEvents {
+ operation: "audit shared local event records",
+ source,
+ })?;
+ if records.is_empty() {
+ break;
+ }
+ report.batch_count += 1;
+ for record in &records {
+ after_change_seq = record.change_seq;
+ audit_shared_local_event_record(record, &mut report);
+ }
+ if records.len() < batch_size as usize {
+ break;
+ }
+ }
+
+ Ok(report.finish())
+}
+
+impl AppSqliteStore {
+ fn load_local_outbox_audit_batch(
+ &self,
+ after_rowid: i64,
+ limit: u32,
+ ) -> Result<Vec<LocalOutboxAuditRow>, AppSqliteError> {
+ let mut statement = self
+ .connection()
+ .prepare(
+ "SELECT
+ rowid,
+ id,
+ account_id,
+ operation_key,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ state
+ FROM local_outbox
+ WHERE rowid > ?1
+ ORDER BY rowid ASC
+ LIMIT ?2",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare SDK migration local outbox audit query",
+ source,
+ })?;
+ let rows = statement
+ .query_map(params![after_rowid, i64::from(limit)], |row| {
+ Ok(LocalOutboxAuditRow {
+ rowid: row.get(0)?,
+ id: row.get(1)?,
+ account_id: row.get(2)?,
+ operation_key: row.get(3)?,
+ aggregate_kind: row.get(4)?,
+ aggregate_id: row.get(5)?,
+ operation_kind: row.get(6)?,
+ payload_json: row.get(7)?,
+ state: row.get(8)?,
+ })
+ })
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query SDK migration local outbox audit rows",
+ source,
+ })?;
+
+ rows.map(|row| {
+ row.map_err(|source| AppSqliteError::Query {
+ operation: "read SDK migration local outbox audit row",
+ source,
+ })
+ })
+ .collect()
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+struct LocalOutboxAuditRow {
+ rowid: i64,
+ id: String,
+ account_id: String,
+ operation_key: String,
+ aggregate_kind: String,
+ aggregate_id: String,
+ operation_kind: String,
+ payload_json: String,
+ state: String,
+}
+
+struct AppSdkMigrationAuditSourceBuilder {
+ source: AppSdkMigrationAuditSource,
+ batch_size: u32,
+ batch_count: u64,
+ scanned_records: u64,
+ kind_counts: BTreeMap<String, u64>,
+ status_counts: BTreeMap<String, u64>,
+ classification_counts: BTreeMap<String, u64>,
+ duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
+ issues: Vec<AppSdkMigrationAuditIssue>,
+}
+
+impl AppSdkMigrationAuditSourceBuilder {
+ fn new(source: AppSdkMigrationAuditSource, batch_size: u32) -> Self {
+ Self {
+ source,
+ batch_size,
+ batch_count: 0,
+ scanned_records: 0,
+ kind_counts: BTreeMap::new(),
+ status_counts: BTreeMap::new(),
+ classification_counts: BTreeMap::new(),
+ duplicate_records: BTreeMap::new(),
+ issues: Vec::new(),
+ }
+ }
+
+ fn record(
+ &mut self,
+ record_id: &str,
+ kind: String,
+ status: String,
+ classification: AppSdkMigrationAuditClassification,
+ duplicate_identities: Vec<DuplicateIdentity>,
+ ) {
+ self.scanned_records += 1;
+ increment_count(&mut self.kind_counts, kind);
+ increment_count(&mut self.status_counts, status);
+ increment_count(
+ &mut self.classification_counts,
+ classification.storage_key().to_owned(),
+ );
+ for identity in duplicate_identities {
+ self.duplicate_records
+ .entry(identity)
+ .or_default()
+ .insert(record_id.to_owned());
+ }
+ }
+
+ fn issue(&mut self, code: &str, record_id: Option<&str>, message: impl Into<String>) {
+ self.issues.push(AppSdkMigrationAuditIssue {
+ source: self.source,
+ code: code.to_owned(),
+ record_id: record_id.map(ToOwned::to_owned),
+ message: message.into(),
+ });
+ }
+
+ fn finish(self) -> AppSdkMigrationAuditSourceReport {
+ AppSdkMigrationAuditSourceReport {
+ source: self.source,
+ batch_size: self.batch_size,
+ batch_count: self.batch_count,
+ scanned_records: self.scanned_records,
+ kind_counts: counts_from_map(self.kind_counts),
+ status_counts: counts_from_map(self.status_counts),
+ classification_counts: counts_from_map(self.classification_counts),
+ duplicate_candidates: duplicate_candidates_from_map(self.duplicate_records),
+ issues: self.issues,
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
+struct DuplicateIdentity {
+ kind: String,
+ key: String,
+}
+
+fn audit_local_outbox_row(
+ row: &LocalOutboxAuditRow,
+ report: &mut AppSdkMigrationAuditSourceBuilder,
+) {
+ let payload = serde_json::from_str::<AppPublishPayload>(row.payload_json.as_str());
+ let (kind, classification) = match payload {
+ Ok(payload) => {
+ if row.operation_kind == SyncOperationKind::Delete.storage_key() {
+ report.issue(
+ "unsupported_local_outbox_operation",
+ Some(row.id.as_str()),
+ format!(
+ "local outbox delete operation `{}` is not a SDK publish migration candidate",
+ row.operation_key
+ ),
+ );
+ (
+ payload.work_kind().storage_key().to_owned(),
+ AppSdkMigrationAuditClassification::Unsupported,
+ )
+ } else if row.state == "succeeded" {
+ (
+ payload.work_kind().storage_key().to_owned(),
+ AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate,
+ )
+ } else {
+ (
+ payload.work_kind().storage_key().to_owned(),
+ AppSdkMigrationAuditClassification::PublishableCandidate,
+ )
+ }
+ }
+ Err(source) => {
+ report.issue(
+ "unknown_local_outbox_payload",
+ Some(row.id.as_str()),
+ format!(
+ "local outbox payload for operation `{}` could not be decoded: {source}",
+ row.operation_key
+ ),
+ );
+ (
+ format!("{}:{}", row.aggregate_kind, row.operation_kind),
+ AppSdkMigrationAuditClassification::Unknown,
+ )
+ }
+ };
+ let identities = vec![
+ DuplicateIdentity {
+ kind: "operation".to_owned(),
+ key: format!("{}:{}", row.account_id, row.operation_key),
+ },
+ DuplicateIdentity {
+ kind: "aggregate".to_owned(),
+ key: format!(
+ "{}:{}:{}:{}",
+ row.account_id, row.aggregate_kind, row.aggregate_id, row.operation_kind
+ ),
+ },
+ ];
+ report.record(
+ row.id.as_str(),
+ kind,
+ row.state.clone(),
+ classification,
+ identities,
+ );
+}
+
+fn audit_shared_local_event_record(
+ record: &LocalEventRecord,
+ report: &mut AppSdkMigrationAuditSourceBuilder,
+) {
+ let kind = shared_local_event_kind(record);
+ let classification = shared_local_event_classification(record, report);
+ report.record(
+ record.record_id.as_str(),
+ kind,
+ format!(
+ "{}:{}",
+ record.status.as_str(),
+ record.outbox_status.as_str()
+ ),
+ classification,
+ shared_local_event_duplicate_identities(record),
+ );
+}
+
+fn shared_local_event_kind(record: &LocalEventRecord) -> String {
+ match record.family {
+ LocalRecordFamily::LocalWork => record
+ .local_work_json
+ .as_ref()
+ .and_then(local_work_record_kind)
+ .map(|kind| format!("local_work:{kind}"))
+ .unwrap_or_else(|| "local_work:unknown".to_owned()),
+ LocalRecordFamily::SignedEvent => record
+ .event_kind
+ .map(shared_signed_event_kind)
+ .unwrap_or_else(|| "signed_event:unknown".to_owned()),
+ }
+}
+
+fn shared_local_event_classification(
+ record: &LocalEventRecord,
+ report: &mut AppSdkMigrationAuditSourceBuilder,
+) -> AppSdkMigrationAuditClassification {
+ match record.family {
+ LocalRecordFamily::LocalWork => classify_shared_local_work(record, report),
+ LocalRecordFamily::SignedEvent => classify_shared_signed_event(record, report),
+ }
+}
+
+fn classify_shared_local_work(
+ record: &LocalEventRecord,
+ report: &mut AppSdkMigrationAuditSourceBuilder,
+) -> AppSdkMigrationAuditClassification {
+ match record
+ .local_work_json
+ .as_ref()
+ .and_then(local_work_record_kind)
+ {
+ Some("farm_config_v1" | "listing_draft_v1") => {
+ AppSdkMigrationAuditClassification::PublishableCandidate
+ }
+ Some(record_kind) => {
+ report.issue(
+ "unsupported_shared_local_work_kind",
+ Some(record.record_id.as_str()),
+ format!("shared local work kind `{record_kind}` is not a SDK migration candidate"),
+ );
+ AppSdkMigrationAuditClassification::Unsupported
+ }
+ None => {
+ report.issue(
+ "unknown_shared_local_work_kind",
+ Some(record.record_id.as_str()),
+ "shared local work record does not expose a record_kind",
+ );
+ AppSdkMigrationAuditClassification::Unknown
+ }
+ }
+}
+
+fn classify_shared_signed_event(
+ record: &LocalEventRecord,
+ report: &mut AppSdkMigrationAuditSourceBuilder,
+) -> AppSdkMigrationAuditClassification {
+ match record.event_kind {
+ Some(kind) if kind == KIND_ORDER_PAYMENT_RECORD as i64 => {
+ AppSdkMigrationAuditClassification::PaymentDeferred
+ }
+ Some(kind) if kind == KIND_ORDER_SETTLEMENT_DECISION as i64 => {
+ AppSdkMigrationAuditClassification::SettlementDeferred
+ }
+ Some(kind) if supported_signed_event_kind(kind) => {
+ if signed_event_is_already_represented(record.status, record.outbox_status) {
+ AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
+ } else {
+ AppSdkMigrationAuditClassification::PublishableCandidate
+ }
+ }
+ Some(kind) => {
+ report.issue(
+ "unsupported_shared_signed_event_kind",
+ Some(record.record_id.as_str()),
+ format!("shared signed event kind `{kind}` is not a SDK migration candidate"),
+ );
+ AppSdkMigrationAuditClassification::Unsupported
+ }
+ None => {
+ report.issue(
+ "unknown_shared_signed_event_kind",
+ Some(record.record_id.as_str()),
+ "shared signed event record does not expose an event_kind",
+ );
+ AppSdkMigrationAuditClassification::Unknown
+ }
+ }
+}
+
+fn signed_event_is_already_represented(
+ status: LocalRecordStatus,
+ outbox_status: PublishOutboxStatus,
+) -> bool {
+ matches!(status, LocalRecordStatus::Published)
+ || matches!(outbox_status, PublishOutboxStatus::Acknowledged)
+}
+
+fn shared_local_event_duplicate_identities(record: &LocalEventRecord) -> Vec<DuplicateIdentity> {
+ let mut identities = Vec::new();
+ if let (Some(event_kind), Some(event_id)) = (
+ record.event_kind,
+ non_empty_value(record.event_id.as_deref()),
+ ) {
+ identities.push(DuplicateIdentity {
+ kind: "event".to_owned(),
+ key: format!("{event_kind}:{event_id}"),
+ });
+ }
+ if let Some(key) = shared_local_event_aggregate_key(record) {
+ identities.push(DuplicateIdentity {
+ kind: "aggregate".to_owned(),
+ key,
+ });
+ }
+ identities
+}
+
+fn shared_local_event_aggregate_key(record: &LocalEventRecord) -> Option<String> {
+ match record.family {
+ LocalRecordFamily::LocalWork => {
+ let record_kind = record
+ .local_work_json
+ .as_ref()
+ .and_then(local_work_record_kind)?;
+ non_empty_value(record.farm_id.as_deref())
+ .map(|farm_id| format!("local_work:{record_kind}:farm:{farm_id}"))
+ .or_else(|| {
+ non_empty_value(record.listing_addr.as_deref()).map(|listing_addr| {
+ format!("local_work:{record_kind}:listing:{listing_addr}")
+ })
+ })
+ }
+ LocalRecordFamily::SignedEvent => {
+ let event_kind = record.event_kind?;
+ non_empty_value(record.listing_addr.as_deref())
+ .map(|listing_addr| format!("signed_event:{event_kind}:listing:{listing_addr}"))
+ .or_else(|| {
+ non_empty_value(record.farm_id.as_deref())
+ .map(|farm_id| format!("signed_event:{event_kind}:farm:{farm_id}"))
+ })
+ }
+ }
+}
+
+fn local_work_record_kind(payload: &Value) -> Option<&str> {
+ payload
+ .get("record_kind")
+ .and_then(Value::as_str)
+ .map(str::trim)
+ .filter(|value| !value.is_empty())
+}
+
+fn supported_signed_event_kind(kind: i64) -> bool {
+ matches!(
+ kind,
+ value if value == KIND_FARM as i64
+ || value == KIND_LISTING as i64
+ || value == KIND_LISTING_DRAFT as i64
+ || value == KIND_ORDER_REQUEST as i64
+ || value == KIND_ORDER_DECISION as i64
+ || value == KIND_ORDER_REVISION_PROPOSAL as i64
+ || value == KIND_ORDER_REVISION_DECISION as i64
+ || value == KIND_ORDER_CANCELLATION as i64
+ || value == KIND_ORDER_FULFILLMENT_UPDATE as i64
+ || value == KIND_ORDER_RECEIPT as i64
+ || value == KIND_TRADE_VALIDATION_RECEIPT as i64
+ )
+}
+
+fn shared_signed_event_kind(kind: i64) -> String {
+ let name = match kind {
+ value if value == KIND_FARM as i64 => "farm",
+ value if value == KIND_LISTING as i64 => "listing",
+ value if value == KIND_LISTING_DRAFT as i64 => "listing_draft",
+ value if value == KIND_ORDER_REQUEST as i64 => "order_request",
+ value if value == KIND_ORDER_DECISION as i64 => "order_decision",
+ value if value == KIND_ORDER_REVISION_PROPOSAL as i64 => "order_revision_proposal",
+ value if value == KIND_ORDER_REVISION_DECISION as i64 => "order_revision_decision",
+ value if value == KIND_ORDER_CANCELLATION as i64 => "order_cancellation",
+ value if value == KIND_ORDER_FULFILLMENT_UPDATE as i64 => "order_fulfillment",
+ value if value == KIND_ORDER_RECEIPT as i64 => "order_receipt",
+ value if value == KIND_ORDER_PAYMENT_RECORD as i64 => "order_payment",
+ value if value == KIND_ORDER_SETTLEMENT_DECISION as i64 => "order_settlement",
+ value if value == KIND_TRADE_VALIDATION_RECEIPT as i64 => "trade_validation_receipt",
+ _ => "unsupported",
+ };
+ format!("signed_event:{name}:{kind}")
+}
+
+fn non_empty_value(value: Option<&str>) -> Option<&str> {
+ value.map(str::trim).filter(|value| !value.is_empty())
+}
+
+fn increment_count(counts: &mut BTreeMap<String, u64>, key: String) {
+ *counts.entry(key).or_default() += 1;
+}
+
+fn counts_from_map(counts: BTreeMap<String, u64>) -> Vec<AppSdkMigrationAuditCount> {
+ counts
+ .into_iter()
+ .map(|(key, count)| AppSdkMigrationAuditCount { key, count })
+ .collect()
+}
+
+fn duplicate_candidates_from_map(
+ duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
+) -> Vec<AppSdkMigrationAuditDuplicateCandidate> {
+ duplicate_records
+ .into_iter()
+ .filter_map(|(identity, records)| {
+ if records.len() < 2 {
+ return None;
+ }
+ Some(AppSdkMigrationAuditDuplicateCandidate {
+ identity_kind: identity.kind,
+ identity_key: identity.key,
+ record_count: records.len() as u64,
+ record_ids: records.into_iter().collect(),
+ })
+ })
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use radroots_app_sync::{
+ AppFarmProfilePublishPayload, AppPublishContext, AppPublishPayload, PendingSyncOperation,
+ };
+ use radroots_app_view::{FarmId, FarmReadiness};
+ use radroots_events::kinds::{
+ KIND_LISTING, KIND_ORDER_PAYMENT_RECORD, KIND_ORDER_SETTLEMENT_DECISION,
+ };
+ use radroots_local_events::{
+ LocalEventRecordInput, LocalEventsStore, LocalRecordFamily, LocalRecordStatus,
+ PublishOutboxStatus, SourceRuntime,
+ };
+ use radroots_sql_core::SqliteExecutor;
+ use rusqlite::params;
+ use serde_json::json;
+
+ use crate::{
+ AppSdkMigrationAuditClassification, AppSdkMigrationAuditRequest, AppSqliteStore,
+ DatabaseTarget,
+ };
+
+ fn local_events_store() -> LocalEventsStore<SqliteExecutor> {
+ let executor = SqliteExecutor::open_memory().expect("open local events memory db");
+ let store = LocalEventsStore::new(executor);
+ store.migrate_up().expect("migrate local events store");
+ store
+ }
+
+ fn count_named(counts: &[crate::AppSdkMigrationAuditCount], key: &str) -> u64 {
+ counts
+ .iter()
+ .find(|count| count.key == key)
+ .map(|count| count.count)
+ .unwrap_or_default()
+ }
+
+ #[test]
+ fn local_outbox_audit_reads_batches_without_mutating_rows() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
+ let shared_events = local_events_store();
+ let farm_id = FarmId::new();
+ let operation = PendingSyncOperation::from_publish_payload(
+ AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload {
+ context: AppPublishContext::new("acct_a", "farm_setup"),
+ farm_id,
+ display_name: "Green Loop Farm".to_owned(),
+ readiness: Some(FarmReadiness::Ready),
+ }),
+ "2026-06-18T12:00:00Z",
+ )
+ .expect("build publish operation");
+ store
+ .sync_repository()
+ .enqueue_pending_operation("acct_a", &operation)
+ .expect("enqueue operation");
+ store
+ .connection()
+ .execute(
+ "INSERT INTO local_outbox (
+ id,
+ account_id,
+ operation_key,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count,
+ state,
+ last_error_message
+ ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)",
+ params![
+ "succeeded-duplicate",
+ "acct_a",
+ operation.operation_key,
+ operation.aggregate.aggregate_kind(),
+ operation.aggregate.aggregate_id(),
+ operation.operation.storage_key(),
+ operation.payload_json,
+ "2026-06-18T11:00:00Z",
+ "2026-06-18T11:00:00Z",
+ 0_i64,
+ "succeeded",
+ ],
+ )
+ .expect("insert succeeded duplicate");
+ let before_count = local_outbox_row_count(&store);
+
+ let report = store
+ .audit_sdk_migration(
+ &shared_events,
+ AppSdkMigrationAuditRequest { batch_size: 1 },
+ )
+ .expect("audit should run");
+
+ assert_eq!(local_outbox_row_count(&store), before_count);
+ assert_eq!(report.local_outbox.batch_size, 1);
+ assert_eq!(report.local_outbox.batch_count, 2);
+ assert_eq!(report.local_outbox.scanned_records, 2);
+ assert_eq!(
+ count_named(&report.local_outbox.kind_counts, "farm_profile"),
+ 2
+ );
+ assert_eq!(
+ count_named(&report.local_outbox.status_counts, "pending"),
+ 1
+ );
+ assert_eq!(
+ count_named(&report.local_outbox.status_counts, "succeeded"),
+ 1
+ );
+ assert_eq!(
+ count_named(
+ &report.local_outbox.classification_counts,
+ AppSdkMigrationAuditClassification::PublishableCandidate.storage_key()
+ ),
+ 1
+ );
+ assert_eq!(
+ count_named(
+ &report.local_outbox.classification_counts,
+ AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key()
+ ),
+ 1
+ );
+ assert!(
+ report
+ .local_outbox
+ .duplicate_candidates
+ .iter()
+ .any(|candidate| candidate.identity_kind == "operation"
+ && candidate.record_count == 2)
+ );
+ assert_eq!(report.shared_local_events.scanned_records, 0);
+ }
+
+ #[test]
+ fn shared_local_events_audit_defers_payment_and_settlement() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
+ let shared_events = local_events_store();
+ shared_events
+ .append_record(&signed_event_record(
+ "listing-a",
+ "duplicate-listing-event",
+ KIND_LISTING as i64,
+ ))
+ .expect("append listing a");
+ shared_events
+ .append_record(&signed_event_record(
+ "listing-b",
+ "duplicate-listing-event",
+ KIND_LISTING as i64,
+ ))
+ .expect("append listing b");
+ shared_events
+ .append_record(&signed_event_record(
+ "payment",
+ "payment-event",
+ KIND_ORDER_PAYMENT_RECORD as i64,
+ ))
+ .expect("append payment");
+ shared_events
+ .append_record(&signed_event_record(
+ "settlement",
+ "settlement-event",
+ KIND_ORDER_SETTLEMENT_DECISION as i64,
+ ))
+ .expect("append settlement");
+ let before_records = shared_events
+ .list_records_changed_after(0, 10)
+ .expect("list records before audit")
+ .len();
+
+ let report = store
+ .audit_sdk_migration(
+ &shared_events,
+ AppSdkMigrationAuditRequest { batch_size: 1 },
+ )
+ .expect("audit should run");
+
+ assert_eq!(
+ shared_events
+ .list_records_changed_after(0, 10)
+ .expect("list records after audit")
+ .len(),
+ before_records
+ );
+ assert_eq!(report.shared_local_events.batch_count, 4);
+ assert_eq!(report.shared_local_events.scanned_records, 4);
+ assert_eq!(
+ count_named(
+ &report.shared_local_events.classification_counts,
+ AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key()
+ ),
+ 2
+ );
+ assert_eq!(
+ count_named(
+ &report.shared_local_events.classification_counts,
+ AppSdkMigrationAuditClassification::PaymentDeferred.storage_key()
+ ),
+ 1
+ );
+ assert_eq!(
+ count_named(
+ &report.shared_local_events.classification_counts,
+ AppSdkMigrationAuditClassification::SettlementDeferred.storage_key()
+ ),
+ 1
+ );
+ assert!(
+ report
+ .shared_local_events
+ .duplicate_candidates
+ .iter()
+ .any(|candidate| candidate.identity_kind == "event" && candidate.record_count == 2)
+ );
+ }
+
+ fn local_outbox_row_count(store: &AppSqliteStore) -> i64 {
+ store
+ .connection()
+ .query_row("SELECT count(*) FROM local_outbox", [], |row| row.get(0))
+ .expect("count local outbox rows")
+ }
+
+ fn signed_event_record(
+ record_id: &str,
+ event_id: &str,
+ event_kind: i64,
+ ) -> LocalEventRecordInput {
+ LocalEventRecordInput {
+ record_id: record_id.to_owned(),
+ family: LocalRecordFamily::SignedEvent,
+ status: LocalRecordStatus::Published,
+ source_runtime: SourceRuntime::App,
+ created_at_ms: 1000,
+ inserted_at_ms: 1001,
+ owner_account_id: Some("acct_a".to_owned()),
+ owner_pubkey: Some("seller-pubkey".to_owned()),
+ farm_id: Some("farm-key".to_owned()),
+ listing_addr: Some("30402:seller-pubkey:listing-key".to_owned()),
+ local_work_json: None,
+ event_id: Some(event_id.to_owned()),
+ event_kind: Some(event_kind),
+ event_pubkey: Some("seller-pubkey".to_owned()),
+ event_created_at: Some(1000),
+ event_tags_json: Some(json!([["d", "listing-key"]])),
+ event_content: Some("{}".to_owned()),
+ event_sig: Some("signature".to_owned()),
+ raw_event_json: Some(json!({
+ "id": event_id,
+ "kind": event_kind,
+ "pubkey": "seller-pubkey"
+ })),
+ outbox_status: PublishOutboxStatus::Acknowledged,
+ relay_set_fingerprint: Some("relay-set".to_owned()),
+ relay_delivery_json: Some(json!({
+ "state": "acknowledged",
+ "acknowledged_relays": ["wss://relay.example"]
+ })),
+ }
+ }
+}