app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 06e94cfc8efa54c65bdc13f97aeb9b41d58b6821
parent 168fbdf3fb7ae1bbed80e705da5a56fd87acf2d1
Author: triesap <tyson@radroots.org>
Date:   Tue, 26 May 2026 04:11:42 +0000

sqlite: preserve app order event evidence

Diffstat:
Mcrates/shared/sqlite/src/local_interop.rs | 303++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
1 file changed, 287 insertions(+), 16 deletions(-)

diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs @@ -304,8 +304,12 @@ impl<'a> AppLocalInteropRepository<'a> { } fn import_record(&self, record: &LocalEventRecord) -> Result<ImportOutcome, AppSqliteError> { - if self.is_duplicate_signed_event(record)? { - return Ok(ImportOutcome::Skipped); + match self.duplicate_signed_event_action(record)? { + DuplicateSignedEventAction::Import => {} + DuplicateSignedEventAction::ReplaceExisting(event_id) => { + self.delete_duplicate_signed_events(event_id.as_str(), record.record_id.as_str())?; + } + DuplicateSignedEventAction::Skip => return Ok(ImportOutcome::Skipped), } let projection = match record.family { LocalRecordFamily::LocalWork => self.import_local_work(record)?, @@ -323,9 +327,12 @@ impl<'a> AppLocalInteropRepository<'a> { } } - fn is_duplicate_signed_event(&self, record: &LocalEventRecord) -> Result<bool, AppSqliteError> { + fn duplicate_signed_event_action( + &self, + record: &LocalEventRecord, + ) -> Result<DuplicateSignedEventAction, AppSqliteError> { if record.family != LocalRecordFamily::SignedEvent { - return Ok(false); + return Ok(DuplicateSignedEventAction::Import); } let Some(event_id) = record .event_id @@ -333,23 +340,85 @@ impl<'a> AppLocalInteropRepository<'a> { .map(str::trim) .filter(|event_id| !event_id.is_empty()) else { - return Ok(false); + return Ok(DuplicateSignedEventAction::Import); }; - self.connection - .query_row( - "SELECT EXISTS( - SELECT 1 - FROM local_interop_imports - WHERE event_id = ?1 - AND record_id <> ?2 - )", - params![event_id, record.record_id.as_str()], - |row| row.get::<_, bool>(0), + let mut statement = self + .connection + .prepare( + "SELECT source_runtime, owner_account_id, local_status, outbox_status + FROM local_interop_imports + WHERE event_id = ?1 + AND record_id <> ?2 + AND record_family = 'signed_event'", ) .map_err(|source| AppSqliteError::Query { - operation: "check duplicate local interop signed event", + operation: "prepare duplicate local interop signed event query", source, + })?; + let rows = statement + .query_map(params![event_id, record.record_id.as_str()], |row| { + Ok(StoredSignedEventDuplicate { + source_runtime: row.get(0)?, + owner_account_id: row.get(1)?, + local_status: row.get(2)?, + outbox_status: row.get(3)?, + }) }) + .map_err(|source| AppSqliteError::Query { + operation: "query duplicate local interop signed events", + source, + })?; + let mut existing_precedence = None; + for row in rows { + let duplicate = row.map_err(|source| AppSqliteError::Query { + operation: "read duplicate local interop signed event", + source, + })?; + existing_precedence = Some(existing_precedence.unwrap_or(0).max( + signed_event_evidence_precedence( + duplicate.source_runtime.as_str(), + duplicate.owner_account_id.as_deref(), + duplicate.local_status.as_str(), + duplicate.outbox_status.as_str(), + ), + )); + } + let Some(existing_precedence) = existing_precedence else { + return Ok(DuplicateSignedEventAction::Import); + }; + let incoming_precedence = signed_event_evidence_precedence( + record.source_runtime.as_str(), + record.owner_account_id.as_deref(), + record.status.as_str(), + record.outbox_status.as_str(), + ); + if incoming_precedence > existing_precedence { + Ok(DuplicateSignedEventAction::ReplaceExisting( + event_id.to_owned(), + )) + } else { + Ok(DuplicateSignedEventAction::Skip) + } + } + + fn delete_duplicate_signed_events( + &self, + event_id: &str, + record_id: &str, + ) -> Result<(), AppSqliteError> { + self.connection + .execute( + "DELETE FROM local_interop_imports + WHERE event_id = ?1 + AND record_id <> ?2 + AND record_family = 'signed_event'", + params![event_id, record_id], + ) + .map_err(|source| AppSqliteError::Query { + operation: "delete superseded duplicate local interop signed event", + source, + })?; + Ok(()) } fn import_local_work( @@ -1513,12 +1582,27 @@ enum ImportOutcome { } #[derive(Clone, Debug, Eq, PartialEq)] +enum DuplicateSignedEventAction { + Import, + ReplaceExisting(String), + Skip, +} + +#[derive(Clone, Debug, Eq, PartialEq)] struct ProjectionRecord { kind: &'static str, projected_id: Option<String>, } #[derive(Clone, Debug, Eq, PartialEq)] +struct StoredSignedEventDuplicate { + source_runtime: String, + owner_account_id: Option<String>, + local_status: String, + outbox_status: String, +} + +#[derive(Clone, Debug, Eq, PartialEq)] struct StoredLocalInteropSignedEventEvidence { event_id: Option<String>, event_kind: Option<i64>, @@ -1554,6 +1638,31 @@ struct ExistingListingProjection { farm_key: Option<String>, } +fn signed_event_evidence_precedence( + source_runtime: &str, + owner_account_id: Option<&str>, + local_status: &str, + outbox_status: &str, +) -> u8 { + let mut precedence = 0; + if local_status == LocalRecordStatus::Published.as_str() { + precedence += 1; + } + if outbox_status == PublishOutboxStatus::Acknowledged.as_str() { + precedence += 2; + } + if owner_account_id + .map(str::trim) + .is_some_and(|owner_account_id| !owner_account_id.is_empty()) + { + precedence += 4; + } + if source_runtime == SourceRuntime::App.as_str() { + precedence += 8; + } + precedence +} + fn deterministic_farm_id(owner_pubkey: Option<&str>, farm_key: &str) -> FarmId { FarmId::from(deterministic_uuid( "radroots-cli-farm", @@ -4007,6 +4116,168 @@ mod tests { } #[test] + fn app_order_request_receipt_replaces_prior_relay_duplicate() { + let app_store = + AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store"); + let events = local_events_store(); + let listing_addr = "30402:seller-pubkey:app-order-listing"; + let payload = order_request_payload( + "app-order-receipt-replaces-relay", + listing_addr, + "buyer-pubkey", + "seller-pubkey", + ); + let parts = + active_trade_order_request_event_build(&listing_event_ptr("listing-event"), &payload) + .expect("build order request event"); + let event = event_from_parts("app-order-request-event", "buyer-pubkey", parts); + let mut relay_record = signed_order_event_record( + "app:relay_event:order-request:duplicate", + &event, + listing_addr, + SourceRuntime::Cli, + None, + ); + relay_record.outbox_status = PublishOutboxStatus::None; + relay_record.relay_delivery_json = Some(json!({ + "state": "observed", + "observed_relays": ["ws://127.0.0.1:1234/"] + })); + let relay_record = events + .append_record(&relay_record) + .expect("append relay order request"); + let app_record = events + .append_record(&signed_order_event_record( + "app:signed_event:order-request:duplicate", + &event, + listing_addr, + SourceRuntime::App, + Some("acct_buyer"), + )) + .expect("append app order request receipt"); + + let report = app_store + .import_local_event_records(&[relay_record, app_record]) + .expect("import duplicate order request records"); + let imported = app_store + .load_local_interop_records() + .expect("load imported records"); + let stored = imported + .iter() + .find(|record| record.event_id.as_deref() == Some("app-order-request-event")) + .expect("app order request evidence"); + + assert_eq!(report.imported_records, 2); + assert_eq!(report.skipped_records, 0); + assert_eq!( + imported + .iter() + .filter(|record| record.event_id.as_deref() == Some("app-order-request-event")) + .count(), + 1 + ); + assert_eq!(stored.record_id, "app:signed_event:order-request:duplicate"); + assert_eq!(stored.source_runtime, SourceRuntime::App.as_str()); + assert_eq!(stored.owner_account_id.as_deref(), Some("acct_buyer")); + assert_eq!( + stored.outbox_status, + PublishOutboxStatus::Acknowledged.as_str() + ); + assert_eq!(stored.listing_addr.as_deref(), Some(listing_addr)); + } + + #[test] + fn relay_order_decision_duplicate_does_not_downgrade_app_receipt() { + let app_store = + AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store"); + let events = local_events_store(); + let listing_addr = "30402:seller-pubkey:app-decision-listing"; + let request_payload = order_request_payload( + "app-decision-receipt", + listing_addr, + "buyer-pubkey", + "seller-pubkey", + ); + let request_parts = active_trade_order_request_event_build( + &listing_event_ptr("listing-event"), + &request_payload, + ) + .expect("build order request event"); + let request_event = + event_from_parts("app-decision-request-event", "buyer-pubkey", request_parts); + let decision_payload = accepted_order_decision_payload( + "app-decision-receipt", + listing_addr, + "buyer-pubkey", + "seller-pubkey", + ); + let decision_parts = active_trade_order_decision_event_build( + request_event.id.as_str(), + request_event.id.as_str(), + &decision_payload, + ) + .expect("build order decision event"); + let decision_event = + event_from_parts("app-order-decision-event", "seller-pubkey", decision_parts); + let app_record = events + .append_record(&signed_order_event_record( + "app:signed_event:order-decision:duplicate", + &decision_event, + listing_addr, + SourceRuntime::App, + Some("acct_seller"), + )) + .expect("append app order decision receipt"); + let mut relay_record = signed_order_event_record( + "app:relay_event:order-decision:duplicate", + &decision_event, + listing_addr, + SourceRuntime::Cli, + None, + ); + relay_record.outbox_status = PublishOutboxStatus::None; + relay_record.relay_delivery_json = Some(json!({ + "state": "observed", + "observed_relays": ["ws://127.0.0.1:1234/"] + })); + let relay_record = events + .append_record(&relay_record) + .expect("append relay order decision"); + + let report = app_store + .import_local_event_records(&[app_record, relay_record]) + .expect("import duplicate order decision records"); + let imported = app_store + .load_local_interop_records() + .expect("load imported records"); + let stored = imported + .iter() + .find(|record| record.event_id.as_deref() == Some("app-order-decision-event")) + .expect("app order decision evidence"); + + assert_eq!(report.imported_records, 1); + assert_eq!(report.skipped_records, 1); + assert_eq!( + imported + .iter() + .filter(|record| record.event_id.as_deref() == Some("app-order-decision-event")) + .count(), + 1 + ); + assert_eq!( + stored.record_id, + "app:signed_event:order-decision:duplicate" + ); + assert_eq!(stored.source_runtime, SourceRuntime::App.as_str()); + assert_eq!(stored.owner_account_id.as_deref(), Some("acct_seller")); + assert_eq!( + stored.outbox_status, + PublishOutboxStatus::Acknowledged.as_str() + ); + assert_eq!(stored.listing_addr.as_deref(), Some(listing_addr)); + } + + #[test] fn local_work_farm_import_preserves_duplicate_relay_signed_ready_farm() { let app_store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("open app sqlite store");