app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 2616ec4d16a612563dcedd36ed273fd5b38c2903
parent 20028a38415acd04e83cd55cb2cee12f8dd89142
Author: triesap <tyson@radroots.org>
Date:   Mon, 20 Apr 2026 21:22:16 +0000

runtime: add sync triggers and pending write semantics

Diffstat:
MCargo.lock | 2++
Mcrates/launchers/desktop/Cargo.toml | 2++
Mcrates/launchers/desktop/src/app.rs | 8++++++++
Mcrates/launchers/desktop/src/runtime.rs | 826+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/shared/sqlite/src/lib.rs | 9+++++++++
Mcrates/shared/sqlite/src/sync.rs | 71+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 903 insertions(+), 15 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -5034,6 +5034,7 @@ checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" name = "radroots_app" version = "0.1.0" dependencies = [ + "chrono", "gpui", "gpui-component", "gpui-component-assets", @@ -5050,6 +5051,7 @@ dependencies = [ "radroots_nostr_accounts", "radroots_protected_store", "radroots_secret_vault", + "serde_json", "thiserror 2.0.18", "tracing", "tracing-subscriber", diff --git a/crates/launchers/desktop/Cargo.toml b/crates/launchers/desktop/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true publish = false [dependencies] +chrono.workspace = true gpui.workspace = true gpui-component.workspace = true gpui-component-assets.workspace = true @@ -24,6 +25,7 @@ radroots_app_sqlite.workspace = true radroots_app_state.workspace = true radroots_app_sync.workspace = true radroots_app_ui.workspace = true +serde_json.workspace = true thiserror.workspace = true tracing.workspace = true diff --git a/crates/launchers/desktop/src/app.rs b/crates/launchers/desktop/src/app.rs @@ -34,6 +34,14 @@ pub fn launch() -> Result<(), AppLaunchError> { runtime_config.default_nostr_relay_url.clone(), snapshot.clone(), ); + if let Err(error) = runtime.sync_on_app_launch() { + error!( + target: "sync", + event = "sync.launch_attempt_failed", + error = %error, + "failed to execute launch sync attempt" + ); + } let runtime_summary = runtime.summary(); emit_runtime_events(&snapshot, &runtime_summary); let launch_target = primary_window_target(&runtime_summary); diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -3,6 +3,7 @@ use std::fmt; use std::path::PathBuf; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; +use chrono::Utc; use radroots_app_core::{ AppBuildIdentity, AppDesktopRuntimePaths, AppRuntimeCapture, AppRuntimeMode, AppRuntimePathsError, AppRuntimeSnapshot, AppSharedAccountsPaths, @@ -10,8 +11,8 @@ use radroots_app_core::{ use radroots_app_models::{ ActiveSurface, AppActivityContext, AppActivityKind, AppIdentityProjection, AppStartupGate, BuyerCartLineProjection, BuyerCartProjection, BuyerCartReplaceConfirmationProjection, - BuyerCheckoutDraft, BuyerOrderDetailProjection, BuyerProductDetailProjection, FarmId, - FarmOrderMethod, FarmProfileRecord, FarmReadiness, FarmRulesProjection, FarmSetupDraft, + BuyerCheckoutDraft, BuyerContext, BuyerOrderDetailProjection, BuyerProductDetailProjection, + FarmId, FarmOrderMethod, FarmProfileRecord, FarmReadiness, FarmRulesProjection, FarmSetupDraft, FarmSetupProjection, FarmSummary, FarmerSection, FulfillmentWindowId, LoggedOutStartupProjection, OrderDetailProjection, OrderId, OrdersFilter, OrdersListProjection, OrdersScreenQueryState, PackDayProjection, PackDayScreenQueryState, PersonalSection, @@ -24,7 +25,7 @@ use radroots_app_remote_signer::{ }; use radroots_app_sqlite::{ APP_ACTIVITY_CONTEXT_LIMIT, AppSqliteError, AppSqliteStore, DatabaseTarget, - derive_farm_rules_readiness, + StoredPendingSyncOperation, StoredSyncConflict, derive_farm_rules_readiness, }; use radroots_app_state::{ AppShellProjection, AppStateCommand, AppStateStore, AppStateStoreError, @@ -34,8 +35,13 @@ use radroots_app_state::{ OrdersScreenProjection, PackDayScreenProjection, PersonalWorkspaceProjection, ProductsScreenProjection, ProductsScreenQueryState, derive_sync_projection, }; -use radroots_app_sync::AppSyncProjection; +use radroots_app_sync::{ + AppSyncProjection, AppSyncRequest, AppSyncResult, AppSyncTransport, AppSyncTransportError, + PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, SyncConflictSeverity, + SyncOperationKind, SyncTrigger, +}; use radroots_nostr_accounts::prelude::RadrootsNostrAccountsManager; +use serde_json::json; use thiserror::Error; use tracing::error; @@ -52,6 +58,22 @@ use crate::remote_signer::{ }; const APP_DATABASE_FILE_NAME: &str = "app.sqlite3"; +const SYNC_TRANSPORT_UNAVAILABLE_MESSAGE: &str = "remote sync transport is not configured"; + +#[derive(Debug, Default)] +struct UnavailableAppSyncTransport; + +impl AppSyncTransport for UnavailableAppSyncTransport { + fn sync(&mut self, _request: AppSyncRequest) -> Result<AppSyncResult, AppSyncTransportError> { + Err(AppSyncTransportError::unavailable( + SYNC_TRANSPORT_UNAVAILABLE_MESSAGE, + )) + } +} + +fn default_sync_transport() -> Box<dyn AppSyncTransport + Send> { + Box::new(UnavailableAppSyncTransport) +} #[derive(Clone, Debug)] pub struct DesktopAppRuntime { @@ -460,6 +482,20 @@ impl DesktopAppRuntime { self.lock_state_mut().save_farm_rules_projection(projection) } + pub fn sync_on_app_launch(&self) -> Result<bool, AppSqliteError> { + self.lock_state_mut().attempt_sync(SyncTrigger::AppLaunch) + } + + pub fn sync_on_foreground_resume(&self) -> Result<bool, AppSqliteError> { + self.lock_state_mut() + .attempt_sync(SyncTrigger::ForegroundResume) + } + + pub fn sync_on_manual_refresh(&self) -> Result<bool, AppSqliteError> { + self.lock_state_mut() + .attempt_sync(SyncTrigger::ManualRefresh) + } + pub fn record_home_opened(&self) -> bool { self.record_activity(AppActivityKind::HomeOpened) } @@ -633,6 +669,14 @@ struct DesktopSelectedAccountSyncContext { pending_write_count: usize, } +#[derive(Clone, Debug)] +struct DesktopPreparedSyncRequest { + account_id: String, + checkpoint: SyncCheckpointStatus, + conflicts: Vec<StoredSyncConflict>, + pending_operations: Vec<StoredPendingSyncOperation>, +} + struct DesktopAppRuntimeState { state_store: AppStateStore<InMemoryAppStateRepository>, default_nostr_relay_url: String, @@ -640,6 +684,7 @@ struct DesktopAppRuntimeState { remote_signer_paths: Option<DesktopRemoteSignerPaths>, accounts_manager: Option<RadrootsNostrAccountsManager>, sqlite_store: Option<AppSqliteStore>, + sync_transport: Box<dyn AppSyncTransport + Send>, runtime_metadata: DesktopAppRuntimeMetadataSummary, selected_account_pending_sync_write_count: usize, startup_issue: Option<String>, @@ -666,6 +711,7 @@ impl fmt::Debug for DesktopAppRuntimeState { "sqlite_store", &self.sqlite_store.as_ref().map(|_| "available"), ) + .field("sync_transport", &"configured") .field("runtime_metadata", &self.runtime_metadata) .field( "selected_account_pending_sync_write_count", @@ -767,6 +813,7 @@ impl DesktopAppRuntimeState { remote_signer_paths: Some(remote_signer_paths), accounts_manager: accounts_bootstrap.accounts_manager, sqlite_store: Some(sqlite_store), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::available( runtime_snapshot, &paths, @@ -793,6 +840,7 @@ impl DesktopAppRuntimeState { remote_signer_paths: None, accounts_manager: None, sqlite_store: None, + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::unavailable(runtime_snapshot), selected_account_pending_sync_write_count: 0, startup_issue: Some(error.to_string()), @@ -1156,8 +1204,21 @@ impl DesktopAppRuntimeState { changed }); let section_changed = self.select_personal_section(PersonalSection::Orders); + let pending_changed = if matches!(buyer_context, BuyerContext::Account(_)) { + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Order(order_id), + order_sync_payload( + order_id, + order_detail.farm_id, + "place_personal_order", + Some("needs_action"), + ), + )])? + } else { + false + }; - Ok(personal_changed || section_changed) + Ok(personal_changed || section_changed || pending_changed) } fn open_personal_order_detail(&mut self, order_id: OrderId) -> Result<bool, AppSqliteError> { @@ -1356,8 +1417,13 @@ impl DesktopAppRuntimeState { self.state_store.pack_day_projection().query.clone(), )?; let context_changed = self.apply_selected_account_context(&selected_account_context); + let pending_changed = + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Order(order_id), + order_sync_payload(order_id, farm_id, "mark_order_packed", Some("packed")), + )])?; - Ok(updated || context_changed) + Ok(updated || context_changed || pending_changed) } fn mark_order_completed(&mut self, order_id: OrderId) -> Result<bool, AppSqliteError> { @@ -1382,8 +1448,13 @@ impl DesktopAppRuntimeState { self.state_store.pack_day_projection().query.clone(), )?; let context_changed = self.apply_selected_account_context(&selected_account_context); + let pending_changed = + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Order(order_id), + order_sync_payload(order_id, farm_id, "mark_order_completed", Some("completed")), + )])?; - Ok(updated || context_changed) + Ok(updated || context_changed || pending_changed) } fn open_pack_day( @@ -1419,6 +1490,9 @@ impl DesktopAppRuntimeState { let Some(sqlite_store) = self.sqlite_store.as_ref() else { return Ok(false); }; + let Some(farm_id) = self.selected_farm_id() else { + return Ok(false); + }; if self .state_store .identity_projection() @@ -1442,8 +1516,20 @@ impl DesktopAppRuntimeState { self.state_store.pack_day_projection().query.clone(), )?; let context_changed = self.apply_selected_account_context(&selected_account_context); + let pending_changed = + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Product(product_id), + product_sync_payload( + product_id, + Some(farm_id), + "update_product_stock", + None, + Some(stock_quantity), + None, + ), + )])?; - Ok(updated || context_changed) + Ok(updated || context_changed || pending_changed) } fn open_new_product_editor(&mut self) -> Result<bool, AppSqliteError> { @@ -1468,13 +1554,26 @@ impl DesktopAppRuntimeState { )?; let context_changed = self.apply_selected_account_context(&selected_account_context); let section_changed = self.select_farmer_section(FarmerSection::Products); + let draft_payload = draft.clone(); let editor_changed = self.state_store .apply_in_memory(AppStateCommand::open_existing_product_editor( product_id, draft, )); + let pending_changed = + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Product(product_id), + product_sync_payload( + product_id, + Some(farm_id), + "open_new_product_editor", + Some(&draft_payload), + draft_payload.stock_quantity, + None, + ), + )])?; - Ok(context_changed || section_changed || editor_changed) + Ok(context_changed || section_changed || editor_changed || pending_changed) } fn open_existing_product_editor( @@ -1537,13 +1636,26 @@ impl DesktopAppRuntimeState { .unwrap_or(draft) }; let context_changed = self.apply_selected_account_context(&selected_account_context); + let draft_payload = reloaded_draft.clone(); let editor_changed = self.state_store .apply_in_memory(AppStateCommand::replace_product_editor_draft( reloaded_draft, )); + let pending_changed = + self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Product(product_id), + product_sync_payload( + product_id, + self.selected_farm_id(), + "save_product_editor_draft", + Some(&draft_payload), + draft_payload.stock_quantity, + None, + ), + )])?; - Ok(saved || context_changed || editor_changed) + Ok(saved || context_changed || editor_changed || pending_changed) } fn close_product_editor(&mut self) -> bool { @@ -1592,6 +1704,15 @@ impl DesktopAppRuntimeState { let selected_account_context = self.refresh_selected_account_context()?; self.apply_selected_account_context(&selected_account_context); + let _ = self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Farm(saved_farm.farm_id), + farm_sync_payload( + saved_farm.farm_id, + saved_farm.display_name.as_str(), + Some(saved_farm.readiness), + "finish_farm_setup", + ), + )])?; Ok(selected_account_context.farm_setup_projection) } @@ -1623,6 +1744,15 @@ impl DesktopAppRuntimeState { .ok_or(DesktopAppRuntimeFarmRulesError::FarmRequired)?; let fallback_profile = self.fallback_farm_profile(farm_id); let normalized = normalize_farm_rules_projection(projection, &fallback_profile); + let previous_fulfillment_window_ids = { + let sqlite_store = self.sqlite_store_for_farm_rules()?; + sqlite_store + .load_farm_rules(farm_id)? + .fulfillment_windows + .into_iter() + .map(|window| window.fulfillment_window_id) + .collect::<BTreeSet<_>>() + }; let saved_projection = { let sqlite_store = self.sqlite_store_for_farm_rules()?; @@ -1666,6 +1796,47 @@ impl DesktopAppRuntimeState { )? }; self.apply_selected_account_context(&selected_account_context); + let current_fulfillment_window_ids = saved_projection + .fulfillment_windows + .iter() + .map(|window| window.fulfillment_window_id) + .collect::<BTreeSet<_>>(); + let mut pending_operations = Vec::with_capacity( + 1 + saved_projection.fulfillment_windows.len() + previous_fulfillment_window_ids.len(), + ); + pending_operations.push(pending_sync_upsert( + SyncAggregateRef::Farm(farm_id), + farm_sync_payload( + farm_id, + saved_projection + .farm_profile + .as_ref() + .map(|profile| profile.display_name.as_str()) + .unwrap_or_default(), + Some(if saved_projection.is_ready() { + FarmReadiness::Ready + } else { + FarmReadiness::Incomplete + }), + "save_farm_rules_projection", + ), + )); + for window in &saved_projection.fulfillment_windows { + pending_operations.push(pending_sync_upsert( + SyncAggregateRef::FulfillmentWindow(window.fulfillment_window_id), + fulfillment_window_sync_payload(window.fulfillment_window_id, farm_id, "upsert"), + )); + } + for fulfillment_window_id in previous_fulfillment_window_ids + .difference(&current_fulfillment_window_ids) + .copied() + { + pending_operations.push(pending_sync_delete( + SyncAggregateRef::FulfillmentWindow(fulfillment_window_id), + fulfillment_window_sync_payload(fulfillment_window_id, farm_id, "delete"), + )); + } + let _ = self.enqueue_selected_account_sync_operations(pending_operations)?; Ok(saved_projection) } @@ -1801,6 +1972,213 @@ impl DesktopAppRuntimeState { Ok(self.apply_selected_account_sync_context(&context)) } + fn attempt_sync(&mut self, trigger: SyncTrigger) -> Result<bool, AppSqliteError> { + let Some(prepared) = self.prepare_sync_request(trigger)? else { + return Ok(false); + }; + + let started_at = current_utc_timestamp(); + let syncing_checkpoint = SyncCheckpointStatus::syncing( + started_at.clone(), + prepared.checkpoint.last_remote_cursor.clone(), + ); + { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(false); + }; + sqlite_store.save_sync_checkpoint(prepared.account_id.as_str(), &syncing_checkpoint)?; + } + + let mut changed = self.refresh_selected_account_sync()?; + let request = AppSyncRequest { + trigger, + checkpoint: prepared.checkpoint.clone(), + pending_operations: prepared + .pending_operations + .iter() + .map(|stored| stored.operation.clone()) + .collect(), + known_conflicts: prepared + .conflicts + .iter() + .map(|stored| stored.conflict.clone()) + .collect(), + }; + + match self.sync_transport.sync(request) { + Ok(result) => { + changed |= self.apply_sync_result( + prepared.account_id.as_str(), + &prepared.pending_operations, + &result, + )?; + } + Err(error) => { + changed |= self.apply_sync_transport_error( + prepared.account_id.as_str(), + &prepared.checkpoint, + &prepared.pending_operations, + started_at.as_str(), + error, + )?; + } + } + + Ok(changed) + } + + fn prepare_sync_request( + &self, + trigger: SyncTrigger, + ) -> Result<Option<DesktopPreparedSyncRequest>, AppSqliteError> { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(None); + }; + let Some(selected_account) = self + .state_store + .identity_projection() + .selected_account + .as_ref() + else { + return Ok(None); + }; + + let account_id = selected_account.account.account_id.clone(); + let checkpoint = sqlite_store.load_sync_checkpoint(account_id.as_str())?; + let conflicts = sqlite_store.load_sync_conflicts(account_id.as_str())?; + let pending_operations = sqlite_store.load_pending_sync_operations(account_id.as_str())?; + + if conflicts.iter().any(|stored| { + stored.conflict.is_unresolved() + && matches!(stored.conflict.severity, SyncConflictSeverity::Blocking) + }) { + return Ok(None); + } + + if !matches!(trigger, SyncTrigger::ManualRefresh) + && !self.has_sync_eligible_runtime_state(&checkpoint, &conflicts, &pending_operations) + { + return Ok(None); + } + + Ok(Some(DesktopPreparedSyncRequest { + account_id, + checkpoint, + conflicts, + pending_operations, + })) + } + + fn has_sync_eligible_runtime_state( + &self, + checkpoint: &SyncCheckpointStatus, + conflicts: &[StoredSyncConflict], + pending_operations: &[StoredPendingSyncOperation], + ) -> bool { + !pending_operations.is_empty() + || !conflicts.is_empty() + || *checkpoint != SyncCheckpointStatus::never_synced() + || self.selected_farm_id().is_some() + || !self + .state_store + .personal_projection() + .orders + .list + .rows + .is_empty() + || !self.state_store.orders_projection().list.rows.is_empty() + || !self.state_store.products_projection().list.rows.is_empty() + } + + fn apply_sync_result( + &mut self, + account_id: &str, + pending_operations: &[StoredPendingSyncOperation], + result: &AppSyncResult, + ) -> Result<bool, AppSqliteError> { + { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(false); + }; + sqlite_store.save_sync_checkpoint(account_id, &result.checkpoint)?; + sqlite_store.replace_sync_conflicts(account_id, &result.conflicts)?; + + for pending in pending_operations + .iter() + .take(result.pushed_operation_count) + { + let _ = sqlite_store + .dequeue_pending_sync_operation(account_id, pending.operation_id.as_str())?; + } + } + + self.refresh_selected_account_sync() + } + + fn apply_sync_transport_error( + &mut self, + account_id: &str, + previous_checkpoint: &SyncCheckpointStatus, + pending_operations: &[StoredPendingSyncOperation], + started_at: &str, + error: AppSyncTransportError, + ) -> Result<bool, AppSqliteError> { + let failed_checkpoint = SyncCheckpointStatus::failed( + Some(started_at.to_owned()), + previous_checkpoint.last_sync_completed_at.clone(), + previous_checkpoint.last_remote_cursor.clone(), + error.to_string(), + ); + { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(false); + }; + sqlite_store.save_sync_checkpoint(account_id, &failed_checkpoint)?; + + for pending in pending_operations { + let _ = sqlite_store.update_pending_sync_operation_retry( + account_id, + pending.operation_id.as_str(), + started_at, + pending.operation.attempt_count.saturating_add(1), + )?; + } + } + + self.refresh_selected_account_sync() + } + + fn enqueue_selected_account_sync_operations( + &mut self, + operations: Vec<PendingSyncOperation>, + ) -> Result<bool, AppSqliteError> { + if operations.is_empty() { + return Ok(false); + } + + let Some(selected_account) = self + .state_store + .identity_projection() + .selected_account + .as_ref() + else { + return Ok(false); + }; + { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(false); + }; + for operation in &operations { + let _ = sqlite_store.enqueue_pending_sync_operation( + selected_account.account.account_id.as_str(), + operation, + )?; + } + } + + self.refresh_selected_account_sync() + } + fn selected_account_id(&self) -> Result<String, DesktopAppRuntimeFarmSetupError> { self.selected_account_for_farm_setup() .map(|account| account.account.account_id.clone()) @@ -2641,13 +3019,119 @@ fn normalize_pickup_location_defaults(pickup_locations: &mut [PickupLocationReco } } +fn current_utc_timestamp() -> String { + Utc::now().format("%Y-%m-%dT%H:%M:%SZ").to_string() +} + +fn pending_sync_upsert(aggregate: SyncAggregateRef, payload_json: String) -> PendingSyncOperation { + let created_at = current_utc_timestamp(); + + PendingSyncOperation { + aggregate, + operation: SyncOperationKind::Upsert, + payload_json, + created_at: created_at.clone(), + available_at: created_at, + attempt_count: 0, + } +} + +fn pending_sync_delete(aggregate: SyncAggregateRef, payload_json: String) -> PendingSyncOperation { + let created_at = current_utc_timestamp(); + + PendingSyncOperation { + aggregate, + operation: SyncOperationKind::Delete, + payload_json, + created_at: created_at.clone(), + available_at: created_at, + attempt_count: 0, + } +} + +fn farm_sync_payload( + farm_id: FarmId, + display_name: &str, + readiness: Option<FarmReadiness>, + source: &str, +) -> String { + json!({ + "aggregate_kind": "farm", + "farm_id": farm_id.to_string(), + "display_name": display_name, + "readiness": readiness.map(|value| match value { + FarmReadiness::Incomplete => "incomplete", + FarmReadiness::Ready => "ready", + }), + "source": source, + }) + .to_string() +} + +fn fulfillment_window_sync_payload( + fulfillment_window_id: FulfillmentWindowId, + farm_id: FarmId, + source: &str, +) -> String { + json!({ + "aggregate_kind": "fulfillment_window", + "fulfillment_window_id": fulfillment_window_id.to_string(), + "farm_id": farm_id.to_string(), + "source": source, + }) + .to_string() +} + +fn product_sync_payload( + product_id: ProductId, + farm_id: Option<FarmId>, + source: &str, + draft: Option<&ProductEditorDraft>, + stock_quantity: Option<u32>, + status: Option<&str>, +) -> String { + json!({ + "aggregate_kind": "product", + "product_id": product_id.to_string(), + "farm_id": farm_id.map(|value| value.to_string()), + "title": draft.map(|value| value.title.clone()), + "subtitle": draft.map(|value| value.subtitle.clone()), + "unit_label": draft.map(|value| value.unit_label.clone()), + "price_minor_units": draft.and_then(|value| value.price_minor_units), + "price_currency": draft.map(|value| value.price_currency.clone()), + "stock_quantity": stock_quantity.or_else(|| draft.and_then(|value| value.stock_quantity)), + "availability_window_id": draft + .and_then(|value| value.availability_window_id) + .map(|value| value.to_string()), + "status": status.or_else(|| draft.map(|value| value.status.storage_key())), + "source": source, + }) + .to_string() +} + +fn order_sync_payload( + order_id: OrderId, + farm_id: FarmId, + source: &str, + status: Option<&str>, +) -> String { + json!({ + "aggregate_kind": "order", + "order_id": order_id.to_string(), + "farm_id": farm_id.to_string(), + "status": status, + "source": source, + }) + .to_string() +} + #[cfg(test)] mod tests { use std::{ collections::BTreeSet, fs, path::PathBuf, - sync::Arc, + sync::{Arc, Mutex}, time::{SystemTime, UNIX_EPOCH}, }; @@ -2675,9 +3159,10 @@ mod tests { InMemoryAppStateRepository, }; use radroots_app_sync::{ - AppSyncRunStatus, PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, - SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, - SyncOperationKind, + AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, + PendingSyncOperation, RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState, + SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, + SyncConflictSeverity, SyncOperationKind, SyncTrigger, }; use radroots_identity::RadrootsIdentity; use radroots_nostr_accounts::prelude::{ @@ -2690,9 +3175,35 @@ mod tests { use super::{ APP_DATABASE_FILE_NAME, DesktopAppRuntime, DesktopAppRuntimeActivityContextError, DesktopAppRuntimeCommandError, DesktopAppRuntimeMetadataSummary, DesktopAppRuntimeState, - DesktopAppSyncStatusSummary, DesktopRemoteSignerPaths, + DesktopAppSyncStatusSummary, DesktopRemoteSignerPaths, SYNC_TRANSPORT_UNAVAILABLE_MESSAGE, + default_sync_transport, }; + #[derive(Clone)] + struct SharedRecordedSyncTransport(Arc<Mutex<RecordedAppSyncTransport>>); + + impl AppSyncTransport for SharedRecordedSyncTransport { + fn sync( + &mut self, + request: AppSyncRequest, + ) -> Result<AppSyncResult, AppSyncTransportError> { + self.0 + .lock() + .expect("recorded sync transport lock") + .sync(request) + } + } + + fn install_recorded_sync_transport( + runtime: &DesktopAppRuntime, + transport: RecordedAppSyncTransport, + ) -> Arc<Mutex<RecordedAppSyncTransport>> { + let shared = Arc::new(Mutex::new(transport)); + runtime.lock_state_mut().sync_transport = + Box::new(SharedRecordedSyncTransport(shared.clone())); + shared + } + #[test] fn desktop_namespace_uses_canonical_app_and_shared_runtime_roots() { let paths = AppDesktopRuntimePaths::for_desktop( @@ -2749,6 +3260,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -2801,6 +3313,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -2928,6 +3441,279 @@ mod tests { } #[test] + fn runtime_local_product_mutations_enqueue_pending_sync_without_transport_calls() { + let runtime = memory_runtime(); + let (account_id, _) = provision_ready_farmer_account(&runtime); + let recorded = install_recorded_sync_transport( + &runtime, + RecordedAppSyncTransport::succeed(AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + None, + "2026-04-20T19:30:00Z", + Some("cursor-product".to_owned()), + ), + pushed_operation_count: 1, + pulled_record_count: 0, + conflicts: Vec::new(), + }), + ); + + assert!( + runtime + .open_new_product_editor() + .expect("new product editor should open") + ); + + let summary = runtime.summary(); + let pending_operations = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_pending_sync_operations(account_id.as_str()) + .expect("pending sync operations should load"); + + assert_eq!(recorded.lock().expect("recorded transport").call_count(), 0); + assert_eq!(summary.sync_status.pending_write_count, 1); + assert_eq!(pending_operations.len(), 1); + assert!(matches!( + pending_operations[0].operation.aggregate, + SyncAggregateRef::Product(_) + )); + } + + #[test] + fn runtime_launch_sync_attempt_dequeues_pushed_operations() { + let runtime = memory_runtime(); + let (account_id, _) = provision_ready_farmer_account(&runtime); + + assert!( + runtime + .open_new_product_editor() + .expect("new product editor should open") + ); + + let recorded = install_recorded_sync_transport( + &runtime, + RecordedAppSyncTransport::succeed(AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + Some("2026-04-20T19:40:00Z".to_owned()), + "2026-04-20T19:40:05Z", + Some("cursor-launch".to_owned()), + ), + pushed_operation_count: 1, + pulled_record_count: 0, + conflicts: Vec::new(), + }), + ); + + assert!( + runtime + .sync_on_app_launch() + .expect("launch sync should succeed") + ); + + let summary = runtime.summary(); + let recorded = recorded.lock().expect("recorded transport"); + let request = recorded + .last_request() + .cloned() + .expect("launch sync request should record"); + + assert_eq!(recorded.call_count(), 1); + assert_eq!(request.trigger, SyncTrigger::AppLaunch); + assert_eq!(request.pending_operations.len(), 1); + assert_eq!(summary.sync_status.pending_write_count, 0); + assert_eq!( + summary.sync_status.projection.run_status, + AppSyncRunStatus::Succeeded + ); + assert_eq!( + summary.sync_status.projection.checkpoint.state, + SyncCheckpointState::Current + ); + assert_eq!( + runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_pending_sync_operations(account_id.as_str()) + .expect("pending sync operations should load") + .len(), + 0 + ); + } + + #[test] + fn runtime_foreground_resume_sync_uses_the_resume_trigger() { + let runtime = memory_runtime(); + let (_, _) = provision_ready_farmer_account(&runtime); + + assert!( + runtime + .open_new_product_editor() + .expect("new product editor should open") + ); + + let recorded = install_recorded_sync_transport( + &runtime, + RecordedAppSyncTransport::succeed(AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + Some("2026-04-20T19:50:00Z".to_owned()), + "2026-04-20T19:50:03Z", + Some("cursor-resume".to_owned()), + ), + pushed_operation_count: 1, + pulled_record_count: 0, + conflicts: Vec::new(), + }), + ); + + assert!( + runtime + .sync_on_foreground_resume() + .expect("resume sync should succeed") + ); + + let request = recorded + .lock() + .expect("recorded transport") + .last_request() + .cloned() + .expect("resume sync request should record"); + + assert_eq!(request.trigger, SyncTrigger::ForegroundResume); + } + + #[test] + fn runtime_manual_refresh_marks_failed_checkpoint_when_transport_is_unavailable() { + let runtime = memory_runtime(); + let (account_id, _) = provision_ready_farmer_account(&runtime); + + assert!( + runtime + .open_new_product_editor() + .expect("new product editor should open") + ); + + assert!( + runtime + .sync_on_manual_refresh() + .expect("manual refresh should complete") + ); + + let summary = runtime.summary(); + let pending_operations = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_pending_sync_operations(account_id.as_str()) + .expect("pending sync operations should load"); + + assert_eq!( + summary.sync_status.projection.run_status, + AppSyncRunStatus::Failed + ); + assert_eq!( + summary.sync_status.projection.checkpoint.state, + SyncCheckpointState::Failed + ); + assert_eq!(summary.sync_status.pending_write_count, 1); + assert!( + summary + .sync_status + .projection + .checkpoint + .last_error_message + .as_deref() + .is_some_and(|message| { message.contains(SYNC_TRANSPORT_UNAVAILABLE_MESSAGE) }) + ); + assert_eq!(pending_operations.len(), 1); + assert_eq!(pending_operations[0].operation.attempt_count, 1); + } + + #[test] + fn runtime_sync_attempts_stop_when_blocking_conflicts_are_present() { + let runtime = memory_runtime(); + let (account_id, farm_id) = provision_ready_farmer_account(&runtime); + + assert!( + runtime + .open_new_product_editor() + .expect("new product editor should open") + ); + + runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .record_sync_conflict( + account_id.as_str(), + &SyncConflict { + aggregate: SyncAggregateRef::Farm(farm_id), + kind: SyncConflictKind::RevisionMismatch, + severity: SyncConflictSeverity::Blocking, + resolution: SyncConflictResolutionStatus::Unresolved, + local_payload_json: "{\"farm\":\"local\"}".to_owned(), + remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()), + detected_at: "2026-04-20T20:00:00Z".to_owned(), + resolved_at: None, + }, + ) + .expect("blocking conflict should save"); + assert!( + runtime + .lock_state_mut() + .refresh_selected_account_sync() + .expect("sync status should refresh") + ); + + let recorded = install_recorded_sync_transport( + &runtime, + RecordedAppSyncTransport::succeed(AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + None, + "2026-04-20T20:00:05Z", + Some("cursor-blocked".to_owned()), + ), + pushed_operation_count: 1, + pulled_record_count: 0, + conflicts: Vec::new(), + }), + ); + + assert!( + !runtime + .sync_on_app_launch() + .expect("blocked launch sync should skip") + ); + + let summary = runtime.summary(); + + assert_eq!(recorded.lock().expect("recorded transport").call_count(), 0); + assert_eq!(summary.sync_status.pending_write_count, 1); + assert_eq!( + summary.sync_status.projection.run_status, + AppSyncRunStatus::Conflicted + ); + assert_eq!( + summary + .sync_status + .projection + .conflict_status + .blocking_count, + 1 + ); + } + + #[test] fn runtime_summary_surfaces_runtime_metadata_from_bootstrap() { let (runtime, paths) = bootstrapped_runtime("runtime_metadata"); let summary = runtime.summary(); @@ -2970,6 +3756,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -2999,6 +3786,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -3097,6 +3885,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -3195,6 +3984,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -3246,6 +4036,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -3281,6 +4072,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -3314,6 +4106,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -5518,6 +6311,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -5551,6 +6345,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, @@ -5582,6 +6377,7 @@ mod tests { AppSqliteStore::open(DatabaseTarget::InMemory) .expect("in-memory sqlite store should open"), ), + sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, startup_issue: None, diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs @@ -416,6 +416,15 @@ impl AppSqliteStore { self.sync_repository().record_conflict(account_id, conflict) } + pub fn replace_sync_conflicts( + &self, + account_id: &str, + conflicts: &[SyncConflict], + ) -> Result<(), AppSqliteError> { + self.sync_repository() + .replace_conflicts(account_id, conflicts) + } + pub fn load_sync_conflicts( &self, account_id: &str, diff --git a/crates/shared/sqlite/src/sync.rs b/crates/shared/sqlite/src/sync.rs @@ -329,6 +329,28 @@ impl<'a> AppSyncRepository<'a> { Ok(conflict_id) } + pub fn replace_conflicts( + &self, + account_id: &str, + conflicts: &[SyncConflict], + ) -> Result<(), AppSqliteError> { + self.connection + .execute( + "DELETE FROM local_conflicts WHERE account_id = ?1", + [account_id], + ) + .map_err(|source| AppSqliteError::Query { + operation: "clear sync conflicts", + source, + })?; + + for conflict in conflicts { + let _ = self.record_conflict(account_id, conflict)?; + } + + Ok(()) + } + pub fn load_conflicts( &self, account_id: &str, @@ -779,4 +801,53 @@ mod tests { assert_eq!(acct_b.len(), 1); assert_eq!(acct_b[0].conflict, second); } + + #[test] + fn replacing_conflicts_clears_stale_rows_for_the_selected_account() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); + let repository = store.sync_repository(); + let first = SyncConflict { + aggregate: SyncAggregateRef::Farm(FarmId::new()), + kind: SyncConflictKind::RevisionMismatch, + severity: SyncConflictSeverity::Blocking, + resolution: SyncConflictResolutionStatus::Unresolved, + local_payload_json: "{\"farm\":\"local\"}".to_owned(), + remote_payload_json: Some("{\"farm\":\"remote\"}".to_owned()), + detected_at: "2026-04-20T18:00:00Z".to_owned(), + resolved_at: None, + }; + let second = SyncConflict { + aggregate: SyncAggregateRef::Product(ProductId::new()), + kind: SyncConflictKind::RemoteValidationReject, + severity: SyncConflictSeverity::ReviewRequired, + resolution: SyncConflictResolutionStatus::Unresolved, + local_payload_json: "{\"product\":\"local\"}".to_owned(), + remote_payload_json: None, + detected_at: "2026-04-20T18:05:00Z".to_owned(), + resolved_at: None, + }; + + repository + .record_conflict("acct_a", &first) + .expect("first conflict should save"); + repository + .record_conflict("acct_b", &first) + .expect("other account conflict should save"); + + repository + .replace_conflicts("acct_a", std::slice::from_ref(&second)) + .expect("conflicts should replace"); + + let acct_a = repository + .load_conflicts("acct_a") + .expect("account conflicts should load"); + let acct_b = repository + .load_conflicts("acct_b") + .expect("other account conflicts should load"); + + assert_eq!(acct_a.len(), 1); + assert_eq!(acct_a[0].conflict, second); + assert_eq!(acct_b.len(), 1); + assert_eq!(acct_b[0].conflict, first); + } }