app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit ad6d06f930714a9d378239afd2434be5ea568f61
parent 0d194b9c2846be818ff905173dfabed4505a8648
Author: triesap <tyson@radroots.org>
Date:   Mon, 25 May 2026 21:03:24 +0000

sync: persist relay ingest freshness

- add app relay ingest freshness contracts and SQLite persistence
- fetch configured relays with durable per-relay cursors
- surface fresh, stale, partial, and failed relay ingest state internally
- cover idempotent relay refresh and cursor persistence paths

Diffstat:
Mcrates/launchers/desktop/src/runtime.rs | 413+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Acrates/shared/sqlite/migrations/0019_relay_ingest_freshness.sql | 19+++++++++++++++++++
Mcrates/shared/sqlite/src/lib.rs | 92+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/shared/sqlite/src/migrations.rs | 4++++
Mcrates/shared/sqlite/src/sync.rs | 407+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/shared/sync/src/lib.rs | 48++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 902 insertions(+), 81 deletions(-)

diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt; use std::fs; use std::path::PathBuf; @@ -36,7 +36,8 @@ use radroots_app_remote_signer::{ use radroots_app_sqlite::{ APP_ACTIVITY_CONTEXT_LIMIT, AppLocalInteropImportReport, AppSqliteError, AppSqliteStore, BuyerOrderLocalEventExport, BuyerOrderLocalEventLine, BuyerRepeatDemandApplyOutcome, - DatabaseTarget, StoredPendingSyncOperation, StoredSyncConflict, derive_farm_rules_readiness, + DatabaseTarget, StoredPendingSyncOperation, StoredRelayIngestCursor, StoredSyncConflict, + derive_farm_rules_readiness, }; use radroots_app_state::{ APP_STATE_FILE_NAME, AppShellProjection, AppStateCommand, AppStatePersistenceRepository, @@ -51,8 +52,8 @@ use radroots_app_state::{ use radroots_app_sync::{ AppFarmProfilePublishPayload, AppListingPublishPayload, AppOrderRequestItemPayload, AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload, - AppPublishedOperationReceipt, AppSyncProjection, AppSyncRequest, AppSyncResult, - AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, PendingSyncOperation, + AppPublishedOperationReceipt, AppRelayIngestScopeFreshness, AppSyncProjection, AppSyncRequest, + AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, SyncConflictSeverity, SyncOperationKind, SyncTrigger, }; use radroots_core::{ @@ -118,6 +119,8 @@ const APP_DIRECT_RELAY_SYNC_TIMEOUT_MS: u64 = 2_000; const APP_DIRECT_RELAY_CONNECT_TIMEOUT: StdDuration = StdDuration::from_secs(10); const APP_DIRECT_RELAY_INGEST_LIMIT: usize = 1_000; const APP_DIRECT_RELAY_INGEST_MAX_PAGES: usize = 5; +const APP_DIRECT_RELAY_INGEST_SCOPE_KEY: &str = "direct_relay_ingest"; +const APP_DIRECT_RELAY_INGEST_STALE_AFTER_SECONDS: i64 = 900; const APP_DIRECT_RELAY_INGEST_KINDS: &[u16] = &[ 0, 30340, 30402, 30403, 3422, 3423, 3424, 3425, 3432, 3433, 3434, ]; @@ -142,9 +145,23 @@ struct AppDirectRelayFetchReceipt { target_relays: Vec<String>, connected_relays: Vec<String>, failed_relays: Vec<RelayDeliveryFailure>, + fetched_relays: Vec<AppDirectRelayFetchedRelay>, + event_observed_relays: BTreeMap<String, Vec<String>>, events: Vec<RadrootsNostrEvent>, } +#[derive(Debug, Clone)] +struct AppDirectRelayFetchedRelay { + relay_url: String, + last_event_created_at_unix_seconds: Option<i64>, +} + +#[derive(Debug, Default)] +struct AppDirectRelayIngestReport { + local_import: AppLocalInteropImportReport, + freshness_changed: bool, +} + #[derive(Debug, Error)] enum AppDirectRelayIngestError { #[error(transparent)] @@ -1102,6 +1119,7 @@ struct DesktopReminderSyncTruth { #[derive(Clone, Debug, Default, Eq, PartialEq)] struct DesktopSelectedAccountSyncContext { projection: AppSyncProjection, + relay_ingest: AppRelayIngestScopeFreshness, pending_write_count: usize, conflicts: Vec<DesktopAppSyncConflictSummary>, } @@ -1124,6 +1142,7 @@ struct DesktopAppRuntimeState { sync_transport: Box<dyn AppSyncTransport + Send>, runtime_metadata: DesktopAppRuntimeMetadataSummary, selected_account_pending_sync_write_count: usize, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness, selected_account_sync_conflicts: Vec<DesktopAppSyncConflictSummary>, startup_issue: Option<String>, } @@ -1156,6 +1175,10 @@ impl fmt::Debug for DesktopAppRuntimeState { &self.selected_account_pending_sync_write_count, ) .field( + "selected_account_relay_ingest_freshness", + &self.selected_account_relay_ingest_freshness, + ) + .field( "selected_account_sync_conflicts", &self.selected_account_sync_conflicts, ) @@ -1213,8 +1236,11 @@ impl DesktopAppRuntimeState { )?; let selected_account_context = load_selected_account_context(&sqlite_store, &identity_projection, &continuity_state)?; - let selected_account_sync_context = - load_selected_account_sync_context(&sqlite_store, &identity_projection)?; + let selected_account_sync_context = load_selected_account_sync_context( + &sqlite_store, + &identity_projection, + &nostr_relay_urls, + )?; let _ = state_store.apply_in_memory(AppStateCommand::replace_identity_projection( identity_projection.clone(), )); @@ -1224,6 +1250,8 @@ impl DesktopAppRuntimeState { let _ = state_store.apply_in_memory(AppStateCommand::show_startup_signer_entry()); } let pending_sync_write_count = selected_account_sync_context.pending_write_count; + let selected_account_relay_ingest_freshness = + selected_account_sync_context.relay_ingest.clone(); let selected_account_sync_conflicts = selected_account_sync_context.conflicts; let _ = state_store.apply_in_memory(AppStateCommand::replace_sync_projection( selected_account_sync_context.projection, @@ -1251,6 +1279,7 @@ impl DesktopAppRuntimeState { database_schema_version, ), selected_account_pending_sync_write_count: pending_sync_write_count, + selected_account_relay_ingest_freshness, selected_account_sync_conflicts, startup_issue: None, }; @@ -1287,6 +1316,7 @@ impl DesktopAppRuntimeState { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::unavailable(runtime_snapshot), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: Some(error.to_string()), } @@ -2920,8 +2950,11 @@ impl DesktopAppRuntimeState { let continuity_state = self.continuity_state(); let selected_account_context = load_selected_account_context(self.sqlite_store()?, &projection, &continuity_state)?; - let selected_account_sync_context = - load_selected_account_sync_context(self.sqlite_store()?, &projection)?; + let selected_account_sync_context = load_selected_account_sync_context( + self.sqlite_store()?, + &projection, + &self.nostr_relay_urls, + )?; let identity_changed = self .state_store .apply_in_memory(AppStateCommand::replace_identity_projection(projection)); @@ -3090,7 +3123,11 @@ impl DesktopAppRuntimeState { return Ok(DesktopSelectedAccountSyncContext::default()); }; - load_selected_account_sync_context(sqlite_store, self.state_store.identity_projection()) + load_selected_account_sync_context( + sqlite_store, + self.state_store.identity_projection(), + &self.nostr_relay_urls, + ) } fn apply_selected_account_sync_context( @@ -3104,12 +3141,15 @@ impl DesktopAppRuntimeState { )); let pending_changed = self.selected_account_pending_sync_write_count != context.pending_write_count; + let relay_ingest_changed = + self.selected_account_relay_ingest_freshness != context.relay_ingest; let conflicts_changed = self.selected_account_sync_conflicts != context.conflicts; self.selected_account_pending_sync_write_count = context.pending_write_count; + self.selected_account_relay_ingest_freshness = context.relay_ingest.clone(); self.selected_account_sync_conflicts = context.conflicts.clone(); - projection_changed || pending_changed || conflicts_changed + projection_changed || pending_changed || relay_ingest_changed || conflicts_changed } fn refresh_selected_account_sync(&mut self) -> Result<bool, AppSqliteError> { @@ -3291,6 +3331,7 @@ impl DesktopAppRuntimeState { let relay_context_changed = self.ingest_configured_relay_events_for_sync(None, &started_at)?; if relay_context_changed { + changed |= self.refresh_selected_account_sync()?; changed |= self.refresh_selected_account_context_after_local_events()?; } } @@ -3330,22 +3371,92 @@ impl DesktopAppRuntimeState { fn ingest_configured_relay_events( &self, - ) -> Result<AppLocalInteropImportReport, AppDirectRelayIngestError> { + ) -> Result<AppDirectRelayIngestReport, AppDirectRelayIngestError> { let Some(sqlite_store) = self.sqlite_store.as_ref() else { - return Ok(AppLocalInteropImportReport::default()); + return Ok(AppDirectRelayIngestReport::default()); }; let relay_urls = normalized_app_relay_ingest_urls(&self.nostr_relay_urls)?; if relay_urls.is_empty() { - return Ok(AppLocalInteropImportReport::default()); + return Ok(AppDirectRelayIngestReport::default()); + } + let started_at = current_utc_timestamp(); + let started_unix_seconds = current_runtime_time_seconds()?; + let cursors = sqlite_store + .load_relay_ingest_cursors(APP_DIRECT_RELAY_INGEST_SCOPE_KEY, &relay_urls)?; + let receipt = fetch_app_events_from_relays_windowed(cursors.as_slice())?; + let completed_at = current_utc_timestamp(); + let completed_unix_seconds = current_runtime_time_seconds()?; + self.record_relay_ingest_freshness( + &receipt, + started_at.as_str(), + started_unix_seconds, + completed_at.as_str(), + completed_unix_seconds, + )?; + if receipt.connected_relays.is_empty() { + return Err(AppSyncTransportError::unavailable(format!( + "direct relay app ingest connection failed: {}", + summarize_app_relay_failures(&receipt.failed_relays) + )) + .into()); } - let receipt = fetch_app_events_from_relays_windowed(&relay_urls)?; if receipt.events.is_empty() { - return Ok(AppLocalInteropImportReport::default()); + return Ok(AppDirectRelayIngestReport { + local_import: AppLocalInteropImportReport::default(), + freshness_changed: true, + }); } let records = direct_relay_event_records(&receipt, current_runtime_time_ms()?)?; - sqlite_store + let local_import = sqlite_store .import_local_event_records(records.as_slice()) - .map_err(AppDirectRelayIngestError::from) + .map_err(AppDirectRelayIngestError::from)?; + Ok(AppDirectRelayIngestReport { + local_import, + freshness_changed: true, + }) + } + + fn record_relay_ingest_freshness( + &self, + receipt: &AppDirectRelayFetchReceipt, + started_at: &str, + started_unix_seconds: i64, + completed_at: &str, + completed_unix_seconds: i64, + ) -> Result<(), AppSqliteError> { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(()); + }; + for relay in &receipt.fetched_relays { + let cursor_since_unix_seconds = relay + .last_event_created_at_unix_seconds + .map_or(started_unix_seconds, |last_event_created_at| { + started_unix_seconds.max(last_event_created_at) + }); + sqlite_store.record_relay_ingest_success( + APP_DIRECT_RELAY_INGEST_SCOPE_KEY, + relay.relay_url.as_str(), + cursor_since_unix_seconds, + relay.last_event_created_at_unix_seconds, + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + )?; + } + for failure in &receipt.failed_relays { + sqlite_store.record_relay_ingest_failure( + APP_DIRECT_RELAY_INGEST_SCOPE_KEY, + failure.relay_url.as_str(), + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + failure.error.as_str(), + )?; + } + + Ok(()) } fn ingest_configured_relay_events_for_sync( @@ -3361,9 +3472,11 @@ impl DesktopAppRuntimeState { if let Some(result) = result.as_mut() { result.pulled_record_count = result .pulled_record_count - .saturating_add(report.scanned_records as usize); + .saturating_add(report.local_import.scanned_records as usize); } - Ok(report.imported_records > 0 || report.skipped_records > 0) + Ok(report.freshness_changed + || report.local_import.imported_records > 0 + || report.local_import.skipped_records > 0) } Err(AppDirectRelayIngestError::Sqlite(error)) => Err(error), Err(AppDirectRelayIngestError::Transport(error)) => { @@ -3376,7 +3489,7 @@ impl DesktopAppRuntimeState { error.to_string(), ); } - Ok(false) + Ok(true) } } } @@ -4973,6 +5086,17 @@ fn current_runtime_time_ms() -> Result<i64, AppSqliteError> { }) } +fn current_runtime_time_seconds() -> Result<i64, AppSqliteError> { + let duration = SystemTime::now().duration_since(UNIX_EPOCH).map_err(|_| { + AppSqliteError::InvalidProjection { + reason: "current runtime timestamp must be after unix epoch", + } + })?; + i64::try_from(duration.as_secs()).map_err(|_| AppSqliteError::InvalidProjection { + reason: "current runtime timestamp must fit i64 seconds", + }) +} + fn normalized_app_sync_relay_urls( relay_urls: &[String], ) -> Result<Vec<String>, AppSyncTransportError> { @@ -4997,14 +5121,53 @@ fn normalized_app_relay_ingest_urls(relay_urls: &[String]) -> Result<Vec<String> } fn fetch_app_events_from_relays_windowed( - relay_urls: &[String], + cursors: &[StoredRelayIngestCursor], +) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { + let target_relays = cursors + .iter() + .map(|cursor| cursor.relay_url.clone()) + .collect::<Vec<_>>(); + let mut merged: Option<AppDirectRelayFetchReceipt> = None; + + for cursor in cursors { + match fetch_app_events_from_single_relay_windowed(cursor) { + Ok(receipt) => merge_app_direct_relay_fetch_receipt(&mut merged, receipt), + Err(error) => merge_app_direct_relay_fetch_receipt( + &mut merged, + AppDirectRelayFetchReceipt { + target_relays: vec![cursor.relay_url.clone()], + connected_relays: Vec::new(), + failed_relays: vec![ + RelayDeliveryFailure::new(cursor.relay_url.clone(), error.to_string()) + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?, + ], + fetched_relays: Vec::new(), + event_observed_relays: BTreeMap::new(), + events: Vec::new(), + }, + ), + } + } + + Ok(merged.unwrap_or_else(|| AppDirectRelayFetchReceipt { + target_relays, + connected_relays: Vec::new(), + failed_relays: Vec::new(), + fetched_relays: Vec::new(), + event_observed_relays: BTreeMap::new(), + events: Vec::new(), + })) +} + +fn fetch_app_events_from_single_relay_windowed( + cursor: &StoredRelayIngestCursor, ) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { - let base_filter = direct_relay_ingest_filter(); + let base_filter = direct_relay_ingest_filter_since(cursor.cursor_since_unix_seconds)?; let mut next_filter = base_filter.clone(); let mut merged: Option<AppDirectRelayFetchReceipt> = None; for _ in 0..APP_DIRECT_RELAY_INGEST_MAX_PAGES { - let receipt = fetch_app_events_from_relays(relay_urls, next_filter)?; + let receipt = fetch_app_events_from_single_relay(cursor.relay_url.as_str(), next_filter)?; let page_len = receipt.events.len(); let oldest_created_at = receipt .events @@ -5018,7 +5181,10 @@ fn fetch_app_events_from_relays_windowed( let Some(oldest_created_at) = oldest_created_at else { break; }; - if oldest_created_at == 0 { + if cursor.cursor_since_unix_seconds.is_some_and(|since| { + i64::try_from(oldest_created_at).is_ok_and(|oldest| oldest <= since) + }) || oldest_created_at == 0 + { break; } next_filter = base_filter @@ -5028,45 +5194,36 @@ fn fetch_app_events_from_relays_windowed( } Ok(merged.unwrap_or_else(|| AppDirectRelayFetchReceipt { - target_relays: relay_urls.to_vec(), + target_relays: vec![cursor.relay_url.clone()], connected_relays: Vec::new(), failed_relays: Vec::new(), + fetched_relays: Vec::new(), + event_observed_relays: BTreeMap::new(), events: Vec::new(), })) } -fn fetch_app_events_from_relays( - relay_urls: &[String], +fn fetch_app_events_from_single_relay( + relay_url: &str, filter: RadrootsNostrFilter, ) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { - if relay_urls.is_empty() { - return Ok(AppDirectRelayFetchReceipt { - target_relays: Vec::new(), - connected_relays: Vec::new(), - failed_relays: Vec::new(), - events: Vec::new(), - }); - } - let runtime = TokioRuntimeBuilder::new_current_thread() .enable_all() .build() .map_err(|error| AppSyncTransportError::failed(error.to_string()))?; - runtime.block_on(fetch_app_events_from_relays_async(relay_urls, filter)) + runtime.block_on(fetch_app_events_from_single_relay_async(relay_url, filter)) } -async fn fetch_app_events_from_relays_async( - relay_urls: &[String], +async fn fetch_app_events_from_single_relay_async( + relay_url: &str, filter: RadrootsNostrFilter, ) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { let client = RadrootsNostrClient::new_signerless(); - for relay_url in relay_urls { - client - .add_read_relay(relay_url) - .await - .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; - } + client + .add_read_relay(relay_url) + .await + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; let connection_output = client.try_connect(APP_DIRECT_RELAY_CONNECT_TIMEOUT).await; let failed_relays = direct_relay_failures_from_output(&connection_output)?; @@ -5084,15 +5241,30 @@ async fn fetch_app_events_from_relays_async( ) .await .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + let last_event_created_at_unix_seconds = events + .iter() + .map(|event| relay_event_created_at_unix_seconds_for_fetch(event)) + .collect::<Result<Vec<_>, _>>()? + .into_iter() + .max(); + let mut event_observed_relays = BTreeMap::new(); + for event in &events { + event_observed_relays.insert(event.id.to_hex(), vec![relay_url.to_owned()]); + } Ok(AppDirectRelayFetchReceipt { - target_relays: relay_urls.to_vec(), + target_relays: vec![relay_url.to_owned()], connected_relays: connection_output .success .iter() .map(ToString::to_string) .collect(), failed_relays, + fetched_relays: vec![AppDirectRelayFetchedRelay { + relay_url: relay_url.to_owned(), + last_event_created_at_unix_seconds, + }], + event_observed_relays, events, }) } @@ -5108,6 +5280,19 @@ fn direct_relay_ingest_filter() -> RadrootsNostrFilter { .limit(APP_DIRECT_RELAY_INGEST_LIMIT) } +fn direct_relay_ingest_filter_since( + since_unix_seconds: Option<i64>, +) -> Result<RadrootsNostrFilter, AppSyncTransportError> { + let mut filter = direct_relay_ingest_filter(); + if let Some(since_unix_seconds) = since_unix_seconds { + let since = u64::try_from(since_unix_seconds).map_err(|_| { + AppSyncTransportError::failed("relay ingest cursor must be non-negative") + })?; + filter = filter.since(RadrootsNostrTimestamp::from(since)); + } + Ok(filter) +} + fn direct_relay_failures_from_output<T: fmt::Debug>( output: &RadrootsNostrOutput<T>, ) -> Result<Vec<RelayDeliveryFailure>, AppSyncTransportError> { @@ -5142,7 +5327,17 @@ fn merge_app_direct_relay_fetch_receipt( return; }; + append_unique_relays(&mut existing.target_relays, receipt.target_relays); append_unique_relays(&mut existing.connected_relays, receipt.connected_relays); + for fetched_relay in receipt.fetched_relays { + if !existing + .fetched_relays + .iter() + .any(|known| known.relay_url == fetched_relay.relay_url) + { + existing.fetched_relays.push(fetched_relay); + } + } for failure in receipt.failed_relays { if !existing .failed_relays @@ -5152,6 +5347,13 @@ fn merge_app_direct_relay_fetch_receipt( existing.failed_relays.push(failure); } } + for (event_id, relays) in receipt.event_observed_relays { + let observed = existing + .event_observed_relays + .entry(event_id) + .or_insert_with(Vec::new); + append_unique_relays(observed, relays); + } let mut seen_event_ids = existing .events .iter() @@ -5176,27 +5378,28 @@ fn direct_relay_event_records( receipt: &AppDirectRelayFetchReceipt, inserted_at_ms: i64, ) -> Result<Vec<LocalEventRecord>, AppDirectRelayIngestError> { - let observed_relays = if receipt.connected_relays.len() == 1 { - receipt.connected_relays.clone() - } else { - Vec::new() - }; - let delivery_evidence = RelayDeliveryEvidence::observed( - &receipt.target_relays, - &receipt.connected_relays, - observed_relays, - receipt.failed_relays.clone(), - ) - .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; - let relay_set_fingerprint = delivery_evidence.relay_set_fingerprint().ok_or_else(|| { - AppSyncTransportError::failed("app relay ingest requires a non-empty relay set") - })?; - let relay_delivery_json = delivery_evidence - .to_json_value() - .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; let mut records = Vec::with_capacity(receipt.events.len()); for (index, event) in receipt.events.iter().enumerate() { + let event_id = event.id.to_hex(); + let observed_relays = receipt + .event_observed_relays + .get(event_id.as_str()) + .cloned() + .unwrap_or_default(); + let delivery_evidence = RelayDeliveryEvidence::observed( + &receipt.target_relays, + &receipt.connected_relays, + observed_relays, + receipt.failed_relays.clone(), + ) + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + let relay_set_fingerprint = delivery_evidence.relay_set_fingerprint().ok_or_else(|| { + AppSyncTransportError::failed("app relay ingest requires a non-empty relay set") + })?; + let relay_delivery_json = delivery_evidence + .to_json_value() + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; let tags = relay_event_tags(event); let kind = relay_event_kind(event); let event_pubkey = event.pubkey.to_string(); @@ -5213,7 +5416,7 @@ fn direct_relay_event_records( records.push(LocalEventRecord { seq: local_seq, change_seq: local_seq, - record_id: format!("app:relay_event:{}", event.id.to_hex()), + record_id: format!("app:relay_event:{event_id}"), family: LocalRecordFamily::SignedEvent, status: LocalRecordStatus::Published, source_runtime: direct_relay_event_source_runtime(kind, listing_d_tag.as_deref()), @@ -5225,7 +5428,7 @@ fn direct_relay_event_records( farm_id, listing_addr, local_work_json: None, - event_id: Some(event.id.to_hex()), + event_id: Some(event_id), event_kind: Some(i64::from(kind)), event_pubkey: Some(event_pubkey), event_created_at: Some(relay_event_created_at_i64(event)?), @@ -5283,6 +5486,13 @@ fn relay_event_created_at_i64(event: &RadrootsNostrEvent) -> Result<i64, AppSqli }) } +fn relay_event_created_at_unix_seconds_for_fetch( + event: &RadrootsNostrEvent, +) -> Result<i64, AppSyncTransportError> { + i64::try_from(event.created_at.as_secs()) + .map_err(|_| AppSyncTransportError::failed("app relay ingest event timestamp must fit i64")) +} + fn relay_event_created_at_ms(event: &RadrootsNostrEvent) -> Result<i64, AppSqliteError> { relay_event_created_at_i64(event)? .checked_mul(1_000) @@ -7184,6 +7394,7 @@ fn order_recovery_sync_payload( fn load_selected_account_sync_context( sqlite_store: &AppSqliteStore, identity_projection: &AppIdentityProjection, + relay_urls: &[String], ) -> Result<DesktopSelectedAccountSyncContext, AppSqliteError> { let Some(selected_account) = identity_projection.selected_account.as_ref() else { return Ok(DesktopSelectedAccountSyncContext::default()); @@ -7196,9 +7407,17 @@ fn load_selected_account_sync_context( .map(|stored| stored.conflict.clone()) .collect::<Vec<_>>(); let pending_write_count = sqlite_store.load_pending_sync_operations(account_id)?.len(); + let relay_urls = normalized_app_relay_ingest_urls(relay_urls)?; + let relay_ingest = sqlite_store.load_relay_ingest_freshness( + APP_DIRECT_RELAY_INGEST_SCOPE_KEY, + &relay_urls, + current_runtime_time_seconds()?, + APP_DIRECT_RELAY_INGEST_STALE_AFTER_SECONDS, + )?; Ok(DesktopSelectedAccountSyncContext { projection: derive_sync_projection(&checkpoint, &conflicts), + relay_ingest, pending_write_count, conflicts: stored_conflicts .into_iter() @@ -7579,11 +7798,12 @@ mod tests { use radroots_app_sync::{ AppFarmProfilePublishPayload, AppListingPublishPayload, AppOrderRequestItemPayload, AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload, - AppPublishedOperationReceipt, AppSyncRequest, AppSyncResult, AppSyncRunStatus, - AppSyncTransport, AppSyncTransportError, PendingSyncOperation, PendingSyncOperationState, - RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, - SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, - SyncOperationKind, SyncTrigger, + AppPublishedOperationReceipt, AppRelayIngestScopeFreshness, AppRelayIngestScopeStatus, + AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, + PendingSyncOperation, PendingSyncOperationState, RecordedAppSyncTransport, + SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, SyncConflict, + SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, + SyncTrigger, }; use radroots_identity::RadrootsIdentity; use radroots_local_events::{ @@ -7924,7 +8144,7 @@ mod tests { .find(|listing| listing.product_id == product_id) .expect("fresh buyer app should project relay listing"); assert_eq!(listing.title, "Relay ingest lettuce"); - assert!(listing.listing_relays.is_empty()); + assert_eq!(listing.listing_relays, vec![listing_relay.url().to_owned()]); let product_id_string = product_id.to_string(); let imports = runtime @@ -7952,7 +8172,7 @@ mod tests { assert_eq!(listing_import.outbox_status, "none"); assert_eq!(delivery["state"], json!("observed")); assert_eq!(delivery["acknowledged_relays"], json!([])); - assert_eq!(delivery["observed_relays"], serde_json::Value::Null); + assert_eq!(delivery["observed_relays"], json!([listing_relay.url()])); assert_eq!( delivery["target_relays"], json!([listing_relay.url(), empty_relay.url()]) @@ -8066,6 +8286,14 @@ mod tests { assert_eq!(listing.title, "Relay ingest lettuce"); assert_eq!(listing.farm_display_name, "Relay test farm"); assert_eq!(listing.listing_relays, vec![relay_url.to_owned()]); + let relay_ingest = runtime + .lock_state() + .selected_account_relay_ingest_freshness + .clone(); + assert_eq!(relay_ingest.status, AppRelayIngestScopeStatus::Fresh); + assert_eq!(relay_ingest.relays.len(), 1); + assert_eq!(relay_ingest.relays[0].relay_url, relay_url); + assert!(relay_ingest.relays[0].cursor_since_unix_seconds.is_some()); let product_id_string = product_id.to_string(); let imports = runtime @@ -8102,6 +8330,26 @@ mod tests { .count(), 1 ); + assert!( + runtime + .sync_on_manual_refresh() + .expect("repeat relay ingest should complete") + ); + let repeated_imports = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_local_interop_records() + .expect("repeated local interop records should load"); + assert_eq!( + repeated_imports + .iter() + .filter(|record| record.projected_kind == "listing" + && record.projected_id.as_deref() == Some(product_id_string.as_str())) + .count(), + 1 + ); cleanup_bootstrapped_runtime_paths(&paths); } @@ -8197,6 +8445,13 @@ mod tests { summary.sync_status.projection.checkpoint.state, SyncCheckpointState::Failed ); + assert_eq!( + runtime + .lock_state() + .selected_account_relay_ingest_freshness + .status, + AppRelayIngestScopeStatus::Fresh + ); assert_eq!(summary.sync_status.pending_write_count, 1); let listing = summary .personal_projection @@ -8616,6 +8871,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -8670,6 +8926,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -10416,6 +10673,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -10447,6 +10705,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -10856,6 +11115,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -10958,6 +11218,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -11011,6 +11272,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -11048,6 +11310,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -11083,6 +11346,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -15252,6 +15516,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }); @@ -15287,6 +15552,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }) @@ -15320,6 +15586,7 @@ mod tests { sync_transport: default_sync_transport(), runtime_metadata: DesktopAppRuntimeMetadataSummary::default(), selected_account_pending_sync_write_count: 0, + selected_account_relay_ingest_freshness: AppRelayIngestScopeFreshness::default(), selected_account_sync_conflicts: Vec::new(), startup_issue: None, }), diff --git a/crates/shared/sqlite/migrations/0019_relay_ingest_freshness.sql b/crates/shared/sqlite/migrations/0019_relay_ingest_freshness.sql @@ -0,0 +1,19 @@ +CREATE TABLE app_relay_ingest_freshness ( + scope_key TEXT NOT NULL, + relay_url TEXT NOT NULL, + state TEXT NOT NULL CHECK (state IN ('fresh', 'stale', 'failed')), + cursor_since_unix_seconds INTEGER, + last_event_created_at_unix_seconds INTEGER, + last_fetch_started_at TEXT NOT NULL, + last_fetch_started_unix_seconds INTEGER NOT NULL, + last_fetch_completed_at TEXT, + last_fetch_completed_unix_seconds INTEGER, + last_success_at TEXT, + last_success_unix_seconds INTEGER, + last_error_message TEXT, + updated_at TEXT NOT NULL, + PRIMARY KEY(scope_key, relay_url) +); + +CREATE INDEX idx_app_relay_ingest_freshness_scope_state + ON app_relay_ingest_freshness(scope_key, state, relay_url); diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs @@ -29,7 +29,8 @@ use radroots_app_models::{ ReminderLogProjection, TodayAgendaProjection, }; use radroots_app_sync::{ - PendingSyncOperation, SyncCheckpointStatus, SyncConflict, SyncConflictResolutionStatus, + AppRelayIngestScopeFreshness, PendingSyncOperation, SyncCheckpointStatus, SyncConflict, + SyncConflictResolutionStatus, }; use rusqlite::Connection; @@ -51,7 +52,9 @@ pub use migrations::latest_schema_version; pub use orders::AppOrdersRepository; pub use products::AppProductsRepository; pub use reminders::AppRemindersRepository; -pub use sync::{AppSyncRepository, StoredPendingSyncOperation, StoredSyncConflict}; +pub use sync::{ + AppSyncRepository, StoredPendingSyncOperation, StoredRelayIngestCursor, StoredSyncConflict, +}; pub use today::{ AppTodayAgendaRepository, TODAY_AGENDA_LIST_LIMIT, TODAY_AGENDA_LOW_STOCK_THRESHOLD, }; @@ -595,6 +598,74 @@ impl AppSqliteStore { .save_checkpoint(account_id, checkpoint) } + pub fn load_relay_ingest_cursors( + &self, + scope_key: &str, + relay_urls: &[String], + ) -> Result<Vec<StoredRelayIngestCursor>, AppSqliteError> { + self.sync_repository() + .load_relay_ingest_cursors(scope_key, relay_urls) + } + + pub fn load_relay_ingest_freshness( + &self, + scope_key: &str, + relay_urls: &[String], + now_unix_seconds: i64, + stale_after_seconds: i64, + ) -> Result<AppRelayIngestScopeFreshness, AppSqliteError> { + self.sync_repository().load_relay_ingest_freshness( + scope_key, + relay_urls, + now_unix_seconds, + stale_after_seconds, + ) + } + + pub fn record_relay_ingest_success( + &self, + scope_key: &str, + relay_url: &str, + cursor_since_unix_seconds: i64, + last_event_created_at_unix_seconds: Option<i64>, + started_at: &str, + started_unix_seconds: i64, + completed_at: &str, + completed_unix_seconds: i64, + ) -> Result<(), AppSqliteError> { + self.sync_repository().record_relay_ingest_success( + scope_key, + relay_url, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + ) + } + + pub fn record_relay_ingest_failure( + &self, + scope_key: &str, + relay_url: &str, + started_at: &str, + started_unix_seconds: i64, + completed_at: &str, + completed_unix_seconds: i64, + error_message: &str, + ) -> Result<(), AppSqliteError> { + self.sync_repository().record_relay_ingest_failure( + scope_key, + relay_url, + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + error_message, + ) + } + pub fn record_sync_conflict( &self, account_id: &str, @@ -757,6 +828,7 @@ mod tests { assert!(table_exists(connection, "local_outbox")); assert!(table_exists(connection, "local_conflicts")); assert!(table_exists(connection, "sync_checkpoints")); + assert!(table_exists(connection, "app_relay_ingest_freshness")); assert!(table_exists(connection, "activity_events")); assert!(table_exists(connection, "account_surface_activations")); assert!(table_exists(connection, "account_farm_setups")); @@ -791,6 +863,21 @@ mod tests { assert!(column_exists(connection, "sync_checkpoints", "state")); assert!(column_exists( connection, + "app_relay_ingest_freshness", + "scope_key" + )); + assert!(column_exists( + connection, + "app_relay_ingest_freshness", + "relay_url" + )); + assert!(column_exists( + connection, + "app_relay_ingest_freshness", + "cursor_since_unix_seconds" + )); + assert!(column_exists( + connection, "fulfillment_windows", "pickup_location_id" )); @@ -968,6 +1055,7 @@ mod tests { "resolution_status" )); assert!(column_exists(connection, "sync_checkpoints", "state")); + assert!(table_exists(connection, "app_relay_ingest_freshness")); assert_eq!(row_count(connection, "sync_checkpoints"), 0); drop(store); diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs @@ -76,6 +76,10 @@ const MIGRATIONS: &[Migration] = &[ version: 18, sql: include_str!("../migrations/0018_listing_relay_provenance.sql"), }, + Migration { + version: 19, + sql: include_str!("../migrations/0019_relay_ingest_freshness.sql"), + }, ]; pub fn latest_schema_version() -> u32 { diff --git a/crates/shared/sqlite/src/sync.rs b/crates/shared/sqlite/src/sync.rs @@ -1,8 +1,9 @@ use radroots_app_models::{FarmId, FulfillmentWindowId, OrderId, ProductId}; use radroots_app_sync::{ - PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncCheckpointState, - SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, - SyncConflictSeverity, SyncOperationKind, + AppRelayIngestFreshnessState, AppRelayIngestRelayFreshness, AppRelayIngestScopeFreshness, + AppRelayIngestScopeStatus, PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, + SyncCheckpointState, SyncCheckpointStatus, SyncConflict, SyncConflictKind, + SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, }; use rusqlite::{Connection, OptionalExtension, params}; use uuid::Uuid; @@ -21,6 +22,12 @@ pub struct StoredSyncConflict { pub conflict: SyncConflict, } +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct StoredRelayIngestCursor { + pub relay_url: String, + pub cursor_since_unix_seconds: Option<i64>, +} + pub struct AppSyncRepository<'a> { connection: &'a Connection, } @@ -335,6 +342,267 @@ impl<'a> AppSyncRepository<'a> { Ok(()) } + pub fn load_relay_ingest_cursors( + &self, + scope_key: &str, + relay_urls: &[String], + ) -> Result<Vec<StoredRelayIngestCursor>, AppSqliteError> { + relay_urls + .iter() + .map(|relay_url| { + let cursor_since_unix_seconds = self + .connection + .query_row( + "SELECT cursor_since_unix_seconds + FROM app_relay_ingest_freshness + WHERE scope_key = ?1 AND relay_url = ?2 + LIMIT 1", + params![scope_key, relay_url.as_str()], + |row| row.get::<_, Option<i64>>(0), + ) + .optional() + .map_err(|source| AppSqliteError::Query { + operation: "load relay ingest cursor", + source, + })? + .flatten(); + + Ok(StoredRelayIngestCursor { + relay_url: relay_url.clone(), + cursor_since_unix_seconds, + }) + }) + .collect() + } + + pub fn load_relay_ingest_freshness( + &self, + scope_key: &str, + relay_urls: &[String], + now_unix_seconds: i64, + stale_after_seconds: i64, + ) -> Result<AppRelayIngestScopeFreshness, AppSqliteError> { + let relays = relay_urls + .iter() + .map(|relay_url| { + self.load_relay_ingest_relay_freshness( + scope_key, + relay_url, + now_unix_seconds, + stale_after_seconds, + ) + }) + .collect::<Result<Vec<_>, _>>()?; + let status = relay_ingest_scope_status(relays.as_slice()); + + Ok(AppRelayIngestScopeFreshness { + scope_key: scope_key.to_owned(), + status, + relays, + }) + } + + pub fn record_relay_ingest_success( + &self, + scope_key: &str, + relay_url: &str, + cursor_since_unix_seconds: i64, + last_event_created_at_unix_seconds: Option<i64>, + started_at: &str, + started_unix_seconds: i64, + completed_at: &str, + completed_unix_seconds: i64, + ) -> Result<(), AppSqliteError> { + self.connection + .execute( + "INSERT INTO app_relay_ingest_freshness ( + scope_key, + relay_url, + state, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + last_fetch_started_at, + last_fetch_started_unix_seconds, + last_fetch_completed_at, + last_fetch_completed_unix_seconds, + last_success_at, + last_success_unix_seconds, + last_error_message, + updated_at + ) VALUES (?1, ?2, 'fresh', ?3, ?4, ?5, ?6, ?7, ?8, ?7, ?8, NULL, ?7) + ON CONFLICT(scope_key, relay_url) DO UPDATE SET + state = 'fresh', + cursor_since_unix_seconds = excluded.cursor_since_unix_seconds, + last_event_created_at_unix_seconds = excluded.last_event_created_at_unix_seconds, + last_fetch_started_at = excluded.last_fetch_started_at, + last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds, + last_fetch_completed_at = excluded.last_fetch_completed_at, + last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds, + last_success_at = excluded.last_success_at, + last_success_unix_seconds = excluded.last_success_unix_seconds, + last_error_message = NULL, + updated_at = excluded.updated_at", + params![ + scope_key, + relay_url, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "record relay ingest success", + source, + })?; + + Ok(()) + } + + pub fn record_relay_ingest_failure( + &self, + scope_key: &str, + relay_url: &str, + started_at: &str, + started_unix_seconds: i64, + completed_at: &str, + completed_unix_seconds: i64, + error_message: &str, + ) -> Result<(), AppSqliteError> { + self.connection + .execute( + "INSERT INTO app_relay_ingest_freshness ( + scope_key, + relay_url, + state, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + last_fetch_started_at, + last_fetch_started_unix_seconds, + last_fetch_completed_at, + last_fetch_completed_unix_seconds, + last_success_at, + last_success_unix_seconds, + last_error_message, + updated_at + ) VALUES (?1, ?2, 'failed', NULL, NULL, ?3, ?4, ?5, ?6, NULL, NULL, ?7, ?5) + ON CONFLICT(scope_key, relay_url) DO UPDATE SET + state = 'failed', + last_fetch_started_at = excluded.last_fetch_started_at, + last_fetch_started_unix_seconds = excluded.last_fetch_started_unix_seconds, + last_fetch_completed_at = excluded.last_fetch_completed_at, + last_fetch_completed_unix_seconds = excluded.last_fetch_completed_unix_seconds, + last_error_message = excluded.last_error_message, + updated_at = excluded.updated_at", + params![ + scope_key, + relay_url, + started_at, + started_unix_seconds, + completed_at, + completed_unix_seconds, + error_message, + ], + ) + .map_err(|source| AppSqliteError::Query { + operation: "record relay ingest failure", + source, + })?; + + Ok(()) + } + + fn load_relay_ingest_relay_freshness( + &self, + scope_key: &str, + relay_url: &str, + now_unix_seconds: i64, + stale_after_seconds: i64, + ) -> Result<AppRelayIngestRelayFreshness, AppSqliteError> { + let row = self + .connection + .query_row( + "SELECT + state, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + last_fetch_started_at, + last_fetch_completed_at, + last_fetch_completed_unix_seconds, + last_success_at, + last_error_message + FROM app_relay_ingest_freshness + WHERE scope_key = ?1 AND relay_url = ?2 + LIMIT 1", + params![scope_key, relay_url], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, Option<i64>>(1)?, + row.get::<_, Option<i64>>(2)?, + row.get::<_, Option<String>>(3)?, + row.get::<_, Option<String>>(4)?, + row.get::<_, Option<i64>>(5)?, + row.get::<_, Option<String>>(6)?, + row.get::<_, Option<String>>(7)?, + )) + }, + ) + .optional() + .map_err(|source| AppSqliteError::Query { + operation: "load relay ingest freshness", + source, + })?; + + row.map_or_else( + || { + Ok(AppRelayIngestRelayFreshness { + relay_url: relay_url.to_owned(), + state: AppRelayIngestFreshnessState::Stale, + cursor_since_unix_seconds: None, + last_event_created_at_unix_seconds: None, + last_fetch_started_at: None, + last_fetch_completed_at: None, + last_success_at: None, + last_error_message: None, + }) + }, + |( + state, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + last_fetch_started_at, + last_fetch_completed_at, + last_fetch_completed_unix_seconds, + last_success_at, + last_error_message, + )| { + let mut state = parse_relay_ingest_freshness_state(state)?; + if state == AppRelayIngestFreshnessState::Fresh + && relay_ingest_is_stale( + last_fetch_completed_unix_seconds, + now_unix_seconds, + stale_after_seconds, + ) + { + state = AppRelayIngestFreshnessState::Stale; + } + Ok(AppRelayIngestRelayFreshness { + relay_url: relay_url.to_owned(), + state, + cursor_since_unix_seconds, + last_event_created_at_unix_seconds, + last_fetch_started_at, + last_fetch_completed_at, + last_success_at, + last_error_message, + }) + }, + ) + } + pub fn record_conflict( &self, account_id: &str, @@ -682,13 +950,62 @@ fn sync_conflict_resolution_status_value(resolution: SyncConflictResolutionStatu } } +fn parse_relay_ingest_freshness_state( + value: String, +) -> Result<AppRelayIngestFreshnessState, AppSqliteError> { + match value.as_str() { + "fresh" => Ok(AppRelayIngestFreshnessState::Fresh), + "stale" => Ok(AppRelayIngestFreshnessState::Stale), + "failed" => Ok(AppRelayIngestFreshnessState::Failed), + _ => Err(AppSqliteError::DecodeEnum { + field: "app_relay_ingest_freshness.state", + value, + }), + } +} + +fn relay_ingest_is_stale( + last_fetch_completed_unix_seconds: Option<i64>, + now_unix_seconds: i64, + stale_after_seconds: i64, +) -> bool { + let Some(last_fetch_completed_unix_seconds) = last_fetch_completed_unix_seconds else { + return true; + }; + now_unix_seconds.saturating_sub(last_fetch_completed_unix_seconds) > stale_after_seconds +} + +fn relay_ingest_scope_status(relays: &[AppRelayIngestRelayFreshness]) -> AppRelayIngestScopeStatus { + if relays.is_empty() { + return AppRelayIngestScopeStatus::Stale; + } + let failed_count = relays + .iter() + .filter(|relay| relay.state == AppRelayIngestFreshnessState::Failed) + .count(); + if failed_count == relays.len() { + return AppRelayIngestScopeStatus::Failed; + } + if failed_count > 0 { + return AppRelayIngestScopeStatus::Partial; + } + if relays + .iter() + .all(|relay| relay.state == AppRelayIngestFreshnessState::Fresh) + { + AppRelayIngestScopeStatus::Fresh + } else { + AppRelayIngestScopeStatus::Stale + } +} + #[cfg(test)] mod tests { use radroots_app_models::{FarmId, ProductId}; use radroots_app_sync::{ - PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncCheckpointStatus, - SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, - SyncOperationKind, + AppRelayIngestFreshnessState, AppRelayIngestScopeStatus, PendingSyncOperation, + PendingSyncOperationState, SyncAggregateRef, SyncCheckpointStatus, SyncConflict, + SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, }; use crate::{AppSqliteStore, DatabaseTarget}; @@ -726,6 +1043,84 @@ mod tests { } #[test] + fn relay_ingest_freshness_tracks_cursors_and_scope_status() { + let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); + let repository = store.sync_repository(); + let relay_urls = vec![ + "wss://relay-a.example".to_owned(), + "wss://relay-b.example".to_owned(), + ]; + + let initial = repository + .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_000, 60) + .expect("freshness should load"); + assert_eq!(initial.status, AppRelayIngestScopeStatus::Stale); + assert_eq!(initial.relays.len(), 2); + assert!( + initial + .relays + .iter() + .all(|relay| relay.state == AppRelayIngestFreshnessState::Stale) + ); + + repository + .record_relay_ingest_success( + "direct_relay_ingest", + "wss://relay-a.example", + 1_010, + Some(1_009), + "2026-05-25T20:00:00Z", + 1_000, + "2026-05-25T20:00:02Z", + 1_002, + ) + .expect("success should record"); + repository + .record_relay_ingest_failure( + "direct_relay_ingest", + "wss://relay-b.example", + "2026-05-25T20:00:00Z", + 1_000, + "2026-05-25T20:00:02Z", + 1_002, + "relay timeout", + ) + .expect("failure should record"); + + let cursors = repository + .load_relay_ingest_cursors("direct_relay_ingest", &relay_urls) + .expect("cursors should load"); + assert_eq!(cursors[0].cursor_since_unix_seconds, Some(1_010)); + assert_eq!(cursors[1].cursor_since_unix_seconds, None); + + let partial = repository + .load_relay_ingest_freshness("direct_relay_ingest", &relay_urls, 1_005, 60) + .expect("partial freshness should load"); + assert_eq!(partial.status, AppRelayIngestScopeStatus::Partial); + assert_eq!(partial.relays[0].state, AppRelayIngestFreshnessState::Fresh); + assert_eq!( + partial.relays[1].state, + AppRelayIngestFreshnessState::Failed + ); + assert_eq!( + partial.relays[1].last_error_message.as_deref(), + Some("relay timeout") + ); + + let stale = repository + .load_relay_ingest_freshness( + "direct_relay_ingest", + &["wss://relay-a.example".to_owned()], + 1_100, + 60, + ) + .expect("stale freshness should load"); + assert_eq!(stale.status, AppRelayIngestScopeStatus::Stale); + assert_eq!(stale.relays[0].state, AppRelayIngestFreshnessState::Stale); + assert_eq!(stale.relays[0].cursor_since_unix_seconds, Some(1_010)); + } + + #[test] fn pending_operations_are_account_scoped_and_retryable() { let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); let repository = store.sync_repository(); diff --git a/crates/shared/sync/src/lib.rs b/crates/shared/sync/src/lib.rs @@ -356,6 +356,54 @@ impl Default for AppSyncProjection { } } +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AppRelayIngestFreshnessState { + Fresh, + #[default] + Stale, + Failed, +} + +impl AppRelayIngestFreshnessState { + pub const fn storage_key(self) -> &'static str { + match self { + Self::Fresh => "fresh", + Self::Stale => "stale", + Self::Failed => "failed", + } + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AppRelayIngestScopeStatus { + Fresh, + #[default] + Stale, + Partial, + Failed, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppRelayIngestRelayFreshness { + pub relay_url: String, + pub state: AppRelayIngestFreshnessState, + pub cursor_since_unix_seconds: Option<i64>, + pub last_event_created_at_unix_seconds: Option<i64>, + pub last_fetch_started_at: Option<String>, + pub last_fetch_completed_at: Option<String>, + pub last_success_at: Option<String>, + pub last_error_message: Option<String>, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppRelayIngestScopeFreshness { + pub scope_key: String, + pub status: AppRelayIngestScopeStatus, + pub relays: Vec<AppRelayIngestRelayFreshness>, +} + #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] pub struct AppSyncRequest { pub trigger: SyncTrigger,