commit d1650f68f88e875d9869ca1173de711d2fc69ae9
parent 4ced50b21d95b345fa923d0871516f30ad2b9519
Author: triesap <tyson@radroots.org>
Date: Mon, 20 Apr 2026 20:31:59 +0000
sqlite: align selected-account sync persistence
Diffstat:
7 files changed, 1029 insertions(+), 3 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -5104,8 +5104,10 @@ name = "radroots_app_sqlite"
version = "0.1.0"
dependencies = [
"radroots_app_models",
+ "radroots_app_sync",
"rusqlite",
"thiserror 2.0.18",
+ "uuid",
]
[[package]]
diff --git a/crates/shared/sqlite/Cargo.toml b/crates/shared/sqlite/Cargo.toml
@@ -9,8 +9,10 @@ publish = false
[dependencies]
radroots_app_models.workspace = true
+radroots_app_sync.workspace = true
rusqlite.workspace = true
thiserror.workspace = true
+uuid.workspace = true
[lints]
workspace = true
diff --git a/crates/shared/sqlite/migrations/0010_sync_contract_alignment.sql b/crates/shared/sqlite/migrations/0010_sync_contract_alignment.sql
@@ -0,0 +1,82 @@
+ALTER TABLE local_outbox RENAME TO local_outbox_legacy;
+ALTER TABLE local_conflicts RENAME TO local_conflicts_legacy;
+DROP TABLE sync_checkpoints;
+
+CREATE TABLE local_outbox (
+ id TEXT PRIMARY KEY NOT NULL,
+ account_id TEXT NOT NULL,
+ aggregate_kind TEXT NOT NULL CHECK (
+ aggregate_kind IN ('farm', 'fulfillment_window', 'product', 'order')
+ ),
+ aggregate_id TEXT NOT NULL,
+ operation_kind TEXT NOT NULL CHECK (operation_kind IN ('upsert', 'delete')),
+ payload_json TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ available_at TEXT NOT NULL,
+ attempt_count INTEGER NOT NULL DEFAULT 0
+);
+
+CREATE TABLE local_conflicts (
+ id TEXT PRIMARY KEY NOT NULL,
+ account_id TEXT NOT NULL,
+ aggregate_kind TEXT NOT NULL CHECK (
+ aggregate_kind IN ('farm', 'fulfillment_window', 'product', 'order')
+ ),
+ aggregate_id TEXT NOT NULL,
+ conflict_kind TEXT NOT NULL CHECK (
+ conflict_kind IN (
+ 'revision_mismatch',
+ 'remote_delete',
+ 'remote_validation_reject'
+ )
+ ),
+ severity TEXT NOT NULL CHECK (severity IN ('review_required', 'blocking')),
+ resolution_status TEXT NOT NULL CHECK (
+ resolution_status IN (
+ 'unresolved',
+ 'accepted_local',
+ 'accepted_remote',
+ 'dismissed'
+ )
+ ),
+ local_payload_json TEXT NOT NULL,
+ remote_payload_json TEXT,
+ detected_at TEXT NOT NULL,
+ resolved_at TEXT
+);
+
+CREATE TABLE sync_checkpoints (
+ account_id TEXT PRIMARY KEY NOT NULL,
+ state TEXT NOT NULL CHECK (
+ state IN ('never_synced', 'syncing', 'current', 'failed')
+ ),
+ last_sync_started_at TEXT,
+ last_sync_completed_at TEXT,
+ last_remote_cursor TEXT,
+ last_error_message TEXT
+);
+
+DROP TABLE local_outbox_legacy;
+DROP TABLE local_conflicts_legacy;
+
+CREATE INDEX idx_local_outbox_account_available_at ON local_outbox(
+ account_id,
+ available_at,
+ created_at,
+ id
+);
+CREATE INDEX idx_local_outbox_account_aggregate ON local_outbox(
+ account_id,
+ aggregate_kind,
+ aggregate_id
+);
+CREATE INDEX idx_local_conflicts_account_detected_at ON local_conflicts(
+ account_id,
+ detected_at,
+ id
+);
+CREATE INDEX idx_local_conflicts_account_aggregate ON local_conflicts(
+ account_id,
+ aggregate_kind,
+ aggregate_id
+);
diff --git a/crates/shared/sqlite/src/buyer.rs b/crates/shared/sqlite/src/buyer.rs
@@ -1682,6 +1682,9 @@ mod tests {
"Leave by the cooler".to_owned(),
)
);
+ assert_eq!(row_count(connection, "local_outbox"), 0);
+ assert_eq!(row_count(connection, "local_conflicts"), 0);
+ assert_eq!(row_count(connection, "sync_checkpoints"), 0);
}
#[test]
@@ -2033,4 +2036,12 @@ mod tests {
)
.expect("order contact should load")
}
+
+ fn row_count(connection: &Connection, table_name: &str) -> i64 {
+ let sql = format!("SELECT COUNT(*) FROM {table_name}");
+
+ connection
+ .query_row(&sql, [], |row| row.get(0))
+ .expect("row count query should succeed")
+ }
}
diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs
@@ -9,6 +9,7 @@ mod farm_setup;
mod migrations;
mod orders;
mod products;
+mod sync;
mod today;
use std::{collections::BTreeSet, fs, path::PathBuf, time::Duration};
@@ -23,6 +24,9 @@ use radroots_app_models::{
ProductId, ProductPublishBlocker, ProductsFilter, ProductsListProjection, ProductsSort,
TodayAgendaProjection,
};
+use radroots_app_sync::{
+ PendingSyncOperation, SyncCheckpointStatus, SyncConflict, SyncConflictResolutionStatus,
+};
use rusqlite::Connection;
pub use activation::AppActivationRepository;
@@ -36,6 +40,7 @@ pub use farm_setup::AppFarmSetupRepository;
pub use migrations::latest_schema_version;
pub use orders::AppOrdersRepository;
pub use products::AppProductsRepository;
+pub use sync::{AppSyncRepository, StoredPendingSyncOperation, StoredSyncConflict};
pub use today::{
AppTodayAgendaRepository, TODAY_AGENDA_LIST_LIMIT, TODAY_AGENDA_LOW_STOCK_THRESHOLD,
};
@@ -104,6 +109,10 @@ impl AppSqliteStore {
AppOrdersRepository::new(&self.connection)
}
+ pub fn sync_repository(&self) -> AppSyncRepository<'_> {
+ AppSyncRepository::new(&self.connection)
+ }
+
pub fn load_today_agenda(
&self,
farm_id: Option<FarmId>,
@@ -342,6 +351,88 @@ impl AppSqliteStore {
self.buyer_repository()
.load_buyer_order_detail(context, order_id)
}
+
+ pub fn enqueue_pending_sync_operation(
+ &self,
+ account_id: &str,
+ operation: &PendingSyncOperation,
+ ) -> Result<String, AppSqliteError> {
+ self.sync_repository()
+ .enqueue_pending_operation(account_id, operation)
+ }
+
+ pub fn load_pending_sync_operations(
+ &self,
+ account_id: &str,
+ ) -> Result<Vec<StoredPendingSyncOperation>, AppSqliteError> {
+ self.sync_repository().load_pending_operations(account_id)
+ }
+
+ pub fn update_pending_sync_operation_retry(
+ &self,
+ account_id: &str,
+ operation_id: &str,
+ available_at: &str,
+ attempt_count: u32,
+ ) -> Result<bool, AppSqliteError> {
+ self.sync_repository().update_pending_operation_retry(
+ account_id,
+ operation_id,
+ available_at,
+ attempt_count,
+ )
+ }
+
+ pub fn dequeue_pending_sync_operation(
+ &self,
+ account_id: &str,
+ operation_id: &str,
+ ) -> Result<bool, AppSqliteError> {
+ self.sync_repository()
+ .dequeue_pending_operation(account_id, operation_id)
+ }
+
+ pub fn load_sync_checkpoint(
+ &self,
+ account_id: &str,
+ ) -> Result<SyncCheckpointStatus, AppSqliteError> {
+ self.sync_repository().load_checkpoint(account_id)
+ }
+
+ pub fn save_sync_checkpoint(
+ &self,
+ account_id: &str,
+ checkpoint: &SyncCheckpointStatus,
+ ) -> Result<(), AppSqliteError> {
+ self.sync_repository()
+ .save_checkpoint(account_id, checkpoint)
+ }
+
+ pub fn record_sync_conflict(
+ &self,
+ account_id: &str,
+ conflict: &SyncConflict,
+ ) -> Result<String, AppSqliteError> {
+ self.sync_repository().record_conflict(account_id, conflict)
+ }
+
+ pub fn load_sync_conflicts(
+ &self,
+ account_id: &str,
+ ) -> Result<Vec<StoredSyncConflict>, AppSqliteError> {
+ self.sync_repository().load_conflicts(account_id)
+ }
+
+ pub fn resolve_sync_conflict(
+ &self,
+ account_id: &str,
+ conflict_id: &str,
+ resolution: SyncConflictResolutionStatus,
+ resolved_at: &str,
+ ) -> Result<bool, AppSqliteError> {
+ self.sync_repository()
+ .resolve_conflict(account_id, conflict_id, resolution, resolved_at)
+ }
}
fn open_connection(target: &DatabaseTarget) -> Result<Connection, AppSqliteError> {
@@ -443,7 +534,7 @@ fn apply_migrations(connection: &mut Connection) -> Result<(), AppSqliteError> {
#[cfg(test)]
mod tests {
- use super::{AppSqliteStore, DatabaseTarget, latest_schema_version};
+ use super::{AppSqliteStore, DatabaseTarget, latest_schema_version, migrations};
use rusqlite::Connection;
use std::{
env, fs,
@@ -481,6 +572,16 @@ mod tests {
assert!(table_exists(connection, "buyer_cart_lines"));
assert!(column_exists(connection, "farms", "timezone"));
assert!(column_exists(connection, "farms", "currency_code"));
+ assert!(column_exists(connection, "local_outbox", "account_id"));
+ assert!(column_exists(connection, "local_conflicts", "account_id"));
+ assert!(column_exists(connection, "local_conflicts", "severity"));
+ assert!(column_exists(
+ connection,
+ "local_conflicts",
+ "resolution_status"
+ ));
+ assert!(column_exists(connection, "sync_checkpoints", "account_id"));
+ assert!(column_exists(connection, "sync_checkpoints", "state"));
assert!(column_exists(
connection,
"fulfillment_windows",
@@ -506,7 +607,7 @@ mod tests {
assert!(column_exists(connection, "orders", "buyer_email"));
assert!(column_exists(connection, "orders", "buyer_phone"));
assert!(column_exists(connection, "orders", "buyer_order_note"));
- assert_eq!(row_count(connection, "sync_checkpoints"), 1);
+ assert_eq!(row_count(connection, "sync_checkpoints"), 0);
drop(store);
remove_database_artifacts(&path);
@@ -523,7 +624,7 @@ mod tests {
reopened.schema_version().expect("schema version"),
latest_schema_version()
);
- assert_eq!(row_count(reopened.connection(), "sync_checkpoints"), 1);
+ assert_eq!(row_count(reopened.connection(), "sync_checkpoints"), 0);
drop(reopened);
remove_database_artifacts(&path);
@@ -541,6 +642,48 @@ mod tests {
assert!(table_exists(store.connection(), "farms"));
}
+ #[test]
+ fn legacy_sync_scaffolding_migrates_to_account_scoped_contract() {
+ let path = temp_database_path("legacy-sync-contract");
+ fs::create_dir_all(path.parent().expect("temp database should have a parent"))
+ .expect("legacy database parent should exist");
+ let connection = Connection::open(&path).expect("legacy database should open");
+
+ for (version, sql) in migrations::pending_migrations(0)
+ .filter(|(version, _)| *version < latest_schema_version())
+ {
+ connection
+ .execute_batch(sql)
+ .expect("legacy migration should apply");
+ connection
+ .pragma_update(None, "user_version", version)
+ .expect("legacy schema version should record");
+ }
+
+ drop(connection);
+
+ let store =
+ AppSqliteStore::open(DatabaseTarget::Path(path.clone())).expect("store should open");
+ let connection = store.connection();
+
+ assert_eq!(
+ store.schema_version().expect("schema version"),
+ latest_schema_version()
+ );
+ assert!(column_exists(connection, "local_outbox", "account_id"));
+ assert!(column_exists(connection, "local_conflicts", "severity"));
+ assert!(column_exists(
+ connection,
+ "local_conflicts",
+ "resolution_status"
+ ));
+ assert!(column_exists(connection, "sync_checkpoints", "state"));
+ assert_eq!(row_count(connection, "sync_checkpoints"), 0);
+
+ drop(store);
+ remove_database_artifacts(&path);
+ }
+
fn table_exists(connection: &Connection, table_name: &str) -> bool {
connection
.query_row(
diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs
@@ -40,6 +40,10 @@ const MIGRATIONS: &[Migration] = &[
version: 9,
sql: include_str!("../migrations/0009_buyer_marketplace.sql"),
},
+ Migration {
+ version: 10,
+ sql: include_str!("../migrations/0010_sync_contract_alignment.sql"),
+ },
];
pub fn latest_schema_version() -> u32 {
diff --git a/crates/shared/sqlite/src/sync.rs b/crates/shared/sqlite/src/sync.rs
@@ -0,0 +1,782 @@
+use radroots_app_models::{FarmId, FulfillmentWindowId, OrderId, ProductId};
+use radroots_app_sync::{
+ PendingSyncOperation, SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus,
+ SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity,
+ SyncOperationKind,
+};
+use rusqlite::{Connection, OptionalExtension, params};
+use uuid::Uuid;
+
+use crate::AppSqliteError;
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct StoredPendingSyncOperation {
+ pub operation_id: String,
+ pub operation: PendingSyncOperation,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct StoredSyncConflict {
+ pub conflict_id: String,
+ pub conflict: SyncConflict,
+}
+
+pub struct AppSyncRepository<'a> {
+ connection: &'a Connection,
+}
+
+impl<'a> AppSyncRepository<'a> {
+ pub const fn new(connection: &'a Connection) -> Self {
+ Self { connection }
+ }
+
+ pub fn enqueue_pending_operation(
+ &self,
+ account_id: &str,
+ operation: &PendingSyncOperation,
+ ) -> Result<String, AppSqliteError> {
+ let operation_id = Uuid::now_v7().to_string();
+
+ self.connection
+ .execute(
+ "INSERT INTO local_outbox (
+ id,
+ account_id,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count
+ ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
+ params![
+ operation_id,
+ account_id,
+ operation.aggregate.aggregate_kind(),
+ aggregate_id_value(&operation.aggregate),
+ operation.operation.storage_key(),
+ operation.payload_json,
+ operation.created_at,
+ operation.available_at,
+ i64::from(operation.attempt_count),
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "enqueue pending sync operation",
+ source,
+ })?;
+
+ Ok(operation_id)
+ }
+
+ pub fn load_pending_operations(
+ &self,
+ account_id: &str,
+ ) -> Result<Vec<StoredPendingSyncOperation>, AppSqliteError> {
+ let mut statement = self
+ .connection
+ .prepare(
+ "SELECT
+ id,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count
+ FROM local_outbox
+ WHERE account_id = ?1
+ ORDER BY available_at ASC, created_at ASC, id ASC",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare pending sync operations query",
+ source,
+ })?;
+ let rows = statement
+ .query_map([account_id], |row| {
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, String>(2)?,
+ row.get::<_, String>(3)?,
+ row.get::<_, String>(4)?,
+ row.get::<_, String>(5)?,
+ row.get::<_, String>(6)?,
+ row.get::<_, u32>(7)?,
+ ))
+ })
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query pending sync operations",
+ source,
+ })?;
+
+ rows.map(|row| {
+ let (
+ operation_id,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count,
+ ) = row.map_err(|source| AppSqliteError::Query {
+ operation: "read pending sync operation row",
+ source,
+ })?;
+
+ Ok(StoredPendingSyncOperation {
+ operation_id,
+ operation: PendingSyncOperation {
+ aggregate: parse_sync_aggregate_ref(
+ "local_outbox.aggregate_kind",
+ "local_outbox.aggregate_id",
+ aggregate_kind,
+ aggregate_id,
+ )?,
+ operation: parse_sync_operation_kind(operation_kind)?,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count,
+ },
+ })
+ })
+ .collect()
+ }
+
+ pub fn update_pending_operation_retry(
+ &self,
+ account_id: &str,
+ operation_id: &str,
+ available_at: &str,
+ attempt_count: u32,
+ ) -> Result<bool, AppSqliteError> {
+ let updated = self
+ .connection
+ .execute(
+ "UPDATE local_outbox
+ SET available_at = ?3, attempt_count = ?4
+ WHERE account_id = ?1 AND id = ?2",
+ params![
+ account_id,
+ operation_id,
+ available_at,
+ i64::from(attempt_count)
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "update pending sync operation retry",
+ source,
+ })?;
+
+ Ok(updated == 1)
+ }
+
+ pub fn dequeue_pending_operation(
+ &self,
+ account_id: &str,
+ operation_id: &str,
+ ) -> Result<bool, AppSqliteError> {
+ let deleted = self
+ .connection
+ .execute(
+ "DELETE FROM local_outbox WHERE account_id = ?1 AND id = ?2",
+ params![account_id, operation_id],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "dequeue pending sync operation",
+ source,
+ })?;
+
+ Ok(deleted == 1)
+ }
+
+ pub fn load_checkpoint(
+ &self,
+ account_id: &str,
+ ) -> Result<SyncCheckpointStatus, AppSqliteError> {
+ let row = self
+ .connection
+ .query_row(
+ "SELECT
+ state,
+ last_sync_started_at,
+ last_sync_completed_at,
+ last_remote_cursor,
+ last_error_message
+ FROM sync_checkpoints
+ WHERE account_id = ?1
+ LIMIT 1",
+ [account_id],
+ |row| {
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, Option<String>>(1)?,
+ row.get::<_, Option<String>>(2)?,
+ row.get::<_, Option<String>>(3)?,
+ row.get::<_, Option<String>>(4)?,
+ ))
+ },
+ )
+ .optional()
+ .map_err(|source| AppSqliteError::Query {
+ operation: "load sync checkpoint",
+ source,
+ })?;
+
+ row.map_or_else(
+ || Ok(SyncCheckpointStatus::never_synced()),
+ |(
+ state,
+ last_sync_started_at,
+ last_sync_completed_at,
+ last_remote_cursor,
+ last_error_message,
+ )| {
+ Ok(SyncCheckpointStatus {
+ state: parse_sync_checkpoint_state(state)?,
+ last_sync_started_at,
+ last_sync_completed_at,
+ last_remote_cursor,
+ last_error_message,
+ })
+ },
+ )
+ }
+
+ pub fn save_checkpoint(
+ &self,
+ account_id: &str,
+ checkpoint: &SyncCheckpointStatus,
+ ) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute(
+ "INSERT INTO sync_checkpoints (
+ account_id,
+ state,
+ last_sync_started_at,
+ last_sync_completed_at,
+ last_remote_cursor,
+ last_error_message
+ ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
+ ON CONFLICT(account_id) DO UPDATE SET
+ state = excluded.state,
+ last_sync_started_at = excluded.last_sync_started_at,
+ last_sync_completed_at = excluded.last_sync_completed_at,
+ last_remote_cursor = excluded.last_remote_cursor,
+ last_error_message = excluded.last_error_message",
+ params![
+ account_id,
+ sync_checkpoint_state_value(checkpoint.state),
+ checkpoint.last_sync_started_at,
+ checkpoint.last_sync_completed_at,
+ checkpoint.last_remote_cursor,
+ checkpoint.last_error_message,
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "save sync checkpoint",
+ source,
+ })?;
+
+ Ok(())
+ }
+
+ pub fn record_conflict(
+ &self,
+ account_id: &str,
+ conflict: &SyncConflict,
+ ) -> Result<String, AppSqliteError> {
+ let conflict_id = Uuid::now_v7().to_string();
+
+ self.connection
+ .execute(
+ "INSERT INTO local_conflicts (
+ id,
+ account_id,
+ aggregate_kind,
+ aggregate_id,
+ conflict_kind,
+ severity,
+ resolution_status,
+ local_payload_json,
+ remote_payload_json,
+ detected_at,
+ resolved_at
+ ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
+ params![
+ conflict_id,
+ account_id,
+ conflict.aggregate.aggregate_kind(),
+ aggregate_id_value(&conflict.aggregate),
+ conflict.kind.storage_key(),
+ sync_conflict_severity_value(conflict.severity),
+ sync_conflict_resolution_status_value(conflict.resolution),
+ conflict.local_payload_json,
+ conflict.remote_payload_json,
+ conflict.detected_at,
+ conflict.resolved_at,
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "record sync conflict",
+ source,
+ })?;
+
+ Ok(conflict_id)
+ }
+
+ pub fn load_conflicts(
+ &self,
+ account_id: &str,
+ ) -> Result<Vec<StoredSyncConflict>, AppSqliteError> {
+ let mut statement = self
+ .connection
+ .prepare(
+ "SELECT
+ id,
+ aggregate_kind,
+ aggregate_id,
+ conflict_kind,
+ severity,
+ resolution_status,
+ local_payload_json,
+ remote_payload_json,
+ detected_at,
+ resolved_at
+ FROM local_conflicts
+ WHERE account_id = ?1
+ ORDER BY detected_at DESC, id DESC",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare sync conflicts query",
+ source,
+ })?;
+ let rows = statement
+ .query_map([account_id], |row| {
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, String>(2)?,
+ row.get::<_, String>(3)?,
+ row.get::<_, String>(4)?,
+ row.get::<_, String>(5)?,
+ row.get::<_, String>(6)?,
+ row.get::<_, Option<String>>(7)?,
+ row.get::<_, String>(8)?,
+ row.get::<_, Option<String>>(9)?,
+ ))
+ })
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query sync conflicts",
+ source,
+ })?;
+
+ rows.map(|row| {
+ let (
+ conflict_id,
+ aggregate_kind,
+ aggregate_id,
+ conflict_kind,
+ severity,
+ resolution_status,
+ local_payload_json,
+ remote_payload_json,
+ detected_at,
+ resolved_at,
+ ) = row.map_err(|source| AppSqliteError::Query {
+ operation: "read sync conflict row",
+ source,
+ })?;
+
+ Ok(StoredSyncConflict {
+ conflict_id,
+ conflict: SyncConflict {
+ aggregate: parse_sync_aggregate_ref(
+ "local_conflicts.aggregate_kind",
+ "local_conflicts.aggregate_id",
+ aggregate_kind,
+ aggregate_id,
+ )?,
+ kind: parse_sync_conflict_kind(conflict_kind)?,
+ severity: parse_sync_conflict_severity(severity)?,
+ resolution: parse_sync_conflict_resolution_status(resolution_status)?,
+ local_payload_json,
+ remote_payload_json,
+ detected_at,
+ resolved_at,
+ },
+ })
+ })
+ .collect()
+ }
+
+ pub fn resolve_conflict(
+ &self,
+ account_id: &str,
+ conflict_id: &str,
+ resolution: SyncConflictResolutionStatus,
+ resolved_at: &str,
+ ) -> Result<bool, AppSqliteError> {
+ if resolution == SyncConflictResolutionStatus::Unresolved {
+ return Err(AppSqliteError::InvalidProjection {
+ reason: "sync conflict resolution must be terminal",
+ });
+ }
+
+ let updated = self
+ .connection
+ .execute(
+ "UPDATE local_conflicts
+ SET resolution_status = ?3, resolved_at = ?4
+ WHERE account_id = ?1 AND id = ?2",
+ params![
+ account_id,
+ conflict_id,
+ sync_conflict_resolution_status_value(resolution),
+ resolved_at,
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "resolve sync conflict",
+ source,
+ })?;
+
+ Ok(updated == 1)
+ }
+}
+
+fn aggregate_id_value(aggregate: &SyncAggregateRef) -> String {
+ match aggregate {
+ SyncAggregateRef::Farm(farm_id) => farm_id.to_string(),
+ SyncAggregateRef::FulfillmentWindow(fulfillment_window_id) => {
+ fulfillment_window_id.to_string()
+ }
+ SyncAggregateRef::Product(product_id) => product_id.to_string(),
+ SyncAggregateRef::Order(order_id) => order_id.to_string(),
+ }
+}
+
+fn parse_sync_aggregate_ref(
+ aggregate_kind_field: &'static str,
+ aggregate_id_field: &'static str,
+ aggregate_kind: String,
+ aggregate_id: String,
+) -> Result<SyncAggregateRef, AppSqliteError> {
+ match aggregate_kind.as_str() {
+ "farm" => Ok(SyncAggregateRef::Farm(
+ aggregate_id
+ .parse::<FarmId>()
+ .map_err(|_| AppSqliteError::DecodeId {
+ field: aggregate_id_field,
+ value: aggregate_id,
+ })?,
+ )),
+ "fulfillment_window" => Ok(SyncAggregateRef::FulfillmentWindow(
+ aggregate_id
+ .parse::<FulfillmentWindowId>()
+ .map_err(|_| AppSqliteError::DecodeId {
+ field: aggregate_id_field,
+ value: aggregate_id,
+ })?,
+ )),
+ "product" => Ok(SyncAggregateRef::Product(
+ aggregate_id
+ .parse::<ProductId>()
+ .map_err(|_| AppSqliteError::DecodeId {
+ field: aggregate_id_field,
+ value: aggregate_id,
+ })?,
+ )),
+ "order" => Ok(SyncAggregateRef::Order(
+ aggregate_id
+ .parse::<OrderId>()
+ .map_err(|_| AppSqliteError::DecodeId {
+ field: aggregate_id_field,
+ value: aggregate_id,
+ })?,
+ )),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: aggregate_kind_field,
+ value: aggregate_kind,
+ }),
+ }
+}
+
+fn parse_sync_operation_kind(value: String) -> Result<SyncOperationKind, AppSqliteError> {
+ match value.as_str() {
+ "upsert" => Ok(SyncOperationKind::Upsert),
+ "delete" => Ok(SyncOperationKind::Delete),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "local_outbox.operation_kind",
+ value,
+ }),
+ }
+}
+
+fn parse_sync_conflict_kind(value: String) -> Result<SyncConflictKind, AppSqliteError> {
+ match value.as_str() {
+ "revision_mismatch" => Ok(SyncConflictKind::RevisionMismatch),
+ "remote_delete" => Ok(SyncConflictKind::RemoteDelete),
+ "remote_validation_reject" => Ok(SyncConflictKind::RemoteValidationReject),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "local_conflicts.conflict_kind",
+ value,
+ }),
+ }
+}
+
+fn parse_sync_conflict_severity(value: String) -> Result<SyncConflictSeverity, AppSqliteError> {
+ match value.as_str() {
+ "review_required" => Ok(SyncConflictSeverity::ReviewRequired),
+ "blocking" => Ok(SyncConflictSeverity::Blocking),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "local_conflicts.severity",
+ value,
+ }),
+ }
+}
+
+fn parse_sync_conflict_resolution_status(
+ value: String,
+) -> Result<SyncConflictResolutionStatus, AppSqliteError> {
+ match value.as_str() {
+ "unresolved" => Ok(SyncConflictResolutionStatus::Unresolved),
+ "accepted_local" => Ok(SyncConflictResolutionStatus::AcceptedLocal),
+ "accepted_remote" => Ok(SyncConflictResolutionStatus::AcceptedRemote),
+ "dismissed" => Ok(SyncConflictResolutionStatus::Dismissed),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "local_conflicts.resolution_status",
+ value,
+ }),
+ }
+}
+
+fn parse_sync_checkpoint_state(value: String) -> Result<SyncCheckpointState, AppSqliteError> {
+ match value.as_str() {
+ "never_synced" => Ok(SyncCheckpointState::NeverSynced),
+ "syncing" => Ok(SyncCheckpointState::Syncing),
+ "current" => Ok(SyncCheckpointState::Current),
+ "failed" => Ok(SyncCheckpointState::Failed),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "sync_checkpoints.state",
+ value,
+ }),
+ }
+}
+
+fn sync_checkpoint_state_value(state: SyncCheckpointState) -> &'static str {
+ match state {
+ SyncCheckpointState::NeverSynced => "never_synced",
+ SyncCheckpointState::Syncing => "syncing",
+ SyncCheckpointState::Current => "current",
+ SyncCheckpointState::Failed => "failed",
+ }
+}
+
+fn sync_conflict_severity_value(severity: SyncConflictSeverity) -> &'static str {
+ match severity {
+ SyncConflictSeverity::ReviewRequired => "review_required",
+ SyncConflictSeverity::Blocking => "blocking",
+ }
+}
+
+fn sync_conflict_resolution_status_value(resolution: SyncConflictResolutionStatus) -> &'static str {
+ match resolution {
+ SyncConflictResolutionStatus::Unresolved => "unresolved",
+ SyncConflictResolutionStatus::AcceptedLocal => "accepted_local",
+ SyncConflictResolutionStatus::AcceptedRemote => "accepted_remote",
+ SyncConflictResolutionStatus::Dismissed => "dismissed",
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use radroots_app_models::{FarmId, ProductId};
+ use radroots_app_sync::{
+ PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, SyncConflict,
+ SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
+ };
+
+ use crate::{AppSqliteStore, DatabaseTarget};
+
+ #[test]
+ fn checkpoints_are_selected_account_scoped() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
+ let repository = store.sync_repository();
+ let checkpoint =
+ SyncCheckpointStatus::syncing("2026-04-20T18:00:00Z", Some("cursor-1".to_owned()));
+
+ assert_eq!(
+ repository
+ .load_checkpoint("acct_a")
+ .expect("missing checkpoint should load"),
+ SyncCheckpointStatus::never_synced()
+ );
+
+ repository
+ .save_checkpoint("acct_a", &checkpoint)
+ .expect("checkpoint should save");
+
+ assert_eq!(
+ repository
+ .load_checkpoint("acct_a")
+ .expect("saved checkpoint should load"),
+ checkpoint
+ );
+ assert_eq!(
+ repository
+ .load_checkpoint("acct_b")
+ .expect("other account checkpoint should load"),
+ SyncCheckpointStatus::never_synced()
+ );
+ }
+
+ #[test]
+ fn pending_operations_are_account_scoped_and_retryable() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
+ let repository = store.sync_repository();
+ let first = PendingSyncOperation {
+ aggregate: SyncAggregateRef::Farm(FarmId::new()),
+ operation: SyncOperationKind::Upsert,
+ payload_json: "{\"farm\":\"a\"}".to_owned(),
+ created_at: "2026-04-20T18:00:00Z".to_owned(),
+ available_at: "2026-04-20T18:00:00Z".to_owned(),
+ attempt_count: 0,
+ };
+ let second = PendingSyncOperation {
+ aggregate: SyncAggregateRef::Product(ProductId::new()),
+ operation: SyncOperationKind::Delete,
+ payload_json: "{\"product\":\"b\"}".to_owned(),
+ created_at: "2026-04-20T18:05:00Z".to_owned(),
+ available_at: "2026-04-20T18:05:00Z".to_owned(),
+ attempt_count: 0,
+ };
+
+ let first_id = repository
+ .enqueue_pending_operation("acct_a", &first)
+ .expect("first operation should save");
+ let second_id = repository
+ .enqueue_pending_operation("acct_a", &second)
+ .expect("second operation should save");
+ repository
+ .enqueue_pending_operation("acct_b", &first)
+ .expect("other account operation should save");
+
+ let before_retry = repository
+ .load_pending_operations("acct_a")
+ .expect("pending operations should load");
+ assert_eq!(before_retry.len(), 2);
+ assert_eq!(before_retry[0].operation, first);
+ assert_eq!(before_retry[1].operation, second);
+
+ assert!(
+ repository
+ .update_pending_operation_retry("acct_a", &first_id, "2026-04-20T18:10:00Z", 2,)
+ .expect("retry update should succeed")
+ );
+ assert!(
+ !repository
+ .update_pending_operation_retry("acct_b", &first_id, "2026-04-20T18:10:00Z", 3,)
+ .expect("wrong-account retry update should not succeed")
+ );
+ assert!(
+ repository
+ .dequeue_pending_operation("acct_a", &second_id)
+ .expect("dequeue should succeed")
+ );
+
+ let acct_a = repository
+ .load_pending_operations("acct_a")
+ .expect("account operations should reload");
+ let acct_b = repository
+ .load_pending_operations("acct_b")
+ .expect("other account operations should reload");
+
+ assert_eq!(acct_a.len(), 1);
+ assert_eq!(acct_a[0].operation_id, first_id);
+ assert_eq!(acct_a[0].operation.attempt_count, 2);
+ assert_eq!(
+ acct_a[0].operation.available_at,
+ "2026-04-20T18:10:00Z".to_owned()
+ );
+ assert_eq!(acct_b.len(), 1);
+ }
+
+ #[test]
+ fn conflicts_are_account_scoped_and_resolvable() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
+ let repository = store.sync_repository();
+ let first = SyncConflict {
+ aggregate: SyncAggregateRef::Farm(FarmId::new()),
+ kind: SyncConflictKind::RevisionMismatch,
+ severity: SyncConflictSeverity::Blocking,
+ resolution: SyncConflictResolutionStatus::Unresolved,
+ local_payload_json: "{\"farm\":\"local\"}".to_owned(),
+ remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()),
+ detected_at: "2026-04-20T18:00:00Z".to_owned(),
+ resolved_at: None,
+ };
+ let second = SyncConflict {
+ aggregate: SyncAggregateRef::Product(ProductId::new()),
+ kind: SyncConflictKind::RemoteValidationReject,
+ severity: SyncConflictSeverity::ReviewRequired,
+ resolution: SyncConflictResolutionStatus::Unresolved,
+ local_payload_json: "{\"product\":\"local\"}".to_owned(),
+ remote_payload_json: None,
+ detected_at: "2026-04-20T18:05:00Z".to_owned(),
+ resolved_at: None,
+ };
+
+ let first_id = repository
+ .record_conflict("acct_a", &first)
+ .expect("first conflict should save");
+ repository
+ .record_conflict("acct_b", &second)
+ .expect("other account conflict should save");
+
+ assert!(
+ repository
+ .resolve_conflict(
+ "acct_a",
+ &first_id,
+ SyncConflictResolutionStatus::AcceptedLocal,
+ "2026-04-20T18:06:00Z",
+ )
+ .expect("conflict resolution should succeed")
+ );
+ assert!(
+ !repository
+ .resolve_conflict(
+ "acct_b",
+ &first_id,
+ SyncConflictResolutionStatus::AcceptedRemote,
+ "2026-04-20T18:07:00Z",
+ )
+ .expect("wrong-account resolution should not succeed")
+ );
+
+ let acct_a = repository
+ .load_conflicts("acct_a")
+ .expect("account conflicts should load");
+ let acct_b = repository
+ .load_conflicts("acct_b")
+ .expect("other account conflicts should load");
+
+ assert_eq!(acct_a.len(), 1);
+ assert_eq!(acct_a[0].conflict_id, first_id);
+ assert_eq!(
+ acct_a[0].conflict.resolution,
+ SyncConflictResolutionStatus::AcceptedLocal
+ );
+ assert_eq!(
+ acct_a[0].conflict.resolved_at.as_deref(),
+ Some("2026-04-20T18:06:00Z")
+ );
+ assert_eq!(acct_b.len(), 1);
+ assert_eq!(acct_b[0].conflict, second);
+ }
+}