app

Local-first trade for farms and co-ops
git clone https://radroots.dev/git/app.git
Log | Files | Refs | README | LICENSE

commit 9ba9b1c16811a139c080274eb6a67f283b27c0a6
parent cf297f00c9938665709a347d55580755e562ea72
Author: triesap <tyson@radroots.org>
Date:   Mon, 25 May 2026 00:14:48 +0000

sync: add direct relay app transport

Diffstat:
MCargo.lock | 5+++++
MCargo.toml | 4++++
Mcrates/launchers/desktop/Cargo.toml | 5+++++
Mcrates/launchers/desktop/src/runtime.rs | 756+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/shared/sync/src/lib.rs | 20++++++++++++++++++++
Mcrates/shared/sync/src/publish.rs | 50++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 827 insertions(+), 13 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -5057,6 +5057,7 @@ name = "radroots_app" version = "0.1.0" dependencies = [ "chrono", + "futures-util", "gpui", "gpui-component", "gpui-component-assets", @@ -5068,15 +5069,19 @@ dependencies = [ "radroots_app_state", "radroots_app_sync", "radroots_app_ui", + "radroots_core", "radroots_identity", "radroots_local_events", "radroots_nostr", "radroots_nostr_accounts", "radroots_protected_store", + "radroots_sdk", "radroots_secret_vault", "radroots_sql_core", "serde_json", "thiserror 2.0.18", + "tokio", + "tokio-tungstenite", "tracing", "tracing-subscriber", "uuid", diff --git a/Cargo.toml b/Cargo.toml @@ -24,10 +24,12 @@ readme = "README.md" [workspace.dependencies] chrono = { version = "0.4", default-features = false, features = ["clock"] } +futures-util = "0.3" gpui = "0.2.2" gpui-component = "0.5.1" gpui-component-assets = "0.5.1" mf2_i18n = { git = "https://github.com/triesap/mf2_i18n.git", rev = "e2ad58d5863d9dd98f2f38d1f08b2140bf34b0a1" } +radroots_core = { path = "../lib/crates/core", default-features = false, features = ["std"] } radroots_identity = { path = "../lib/crates/identity" } radroots_local_events = { path = "../lib/crates/local_events", features = ["native"] } radroots_nostr = { path = "../lib/crates/nostr", features = ["client"] } @@ -49,6 +51,8 @@ rusqlite = { version = "0.39", features = ["bundled"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "2" +tokio = { version = "1", features = ["macros", "net", "rt", "sync"] } +tokio-tungstenite = "0.26.2" toml = "0.8" tracing = "0.1" tracing-appender = "0.2" diff --git a/crates/launchers/desktop/Cargo.toml b/crates/launchers/desktop/Cargo.toml @@ -9,14 +9,17 @@ publish = false [dependencies] chrono.workspace = true +futures-util.workspace = true gpui.workspace = true gpui-component.workspace = true gpui-component-assets.workspace = true +radroots_core.workspace = true radroots_identity.workspace = true radroots_nostr.workspace = true radroots_nostr_accounts.workspace = true radroots_protected_store.workspace = true radroots_secret_vault.workspace = true +radroots_sdk.workspace = true radroots_app_core.workspace = true radroots_app_i18n.workspace = true radroots_app_models.workspace = true @@ -29,6 +32,8 @@ radroots_local_events.workspace = true radroots_sql_core.workspace = true serde_json.workspace = true thiserror.workspace = true +tokio.workspace = true +tokio-tungstenite.workspace = true tracing.workspace = true uuid.workspace = true diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs @@ -23,11 +23,12 @@ use radroots_app_models::{ PackDayExportBundle, PackDayExportInstanceId, PackDayExportStatus, PackDayHostHandoffKind, PackDayHostHandoffStatus, PackDayPrintKind, PackDayPrintStatus, PackDayProjection, PackDayScreenQueryState, PersonalSection, PickupLocationRecord, ProductEditorDraft, ProductId, - ProductsFilter, ProductsListProjection, ProductsSort, RecoveryKind, RecoveryQueueProjection, - RecoveryRecordId, RecoveryState, ReminderDeadlineProjection, ReminderDeliveryState, - ReminderFeedProjection, ReminderId, ReminderKind, ReminderLogEntryProjection, - ReminderLogProjection, ReminderSurface, ReminderUrgency, SettingsAccountProjection, - SettingsPreference, SettingsSection, ShellSection, TodayAgendaProjection, + ProductStatus, ProductsFilter, ProductsListProjection, ProductsSort, RecoveryKind, + RecoveryQueueProjection, RecoveryRecordId, RecoveryState, ReminderDeadlineProjection, + ReminderDeliveryState, ReminderFeedProjection, ReminderId, ReminderKind, + ReminderLogEntryProjection, ReminderLogProjection, ReminderSurface, ReminderUrgency, + SettingsAccountProjection, SettingsPreference, SettingsSection, ShellSection, + TodayAgendaProjection, }; use radroots_app_remote_signer::{ RadrootsAppRemoteSignerApprovedSession, RadrootsAppRemoteSignerPendingSession, @@ -47,21 +48,39 @@ use radroots_app_state::{ ProductsScreenProjection, ProductsScreenQueryState, derive_sync_projection, }; use radroots_app_sync::{ - AppSyncProjection, AppSyncRequest, AppSyncResult, AppSyncTransport, AppSyncTransportError, - PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, SyncConflictSeverity, - SyncOperationKind, SyncTrigger, + AppListingPublishPayload, AppOrderRequestPublishPayload, AppPublishPayload, + AppPublishedOperationReceipt, AppSyncProjection, AppSyncRequest, AppSyncResult, + AppSyncTransport, AppSyncTransportError, PendingSyncOperation, SyncAggregateRef, + SyncCheckpointStatus, SyncConflictSeverity, SyncOperationKind, SyncTrigger, }; +use radroots_core::{ + RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity, + RadrootsCoreQuantityPrice, RadrootsCoreUnit, +}; +use radroots_identity::RadrootsIdentity; use radroots_local_events::{ BUYER_ORDER_REQUEST_ACTOR_SOURCE_RESOLVED_ACCOUNT, BUYER_ORDER_REQUEST_ACTOR_SOURCE_UNRESOLVED_APP, BUYER_ORDER_REQUEST_DOCUMENT_KIND, - BUYER_ORDER_REQUEST_LOCAL_WORK_RECORD_KIND, LocalEventRecordInput, LocalEventsStore, - LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime, + BUYER_ORDER_REQUEST_LOCAL_WORK_RECORD_KIND, LocalEventRecordInput, LocalEventRecordUpdate, + LocalEventsStore, LocalRecordFamily, LocalRecordStatus, PublishOutboxStatus, SourceRuntime, buyer_order_request_local_work_record_id, validate_buyer_order_request_local_work_payload, }; use radroots_nostr_accounts::prelude::RadrootsNostrAccountsManager; +use radroots_sdk::farm::{RadrootsFarm, RadrootsFarmRef}; +use radroots_sdk::listing::{ + RadrootsListing, RadrootsListingAvailability, RadrootsListingBin, + RadrootsListingDeliveryMethod, RadrootsListingLocation, RadrootsListingProduct, + RadrootsListingStatus, +}; +use radroots_sdk::trade::RadrootsTradeOrderRequested; +use radroots_sdk::{ + RadrootsNostrEventPtr, RadrootsSdkClient, RadrootsSdkConfig, RelayConfig, SdkEnvironment, + SdkPublishReceipt, SdkTransportMode, SdkTransportReceipt, SignerConfig, +}; use radroots_sql_core::SqliteExecutor; use serde_json::json; use thiserror::Error; +use tokio::runtime::Builder as TokioRuntimeBuilder; use tracing::error; use uuid::Uuid; @@ -88,6 +107,7 @@ use crate::remote_signer::{ const APP_DATABASE_FILE_NAME: &str = "app.sqlite3"; const SYNC_TRANSPORT_UNAVAILABLE_MESSAGE: &str = "remote sync transport is not configured"; +const APP_DIRECT_RELAY_SYNC_TIMEOUT_MS: u64 = 2_000; #[derive(Debug, Default)] struct UnavailableAppSyncTransport; @@ -104,6 +124,105 @@ fn default_sync_transport() -> Box<dyn AppSyncTransport + Send> { Box::new(UnavailableAppSyncTransport) } +#[derive(Clone)] +struct SdkDirectRelayAppSyncTransport { + accounts_manager: RadrootsNostrAccountsManager, + relay_urls: Vec<String>, + timeout_ms: u64, +} + +impl SdkDirectRelayAppSyncTransport { + fn new( + accounts_manager: RadrootsNostrAccountsManager, + default_nostr_relay_url: String, + ) -> Self { + Self { + accounts_manager, + relay_urls: vec![default_nostr_relay_url], + timeout_ms: APP_DIRECT_RELAY_SYNC_TIMEOUT_MS, + } + } + + #[cfg(test)] + fn with_relay_urls( + accounts_manager: RadrootsNostrAccountsManager, + relay_urls: Vec<String>, + ) -> Self { + Self { + accounts_manager, + relay_urls, + timeout_ms: APP_DIRECT_RELAY_SYNC_TIMEOUT_MS, + } + } + + fn sync_with_sdk( + &self, + request: AppSyncRequest, + ) -> Result<AppSyncResult, AppSyncTransportError> { + let identity = self + .accounts_manager + .default_signing_identity() + .map_err(|error| AppSyncTransportError::failed(error.to_string()))? + .ok_or_else(|| { + AppSyncTransportError::unavailable( + "selected account is not backed by a local signing key", + ) + })?; + let relay_urls = normalized_app_sync_relay_urls(&self.relay_urls)?; + let client = direct_relay_sdk_client(relay_urls, self.timeout_ms)?; + let mut published_receipts = Vec::new(); + + for operation in &request.pending_operations { + if operation.operation != SyncOperationKind::Upsert { + return Err(AppSyncTransportError::failed( + "direct relay app sync supports upsert publish work only", + )); + } + let publish_payload = operation.publish_payload().map_err(|error| { + AppSyncTransportError::failed(format!( + "pending app sync operation is not a typed publish payload: {error}" + )) + })?; + publish_payload.validate().map_err(|error| { + let reason_codes = error + .reason_codes + .into_iter() + .map(|reason| reason.storage_key()) + .collect::<Vec<_>>() + .join(","); + AppSyncTransportError::failed(format!( + "pending app publish work is blocked: {reason_codes}" + )) + })?; + let receipt = publish_app_payload_sync(&client, &identity, &publish_payload)?; + published_receipts.push(app_published_operation_receipt( + operation.operation_key.as_str(), + &publish_payload, + receipt, + )?); + } + + Ok(AppSyncResult { + run_status: radroots_app_sync::AppSyncRunStatus::Succeeded, + checkpoint: SyncCheckpointStatus::current( + request.checkpoint.last_sync_started_at.clone(), + current_utc_timestamp(), + request.checkpoint.last_remote_cursor.clone(), + ), + pushed_operation_count: request.pending_operations.len(), + pulled_record_count: 0, + conflicts: request.known_conflicts, + published_receipts, + }) + } +} + +impl AppSyncTransport for SdkDirectRelayAppSyncTransport { + fn sync(&mut self, request: AppSyncRequest) -> Result<AppSyncResult, AppSyncTransportError> { + self.sync_with_sdk(request) + } +} + #[derive(Clone, Debug)] pub struct DesktopAppRuntime { state: Arc<Mutex<DesktopAppRuntimeState>>, @@ -1002,6 +1121,14 @@ impl DesktopAppRuntimeState { let _ = state_store.apply_in_memory(AppStateCommand::replace_sync_projection( selected_account_sync_context.projection, )); + let sync_transport: Box<dyn AppSyncTransport + Send> = + match accounts_bootstrap.accounts_manager.as_ref() { + Some(accounts_manager) => Box::new(SdkDirectRelayAppSyncTransport::new( + accounts_manager.clone(), + default_nostr_relay_url.clone(), + )), + None => default_sync_transport(), + }; let mut state = Self { state_store, default_nostr_relay_url, @@ -1009,7 +1136,7 @@ impl DesktopAppRuntimeState { remote_signer_paths: Some(remote_signer_paths), accounts_manager: accounts_bootstrap.accounts_manager, sqlite_store: Some(sqlite_store), - sync_transport: default_sync_transport(), + sync_transport, runtime_metadata: DesktopAppRuntimeMetadataSummary::available( runtime_snapshot, &paths, @@ -3182,6 +3309,7 @@ impl DesktopAppRuntimeState { pending_operations: &[StoredPendingSyncOperation], result: &AppSyncResult, ) -> Result<bool, AppSqliteError> { + self.record_published_sync_receipts(result.published_receipts.as_slice())?; { let Some(sqlite_store) = self.sqlite_store.as_ref() else { return Ok(false); @@ -3925,6 +4053,100 @@ impl DesktopAppRuntimeState { Ok(()) } + fn record_published_sync_receipts( + &self, + receipts: &[AppPublishedOperationReceipt], + ) -> Result<(), AppSqliteError> { + if receipts.is_empty() { + return Ok(()); + } + let Some(shared_accounts_paths) = self.shared_accounts_paths.as_ref() else { + return Ok(()); + }; + let Some(database_path) = + shared_local_events_database_path_from_shared_accounts(shared_accounts_paths) + else { + return Ok(()); + }; + if let Some(parent) = database_path.parent() { + fs::create_dir_all(parent).map_err(|source| AppSqliteError::CreateParentDirectory { + path: parent.to_path_buf(), + source, + })?; + } + let executor = SqliteExecutor::open(database_path.as_path()).map_err(|source| { + AppSqliteError::LocalEventsSql { + operation: "open shared local events database", + source, + } + })?; + let store = LocalEventsStore::new(executor); + store + .migrate_up() + .map_err(|source| AppSqliteError::LocalEventsSql { + operation: "migrate shared local events database", + source, + })?; + let timestamp = current_runtime_time_ms()?; + let owner_account_id = self + .state_store + .identity_projection() + .selected_account + .as_ref() + .map(|account| account.account.account_id.clone()); + + for receipt in receipts { + let event_record = LocalEventRecordInput { + record_id: format!("app:signed_event:{}", receipt.event_id), + family: LocalRecordFamily::SignedEvent, + status: LocalRecordStatus::Published, + source_runtime: SourceRuntime::App, + created_at_ms: i64::from(receipt.event_created_at) * 1_000, + inserted_at_ms: timestamp, + owner_account_id: owner_account_id.clone(), + owner_pubkey: Some(receipt.event_pubkey.clone()), + farm_id: None, + listing_addr: None, + local_work_json: None, + event_id: Some(receipt.event_id.clone()), + event_kind: Some(i64::from(receipt.event_kind)), + event_pubkey: Some(receipt.event_pubkey.clone()), + event_created_at: Some(i64::from(receipt.event_created_at)), + event_tags_json: Some(receipt.event_tags_json.clone()), + event_content: Some(receipt.event_content.clone()), + event_sig: Some(receipt.event_sig.clone()), + raw_event_json: Some(receipt.raw_event_json.clone()), + outbox_status: PublishOutboxStatus::Acknowledged, + relay_set_fingerprint: Some(receipt.relay_set_fingerprint.clone()), + relay_delivery_json: Some(receipt.relay_delivery_json.clone()), + }; + store + .append_record(&event_record) + .map_err(|source| AppSqliteError::LocalEvents { + operation: "append app published event record", + source, + })?; + + if let Some(source_record_id) = receipt.source_local_event_id.as_deref() { + store + .update_outbox(&LocalEventRecordUpdate { + record_id: source_record_id.to_owned(), + status: LocalRecordStatus::Published, + outbox_status: PublishOutboxStatus::Acknowledged, + relay_set_fingerprint: Some(receipt.relay_set_fingerprint.clone()), + relay_delivery_json: Some(receipt.relay_delivery_json.clone()), + updated_at_ms: timestamp, + }) + .map_err(|source| AppSqliteError::LocalEvents { + operation: "update app local work publish evidence", + source, + })?; + } + } + + Ok(()) + } + fn local_events_owner_pubkey( &self, account: &radroots_app_models::SelectedAccountProjection, @@ -4293,6 +4515,333 @@ fn current_runtime_time_ms() -> Result<i64, AppSqliteError> { }) } +fn normalized_app_sync_relay_urls( + relay_urls: &[String], +) -> Result<Vec<String>, AppSyncTransportError> { + let normalized = relay_urls + .iter() + .map(|relay| relay.trim().to_owned()) + .filter(|relay| !relay.is_empty()) + .collect::<Vec<_>>(); + if normalized.is_empty() { + return Err(AppSyncTransportError::unavailable( + "direct relay app sync requires at least one configured relay", + )); + } + Ok(normalized) +} + +fn direct_relay_sdk_client( + relay_urls: Vec<String>, + timeout_ms: u64, +) -> Result<RadrootsSdkClient, AppSyncTransportError> { + let mut config = RadrootsSdkConfig::for_environment(SdkEnvironment::Custom); + config.transport = SdkTransportMode::RelayDirect; + config.signer = SignerConfig::LocalIdentity; + config.relay = RelayConfig { urls: relay_urls }; + config.network.timeout_ms = timeout_ms; + RadrootsSdkClient::from_config(config) + .map_err(|error| AppSyncTransportError::failed(error.to_string())) +} + +fn publish_app_payload_sync( + client: &RadrootsSdkClient, + identity: &RadrootsIdentity, + payload: &AppPublishPayload, +) -> Result<SdkPublishReceipt, AppSyncTransportError> { + let runtime = TokioRuntimeBuilder::new_current_thread() + .enable_all() + .build() + .map_err(|error| AppSyncTransportError::failed(error.to_string()))?; + runtime.block_on(async { publish_app_payload(client, identity, payload).await }) +} + +async fn publish_app_payload( + client: &RadrootsSdkClient, + identity: &RadrootsIdentity, + payload: &AppPublishPayload, +) -> Result<SdkPublishReceipt, AppSyncTransportError> { + match payload { + AppPublishPayload::FarmProfile(payload) => { + let farm = RadrootsFarm { + d_tag: d_tag_from_uuid(payload.farm_id.as_uuid()), + name: payload.display_name.trim().to_owned(), + about: None, + website: None, + picture: None, + banner: None, + location: None, + tags: payload.readiness.map(|readiness| match readiness { + FarmReadiness::Incomplete => { + vec!["radroots:readiness:incomplete".to_owned()] + } + FarmReadiness::Ready => vec!["radroots:readiness:ready".to_owned()], + }), + }; + client + .farm() + .publish_with_identity(identity, &farm) + .await + .map_err(|error| AppSyncTransportError::failed(error.to_string())) + } + AppPublishPayload::Listing(payload) => { + let listing = listing_publish_payload_to_sdk_listing(payload)?; + client + .listing() + .publish_with_identity(identity, &listing) + .await + .map_err(|error| AppSyncTransportError::failed(error.to_string())) + } + AppPublishPayload::OrderRequest(payload) => { + let listing_event = order_request_listing_event_ptr(payload)?; + let order = order_request_publish_payload_to_sdk_order(payload)?; + client + .trade() + .publish_order_request_with_identity(identity, &listing_event, &order) + .await + .map_err(|error| AppSyncTransportError::failed(error.to_string())) + } + } +} + +fn listing_publish_payload_to_sdk_listing( + payload: &AppListingPublishPayload, +) -> Result<RadrootsListing, AppSyncTransportError> { + let currency = payload + .price_currency + .parse::<RadrootsCoreCurrency>() + .map_err(|error| AppSyncTransportError::failed(error.to_string()))?; + let unit = parse_app_listing_unit(payload.unit_label.as_str())?; + let price_minor_units = payload.price_minor_units.ok_or_else(|| { + AppSyncTransportError::failed("publishable listing requires price minor units") + })?; + let farm_id = payload + .farm_id + .ok_or_else(|| AppSyncTransportError::failed("publishable listing requires farm id"))?; + let farm_pubkey = payload + .farm_pubkey + .as_deref() + .ok_or_else(|| AppSyncTransportError::failed("publishable listing requires farm pubkey"))? + .trim() + .to_owned(); + let bin_id = "default".to_owned(); + + Ok(RadrootsListing { + d_tag: payload + .listing_d_tag + .as_deref() + .filter(|value| !value.trim().is_empty()) + .map(str::to_owned) + .unwrap_or_else(|| d_tag_from_uuid(payload.product_id.as_uuid())), + farm: RadrootsFarmRef { + pubkey: farm_pubkey, + d_tag: payload + .farm_d_tag + .as_deref() + .filter(|value| !value.trim().is_empty()) + .map(str::to_owned) + .unwrap_or_else(|| d_tag_from_uuid(farm_id.as_uuid())), + }, + product: RadrootsListingProduct { + key: payload.product_id.to_string(), + title: payload.title.trim().to_owned(), + category: payload + .category + .as_deref() + .unwrap_or_default() + .trim() + .to_owned(), + summary: payload + .subtitle + .as_deref() + .filter(|value| !value.trim().is_empty()) + .map(str::to_owned), + process: None, + lot: None, + location: None, + profile: None, + year: None, + }, + primary_bin_id: bin_id.clone(), + bins: vec![RadrootsListingBin { + bin_id, + quantity: RadrootsCoreQuantity::new(RadrootsCoreDecimal::from(1u32), unit), + price_per_canonical_unit: RadrootsCoreQuantityPrice::new( + RadrootsCoreMoney::from_minor_units_u32(price_minor_units, currency), + RadrootsCoreQuantity::new(RadrootsCoreDecimal::from(1u32), unit), + ), + display_amount: Some(RadrootsCoreDecimal::from(1u32)), + display_unit: Some(unit), + display_label: Some(payload.unit_label.trim().to_owned()), + display_price: Some(RadrootsCoreMoney::from_minor_units_u32( + price_minor_units, + currency, + )), + display_price_unit: Some(unit), + }], + resource_area: None, + plot: None, + discounts: None, + inventory_available: payload.stock_quantity.map(RadrootsCoreDecimal::from), + availability: Some(RadrootsListingAvailability::Status { + status: match payload.status { + ProductStatus::Published => RadrootsListingStatus::Active, + ProductStatus::Archived => RadrootsListingStatus::Sold, + other => RadrootsListingStatus::Other { + value: other.storage_key().to_owned(), + }, + }, + }), + delivery_method: Some(parse_app_listing_delivery_method( + payload.fulfillment_method.as_deref().unwrap_or_default(), + )?), + location: payload + .fulfillment_location + .as_deref() + .filter(|value| !value.trim().is_empty()) + .map(|primary| RadrootsListingLocation { + primary: primary.trim().to_owned(), + city: None, + region: None, + country: None, + lat: None, + lng: None, + geohash: None, + }), + images: None, + }) +} + +fn parse_app_listing_unit(value: &str) -> Result<RadrootsCoreUnit, AppSyncTransportError> { + match value.trim().to_ascii_lowercase().as_str() { + "each" | "ea" | "unit" | "units" => Ok(RadrootsCoreUnit::Each), + "kg" | "kilogram" | "kilograms" => Ok(RadrootsCoreUnit::MassKg), + "g" | "gram" | "grams" => Ok(RadrootsCoreUnit::MassG), + "oz" | "ounce" | "ounces" => Ok(RadrootsCoreUnit::MassOz), + "lb" | "pound" | "pounds" => Ok(RadrootsCoreUnit::MassLb), + "l" | "liter" | "liters" => Ok(RadrootsCoreUnit::VolumeL), + "ml" | "milliliter" | "milliliters" => Ok(RadrootsCoreUnit::VolumeMl), + other => Err(AppSyncTransportError::failed(format!( + "unsupported listing unit `{other}`" + ))), + } +} + +fn parse_app_listing_delivery_method( + value: &str, +) -> Result<RadrootsListingDeliveryMethod, AppSyncTransportError> { + match value.trim().to_ascii_lowercase().as_str() { + "pickup" | "local_pickup" => Ok(RadrootsListingDeliveryMethod::Pickup), + "delivery" | "local_delivery" => Ok(RadrootsListingDeliveryMethod::LocalDelivery), + "shipping" | "ship" => Ok(RadrootsListingDeliveryMethod::Shipping), + "" => Err(AppSyncTransportError::failed( + "publishable listing requires fulfillment method", + )), + other => Ok(RadrootsListingDeliveryMethod::Other { + method: other.to_owned(), + }), + } +} + +fn order_request_listing_event_ptr( + payload: &AppOrderRequestPublishPayload, +) -> Result<RadrootsNostrEventPtr, AppSyncTransportError> { + let listing_event_id = payload + .listing_event_id + .as_deref() + .ok_or_else(|| { + AppSyncTransportError::failed("order request publish requires listing event id") + })? + .trim() + .to_owned(); + let listing_relay = payload + .listing_relays + .iter() + .map(|relay| relay.trim()) + .find(|relay| !relay.is_empty()) + .ok_or_else(|| { + AppSyncTransportError::failed("order request publish requires listing relay") + })? + .to_owned(); + + Ok(RadrootsNostrEventPtr { + id: listing_event_id, + relays: Some(listing_relay), + }) +} + +fn order_request_publish_payload_to_sdk_order( + payload: &AppOrderRequestPublishPayload, +) -> Result<RadrootsTradeOrderRequested, AppSyncTransportError> { + let Some(document_json) = payload.order_document_json.as_ref() else { + return Err(AppSyncTransportError::failed( + "order request publish requires order document", + )); + }; + let order_json = document_json + .pointer("/document/order") + .or_else(|| document_json.get("order")) + .unwrap_or(document_json); + serde_json::from_value::<RadrootsTradeOrderRequested>(order_json.clone()) + .map_err(|error| AppSyncTransportError::failed(error.to_string())) +} + +fn app_published_operation_receipt( + operation_key: &str, + payload: &AppPublishPayload, + receipt: SdkPublishReceipt, +) -> Result<AppPublishedOperationReceipt, AppSyncTransportError> { + let SdkTransportReceipt::RelayDirect(relay_receipt) = receipt.transport_receipt else { + return Err(AppSyncTransportError::failed( + "direct relay app sync received non-relay receipt", + )); + }; + let source_local_event_id = match payload { + AppPublishPayload::FarmProfile(payload) => payload.context.source_local_event_id.clone(), + AppPublishPayload::Listing(payload) => payload.context.source_local_event_id.clone(), + AppPublishPayload::OrderRequest(payload) => payload.context.source_local_event_id.clone(), + }; + let failed_relays = relay_receipt + .failed_relays + .iter() + .map(|failure| { + json!({ + "relay_url": failure.relay_url, + "error": failure.error, + }) + }) + .collect::<Vec<_>>(); + let raw_event_json = json!({ + "id": relay_receipt.event.id.clone(), + "pubkey": relay_receipt.event.author.clone(), + "created_at": relay_receipt.event.created_at, + "kind": relay_receipt.event.kind, + "tags": relay_receipt.event.tags.clone(), + "content": relay_receipt.event.content.clone(), + "sig": relay_receipt.event.sig.clone(), + }); + + Ok(AppPublishedOperationReceipt { + operation_key: operation_key.to_owned(), + source_local_event_id, + event_id: relay_receipt.event_id, + event_kind: relay_receipt.event_kind, + event_pubkey: relay_receipt.event.author.clone(), + event_created_at: relay_receipt.event.created_at, + event_tags_json: json!(relay_receipt.event.tags), + event_content: relay_receipt.event.content.clone(), + event_sig: relay_receipt.signature, + raw_event_json, + relay_set_fingerprint: relay_receipt.target_relays.join("|"), + relay_delivery_json: json!({ + "target_relays": relay_receipt.target_relays, + "connected_relays": relay_receipt.connected_relays, + "acknowledged_relays": relay_receipt.acknowledged_relays, + "failed_relays": failed_relays, + }), + }) +} + fn d_tag_from_uuid(uuid: Uuid) -> String { base64_url_no_pad(uuid.as_bytes()) } @@ -5940,11 +6489,14 @@ mod tests { collections::BTreeSet, fs, path::PathBuf, + sync::mpsc, sync::{Arc, Mutex}, + thread, time::{SystemTime, UNIX_EPOCH}, }; use chrono::{Duration, Utc}; + use futures_util::{SinkExt, StreamExt}; use radroots_app_core::{ AppDesktopRuntimePaths, AppRuntimeHostEnvironment, AppRuntimePlatform, AppSharedAccountsPaths, SHARED_ACCOUNTS_STORE_FILE_NAME, SHARED_IDENTITY_FILE_NAME, @@ -5980,7 +6532,8 @@ mod tests { HomeRoute, }; use radroots_app_sync::{ - AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, + AppFarmProfilePublishPayload, AppPublishContext, AppPublishPayload, AppSyncRequest, + AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, PendingSyncOperation, PendingSyncOperationState, RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind, @@ -5997,6 +6550,9 @@ mod tests { }; use radroots_sql_core::SqliteExecutor; use serde_json::json; + use tokio::net::TcpListener; + use tokio::sync::oneshot; + use tokio_tungstenite::tungstenite::Message; use crate::accounts::DesktopLocalIdentityImportRequest; @@ -6004,7 +6560,7 @@ mod tests { APP_DATABASE_FILE_NAME, DesktopAppRuntime, DesktopAppRuntimeActivityContextError, DesktopAppRuntimeCommandError, DesktopAppRuntimeMetadataSummary, DesktopAppRuntimeState, DesktopAppSyncStatusSummary, DesktopRemoteSignerPaths, SYNC_TRANSPORT_UNAVAILABLE_MESSAGE, - default_sync_transport, is_hex_64, + SdkDirectRelayAppSyncTransport, TokioRuntimeBuilder, default_sync_transport, is_hex_64, }; use crate::pack_day_host_handoff::PackDayHostHandoffError; use crate::pack_day_print::{ @@ -6027,6 +6583,97 @@ mod tests { } } + struct ThreadedAckRelay { + url: String, + shutdown_tx: Option<oneshot::Sender<()>>, + join_handle: Option<thread::JoinHandle<()>>, + } + + impl ThreadedAckRelay { + fn spawn() -> Self { + let (url_tx, url_rx) = mpsc::channel(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let join_handle = thread::spawn(move || { + let runtime = TokioRuntimeBuilder::new_current_thread() + .enable_all() + .build() + .expect("relay runtime should build"); + runtime.block_on(async move { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("test relay should bind"); + let url = format!( + "ws://{}", + listener.local_addr().expect("test relay local addr") + ); + url_tx.send(url).expect("relay url should send"); + let mut shutdown_rx = shutdown_rx; + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + accepted = listener.accept() => { + let Ok((stream, _)) = accepted else { + break; + }; + tokio::spawn(async move { + let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else { + return; + }; + let (mut writer, mut reader) = websocket.split(); + while let Some(message) = reader.next().await { + let Ok(Message::Text(text)) = message else { + continue; + }; + let Ok(value) = serde_json::from_str::<serde_json::Value>(text.as_str()) else { + continue; + }; + let Some(event_id) = value + .as_array() + .and_then(|items| match items.as_slice() { + [kind, event, ..] if kind.as_str() == Some("EVENT") => { + event.get("id").and_then(|id| id.as_str()) + } + _ => None, + }) + else { + continue; + }; + let response = json!(["OK", event_id, true, ""]).to_string(); + if writer.send(Message::Text(response.into())).await.is_err() { + break; + } + } + }); + } + } + } + }); + }); + let url = url_rx.recv().expect("relay url should be received"); + + Self { + url, + shutdown_tx: Some(shutdown_tx), + join_handle: Some(join_handle), + } + } + + fn url(&self) -> &str { + self.url.as_str() + } + } + + impl Drop for ThreadedAckRelay { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + if let Some(join_handle) = self.join_handle.take() { + let _ = join_handle.join(); + } + } + } + fn install_recorded_sync_transport( runtime: &DesktopAppRuntime, transport: RecordedAppSyncTransport, @@ -6038,6 +6685,84 @@ mod tests { } #[test] + fn runtime_direct_relay_transport_publishes_typed_farm_work() { + let relay = ThreadedAckRelay::spawn(); + let manager = RadrootsNostrAccountsManager::new_in_memory(); + let account_id = manager + .generate_identity(Some("Farmer".to_owned()), true) + .expect("local signing account should generate"); + let farm_id = FarmId::new(); + let payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload { + context: AppPublishContext::new(account_id.to_string(), "farm_setup") + .with_source_local_event_id("app:local_work:farm:direct"), + farm_id, + display_name: "North field farm".to_owned(), + readiness: Some(FarmReadiness::Ready), + }); + let operation = PendingSyncOperation::from_publish_payload(payload, "2026-05-24T12:00:00Z") + .expect("typed farm publish work should serialize"); + let mut transport = + SdkDirectRelayAppSyncTransport::with_relay_urls(manager, vec![relay.url().to_owned()]); + + let result = transport + .sync(AppSyncRequest { + trigger: SyncTrigger::ManualRefresh, + checkpoint: SyncCheckpointStatus::never_synced(), + pending_operations: vec![operation], + known_conflicts: Vec::new(), + }) + .expect("direct relay farm publish should succeed"); + + assert_eq!(result.run_status, AppSyncRunStatus::Succeeded); + assert_eq!(result.pushed_operation_count, 1); + assert_eq!(result.published_receipts.len(), 1); + assert_eq!(result.published_receipts[0].event_kind, 30340); + assert_eq!( + result.published_receipts[0] + .source_local_event_id + .as_deref(), + Some("app:local_work:farm:direct") + ); + assert_eq!( + result.published_receipts[0].relay_delivery_json["acknowledged_relays"], + json!([relay.url()]) + ); + } + + #[test] + fn runtime_direct_relay_transport_requires_local_signing_custody() { + let relay = ThreadedAckRelay::spawn(); + let manager = RadrootsNostrAccountsManager::new_in_memory(); + let farm_id = FarmId::new(); + let payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload { + context: AppPublishContext::new("missing", "farm_setup"), + farm_id, + display_name: "North field farm".to_owned(), + readiness: Some(FarmReadiness::Ready), + }); + let operation = PendingSyncOperation::from_publish_payload(payload, "2026-05-24T12:00:00Z") + .expect("typed farm publish work should serialize"); + let mut transport = + SdkDirectRelayAppSyncTransport::with_relay_urls(manager, vec![relay.url().to_owned()]); + + let error = transport + .sync(AppSyncRequest { + trigger: SyncTrigger::ManualRefresh, + checkpoint: SyncCheckpointStatus::never_synced(), + pending_operations: vec![operation], + known_conflicts: Vec::new(), + }) + .expect_err("watch-only or missing custody should not publish"); + + assert!(matches!(error, AppSyncTransportError::Unavailable { .. })); + assert!( + error + .to_string() + .contains("selected account is not backed by a local signing key") + ); + } + + #[test] fn desktop_namespace_uses_canonical_app_and_shared_runtime_roots() { let paths = AppDesktopRuntimePaths::for_desktop( AppRuntimePlatform::Macos, @@ -6372,6 +7097,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 0, conflicts: Vec::new(), + published_receipts: Vec::new(), }), ); @@ -6422,6 +7148,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 0, conflicts: Vec::new(), + published_receipts: Vec::new(), }), ); @@ -6486,6 +7213,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 0, conflicts: Vec::new(), + published_receipts: Vec::new(), }), ); @@ -7177,6 +7905,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 0, conflicts: Vec::new(), + published_receipts: Vec::new(), }), ); @@ -7325,6 +8054,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 0, conflicts: Vec::new(), + published_receipts: Vec::new(), }), ); diff --git a/crates/shared/sync/src/lib.rs b/crates/shared/sync/src/lib.rs @@ -377,6 +377,8 @@ pub struct AppSyncResult { pub pushed_operation_count: usize, pub pulled_record_count: usize, pub conflicts: Vec<SyncConflict>, + #[serde(default)] + pub published_receipts: Vec<AppPublishedOperationReceipt>, } impl AppSyncResult { @@ -389,6 +391,22 @@ impl AppSyncResult { } } +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct AppPublishedOperationReceipt { + pub operation_key: String, + pub source_local_event_id: Option<String>, + pub event_id: String, + pub event_kind: u32, + pub event_pubkey: String, + pub event_created_at: u32, + pub event_tags_json: serde_json::Value, + pub event_content: String, + pub event_sig: String, + pub raw_event_json: serde_json::Value, + pub relay_set_fingerprint: String, + pub relay_delivery_json: serde_json::Value, +} + #[derive(Clone, Debug, Eq, Error, PartialEq)] pub enum AppSyncTransportError { #[error("app sync transport is unavailable: {message}")] @@ -575,6 +593,7 @@ mod tests { pushed_operation_count: 1, pulled_record_count: 3, conflicts: vec![conflict], + published_receipts: Vec::new(), }; assert_eq!(request.conflict_status().unresolved_count, 1); @@ -608,6 +627,7 @@ mod tests { pushed_operation_count: 0, pulled_record_count: 2, conflicts: vec![], + published_receipts: Vec::new(), }; let mut transport = RecordedAppSyncTransport::succeed(expected_result.clone()); diff --git a/crates/shared/sync/src/publish.rs b/crates/shared/sync/src/publish.rs @@ -77,14 +77,20 @@ pub struct AppFarmProfilePublishPayload { pub struct AppListingPublishPayload { pub context: AppPublishContext, pub product_id: ProductId, + pub listing_d_tag: Option<String>, pub farm_id: Option<FarmId>, + pub farm_pubkey: Option<String>, + pub farm_d_tag: Option<String>, pub title: String, pub subtitle: Option<String>, + pub category: Option<String>, pub unit_label: String, pub price_minor_units: Option<u32>, pub price_currency: String, pub stock_quantity: Option<u32>, pub availability_window_id: Option<FulfillmentWindowId>, + pub fulfillment_method: Option<String>, + pub fulfillment_location: Option<String>, pub status: ProductStatus, } @@ -100,6 +106,7 @@ pub struct AppOrderRequestPublishPayload { pub order_id: OrderId, pub farm_id: FarmId, pub status: Option<String>, + pub order_document_json: Option<serde_json::Value>, pub listing_addr: Option<String>, pub listing_event_id: Option<String>, pub listing_relays: Vec<String>, @@ -159,6 +166,20 @@ impl AppPublishPayload { if payload.farm_id.is_none() { failures.push(AppPublishValidationFailure::MissingListingFarmId); } + if payload + .farm_pubkey + .as_deref() + .is_none_or(|value| value.trim().is_empty()) + { + failures.push(AppPublishValidationFailure::MissingListingFarmPubkey); + } + if payload + .category + .as_deref() + .is_none_or(|value| value.trim().is_empty()) + { + failures.push(AppPublishValidationFailure::MissingListingCategory); + } if payload.title.trim().is_empty() { failures.push(AppPublishValidationFailure::MissingListingTitle); } @@ -174,9 +195,19 @@ impl AppPublishPayload { if payload.availability_window_id.is_none() { failures.push(AppPublishValidationFailure::MissingListingAvailability); } + if payload + .fulfillment_method + .as_deref() + .is_none_or(|value| value.trim().is_empty()) + { + failures.push(AppPublishValidationFailure::MissingListingFulfillmentMethod); + } } Self::OrderRequest(payload) => { payload.context.validation_failures(&mut failures); + if payload.order_document_json.is_none() { + failures.push(AppPublishValidationFailure::MissingOrderDocument); + } if payload .listing_addr .as_deref() @@ -254,11 +285,15 @@ pub enum AppPublishValidationFailure { MissingSource, MissingFarmDisplayName, MissingListingFarmId, + MissingListingFarmPubkey, + MissingListingCategory, MissingListingTitle, MissingListingUnit, MissingListingPrice, MissingListingCurrency, MissingListingAvailability, + MissingListingFulfillmentMethod, + MissingOrderDocument, MissingOrderListingAddress, MissingOrderListingEventId, MissingOrderListingRelay, @@ -276,11 +311,15 @@ impl AppPublishValidationFailure { Self::MissingSource => "missing_source", Self::MissingFarmDisplayName => "missing_farm_display_name", Self::MissingListingFarmId => "missing_listing_farm_id", + Self::MissingListingFarmPubkey => "missing_listing_farm_pubkey", + Self::MissingListingCategory => "missing_listing_category", Self::MissingListingTitle => "missing_listing_title", Self::MissingListingUnit => "missing_listing_unit", Self::MissingListingPrice => "missing_listing_price", Self::MissingListingCurrency => "missing_listing_currency", Self::MissingListingAvailability => "missing_listing_availability", + Self::MissingListingFulfillmentMethod => "missing_listing_fulfillment_method", + Self::MissingOrderDocument => "missing_order_document", Self::MissingOrderListingAddress => "missing_order_listing_address", Self::MissingOrderListingEventId => "missing_order_listing_event_id", Self::MissingOrderListingRelay => "missing_order_listing_relay", @@ -392,14 +431,20 @@ mod tests { let payload = AppPublishPayload::Listing(AppListingPublishPayload { context: AppPublishContext::new("", ""), product_id: ProductId::new(), + listing_d_tag: None, farm_id: None, + farm_pubkey: None, + farm_d_tag: None, title: " ".to_owned(), subtitle: None, + category: None, unit_label: String::new(), price_minor_units: Some(0), price_currency: String::new(), stock_quantity: Some(4), availability_window_id: None, + fulfillment_method: None, + fulfillment_location: None, status: ProductStatus::Published, }); @@ -415,11 +460,14 @@ mod tests { "missing_account_id", "missing_source", "missing_listing_farm_id", + "missing_listing_farm_pubkey", + "missing_listing_category", "missing_listing_title", "missing_listing_unit", "missing_listing_price", "missing_listing_currency", "missing_listing_availability", + "missing_listing_fulfillment_method", ] ); assert!(payload.validate().is_err()); @@ -432,6 +480,7 @@ mod tests { order_id: OrderId::new(), farm_id: FarmId::new(), status: Some("needs_action".to_owned()), + order_document_json: None, listing_addr: Some(String::new()), listing_event_id: None, listing_relays: vec![], @@ -455,6 +504,7 @@ mod tests { assert_eq!( reason_codes, vec![ + "missing_order_document", "missing_order_listing_address", "missing_order_listing_event_id", "missing_order_listing_relay",