app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit f6ea26ff3fcb92dc2b733b0bbc16f84618f78736
parent 93c30050cdfa05ffa3496174539ad43622eb3724
Author: triesap <tyson@radroots.org>
Date:   Fri, 17 Apr 2026 19:32:13 +0000

sync: add the radroots_app sync seam crate

Diffstat:
MCargo.lock | 9+++++++++
MCargo.toml | 2++
Acrates/shared/sync/Cargo.toml | 16++++++++++++++++
Acrates/shared/sync/src/lib.rs | 559+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 586 insertions(+), 0 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4653,6 +4653,15 @@ dependencies = [ ] [[package]] +name = "radroots_app_sync" +version = "0.1.0" +dependencies = [ + "radroots_app_models", + "serde", + "thiserror 2.0.18", +] + +[[package]] name = "radroots_app_ui" version = "0.1.0" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/shared/models", "crates/shared/sqlite", "crates/shared/state", + "crates/shared/sync", "crates/shared/ui", "crates/launchers/desktop", ] @@ -33,6 +34,7 @@ radroots_app_i18n = { path = "crates/shared/i18n", version = "0.1.0" } radroots_app_models = { path = "crates/shared/models", version = "0.1.0" } radroots_app_sqlite = { path = "crates/shared/sqlite", version = "0.1.0" } radroots_app_state = { path = "crates/shared/state", version = "0.1.0" } +radroots_app_sync = { path = "crates/shared/sync", version = "0.1.0" } radroots_app_ui = { path = "crates/shared/ui", version = "0.1.0" } rusqlite = { version = "0.32", features = ["bundled"] } serde = { version = "1.0", features = ["derive"] } diff --git a/crates/shared/sync/Cargo.toml b/crates/shared/sync/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "radroots_app_sync" +version.workspace = true +edition.workspace = true +authors.workspace = true +rust-version.workspace = true +license.workspace = true +publish = false + +[dependencies] +radroots_app_models.workspace = true +serde.workspace = true +thiserror.workspace = true + +[lints] +workspace = true diff --git a/crates/shared/sync/src/lib.rs b/crates/shared/sync/src/lib.rs @@ -0,0 +1,559 @@ +#![forbid(unsafe_code)] + +use radroots_app_models::{FarmId, FulfillmentWindowId, OrderId, ProductId}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde( + tag = "aggregate_kind", + content = "aggregate_id", + rename_all = "snake_case" +)] +pub enum SyncAggregateRef { + Farm(FarmId), + FulfillmentWindow(FulfillmentWindowId), + Product(ProductId), + Order(OrderId), +} + +impl SyncAggregateRef { + pub const fn aggregate_kind(&self) -> &'static str { + match self { + Self::Farm(_) => "farm", + Self::FulfillmentWindow(_) => "fulfillment_window", + Self::Product(_) => "product", + Self::Order(_) => "order", + } + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncTrigger { + AppLaunch, + ForegroundResume, + #[default] + ManualRefresh, + LocalMutation, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncOperationKind { + Upsert, + Delete, +} + +impl SyncOperationKind { + pub const fn storage_key(self) -> &'static str { + match self { + Self::Upsert => "upsert", + Self::Delete => "delete", + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct PendingSyncOperation { + pub aggregate: SyncAggregateRef, + pub operation: SyncOperationKind, + pub payload_json: String, + pub created_at: String, + pub available_at: String, + pub attempt_count: u32, +} + +impl PendingSyncOperation { + pub const fn is_retry(&self) -> bool { + self.attempt_count > 0 + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncConflictKind { + RevisionMismatch, + RemoteDelete, + RemoteValidationReject, +} + +impl SyncConflictKind { + pub const fn storage_key(self) -> &'static str { + match self { + Self::RevisionMismatch => "revision_mismatch", + Self::RemoteDelete => "remote_delete", + Self::RemoteValidationReject => "remote_validation_reject", + } + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncConflictSeverity { + ReviewRequired, + Blocking, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncConflictResolutionStatus { + Unresolved, + AcceptedLocal, + AcceptedRemote, + Dismissed, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct SyncConflict { + pub aggregate: SyncAggregateRef, + pub kind: SyncConflictKind, + pub severity: SyncConflictSeverity, + pub resolution: SyncConflictResolutionStatus, + pub local_payload_json: String, + pub remote_payload_json: Option<String>, + pub detected_at: String, + pub resolved_at: Option<String>, +} + +impl SyncConflict { + pub const fn is_unresolved(&self) -> bool { + matches!(self.resolution, SyncConflictResolutionStatus::Unresolved) + } +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct SyncConflictStatus { + pub unresolved_count: usize, + pub blocking_count: usize, +} + +impl SyncConflictStatus { + pub const fn clear() -> Self { + Self { + unresolved_count: 0, + blocking_count: 0, + } + } + + pub fn from_conflicts(conflicts: &[SyncConflict]) -> Self { + let unresolved_conflicts = conflicts.iter().filter(|conflict| conflict.is_unresolved()); + let unresolved_count = unresolved_conflicts.clone().count(); + let blocking_count = unresolved_conflicts + .filter(|conflict| matches!(conflict.severity, SyncConflictSeverity::Blocking)) + .count(); + + Self { + unresolved_count, + blocking_count, + } + } + + pub const fn is_clear(&self) -> bool { + self.unresolved_count == 0 + } + + pub const fn requires_attention(&self) -> bool { + self.unresolved_count > 0 + } + + pub const fn has_blocking_conflicts(&self) -> bool { + self.blocking_count > 0 + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SyncCheckpointState { + #[default] + NeverSynced, + Syncing, + Current, + Failed, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct SyncCheckpointStatus { + pub state: SyncCheckpointState, + pub last_sync_started_at: Option<String>, + pub last_sync_completed_at: Option<String>, + pub last_remote_cursor: Option<String>, + pub last_error_message: Option<String>, +} + +impl Default for SyncCheckpointStatus { + fn default() -> Self { + Self::never_synced() + } +} + +impl SyncCheckpointStatus { + pub const fn never_synced() -> Self { + Self { + state: SyncCheckpointState::NeverSynced, + last_sync_started_at: None, + last_sync_completed_at: None, + last_remote_cursor: None, + last_error_message: None, + } + } + + pub fn syncing(started_at: impl Into<String>, last_remote_cursor: Option<String>) -> Self { + Self { + state: SyncCheckpointState::Syncing, + last_sync_started_at: Some(started_at.into()), + last_sync_completed_at: None, + last_remote_cursor, + last_error_message: None, + } + } + + pub fn current( + started_at: Option<String>, + completed_at: impl Into<String>, + last_remote_cursor: Option<String>, + ) -> Self { + Self { + state: SyncCheckpointState::Current, + last_sync_started_at: started_at, + last_sync_completed_at: Some(completed_at.into()), + last_remote_cursor, + last_error_message: None, + } + } + + pub fn failed( + started_at: Option<String>, + completed_at: Option<String>, + last_remote_cursor: Option<String>, + message: impl Into<String>, + ) -> Self { + Self { + state: SyncCheckpointState::Failed, + last_sync_started_at: started_at, + last_sync_completed_at: completed_at, + last_remote_cursor, + last_error_message: Some(message.into()), + } + } + + pub const fn is_failed(&self) -> bool { + matches!(self.state, SyncCheckpointState::Failed) + } + + pub const fn is_syncing(&self) -> bool { + matches!(self.state, SyncCheckpointState::Syncing) + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AppSyncRunStatus { + #[default] + Idle, + Syncing, + Succeeded, + Conflicted, + Failed, +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppSyncProjection { + pub run_status: AppSyncRunStatus, + pub checkpoint: SyncCheckpointStatus, + pub conflict_status: SyncConflictStatus, +} + +impl Default for AppSyncProjection { + fn default() -> Self { + Self { + run_status: AppSyncRunStatus::Idle, + checkpoint: SyncCheckpointStatus::never_synced(), + conflict_status: SyncConflictStatus::clear(), + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppSyncRequest { + pub trigger: SyncTrigger, + pub checkpoint: SyncCheckpointStatus, + pub pending_operations: Vec<PendingSyncOperation>, + pub known_conflicts: Vec<SyncConflict>, +} + +impl AppSyncRequest { + pub fn conflict_status(&self) -> SyncConflictStatus { + SyncConflictStatus::from_conflicts(&self.known_conflicts) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppSyncResult { + pub run_status: AppSyncRunStatus, + pub checkpoint: SyncCheckpointStatus, + pub pushed_operation_count: usize, + pub pulled_record_count: usize, + pub conflicts: Vec<SyncConflict>, +} + +impl AppSyncResult { + pub fn projection(&self) -> AppSyncProjection { + AppSyncProjection { + run_status: self.run_status, + checkpoint: self.checkpoint.clone(), + conflict_status: SyncConflictStatus::from_conflicts(&self.conflicts), + } + } +} + +#[derive(Clone, Debug, Eq, Error, PartialEq)] +pub enum AppSyncTransportError { + #[error("app sync transport is unavailable: {message}")] + Unavailable { message: String }, + #[error("app sync transport failed: {message}")] + Failed { message: String }, +} + +impl AppSyncTransportError { + pub fn unavailable(message: impl Into<String>) -> Self { + Self::Unavailable { + message: message.into(), + } + } + + pub fn failed(message: impl Into<String>) -> Self { + Self::Failed { + message: message.into(), + } + } +} + +pub trait AppSyncTransport { + fn sync(&mut self, request: AppSyncRequest) -> Result<AppSyncResult, AppSyncTransportError>; +} + +#[derive(Clone, Debug)] +pub struct RecordedAppSyncTransport { + result: Result<AppSyncResult, AppSyncTransportError>, + last_request: Option<AppSyncRequest>, + call_count: usize, +} + +impl RecordedAppSyncTransport { + pub fn succeed(result: AppSyncResult) -> Self { + Self { + result: Ok(result), + last_request: None, + call_count: 0, + } + } + + pub fn fail(error: AppSyncTransportError) -> Self { + Self { + result: Err(error), + last_request: None, + call_count: 0, + } + } + + pub fn last_request(&self) -> Option<&AppSyncRequest> { + self.last_request.as_ref() + } + + pub const fn call_count(&self) -> usize { + self.call_count + } +} + +impl AppSyncTransport for RecordedAppSyncTransport { + fn sync(&mut self, request: AppSyncRequest) -> Result<AppSyncResult, AppSyncTransportError> { + self.call_count += 1; + self.last_request = Some(request); + self.result.clone() + } +} + +#[cfg(test)] +mod tests { + use super::{ + AppSyncProjection, AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport, + AppSyncTransportError, PendingSyncOperation, RecordedAppSyncTransport, SyncAggregateRef, + SyncCheckpointState, SyncCheckpointStatus, SyncConflict, SyncConflictKind, + SyncConflictResolutionStatus, SyncConflictSeverity, SyncConflictStatus, SyncOperationKind, + SyncTrigger, + }; + use radroots_app_models::{FarmId, ProductId}; + + #[test] + fn default_projection_starts_idle_and_clear() { + let projection = AppSyncProjection::default(); + + assert_eq!(projection.run_status, AppSyncRunStatus::Idle); + assert_eq!( + projection.checkpoint.state, + SyncCheckpointState::NeverSynced + ); + assert!(projection.conflict_status.is_clear()); + } + + #[test] + fn checkpoint_constructors_keep_sync_and_failure_state_explicit() { + let syncing = + SyncCheckpointStatus::syncing("2026-04-17T19:30:00Z", Some("cursor-1".to_owned())); + let failed = SyncCheckpointStatus::failed( + Some("2026-04-17T19:30:00Z".to_owned()), + Some("2026-04-17T19:30:30Z".to_owned()), + Some("cursor-1".to_owned()), + "relay timeout", + ); + let current = SyncCheckpointStatus::current( + Some("2026-04-17T19:30:00Z".to_owned()), + "2026-04-17T19:30:30Z", + Some("cursor-2".to_owned()), + ); + + assert!(syncing.is_syncing()); + assert_eq!(syncing.last_sync_completed_at, None); + assert_eq!(syncing.last_error_message, None); + + assert!(failed.is_failed()); + assert_eq!(failed.last_error_message.as_deref(), Some("relay timeout")); + + assert_eq!(current.state, SyncCheckpointState::Current); + assert_eq!(current.last_remote_cursor.as_deref(), Some("cursor-2")); + assert_eq!(current.last_error_message, None); + } + + #[test] + fn conflict_status_counts_only_unresolved_conflicts() { + let conflicts = vec![ + SyncConflict { + aggregate: SyncAggregateRef::Product(ProductId::new()), + kind: SyncConflictKind::RevisionMismatch, + severity: SyncConflictSeverity::Blocking, + resolution: SyncConflictResolutionStatus::Unresolved, + local_payload_json: "{\"title\":\"carrots\"}".to_owned(), + remote_payload_json: Some("{\"title\":\"rainbow carrots\"}".to_owned()), + detected_at: "2026-04-17T19:31:00Z".to_owned(), + resolved_at: None, + }, + SyncConflict { + aggregate: SyncAggregateRef::Farm(FarmId::new()), + kind: SyncConflictKind::RemoteValidationReject, + severity: SyncConflictSeverity::ReviewRequired, + resolution: SyncConflictResolutionStatus::AcceptedRemote, + local_payload_json: "{\"display_name\":\"Sunrise Farm\"}".to_owned(), + remote_payload_json: Some("{\"display_name\":\"Sunrise Farm LLC\"}".to_owned()), + detected_at: "2026-04-17T19:31:30Z".to_owned(), + resolved_at: Some("2026-04-17T19:32:00Z".to_owned()), + }, + ]; + + let status = SyncConflictStatus::from_conflicts(&conflicts); + + assert_eq!(status.unresolved_count, 1); + assert_eq!(status.blocking_count, 1); + assert!(status.requires_attention()); + assert!(status.has_blocking_conflicts()); + } + + #[test] + fn request_and_result_surface_conflict_status_through_typed_contracts() { + let pending_operation = PendingSyncOperation { + aggregate: SyncAggregateRef::Product(ProductId::new()), + operation: SyncOperationKind::Upsert, + payload_json: "{\"title\":\"greens\"}".to_owned(), + created_at: "2026-04-17T19:32:00Z".to_owned(), + available_at: "2026-04-17T19:32:00Z".to_owned(), + attempt_count: 1, + }; + let conflict = SyncConflict { + aggregate: SyncAggregateRef::Product(ProductId::new()), + kind: SyncConflictKind::RevisionMismatch, + severity: SyncConflictSeverity::ReviewRequired, + resolution: SyncConflictResolutionStatus::Unresolved, + local_payload_json: "{\"stock_count\":4}".to_owned(), + remote_payload_json: Some("{\"stock_count\":6}".to_owned()), + detected_at: "2026-04-17T19:33:00Z".to_owned(), + resolved_at: None, + }; + let request = AppSyncRequest { + trigger: SyncTrigger::LocalMutation, + checkpoint: SyncCheckpointStatus::current( + Some("2026-04-17T19:30:00Z".to_owned()), + "2026-04-17T19:32:30Z", + Some("cursor-4".to_owned()), + ), + pending_operations: vec![pending_operation.clone()], + known_conflicts: vec![conflict.clone()], + }; + let result = AppSyncResult { + run_status: AppSyncRunStatus::Conflicted, + checkpoint: request.checkpoint.clone(), + pushed_operation_count: 1, + pulled_record_count: 3, + conflicts: vec![conflict], + }; + + assert_eq!(request.conflict_status().unresolved_count, 1); + assert!(pending_operation.is_retry()); + assert_eq!(pending_operation.operation.storage_key(), "upsert"); + + let projection = result.projection(); + assert_eq!(projection.run_status, AppSyncRunStatus::Conflicted); + assert_eq!( + projection.checkpoint.last_remote_cursor.as_deref(), + Some("cursor-4") + ); + assert_eq!(projection.conflict_status.unresolved_count, 1); + } + + #[test] + fn recorded_transport_is_mockable_and_records_requests() { + let request = AppSyncRequest { + trigger: SyncTrigger::ManualRefresh, + checkpoint: SyncCheckpointStatus::never_synced(), + pending_operations: vec![], + known_conflicts: vec![], + }; + let expected_result = AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + Some("2026-04-17T19:34:00Z".to_owned()), + "2026-04-17T19:34:10Z", + Some("cursor-9".to_owned()), + ), + pushed_operation_count: 0, + pulled_record_count: 2, + conflicts: vec![], + }; + let mut transport = RecordedAppSyncTransport::succeed(expected_result.clone()); + + let actual_result = transport + .sync(request.clone()) + .expect("recorded transport should succeed"); + + assert_eq!(actual_result, expected_result); + assert_eq!(transport.last_request(), Some(&request)); + assert_eq!(transport.call_count(), 1); + } + + #[test] + fn recorded_transport_can_fail_without_a_live_backend() { + let mut transport = + RecordedAppSyncTransport::fail(AppSyncTransportError::unavailable("offline")); + + let error = transport + .sync(AppSyncRequest { + trigger: SyncTrigger::AppLaunch, + checkpoint: SyncCheckpointStatus::never_synced(), + pending_operations: vec![], + known_conflicts: vec![], + }) + .expect_err("recorded transport should fail"); + + assert_eq!(error, AppSyncTransportError::unavailable("offline")); + assert_eq!(transport.call_count(), 1); + } +}