app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 168fbdf3fb7ae1bbed80e705da5a56fd87acf2d1
parent fe60ebaef9ae119b3627a38f35aca5a9a76590bd
Author: triesap <tyson@radroots.org>
Date:   Tue, 26 May 2026 03:19:07 +0000

runtime: accept relay-ingested order evidence

Diffstat:
Mcrates/launchers/desktop/src/runtime.rs | 121++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Acrates/shared/sqlite/migrations/0021_local_interop_signed_event_evidence.sql | 17+++++++++++++++++
Mcrates/shared/sqlite/src/local_interop.rs | 175++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/shared/sqlite/src/migrations.rs | 4++++
Mcrates/shared/sync/src/lib.rs | 2++
5 files changed, 292 insertions(+), 27 deletions(-)

diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -2342,7 +2342,7 @@ impl DesktopAppRuntimeState { reason: "seller order decision requires an undecided order", }); } - let request = self.resolve_seller_order_request_from_shared_events(order_id)?; + let request = self.resolve_seller_order_request_evidence(order_id)?; if request.payload.seller_pubkey.trim() != seller_pubkey.as_str() { return Err(AppSqliteError::InvalidProjection { reason: "seller order decision seller account does not match order seller", @@ -4470,17 +4470,43 @@ impl DesktopAppRuntimeState { sqlite_store.import_shared_local_events_from_path(database_path.as_path()) } - fn resolve_seller_order_request_from_shared_events( + fn resolve_seller_order_request_evidence( &self, order_id: OrderId, ) -> Result<ResolvedAppSellerOrderRequest, AppSqliteError> { - let store = self.open_shared_local_events_store()?; - let Some(store) = store else { + let mut matched_requests = BTreeMap::new(); + self.collect_seller_order_request_evidence_from_shared_events( + &order_id, + &mut matched_requests, + )?; + self.collect_seller_order_request_evidence_from_local_interop( + &order_id, + &mut matched_requests, + )?; + + if matched_requests.len() > 1 { return Err(AppSqliteError::InvalidProjection { - reason: "seller order decision requires shared signed order request evidence", + reason: "seller order decision found multiple signed order requests", }); + } + + matched_requests + .into_values() + .next() + .ok_or(AppSqliteError::InvalidProjection { + reason: "seller order decision requires signed order request evidence", + }) + } + + fn collect_seller_order_request_evidence_from_shared_events( + &self, + order_id: &OrderId, + matched_requests: &mut BTreeMap<String, ResolvedAppSellerOrderRequest>, + ) -> Result<(), AppSqliteError> { + let store = self.open_shared_local_events_store()?; + let Some(store) = store else { + return Ok(()); }; - let mut matched_request = None; let mut before = None; loop { @@ -4525,23 +4551,12 @@ impl DesktopAppRuntimeState { let Ok(envelope) = radroots_sdk::trade::parse_order_request(&event) else { continue; }; - let app_order_id = projected_order_id_from_trade_request( - envelope.payload.order_id.as_str(), - envelope.payload.buyer_pubkey.as_str(), + insert_seller_order_request_evidence( + order_id, + &event, + envelope.payload, + matched_requests, ); - if app_order_id != order_id { - continue; - } - if matched_request.is_some() { - return Err(AppSqliteError::InvalidProjection { - reason: "seller order decision found multiple signed order requests", - }); - } - matched_request = Some(ResolvedAppSellerOrderRequest { - request_event_id: event.id, - listing_event_id: listing_event_id_from_tags(&event.tags), - payload: envelope.payload, - }); } if before.is_none() || is_last_page { @@ -4549,9 +4564,34 @@ impl DesktopAppRuntimeState { } } - matched_request.ok_or(AppSqliteError::InvalidProjection { - reason: "seller order decision requires signed order request evidence", - }) + Ok(()) + } + + fn collect_seller_order_request_evidence_from_local_interop( + &self, + order_id: &OrderId, + matched_requests: &mut BTreeMap<String, ResolvedAppSellerOrderRequest>, + ) -> Result<(), AppSqliteError> { + let Some(sqlite_store) = self.sqlite_store.as_ref() else { + return Ok(()); + }; + let events = sqlite_store.load_local_interop_signed_events_by_kind(i64::from( + radroots_sdk::trade::RadrootsActiveTradeMessageType::TradeOrderRequested.kind(), + ))?; + + for event in events { + let Ok(envelope) = radroots_sdk::trade::parse_order_request(&event) else { + continue; + }; + insert_seller_order_request_evidence( + order_id, + &event, + envelope.payload, + matched_requests, + ); + } + + Ok(()) } fn open_shared_local_events_store( @@ -4980,6 +5020,7 @@ impl DesktopAppRuntimeState { let listing_addr = source_record .as_ref() .and_then(|record| record.listing_addr.clone()) + .or_else(|| receipt.listing_addr.clone()) .or_else(|| signed_event_listing_addr(receipt)); let event_record = LocalEventRecordInput { record_id: format!("app:signed_event:{}", receipt.event_id), @@ -6328,22 +6369,26 @@ fn published_operation_receipt( "direct relay app sync received non-relay receipt", )); }; - let (source_account_id, source_local_event_id) = match payload { + let (source_account_id, source_local_event_id, listing_addr) = match payload { AppPublishPayload::FarmProfile(payload) => ( payload.context.account_id.clone(), payload.context.source_local_event_id.clone(), + None, ), AppPublishPayload::Listing(payload) => ( payload.context.account_id.clone(), payload.context.source_local_event_id.clone(), + None, ), AppPublishPayload::OrderRequest(payload) => ( payload.context.account_id.clone(), payload.context.source_local_event_id.clone(), + payload.listing_addr.clone(), ), AppPublishPayload::OrderDecision(payload) => ( payload.context.account_id.clone(), payload.context.source_local_event_id.clone(), + Some(payload.listing_addr.clone()), ), }; let failed_relays = relay_receipt @@ -6381,6 +6426,7 @@ fn published_operation_receipt( operation_key: operation_key.to_owned(), source_account_id, source_local_event_id, + listing_addr, event_id: relay_receipt.event_id, event_kind: relay_receipt.event_kind, event_pubkey: relay_receipt.event.author.clone(), @@ -8108,6 +8154,28 @@ fn event_tags_from_value( .collect() } +fn insert_seller_order_request_evidence( + order_id: &OrderId, + event: &radroots_sdk::RadrootsNostrEvent, + payload: RadrootsTradeOrderRequested, + matched_requests: &mut BTreeMap<String, ResolvedAppSellerOrderRequest>, +) { + let app_order_id = projected_order_id_from_trade_request( + payload.order_id.as_str(), + payload.buyer_pubkey.as_str(), + ); + if app_order_id != *order_id { + return; + } + matched_requests + .entry(event.id.clone()) + .or_insert_with(|| ResolvedAppSellerOrderRequest { + request_event_id: event.id.clone(), + listing_event_id: listing_event_id_from_tags(&event.tags), + payload, + }); +} + fn listing_event_id_from_tags(tags: &[Vec<String>]) -> Option<String> { tags.iter().find_map(|tag| { if tag.first().map(String::as_str) == Some("listing_event") { @@ -17243,6 +17311,7 @@ mod tests { operation_key: "farm:upsert".to_owned(), source_account_id, source_local_event_id, + listing_addr: None, event_id: event_id.to_owned(), event_kind: 30340, event_pubkey: event_pubkey.to_owned(), diff --git a/crates/shared/sqlite/migrations/0021_local_interop_signed_event_evidence.sql b/crates/shared/sqlite/migrations/0021_local_interop_signed_event_evidence.sql @@ -0,0 +1,17 @@ +ALTER TABLE local_interop_imports + ADD COLUMN event_pubkey TEXT; + +ALTER TABLE local_interop_imports + ADD COLUMN event_created_at INTEGER; + +ALTER TABLE local_interop_imports + ADD COLUMN event_tags_json TEXT; + +ALTER TABLE local_interop_imports + ADD COLUMN event_content TEXT; + +ALTER TABLE local_interop_imports + ADD COLUMN event_sig TEXT; + +ALTER TABLE local_interop_imports + ADD COLUMN raw_event_json TEXT; diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs @@ -209,6 +209,59 @@ impl<'a> AppLocalInteropRepository<'a> { .collect() } + pub fn load_signed_events_by_kind( + &self, + event_kind: i64, + ) -> Result<Vec<RadrootsNostrEvent>, AppSqliteError> { + let mut statement = self + .connection + .prepare( + "SELECT + event_id, + event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig + FROM local_interop_imports + WHERE record_family = 'signed_event' + AND event_kind = ?1 + ORDER BY local_seq ASC, record_id ASC", + ) + .map_err(|source| AppSqliteError::Query { + operation: "prepare local interop signed event evidence query", + source, + })?; + let rows = statement + .query_map(params![event_kind], |row| { + Ok(StoredLocalInteropSignedEventEvidence { + event_id: row.get(0)?, + event_kind: row.get(1)?, + event_pubkey: row.get(2)?, + event_created_at: row.get(3)?, + event_tags_json: row.get(4)?, + event_content: row.get(5)?, + event_sig: row.get(6)?, + }) + }) + .map_err(|source| AppSqliteError::Query { + operation: "query local interop signed event evidence", + source, + })?; + let mut events = Vec::new(); + for row in rows { + let evidence = row.map_err(|source| AppSqliteError::Query { + operation: "read local interop signed event evidence row", + source, + })?; + if let Some(event) = signed_event_from_local_interop_evidence(&evidence)? { + events.push(event); + } + } + Ok(events) + } + fn last_imported_change_seq(&self) -> Result<i64, AppSqliteError> { match self.connection.query_row( "SELECT last_change_seq @@ -1304,6 +1357,22 @@ impl<'a> AppLocalInteropRepository<'a> { projected_kind: &str, projected_id: Option<String>, ) -> Result<(), AppSqliteError> { + let event_tags_json = record + .event_tags_json + .as_ref() + .map(serde_json::to_string) + .transpose() + .map_err(|_| AppSqliteError::InvalidProjection { + reason: "local interop event tags json must encode", + })?; + let raw_event_json = record + .raw_event_json + .as_ref() + .map(serde_json::to_string) + .transpose() + .map_err(|_| AppSqliteError::InvalidProjection { + reason: "local interop raw event json must encode", + })?; let relay_delivery_json = record .relay_delivery_json .as_ref() @@ -1328,10 +1397,16 @@ impl<'a> AppLocalInteropRepository<'a> { projected_id, event_id, event_kind, + event_pubkey, + event_created_at, + event_tags_json, + event_content, + event_sig, + raw_event_json, outbox_status, relay_delivery_json, imported_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, strftime('%Y-%m-%dT%H:%M:%SZ', 'now')) ON CONFLICT(record_id) DO UPDATE SET local_seq = excluded.local_seq, record_family = excluded.record_family, @@ -1345,6 +1420,12 @@ impl<'a> AppLocalInteropRepository<'a> { projected_id = excluded.projected_id, event_id = excluded.event_id, event_kind = excluded.event_kind, + event_pubkey = excluded.event_pubkey, + event_created_at = excluded.event_created_at, + event_tags_json = excluded.event_tags_json, + event_content = excluded.event_content, + event_sig = excluded.event_sig, + raw_event_json = excluded.raw_event_json, outbox_status = excluded.outbox_status, relay_delivery_json = excluded.relay_delivery_json, imported_at = excluded.imported_at", @@ -1362,6 +1443,12 @@ impl<'a> AppLocalInteropRepository<'a> { projected_id.as_deref(), record.event_id.as_deref(), record.event_kind, + record.event_pubkey.as_deref(), + record.event_created_at, + event_tags_json.as_deref(), + record.event_content.as_deref(), + record.event_sig.as_deref(), + raw_event_json.as_deref(), record.outbox_status.as_str(), relay_delivery_json.as_deref(), ], @@ -1409,6 +1496,14 @@ impl AppSqliteStore { ) -> Result<Vec<StoredLocalInteropRecord>, AppSqliteError> { self.local_interop_repository().load_records() } + + pub fn load_local_interop_signed_events_by_kind( + &self, + event_kind: i64, + ) -> Result<Vec<RadrootsNostrEvent>, AppSqliteError> { + self.local_interop_repository() + .load_signed_events_by_kind(event_kind) + } } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -1424,6 +1519,17 @@ struct ProjectionRecord { } #[derive(Clone, Debug, Eq, PartialEq)] +struct StoredLocalInteropSignedEventEvidence { + event_id: Option<String>, + event_kind: Option<i64>, + event_pubkey: Option<String>, + event_created_at: Option<i64>, + event_tags_json: Option<String>, + event_content: Option<String>, + event_sig: Option<String>, +} + +#[derive(Clone, Debug, Eq, PartialEq)] struct ProductProjection { product_id: ProductId, farm_id: FarmId, @@ -1576,6 +1682,69 @@ fn signed_event_from_record( })) } +fn signed_event_from_local_interop_evidence( + evidence: &StoredLocalInteropSignedEventEvidence, +) -> Result<Option<RadrootsNostrEvent>, AppSqliteError> { + let Some(id) = evidence + .event_id + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Ok(None); + }; + let Some(author) = evidence + .event_pubkey + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Ok(None); + }; + let Some(kind) = evidence + .event_kind + .and_then(|kind| u32::try_from(kind).ok()) + else { + return Ok(None); + }; + let Some(created_at) = evidence + .event_created_at + .and_then(|created_at| u32::try_from(created_at).ok()) + else { + return Ok(None); + }; + let Some(sig) = evidence + .event_sig + .as_deref() + .map(str::trim) + .filter(|value| !value.is_empty()) + else { + return Ok(None); + }; + let Some(tags_json) = evidence.event_tags_json.as_deref() else { + return Ok(None); + }; + let tags_value = serde_json::from_str::<Value>(tags_json).map_err(|_| { + AppSqliteError::InvalidProjection { + reason: "local interop signed event tags must decode", + } + })?; + let Some(tags) = tags_from_json(&tags_value) else { + return Err(AppSqliteError::InvalidProjection { + reason: "local interop signed event tags must be an array", + }); + }; + Ok(Some(RadrootsNostrEvent { + id: id.to_owned(), + author: author.to_owned(), + created_at, + kind, + tags, + content: evidence.event_content.clone().unwrap_or_default(), + sig: sig.to_owned(), + })) +} + fn tags_from_json(value: &Value) -> Option<Vec<Vec<String>>> { value.as_array().map(|tags| { tags.iter() @@ -2634,6 +2803,9 @@ mod tests { let imported = app_store .load_local_interop_records() .expect("load imported records"); + let signed_evidence = app_store + .load_local_interop_signed_events_by_kind(KIND_ORDER_REQUEST) + .expect("load signed event evidence"); let buyer_context_key: String = app_store .connection() .query_row( @@ -2651,6 +2823,7 @@ mod tests { && record.event_kind == Some(KIND_ORDER_REQUEST) && record.event_id.as_deref() == Some("order-request-event-1")) ); + assert_eq!(signed_evidence, vec![event.clone()]); assert_eq!(orders.rows.len(), 1); assert_eq!(orders.rows[0].order_id, order_id); assert_eq!(orders.rows[0].status, OrderStatus::NeedsAction); diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs @@ -84,6 +84,10 @@ const MIGRATIONS: &[Migration] = &[ version: 20, sql: include_str!("../migrations/0020_declined_order_status.sql"), }, + Migration { + version: 21, + sql: include_str!("../migrations/0021_local_interop_signed_event_evidence.sql"), + }, ]; pub fn latest_schema_version() -> u32 { diff --git a/crates/shared/sync/src/lib.rs b/crates/shared/sync/src/lib.rs @@ -445,6 +445,8 @@ pub struct AppPublishedOperationReceipt { pub operation_key: String, pub source_account_id: String, pub source_local_event_id: Option<String>, + #[serde(default)] + pub listing_addr: Option<String>, pub event_id: String, pub event_kind: u32, pub event_pubkey: String,