app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 51d995a8d4dcc2f1e3c1d79ee1025cd094526fd1
parent 2fa1bd6d4874c7954832938c3c23fcd0548ae359
Author: triesap <tyson@radroots.org>
Date:   Mon, 25 May 2026 20:06:47 +0000

sync: run relay ingest after outbound failure

- move configured relay ingest into a shared post-sync path
- preserve failed checkpoints and retry state for outbound transport errors
- refresh local projections when relay ingest imports records after failure
- cover manual refresh launch and foreground failure ingest paths

Diffstat:
Mcrates/launchers/desktop/src/runtime.rs | 227+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 187 insertions(+), 40 deletions(-)

diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -3269,28 +3269,8 @@ impl DesktopAppRuntimeState { match self.run_sync_transport_or_relay_only(request, started_at.as_str()) { Ok(mut result) => { - let mut relay_context_changed = false; - if self.has_configured_relay_ingest() { - match self.ingest_configured_relay_events() { - Ok(report) => { - result.pulled_record_count = result - .pulled_record_count - .saturating_add(report.scanned_records as usize); - relay_context_changed = - report.imported_records > 0 || report.skipped_records > 0; - } - Err(AppDirectRelayIngestError::Sqlite(error)) => return Err(error), - Err(AppDirectRelayIngestError::Transport(error)) => { - result.run_status = AppSyncRunStatus::Failed; - result.checkpoint = SyncCheckpointStatus::failed( - Some(started_at.clone()), - Some(current_utc_timestamp()), - result.checkpoint.last_remote_cursor.clone(), - error.to_string(), - ); - } - } - } + let relay_context_changed = + self.ingest_configured_relay_events_for_sync(Some(&mut result), &started_at)?; changed |= self.apply_sync_result( prepared.account_id.as_str(), &prepared.pending_operations, @@ -3308,6 +3288,11 @@ impl DesktopAppRuntimeState { started_at.as_str(), error, )?; + let relay_context_changed = + self.ingest_configured_relay_events_for_sync(None, &started_at)?; + if relay_context_changed { + changed |= self.refresh_selected_account_context_after_local_events()?; + } } } @@ -3363,6 +3348,39 @@ impl DesktopAppRuntimeState { .map_err(AppDirectRelayIngestError::from) } + fn ingest_configured_relay_events_for_sync( + &mut self, + mut result: Option<&mut AppSyncResult>, + started_at: &str, + ) -> Result<bool, AppSqliteError> { + if !self.has_configured_relay_ingest() { + return Ok(false); + } + match self.ingest_configured_relay_events() { + Ok(report) => { + if let Some(result) = result.as_mut() { + result.pulled_record_count = result + .pulled_record_count + .saturating_add(report.scanned_records as usize); + } + Ok(report.imported_records > 0 || report.skipped_records > 0) + } + Err(AppDirectRelayIngestError::Sqlite(error)) => Err(error), + Err(AppDirectRelayIngestError::Transport(error)) => { + if let Some(result) = result.as_mut() { + result.run_status = AppSyncRunStatus::Failed; + result.checkpoint = SyncCheckpointStatus::failed( + Some(started_at.to_owned()), + Some(current_utc_timestamp()), + result.checkpoint.last_remote_cursor.clone(), + error.to_string(), + ); + } + Ok(false) + } + } + } + fn prepare_sync_request( &self, trigger: SyncTrigger, @@ -7849,6 +7867,29 @@ mod tests { #[test] fn runtime_configured_relay_sync_triggers_ingest_listing_into_fresh_buyer_projection() { let relay = ThreadedAckRelay::spawn(); + let product_id = publish_relay_ingest_listing_fixture(&relay); + + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_manual_refresh", + SyncTrigger::ManualRefresh, + product_id, + ); + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_app_launch", + SyncTrigger::AppLaunch, + product_id, + ); + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_foreground_resume", + SyncTrigger::ForegroundResume, + product_id, + ); + } + + fn publish_relay_ingest_listing_fixture(relay: &ThreadedAckRelay) -> ProductId { let manager = RadrootsNostrAccountsManager::new_in_memory(); let account_id = manager .generate_identity(Some("Farmer".to_owned()), true) @@ -7910,24 +7951,7 @@ mod tests { assert_eq!(result.run_status, AppSyncRunStatus::Succeeded); assert_eq!(result.published_receipts.len(), 2); - assert_fresh_buyer_relay_ingest( - relay.url(), - "relay_ingest_manual_refresh", - SyncTrigger::ManualRefresh, - product_id, - ); - assert_fresh_buyer_relay_ingest( - relay.url(), - "relay_ingest_app_launch", - SyncTrigger::AppLaunch, - product_id, - ); - assert_fresh_buyer_relay_ingest( - relay.url(), - "relay_ingest_foreground_resume", - SyncTrigger::ForegroundResume, - product_id, - ); + product_id } fn assert_fresh_buyer_relay_ingest( @@ -7992,6 +8016,129 @@ mod tests { } #[test] + fn runtime_relay_ingest_runs_after_outbound_sync_failure() { + let relay = ThreadedAckRelay::spawn(); + let product_id = publish_relay_ingest_listing_fixture(&relay); + + assert_relay_ingest_after_outbound_failure( + relay.url(), + "relay_ingest_after_manual_failure", + SyncTrigger::ManualRefresh, + product_id, + ); + assert_relay_ingest_after_outbound_failure( + relay.url(), + "relay_ingest_after_launch_failure", + SyncTrigger::AppLaunch, + product_id, + ); + assert_relay_ingest_after_outbound_failure( + relay.url(), + "relay_ingest_after_foreground_failure", + SyncTrigger::ForegroundResume, + product_id, + ); + } + + fn assert_relay_ingest_after_outbound_failure( + relay_url: &str, + label: &str, + trigger: SyncTrigger, + product_id: ProductId, + ) { + let (runtime, paths) = bootstrapped_runtime(label); + assert!( + runtime + .generate_local_account(Some("Buyer".to_owned())) + .expect("buyer account should generate") + ); + runtime.lock_state_mut().nostr_relay_urls = vec![relay_url.to_owned()]; + let buyer_account_id = runtime + .summary() + .settings_account_projection + .selected_account + .as_ref() + .expect("selected account") + .account + .account_id + .clone(); + let pending_farm_id = FarmId::new(); + runtime + .lock_state_mut() + .enqueue_selected_account_sync_operations(vec![pending_sync_upsert( + SyncAggregateRef::Farm(pending_farm_id), + farm_sync_payload( + pending_farm_id, + "Pending outbound farm", + Some(FarmReadiness::Ready), + "relay_ingest_after_outbound_failure", + ), + )]) + .expect("pending sync should enqueue"); + let recorded = install_recorded_sync_transport( + &runtime, + RecordedAppSyncTransport::fail(AppSyncTransportError::unavailable( + "test outbound sync unavailable", + )), + ); + + let changed = match trigger { + SyncTrigger::ManualRefresh => runtime + .sync_on_manual_refresh() + .expect("manual refresh should complete"), + SyncTrigger::AppLaunch => runtime + .sync_on_app_launch() + .expect("launch sync should complete"), + SyncTrigger::ForegroundResume => runtime + .sync_on_foreground_resume() + .expect("foreground sync should complete"), + SyncTrigger::LocalMutation => panic!("local mutation is not a relay ingest trigger"), + }; + + assert!(changed); + assert_eq!(recorded.lock().expect("recorded transport").call_count(), 1); + let summary = runtime.summary(); + assert_eq!( + summary.sync_status.projection.run_status, + AppSyncRunStatus::Failed + ); + assert_eq!( + summary.sync_status.projection.checkpoint.state, + SyncCheckpointState::Failed + ); + assert_eq!(summary.sync_status.pending_write_count, 1); + let listing = summary + .personal_projection + .browse + .listings + .rows + .iter() + .find(|listing| listing.product_id == product_id) + .expect("relay listing should still project after outbound failure"); + assert_eq!(listing.title, "Relay ingest lettuce"); + assert_eq!(listing.listing_relays, vec![relay_url.to_owned()]); + + let pending_operations = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_pending_sync_operations(buyer_account_id.as_str()) + .expect("pending sync operations should load"); + assert_eq!(pending_operations.len(), 1); + assert_eq!(pending_operations[0].operation.attempt_count, 1); + assert!( + pending_operations[0] + .operation + .last_error_message + .as_deref() + .is_some_and(|message| message.contains("test outbound sync unavailable")) + ); + + cleanup_bootstrapped_runtime_paths(&paths); + } + + #[test] fn runtime_direct_relay_transport_returns_partial_failure_after_successful_prefix() { let relay = ThreadedAckRelay::spawn(); let manager = RadrootsNostrAccountsManager::new_in_memory();