app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit cdcca6b7b30ecec4af02f14720ceb167893a9075
parent 34a871959983e573f0ee4b66dd74b42cb04494e6
Author: triesap <tyson@radroots.org>
Date:   Mon, 25 May 2026 08:04:21 +0000

sync: add desktop relay ingest

Diffstat:
Mcrates/launchers/desktop/src/runtime.rs | 721++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/shared/sqlite/src/local_interop.rs | 95+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 792 insertions(+), 24 deletions(-)

diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -3,7 +3,7 @@ use std::fmt; use std::fs; use std::path::PathBuf; use std::sync::{Arc, Mutex, MutexGuard, PoisonError}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration as StdDuration, SystemTime, UNIX_EPOCH}; use chrono::{DateTime, Duration, Utc}; use radroots_app_core::{ @@ -63,11 +63,15 @@ use radroots_identity::{RadrootsIdentity, RadrootsIdentityId}; use radroots_local_events::{ BUYER_ORDER_REQUEST_ACTOR_SOURCE_RESOLVED_ACCOUNT, BUYER_ORDER_REQUEST_ACTOR_SOURCE_UNRESOLVED_APP, BUYER_ORDER_REQUEST_DOCUMENT_KIND, - BUYER_ORDER_REQUEST_LOCAL_WORK_RECORD_KIND, LocalEventRecordInput, LocalEventRecordUpdate, - LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, - RelayDeliveryEvidence, RelayDeliveryFailure, SourceRuntime, + BUYER_ORDER_REQUEST_LOCAL_WORK_RECORD_KIND, LocalEventRecord, LocalEventRecordInput, + LocalEventRecordUpdate, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, + PublishOutboxStatus, RelayDeliveryEvidence, RelayDeliveryFailure, SourceRuntime, buyer_order_request_local_work_record_id, validate_buyer_order_request_local_work_payload, }; +use radroots_nostr::prelude::{ + RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrOutput, + RadrootsNostrTimestamp, radroots_nostr_kind, +}; use radroots_nostr_accounts::prelude::RadrootsNostrAccountsManager; use radroots_sdk::farm::{RadrootsFarm, RadrootsFarmRef}; use radroots_sdk::listing::{ @@ -111,6 +115,12 @@ use crate::remote_signer::{ const APP_DATABASE_FILE_NAME: &str = "app.sqlite3"; const SYNC_TRANSPORT_UNAVAILABLE_MESSAGE: &str = "remote sync transport is not configured"; const APP_DIRECT_RELAY_SYNC_TIMEOUT_MS: u64 = 2_000; +const APP_DIRECT_RELAY_CONNECT_TIMEOUT: StdDuration = StdDuration::from_secs(10); +const APP_DIRECT_RELAY_INGEST_LIMIT: usize = 1_000; +const APP_DIRECT_RELAY_INGEST_MAX_PAGES: usize = 5; +const APP_DIRECT_RELAY_INGEST_KINDS: &[u16] = &[ + 0, 30340, 30402, 30403, 3422, 3423, 3424, 3425, 3432, 3433, 3434, +]; #[derive(Debug, Default)] struct UnavailableAppSyncTransport; @@ -127,6 +137,22 @@ fn default_sync_transport() -> Box<dyn AppSyncTransport + Send> { Box::new(UnavailableAppSyncTransport) } +#[derive(Debug, Clone)] +struct AppDirectRelayFetchReceipt { + target_relays: Vec<String>, + connected_relays: Vec<String>, + failed_relays: Vec<RelayDeliveryFailure>, + events: Vec<RadrootsNostrEvent>, +} + +#[derive(Debug, Error)] +enum AppDirectRelayIngestError { + #[error(transparent)] + Sqlite(#[from] AppSqliteError), + #[error(transparent)] + Transport(#[from] AppSyncTransportError), +} + #[derive(Clone)] struct SdkDirectRelayAppSyncTransport { accounts_manager: RadrootsNostrAccountsManager, @@ -3241,13 +3267,38 @@ impl DesktopAppRuntimeState { .collect(), }; - match self.sync_transport.sync(request) { - Ok(result) => { + match self.run_sync_transport_or_relay_only(request, started_at.as_str()) { + Ok(mut result) => { + let mut relay_context_changed = false; + if self.has_configured_relay_ingest() { + match self.ingest_configured_relay_events() { + Ok(report) => { + result.pulled_record_count = result + .pulled_record_count + .saturating_add(report.scanned_records as usize); + relay_context_changed = + report.imported_records > 0 || report.skipped_records > 0; + } + Err(AppDirectRelayIngestError::Sqlite(error)) => return Err(error), + Err(AppDirectRelayIngestError::Transport(error)) => { + result.run_status = AppSyncRunStatus::Failed; + result.checkpoint = SyncCheckpointStatus::failed( + Some(started_at.clone()), + Some(current_utc_timestamp()), + result.checkpoint.last_remote_cursor.clone(), + error.to_string(), + ); + } + } + } changed |= self.apply_sync_result( prepared.account_id.as_str(), &prepared.pending_operations, &result, )?; + if relay_context_changed { + changed |= self.refresh_selected_account_context_after_local_events()?; + } } Err(error) => { changed |= self.apply_sync_transport_error( @@ -3263,6 +3314,55 @@ impl DesktopAppRuntimeState { Ok(changed) } + fn run_sync_transport_or_relay_only( + &mut self, + request: AppSyncRequest, + started_at: &str, + ) -> Result<AppSyncResult, AppSyncTransportError> { + if request.pending_operations.is_empty() && self.has_configured_relay_ingest() { + return Ok(AppSyncResult { + run_status: AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + Some(started_at.to_owned()), + current_utc_timestamp(), + request.checkpoint.last_remote_cursor.clone(), + ), + pushed_operation_count: 0, + pulled_record_count: 0, + conflicts: request.known_conflicts, + published_receipts: Vec::new(), + }); + } + + self.sync_transport.sync(request) + } + + fn has_configured_relay_ingest(&self) -> bool { + self.nostr_relay_urls + .iter() + .any(|relay_url| !relay_url.trim().is_empty()) + } + + fn ingest_configured_relay_events( + &self, + ) -> Result<AppLocalInteropImportReport, AppDirectRelayIngestError> { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(AppLocalInteropImportReport::default()); + }; + let relay_urls = normalized_app_relay_ingest_urls(&self.nostr_relay_urls)?; + if relay_urls.is_empty() { + return Ok(AppLocalInteropImportReport::default()); + } + let receipt = fetch_app_events_from_relays_windowed(&relay_urls)?; + if receipt.events.is_empty() { + return Ok(AppLocalInteropImportReport::default()); + } + let records = app_relay_event_records(&receipt, current_runtime_time_ms()?)?; + sqlite_store + .import_local_event_records(records.as_slice()) + .map_err(AppDirectRelayIngestError::from) + } + fn prepare_sync_request( &self, trigger: SyncTrigger, @@ -3292,6 +3392,7 @@ impl DesktopAppRuntimeState { } if !matches!(trigger, SyncTrigger::ManualRefresh) + && !self.has_configured_relay_ingest() && !self.has_sync_eligible_runtime_state(&checkpoint, &conflicts, &pending_operations) { return Ok(None); @@ -4868,6 +4969,381 @@ fn normalized_app_sync_relay_urls( Ok(normalized) } +fn normalized_app_relay_ingest_urls(relay_urls: &[String]) -> Result<Vec<String>, AppSqliteError> { + let normalized = radroots_local_events::normalize_relay_urls(relay_urls).map_err(|_| { + AppSqliteError::InvalidProjection { + reason: "app relay ingest requires valid relay urls", + } + })?; + Ok(normalized) +} + +fn fetch_app_events_from_relays_windowed( + relay_urls: &[String], +) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { + let base_filter = app_direct_relay_ingest_filter(); + let mut next_filter = base_filter.clone(); + let mut merged: Option<AppDirectRelayFetchReceipt> = None; + + for _ in 0..APP_DIRECT_RELAY_INGEST_MAX_PAGES { + let receipt = fetch_app_events_from_relays(relay_urls, next_filter)?; + let page_len = receipt.events.len(); + let oldest_created_at = receipt + .events + .iter() + .map(|event| event.created_at.as_secs()) + .min(); + merge_app_direct_relay_fetch_receipt(&mut merged, receipt); + if page_len < APP_DIRECT_RELAY_INGEST_LIMIT { + break; + } + let Some(oldest_created_at) = oldest_created_at else { + break; + }; + if oldest_created_at == 0 { + break; + } + next_filter = base_filter + .clone() + .until(RadrootsNostrTimestamp::from(oldest_created_at - 1)) + .limit(APP_DIRECT_RELAY_INGEST_LIMIT); + } + + Ok(merged.unwrap_or_else(|| AppDirectRelayFetchReceipt { + target_relays: relay_urls.to_vec(), + connected_relays: Vec::new(), + failed_relays: Vec::new(), + events: Vec::new(), + })) +} + +fn fetch_app_events_from_relays( + relay_urls: &[String], + filter: RadrootsNostrFilter, +) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { + if relay_urls.is_empty() { + return Ok(AppDirectRelayFetchReceipt { + target_relays: Vec::new(), + connected_relays: Vec::new(), + failed_relays: Vec::new(), + events: Vec::new(), + }); + } + + let runtime = TokioRuntimeBuilder::new_current_thread() + .enable_all() + .build() + .map_err(|error| AppSyncTransportError::failed(error.to_string()))?; + runtime.block_on(fetch_app_events_from_relays_async(relay_urls, filter)) +} + +async fn fetch_app_events_from_relays_async( + relay_urls: &[String], + filter: RadrootsNostrFilter, +) -> Result<AppDirectRelayFetchReceipt, AppSyncTransportError> { + let client = RadrootsNostrClient::new_signerless(); + + for relay_url in relay_urls { + client + .add_read_relay(relay_url) + .await + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + } + + let connection_output = client.try_connect(APP_DIRECT_RELAY_CONNECT_TIMEOUT).await; + let failed_relays = app_relay_failures_from_output(&connection_output)?; + if connection_output.success.is_empty() { + return Err(AppSyncTransportError::unavailable(format!( + "direct relay app ingest connection failed: {}", + summarize_app_relay_failures(&failed_relays) + ))); + } + + let events = client + .fetch_events( + filter, + StdDuration::from_millis(APP_DIRECT_RELAY_SYNC_TIMEOUT_MS), + ) + .await + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + + Ok(AppDirectRelayFetchReceipt { + target_relays: relay_urls.to_vec(), + connected_relays: connection_output + .success + .iter() + .map(ToString::to_string) + .collect(), + failed_relays, + events, + }) +} + +fn app_direct_relay_ingest_filter() -> RadrootsNostrFilter { + RadrootsNostrFilter::new() + .kinds( + APP_DIRECT_RELAY_INGEST_KINDS + .iter() + .copied() + .map(radroots_nostr_kind), + ) + .limit(APP_DIRECT_RELAY_INGEST_LIMIT) +} + +fn app_relay_failures_from_output<T: fmt::Debug>( + output: &RadrootsNostrOutput<T>, +) -> Result<Vec<RelayDeliveryFailure>, AppSyncTransportError> { + output + .failed + .iter() + .map(|(relay, reason)| { + RelayDeliveryFailure::new(relay.to_string(), reason.to_string()) + .map_err(|source| AppSyncTransportError::failed(source.to_string())) + }) + .collect() +} + +fn summarize_app_relay_failures(failed_relays: &[RelayDeliveryFailure]) -> String { + if failed_relays.is_empty() { + return "no relay acknowledged the operation".to_owned(); + } + + failed_relays + .iter() + .map(|failure| format!("{}: {}", failure.relay_url, failure.error)) + .collect::<Vec<_>>() + .join("; ") +} + +fn merge_app_direct_relay_fetch_receipt( + merged: &mut Option<AppDirectRelayFetchReceipt>, + receipt: AppDirectRelayFetchReceipt, +) { + let Some(existing) = merged.as_mut() else { + *merged = Some(receipt); + return; + }; + + append_unique_relays(&mut existing.connected_relays, receipt.connected_relays); + for failure in receipt.failed_relays { + if !existing + .failed_relays + .iter() + .any(|known| known.relay_url == failure.relay_url && known.error == failure.error) + { + existing.failed_relays.push(failure); + } + } + let mut seen_event_ids = existing + .events + .iter() + .map(|event| event.id.to_hex()) + .collect::<BTreeSet<_>>(); + for event in receipt.events { + if seen_event_ids.insert(event.id.to_hex()) { + existing.events.push(event); + } + } +} + +fn append_unique_relays(target: &mut Vec<String>, relays: Vec<String>) { + for relay in relays { + if !target.iter().any(|known| known == &relay) { + target.push(relay); + } + } +} + +fn app_relay_event_records( + receipt: &AppDirectRelayFetchReceipt, + inserted_at_ms: i64, +) -> Result<Vec<LocalEventRecord>, AppDirectRelayIngestError> { + let delivery_evidence = RelayDeliveryEvidence::acknowledged( + &receipt.target_relays, + &receipt.connected_relays, + &receipt.connected_relays, + receipt.failed_relays.clone(), + ) + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + let relay_set_fingerprint = delivery_evidence.relay_set_fingerprint().ok_or_else(|| { + AppSyncTransportError::failed("app relay ingest requires a non-empty relay set") + })?; + let relay_delivery_json = delivery_evidence + .to_json_value() + .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; + let mut records = Vec::with_capacity(receipt.events.len()); + + for (index, event) in receipt.events.iter().enumerate() { + let tags = app_event_tags(event); + let kind = app_event_kind(event); + let event_pubkey = event.pubkey.to_string(); + let listing_d_tag = app_event_tag_value(&tags, "d", 1); + let farm_id = app_relay_event_farm_id(kind, &tags); + let listing_addr = + app_relay_event_listing_addr(kind, &event_pubkey, listing_d_tag.as_deref()); + let created_at_ms = app_event_created_at_ms(event)?; + let local_seq = created_at_ms.saturating_add(i64::try_from(index).map_err(|_| { + AppSqliteError::InvalidProjection { + reason: "app relay ingest sequence must fit i64", + } + })?); + records.push(LocalEventRecord { + seq: local_seq, + change_seq: local_seq, + record_id: format!("app:relay_event:{}", event.id.to_hex()), + family: LocalRecordFamily::SignedEvent, + status: LocalRecordStatus::Published, + source_runtime: app_relay_event_source_runtime(kind, listing_d_tag.as_deref()), + created_at_ms, + inserted_at_ms, + updated_at_ms: inserted_at_ms, + owner_account_id: None, + owner_pubkey: Some(event_pubkey.clone()), + farm_id, + listing_addr, + local_work_json: None, + event_id: Some(event.id.to_hex()), + event_kind: Some(i64::from(kind)), + event_pubkey: Some(event_pubkey), + event_created_at: Some(app_event_created_at_i64(event)?), + event_tags_json: Some(json!(tags)), + event_content: Some(event.content.clone()), + event_sig: Some(event.sig.to_string()), + raw_event_json: Some(app_raw_event_json(event)?), + outbox_status: PublishOutboxStatus::Acknowledged, + relay_set_fingerprint: Some(relay_set_fingerprint.clone()), + relay_delivery_json: Some(relay_delivery_json.clone()), + }); + } + + Ok(records) +} + +fn app_relay_event_farm_id(kind: u16, tags: &[Vec<String>]) -> Option<String> { + match kind { + 30340 => app_event_tag_value(tags, "d", 1), + 30402 | 30403 => { + app_event_tag_value(tags, "a", 1).and_then(|address| app_address_d_tag(&address)) + } + _ => None, + } +} + +fn app_relay_event_listing_addr( + kind: u16, + event_pubkey: &str, + listing_d_tag: Option<&str>, +) -> Option<String> { + match kind { + 30402 | 30403 => listing_d_tag.map(|d_tag| format!("{kind}:{event_pubkey}:{d_tag}")), + _ => None, + } +} + +fn app_relay_event_source_runtime(kind: u16, d_tag: Option<&str>) -> SourceRuntime { + if matches!(kind, 30340 | 30402 | 30403) + && d_tag.is_some_and(|d_tag| decode_app_d_tag_uuid(d_tag).is_some()) + { + SourceRuntime::App + } else { + SourceRuntime::Cli + } +} + +fn app_event_kind(event: &RadrootsNostrEvent) -> u16 { + event.kind.as_u16() +} + +fn app_event_created_at_i64(event: &RadrootsNostrEvent) -> Result<i64, AppSqliteError> { + i64::try_from(event.created_at.as_secs()).map_err(|_| AppSqliteError::InvalidProjection { + reason: "app relay ingest event timestamp must fit i64", + }) +} + +fn app_event_created_at_ms(event: &RadrootsNostrEvent) -> Result<i64, AppSqliteError> { + app_event_created_at_i64(event)? + .checked_mul(1_000) + .ok_or(AppSqliteError::InvalidProjection { + reason: "app relay ingest event timestamp milliseconds must fit i64", + }) +} + +fn app_event_tags(event: &RadrootsNostrEvent) -> Vec<Vec<String>> { + event + .tags + .iter() + .map(|tag| tag.as_slice().to_vec()) + .collect() +} + +fn app_event_tag_value(tags: &[Vec<String>], tag_name: &str, index: usize) -> Option<String> { + tags.iter().find_map(|tag| { + (tag.first().map(String::as_str) == Some(tag_name)) + .then(|| tag.get(index)) + .flatten() + .map(String::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_owned) + }) +} + +fn app_raw_event_json(event: &RadrootsNostrEvent) -> Result<serde_json::Value, AppSqliteError> { + Ok(json!({ + "id": event.id.to_hex(), + "pubkey": event.pubkey.to_string(), + "created_at": app_event_created_at_i64(event)?, + "kind": u32::from(event.kind.as_u16()), + "tags": app_event_tags(event), + "content": event.content.clone(), + "sig": event.sig.to_string(), + })) +} + +fn app_address_d_tag(address: &str) -> Option<String> { + address + .rsplit(':') + .next() + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_owned) +} + +fn decode_app_d_tag_uuid(value: &str) -> Option<Uuid> { + let mut decoded = Vec::with_capacity(16); + let mut buffer = 0u32; + let mut bits = 0u8; + for byte in value.trim().bytes() { + let digit = base64_url_digit(byte)?; + buffer = (buffer << 6) | u32::from(digit); + bits += 6; + while bits >= 8 { + bits -= 8; + decoded.push(((buffer >> bits) & 0xff) as u8); + buffer &= (1u32 << bits) - 1; + } + } + if bits > 0 && buffer != 0 { + return None; + } + if decoded.len() == 16 { + Uuid::from_slice(decoded.as_slice()).ok() + } else { + None + } +} + +fn base64_url_digit(byte: u8) -> Option<u8> { + match byte { + b'A'..=b'Z' => Some(byte - b'A'), + b'a'..=b'z' => Some(byte - b'a' + 26), + b'0'..=b'9' => Some(byte - b'0' + 52), + b'-' => Some(62), + b'_' => Some(63), + _ => None, + } +} + fn non_empty_string(value: &str) -> Option<String> { let trimmed = value.trim(); (!trimmed.is_empty()).then(|| trimmed.to_owned()) @@ -7066,12 +7542,13 @@ mod tests { HomeRoute, }; use radroots_app_sync::{ - AppFarmProfilePublishPayload, AppOrderRequestPublishPayload, AppPublishContext, - AppPublishPayload, AppPublishedOperationReceipt, AppSyncRequest, AppSyncResult, - AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, PendingSyncOperation, - PendingSyncOperationState, RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState, - SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, - SyncConflictSeverity, SyncOperationKind, SyncTrigger, + AppFarmProfilePublishPayload, AppListingPublishPayload, AppOrderRequestPublishPayload, + AppPublishContext, AppPublishPayload, AppPublishedOperationReceipt, AppSyncRequest, + AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, + PendingSyncOperation, PendingSyncOperationState, RecordedAppSyncTransport, + SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, SyncConflict, + SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, + SyncTrigger, }; use radroots_identity::RadrootsIdentity; use radroots_local_events::{ @@ -7130,6 +7607,7 @@ mod tests { fn spawn() -> Self { let (url_tx, url_rx) = mpsc::channel(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let events: Arc<Mutex<Vec<serde_json::Value>>> = Arc::new(Mutex::new(Vec::new())); let join_handle = thread::spawn(move || { let runtime = TokioRuntimeBuilder::new_current_thread() .enable_all() @@ -7152,6 +7630,7 @@ mod tests { let Ok((stream, _)) = accepted else { break; }; + let events = events.clone(); tokio::spawn(async move { let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else { return; @@ -7164,20 +7643,38 @@ mod tests { let Ok(value) = serde_json::from_str::<serde_json::Value>(text.as_str()) else { continue; }; - let Some(event_id) = value - .as_array() - .and_then(|items| match items.as_slice() { - [kind, event, ..] if kind.as_str() == Some("EVENT") => { - event.get("id").and_then(|id| id.as_str()) - } - _ => None, - }) - else { + let Some(items) = value.as_array() else { continue; }; - let response = json!(["OK", event_id, true, ""]).to_string(); - if writer.send(Message::Text(response.into())).await.is_err() { - break; + match items.as_slice() { + [kind, event, ..] if kind.as_str() == Some("EVENT") => { + let Some(event_id) = event.get("id").and_then(|id| id.as_str()) else { + continue; + }; + events.lock().expect("relay events lock").push(event.clone()); + let response = json!(["OK", event_id, true, ""]).to_string(); + if writer.send(Message::Text(response.into())).await.is_err() { + break; + } + } + [kind, subscription_id, filters @ ..] if kind.as_str() == Some("REQ") => { + let Some(subscription_id) = subscription_id.as_str() else { + continue; + }; + let snapshot = events.lock().expect("relay events lock").clone(); + for event in snapshot.iter().filter(|event| relay_event_matches_filters(event, filters)) { + let response = json!(["EVENT", subscription_id, event]).to_string(); + if writer.send(Message::Text(response.into())).await.is_err() { + break; + } + } + let response = json!(["EOSE", subscription_id]).to_string(); + if writer.send(Message::Text(response.into())).await.is_err() { + break; + } + } + [kind, ..] if kind.as_str() == Some("CLOSE") => break, + _ => {} } } }); @@ -7200,6 +7697,37 @@ mod tests { } } + fn relay_event_matches_filters( + event: &serde_json::Value, + filters: &[serde_json::Value], + ) -> bool { + filters.is_empty() + || filters + .iter() + .any(|filter| relay_event_matches_filter(event, filter)) + } + + fn relay_event_matches_filter(event: &serde_json::Value, filter: &serde_json::Value) -> bool { + let event_kind = event.get("kind").and_then(serde_json::Value::as_u64); + if let Some(kinds) = filter.get("kinds").and_then(serde_json::Value::as_array) + && !kinds + .iter() + .filter_map(serde_json::Value::as_u64) + .any(|kind| Some(kind) == event_kind) + { + return false; + } + + let event_created_at = event.get("created_at").and_then(serde_json::Value::as_u64); + if let Some(until) = filter.get("until").and_then(serde_json::Value::as_u64) + && event_created_at.is_some_and(|created_at| created_at > until) + { + return false; + } + + true + } + impl Drop for ThreadedAckRelay { fn drop(&mut self) { if let Some(shutdown_tx) = self.shutdown_tx.take() { @@ -7287,6 +7815,151 @@ mod tests { } #[test] + fn runtime_configured_relay_sync_triggers_ingest_listing_into_fresh_buyer_projection() { + let relay = ThreadedAckRelay::spawn(); + let manager = RadrootsNostrAccountsManager::new_in_memory(); + let account_id = manager + .generate_identity(Some("Farmer".to_owned()), true) + .expect("local signing account should generate"); + let identity = manager + .get_signing_identity(&account_id) + .expect("seller signing lookup should succeed") + .expect("seller account should have local signer"); + let farm_id = FarmId::new(); + let product_id = ProductId::new(); + let farm_payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload { + context: AppPublishContext::new(account_id.to_string(), "relay_ingest_farm"), + farm_id, + display_name: "Relay test farm".to_owned(), + readiness: Some(FarmReadiness::Ready), + }); + let listing_payload = AppPublishPayload::Listing(AppListingPublishPayload { + context: AppPublishContext::new(account_id.to_string(), "relay_ingest_listing"), + product_id, + listing_d_tag: Some(super::d_tag_from_uuid(product_id.as_uuid())), + farm_id: Some(farm_id), + farm_pubkey: Some(identity.public_key_hex()), + farm_d_tag: Some(super::d_tag_from_uuid(farm_id.as_uuid())), + title: "Relay ingest lettuce".to_owned(), + subtitle: Some("Pulled into a fresh buyer app".to_owned()), + category: Some("greens".to_owned()), + unit_label: "each".to_owned(), + price_minor_units: Some(450), + price_currency: "USD".to_owned(), + stock_quantity: Some(6), + availability_window_id: Some(FulfillmentWindowId::new()), + availability_starts_at: Some("2099-04-25T14:00:00Z".to_owned()), + availability_ends_at: Some("2099-04-25T18:00:00Z".to_owned()), + fulfillment_method: Some("pickup".to_owned()), + fulfillment_location: Some("Relay barn".to_owned()), + status: ProductStatus::Published, + }); + let mut transport = + SdkDirectRelayAppSyncTransport::with_relay_urls(manager, vec![relay.url().to_owned()]); + let result = transport + .sync(AppSyncRequest { + trigger: SyncTrigger::ManualRefresh, + checkpoint: SyncCheckpointStatus::never_synced(), + pending_operations: vec![ + PendingSyncOperation::from_publish_payload( + farm_payload, + "2026-05-25T07:00:00Z", + ) + .expect("farm publish payload should serialize"), + PendingSyncOperation::from_publish_payload( + listing_payload, + "2026-05-25T07:00:01Z", + ) + .expect("listing publish payload should serialize"), + ], + known_conflicts: Vec::new(), + }) + .expect("seller relay publish should succeed"); + assert_eq!(result.run_status, AppSyncRunStatus::Succeeded); + assert_eq!(result.published_receipts.len(), 2); + + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_manual_refresh", + SyncTrigger::ManualRefresh, + product_id, + ); + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_app_launch", + SyncTrigger::AppLaunch, + product_id, + ); + assert_fresh_buyer_relay_ingest( + relay.url(), + "relay_ingest_foreground_resume", + SyncTrigger::ForegroundResume, + product_id, + ); + } + + fn assert_fresh_buyer_relay_ingest( + relay_url: &str, + label: &str, + trigger: SyncTrigger, + product_id: ProductId, + ) { + let (runtime, paths) = bootstrapped_runtime(label); + assert!( + runtime + .generate_local_account(Some("Buyer".to_owned())) + .expect("buyer account should generate") + ); + runtime.lock_state_mut().nostr_relay_urls = vec![relay_url.to_owned()]; + + let changed = match trigger { + SyncTrigger::ManualRefresh => runtime + .sync_on_manual_refresh() + .expect("manual relay ingest should complete"), + SyncTrigger::AppLaunch => runtime + .sync_on_app_launch() + .expect("launch relay ingest should complete"), + SyncTrigger::ForegroundResume => runtime + .sync_on_foreground_resume() + .expect("foreground relay ingest should complete"), + SyncTrigger::LocalMutation => panic!("local mutation is not a relay ingest trigger"), + }; + assert!(changed); + + let summary = runtime.summary(); + let listing = summary + .personal_projection + .browse + .listings + .rows + .iter() + .find(|listing| listing.product_id == product_id) + .expect("fresh buyer app should project relay listing"); + assert_eq!(listing.title, "Relay ingest lettuce"); + assert_eq!(listing.farm_display_name, "Relay test farm"); + assert_eq!(listing.listing_relays, vec![relay_url.to_owned()]); + + let product_id_string = product_id.to_string(); + let imports = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_local_interop_records() + .expect("local interop records should load"); + assert_eq!( + imports + .iter() + .filter(|record| record.projected_kind == "listing" + && record.projected_id.as_deref() == Some(product_id_string.as_str())) + .count(), + 1 + ); + + cleanup_bootstrapped_runtime_paths(&paths); + } + + #[test] fn runtime_direct_relay_transport_returns_partial_failure_after_successful_prefix() { let relay = ThreadedAckRelay::spawn(); let manager = RadrootsNostrAccountsManager::new_in_memory(); diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs @@ -121,6 +121,22 @@ impl<'a> AppLocalInteropRepository<'a> { Ok(report) } + pub fn import_records( + &self, + records: &[LocalEventRecord], + ) -> Result<AppLocalInteropImportReport, AppSqliteError> { + let mut report = AppLocalInteropImportReport::default(); + for record in records { + report.scanned_records += 1; + report.last_change_seq = Some(record.change_seq); + match self.import_record(record)? { + ImportOutcome::Imported => report.imported_records += 1, + ImportOutcome::Skipped => report.skipped_records += 1, + } + } + Ok(report) + } + pub fn load_records(&self) -> Result<Vec<StoredLocalInteropRecord>, AppSqliteError> { let mut statement = self .connection @@ -223,6 +239,9 @@ impl<'a> AppLocalInteropRepository<'a> { } fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> { + if self.is_duplicate_signed_event(record)? { + return Ok(ImportOutcome::Skipped); + } let projection = match record.family { LocalRecordFamily::LocalWork => self.import_local_work(record)?, LocalRecordFamily::SignedEvent => self.import_signed_event(record)?, @@ -239,6 +258,35 @@ impl<'a> AppLocalInteropRepository<'a> { } } + fn is_duplicate_signed_event(&self, record: &LocalEventRecord) -> Result<bool, AppSqliteError> { + if record.family != LocalRecordFamily::SignedEvent { + return Ok(false); + } + let Some(event_id) = record + .event_id + .as_deref() + .map(str::trim) + .filter(|event_id| !event_id.is_empty()) + else { + return Ok(false); + }; + self.connection + .query_row( + "SELECT EXISTS( + SELECT 1 + FROM local_interop_imports + WHERE event_id = ?1 + AND record_id <> ?2 + )", + params![event_id, record.record_id.as_str()], + |row| row.get::<_, bool>(0), + ) + .map_err(|source| AppSqliteError::Query { + operation: "check duplicate local interop signed event", + source, + }) + } + fn import_local_work( &self, record: &LocalEventRecord, @@ -1049,6 +1097,13 @@ impl AppSqliteStore { self.local_interop_repository().import_from_store(store) } + pub fn import_local_event_records( + &self, + records: &[LocalEventRecord], + ) -> Result<AppLocalInteropImportReport, AppSqliteError> { + self.local_interop_repository().import_records(records) + } + pub fn load_local_interop_records( &self, ) -> Result<Vec<StoredLocalInteropRecord>, AppSqliteError> { @@ -2603,6 +2658,46 @@ mod tests { } #[test] + fn direct_record_import_dedupes_signed_events_by_event_id() { + let app_store = + AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store"); + let events = local_events_store(); + let farm_key = "SIGNEDFARMAAAAAAAAAAAA"; + let listing_key = "SIGNEDLISTINGBBBBBBBB"; + let first = events + .append_record(&signed_listing_record( + "shared-record", + farm_key, + listing_key, + "active", + )) + .expect("append shared signed listing"); + let mut duplicate = signed_listing_record("relay-record", farm_key, listing_key, "active"); + duplicate.event_id = first.event_id.clone(); + let duplicate = events + .append_record(&duplicate) + .expect("append relay signed listing"); + + let report = app_store + .import_local_event_records(&[first, duplicate]) + .expect("direct records should import"); + let imported = app_store + .load_local_interop_records() + .expect("load imported records"); + + assert_eq!(report.scanned_records, 2); + assert_eq!(report.imported_records, 1); + assert_eq!(report.skipped_records, 1); + assert_eq!( + imported + .iter() + .filter(|record| record.projected_kind == "listing") + .count(), + 1 + ); + } + + #[test] fn maps_acknowledged_signed_listing_lifecycle_statuses() { for (status_tag, expected_product_status) in [ ("active", "published"),