migration_audit.rs (54265B)
use std::collections::{BTreeMap, BTreeSet};

use radroots_app_sync::{AppPublishPayload, SyncOperationKind};
use radroots_events::kinds::{
    KIND_FARM, KIND_LISTING, KIND_LISTING_DRAFT, KIND_ORDER_CANCELLATION, KIND_ORDER_DECISION,
    KIND_ORDER_REQUEST, KIND_ORDER_REVISION_DECISION, KIND_ORDER_REVISION_PROPOSAL,
    KIND_TRADE_VALIDATION_RECEIPT,
};
use radroots_local_events::{
    LocalEventRecord, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus,
};
use radroots_sql_core::SqlExecutor;
use rusqlite::params;
use serde_json::Value;

use crate::{
    AppSdkMigrationReceipt, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, AppSqliteError,
    AppSqliteStore,
};

/// Batch size used when an audit request asks for `0` records per batch.
pub const APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE: u32 = 500;
/// Upper bound applied to any requested audit batch size.
pub const APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE: u32 = 1_000;

/// Parameters for a SDK migration audit run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditRequest {
    /// Requested number of records per batch; see
    /// [`AppSdkMigrationAuditRequest::normalized_batch_size`] for how it is interpreted.
    pub batch_size: u32,
}

impl Default for AppSdkMigrationAuditRequest {
    /// Builds a request that uses [`APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE`].
    fn default() -> Self {
        Self {
            batch_size: APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE,
        }
    }
}

impl AppSdkMigrationAuditRequest {
    /// Returns the batch size the audit will actually use.
    ///
    /// A requested size of `0` selects the default; any other value is capped at
    /// [`APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE`].
    pub fn normalized_batch_size(self) -> u32 {
        match self.batch_size {
            0 => APP_SDK_MIGRATION_AUDIT_DEFAULT_BATCH_SIZE,
            requested => requested.min(APP_SDK_MIGRATION_AUDIT_MAX_BATCH_SIZE),
        }
    }
}

/// Combined result of auditing both migration sources.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditReport {
    /// Report for the `local_outbox` table.
    pub local_outbox: AppSdkMigrationAuditSourceReport,
    /// Report for the shared local events store.
    pub shared_local_events: AppSdkMigrationAuditSourceReport,
    /// Issues of both sources: local outbox issues first, then shared local event issues.
    pub issues: Vec<AppSdkMigrationAuditIssue>,
}

/// Audit result for a single source.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditSourceReport {
    /// Source the report describes.
    pub source: AppSdkMigrationAuditSource,
    /// Normalized batch size used while scanning.
    pub batch_size: u32,
    /// Number of non-empty batches that were read.
    pub batch_count: u64,
    /// Number of records that were audited.
    pub scanned_records: u64,
    /// Record counts keyed by record kind, ordered by key.
    pub kind_counts: Vec<AppSdkMigrationAuditCount>,
    /// Record counts keyed by record status, ordered by key.
    pub status_counts: Vec<AppSdkMigrationAuditCount>,
    /// Record counts keyed by classification storage key, ordered by key.
    pub classification_counts: Vec<AppSdkMigrationAuditCount>,
    /// Identities shared by at least two distinct records.
    pub duplicate_candidates: Vec<AppSdkMigrationAuditDuplicateCandidate>,
    /// Issues raised while auditing this source, in scan order.
    pub issues: Vec<AppSdkMigrationAuditIssue>,
}

/// A keyed counter entry in an audit report.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditCount {
    pub key: String,
    pub count: u64,
}

/// A group of records that share the same duplicate-detection identity.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditDuplicateCandidate {
    /// Identity category (the audit emits `operation`, `aggregate` and `event`).
    pub identity_kind: String,
    /// Identity value within the category.
    pub identity_key: String,
    /// Number of distinct record ids sharing the identity.
    pub record_count: u64,
    /// The record ids sharing the identity, in sorted order.
    pub record_ids: Vec<String>,
}

/// A finding reported by the audit.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct AppSdkMigrationAuditIssue {
    /// Source whose audit produced the issue.
    pub source: AppSdkMigrationAuditSource,
    /// Machine-readable issue code.
    pub code: String,
    /// Id of the record the issue refers to, when there is one.
    pub record_id: Option<String>,
    /// Human-readable description.
    pub message: String,
}

/// The data sources covered by the audit.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppSdkMigrationAuditSource {
    LocalOutbox,
    SharedLocalEvents,
}

/// Classification assigned to every audited record.
///
/// [`AppSdkMigrationAuditClassification::storage_key`] yields the string used as the key of
/// `classification_counts` in a source report.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AppSdkMigrationAuditClassification {
    PublishableCandidate,
    AlreadyRepresentedCandidate,
    RepresentedRecord,
    SkippedRecord,
    FailedRecord,
    LocalWorkDeferred,
    ManualReviewRequired,
    ValidationReceiptDeferred,
    Unsupported,
    Unknown,
}

impl AppSdkMigrationAuditClassification {
    /// Returns the stable snake_case key of this classification.
    pub const fn storage_key(self) -> &'static str {
        match self {
            Self::PublishableCandidate => "publishable_candidate",
            Self::AlreadyRepresentedCandidate => "already_represented_candidate",
            Self::RepresentedRecord => "represented_record",
            Self::SkippedRecord => "skipped_record",
            Self::FailedRecord => "failed_record",
            Self::LocalWorkDeferred => "local_work_deferred",
            Self::ManualReviewRequired => "manual_review_required",
            Self::ValidationReceiptDeferred => "validation_receipt_deferred",
            Self::Unsupported => "unsupported",
            Self::Unknown => "unknown",
        }
    }
}

impl AppSqliteStore {
    /// Audits the local outbox and the shared local events store and merges both reports.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by either source audit.
    pub fn audit_sdk_migration<E>(
        &self,
        shared_local_events: &LocalEventsStore<E>,
        request: AppSdkMigrationAuditRequest,
    ) -> Result<AppSdkMigrationAuditReport, AppSqliteError>
    where
        E: SqlExecutor,
    {
        let local_outbox = self.audit_sdk_migration_local_outbox(request)?;
        let shared_local_events =
            self.audit_sdk_migration_shared_local_events(shared_local_events, request)?;

        // Local outbox issues come first, followed by shared local event issues.
        let mut issues =
            Vec::with_capacity(local_outbox.issues.len() + shared_local_events.issues.len());
        issues.extend_from_slice(&local_outbox.issues);
        issues.extend_from_slice(&shared_local_events.issues);

        Ok(AppSdkMigrationAuditReport {
            local_outbox,
            shared_local_events,
            issues,
        })
    }

    /// Audits every `local_outbox` row, reading the table in `rowid` order in batches.
    ///
    /// For each row the matching local-outbox migration receipt (if any) is loaded and
    /// overlaid on the classification derived from the row itself.
    ///
    /// # Errors
    ///
    /// Returns an error when a batch query or a receipt lookup fails.
    pub fn audit_sdk_migration_local_outbox(
        &self,
        request: AppSdkMigrationAuditRequest,
    ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError> {
        let batch_size = request.normalized_batch_size();
        let mut builder = AppSdkMigrationAuditSourceBuilder::new(
            AppSdkMigrationAuditSource::LocalOutbox,
            batch_size,
        );
        let mut cursor = 0_i64;

        loop {
            let page = self.load_local_outbox_audit_batch(cursor, batch_size)?;
            // An empty page means the table is exhausted.
            let Some(last_row) = page.last() else {
                break;
            };
            // The next page resumes after the highest rowid of this one.
            cursor = last_row.rowid;
            builder.batch_count += 1;

            for row in &page {
                let receipt = self.sdk_migration_receipt_repository().load_receipt(
                    AppSdkMigrationReceiptSourceKind::LocalOutbox,
                    row.id.as_str(),
                )?;
                audit_local_outbox_row(row, receipt.as_ref(), &mut builder);
            }

            // A short page is the final one; skip the extra empty query.
            if page.len() < batch_size as usize {
                break;
            }
        }

        Ok(builder.finish())
    }

    /// Audits every record of the shared local events store, looking up shared-local-event
    /// migration receipts through this store's receipt repository.
    ///
    /// # Errors
    ///
    /// Returns an error when listing records or loading a receipt fails.
    pub fn audit_sdk_migration_shared_local_events<E>(
        &self,
        store: &LocalEventsStore<E>,
        request: AppSdkMigrationAuditRequest,
    ) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError>
    where
        E: SqlExecutor,
    {
        let receipt_for = |record_id: &str| {
            self.sdk_migration_receipt_repository().load_receipt(
                AppSdkMigrationReceiptSourceKind::SharedLocalEvent,
                record_id,
            )
        };
        audit_sdk_migration_shared_local_events_with_receipts(store, request, receipt_for)
    }
}

/// Walks the shared local events store by change sequence in batches and audits each record.
///
/// `load_receipt` is called once per record with the record id and supplies the migration
/// receipt to overlay, if one exists.
fn audit_sdk_migration_shared_local_events_with_receipts<E>(
    store: &LocalEventsStore<E>,
    request: AppSdkMigrationAuditRequest,
    mut load_receipt: impl FnMut(&str) -> Result<Option<AppSdkMigrationReceipt>, AppSqliteError>,
) -> Result<AppSdkMigrationAuditSourceReport, AppSqliteError>
where
    E: SqlExecutor,
{
    let batch_size = request.normalized_batch_size();
    let mut builder = AppSdkMigrationAuditSourceBuilder::new(
        AppSdkMigrationAuditSource::SharedLocalEvents,
        batch_size,
    );
    let mut cursor = 0_i64;

    loop {
        let page = store
            .list_records_changed_after(cursor, batch_size)
            .map_err(|source| AppSqliteError::LocalEvents {
                operation: "audit shared local event records",
                source,
            })?;
        if page.is_empty() {
            break;
        }
        builder.batch_count += 1;

        for record in &page {
            // Advance the cursor so the next page starts after this record.
            cursor = record.change_seq;
            let receipt = load_receipt(record.record_id.as_str())?;
            audit_shared_local_event_record(record, receipt.as_ref(), &mut builder);
        }

        // A short page is the final one; skip the extra empty query.
        if page.len() < batch_size as usize {
            break;
        }
    }

    Ok(builder.finish())
}

impl AppSqliteStore {
    /// Reads up to `limit` `local_outbox` rows whose `rowid` is greater than `after_rowid`,
    /// ordered by ascending `rowid`.
    fn load_local_outbox_audit_batch(
        &self,
        after_rowid: i64,
        limit: u32,
    ) -> Result<Vec<LocalOutboxAuditRow>, AppSqliteError> {
        let mut statement = self
            .connection()
            .prepare(
                "SELECT
                    rowid,
                    id,
                    account_id,
                    operation_key,
                    aggregate_kind,
                    aggregate_id,
                    operation_kind,
                    payload_json,
                    state
                 FROM local_outbox
                 WHERE rowid > ?1
                 ORDER BY rowid ASC
                 LIMIT ?2",
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "prepare SDK migration local outbox audit query",
                source,
            })?;

        let mapped_rows = statement
            .query_map(params![after_rowid, i64::from(limit)], |row| {
                Ok(LocalOutboxAuditRow {
                    rowid: row.get(0)?,
                    id: row.get(1)?,
                    account_id: row.get(2)?,
                    operation_key: row.get(3)?,
                    aggregate_kind: row.get(4)?,
                    aggregate_id: row.get(5)?,
                    operation_kind: row.get(6)?,
                    payload_json: row.get(7)?,
                    state: row.get(8)?,
                })
            })
            .map_err(|source| AppSqliteError::Query {
                operation: "query SDK migration local outbox audit rows",
                source,
            })?;

        // Stop at the first row that fails to decode.
        let mut batch = Vec::new();
        for mapped in mapped_rows {
            let row = mapped.map_err(|source| AppSqliteError::Query {
                operation: "read SDK migration local outbox audit row",
                source,
            })?;
            batch.push(row);
        }
        Ok(batch)
    }
}

/// The `local_outbox` columns the audit reads.
#[derive(Clone, Debug, Eq, PartialEq)]
struct LocalOutboxAuditRow {
    rowid: i64,
    id: String,
    account_id: String,
    operation_key: String,
    aggregate_kind: String,
    aggregate_id: String,
    operation_kind: String,
    payload_json: String,
    state: String,
}

/// Mutable accumulator that is turned into an [`AppSdkMigrationAuditSourceReport`].
struct AppSdkMigrationAuditSourceBuilder {
    source: AppSdkMigrationAuditSource,
    batch_size: u32,
    batch_count: u64,
    scanned_records: u64,
    kind_counts: BTreeMap<String, u64>,
    status_counts: BTreeMap<String, u64>,
    classification_counts: BTreeMap<String, u64>,
    // Record ids seen per identity; a set so a record is counted once per identity.
    duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
    issues: Vec<AppSdkMigrationAuditIssue>,
}

impl AppSdkMigrationAuditSourceBuilder {
    /// Creates an empty builder for `source`.
    fn new(source: AppSdkMigrationAuditSource, batch_size: u32) -> Self {
        Self {
            source,
            batch_size,
            batch_count: 0,
            scanned_records: 0,
            kind_counts: BTreeMap::new(),
            status_counts: BTreeMap::new(),
            classification_counts: BTreeMap::new(),
            duplicate_records: BTreeMap::new(),
            issues: Vec::new(),
        }
    }

    /// Counts one audited record and registers it under each of its duplicate identities.
    fn record(
        &mut self,
        record_id: &str,
        kind: String,
        status: String,
        classification: AppSdkMigrationAuditClassification,
        duplicate_identities: Vec<DuplicateIdentity>,
    ) {
        self.scanned_records += 1;
        increment_count(&mut self.kind_counts, kind);
        increment_count(&mut self.status_counts, status);
        increment_count(
            &mut self.classification_counts,
            classification.storage_key().to_owned(),
        );

        for identity in duplicate_identities {
            let members = self.duplicate_records.entry(identity).or_default();
            members.insert(record_id.to_owned());
        }
    }

    /// Appends an issue attributed to this builder's source.
    fn issue(&mut self, code: &str, record_id: Option<&str>, message: impl Into<String>) {
        let issue = AppSdkMigrationAuditIssue {
            source: self.source,
            code: code.to_owned(),
            record_id: record_id.map(ToOwned::to_owned),
            message: message.into(),
        };
        self.issues.push(issue);
    }

    /// Consumes the builder and produces the final report.
    fn finish(self) -> AppSdkMigrationAuditSourceReport {
        let Self {
            source,
            batch_size,
            batch_count,
            scanned_records,
            kind_counts,
            status_counts,
            classification_counts,
            duplicate_records,
            issues,
        } = self;

        AppSdkMigrationAuditSourceReport {
            source,
            batch_size,
            batch_count,
            scanned_records,
            kind_counts: counts_from_map(kind_counts),
            status_counts: counts_from_map(status_counts),
            classification_counts: counts_from_map(classification_counts),
            duplicate_candidates: duplicate_candidates_from_map(duplicate_records),
            issues,
        }
    }
}

/// Key under which records are grouped for duplicate detection.
///
/// Field order matters: the derived ordering (kind, then key) determines the order of
/// duplicate candidates in the report.
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
struct DuplicateIdentity {
    kind: String,
    key: String,
}

/// Classifies one local outbox row and records it in `report`.
///
/// A row whose payload cannot be decoded is reported as `Unknown`; a decodable row with a
/// delete operation is reported as `Unsupported`; every other row is classified by its state.
/// The receipt overlay is applied afterwards.
fn audit_local_outbox_row(
    row: &LocalOutboxAuditRow,
    receipt: Option<&AppSdkMigrationReceipt>,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) {
    let record_id = row.id.as_str();

    let (kind, source_classification) =
        match serde_json::from_str::<AppPublishPayload>(row.payload_json.as_str()) {
            Ok(payload) => {
                let kind = payload.work_kind().storage_key().to_owned();
                let classification =
                    if row.operation_kind == SyncOperationKind::Delete.storage_key() {
                        report.issue(
                            "unsupported_local_outbox_operation",
                            Some(record_id),
                            format!(
                                "local outbox delete operation `{}` is not a SDK publish migration candidate",
                                row.operation_key
                            ),
                        );
                        AppSdkMigrationAuditClassification::Unsupported
                    } else {
                        classify_local_outbox_state(row, report)
                    };
                (kind, classification)
            }
            Err(source) => {
                report.issue(
                    "unknown_local_outbox_payload",
                    Some(record_id),
                    format!(
                        "local outbox payload for operation `{}` could not be decoded: {source}",
                        row.operation_key
                    ),
                );
                // Without a decoded payload, fall back to the raw aggregate/operation columns.
                (
                    format!("{}:{}", row.aggregate_kind, row.operation_kind),
                    AppSdkMigrationAuditClassification::Unknown,
                )
            }
        };

    let classification =
        classify_receipt_overlay(record_id, source_classification, receipt, report);

    let operation_identity = DuplicateIdentity {
        kind: "operation".to_owned(),
        key: format!("{}:{}", row.account_id, row.operation_key),
    };
    let aggregate_identity = DuplicateIdentity {
        kind: "aggregate".to_owned(),
        key: format!(
            "{}:{}:{}:{}",
            row.account_id, row.aggregate_kind, row.aggregate_id, row.operation_kind
        ),
    };

    report.record(
        record_id,
        kind,
        row.state.clone(),
        classification,
        vec![operation_identity, aggregate_identity],
    );
}

/// Maps the `state` column of a local outbox row to a classification, raising an issue for
/// states that need review or are not recognised.
fn classify_local_outbox_state(
    row: &LocalOutboxAuditRow,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) -> AppSdkMigrationAuditClassification {
    let state = row.state.as_str();
    match state {
        "pending" | "in_progress" | "retryable" => {
            AppSdkMigrationAuditClassification::PublishableCandidate
        }
        "succeeded" => AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate,
        "failed" | "blocked" => {
            report.issue(
                "manual_review_local_outbox_state",
                Some(row.id.as_str()),
                format!(
                    "local outbox operation `{}` is in `{}` state and requires migration review",
                    row.operation_key, row.state
                ),
            );
            AppSdkMigrationAuditClassification::ManualReviewRequired
        }
        _ => {
            report.issue(
                "unknown_local_outbox_state",
                Some(row.id.as_str()),
                format!(
                    "local outbox operation `{}` has unknown state `{}`",
                    row.operation_key, row.state
                ),
            );
            AppSdkMigrationAuditClassification::Unknown
        }
    }
}

/// Classifies one shared local event record and records it in `report`.
///
/// The recorded status key is `<status>:<outbox_status>`.
fn audit_shared_local_event_record(
    record: &LocalEventRecord,
    receipt: Option<&AppSdkMigrationReceipt>,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) {
    let record_id = record.record_id.as_str();
    let kind = shared_local_event_kind(record);
    let source_classification = shared_local_event_classification(record, report);
    let classification =
        classify_receipt_overlay(record_id, source_classification, receipt, report);
    let status = format!(
        "{}:{}",
        record.status.as_str(),
        record.outbox_status.as_str()
    );

    report.record(
        record_id,
        kind,
        status,
        classification,
        shared_local_event_duplicate_identities(record),
    );
}

/// Combines the classification derived from the source record with its migration receipt.
///
/// Without a receipt the source classification is returned unchanged. A receipt attached to
/// a source that is not a migration candidate raises an issue and never overrides the source
/// classification. Otherwise the receipt's migration state decides the result.
fn classify_receipt_overlay(
    record_id: &str,
    source_classification: AppSdkMigrationAuditClassification,
    receipt: Option<&AppSdkMigrationReceipt>,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) -> AppSdkMigrationAuditClassification {
    let receipt = match receipt {
        Some(receipt) => receipt,
        None => return source_classification,
    };

    if !receipt_allowed_for_source_classification(source_classification) {
        report.issue(
            "sdk_migration_receipt_for_non_migratable_source",
            Some(record_id),
            format!(
                "SDK migration receipt `{}` for operation `{}` cannot override source classification `{}`",
                receipt.id,
                receipt.sdk_operation_kind,
                source_classification.storage_key()
            ),
        );
        return source_classification;
    }

    match receipt.migration_state {
        // The receipt has not progressed far enough to change the picture.
        AppSdkMigrationState::Pending | AppSdkMigrationState::Prepared => source_classification,
        AppSdkMigrationState::Enqueued | AppSdkMigrationState::Pushed => {
            AppSdkMigrationAuditClassification::RepresentedRecord
        }
        AppSdkMigrationState::Skipped => AppSdkMigrationAuditClassification::SkippedRecord,
        AppSdkMigrationState::Failed => {
            report.issue(
                "sdk_migration_receipt_failed",
                Some(record_id),
                format!(
                    "SDK migration receipt `{}` for operation `{}` is failed",
                    receipt.id, receipt.sdk_operation_kind
                ),
            );
            AppSdkMigrationAuditClassification::FailedRecord
        }
        AppSdkMigrationState::Blocked | AppSdkMigrationState::ManualReview => {
            report.issue(
                "sdk_migration_receipt_manual_review",
                Some(record_id),
                format!(
                    "SDK migration receipt `{}` for operation `{}` requires manual review",
                    receipt.id, receipt.sdk_operation_kind
                ),
            );
            AppSdkMigrationAuditClassification::ManualReviewRequired
        }
        AppSdkMigrationState::Unsupported => {
            report.issue(
                "sdk_migration_receipt_unsupported",
                Some(record_id),
                format!(
                    "SDK migration receipt `{}` for operation `{}` is unsupported",
                    receipt.id, receipt.sdk_operation_kind
                ),
            );
            AppSdkMigrationAuditClassification::Unsupported
        }
        AppSdkMigrationState::Unknown => {
            report.issue(
                "sdk_migration_receipt_unknown",
                Some(record_id),
                format!(
                    "SDK migration receipt `{}` for operation `{}` is unknown",
                    receipt.id, receipt.sdk_operation_kind
                ),
            );
            AppSdkMigrationAuditClassification::Unknown
        }
    }
}

/// Returns `true` when a receipt may override the given source classification, which is the
/// case only for the two candidate classifications.
fn receipt_allowed_for_source_classification(
    classification: AppSdkMigrationAuditClassification,
) -> bool {
    use AppSdkMigrationAuditClassification as Classification;

    match classification {
        Classification::PublishableCandidate | Classification::AlreadyRepresentedCandidate => true,
        Classification::RepresentedRecord
        | Classification::SkippedRecord
        | Classification::FailedRecord
        | Classification::LocalWorkDeferred
        | Classification::ManualReviewRequired
        | Classification::ValidationReceiptDeferred
        | Classification::Unsupported
        | Classification::Unknown => false,
    }
}

/// Builds the kind key under which a shared local event record is counted.
fn shared_local_event_kind(record: &LocalEventRecord) -> String {
    match record.family {
        LocalRecordFamily::LocalWork => {
            match record
                .local_work_json
                .as_ref()
                .and_then(local_work_record_kind)
            {
                Some(kind) => format!("local_work:{kind}"),
                None => "local_work:unknown".to_owned(),
            }
        }
        LocalRecordFamily::SignedEvent => record
            .event_kind
            .map_or_else(|| "signed_event:unknown".to_owned(), shared_signed_event_kind),
    }
}

/// Dispatches classification of a shared local event record by record family.
fn shared_local_event_classification(
    record: &LocalEventRecord,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) -> AppSdkMigrationAuditClassification {
    match record.family {
        LocalRecordFamily::LocalWork => classify_shared_local_work(record, report),
        LocalRecordFamily::SignedEvent => classify_shared_signed_event(record, report),
    }
}

/// Classifies a local work record.
///
/// Only the `farm_config_v1` and `listing_draft_v1` record kinds are classified by status;
/// any other kind is `Unsupported` and a missing kind is `Unknown`, each with an issue.
fn classify_shared_local_work(
    record: &LocalEventRecord,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) -> AppSdkMigrationAuditClassification {
    let declared_kind = record
        .local_work_json
        .as_ref()
        .and_then(local_work_record_kind);

    match declared_kind {
        Some("farm_config_v1" | "listing_draft_v1") => classify_shared_local_work_status(record),
        Some(record_kind) => {
            report.issue(
                "unsupported_shared_local_work_kind",
                Some(record.record_id.as_str()),
                format!("shared local work kind `{record_kind}` is not a SDK migration candidate"),
            );
            AppSdkMigrationAuditClassification::Unsupported
        }
        None => {
            report.issue(
                "unknown_shared_local_work_kind",
                Some(record.record_id.as_str()),
                "shared local work record does not expose a record_kind",
            );
            AppSdkMigrationAuditClassification::Unknown
        }
    }
}

/// Classifies a supported local work record by its record status and outbox status.
///
/// The checks are ordered: published/acknowledged wins over failed/conflict, which wins over
/// pending publish; everything else is deferred local work.
fn classify_shared_local_work_status(
    record: &LocalEventRecord,
) -> AppSdkMigrationAuditClassification {
    let already_published = matches!(record.outbox_status, PublishOutboxStatus::Acknowledged)
        || matches!(record.status, LocalRecordStatus::Published);
    if already_published {
        return AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate;
    }

    let needs_review = matches!(record.outbox_status, PublishOutboxStatus::Failed)
        || matches!(
            record.status,
            LocalRecordStatus::Failed | LocalRecordStatus::Conflict
        );
    if needs_review {
        return AppSdkMigrationAuditClassification::ManualReviewRequired;
    }

    if matches!(record.status, LocalRecordStatus::PendingPublish) {
        AppSdkMigrationAuditClassification::PublishableCandidate
    } else {
        AppSdkMigrationAuditClassification::LocalWorkDeferred
    }
}

/// Classifies a signed event record by its event kind and publish state.
///
/// Trade validation receipts are always deferred. Supported kinds are candidates, other kinds
/// are `Unsupported`, and a record without an event kind is `Unknown`.
fn classify_shared_signed_event(
    record: &LocalEventRecord,
    report: &mut AppSdkMigrationAuditSourceBuilder,
) -> AppSdkMigrationAuditClassification {
    let Some(kind) = record.event_kind else {
        report.issue(
            "unknown_shared_signed_event_kind",
            Some(record.record_id.as_str()),
            "shared signed event record does not expose an event_kind",
        );
        return AppSdkMigrationAuditClassification::Unknown;
    };

    if kind == KIND_TRADE_VALIDATION_RECEIPT as i64 {
        return AppSdkMigrationAuditClassification::ValidationReceiptDeferred;
    }

    if !supported_signed_event_kind(kind) {
        report.issue(
            "unsupported_shared_signed_event_kind",
            Some(record.record_id.as_str()),
            format!("shared signed event kind `{kind}` is not a SDK migration candidate"),
        );
        return AppSdkMigrationAuditClassification::Unsupported;
    }

    if signed_event_is_already_represented(record.status, record.outbox_status) {
        AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate
    } else {
        AppSdkMigrationAuditClassification::PublishableCandidate
    }
}

/// Returns `true` when the record is published or its outbox entry is acknowledged.
fn signed_event_is_already_represented(
    status: LocalRecordStatus,
    outbox_status: PublishOutboxStatus,
) -> bool {
    matches!(
        (status, outbox_status),
        (LocalRecordStatus::Published, _) | (_, PublishOutboxStatus::Acknowledged)
    )
}

/// Collects the duplicate-detection identities of a shared local event record: an `event`
/// identity when both event kind and a non-blank event id exist, followed by an `aggregate`
/// identity when one can be derived.
fn shared_local_event_duplicate_identities(record: &LocalEventRecord) -> Vec<DuplicateIdentity> {
    let event_identity = record
        .event_kind
        .zip(non_empty_value(record.event_id.as_deref()))
        .map(|(event_kind, event_id)| DuplicateIdentity {
            kind: "event".to_owned(),
            key: format!("{event_kind}:{event_id}"),
        });
    let aggregate_identity =
        shared_local_event_aggregate_key(record).map(|key| DuplicateIdentity {
            kind: "aggregate".to_owned(),
            key,
        });

    event_identity.into_iter().chain(aggregate_identity).collect()
}

/// Derives the aggregate identity key of a shared local event record.
///
/// Local work prefers the farm id over the listing address; signed events prefer the listing
/// address over the farm id. Returns `None` when the kind or both references are missing.
fn shared_local_event_aggregate_key(record: &LocalEventRecord) -> Option<String> {
    let farm = non_empty_value(record.farm_id.as_deref());
    let listing = non_empty_value(record.listing_addr.as_deref());

    match record.family {
        LocalRecordFamily::LocalWork => {
            let record_kind = record
                .local_work_json
                .as_ref()
                .and_then(local_work_record_kind)?;
            if let Some(farm_id) = farm {
                return Some(format!("local_work:{record_kind}:farm:{farm_id}"));
            }
            listing.map(|listing_addr| format!("local_work:{record_kind}:listing:{listing_addr}"))
        }
        LocalRecordFamily::SignedEvent => {
            let event_kind = record.event_kind?;
            if let Some(listing_addr) = listing {
                return Some(format!("signed_event:{event_kind}:listing:{listing_addr}"));
            }
            farm.map(|farm_id| format!("signed_event:{event_kind}:farm:{farm_id}"))
        }
    }
}

/// Extracts the trimmed, non-empty string stored under `record_kind` in a local work payload.
fn local_work_record_kind(payload: &Value) -> Option<&str> {
    let declared = payload.get("record_kind")?.as_str()?.trim();
    (!declared.is_empty()).then_some(declared)
}

/// Returns `true` for the signed event kinds the audit treats as migration candidates.
fn supported_signed_event_kind(kind: i64) -> bool {
    [
        KIND_FARM as i64,
        KIND_LISTING as i64,
        KIND_LISTING_DRAFT as i64,
        KIND_ORDER_REQUEST as i64,
        KIND_ORDER_DECISION as i64,
        KIND_ORDER_REVISION_PROPOSAL as i64,
        KIND_ORDER_REVISION_DECISION as i64,
        KIND_ORDER_CANCELLATION as i64,
    ]
    .contains(&kind)
}

/// Builds the `signed_event:<name>:<kind>` key for a signed event kind, using `unsupported`
/// as the name of kinds that are not in the table.
fn shared_signed_event_kind(kind: i64) -> String {
    // The first matching entry wins, mirroring a top-to-bottom comparison.
    let known_kinds: [(i64, &str); 9] = [
        (KIND_FARM as i64, "farm"),
        (KIND_LISTING as i64, "listing"),
        (KIND_LISTING_DRAFT as i64, "listing_draft"),
        (KIND_ORDER_REQUEST as i64, "order_request"),
        (KIND_ORDER_DECISION as i64, "order_decision"),
        (KIND_ORDER_REVISION_PROPOSAL as i64, "order_revision_proposal"),
        (KIND_ORDER_REVISION_DECISION as i64, "order_revision_decision"),
        (KIND_ORDER_CANCELLATION as i64, "order_cancellation"),
        (KIND_TRADE_VALIDATION_RECEIPT as i64, "trade_validation_receipt"),
    ];
    let name = known_kinds
        .iter()
        .find(|(value, _)| *value == kind)
        .map_or("unsupported", |(_, name)| *name);
    format!("signed_event:{name}:{kind}")
}

/// Trims `value` and returns it only when something other than whitespace remains.
fn non_empty_value(value: Option<&str>) -> Option<&str> {
    let trimmed = value?.trim();
    (!trimmed.is_empty()).then_some(trimmed)
}

/// Adds one to the counter stored under `key`, starting from zero.
fn increment_count(counts: &mut BTreeMap<String, u64>, key: String) {
    let slot = counts.entry(key).or_insert(0);
    *slot += 1;
}

/// Converts a counter map into report entries, preserving the map's key order.
fn counts_from_map(counts: BTreeMap<String, u64>) -> Vec<AppSdkMigrationAuditCount> {
    let mut entries = Vec::with_capacity(counts.len());
    for (key, count) in counts {
        entries.push(AppSdkMigrationAuditCount { key, count });
    }
    entries
}

/// Keeps only the identities shared by at least two distinct records and converts them into
/// report entries.
fn duplicate_candidates_from_map(
    duplicate_records: BTreeMap<DuplicateIdentity, BTreeSet<String>>,
) -> Vec<AppSdkMigrationAuditDuplicateCandidate> {
    duplicate_records
        .into_iter()
        .filter(|(_, records)| records.len() >= 2)
        .map(|(identity, records)| AppSdkMigrationAuditDuplicateCandidate {
            identity_kind: identity.kind,
            identity_key: identity.key,
            record_count: records.len() as u64,
            record_ids: records.into_iter().collect(),
        })
        .collect()
}

#[cfg(test)]
mod tests {
    use radroots_app_sync::{
        AppFarmProfilePublishPayload, AppPublishContext, AppPublishPayload, PendingSyncOperation,
    };
    use radroots_app_view::{FarmId, FarmReadiness};
    use radroots_events::kinds::{KIND_LISTING, KIND_ORDER_REQUEST, KIND_TRADE_VALIDATION_RECEIPT};
    use radroots_local_events::{
        LocalEventRecord, LocalEventRecordInput, LocalEventsStore, LocalRecordFamily,
        LocalRecordStatus, PublishOutboxStatus, SourceRuntime,
    };
    use radroots_sql_core::SqliteExecutor;
    use rusqlite::params;
    use serde_json::json;
855 use crate::{ 856 AppSdkMigrationAuditClassification, AppSdkMigrationAuditRequest, 857 AppSdkMigrationReceiptInput, AppSdkMigrationReceiptSourceKind, AppSdkMigrationState, 858 AppSqliteStore, DatabaseTarget, 859 }; 860 861 fn local_events_store() -> LocalEventsStore<SqliteExecutor> { 862 let executor = SqliteExecutor::open_memory().expect("open local events memory db"); 863 let store = LocalEventsStore::new(executor); 864 store.migrate_up().expect("migrate local events store"); 865 store 866 } 867 868 fn count_named(counts: &[crate::AppSdkMigrationAuditCount], key: &str) -> u64 { 869 counts 870 .iter() 871 .find(|count| count.key == key) 872 .map(|count| count.count) 873 .unwrap_or_default() 874 } 875 876 #[test] 877 fn local_outbox_audit_reads_batches_without_mutating_rows() { 878 let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); 879 let shared_events = local_events_store(); 880 let farm_id = FarmId::new(); 881 let operation = PendingSyncOperation::from_publish_payload( 882 AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload { 883 context: AppPublishContext::new("acct_a", "farm_setup"), 884 farm_id, 885 display_name: "Green Loop Farm".to_owned(), 886 readiness: Some(FarmReadiness::Ready), 887 }), 888 "2026-06-18T12:00:00Z", 889 ) 890 .expect("build publish operation"); 891 store 892 .sync_repository() 893 .enqueue_pending_operation("acct_a", &operation) 894 .expect("enqueue operation"); 895 store 896 .connection() 897 .execute( 898 "INSERT INTO local_outbox ( 899 id, 900 account_id, 901 operation_key, 902 aggregate_kind, 903 aggregate_id, 904 operation_kind, 905 payload_json, 906 created_at, 907 available_at, 908 attempt_count, 909 state, 910 last_error_message 911 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)", 912 params![ 913 "succeeded-duplicate", 914 "acct_a", 915 operation.operation_key, 916 operation.aggregate.aggregate_kind(), 917 operation.aggregate.aggregate_id(), 918 
operation.operation.storage_key(), 919 operation.payload_json, 920 "2026-06-18T11:00:00Z", 921 "2026-06-18T11:00:00Z", 922 0_i64, 923 "succeeded", 924 ], 925 ) 926 .expect("insert succeeded duplicate"); 927 let before_count = local_outbox_row_count(&store); 928 929 let report = store 930 .audit_sdk_migration( 931 &shared_events, 932 AppSdkMigrationAuditRequest { batch_size: 1 }, 933 ) 934 .expect("audit should run"); 935 936 assert_eq!(local_outbox_row_count(&store), before_count); 937 assert_eq!(report.local_outbox.batch_size, 1); 938 assert_eq!(report.local_outbox.batch_count, 2); 939 assert_eq!(report.local_outbox.scanned_records, 2); 940 assert_eq!( 941 count_named(&report.local_outbox.kind_counts, "farm_profile"), 942 2 943 ); 944 assert_eq!( 945 count_named(&report.local_outbox.status_counts, "pending"), 946 1 947 ); 948 assert_eq!( 949 count_named(&report.local_outbox.status_counts, "succeeded"), 950 1 951 ); 952 assert_eq!( 953 count_named( 954 &report.local_outbox.classification_counts, 955 AppSdkMigrationAuditClassification::PublishableCandidate.storage_key() 956 ), 957 1 958 ); 959 assert_eq!( 960 count_named( 961 &report.local_outbox.classification_counts, 962 AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key() 963 ), 964 1 965 ); 966 assert!( 967 report 968 .local_outbox 969 .duplicate_candidates 970 .iter() 971 .any(|candidate| candidate.identity_kind == "operation" 972 && candidate.record_count == 2) 973 ); 974 assert_eq!(report.shared_local_events.scanned_records, 0); 975 } 976 977 #[test] 978 fn local_outbox_audit_classifies_status_matrix() { 979 let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store"); 980 let shared_events = local_events_store(); 981 let operation = farm_profile_operation("acct_seed", "status_matrix"); 982 983 for (index, state) in [ 984 "pending", 985 "in_progress", 986 "retryable", 987 "failed", 988 "blocked", 989 "succeeded", 990 ] 991 .iter() 992 .enumerate() 993 { 994 
insert_local_outbox_audit_row(
                &store,
                &format!("local-outbox-{state}"),
                &format!("acct_{index}"),
                state,
                &operation,
            );
        }

        let report = store
            .audit_sdk_migration(
                &shared_events,
                AppSdkMigrationAuditRequest { batch_size: 2 },
            )
            .expect("audit should run");

        assert_eq!(report.local_outbox.scanned_records, 6);
        assert_eq!(
            count_named(
                &report.local_outbox.classification_counts,
                AppSdkMigrationAuditClassification::PublishableCandidate.storage_key()
            ),
            3
        );
        assert_eq!(
            count_named(
                &report.local_outbox.classification_counts,
                AppSdkMigrationAuditClassification::ManualReviewRequired.storage_key()
            ),
            2
        );
        assert_eq!(
            count_named(
                &report.local_outbox.classification_counts,
                AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate.storage_key()
            ),
            1
        );
        assert_eq!(
            report
                .local_outbox
                .issues
                .iter()
                .filter(|issue| issue.code == "manual_review_local_outbox_state")
                .count(),
            2
        );
    }

    /// Pending outbox rows that carry a migration receipt are classified by the
    /// receipt state: one row each for `Enqueued`, `Skipped` and `Failed` must end
    /// up as represented, skipped and failed, and the failed receipt must raise
    /// an issue.
    #[test]
    fn local_outbox_audit_uses_migration_receipts_for_migratable_records() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();
        let operation = farm_profile_operation("acct_seed", "receipt_matrix");

        // Each source id doubles as the account id of its outbox row.
        let receipt_matrix = [
            ("represented-source", AppSdkMigrationState::Enqueued),
            ("skipped-source", AppSdkMigrationState::Skipped),
            ("failed-source", AppSdkMigrationState::Failed),
        ];
        for (source_id, receipt_state) in receipt_matrix {
            insert_local_outbox_audit_row(&store, source_id, source_id, "pending", &operation);
            record_local_outbox_receipt(&store, source_id, receipt_state);
        }

        let request = AppSdkMigrationAuditRequest { batch_size: 10 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");
        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(
                &report.local_outbox.classification_counts,
                classification.storage_key(),
            )
        };

        assert_eq!(report.local_outbox.scanned_records, 3);
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::RepresentedRecord),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::SkippedRecord),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::FailedRecord),
            1
        );

        let failed_receipt_reported = report
            .local_outbox
            .issues
            .iter()
            .any(|issue| issue.code == "sdk_migration_receipt_failed");
        assert!(failed_receipt_reported);
    }

    /// An `Enqueued` receipt must not turn a non-migratable row into a represented
    /// record. The three rows seeded here (state `failed`, a `Delete` operation,
    /// and an unrecognised state) keep their own classification and each one
    /// produces a receipt-for-non-migratable-source issue.
    #[test]
    fn local_outbox_audit_does_not_let_receipts_hide_non_migratable_rows() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();
        let operation = farm_profile_operation("acct_seed", "non_migratable");

        // Row whose outbox state is already `failed`.
        insert_local_outbox_audit_row(&store, "failed-source", "acct_failed", "failed", &operation);
        record_local_outbox_receipt(&store, "failed-source", AppSdkMigrationState::Enqueued);

        // Pending row whose operation kind is `Delete`.
        let delete_operation = PendingSyncOperation {
            operation: radroots_app_sync::SyncOperationKind::Delete,
            ..operation.clone()
        };
        insert_local_outbox_audit_row(
            &store,
            "unsupported-source",
            "acct_unsupported",
            "pending",
            &delete_operation,
        );
        record_local_outbox_receipt(&store, "unsupported-source", AppSdkMigrationState::Enqueued);

        // Row with a state value the schema checks would reject, so the checks are
        // switched off around this single insert.
        store
            .connection()
            .execute_batch("PRAGMA ignore_check_constraints = ON")
            .expect("disable sqlite checks for defensive unknown state row");
        insert_local_outbox_audit_row(
            &store,
            "unknown-source",
            "acct_unknown",
            "mystery",
            &operation,
        );
        store
            .connection()
            .execute_batch("PRAGMA ignore_check_constraints = OFF")
            .expect("restore sqlite checks");
        record_local_outbox_receipt(&store, "unknown-source", AppSdkMigrationState::Enqueued);

        let request = AppSdkMigrationAuditRequest { batch_size: 10 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");
        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(
                &report.local_outbox.classification_counts,
                classification.storage_key(),
            )
        };

        assert_eq!(
            classified(AppSdkMigrationAuditClassification::RepresentedRecord),
            0
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::ManualReviewRequired),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::Unsupported),
            1
        );
        assert_eq!(classified(AppSdkMigrationAuditClassification::Unknown), 1);

        let non_migratable_receipt_issues = report
            .local_outbox
            .issues
            .iter()
            .filter(|issue| issue.code == "sdk_migration_receipt_for_non_migratable_source")
            .count();
        assert_eq!(non_migratable_receipt_issues, 3);
    }

    /// Four signed events (two listings sharing one event id, an order request
    /// and a trade validation receipt) are audited with a batch size of one.
    /// The shared store must hold the same number of records afterwards, and the
    /// report must count three already-represented candidates, one deferred
    /// validation receipt, no publishable candidates, and one duplicated event.
    #[test]
    fn shared_local_events_audit_classifies_supported_events_and_validation_receipts() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();

        // (record id, event id, event kind, message used if the append fails)
        let seeded_events = [
            (
                "listing-a",
                "duplicate-listing-event",
                KIND_LISTING as i64,
                "append listing a",
            ),
            (
                "listing-b",
                "duplicate-listing-event",
                KIND_LISTING as i64,
                "append listing b",
            ),
            (
                "request",
                "request-event",
                KIND_ORDER_REQUEST as i64,
                "append request",
            ),
            (
                "validation-receipt",
                "validation-receipt-event",
                KIND_TRADE_VALIDATION_RECEIPT as i64,
                "append validation receipt",
            ),
        ];
        for (record_id, event_id, event_kind, failure_message) in seeded_events {
            shared_events
                .append_record(&signed_event_record(record_id, event_id, event_kind))
                .expect(failure_message);
        }

        let record_count_before = shared_events
            .list_records_changed_after(0, 10)
            .expect("list records before audit")
            .len();

        let request = AppSdkMigrationAuditRequest { batch_size: 1 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");

        let record_count_after = shared_events
            .list_records_changed_after(0, 10)
            .expect("list records after audit")
            .len();
        assert_eq!(record_count_after, record_count_before);

        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(
                &report.shared_local_events.classification_counts,
                classification.storage_key(),
            )
        };

        assert_eq!(report.shared_local_events.batch_count, 4);
        assert_eq!(report.shared_local_events.scanned_records, 4);
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate),
            3
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::ValidationReceiptDeferred),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::PublishableCandidate),
            0
        );

        let duplicate_event_reported = report
            .shared_local_events
            .duplicate_candidates
            .iter()
            .any(|duplicate| duplicate.identity_kind == "event" && duplicate.record_count == 2);
        assert!(duplicate_event_reported);
    }

    /// Seeds one local-work record per `LocalRecordStatus` listed below (all with
    /// `PublishOutboxStatus::None`) and checks how the six records are spread
    /// over the audit classifications.
    #[test]
    fn shared_local_work_audit_classifies_status_matrix() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app store");
        let shared_events = local_events_store();

        // (record id, record kind, record status, outbox status)
        let status_matrix = [
            (
                "local-draft",
                "farm_config_v1",
                LocalRecordStatus::LocalDraft,
                PublishOutboxStatus::None,
            ),
            (
                "local-saved",
                "listing_draft_v1",
                LocalRecordStatus::LocalSaved,
                PublishOutboxStatus::None,
            ),
            (
                "pending-publish",
                "listing_draft_v1",
                LocalRecordStatus::PendingPublish,
                PublishOutboxStatus::None,
            ),
            (
                "published",
                "farm_config_v1",
                LocalRecordStatus::Published,
                PublishOutboxStatus::None,
            ),
            (
                "failed",
                "farm_config_v1",
                LocalRecordStatus::Failed,
                PublishOutboxStatus::None,
            ),
            (
                "conflict",
                "listing_draft_v1",
                LocalRecordStatus::Conflict,
                PublishOutboxStatus::None,
            ),
        ];
        for (record_id, record_kind, status, outbox_status) in status_matrix {
            let seeded_record = local_work_record(record_id, record_kind, status, outbox_status);
            shared_events
                .append_record(&seeded_record)
                .expect("append local work record");
        }

        let request = AppSdkMigrationAuditRequest { batch_size: 3 };
        let report = store
            .audit_sdk_migration(&shared_events, request)
            .expect("audit should run");
        let classified = |classification: AppSdkMigrationAuditClassification| {
            count_named(
                &report.shared_local_events.classification_counts,
                classification.storage_key(),
            )
        };

        assert_eq!(report.shared_local_events.scanned_records, 6);
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::LocalWorkDeferred),
            2
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::PublishableCandidate),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate),
            1
        );
        assert_eq!(
            classified(AppSdkMigrationAuditClassification::ManualReviewRequired),
            2
        );
    }

    /// Calls the status classifier directly for `PendingPublish` records whose
    /// outbox status is `Acknowledged` or `Failed`, and checks the classification
    /// returned for each combination.
    #[test]
    fn shared_local_work_status_classifier_handles_defensive_outbox_states() {
        let defensive_cases = [
            (
                PublishOutboxStatus::Acknowledged,
                AppSdkMigrationAuditClassification::AlreadyRepresentedCandidate,
            ),
            (
                PublishOutboxStatus::Failed,
                AppSdkMigrationAuditClassification::ManualReviewRequired,
            ),
        ];

        for (outbox_status, expected_classification) in defensive_cases {
            let record = local_work_model_record(LocalRecordStatus::PendingPublish, outbox_status);
            assert_eq!(
                super::classify_shared_local_work_status(&record),
                expected_classification
            );
        }
    }

    /// Returns the number of rows currently in the `local_outbox` table.
    fn local_outbox_row_count(store: &AppSqliteStore) -> i64 {
        let connection = store.connection();
        connection
            .query_row("SELECT count(*) FROM local_outbox", [], |row| row.get(0))
            .expect("count local outbox rows")
    }

    /// Builds a pending sync operation that publishes a ready "Green Loop Farm"
    /// profile with a fresh farm id for the given account and source.
    fn farm_profile_operation(account_id: &str, source: &str) -> PendingSyncOperation {
        let payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload {
            context: AppPublishContext::new(account_id, source),
            farm_id: FarmId::new(),
            display_name: String::from("Green Loop Farm"),
            readiness: Some(FarmReadiness::Ready),
        });

        PendingSyncOperation::from_publish_payload(payload, "2026-06-18T12:00:00Z")
            .expect("build publish operation")
    }

    /// Inserts one row straight into `local_outbox` with SQL.
    ///
    /// `id`, `account_id` and `state` are written as given; the remaining columns
    /// are copied from `operation`, and `last_error_message` is left `NULL`.
    fn insert_local_outbox_audit_row(
        store: &AppSqliteStore,
        id: &str,
        account_id: &str,
        state: &str,
        operation: &PendingSyncOperation,
    ) {
        let insert_sql = "INSERT INTO local_outbox (
                    id,
                    account_id,
                    operation_key,
                    aggregate_kind,
                    aggregate_id,
                    operation_kind,
                    payload_json,
                    created_at,
                    available_at,
                    attempt_count,
                    state,
                    last_error_message
                ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, NULL)";

        let connection = store.connection();
        connection
            .execute(
                insert_sql,
                params![
                    id,
                    account_id,
                    operation.operation_key.as_str(),
                    operation.aggregate.aggregate_kind(),
                    operation.aggregate.aggregate_id(),
                    operation.operation.storage_key(),
                    operation.payload_json.as_str(),
                    operation.created_at.as_str(),
                    operation.available_at.as_str(),
                    i64::from(operation.attempt_count),
                    state,
                ],
            )
            .expect("insert local outbox audit row");
    }

    /// Records a `LocalOutbox` migration receipt for `source_record_id` in the
    /// given `migration_state`. The outbox event id, expected event id and detail
    /// JSON are derived from the source record id; the other fields are fixed
    /// test values.
    fn record_local_outbox_receipt(
        store: &AppSqliteStore,
        source_record_id: &str,
        migration_state: AppSdkMigrationState,
    ) {
        let receipt = AppSdkMigrationReceiptInput {
            source_kind: AppSdkMigrationReceiptSourceKind::LocalOutbox,
            source_record_id: String::from(source_record_id),
            sdk_operation_kind: String::from("farm.publish"),
            sdk_outbox_event_ids: vec![format!("sdk-outbox-{source_record_id}")],
            expected_event_id: Some(format!("event-{source_record_id}")),
            actor_pubkey: Some(String::from("actor-pubkey")),
            idempotency_digest_prefix: Some(String::from("digest-prefix")),
            migration_state,
            recorded_at: String::from("2026-06-18T12:00:00Z"),
            detail_json: json!({"source": source_record_id}),
        };

        store
            .sdk_migration_receipt_repository()
            .record_receipt(&receipt)
            .expect("record local outbox receipt");
    }

    /// Builds a `LocalWork` record input with the given id, record kind, status
    /// and outbox status. Ownership fields use fixed test values and every
    /// event and relay field is left empty.
    fn local_work_record(
        record_id: &str,
        record_kind: &str,
        status: LocalRecordStatus,
        outbox_status: PublishOutboxStatus,
    ) -> LocalEventRecordInput {
        LocalEventRecordInput {
            record_id: String::from(record_id),
            family: LocalRecordFamily::LocalWork,
            status,
            source_runtime: SourceRuntime::App,
            created_at_ms: 1000,
            inserted_at_ms: 1001,
            owner_account_id: Some(String::from("acct_a")),
            owner_pubkey: Some(String::from("seller-pubkey")),
            farm_id: Some(String::from("farm-key")),
            listing_addr: Some(String::from("30402:seller-pubkey:listing-key")),
            local_work_json: Some(json!({"record_kind": record_kind})),
            event_id: None,
            event_kind: None,
            event_pubkey: None,
            event_created_at: None,
            event_tags_json: None,
            event_content: None,
            event_sig: None,
            raw_event_json: None,
            outbox_status,
            relay_set_fingerprint: None,
            relay_delivery_json: None,
        }
    }

    /// Builds an in-memory `LocalWork` record (a `listing_draft_v1` with sequence
    /// numbers of 1) so the status classifier can be called without a store.
    fn local_work_model_record(
        status: LocalRecordStatus,
        outbox_status: PublishOutboxStatus,
    ) -> LocalEventRecord {
        LocalEventRecord {
            seq: 1,
            change_seq: 1,
            record_id: String::from("defensive-local-work"),
            family: LocalRecordFamily::LocalWork,
            status,
            source_runtime: SourceRuntime::App,
            created_at_ms: 1000,
            inserted_at_ms: 1001,
            updated_at_ms: 1002,
            owner_account_id: Some(String::from("acct_a")),
            owner_pubkey: Some(String::from("seller-pubkey")),
            farm_id: Some(String::from("farm-key")),
            listing_addr: Some(String::from("30402:seller-pubkey:listing-key")),
            local_work_json: Some(json!({"record_kind": "listing_draft_v1"})),
            event_id: None,
            event_kind: None,
            event_pubkey: None,
            event_created_at: None,
            event_tags_json: None,
            event_content: None,
            event_sig: None,
            raw_event_json: None,
            outbox_status,
            relay_set_fingerprint: None,
            relay_delivery_json: None,
        }
    }

    /// Builds a `SignedEvent` record input with status `Published` and outbox
    /// status `Acknowledged` for the given record id, event id and event kind.
    fn signed_event_record(
        record_id: &str,
        event_id: &str,
        event_kind: i64,
    ) -> LocalEventRecordInput {
        let raw_event = json!({
            "id": event_id,
            "kind": event_kind,
            "pubkey": "seller-pubkey"
        });
        let relay_delivery = json!({
            "state": "acknowledged",
            "acknowledged_relays": ["wss://relay.example"]
        });

        LocalEventRecordInput {
            record_id: String::from(record_id),
            family: LocalRecordFamily::SignedEvent,
            status: LocalRecordStatus::Published,
            source_runtime: SourceRuntime::App,
            created_at_ms: 1000,
            inserted_at_ms: 1001,
            owner_account_id: Some(String::from("acct_a")),
            owner_pubkey: Some(String::from("seller-pubkey")),
            farm_id: Some(String::from("farm-key")),
            listing_addr: Some(String::from("30402:seller-pubkey:listing-key")),
            local_work_json: None,
            event_id: Some(String::from(event_id)),
            event_kind: Some(event_kind),
            event_pubkey: Some(String::from("seller-pubkey")),
            event_created_at: Some(1000),
            event_tags_json: Some(json!([["d", "listing-key"]])),
            event_content: Some(String::from("{}")),
            event_sig: Some(String::from("signature")),
            raw_event_json: Some(raw_event),
            outbox_status: PublishOutboxStatus::Acknowledged,
            relay_set_fingerprint: Some(String::from("relay-set")),
            relay_delivery_json: Some(relay_delivery),
        }
    }
}