app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 0d194b9c2846be818ff905173dfabed4505a8648
parent 51d995a8d4dcc2f1e3c1d79ee1025cd094526fd1
Author: triesap <tyson@radroots.org>
Date:   Mon, 25 May 2026 20:34:39 +0000

sync: use observed relay provenance

- record relay-fetched events with observed delivery evidence instead of publish acknowledgement
- keep listing relay provenance limited to acknowledged or explicitly observed relays
- project observed signed listings as published without marking outbox acknowledgement
- cover conservative relay provenance and observed listing projection paths

Diffstat:
Mcrates/launchers/desktop/src/runtime.rs | 97++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/shared/sqlite/src/buyer.rs | 70++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/shared/sqlite/src/local_interop.rs | 60++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
3 files changed, 214 insertions(+), 13 deletions(-)

diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -5176,10 +5176,15 @@ fn direct_relay_event_records( receipt: &AppDirectRelayFetchReceipt, inserted_at_ms: i64, ) -> Result<Vec<LocalEventRecord>, AppDirectRelayIngestError> { - let delivery_evidence = RelayDeliveryEvidence::acknowledged( + let observed_relays = if receipt.connected_relays.len() == 1 { + receipt.connected_relays.clone() + } else { + Vec::new() + }; + let delivery_evidence = RelayDeliveryEvidence::observed( &receipt.target_relays, &receipt.connected_relays, - &receipt.connected_relays, + observed_relays, receipt.failed_relays.clone(), ) .map_err(|source| AppSyncTransportError::failed(source.to_string()))?; @@ -5228,7 +5233,7 @@ fn direct_relay_event_records( event_content: Some(event.content.clone()), event_sig: Some(event.sig.to_string()), raw_event_json: Some(relay_raw_event_json(event)?), - outbox_status: PublishOutboxStatus::Acknowledged, + outbox_status: PublishOutboxStatus::None, relay_set_fingerprint: Some(relay_set_fingerprint.clone()), relay_delivery_json: Some(relay_delivery_json.clone()), }); @@ -7889,6 +7894,73 @@ mod tests { ); } + #[test] + fn runtime_relay_ingest_does_not_use_connected_relays_as_listing_provenance() { + let listing_relay = ThreadedAckRelay::spawn(); + let empty_relay = ThreadedAckRelay::spawn(); + let product_id = publish_relay_ingest_listing_fixture(&listing_relay); + let (runtime, paths) = bootstrapped_runtime("relay_ingest_connected_not_provenance"); + assert!( + runtime + .generate_local_account(Some("Buyer".to_owned())) + .expect("buyer account should generate") + ); + runtime.lock_state_mut().nostr_relay_urls = + vec![listing_relay.url().to_owned(), empty_relay.url().to_owned()]; + + assert!( + runtime + .sync_on_manual_refresh() + .expect("manual relay ingest should complete") + ); + + let summary = runtime.summary(); + let listing = summary + .personal_projection + .browse + .listings + .rows + .iter() + .find(|listing| listing.product_id == product_id) + .expect("fresh buyer app should project relay listing"); + assert_eq!(listing.title, "Relay ingest lettuce"); + assert!(listing.listing_relays.is_empty()); + + let product_id_string = product_id.to_string(); + let imports = runtime + .lock_state() + .sqlite_store + .as_ref() + .expect("sqlite store") + .load_local_interop_records() + .expect("local interop records should load"); + let listing_import = imports + .iter() + .find(|record| { + record.projected_kind == "listing" + && record.projected_id.as_deref() == Some(product_id_string.as_str()) + }) + .expect("listing import"); + let delivery = serde_json::from_str::<serde_json::Value>( + listing_import + .relay_delivery_json + .as_deref() + .expect("listing delivery evidence"), + ) + .expect("delivery json"); + + assert_eq!(listing_import.outbox_status, "none"); + assert_eq!(delivery["state"], json!("observed")); + assert_eq!(delivery["acknowledged_relays"], json!([])); + assert_eq!(delivery["observed_relays"], serde_json::Value::Null); + assert_eq!( + delivery["target_relays"], + json!([listing_relay.url(), empty_relay.url()]) + ); + + cleanup_bootstrapped_runtime_paths(&paths); + } + fn publish_relay_ingest_listing_fixture(relay: &ThreadedAckRelay) -> ProductId { let manager = RadrootsNostrAccountsManager::new_in_memory(); let account_id = manager @@ -8003,6 +8075,25 @@ mod tests { .expect("sqlite store") .load_local_interop_records() .expect("local interop records should load"); + let listing_import = imports + .iter() + .find(|record| { + record.projected_kind == "listing" + && record.projected_id.as_deref() == Some(product_id_string.as_str()) + }) + .expect("listing import"); + let delivery = serde_json::from_str::<serde_json::Value>( + listing_import + .relay_delivery_json + .as_deref() + .expect("listing delivery evidence"), + ) + .expect("delivery json"); + + assert_eq!(listing_import.outbox_status, "none"); + assert_eq!(delivery["state"], json!("observed")); + assert_eq!(delivery["acknowledged_relays"], json!([])); + assert_eq!(delivery["observed_relays"], json!([relay_url])); assert_eq!( imports .iter() diff --git a/crates/shared/sqlite/src/buyer.rs b/crates/shared/sqlite/src/buyer.rs @@ -2624,15 +2624,17 @@ fn listing_relays_from_json(value: Option<String>) -> Result<Vec<String>, AppSql return Ok(relays_from_json_array(relays)); } - for key in ["acknowledged_relays", "target_relays", "connected_relays"] { - let relays = value + let relay_key = match value.get("state").and_then(Value::as_str) { + Some("acknowledged") => Some("acknowledged_relays"), + Some("observed") => Some("observed_relays"), + _ => None, + }; + if let Some(key) = relay_key { + return Ok(value .get(key) .and_then(Value::as_array) .map(|relays| relays_from_json_array(relays)) - .unwrap_or_default(); - if !relays.is_empty() { - return Ok(relays); - } + .unwrap_or_default()); } Ok(Vec::new()) @@ -2663,6 +2665,7 @@ mod tests { PickupLocationId, ProductId, }; use rusqlite::{Connection, params}; + use serde_json::json; use crate::{ AppSqliteError, AppSqliteStore, BuyerOrderCoordinationState, BuyerRepeatDemandApplyOutcome, @@ -2672,6 +2675,61 @@ mod tests { use super::AppBuyerRepository; #[test] + fn listing_relays_from_json_uses_only_acknowledged_or_observed_relays() { + assert_eq!( + super::listing_relays_from_json(Some( + json!({ + "state": "acknowledged", + "target_relays": ["wss://target.example"], + "connected_relays": ["wss://connected.example"], + "acknowledged_relays": ["wss://ack.example"] + }) + .to_string() + )) + .expect("acknowledged relays"), + vec!["wss://ack.example"] + ); + assert_eq!( + super::listing_relays_from_json(Some( + json!({ + "state": "observed", + "target_relays": ["wss://target.example"], + "connected_relays": ["wss://connected.example"], + "observed_relays": ["wss://observed.example"] + }) + .to_string() + )) + .expect("observed relays"), + vec!["wss://observed.example"] + ); + assert!( + super::listing_relays_from_json(Some( + json!({ + "state": "observed", + "target_relays": ["wss://target.example"], + "connected_relays": ["wss://connected.example"], + "observed_relays": [] + }) + .to_string() + )) + .expect("unknown observed relays") + .is_empty() + ); + assert!( + super::listing_relays_from_json(Some( + json!({ + "state": "pending", + "target_relays": ["wss://target.example"], + "connected_relays": ["wss://connected.example"] + }) + .to_string() + )) + .expect("pending relays") + .is_empty() + ); + } + + #[test] fn buyer_listings_and_product_detail_follow_catalog_truth() { let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open"); let connection = store.connection(); diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs @@ -16,7 +16,7 @@ use radroots_events_codec::trade::{ }; use radroots_local_events::{ LocalEventRecord, LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, - SourceRuntime, + RelayDeliveryEvidence, RelayDeliveryState, SourceRuntime, }; use radroots_sql_core::{SqlExecutor, SqliteExecutor}; use rusqlite::{Connection, OptionalExtension, params}; @@ -1759,9 +1759,7 @@ fn signed_listing_product_status( content: Option<&Value>, tags: Option<&Value>, ) -> Option<ProductStatus> { - if record.status != LocalRecordStatus::Published - || record.outbox_status != PublishOutboxStatus::Acknowledged - { + if !signed_listing_has_public_evidence(record) { return Some(ProductStatus::Draft); } match signed_listing_lifecycle(content, tags)? { @@ -1773,6 +1771,20 @@ fn signed_listing_product_status( } } +fn signed_listing_has_public_evidence(record: &LocalEventRecord) -> bool { + if record.status != LocalRecordStatus::Published { + return false; + } + if record.outbox_status == PublishOutboxStatus::Acknowledged { + return true; + } + record + .relay_delivery_json + .as_ref() + .and_then(|delivery| RelayDeliveryEvidence::from_json_value(delivery).ok()) + .is_some_and(|delivery| delivery.state == RelayDeliveryState::Observed) +} + fn signed_farm_readiness(content: &Value, tags: Option<&Value>) -> Option<FarmReadiness> { string_at(content, &["readiness"]) .or_else(|| { @@ -3850,6 +3862,46 @@ mod tests { } #[test] + fn maps_observed_signed_listing_as_published_without_outbox_acknowledgement() { + let app_store = + AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store"); + let events = local_events_store(); + let farm_key = "AAAAAAAAAAAAAAAAAAAAAA"; + let listing_key = "BBBBBBBBBBBBBBBBBBBBBB"; + let mut record = signed_listing_record_with_publish_state( + "observed-listing", + farm_key, + listing_key, + "active", + LocalRecordStatus::Published, + PublishOutboxStatus::None, + ); + record.relay_delivery_json = Some(json!({ + "state": "observed", + "target_relays": ["ws://127.0.0.1:1234"], + "connected_relays": ["ws://127.0.0.1:1234"], + "acknowledged_relays": [], + "observed_relays": ["ws://127.0.0.1:1234"], + "failed_relays": [] + })); + events + .append_record(&record) + .expect("append observed signed listing"); + + let report = app_store + .import_shared_local_events_from_store(&events) + .expect("import observed signed listing"); + let product_status: String = app_store + .connection() + .query_row("SELECT status FROM products", [], |row| row.get(0)) + .expect("load product status"); + + assert_eq!(report.imported_records, 1); + assert_eq!(report.skipped_records, 0); + assert_eq!(product_status, "published"); + } + + #[test] fn unknown_acknowledged_signed_listing_status_is_not_published() { let app_store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");