sync.rs (49988B)
use radroots_app_sync::{
    AppRelayIngestFreshnessState, AppRelayIngestRelayFreshness, AppRelayIngestScopeFreshness,
    AppRelayIngestScopeStatus, PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef,
    SyncCheckpointState, SyncCheckpointStatus, SyncConflict, SyncConflictKind,
    SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
};
use radroots_app_view::{FarmId, FulfillmentWindowId, OrderId, ProductId};
use rusqlite::{Connection, OptionalExtension, params};
use uuid::Uuid;

use crate::AppSqliteError;

/// A pending sync operation together with the `local_outbox.id` it is stored under.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredPendingSyncOperation {
    pub operation_id: String,
    pub operation: PendingSyncOperation,
}

/// A sync conflict together with the `local_conflicts.id` it is stored under.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredSyncConflict {
    pub conflict_id: String,
    pub conflict: SyncConflict,
}

/// The stored ingest cursor for one relay.
///
/// `cursor_since_unix_seconds` is `None` when the relay has no row yet or when the stored
/// cursor column is NULL.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredRelayIngestCursor {
    pub relay_url: String,
    pub cursor_since_unix_seconds: Option<i64>,
}

/// Repository over a borrowed SQLite connection for the sync tables: `local_outbox`,
/// `sync_checkpoints`, `app_relay_ingest_freshness` and `local_conflicts`.
pub struct AppSyncRepository<'a> {
    connection: &'a Connection,
}

impl<'a> AppSyncRepository<'a> {
    /// Wraps `connection` without taking ownership of it.
    pub const fn new(connection: &'a Connection) -> Self {
        Self { connection }
    }

    /// Enqueues `operation` for `account_id` and returns the id of the active outbox row.
    ///
    /// A fresh UUIDv7 id is generated for the insert. When an active row (state `pending`,
    /// `in_progress`, `failed`, `blocked` or `retryable`) already exists for the same
    /// `(account_id, operation_key)`, that row is overwritten instead: aggregate, kind,
    /// payload and timestamps are replaced, `attempt_count` is reset to 0, `state` to
    /// `pending` and `last_error_message` to NULL. In that case the returned id is the id of
    /// the existing row, not the freshly generated one.
    ///
    /// The conflict target relies on a matching partial unique index that is defined outside
    /// this file.
    ///
    /// NOTE(review): the follow-up id lookup only considers active states, so enqueuing an
    /// operation whose state is `succeeded` would presumably fail the lookup after the insert
    /// — confirm callers only enqueue active operations.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the insert or the id lookup fails.
    pub fn enqueue_pending_operation(
        &self,
        account_id: &str,
        operation: &PendingSyncOperation,
    ) -> Result<String, AppSqliteError> {
        let operation_id = Uuid::now_v7().to_string();

        self.connection
            .execute(
                "INSERT INTO local_outbox (
                    id,
                    account_id,
                    operation_key,
                    aggregate_kind,
                    aggregate_id,
                    operation_kind,
                    payload_json,
                    created_at,
                    available_at,
                    attempt_count,
                    state,
                    last_error_message
                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
                 ON CONFLICT(account_id, operation_key)
                 WHERE state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
                 DO UPDATE SET
                    aggregate_kind = excluded.aggregate_kind,
                    aggregate_id = excluded.aggregate_id,
                    operation_kind = excluded.operation_kind,
                    payload_json = excluded.payload_json,
                    created_at = excluded.created_at,
                    available_at = excluded.available_at,
                    attempt_count = 0,
                    state = 'pending',
                    last_error_message = NULL",
                params![
                    operation_id,
                    account_id,
                    operation.operation_key.as_str(),
                    operation.aggregate.aggregate_kind(),
                    aggregate_id_value(&operation.aggregate),
                    operation.operation.storage_key(),
                    operation.payload_json.as_str(),
                    operation.created_at.as_str(),
                    operation.available_at.as_str(),
                    i64::from(operation.attempt_count),
                    operation.state.storage_key(),
                    operation.last_error_message.as_deref(),
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "enqueue pending sync operation",
                source,
            })?;

        // The upsert may have kept a pre-existing row, so read back the id that is
        // actually stored instead of assuming the freshly generated one was used.
        self.connection
            .query_row(
                "SELECT id
                 FROM local_outbox
                 WHERE account_id = ?1
                   AND operation_key = ?2
                   AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
                 LIMIT 1",
                params![account_id, operation.operation_key.as_str()],
                |row| row.get::<_, String>(0),
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "load pending sync operation id after enqueue",
                source,
            })
    }

    /// Loads every active outbox operation for `account_id`.
    ///
    /// Only rows in state `pending`, `in_progress`, `failed`, `blocked` or `retryable` are
    /// returned, ordered by `available_at`, then `created_at`, then `id` (all ascending).
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] on SQLite failures and
    /// [`AppSqliteError::DecodeEnum`] / [`AppSqliteError::DecodeId`] when a stored value
    /// cannot be decoded.
    pub fn load_pending_operations(
        &self,
        account_id: &str,
    ) -> Result<Vec<StoredPendingSyncOperation>, AppSqliteError> {
        let mut statement = self
            .connection
            .prepare(
                "SELECT
                    id,
                    operation_key,
                    aggregate_kind,
                    aggregate_id,
                    operation_kind,
                    payload_json,
                    created_at,
                    available_at,
                    attempt_count,
                    state,
                    last_error_message
                 FROM local_outbox
                 WHERE account_id = ?1
                   AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
                 ORDER BY available_at ASC, created_at ASC, id ASC",
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "prepare pending sync operations query",
                source,
            })?;
        let rows = statement
            .query_map([account_id], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, String>(3)?,
                    row.get::<_, String>(4)?,
                    row.get::<_, String>(5)?,
                    row.get::<_, String>(6)?,
                    row.get::<_, String>(7)?,
                    row.get::<_, u32>(8)?,
                    row.get::<_, String>(9)?,
                    row.get::<_, Option<String>>(10)?,
                ))
            })
            .map_err(|source| AppSqliteError::Query {
                operation: "query pending sync operations",
                source,
            })?;

        rows.map(|row| {
            let (
                operation_id,
                operation_key,
                aggregate_kind,
                aggregate_id,
                operation_kind,
                payload_json,
                created_at,
                available_at,
                attempt_count,
                state,
                last_error_message,
            ) = row.map_err(|source| AppSqliteError::Query {
                operation: "read pending sync operation row",
                source,
            })?;

            Ok(StoredPendingSyncOperation {
                operation_id,
                operation: PendingSyncOperation {
                    operation_key,
                    aggregate: parse_sync_aggregate_ref(
                        "local_outbox.aggregate_kind",
                        "local_outbox.aggregate_id",
                        aggregate_kind,
                        aggregate_id,
                    )?,
                    operation: parse_sync_operation_kind(operation_kind)?,
                    payload_json,
                    created_at,
                    available_at,
                    attempt_count,
                    state: parse_pending_sync_operation_state(state)?,
                    last_error_message,
                },
            })
        })
        .collect()
    }

    /// Marks an outbox operation as `retryable` with a new schedule and attempt count.
    ///
    /// Returns `true` when exactly one row matched `account_id` and `operation_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the update fails.
    pub fn update_pending_operation_retry(
        &self,
        account_id: &str,
        operation_id: &str,
        available_at: &str,
        attempt_count: u32,
        last_error_message: Option<&str>,
    ) -> Result<bool, AppSqliteError> {
        let updated = self
            .connection
            .execute(
                "UPDATE local_outbox
                 SET available_at = ?3,
                     attempt_count = ?4,
                     state = 'retryable',
                     last_error_message = ?5
                 WHERE account_id = ?1 AND id = ?2",
                params![
                    account_id,
                    operation_id,
                    available_at,
                    i64::from(attempt_count),
                    last_error_message
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "update pending sync operation retry",
                source,
            })?;

        Ok(updated == 1)
    }

    /// Deletes an outbox operation.
    ///
    /// Returns `true` when exactly one row matched `account_id` and `operation_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the delete fails.
    pub fn dequeue_pending_operation(
        &self,
        account_id: &str,
        operation_id: &str,
    ) -> Result<bool, AppSqliteError> {
        let deleted = self
            .connection
            .execute(
                "DELETE FROM local_outbox WHERE account_id = ?1 AND id = ?2",
                params![account_id, operation_id],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "dequeue pending sync operation",
                source,
            })?;

        Ok(deleted == 1)
    }

    /// Loads the sync checkpoint for `account_id`.
    ///
    /// Returns [`SyncCheckpointStatus::never_synced`] when the account has no stored row.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] on SQLite failures and
    /// [`AppSqliteError::DecodeEnum`] when the stored state is unknown.
    pub fn load_checkpoint(
        &self,
        account_id: &str,
    ) -> Result<SyncCheckpointStatus, AppSqliteError> {
        let row = self
            .connection
            .query_row(
                "SELECT
                    state,
                    last_sync_started_at,
                    last_sync_completed_at,
                    last_remote_cursor,
                    last_error_message
                 FROM sync_checkpoints
                 WHERE account_id = ?1
                 LIMIT 1",
                [account_id],
                |row| {
                    Ok((
                        row.get::<_, String>(0)?,
                        row.get::<_, Option<String>>(1)?,
                        row.get::<_, Option<String>>(2)?,
                        row.get::<_, Option<String>>(3)?,
                        row.get::<_, Option<String>>(4)?,
                    ))
                },
            )
            .optional()
            .map_err(|source| AppSqliteError::Query {
                operation: "load sync checkpoint",
                source,
            })?;

        row.map_or_else(
            || Ok(SyncCheckpointStatus::never_synced()),
            |(
                state,
                last_sync_started_at,
                last_sync_completed_at,
                last_remote_cursor,
                last_error_message,
            )| {
                Ok(SyncCheckpointStatus {
                    state: parse_sync_checkpoint_state(state)?,
                    last_sync_started_at,
                    last_sync_completed_at,
                    last_remote_cursor,
                    last_error_message,
                })
            },
        )
    }

    /// Inserts or overwrites the sync checkpoint row for `account_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the upsert fails.
    pub fn save_checkpoint(
        &self,
        account_id: &str,
        checkpoint: &SyncCheckpointStatus,
    ) -> Result<(), AppSqliteError> {
        self.connection
            .execute(
                "INSERT INTO sync_checkpoints (
                    account_id,
                    state,
                    last_sync_started_at,
                    last_sync_completed_at,
                    last_remote_cursor,
                    last_error_message
                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
                 ON CONFLICT(account_id) DO UPDATE SET
                    state = excluded.state,
                    last_sync_started_at = excluded.last_sync_started_at,
                    last_sync_completed_at = excluded.last_sync_completed_at,
                    last_remote_cursor = excluded.last_remote_cursor,
                    last_error_message = excluded.last_error_message",
                params![
                    account_id,
                    sync_checkpoint_state_value(checkpoint.state),
                    checkpoint.last_sync_started_at,
                    checkpoint.last_sync_completed_at,
                    checkpoint.last_remote_cursor,
                    checkpoint.last_error_message,
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "save sync checkpoint",
                source,
            })?;

        Ok(())
    }

    /// Loads the stored ingest cursor for each relay in `relay_urls` within `scope_key`.
    ///
    /// The result has one entry per input URL, in input order. A relay with no stored row,
    /// or with a NULL cursor, yields `cursor_since_unix_seconds: None`.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when preparing or running the lookup fails.
    pub fn load_relay_ingest_cursors(
        &self,
        scope_key: &str,
        relay_urls: &[String],
    ) -> Result<Vec<StoredRelayIngestCursor>, AppSqliteError> {
        // Nothing to look up: return without touching the database at all.
        if relay_urls.is_empty() {
            return Ok(Vec::new());
        }

        // The SQL is identical for every relay, so compile it once and rebind per URL.
        let mut statement = self
            .connection
            .prepare(
                "SELECT cursor_since_unix_seconds
                 FROM app_relay_ingest_freshness
                 WHERE scope_key = ?1 AND relay_url = ?2
                 LIMIT 1",
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "prepare relay ingest cursor query",
                source,
            })?;

        relay_urls
            .iter()
            .map(|relay_url| {
                let cursor_since_unix_seconds = statement
                    .query_row(params![scope_key, relay_url.as_str()], |row| {
                        row.get::<_, Option<i64>>(0)
                    })
                    .optional()
                    .map_err(|source| AppSqliteError::Query {
                        operation: "load relay ingest cursor",
                        source,
                    })?
                    // Collapse "no row" and "row with NULL cursor" into a single `None`.
                    .flatten();

                Ok(StoredRelayIngestCursor {
                    relay_url: relay_url.clone(),
                    cursor_since_unix_seconds,
                })
            })
            .collect()
    }

    /// Loads per-relay ingest freshness for `scope_key` and derives the scope-wide status.
    ///
    /// Each relay in `relay_urls` is evaluated against `now_unix_seconds` and
    /// `stale_after_seconds`; the scope status is then aggregated from the relay states.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] on SQLite failures and
    /// [`AppSqliteError::DecodeEnum`] when a stored state is unknown.
    pub fn load_relay_ingest_freshness(
        &self,
        scope_key: &str,
        relay_urls: &[String],
        now_unix_seconds: i64,
        stale_after_seconds: i64,
    ) -> Result<AppRelayIngestScopeFreshness, AppSqliteError> {
        let relays = relay_urls
            .iter()
            .map(|relay_url| {
                self.load_relay_ingest_relay_freshness(
                    scope_key,
                    relay_url,
                    now_unix_seconds,
                    stale_after_seconds,
                )
            })
            .collect::<Result<Vec<_>, _>>()?;
        let status = relay_ingest_scope_status(relays.as_slice());

        Ok(AppRelayIngestScopeFreshness {
            scope_key: scope_key.to_owned(),
            status,
            relays,
        })
    }

    /// Records a successful ingest fetch for `(scope_key, relay_url)`.
    ///
    /// The row is upserted with state `fresh`. The completion timestamp is also written as
    /// the last-success timestamp and as `updated_at`, and `last_error_message` is cleared.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the upsert fails.
    pub fn record_relay_ingest_success(
        &self,
        scope_key: &str,
        relay_url: &str,
        cursor_since_unix_seconds: i64,
        last_event_created_at_unix_seconds: Option<i64>,
        started_at: &str,
        started_unix_seconds: i64,
        completed_at: &str,
        completed_unix_seconds: i64,
    ) -> Result<(), AppSqliteError> {
        self.connection
            .execute(
                "INSERT INTO app_relay_ingest_freshness (
                    scope_key,
                    relay_url,
                    state,
                    cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds,
                    last_fetch_started_at,
                    last_fetch_started_unix_seconds,
                    last_fetch_completed_at,
                    last_fetch_completed_unix_seconds,
                    last_success_at,
                    last_success_unix_seconds,
                    last_error_message,
                    updated_at
                 ) VALUES (?1, ?2, 'fresh', ?3, ?4, ?5, ?6, ?7, ?8, ?7, ?8, NULL, ?7)
                 ON CONFLICT(scope_key, relay_url) DO UPDATE SET
                    state = 'fresh',
                    cursor_since_unix_seconds = excluded.cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds = excluded.last_event_created_at_unix_seconds,
                    last_fetch_started_at = excluded.last_fetch_started_at,
                    last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds,
                    last_fetch_completed_at = excluded.last_fetch_completed_at,
                    last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds,
                    last_success_at = excluded.last_success_at,
                    last_success_unix_seconds = excluded.last_success_unix_seconds,
                    last_error_message = NULL,
                    updated_at = excluded.updated_at",
                params![
                    scope_key,
                    relay_url,
                    cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds,
                    started_at,
                    started_unix_seconds,
                    completed_at,
                    completed_unix_seconds,
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "record relay ingest success",
                source,
            })?;

        Ok(())
    }

    /// Records a failed ingest fetch for `(scope_key, relay_url)`.
    ///
    /// The row is upserted with state `failed` and the given error message. When a row
    /// already exists, its cursor, last-event and last-success columns are left untouched;
    /// only the fetch timestamps, error message and `updated_at` are overwritten.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the upsert fails.
    pub fn record_relay_ingest_failure(
        &self,
        scope_key: &str,
        relay_url: &str,
        started_at: &str,
        started_unix_seconds: i64,
        completed_at: &str,
        completed_unix_seconds: i64,
        error_message: &str,
    ) -> Result<(), AppSqliteError> {
        self.connection
            .execute(
                "INSERT INTO app_relay_ingest_freshness (
                    scope_key,
                    relay_url,
                    state,
                    cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds,
                    last_fetch_started_at,
                    last_fetch_started_unix_seconds,
                    last_fetch_completed_at,
                    last_fetch_completed_unix_seconds,
                    last_success_at,
                    last_success_unix_seconds,
                    last_error_message,
                    updated_at
                 ) VALUES (?1, ?2, 'failed', NULL, NULL, ?3, ?4, ?5, ?6, NULL, NULL, ?7, ?5)
                 ON CONFLICT(scope_key, relay_url) DO UPDATE SET
                    state = 'failed',
                    last_fetch_started_at = excluded.last_fetch_started_at,
                    last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds,
                    last_fetch_completed_at = excluded.last_fetch_completed_at,
                    last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds,
                    last_error_message = excluded.last_error_message,
                    updated_at = excluded.updated_at",
                params![
                    scope_key,
                    relay_url,
                    started_at,
                    started_unix_seconds,
                    completed_at,
                    completed_unix_seconds,
                    error_message,
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "record relay ingest failure",
                source,
            })?;

        Ok(())
    }

    /// Loads the freshness record for a single relay.
    ///
    /// A relay with no stored row is reported as `Stale` with every optional field empty.
    /// A stored `fresh` state is downgraded to `Stale` when the last completed fetch is
    /// missing or older than `stale_after_seconds` relative to `now_unix_seconds`.
    fn load_relay_ingest_relay_freshness(
        &self,
        scope_key: &str,
        relay_url: &str,
        now_unix_seconds: i64,
        stale_after_seconds: i64,
    ) -> Result<AppRelayIngestRelayFreshness, AppSqliteError> {
        let row = self
            .connection
            .query_row(
                "SELECT
                    state,
                    cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds,
                    last_fetch_started_at,
                    last_fetch_completed_at,
                    last_fetch_completed_unix_seconds,
                    last_success_at,
                    last_error_message
                 FROM app_relay_ingest_freshness
                 WHERE scope_key = ?1 AND relay_url = ?2
                 LIMIT 1",
                params![scope_key, relay_url],
                |row| {
                    Ok((
                        row.get::<_, String>(0)?,
                        row.get::<_, Option<i64>>(1)?,
                        row.get::<_, Option<i64>>(2)?,
                        row.get::<_, Option<String>>(3)?,
                        row.get::<_, Option<String>>(4)?,
                        row.get::<_, Option<i64>>(5)?,
                        row.get::<_, Option<String>>(6)?,
                        row.get::<_, Option<String>>(7)?,
                    ))
                },
            )
            .optional()
            .map_err(|source| AppSqliteError::Query {
                operation: "load relay ingest freshness",
                source,
            })?;

        row.map_or_else(
            || {
                Ok(AppRelayIngestRelayFreshness {
                    relay_url: relay_url.to_owned(),
                    state: AppRelayIngestFreshnessState::Stale,
                    cursor_since_unix_seconds: None,
                    last_event_created_at_unix_seconds: None,
                    last_fetch_started_at: None,
                    last_fetch_completed_at: None,
                    last_success_at: None,
                    last_error_message: None,
                })
            },
            |(
                state,
                cursor_since_unix_seconds,
                last_event_created_at_unix_seconds,
                last_fetch_started_at,
                last_fetch_completed_at,
                last_fetch_completed_unix_seconds,
                last_success_at,
                last_error_message,
            )| {
                let mut state = parse_relay_ingest_freshness_state(state)?;
                // The stored state is only a snapshot; age it against the caller's clock.
                if state == AppRelayIngestFreshnessState::Fresh
                    && relay_ingest_is_stale(
                        last_fetch_completed_unix_seconds,
                        now_unix_seconds,
                        stale_after_seconds,
                    )
                {
                    state = AppRelayIngestFreshnessState::Stale;
                }
                Ok(AppRelayIngestRelayFreshness {
                    relay_url: relay_url.to_owned(),
                    state,
                    cursor_since_unix_seconds,
                    last_event_created_at_unix_seconds,
                    last_fetch_started_at,
                    last_fetch_completed_at,
                    last_success_at,
                    last_error_message,
                })
            },
        )
    }

    /// Inserts `conflict` for `account_id` under a fresh UUIDv7 id and returns that id.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when the insert fails.
    pub fn record_conflict(
        &self,
        account_id: &str,
        conflict: &SyncConflict,
    ) -> Result<String, AppSqliteError> {
        let conflict_id = Uuid::now_v7().to_string();

        self.connection
            .execute(
                "INSERT INTO local_conflicts (
                    id,
                    account_id,
                    aggregate_kind,
                    aggregate_id,
                    conflict_kind,
                    severity,
                    resolution_status,
                    local_payload_json,
                    remote_payload_json,
                    detected_at,
                    resolved_at
                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
                params![
                    conflict_id,
                    account_id,
                    conflict.aggregate.aggregate_kind(),
                    aggregate_id_value(&conflict.aggregate),
                    conflict.kind.storage_key(),
                    sync_conflict_severity_value(conflict.severity),
                    sync_conflict_resolution_status_value(conflict.resolution),
                    conflict.local_payload_json,
                    conflict.remote_payload_json,
                    conflict.detected_at,
                    conflict.resolved_at,
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "record sync conflict",
                source,
            })?;

        Ok(conflict_id)
    }

    /// Atomically replaces every stored conflict of `account_id` with `conflicts`.
    ///
    /// The delete and the inserts run inside a SQLite savepoint, so a failure part-way
    /// through restores the previous set of conflicts instead of leaving the account with
    /// a wiped or half-written list. A savepoint is used rather than `BEGIN` so the call
    /// also works when the caller already has a transaction open on this connection.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] when opening or releasing the savepoint, the
    /// delete, or any insert fails.
    pub fn replace_conflicts(
        &self,
        account_id: &str,
        conflicts: &[SyncConflict],
    ) -> Result<(), AppSqliteError> {
        self.connection
            .execute_batch("SAVEPOINT replace_sync_conflicts")
            .map_err(|source| AppSqliteError::Query {
                operation: "begin sync conflict replacement",
                source,
            })?;

        let outcome = self
            .clear_and_insert_conflicts(account_id, conflicts)
            .and_then(|()| {
                self.connection
                    .execute_batch("RELEASE replace_sync_conflicts")
                    .map_err(|source| AppSqliteError::Query {
                        operation: "commit sync conflict replacement",
                        source,
                    })
            });

        if outcome.is_err() {
            // Best effort: the original failure is the error worth reporting, so a
            // secondary rollback failure is deliberately not surfaced.
            let _ = self.connection.execute_batch(
                "ROLLBACK TO replace_sync_conflicts; RELEASE replace_sync_conflicts",
            );
        }

        outcome
    }

    /// Deletes all conflicts of `account_id` and inserts `conflicts`, with no atomicity
    /// of its own; `replace_conflicts` supplies the surrounding savepoint.
    fn clear_and_insert_conflicts(
        &self,
        account_id: &str,
        conflicts: &[SyncConflict],
    ) -> Result<(), AppSqliteError> {
        self.connection
            .execute(
                "DELETE FROM local_conflicts WHERE account_id = ?1",
                [account_id],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "clear sync conflicts",
                source,
            })?;

        for conflict in conflicts {
            let _ = self.record_conflict(account_id, conflict)?;
        }

        Ok(())
    }

    /// Loads every stored conflict for `account_id`, newest first
    /// (`detected_at` descending, then `id` descending).
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::Query`] on SQLite failures and
    /// [`AppSqliteError::DecodeEnum`] / [`AppSqliteError::DecodeId`] when a stored value
    /// cannot be decoded.
    pub fn load_conflicts(
        &self,
        account_id: &str,
    ) -> Result<Vec<StoredSyncConflict>, AppSqliteError> {
        let mut statement = self
            .connection
            .prepare(
                "SELECT
                    id,
                    aggregate_kind,
                    aggregate_id,
                    conflict_kind,
                    severity,
                    resolution_status,
                    local_payload_json,
                    remote_payload_json,
                    detected_at,
                    resolved_at
                 FROM local_conflicts
                 WHERE account_id = ?1
                 ORDER BY detected_at DESC, id DESC",
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "prepare sync conflicts query",
                source,
            })?;
        let rows = statement
            .query_map([account_id], |row| {
                Ok((
                    row.get::<_, String>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, String>(2)?,
                    row.get::<_, String>(3)?,
                    row.get::<_, String>(4)?,
                    row.get::<_, String>(5)?,
                    row.get::<_, String>(6)?,
                    row.get::<_, Option<String>>(7)?,
                    row.get::<_, String>(8)?,
                    row.get::<_, Option<String>>(9)?,
                ))
            })
            .map_err(|source| AppSqliteError::Query {
                operation: "query sync conflicts",
                source,
            })?;

        rows.map(|row| {
            let (
                conflict_id,
                aggregate_kind,
                aggregate_id,
                conflict_kind,
                severity,
                resolution_status,
                local_payload_json,
                remote_payload_json,
                detected_at,
                resolved_at,
            ) = row.map_err(|source| AppSqliteError::Query {
                operation: "read sync conflict row",
                source,
            })?;

            Ok(StoredSyncConflict {
                conflict_id,
                conflict: SyncConflict {
                    aggregate: parse_sync_aggregate_ref(
                        "local_conflicts.aggregate_kind",
                        "local_conflicts.aggregate_id",
                        aggregate_kind,
                        aggregate_id,
                    )?,
                    kind: parse_sync_conflict_kind(conflict_kind)?,
                    severity: parse_sync_conflict_severity(severity)?,
                    resolution: parse_sync_conflict_resolution_status(resolution_status)?,
                    local_payload_json,
                    remote_payload_json,
                    detected_at,
                    resolved_at,
                },
            })
        })
        .collect()
    }

    /// Marks a conflict as resolved with the given terminal `resolution`.
    ///
    /// Returns `true` when exactly one row matched `account_id` and `conflict_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppSqliteError::InvalidProjection`] when `resolution` is `Unresolved`, and
    /// [`AppSqliteError::Query`] when the update fails.
    pub fn resolve_conflict(
        &self,
        account_id: &str,
        conflict_id: &str,
        resolution: SyncConflictResolutionStatus,
        resolved_at: &str,
    ) -> Result<bool, AppSqliteError> {
        if resolution == SyncConflictResolutionStatus::Unresolved {
            return Err(AppSqliteError::InvalidProjection {
                reason: "sync conflict resolution must be terminal",
            });
        }

        let updated = self
            .connection
            .execute(
                "UPDATE local_conflicts
                 SET resolution_status = ?3, resolved_at = ?4
                 WHERE account_id = ?1 AND id = ?2",
                params![
                    account_id,
                    conflict_id,
                    sync_conflict_resolution_status_value(resolution),
                    resolved_at,
                ],
            )
            .map_err(|source| AppSqliteError::Query {
                operation: "resolve sync conflict",
                source,
            })?;

        Ok(updated == 1)
    }
}

/// Renders the id carried by `aggregate` as the string stored in an `aggregate_id` column.
fn aggregate_id_value(aggregate: &SyncAggregateRef) -> String {
    match aggregate {
        SyncAggregateRef::Farm(farm_id) => farm_id.to_string(),
        SyncAggregateRef::FulfillmentWindow(fulfillment_window_id) => {
            fulfillment_window_id.to_string()
        }
        SyncAggregateRef::Product(product_id) => product_id.to_string(),
        SyncAggregateRef::Order(order_id) => order_id.to_string(),
    }
}

/// Parses a stored aggregate id, reporting the offending column and raw value on failure.
fn decode_aggregate_id<T: std::str::FromStr>(
    field: &'static str,
    value: String,
) -> Result<T, AppSqliteError> {
    value
        .parse::<T>()
        .map_err(|_| AppSqliteError::DecodeId { field, value })
}

/// Rebuilds a [`SyncAggregateRef`] from its stored kind and id columns.
///
/// `aggregate_kind_field` and `aggregate_id_field` name the source columns and are only
/// used to label decode errors.
///
/// # Errors
///
/// Returns [`AppSqliteError::DecodeEnum`] for an unknown kind and
/// [`AppSqliteError::DecodeId`] when the id does not parse for that kind.
fn parse_sync_aggregate_ref(
    aggregate_kind_field: &'static str,
    aggregate_id_field: &'static str,
    aggregate_kind: String,
    aggregate_id: String,
) -> Result<SyncAggregateRef, AppSqliteError> {
    match aggregate_kind.as_str() {
        "farm" => decode_aggregate_id::<FarmId>(aggregate_id_field, aggregate_id)
            .map(SyncAggregateRef::Farm),
        "fulfillment_window" => {
            decode_aggregate_id::<FulfillmentWindowId>(aggregate_id_field, aggregate_id)
                .map(SyncAggregateRef::FulfillmentWindow)
        }
        "product" => decode_aggregate_id::<ProductId>(aggregate_id_field, aggregate_id)
            .map(SyncAggregateRef::Product),
        "order" => decode_aggregate_id::<OrderId>(aggregate_id_field, aggregate_id)
            .map(SyncAggregateRef::Order),
        _ => Err(AppSqliteError::DecodeEnum {
            field: aggregate_kind_field,
            value: aggregate_kind,
        }),
    }
}

/// Decodes a stored `local_outbox.operation_kind` value.
fn parse_sync_operation_kind(value: String) -> Result<SyncOperationKind, AppSqliteError> {
    match value.as_str() {
        "upsert" => Ok(SyncOperationKind::Upsert),
        "delete" => Ok(SyncOperationKind::Delete),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "local_outbox.operation_kind",
            value,
        }),
    }
}

/// Decodes a stored `local_outbox.state` value.
fn parse_pending_sync_operation_state(
    value: String,
) -> Result<PendingSyncOperationState, AppSqliteError> {
    match value.as_str() {
        "pending" => Ok(PendingSyncOperationState::Pending),
        "in_progress" => Ok(PendingSyncOperationState::InProgress),
        "succeeded" => Ok(PendingSyncOperationState::Succeeded),
        "failed" => Ok(PendingSyncOperationState::Failed),
        "blocked" => Ok(PendingSyncOperationState::Blocked),
        "retryable" => Ok(PendingSyncOperationState::Retryable),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "local_outbox.state",
            value,
        }),
    }
}

/// Decodes a stored `local_conflicts.conflict_kind` value.
fn parse_sync_conflict_kind(value: String) -> Result<SyncConflictKind, AppSqliteError> {
    match value.as_str() {
        "revision_mismatch" => Ok(SyncConflictKind::RevisionMismatch),
        "remote_delete" => Ok(SyncConflictKind::RemoteDelete),
        "remote_validation_reject" => Ok(SyncConflictKind::RemoteValidationReject),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "local_conflicts.conflict_kind",
            value,
        }),
    }
}

/// Decodes a stored `local_conflicts.severity` value.
fn parse_sync_conflict_severity(value: String) -> Result<SyncConflictSeverity, AppSqliteError> {
    match value.as_str() {
        "review_required" => Ok(SyncConflictSeverity::ReviewRequired),
        "blocking" => Ok(SyncConflictSeverity::Blocking),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "local_conflicts.severity",
            value,
        }),
    }
}

/// Decodes a stored `local_conflicts.resolution_status` value.
fn parse_sync_conflict_resolution_status(
    value: String,
) -> Result<SyncConflictResolutionStatus, AppSqliteError> {
    match value.as_str() {
        "unresolved" => Ok(SyncConflictResolutionStatus::Unresolved),
        "accepted_local" => Ok(SyncConflictResolutionStatus::AcceptedLocal),
        "accepted_remote" => Ok(SyncConflictResolutionStatus::AcceptedRemote),
        "dismissed" => Ok(SyncConflictResolutionStatus::Dismissed),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "local_conflicts.resolution_status",
            value,
        }),
    }
}

/// Decodes a stored `sync_checkpoints.state` value.
fn parse_sync_checkpoint_state(value: String) -> Result<SyncCheckpointState, AppSqliteError> {
    match value.as_str() {
        "never_synced" => Ok(SyncCheckpointState::NeverSynced),
        "syncing" => Ok(SyncCheckpointState::Syncing),
        "current" => Ok(SyncCheckpointState::Current),
        "failed" => Ok(SyncCheckpointState::Failed),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "sync_checkpoints.state",
            value,
        }),
    }
}

/// Storage string for a checkpoint state; the inverse of `parse_sync_checkpoint_state`.
fn sync_checkpoint_state_value(state: SyncCheckpointState) -> &'static str {
    match state {
        SyncCheckpointState::NeverSynced => "never_synced",
        SyncCheckpointState::Syncing => "syncing",
        SyncCheckpointState::Current => "current",
        SyncCheckpointState::Failed => "failed",
    }
}

/// Storage string for a conflict severity; the inverse of `parse_sync_conflict_severity`.
fn sync_conflict_severity_value(severity: SyncConflictSeverity) -> &'static str {
    match severity {
        SyncConflictSeverity::ReviewRequired => "review_required",
        SyncConflictSeverity::Blocking => "blocking",
    }
}

/// Storage string for a resolution status; the inverse of
/// `parse_sync_conflict_resolution_status`.
fn sync_conflict_resolution_status_value(resolution: SyncConflictResolutionStatus) -> &'static str {
    match resolution {
        SyncConflictResolutionStatus::Unresolved => "unresolved",
        SyncConflictResolutionStatus::AcceptedLocal => "accepted_local",
        SyncConflictResolutionStatus::AcceptedRemote => "accepted_remote",
        SyncConflictResolutionStatus::Dismissed => "dismissed",
    }
}

/// Decodes a stored `app_relay_ingest_freshness.state` value.
fn parse_relay_ingest_freshness_state(
    value: String,
) -> Result<AppRelayIngestFreshnessState, AppSqliteError> {
    match value.as_str() {
        "fresh" => Ok(AppRelayIngestFreshnessState::Fresh),
        "stale" => Ok(AppRelayIngestFreshnessState::Stale),
        "failed" => Ok(AppRelayIngestFreshnessState::Failed),
        _ => Err(AppSqliteError::DecodeEnum {
            field: "app_relay_ingest_freshness.state",
            value,
        }),
    }
}

/// Reports whether the last completed fetch is too old to count as fresh.
///
/// A missing completion time is always stale. Otherwise the fetch is stale when strictly
/// more than `stale_after_seconds` have elapsed; the subtraction saturates instead of
/// overflowing.
fn relay_ingest_is_stale(
    last_fetch_completed_unix_seconds: Option<i64>,
    now_unix_seconds: i64,
    stale_after_seconds: i64,
) -> bool {
    let Some(last_fetch_completed_unix_seconds) = last_fetch_completed_unix_seconds else {
        return true;
    };
    now_unix_seconds.saturating_sub(last_fetch_completed_unix_seconds) > stale_after_seconds
}

/// Aggregates per-relay states into one scope status.
///
/// No relays gives `Stale`; all relays failed gives `Failed`; some but not all failed
/// gives `Partial`; every relay fresh gives `Fresh`; anything else gives `Stale`.
fn relay_ingest_scope_status(relays: &[AppRelayIngestRelayFreshness]) -> AppRelayIngestScopeStatus {
    if relays.is_empty() {
        return AppRelayIngestScopeStatus::Stale;
    }
    let failed_count = relays
        .iter()
        .filter(|relay| relay.state == AppRelayIngestFreshnessState::Failed)
        .count();
    if failed_count == relays.len() {
        return AppRelayIngestScopeStatus::Failed;
    }
    if failed_count > 0 {
        return AppRelayIngestScopeStatus::Partial;
    }
    if relays
        .iter()
        .all(|relay| relay.state == AppRelayIngestFreshnessState::Fresh)
    {
        AppRelayIngestScopeStatus::Fresh
    } else {
        AppRelayIngestScopeStatus::Stale
    }
}

#[cfg(test)]
mod tests {
    use radroots_app_sync::{
        AppRelayIngestFreshnessState, AppRelayIngestScopeStatus, PendingSyncOperation,
        PendingSyncOperationState, SyncAggregateRef,
SyncCheckpointStatus, SyncConflict, 1007 SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, 1008 }; 1009 use radroots_app_view::{FarmId, ProductId}; 1010 1011 use crate::{AppSqliteStore, DatabaseTarget}; 1012 1013 #[test] 1014 fn checkpoints_are_selected_account_scoped() { 1015 let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); 1016 let repository = store.sync_repository(); 1017 let checkpoint = 1018 SyncCheckpointStatus::syncing("2026-04-20T18:00:00Z", Some("cursor-1".to_owned())); 1019 1020 assert_eq!( 1021 repository 1022 .load_checkpoint("acct_a") 1023 .expect("missing checkpoint should load"), 1024 SyncCheckpointStatus::never_synced() 1025 ); 1026 1027 repository 1028 .save_checkpoint("acct_a", &checkpoint) 1029 .expect("checkpoint should save"); 1030 1031 assert_eq!( 1032 repository 1033 .load_checkpoint("acct_a") 1034 .expect("saved checkpoint should load"), 1035 checkpoint 1036 ); 1037 assert_eq!( 1038 repository 1039 .load_checkpoint("acct_b") 1040 .expect("other account checkpoint should load"), 1041 SyncCheckpointStatus::never_synced() 1042 ); 1043 } 1044 1045 #[test] 1046 fn relay_ingest_freshness_tracks_cursors_and_scope_status() { 1047 let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); 1048 let repository = store.sync_repository(); 1049 let relay_urls = vec![ 1050 "wss://relay-a.example".to_owned(), 1051 "wss://relay-b.example".to_owned(), 1052 ]; 1053 1054 let initial = repository 1055 .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_000, 60) 1056 .expect("freshness should load"); 1057 assert_eq!(initial.status, AppRelayIngestScopeStatus::Stale); 1058 assert_eq!(initial.relays.len(), 2); 1059 assert!( 1060 initial 1061 .relays 1062 .iter() 1063 .all(|relay| relay.state == AppRelayIngestFreshnessState::Stale) 1064 ); 1065 1066 repository 1067 .record_relay_ingest_success( 1068 "direct_relay_ingest", 1069 
"wss://relay-a.example",
                1_010,
                Some(1_009),
                "2026-05-25T20:00:00Z",
                1_000,
                "2026-05-25T20:00:02Z",
                1_002,
            )
            .expect("success should record");
        repository
            .record_relay_ingest_failure(
                "direct_relay_ingest",
                "wss://relay-b.example",
                "2026-05-25T20:00:00Z",
                1_000,
                "2026-05-25T20:00:02Z",
                1_002,
                "relay timeout",
            )
            .expect("failure should record");

        let cursors = repository
            .load_relay_ingest_cursors("direct_relay_ingest", &relay_urls)
            .expect("cursors should load");
        assert_eq!(cursors[0].cursor_since_unix_seconds, Some(1_010));
        assert_eq!(cursors[1].cursor_since_unix_seconds, None);

        let partial = repository
            .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_005, 60)
            .expect("partial freshness should load");
        assert_eq!(partial.status, AppRelayIngestScopeStatus::Partial);
        assert_eq!(partial.relays[0].state, AppRelayIngestFreshnessState::Fresh);
        assert_eq!(
            partial.relays[1].state,
            AppRelayIngestFreshnessState::Failed
        );
        assert_eq!(
            partial.relays[1].last_error_message.as_deref(),
            Some("relay timeout")
        );

        let stale = repository
            .load_relay_ingest_freshness(
                "direct_relay_ingest",
                &["wss://relay-a.example".to_owned()],
                1_100,
                60,
            )
            .expect("stale freshness should load");
        assert_eq!(stale.status, AppRelayIngestScopeStatus::Stale);
        assert_eq!(stale.relays[0].state, AppRelayIngestFreshnessState::Stale);
        assert_eq!(stale.relays[0].cursor_since_unix_seconds, Some(1_010));
    }

    /// Pending operations are listed per account, a retry update only applies when the
    /// operation id is addressed through the owning account, and dequeuing removes the
    /// operation from that account's list.
    #[test]
    fn pending_operations_are_account_scoped_and_retryable() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
        let repository = store.sync_repository();
        let first = PendingSyncOperation::new(
            SyncAggregateRef::Farm(FarmId::new()),
            SyncOperationKind::Upsert,
            "{\"farm\":\"a\"}",
            "2026-04-20T18:00:00Z",
        );
        let second = PendingSyncOperation::new(
            SyncAggregateRef::Product(ProductId::new()),
            SyncOperationKind::Delete,
            "{\"product\":\"b\"}",
            "2026-04-20T18:05:00Z",
        );

        let first_id = repository
            .enqueue_pending_operation("acct_a", &first)
            .expect("first operation should save");
        let second_id = repository
            .enqueue_pending_operation("acct_a", &second)
            .expect("second operation should save");
        // The same operation is also queued for a second account so that scoping can be
        // observed from both sides.
        repository
            .enqueue_pending_operation("acct_b", &first)
            .expect("other account operation should save");

        let before_retry = repository
            .load_pending_operations("acct_a")
            .expect("pending operations should load");
        assert_eq!(before_retry.len(), 2);
        assert_eq!(before_retry[0].operation, first);
        assert_eq!(before_retry[1].operation, second);

        assert!(
            repository
                .update_pending_operation_retry(
                    "acct_a",
                    &first_id,
                    "2026-04-20T18:10:00Z",
                    2,
                    Some("relay timeout"),
                )
                .expect("retry update should succeed")
        );
        // Addressing acct_a's operation id through acct_b must complete without an error
        // and report that nothing was updated (`Ok(false)`).
        assert!(
            !repository
                .update_pending_operation_retry(
                    "acct_b",
                    &first_id,
                    "2026-04-20T18:10:00Z",
                    3,
                    Some("wrong account"),
                )
                .expect("wrong-account retry update should run without error")
        );
        assert!(
            repository
                .dequeue_pending_operation("acct_a", &second_id)
                .expect("dequeue should succeed")
        );

        let acct_a = repository
            .load_pending_operations("acct_a")
            .expect("account operations should reload");
        let acct_b = repository
            .load_pending_operations("acct_b")
            .expect("other account operations should reload");

        // Only the retried operation remains for acct_a, carrying the values from the
        // accepted retry update rather than the rejected wrong-account one.
        assert_eq!(acct_a.len(), 1);
        assert_eq!(acct_a[0].operation_id, first_id);
        assert_eq!(acct_a[0].operation.attempt_count, 2);
        assert_eq!(
            acct_a[0].operation.state,
            PendingSyncOperationState::Retryable
        );
        assert_eq!(
            acct_a[0].operation.available_at,
            "2026-04-20T18:10:00Z".to_owned()
        );
        assert_eq!(
            acct_a[0].operation.last_error_message.as_deref(),
            Some("relay timeout")
        );
        // acct_b still holds its own copy exactly as it was enqueued.
        assert_eq!(acct_b.len(), 1);
        assert_eq!(acct_b[0].operation, first);
    }

    /// Enqueueing a second operation for the same product and operation kind while the
    /// first is still queued reuses the existing row: the id and operation key stay the
    /// same, the payload is replaced, and the retry bookkeeping is loaded back as a fresh
    /// pending operation.
    #[test]
    fn outbox_enqueue_upserts_active_operation_by_deterministic_key() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
        let repository = store.sync_repository();
        let product_id = ProductId::new();
        let first = PendingSyncOperation::new(
            SyncAggregateRef::Product(product_id),
            SyncOperationKind::Upsert,
            "{\"title\":\"greens\"}",
            "2026-04-20T18:00:00Z",
        );
        let mut replacement = PendingSyncOperation::new(
            SyncAggregateRef::Product(product_id),
            SyncOperationKind::Upsert,
            "{\"title\":\"winter greens\"}",
            "2026-04-20T18:05:00Z",
        );
        // Deliberately carry failure bookkeeping on the replacement; the assertions below
        // expect none of it to be visible after the upsert.
        replacement.attempt_count = 3;
        replacement.state = PendingSyncOperationState::Failed;
        replacement.last_error_message = Some("stale relay state".to_owned());

        let first_id = repository
            .enqueue_pending_operation("acct_a", &first)
            .expect("first operation should save");
        let replacement_id = repository
            .enqueue_pending_operation("acct_a", &replacement)
            .expect("replacement operation should upsert");

        let pending = repository
            .load_pending_operations("acct_a")
            .expect("pending operations should load");

        assert_eq!(replacement_id, first_id);
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].operation_id, first_id);
        assert_eq!(pending[0].operation.operation_key, first.operation_key);
        assert_eq!(
            pending[0].operation.payload_json,
            "{\"title\":\"winter greens\"}"
        );
        assert_eq!(pending[0].operation.attempt_count, 0);
        assert_eq!(
            pending[0].operation.state,
            PendingSyncOperationState::Pending
        );
        assert_eq!(pending[0].operation.last_error_message, None);
    }

    /// Conflicts are listed per account, and a resolution only applies when the conflict
    /// id is addressed through the owning account.
    #[test]
    fn conflicts_are_account_scoped_and_resolvable() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
        let repository = store.sync_repository();
        let first = SyncConflict {
            aggregate: SyncAggregateRef::Farm(FarmId::new()),
            kind: SyncConflictKind::RevisionMismatch,
            severity: SyncConflictSeverity::Blocking,
            resolution: SyncConflictResolutionStatus::Unresolved,
            local_payload_json: "{\"farm\":\"local\"}".to_owned(),
            remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()),
            detected_at: "2026-04-20T18:00:00Z".to_owned(),
            resolved_at: None,
        };
        let second = SyncConflict {
            aggregate: SyncAggregateRef::Product(ProductId::new()),
            kind: SyncConflictKind::RemoteValidationReject,
            severity: SyncConflictSeverity::ReviewRequired,
            resolution: SyncConflictResolutionStatus::Unresolved,
            local_payload_json: "{\"product\":\"local\"}".to_owned(),
            remote_payload_json: None,
            detected_at: "2026-04-20T18:05:00Z".to_owned(),
            resolved_at: None,
        };

        let first_id = repository
            .record_conflict("acct_a", &first)
            .expect("first conflict should save");
        repository
            .record_conflict("acct_b", &second)
            .expect("other account conflict should save");

        assert!(
            repository
                .resolve_conflict(
                    "acct_a",
                    &first_id,
                    SyncConflictResolutionStatus::AcceptedLocal,
                    "2026-04-20T18:06:00Z",
                )
                .expect("conflict resolution should succeed")
        );
        // Addressing acct_a's conflict id through acct_b must complete without an error
        // and report that nothing was resolved (`Ok(false)`).
        assert!(
            !repository
                .resolve_conflict(
                    "acct_b",
                    &first_id,
                    SyncConflictResolutionStatus::AcceptedRemote,
                    "2026-04-20T18:07:00Z",
                )
                .expect("wrong-account resolution should run without error")
        );

        let acct_a = repository
            .load_conflicts("acct_a")
            .expect("account conflicts should load");
        let acct_b = repository
            .load_conflicts("acct_b")
            .expect("other account conflicts should load");

        // acct_a keeps the resolution from the accepted call; acct_b's conflict is
        // returned exactly as it was recorded.
        assert_eq!(acct_a.len(), 1);
        assert_eq!(acct_a[0].conflict_id, first_id);
        assert_eq!(
            acct_a[0].conflict.resolution,
            SyncConflictResolutionStatus::AcceptedLocal
        );
        assert_eq!(
            acct_a[0].conflict.resolved_at.as_deref(),
            Some("2026-04-20T18:06:00Z")
        );
        assert_eq!(acct_b.len(), 1);
        assert_eq!(acct_b[0].conflict, second);
    }

    /// `replace_conflicts` swaps out the conflict set of the named account only: the
    /// previously recorded conflict disappears for that account while the other
    /// account's conflict is still loaded unchanged.
    #[test]
    fn replacing_conflicts_clears_stale_rows_for_the_selected_account() {
        let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
        let repository = store.sync_repository();
        let first = SyncConflict {
            aggregate: SyncAggregateRef::Farm(FarmId::new()),
            kind: SyncConflictKind::RevisionMismatch,
            severity: SyncConflictSeverity::Blocking,
            resolution: SyncConflictResolutionStatus::Unresolved,
            local_payload_json: "{\"farm\":\"local\"}".to_owned(),
            remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()),
            detected_at: "2026-04-20T18:00:00Z".to_owned(),
            resolved_at: None,
        };
        let second = SyncConflict {
            aggregate: SyncAggregateRef::Product(ProductId::new()),
            kind: SyncConflictKind::RemoteValidationReject,
            severity: SyncConflictSeverity::ReviewRequired,
            resolution: SyncConflictResolutionStatus::Unresolved,
            local_payload_json: "{\"product\":\"local\"}".to_owned(),
            remote_payload_json: None,
            detected_at: "2026-04-20T18:05:00Z".to_owned(),
            resolved_at: None,
        };

        // Both accounts start out holding the same conflict.
        repository
            .record_conflict("acct_a", &first)
            .expect("first conflict should save");
        repository
            .record_conflict("acct_b", &first)
            .expect("other account conflict should save");

        repository
            .replace_conflicts("acct_a", std::slice::from_ref(&second))
            .expect("conflicts should replace");

        let acct_a = repository
            .load_conflicts("acct_a")
            .expect("account conflicts should load");
        let acct_b = repository
            .load_conflicts("acct_b")
            .expect("other account conflicts should load");

        assert_eq!(acct_a.len(), 1);
        assert_eq!(acct_a[0].conflict, second);
        assert_eq!(acct_b.len(), 1);
        assert_eq!(acct_b[0].conflict, first);
    }
}