app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

sync.rs (49988B)


      1 use radroots_app_sync::{
      2     AppRelayIngestFreshnessState, AppRelayIngestRelayFreshness, AppRelayIngestScopeFreshness,
      3     AppRelayIngestScopeStatus, PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef,
      4     SyncCheckpointState, SyncCheckpointStatus, SyncConflict, SyncConflictKind,
      5     SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
      6 };
      7 use radroots_app_view::{FarmId, FulfillmentWindowId, OrderId, ProductId};
      8 use rusqlite::{Connection, OptionalExtension, params};
      9 use uuid::Uuid;
     10 
     11 use crate::AppSqliteError;
     12 
     13 #[derive(Clone, Debug, Eq, PartialEq)]
     14 pub struct StoredPendingSyncOperation {
     15     pub operation_id: String,
     16     pub operation: PendingSyncOperation,
     17 }
     18 
     19 #[derive(Clone, Debug, Eq, PartialEq)]
     20 pub struct StoredSyncConflict {
     21     pub conflict_id: String,
     22     pub conflict: SyncConflict,
     23 }
     24 
     25 #[derive(Clone, Debug, Eq, PartialEq)]
     26 pub struct StoredRelayIngestCursor {
     27     pub relay_url: String,
     28     pub cursor_since_unix_seconds: Option<i64>,
     29 }
     30 
     31 pub struct AppSyncRepository<'a> {
     32     connection: &'a Connection,
     33 }
     34 
     35 impl<'a> AppSyncRepository<'a> {
     36     pub const fn new(connection: &'a Connection) -> Self {
     37         Self { connection }
     38     }
     39 
     40     pub fn enqueue_pending_operation(
     41         &self,
     42         account_id: &str,
     43         operation: &PendingSyncOperation,
     44     ) -> Result<String, AppSqliteError> {
     45         let operation_id = Uuid::now_v7().to_string();
     46 
     47         self.connection
     48             .execute(
     49                 "INSERT INTO local_outbox (
     50                     id,
     51                     account_id,
     52                     operation_key,
     53                     aggregate_kind,
     54                     aggregate_id,
     55                     operation_kind,
     56                     payload_json,
     57                     created_at,
     58                     available_at,
     59                     attempt_count,
     60                     state,
     61                     last_error_message
     62                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
     63                 ON CONFLICT(account_id, operation_key)
     64                 WHERE state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
     65                 DO UPDATE SET
     66                     aggregate_kind = excluded.aggregate_kind,
     67                     aggregate_id = excluded.aggregate_id,
     68                     operation_kind = excluded.operation_kind,
     69                     payload_json = excluded.payload_json,
     70                     created_at = excluded.created_at,
     71                     available_at = excluded.available_at,
     72                     attempt_count = 0,
     73                     state = 'pending',
     74                     last_error_message = NULL",
     75                 params![
     76                     operation_id,
     77                     account_id,
     78                     operation.operation_key.as_str(),
     79                     operation.aggregate.aggregate_kind(),
     80                     aggregate_id_value(&operation.aggregate),
     81                     operation.operation.storage_key(),
     82                     operation.payload_json.as_str(),
     83                     operation.created_at.as_str(),
     84                     operation.available_at.as_str(),
     85                     i64::from(operation.attempt_count),
     86                     operation.state.storage_key(),
     87                     operation.last_error_message.as_deref(),
     88                 ],
     89             )
     90             .map_err(|source| AppSqliteError::Query {
     91                 operation: "enqueue pending sync operation",
     92                 source,
     93             })?;
     94 
     95         self.connection
     96             .query_row(
     97                 "SELECT id
     98                  FROM local_outbox
     99                  WHERE account_id = ?1
    100                     AND operation_key = ?2
    101                     AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
    102                  LIMIT 1",
    103                 params![account_id, operation.operation_key.as_str()],
    104                 |row| row.get::<_, String>(0),
    105             )
    106             .map_err(|source| AppSqliteError::Query {
    107                 operation: "load pending sync operation id after enqueue",
    108                 source,
    109             })
    110     }
    111 
    112     pub fn load_pending_operations(
    113         &self,
    114         account_id: &str,
    115     ) -> Result<Vec<StoredPendingSyncOperation>, AppSqliteError> {
    116         let mut statement = self
    117             .connection
    118             .prepare(
    119                 "SELECT
    120                     id,
    121                     operation_key,
    122                     aggregate_kind,
    123                     aggregate_id,
    124                     operation_kind,
    125                     payload_json,
    126                     created_at,
    127                     available_at,
    128                     attempt_count,
    129                     state,
    130                     last_error_message
    131                  FROM local_outbox
    132                  WHERE account_id = ?1
    133                     AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
    134                  ORDER BY available_at ASC, created_at ASC, id ASC",
    135             )
    136             .map_err(|source| AppSqliteError::Query {
    137                 operation: "prepare pending sync operations query",
    138                 source,
    139             })?;
    140         let rows = statement
    141             .query_map([account_id], |row| {
    142                 Ok((
    143                     row.get::<_, String>(0)?,
    144                     row.get::<_, String>(1)?,
    145                     row.get::<_, String>(2)?,
    146                     row.get::<_, String>(3)?,
    147                     row.get::<_, String>(4)?,
    148                     row.get::<_, String>(5)?,
    149                     row.get::<_, String>(6)?,
    150                     row.get::<_, String>(7)?,
    151                     row.get::<_, u32>(8)?,
    152                     row.get::<_, String>(9)?,
    153                     row.get::<_, Option<String>>(10)?,
    154                 ))
    155             })
    156             .map_err(|source| AppSqliteError::Query {
    157                 operation: "query pending sync operations",
    158                 source,
    159             })?;
    160 
    161         rows.map(|row| {
    162             let (
    163                 operation_id,
    164                 operation_key,
    165                 aggregate_kind,
    166                 aggregate_id,
    167                 operation_kind,
    168                 payload_json,
    169                 created_at,
    170                 available_at,
    171                 attempt_count,
    172                 state,
    173                 last_error_message,
    174             ) = row.map_err(|source| AppSqliteError::Query {
    175                 operation: "read pending sync operation row",
    176                 source,
    177             })?;
    178 
    179             Ok(StoredPendingSyncOperation {
    180                 operation_id,
    181                 operation: PendingSyncOperation {
    182                     operation_key,
    183                     aggregate: parse_sync_aggregate_ref(
    184                         "local_outbox.aggregate_kind",
    185                         "local_outbox.aggregate_id",
    186                         aggregate_kind,
    187                         aggregate_id,
    188                     )?,
    189                     operation: parse_sync_operation_kind(operation_kind)?,
    190                     payload_json,
    191                     created_at,
    192                     available_at,
    193                     attempt_count,
    194                     state: parse_pending_sync_operation_state(state)?,
    195                     last_error_message,
    196                 },
    197             })
    198         })
    199         .collect()
    200     }
    201 
    202     pub fn update_pending_operation_retry(
    203         &self,
    204         account_id: &str,
    205         operation_id: &str,
    206         available_at: &str,
    207         attempt_count: u32,
    208         last_error_message: Option<&str>,
    209     ) -> Result<bool, AppSqliteError> {
    210         let updated = self
    211             .connection
    212             .execute(
    213                 "UPDATE local_outbox
    214                  SET available_at = ?3,
    215                     attempt_count = ?4,
    216                     state = 'retryable',
    217                     last_error_message = ?5
    218                  WHERE account_id = ?1 AND id = ?2",
    219                 params![
    220                     account_id,
    221                     operation_id,
    222                     available_at,
    223                     i64::from(attempt_count),
    224                     last_error_message
    225                 ],
    226             )
    227             .map_err(|source| AppSqliteError::Query {
    228                 operation: "update pending sync operation retry",
    229                 source,
    230             })?;
    231 
    232         Ok(updated == 1)
    233     }
    234 
    235     pub fn dequeue_pending_operation(
    236         &self,
    237         account_id: &str,
    238         operation_id: &str,
    239     ) -> Result<bool, AppSqliteError> {
    240         let deleted = self
    241             .connection
    242             .execute(
    243                 "DELETE FROM local_outbox WHERE account_id = ?1 AND id = ?2",
    244                 params![account_id, operation_id],
    245             )
    246             .map_err(|source| AppSqliteError::Query {
    247                 operation: "dequeue pending sync operation",
    248                 source,
    249             })?;
    250 
    251         Ok(deleted == 1)
    252     }
    253 
    254     pub fn load_checkpoint(
    255         &self,
    256         account_id: &str,
    257     ) -> Result<SyncCheckpointStatus, AppSqliteError> {
    258         let row = self
    259             .connection
    260             .query_row(
    261                 "SELECT
    262                     state,
    263                     last_sync_started_at,
    264                     last_sync_completed_at,
    265                     last_remote_cursor,
    266                     last_error_message
    267                  FROM sync_checkpoints
    268                  WHERE account_id = ?1
    269                  LIMIT 1",
    270                 [account_id],
    271                 |row| {
    272                     Ok((
    273                         row.get::<_, String>(0)?,
    274                         row.get::<_, Option<String>>(1)?,
    275                         row.get::<_, Option<String>>(2)?,
    276                         row.get::<_, Option<String>>(3)?,
    277                         row.get::<_, Option<String>>(4)?,
    278                     ))
    279                 },
    280             )
    281             .optional()
    282             .map_err(|source| AppSqliteError::Query {
    283                 operation: "load sync checkpoint",
    284                 source,
    285             })?;
    286 
    287         row.map_or_else(
    288             || Ok(SyncCheckpointStatus::never_synced()),
    289             |(
    290                 state,
    291                 last_sync_started_at,
    292                 last_sync_completed_at,
    293                 last_remote_cursor,
    294                 last_error_message,
    295             )| {
    296                 Ok(SyncCheckpointStatus {
    297                     state: parse_sync_checkpoint_state(state)?,
    298                     last_sync_started_at,
    299                     last_sync_completed_at,
    300                     last_remote_cursor,
    301                     last_error_message,
    302                 })
    303             },
    304         )
    305     }
    306 
    307     pub fn save_checkpoint(
    308         &self,
    309         account_id: &str,
    310         checkpoint: &SyncCheckpointStatus,
    311     ) -> Result<(), AppSqliteError> {
    312         self.connection
    313             .execute(
    314                 "INSERT INTO sync_checkpoints (
    315                     account_id,
    316                     state,
    317                     last_sync_started_at,
    318                     last_sync_completed_at,
    319                     last_remote_cursor,
    320                     last_error_message
    321                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
    322                 ON CONFLICT(account_id) DO UPDATE SET
    323                     state = excluded.state,
    324                     last_sync_started_at = excluded.last_sync_started_at,
    325                     last_sync_completed_at = excluded.last_sync_completed_at,
    326                     last_remote_cursor = excluded.last_remote_cursor,
    327                     last_error_message = excluded.last_error_message",
    328                 params![
    329                     account_id,
    330                     sync_checkpoint_state_value(checkpoint.state),
    331                     checkpoint.last_sync_started_at,
    332                     checkpoint.last_sync_completed_at,
    333                     checkpoint.last_remote_cursor,
    334                     checkpoint.last_error_message,
    335                 ],
    336             )
    337             .map_err(|source| AppSqliteError::Query {
    338                 operation: "save sync checkpoint",
    339                 source,
    340             })?;
    341 
    342         Ok(())
    343     }
    344 
    345     pub fn load_relay_ingest_cursors(
    346         &self,
    347         scope_key: &str,
    348         relay_urls: &[String],
    349     ) -> Result<Vec<StoredRelayIngestCursor>, AppSqliteError> {
    350         relay_urls
    351             .iter()
    352             .map(|relay_url| {
    353                 let cursor_since_unix_seconds = self
    354                     .connection
    355                     .query_row(
    356                         "SELECT cursor_since_unix_seconds
    357                          FROM app_relay_ingest_freshness
    358                          WHERE scope_key = ?1 AND relay_url = ?2
    359                          LIMIT 1",
    360                         params![scope_key, relay_url.as_str()],
    361                         |row| row.get::<_, Option<i64>>(0),
    362                     )
    363                     .optional()
    364                     .map_err(|source| AppSqliteError::Query {
    365                         operation: "load relay ingest cursor",
    366                         source,
    367                     })?
    368                     .flatten();
    369 
    370                 Ok(StoredRelayIngestCursor {
    371                     relay_url: relay_url.clone(),
    372                     cursor_since_unix_seconds,
    373                 })
    374             })
    375             .collect()
    376     }
    377 
    378     pub fn load_relay_ingest_freshness(
    379         &self,
    380         scope_key: &str,
    381         relay_urls: &[String],
    382         now_unix_seconds: i64,
    383         stale_after_seconds: i64,
    384     ) -> Result<AppRelayIngestScopeFreshness, AppSqliteError> {
    385         let relays = relay_urls
    386             .iter()
    387             .map(|relay_url| {
    388                 self.load_relay_ingest_relay_freshness(
    389                     scope_key,
    390                     relay_url,
    391                     now_unix_seconds,
    392                     stale_after_seconds,
    393                 )
    394             })
    395             .collect::<Result<Vec<_>, _>>()?;
    396         let status = relay_ingest_scope_status(relays.as_slice());
    397 
    398         Ok(AppRelayIngestScopeFreshness {
    399             scope_key: scope_key.to_owned(),
    400             status,
    401             relays,
    402         })
    403     }
    404 
    405     pub fn record_relay_ingest_success(
    406         &self,
    407         scope_key: &str,
    408         relay_url: &str,
    409         cursor_since_unix_seconds: i64,
    410         last_event_created_at_unix_seconds: Option<i64>,
    411         started_at: &str,
    412         started_unix_seconds: i64,
    413         completed_at: &str,
    414         completed_unix_seconds: i64,
    415     ) -> Result<(), AppSqliteError> {
    416         self.connection
    417             .execute(
    418                 "INSERT INTO app_relay_ingest_freshness (
    419                     scope_key,
    420                     relay_url,
    421                     state,
    422                     cursor_since_unix_seconds,
    423                     last_event_created_at_unix_seconds,
    424                     last_fetch_started_at,
    425                     last_fetch_started_unix_seconds,
    426                     last_fetch_completed_at,
    427                     last_fetch_completed_unix_seconds,
    428                     last_success_at,
    429                     last_success_unix_seconds,
    430                     last_error_message,
    431                     updated_at
    432                 ) VALUES (?1, ?2, 'fresh', ?3, ?4, ?5, ?6, ?7, ?8, ?7, ?8, NULL, ?7)
    433                 ON CONFLICT(scope_key, relay_url) DO UPDATE SET
    434                     state = 'fresh',
    435                     cursor_since_unix_seconds = excluded.cursor_since_unix_seconds,
    436                     last_event_created_at_unix_seconds = excluded.last_event_created_at_unix_seconds,
    437                     last_fetch_started_at = excluded.last_fetch_started_at,
    438                     last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds,
    439                     last_fetch_completed_at = excluded.last_fetch_completed_at,
    440                     last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds,
    441                     last_success_at = excluded.last_success_at,
    442                     last_success_unix_seconds = excluded.last_success_unix_seconds,
    443                     last_error_message = NULL,
    444                     updated_at = excluded.updated_at",
    445                 params![
    446                     scope_key,
    447                     relay_url,
    448                     cursor_since_unix_seconds,
    449                     last_event_created_at_unix_seconds,
    450                     started_at,
    451                     started_unix_seconds,
    452                     completed_at,
    453                     completed_unix_seconds,
    454                 ],
    455             )
    456             .map_err(|source| AppSqliteError::Query {
    457                 operation: "record relay ingest success",
    458                 source,
    459             })?;
    460 
    461         Ok(())
    462     }
    463 
    464     pub fn record_relay_ingest_failure(
    465         &self,
    466         scope_key: &str,
    467         relay_url: &str,
    468         started_at: &str,
    469         started_unix_seconds: i64,
    470         completed_at: &str,
    471         completed_unix_seconds: i64,
    472         error_message: &str,
    473     ) -> Result<(), AppSqliteError> {
    474         self.connection
    475             .execute(
    476                 "INSERT INTO app_relay_ingest_freshness (
    477                     scope_key,
    478                     relay_url,
    479                     state,
    480                     cursor_since_unix_seconds,
    481                     last_event_created_at_unix_seconds,
    482                     last_fetch_started_at,
    483                     last_fetch_started_unix_seconds,
    484                     last_fetch_completed_at,
    485                     last_fetch_completed_unix_seconds,
    486                     last_success_at,
    487                     last_success_unix_seconds,
    488                     last_error_message,
    489                     updated_at
    490                 ) VALUES (?1, ?2, 'failed', NULL, NULL, ?3, ?4, ?5, ?6, NULL, NULL, ?7, ?5)
    491                 ON CONFLICT(scope_key, relay_url) DO UPDATE SET
    492                     state = 'failed',
    493                     last_fetch_started_at = excluded.last_fetch_started_at,
    494                     last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds,
    495                     last_fetch_completed_at = excluded.last_fetch_completed_at,
    496                     last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds,
    497                     last_error_message = excluded.last_error_message,
    498                     updated_at = excluded.updated_at",
    499                 params![
    500                     scope_key,
    501                     relay_url,
    502                     started_at,
    503                     started_unix_seconds,
    504                     completed_at,
    505                     completed_unix_seconds,
    506                     error_message,
    507                 ],
    508             )
    509             .map_err(|source| AppSqliteError::Query {
    510                 operation: "record relay ingest failure",
    511                 source,
    512             })?;
    513 
    514         Ok(())
    515     }
    516 
    517     fn load_relay_ingest_relay_freshness(
    518         &self,
    519         scope_key: &str,
    520         relay_url: &str,
    521         now_unix_seconds: i64,
    522         stale_after_seconds: i64,
    523     ) -> Result<AppRelayIngestRelayFreshness, AppSqliteError> {
    524         let row = self
    525             .connection
    526             .query_row(
    527                 "SELECT
    528                     state,
    529                     cursor_since_unix_seconds,
    530                     last_event_created_at_unix_seconds,
    531                     last_fetch_started_at,
    532                     last_fetch_completed_at,
    533                     last_fetch_completed_unix_seconds,
    534                     last_success_at,
    535                     last_error_message
    536                  FROM app_relay_ingest_freshness
    537                  WHERE scope_key = ?1 AND relay_url = ?2
    538                  LIMIT 1",
    539                 params![scope_key, relay_url],
    540                 |row| {
    541                     Ok((
    542                         row.get::<_, String>(0)?,
    543                         row.get::<_, Option<i64>>(1)?,
    544                         row.get::<_, Option<i64>>(2)?,
    545                         row.get::<_, Option<String>>(3)?,
    546                         row.get::<_, Option<String>>(4)?,
    547                         row.get::<_, Option<i64>>(5)?,
    548                         row.get::<_, Option<String>>(6)?,
    549                         row.get::<_, Option<String>>(7)?,
    550                     ))
    551                 },
    552             )
    553             .optional()
    554             .map_err(|source| AppSqliteError::Query {
    555                 operation: "load relay ingest freshness",
    556                 source,
    557             })?;
    558 
    559         row.map_or_else(
    560             || {
    561                 Ok(AppRelayIngestRelayFreshness {
    562                     relay_url: relay_url.to_owned(),
    563                     state: AppRelayIngestFreshnessState::Stale,
    564                     cursor_since_unix_seconds: None,
    565                     last_event_created_at_unix_seconds: None,
    566                     last_fetch_started_at: None,
    567                     last_fetch_completed_at: None,
    568                     last_success_at: None,
    569                     last_error_message: None,
    570                 })
    571             },
    572             |(
    573                 state,
    574                 cursor_since_unix_seconds,
    575                 last_event_created_at_unix_seconds,
    576                 last_fetch_started_at,
    577                 last_fetch_completed_at,
    578                 last_fetch_completed_unix_seconds,
    579                 last_success_at,
    580                 last_error_message,
    581             )| {
    582                 let mut state = parse_relay_ingest_freshness_state(state)?;
    583                 if state == AppRelayIngestFreshnessState::Fresh
    584                     && relay_ingest_is_stale(
    585                         last_fetch_completed_unix_seconds,
    586                         now_unix_seconds,
    587                         stale_after_seconds,
    588                     )
    589                 {
    590                     state = AppRelayIngestFreshnessState::Stale;
    591                 }
    592                 Ok(AppRelayIngestRelayFreshness {
    593                     relay_url: relay_url.to_owned(),
    594                     state,
    595                     cursor_since_unix_seconds,
    596                     last_event_created_at_unix_seconds,
    597                     last_fetch_started_at,
    598                     last_fetch_completed_at,
    599                     last_success_at,
    600                     last_error_message,
    601                 })
    602             },
    603         )
    604     }
    605 
    606     pub fn record_conflict(
    607         &self,
    608         account_id: &str,
    609         conflict: &SyncConflict,
    610     ) -> Result<String, AppSqliteError> {
    611         let conflict_id = Uuid::now_v7().to_string();
    612 
    613         self.connection
    614             .execute(
    615                 "INSERT INTO local_conflicts (
    616                     id,
    617                     account_id,
    618                     aggregate_kind,
    619                     aggregate_id,
    620                     conflict_kind,
    621                     severity,
    622                     resolution_status,
    623                     local_payload_json,
    624                     remote_payload_json,
    625                     detected_at,
    626                     resolved_at
    627                 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
    628                 params![
    629                     conflict_id,
    630                     account_id,
    631                     conflict.aggregate.aggregate_kind(),
    632                     aggregate_id_value(&conflict.aggregate),
    633                     conflict.kind.storage_key(),
    634                     sync_conflict_severity_value(conflict.severity),
    635                     sync_conflict_resolution_status_value(conflict.resolution),
    636                     conflict.local_payload_json,
    637                     conflict.remote_payload_json,
    638                     conflict.detected_at,
    639                     conflict.resolved_at,
    640                 ],
    641             )
    642             .map_err(|source| AppSqliteError::Query {
    643                 operation: "record sync conflict",
    644                 source,
    645             })?;
    646 
    647         Ok(conflict_id)
    648     }
    649 
    650     pub fn replace_conflicts(
    651         &self,
    652         account_id: &str,
    653         conflicts: &[SyncConflict],
    654     ) -> Result<(), AppSqliteError> {
    655         self.connection
    656             .execute(
    657                 "DELETE FROM local_conflicts WHERE account_id = ?1",
    658                 [account_id],
    659             )
    660             .map_err(|source| AppSqliteError::Query {
    661                 operation: "clear sync conflicts",
    662                 source,
    663             })?;
    664 
    665         for conflict in conflicts {
    666             let _ = self.record_conflict(account_id, conflict)?;
    667         }
    668 
    669         Ok(())
    670     }
    671 
    672     pub fn load_conflicts(
    673         &self,
    674         account_id: &str,
    675     ) -> Result<Vec<StoredSyncConflict>, AppSqliteError> {
    676         let mut statement = self
    677             .connection
    678             .prepare(
    679                 "SELECT
    680                     id,
    681                     aggregate_kind,
    682                     aggregate_id,
    683                     conflict_kind,
    684                     severity,
    685                     resolution_status,
    686                     local_payload_json,
    687                     remote_payload_json,
    688                     detected_at,
    689                     resolved_at
    690                  FROM local_conflicts
    691                  WHERE account_id = ?1
    692                  ORDER BY detected_at DESC, id DESC",
    693             )
    694             .map_err(|source| AppSqliteError::Query {
    695                 operation: "prepare sync conflicts query",
    696                 source,
    697             })?;
    698         let rows = statement
    699             .query_map([account_id], |row| {
    700                 Ok((
    701                     row.get::<_, String>(0)?,
    702                     row.get::<_, String>(1)?,
    703                     row.get::<_, String>(2)?,
    704                     row.get::<_, String>(3)?,
    705                     row.get::<_, String>(4)?,
    706                     row.get::<_, String>(5)?,
    707                     row.get::<_, String>(6)?,
    708                     row.get::<_, Option<String>>(7)?,
    709                     row.get::<_, String>(8)?,
    710                     row.get::<_, Option<String>>(9)?,
    711                 ))
    712             })
    713             .map_err(|source| AppSqliteError::Query {
    714                 operation: "query sync conflicts",
    715                 source,
    716             })?;
    717 
    718         rows.map(|row| {
    719             let (
    720                 conflict_id,
    721                 aggregate_kind,
    722                 aggregate_id,
    723                 conflict_kind,
    724                 severity,
    725                 resolution_status,
    726                 local_payload_json,
    727                 remote_payload_json,
    728                 detected_at,
    729                 resolved_at,
    730             ) = row.map_err(|source| AppSqliteError::Query {
    731                 operation: "read sync conflict row",
    732                 source,
    733             })?;
    734 
    735             Ok(StoredSyncConflict {
    736                 conflict_id,
    737                 conflict: SyncConflict {
    738                     aggregate: parse_sync_aggregate_ref(
    739                         "local_conflicts.aggregate_kind",
    740                         "local_conflicts.aggregate_id",
    741                         aggregate_kind,
    742                         aggregate_id,
    743                     )?,
    744                     kind: parse_sync_conflict_kind(conflict_kind)?,
    745                     severity: parse_sync_conflict_severity(severity)?,
    746                     resolution: parse_sync_conflict_resolution_status(resolution_status)?,
    747                     local_payload_json,
    748                     remote_payload_json,
    749                     detected_at,
    750                     resolved_at,
    751                 },
    752             })
    753         })
    754         .collect()
    755     }
    756 
    757     pub fn resolve_conflict(
    758         &self,
    759         account_id: &str,
    760         conflict_id: &str,
    761         resolution: SyncConflictResolutionStatus,
    762         resolved_at: &str,
    763     ) -> Result<bool, AppSqliteError> {
    764         if resolution == SyncConflictResolutionStatus::Unresolved {
    765             return Err(AppSqliteError::InvalidProjection {
    766                 reason: "sync conflict resolution must be terminal",
    767             });
    768         }
    769 
    770         let updated = self
    771             .connection
    772             .execute(
    773                 "UPDATE local_conflicts
    774                  SET resolution_status = ?3, resolved_at = ?4
    775                  WHERE account_id = ?1 AND id = ?2",
    776                 params![
    777                     account_id,
    778                     conflict_id,
    779                     sync_conflict_resolution_status_value(resolution),
    780                     resolved_at,
    781                 ],
    782             )
    783             .map_err(|source| AppSqliteError::Query {
    784                 operation: "resolve sync conflict",
    785                 source,
    786             })?;
    787 
    788         Ok(updated == 1)
    789     }
    790 }
    791 
    792 fn aggregate_id_value(aggregate: &SyncAggregateRef) -> String {
    793     match aggregate {
    794         SyncAggregateRef::Farm(farm_id) => farm_id.to_string(),
    795         SyncAggregateRef::FulfillmentWindow(fulfillment_window_id) => {
    796             fulfillment_window_id.to_string()
    797         }
    798         SyncAggregateRef::Product(product_id) => product_id.to_string(),
    799         SyncAggregateRef::Order(order_id) => order_id.to_string(),
    800     }
    801 }
    802 
    803 fn parse_sync_aggregate_ref(
    804     aggregate_kind_field: &'static str,
    805     aggregate_id_field: &'static str,
    806     aggregate_kind: String,
    807     aggregate_id: String,
    808 ) -> Result<SyncAggregateRef, AppSqliteError> {
    809     match aggregate_kind.as_str() {
    810         "farm" => Ok(SyncAggregateRef::Farm(
    811             aggregate_id
    812                 .parse::<FarmId>()
    813                 .map_err(|_| AppSqliteError::DecodeId {
    814                     field: aggregate_id_field,
    815                     value: aggregate_id,
    816                 })?,
    817         )),
    818         "fulfillment_window" => Ok(SyncAggregateRef::FulfillmentWindow(
    819             aggregate_id
    820                 .parse::<FulfillmentWindowId>()
    821                 .map_err(|_| AppSqliteError::DecodeId {
    822                     field: aggregate_id_field,
    823                     value: aggregate_id,
    824                 })?,
    825         )),
    826         "product" => Ok(SyncAggregateRef::Product(
    827             aggregate_id
    828                 .parse::<ProductId>()
    829                 .map_err(|_| AppSqliteError::DecodeId {
    830                     field: aggregate_id_field,
    831                     value: aggregate_id,
    832                 })?,
    833         )),
    834         "order" => Ok(SyncAggregateRef::Order(
    835             aggregate_id
    836                 .parse::<OrderId>()
    837                 .map_err(|_| AppSqliteError::DecodeId {
    838                     field: aggregate_id_field,
    839                     value: aggregate_id,
    840                 })?,
    841         )),
    842         _ => Err(AppSqliteError::DecodeEnum {
    843             field: aggregate_kind_field,
    844             value: aggregate_kind,
    845         }),
    846     }
    847 }
    848 
    849 fn parse_sync_operation_kind(value: String) -> Result<SyncOperationKind, AppSqliteError> {
    850     match value.as_str() {
    851         "upsert" => Ok(SyncOperationKind::Upsert),
    852         "delete" => Ok(SyncOperationKind::Delete),
    853         _ => Err(AppSqliteError::DecodeEnum {
    854             field: "local_outbox.operation_kind",
    855             value,
    856         }),
    857     }
    858 }
    859 
    860 fn parse_pending_sync_operation_state(
    861     value: String,
    862 ) -> Result<PendingSyncOperationState, AppSqliteError> {
    863     match value.as_str() {
    864         "pending" => Ok(PendingSyncOperationState::Pending),
    865         "in_progress" => Ok(PendingSyncOperationState::InProgress),
    866         "succeeded" => Ok(PendingSyncOperationState::Succeeded),
    867         "failed" => Ok(PendingSyncOperationState::Failed),
    868         "blocked" => Ok(PendingSyncOperationState::Blocked),
    869         "retryable" => Ok(PendingSyncOperationState::Retryable),
    870         _ => Err(AppSqliteError::DecodeEnum {
    871             field: "local_outbox.state",
    872             value,
    873         }),
    874     }
    875 }
    876 
    877 fn parse_sync_conflict_kind(value: String) -> Result<SyncConflictKind, AppSqliteError> {
    878     match value.as_str() {
    879         "revision_mismatch" => Ok(SyncConflictKind::RevisionMismatch),
    880         "remote_delete" => Ok(SyncConflictKind::RemoteDelete),
    881         "remote_validation_reject" => Ok(SyncConflictKind::RemoteValidationReject),
    882         _ => Err(AppSqliteError::DecodeEnum {
    883             field: "local_conflicts.conflict_kind",
    884             value,
    885         }),
    886     }
    887 }
    888 
    889 fn parse_sync_conflict_severity(value: String) -> Result<SyncConflictSeverity, AppSqliteError> {
    890     match value.as_str() {
    891         "review_required" => Ok(SyncConflictSeverity::ReviewRequired),
    892         "blocking" => Ok(SyncConflictSeverity::Blocking),
    893         _ => Err(AppSqliteError::DecodeEnum {
    894             field: "local_conflicts.severity",
    895             value,
    896         }),
    897     }
    898 }
    899 
    900 fn parse_sync_conflict_resolution_status(
    901     value: String,
    902 ) -> Result<SyncConflictResolutionStatus, AppSqliteError> {
    903     match value.as_str() {
    904         "unresolved" => Ok(SyncConflictResolutionStatus::Unresolved),
    905         "accepted_local" => Ok(SyncConflictResolutionStatus::AcceptedLocal),
    906         "accepted_remote" => Ok(SyncConflictResolutionStatus::AcceptedRemote),
    907         "dismissed" => Ok(SyncConflictResolutionStatus::Dismissed),
    908         _ => Err(AppSqliteError::DecodeEnum {
    909             field: "local_conflicts.resolution_status",
    910             value,
    911         }),
    912     }
    913 }
    914 
    915 fn parse_sync_checkpoint_state(value: String) -> Result<SyncCheckpointState, AppSqliteError> {
    916     match value.as_str() {
    917         "never_synced" => Ok(SyncCheckpointState::NeverSynced),
    918         "syncing" => Ok(SyncCheckpointState::Syncing),
    919         "current" => Ok(SyncCheckpointState::Current),
    920         "failed" => Ok(SyncCheckpointState::Failed),
    921         _ => Err(AppSqliteError::DecodeEnum {
    922             field: "sync_checkpoints.state",
    923             value,
    924         }),
    925     }
    926 }
    927 
    928 fn sync_checkpoint_state_value(state: SyncCheckpointState) -> &'static str {
    929     match state {
    930         SyncCheckpointState::NeverSynced => "never_synced",
    931         SyncCheckpointState::Syncing => "syncing",
    932         SyncCheckpointState::Current => "current",
    933         SyncCheckpointState::Failed => "failed",
    934     }
    935 }
    936 
    937 fn sync_conflict_severity_value(severity: SyncConflictSeverity) -> &'static str {
    938     match severity {
    939         SyncConflictSeverity::ReviewRequired => "review_required",
    940         SyncConflictSeverity::Blocking => "blocking",
    941     }
    942 }
    943 
    944 fn sync_conflict_resolution_status_value(resolution: SyncConflictResolutionStatus) -> &'static str {
    945     match resolution {
    946         SyncConflictResolutionStatus::Unresolved => "unresolved",
    947         SyncConflictResolutionStatus::AcceptedLocal => "accepted_local",
    948         SyncConflictResolutionStatus::AcceptedRemote => "accepted_remote",
    949         SyncConflictResolutionStatus::Dismissed => "dismissed",
    950     }
    951 }
    952 
    953 fn parse_relay_ingest_freshness_state(
    954     value: String,
    955 ) -> Result<AppRelayIngestFreshnessState, AppSqliteError> {
    956     match value.as_str() {
    957         "fresh" => Ok(AppRelayIngestFreshnessState::Fresh),
    958         "stale" => Ok(AppRelayIngestFreshnessState::Stale),
    959         "failed" => Ok(AppRelayIngestFreshnessState::Failed),
    960         _ => Err(AppSqliteError::DecodeEnum {
    961             field: "app_relay_ingest_freshness.state",
    962             value,
    963         }),
    964     }
    965 }
    966 
    967 fn relay_ingest_is_stale(
    968     last_fetch_completed_unix_seconds: Option<i64>,
    969     now_unix_seconds: i64,
    970     stale_after_seconds: i64,
    971 ) -> bool {
    972     let Some(last_fetch_completed_unix_seconds) = last_fetch_completed_unix_seconds else {
    973         return true;
    974     };
    975     now_unix_seconds.saturating_sub(last_fetch_completed_unix_seconds) > stale_after_seconds
    976 }
    977 
    978 fn relay_ingest_scope_status(relays: &[AppRelayIngestRelayFreshness]) -> AppRelayIngestScopeStatus {
    979     if relays.is_empty() {
    980         return AppRelayIngestScopeStatus::Stale;
    981     }
    982     let failed_count = relays
    983         .iter()
    984         .filter(|relay| relay.state == AppRelayIngestFreshnessState::Failed)
    985         .count();
    986     if failed_count == relays.len() {
    987         return AppRelayIngestScopeStatus::Failed;
    988     }
    989     if failed_count > 0 {
    990         return AppRelayIngestScopeStatus::Partial;
    991     }
    992     if relays
    993         .iter()
    994         .all(|relay| relay.state == AppRelayIngestFreshnessState::Fresh)
    995     {
    996         AppRelayIngestScopeStatus::Fresh
    997     } else {
    998         AppRelayIngestScopeStatus::Stale
    999     }
   1000 }
   1001 
   1002 #[cfg(test)]
   1003 mod tests {
   1004     use radroots_app_sync::{
   1005         AppRelayIngestFreshnessState, AppRelayIngestScopeStatus, PendingSyncOperation,
   1006         PendingSyncOperationState, SyncAggregateRef, SyncCheckpointStatus, SyncConflict,
   1007         SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
   1008     };
   1009     use radroots_app_view::{FarmId, ProductId};
   1010 
   1011     use crate::{AppSqliteStore, DatabaseTarget};
   1012 
   1013     #[test]
   1014     fn checkpoints_are_selected_account_scoped() {
   1015         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1016         let repository = store.sync_repository();
   1017         let checkpoint =
   1018             SyncCheckpointStatus::syncing("2026-04-20T18:00:00Z", Some("cursor-1".to_owned()));
   1019 
   1020         assert_eq!(
   1021             repository
   1022                 .load_checkpoint("acct_a")
   1023                 .expect("missing checkpoint should load"),
   1024             SyncCheckpointStatus::never_synced()
   1025         );
   1026 
   1027         repository
   1028             .save_checkpoint("acct_a", &checkpoint)
   1029             .expect("checkpoint should save");
   1030 
   1031         assert_eq!(
   1032             repository
   1033                 .load_checkpoint("acct_a")
   1034                 .expect("saved checkpoint should load"),
   1035             checkpoint
   1036         );
   1037         assert_eq!(
   1038             repository
   1039                 .load_checkpoint("acct_b")
   1040                 .expect("other account checkpoint should load"),
   1041             SyncCheckpointStatus::never_synced()
   1042         );
   1043     }
   1044 
   1045     #[test]
   1046     fn relay_ingest_freshness_tracks_cursors_and_scope_status() {
   1047         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1048         let repository = store.sync_repository();
   1049         let relay_urls = vec![
   1050             "wss://relay-a.example".to_owned(),
   1051             "wss://relay-b.example".to_owned(),
   1052         ];
   1053 
   1054         let initial = repository
   1055             .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_000, 60)
   1056             .expect("freshness should load");
   1057         assert_eq!(initial.status, AppRelayIngestScopeStatus::Stale);
   1058         assert_eq!(initial.relays.len(), 2);
   1059         assert!(
   1060             initial
   1061                 .relays
   1062                 .iter()
   1063                 .all(|relay| relay.state == AppRelayIngestFreshnessState::Stale)
   1064         );
   1065 
   1066         repository
   1067             .record_relay_ingest_success(
   1068                 "direct_relay_ingest",
   1069                 "wss://relay-a.example",
   1070                 1_010,
   1071                 Some(1_009),
   1072                 "2026-05-25T20:00:00Z",
   1073                 1_000,
   1074                 "2026-05-25T20:00:02Z",
   1075                 1_002,
   1076             )
   1077             .expect("success should record");
   1078         repository
   1079             .record_relay_ingest_failure(
   1080                 "direct_relay_ingest",
   1081                 "wss://relay-b.example",
   1082                 "2026-05-25T20:00:00Z",
   1083                 1_000,
   1084                 "2026-05-25T20:00:02Z",
   1085                 1_002,
   1086                 "relay timeout",
   1087             )
   1088             .expect("failure should record");
   1089 
   1090         let cursors = repository
   1091             .load_relay_ingest_cursors("direct_relay_ingest", &relay_urls)
   1092             .expect("cursors should load");
   1093         assert_eq!(cursors[0].cursor_since_unix_seconds, Some(1_010));
   1094         assert_eq!(cursors[1].cursor_since_unix_seconds, None);
   1095 
   1096         let partial = repository
   1097             .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_005, 60)
   1098             .expect("partial freshness should load");
   1099         assert_eq!(partial.status, AppRelayIngestScopeStatus::Partial);
   1100         assert_eq!(partial.relays[0].state, AppRelayIngestFreshnessState::Fresh);
   1101         assert_eq!(
   1102             partial.relays[1].state,
   1103             AppRelayIngestFreshnessState::Failed
   1104         );
   1105         assert_eq!(
   1106             partial.relays[1].last_error_message.as_deref(),
   1107             Some("relay timeout")
   1108         );
   1109 
   1110         let stale = repository
   1111             .load_relay_ingest_freshness(
   1112                 "direct_relay_ingest",
   1113                 &["wss://relay-a.example".to_owned()],
   1114                 1_100,
   1115                 60,
   1116             )
   1117             .expect("stale freshness should load");
   1118         assert_eq!(stale.status, AppRelayIngestScopeStatus::Stale);
   1119         assert_eq!(stale.relays[0].state, AppRelayIngestFreshnessState::Stale);
   1120         assert_eq!(stale.relays[0].cursor_since_unix_seconds, Some(1_010));
   1121     }
   1122 
   1123     #[test]
   1124     fn pending_operations_are_account_scoped_and_retryable() {
   1125         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1126         let repository = store.sync_repository();
   1127         let first = PendingSyncOperation::new(
   1128             SyncAggregateRef::Farm(FarmId::new()),
   1129             SyncOperationKind::Upsert,
   1130             "{\"farm\":\"a\"}",
   1131             "2026-04-20T18:00:00Z",
   1132         );
   1133         let second = PendingSyncOperation::new(
   1134             SyncAggregateRef::Product(ProductId::new()),
   1135             SyncOperationKind::Delete,
   1136             "{\"product\":\"b\"}",
   1137             "2026-04-20T18:05:00Z",
   1138         );
   1139 
   1140         let first_id = repository
   1141             .enqueue_pending_operation("acct_a", &first)
   1142             .expect("first operation should save");
   1143         let second_id = repository
   1144             .enqueue_pending_operation("acct_a", &second)
   1145             .expect("second operation should save");
   1146         repository
   1147             .enqueue_pending_operation("acct_b", &first)
   1148             .expect("other account operation should save");
   1149 
   1150         let before_retry = repository
   1151             .load_pending_operations("acct_a")
   1152             .expect("pending operations should load");
   1153         assert_eq!(before_retry.len(), 2);
   1154         assert_eq!(before_retry[0].operation, first);
   1155         assert_eq!(before_retry[1].operation, second);
   1156 
   1157         assert!(
   1158             repository
   1159                 .update_pending_operation_retry(
   1160                     "acct_a",
   1161                     &first_id,
   1162                     "2026-04-20T18:10:00Z",
   1163                     2,
   1164                     Some("relay timeout"),
   1165                 )
   1166                 .expect("retry update should succeed")
   1167         );
   1168         assert!(
   1169             !repository
   1170                 .update_pending_operation_retry(
   1171                     "acct_b",
   1172                     &first_id,
   1173                     "2026-04-20T18:10:00Z",
   1174                     3,
   1175                     Some("wrong account"),
   1176                 )
   1177                 .expect("wrong-account retry update should not succeed")
   1178         );
   1179         assert!(
   1180             repository
   1181                 .dequeue_pending_operation("acct_a", &second_id)
   1182                 .expect("dequeue should succeed")
   1183         );
   1184 
   1185         let acct_a = repository
   1186             .load_pending_operations("acct_a")
   1187             .expect("account operations should reload");
   1188         let acct_b = repository
   1189             .load_pending_operations("acct_b")
   1190             .expect("other account operations should reload");
   1191 
   1192         assert_eq!(acct_a.len(), 1);
   1193         assert_eq!(acct_a[0].operation_id, first_id);
   1194         assert_eq!(acct_a[0].operation.attempt_count, 2);
   1195         assert_eq!(
   1196             acct_a[0].operation.state,
   1197             PendingSyncOperationState::Retryable
   1198         );
   1199         assert_eq!(
   1200             acct_a[0].operation.available_at,
   1201             "2026-04-20T18:10:00Z".to_owned()
   1202         );
   1203         assert_eq!(
   1204             acct_a[0].operation.last_error_message.as_deref(),
   1205             Some("relay timeout")
   1206         );
   1207         assert_eq!(acct_b.len(), 1);
   1208     }
   1209 
   1210     #[test]
   1211     fn outbox_enqueue_upserts_active_operation_by_deterministic_key() {
   1212         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1213         let repository = store.sync_repository();
   1214         let product_id = ProductId::new();
   1215         let first = PendingSyncOperation::new(
   1216             SyncAggregateRef::Product(product_id),
   1217             SyncOperationKind::Upsert,
   1218             "{\"title\":\"greens\"}",
   1219             "2026-04-20T18:00:00Z",
   1220         );
   1221         let mut replacement = PendingSyncOperation::new(
   1222             SyncAggregateRef::Product(product_id),
   1223             SyncOperationKind::Upsert,
   1224             "{\"title\":\"winter greens\"}",
   1225             "2026-04-20T18:05:00Z",
   1226         );
   1227         replacement.attempt_count = 3;
   1228         replacement.state = PendingSyncOperationState::Failed;
   1229         replacement.last_error_message = Some("stale relay state".to_owned());
   1230 
   1231         let first_id = repository
   1232             .enqueue_pending_operation("acct_a", &first)
   1233             .expect("first operation should save");
   1234         let replacement_id = repository
   1235             .enqueue_pending_operation("acct_a", &replacement)
   1236             .expect("replacement operation should upsert");
   1237 
   1238         let pending = repository
   1239             .load_pending_operations("acct_a")
   1240             .expect("pending operations should load");
   1241 
   1242         assert_eq!(replacement_id, first_id);
   1243         assert_eq!(pending.len(), 1);
   1244         assert_eq!(pending[0].operation_id, first_id);
   1245         assert_eq!(pending[0].operation.operation_key, first.operation_key);
   1246         assert_eq!(
   1247             pending[0].operation.payload_json,
   1248             "{\"title\":\"winter greens\"}"
   1249         );
   1250         assert_eq!(pending[0].operation.attempt_count, 0);
   1251         assert_eq!(
   1252             pending[0].operation.state,
   1253             PendingSyncOperationState::Pending
   1254         );
   1255         assert_eq!(pending[0].operation.last_error_message, None);
   1256     }
   1257 
   1258     #[test]
   1259     fn conflicts_are_account_scoped_and_resolvable() {
   1260         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1261         let repository = store.sync_repository();
   1262         let first = SyncConflict {
   1263             aggregate: SyncAggregateRef::Farm(FarmId::new()),
   1264             kind: SyncConflictKind::RevisionMismatch,
   1265             severity: SyncConflictSeverity::Blocking,
   1266             resolution: SyncConflictResolutionStatus::Unresolved,
   1267             local_payload_json: "{\"farm\":\"local\"}".to_owned(),
   1268             remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()),
   1269             detected_at: "2026-04-20T18:00:00Z".to_owned(),
   1270             resolved_at: None,
   1271         };
   1272         let second = SyncConflict {
   1273             aggregate: SyncAggregateRef::Product(ProductId::new()),
   1274             kind: SyncConflictKind::RemoteValidationReject,
   1275             severity: SyncConflictSeverity::ReviewRequired,
   1276             resolution: SyncConflictResolutionStatus::Unresolved,
   1277             local_payload_json: "{\"product\":\"local\"}".to_owned(),
   1278             remote_payload_json: None,
   1279             detected_at: "2026-04-20T18:05:00Z".to_owned(),
   1280             resolved_at: None,
   1281         };
   1282 
   1283         let first_id = repository
   1284             .record_conflict("acct_a", &first)
   1285             .expect("first conflict should save");
   1286         repository
   1287             .record_conflict("acct_b", &second)
   1288             .expect("other account conflict should save");
   1289 
   1290         assert!(
   1291             repository
   1292                 .resolve_conflict(
   1293                     "acct_a",
   1294                     &first_id,
   1295                     SyncConflictResolutionStatus::AcceptedLocal,
   1296                     "2026-04-20T18:06:00Z",
   1297                 )
   1298                 .expect("conflict resolution should succeed")
   1299         );
   1300         assert!(
   1301             !repository
   1302                 .resolve_conflict(
   1303                     "acct_b",
   1304                     &first_id,
   1305                     SyncConflictResolutionStatus::AcceptedRemote,
   1306                     "2026-04-20T18:07:00Z",
   1307                 )
   1308                 .expect("wrong-account resolution should not succeed")
   1309         );
   1310 
   1311         let acct_a = repository
   1312             .load_conflicts("acct_a")
   1313             .expect("account conflicts should load");
   1314         let acct_b = repository
   1315             .load_conflicts("acct_b")
   1316             .expect("other account conflicts should load");
   1317 
   1318         assert_eq!(acct_a.len(), 1);
   1319         assert_eq!(acct_a[0].conflict_id, first_id);
   1320         assert_eq!(
   1321             acct_a[0].conflict.resolution,
   1322             SyncConflictResolutionStatus::AcceptedLocal
   1323         );
   1324         assert_eq!(
   1325             acct_a[0].conflict.resolved_at.as_deref(),
   1326             Some("2026-04-20T18:06:00Z")
   1327         );
   1328         assert_eq!(acct_b.len(), 1);
   1329         assert_eq!(acct_b[0].conflict, second);
   1330     }
   1331 
   1332     #[test]
   1333     fn replacing_conflicts_clears_stale_rows_for_the_selected_account() {
   1334         let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
   1335         let repository = store.sync_repository();
   1336         let first = SyncConflict {
   1337             aggregate: SyncAggregateRef::Farm(FarmId::new()),
   1338             kind: SyncConflictKind::RevisionMismatch,
   1339             severity: SyncConflictSeverity::Blocking,
   1340             resolution: SyncConflictResolutionStatus::Unresolved,
   1341             local_payload_json: "{\"farm\":\"local\"}".to_owned(),
   1342             remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()),
   1343             detected_at: "2026-04-20T18:00:00Z".to_owned(),
   1344             resolved_at: None,
   1345         };
   1346         let second = SyncConflict {
   1347             aggregate: SyncAggregateRef::Product(ProductId::new()),
   1348             kind: SyncConflictKind::RemoteValidationReject,
   1349             severity: SyncConflictSeverity::ReviewRequired,
   1350             resolution: SyncConflictResolutionStatus::Unresolved,
   1351             local_payload_json: "{\"product\":\"local\"}".to_owned(),
   1352             remote_payload_json: None,
   1353             detected_at: "2026-04-20T18:05:00Z".to_owned(),
   1354             resolved_at: None,
   1355         };
   1356 
   1357         repository
   1358             .record_conflict("acct_a", &first)
   1359             .expect("first conflict should save");
   1360         repository
   1361             .record_conflict("acct_b", &first)
   1362             .expect("other account conflict should save");
   1363 
   1364         repository
   1365             .replace_conflicts("acct_a", std::slice::from_ref(&second))
   1366             .expect("conflicts should replace");
   1367 
   1368         let acct_a = repository
   1369             .load_conflicts("acct_a")
   1370             .expect("account conflicts should load");
   1371         let acct_b = repository
   1372             .load_conflicts("acct_b")
   1373             .expect("other account conflicts should load");
   1374 
   1375         assert_eq!(acct_a.len(), 1);
   1376         assert_eq!(acct_a[0].conflict, second);
   1377         assert_eq!(acct_b.len(), 1);
   1378         assert_eq!(acct_b[0].conflict, first);
   1379     }
   1380 }