commit 68c91889b21fdbd5570064dc3b684e12ff718299
parent 4c8d53996f02f0dbfa345741f0c80d6a1ddd1c81
Author: triesap <tyson@radroots.org>
Date: Mon, 25 May 2026 01:39:37 +0000
sync: prove direct relay convergence
Diffstat:
4 files changed, 612 insertions(+), 180 deletions(-)
diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs
@@ -5,7 +5,7 @@ use std::path::PathBuf;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use std::time::{SystemTime, UNIX_EPOCH};
-use chrono::{Duration, Utc};
+use chrono::{DateTime, Duration, Utc};
use radroots_app_core::{
AppBuildIdentity, AppDesktopRuntimePaths, AppRuntimeCapture, AppRuntimeMode,
AppRuntimePathsError, AppRuntimeSnapshot, AppSharedAccountsPaths, PackDayExportWriteError,
@@ -49,7 +49,8 @@ use radroots_app_state::{
derive_sync_projection,
};
use radroots_app_sync::{
- AppListingPublishPayload, AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload,
+ AppFarmProfilePublishPayload, AppListingPublishPayload, AppOrderRequestItemPayload,
+ AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload,
AppPublishedOperationReceipt, AppSyncProjection, AppSyncRequest, AppSyncResult,
AppSyncTransport, AppSyncTransportError, PendingSyncOperation, SyncAggregateRef,
SyncCheckpointStatus, SyncConflictSeverity, SyncOperationKind, SyncTrigger,
@@ -196,7 +197,7 @@ impl SdkDirectRelayAppSyncTransport {
))
})?;
let receipt = publish_app_payload_sync(&client, &identity, &publish_payload)?;
- published_receipts.push(app_published_operation_receipt(
+ published_receipts.push(published_operation_receipt(
operation.operation_key.as_str(),
&publish_payload,
receipt,
@@ -1529,14 +1530,7 @@ impl DesktopAppRuntimeState {
fn place_personal_order(&mut self) -> Result<bool, AppSqliteError> {
let buyer_context = self.state_store.identity_projection().buyer_context();
- let (
- order_id,
- refreshed_cart,
- refreshed_checkout,
- refreshed_orders,
- order_detail,
- order_export,
- ) = {
+ let (refreshed_cart, refreshed_checkout, refreshed_orders, order_detail, order_export) = {
let Some(sqlite_store) = self.sqlite_store.as_ref() else {
return Ok(false);
};
@@ -1568,7 +1562,6 @@ impl DesktopAppRuntimeState {
});
};
(
- order_id,
refreshed_cart,
refreshed_checkout,
refreshed_orders,
@@ -1602,7 +1595,7 @@ impl DesktopAppRuntimeState {
changed
});
let section_changed = self.select_personal_section(PersonalSection::Orders)?;
- {
+ let order_local_work = {
let sqlite_store =
self.sqlite_store
.as_ref()
@@ -1613,10 +1606,14 @@ impl DesktopAppRuntimeState {
sqlite_store,
&buyer_context,
&order_export,
- )?;
- }
+ )?
+ };
let pending_changed = if matches!(buyer_context, BuyerContext::Account(_)) {
- self.enqueue_selected_account_order_sync_operation(order_id, order_detail.farm_id)?
+ self.enqueue_selected_account_order_sync_operation(
+ &buyer_context,
+ &order_export,
+ order_local_work.as_ref(),
+ )?
} else {
false
};
@@ -1658,7 +1655,7 @@ impl DesktopAppRuntimeState {
order_export
};
- let order_changed = {
+ let order_local_work = {
let Some(sqlite_store) = self.sqlite_store.as_ref() else {
return Ok(changed);
};
@@ -1668,13 +1665,16 @@ impl DesktopAppRuntimeState {
&order_export,
)?
};
- if order_changed {
- if matches!(buyer_context, BuyerContext::Account(_)) {
- let _ = self.enqueue_selected_account_order_sync_operation(
- order_export.order_id,
- order_export.farm_id,
- )?;
- }
+ if let Some(order_local_work) = order_local_work.as_ref()
+ && matches!(buyer_context, BuyerContext::Account(_))
+ {
+ let _ = self.enqueue_selected_account_order_sync_operation(
+ &buyer_context,
+ &order_export,
+ Some(order_local_work),
+ )?;
+ }
+ if order_local_work.is_some() {
refreshed_order_id.get_or_insert(record.order_id);
changed = true;
}
@@ -2701,19 +2701,18 @@ impl DesktopAppRuntimeState {
sqlite_store.save_farm_summary(&saved_farm)?;
sqlite_store.save_farm_setup(account.account.account_id.as_str(), &projection)?;
- let _ = self.append_app_farm_local_work_record(&account, &projection, &saved_farm)?;
+ let source_local_event_id =
+ self.append_app_farm_local_work_record(&account, &projection, &saved_farm)?;
let selected_account_context = self.refresh_selected_account_context()?;
self.apply_selected_account_context(&selected_account_context);
- let _ = self.enqueue_selected_account_sync_operations(vec![pending_sync_upsert(
- SyncAggregateRef::Farm(saved_farm.farm_id),
- farm_sync_payload(
- saved_farm.farm_id,
- saved_farm.display_name.as_str(),
- Some(saved_farm.readiness),
- "finish_farm_setup",
- ),
- )])?;
+ let _ = self.enqueue_selected_account_farm_publish_operation(
+ saved_farm.farm_id,
+ saved_farm.display_name.as_str(),
+ Some(saved_farm.readiness),
+ "finish_farm_setup",
+ source_local_event_id.as_deref(),
+ )?;
Ok(selected_account_context.farm_setup_projection)
}
@@ -2745,16 +2744,6 @@ impl DesktopAppRuntimeState {
.ok_or(DesktopAppRuntimeFarmRulesError::FarmRequired)?;
let fallback_profile = self.fallback_farm_profile(farm_id);
let normalized = normalize_farm_rules_projection(projection, &fallback_profile);
- let previous_fulfillment_window_ids = {
- let sqlite_store = self.sqlite_store_for_farm_rules()?;
- sqlite_store
- .load_farm_rules(farm_id)?
- .fulfillment_windows
- .into_iter()
- .map(|window| window.fulfillment_window_id)
- .collect::<BTreeSet<_>>()
- };
-
let saved_projection = {
let sqlite_store = self.sqlite_store_for_farm_rules()?;
sqlite_store.save_farm_rules(&normalized)?;
@@ -2795,47 +2784,23 @@ impl DesktopAppRuntimeState {
)?
};
self.apply_selected_account_context(&selected_account_context);
- let current_fulfillment_window_ids = saved_projection
- .fulfillment_windows
- .iter()
- .map(|window| window.fulfillment_window_id)
- .collect::<BTreeSet<_>>();
- let mut pending_operations = Vec::with_capacity(
- 1 + saved_projection.fulfillment_windows.len() + previous_fulfillment_window_ids.len(),
- );
- pending_operations.push(pending_sync_upsert(
- SyncAggregateRef::Farm(farm_id),
- farm_sync_payload(
- farm_id,
- saved_projection
- .farm_profile
- .as_ref()
- .map(|profile| profile.display_name.as_str())
- .unwrap_or_default(),
- Some(if saved_projection.is_ready() {
- FarmReadiness::Ready
- } else {
- FarmReadiness::Incomplete
- }),
- "save_farm_rules_projection",
- ),
- ));
- for window in &saved_projection.fulfillment_windows {
- pending_operations.push(pending_sync_upsert(
- SyncAggregateRef::FulfillmentWindow(window.fulfillment_window_id),
- fulfillment_window_sync_payload(window.fulfillment_window_id, farm_id, "upsert"),
- ));
- }
- for fulfillment_window_id in previous_fulfillment_window_ids
- .difference(¤t_fulfillment_window_ids)
- .copied()
- {
- pending_operations.push(pending_sync_delete(
- SyncAggregateRef::FulfillmentWindow(fulfillment_window_id),
- fulfillment_window_sync_payload(fulfillment_window_id, farm_id, "delete"),
- ));
- }
- let _ = self.enqueue_selected_account_sync_operations(pending_operations)?;
+ let display_name = saved_projection
+ .farm_profile
+ .as_ref()
+ .map(|profile| profile.display_name.as_str())
+ .unwrap_or_default();
+ let readiness = if saved_projection.is_ready() {
+ FarmReadiness::Ready
+ } else {
+ FarmReadiness::Incomplete
+ };
+ let _ = self.enqueue_selected_account_farm_publish_operation(
+ farm_id,
+ display_name,
+ Some(readiness),
+ "save_farm_rules_projection",
+ None,
+ )?;
Ok(saved_projection)
}
@@ -3373,18 +3338,89 @@ impl DesktopAppRuntimeState {
fn enqueue_selected_account_order_sync_operation(
&mut self,
- order_id: OrderId,
+ buyer_context: &BuyerContext,
+ order: &BuyerOrderLocalEventExport,
+ local_work: Option<&AppOrderLocalWorkPublishSource>,
+ ) -> Result<bool, AppSqliteError> {
+ let Some(operation) =
+ self.order_request_publish_operation(buyer_context, order, local_work)?
+ else {
+ return self.refresh_selected_account_sync();
+ };
+
+ self.enqueue_selected_account_sync_operation_once(operation)
+ }
+
+ fn enqueue_selected_account_farm_publish_operation(
+ &mut self,
farm_id: FarmId,
+ display_name: &str,
+ readiness: Option<FarmReadiness>,
+ source: &str,
+ source_local_event_id: Option<&str>,
) -> Result<bool, AppSqliteError> {
- self.enqueue_selected_account_sync_operation_once(pending_sync_upsert(
- SyncAggregateRef::Order(order_id),
- order_sync_payload(
- order_id,
- farm_id,
- "place_personal_order",
- Some("needs_action"),
- ),
- ))
+ let existing_source_local_event_id = if source_local_event_id.is_none() {
+ self.selected_account_pending_farm_source_local_event_id(farm_id)?
+ } else {
+ None
+ };
+ let source_local_event_id =
+ source_local_event_id.or(existing_source_local_event_id.as_deref());
+
+ let Some(operation) = self.farm_profile_publish_operation(
+ farm_id,
+ display_name,
+ readiness,
+ source,
+ source_local_event_id,
+ )?
+ else {
+ return self.refresh_selected_account_sync();
+ };
+
+ self.enqueue_selected_account_sync_operations(vec![operation])
+ }
+
+ fn selected_account_pending_farm_source_local_event_id(
+ &self,
+ farm_id: FarmId,
+ ) -> Result<Option<String>, AppSqliteError> {
+ let Some(account_id) = self
+ .state_store
+ .identity_projection()
+ .selected_account
+ .as_ref()
+ .map(|account| account.account.account_id.clone())
+ else {
+ return Ok(None);
+ };
+ let Some(sqlite_store) = self.sqlite_store.as_ref() else {
+ return Ok(None);
+ };
+ let existing = sqlite_store.load_pending_sync_operations(account_id.as_str())?;
+ existing
+ .iter()
+ .find(|pending| {
+ pending.operation.aggregate == SyncAggregateRef::Farm(farm_id)
+ && pending.operation.operation == SyncOperationKind::Upsert
+ })
+ .map(|pending| {
+ pending
+ .operation
+ .publish_payload()
+ .map_err(|_| AppSqliteError::InvalidProjection {
+ reason: "farm profile publish payload must parse",
+ })
+ })
+ .transpose()
+ .map(|payload| {
+ payload.and_then(|payload| match payload {
+ AppPublishPayload::FarmProfile(payload) => {
+ payload.context.source_local_event_id
+ }
+ _ => None,
+ })
+ })
}
fn enqueue_selected_account_sync_operation_once(
@@ -3433,6 +3469,46 @@ impl DesktopAppRuntimeState {
self.enqueue_selected_account_sync_operations(vec![operation])
}
+ fn farm_profile_publish_operation(
+ &self,
+ farm_id: FarmId,
+ display_name: &str,
+ readiness: Option<FarmReadiness>,
+ source: &str,
+ source_local_event_id: Option<&str>,
+ ) -> Result<Option<PendingSyncOperation>, AppSqliteError> {
+ let Some(selected_account) = self
+ .state_store
+ .identity_projection()
+ .selected_account
+ .as_ref()
+ else {
+ return Ok(None);
+ };
+ let mut context = AppPublishContext::new(
+ selected_account.account.account_id.clone(),
+ source.to_owned(),
+ );
+ if let Some(source_local_event_id) = source_local_event_id {
+ context = context.with_source_local_event_id(source_local_event_id.to_owned());
+ }
+ let payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload {
+ context,
+ farm_id,
+ display_name: display_name.trim().to_owned(),
+ readiness,
+ });
+ if payload.validate().is_err() {
+ return Ok(None);
+ }
+
+ PendingSyncOperation::from_publish_payload(payload, current_utc_timestamp())
+ .map(Some)
+ .map_err(|_| AppSqliteError::InvalidProjection {
+ reason: "farm profile publish payload must serialize",
+ })
+ }
+
fn product_publish_operation(
&self,
product_id: ProductId,
@@ -3468,6 +3544,9 @@ impl DesktopAppRuntimeState {
return Ok(None);
};
let farm_setup = self.state_store.farm_setup_projection();
+ let farm_rules = self.state_store.farm_rules_projection();
+ let (availability_starts_at, availability_ends_at) =
+ listing_availability_window_times(&draft, farm_rules);
let listing_d_tag = d_tag_from_uuid(product_id.as_uuid());
let mut context = AppPublishContext::new(
selected_account.account.account_id.clone(),
@@ -3491,16 +3570,10 @@ impl DesktopAppRuntimeState {
price_currency: draft.price_currency.trim().to_uppercase(),
stock_quantity: draft.stock_quantity,
availability_window_id: draft.availability_window_id,
- fulfillment_method: listing_fulfillment_method(
- &draft,
- farm_setup,
- self.state_store.farm_rules_projection(),
- ),
- fulfillment_location: listing_fulfillment_location(
- &draft,
- farm_setup,
- self.state_store.farm_rules_projection(),
- ),
+ availability_starts_at,
+ availability_ends_at,
+ fulfillment_method: listing_fulfillment_method(&draft, farm_setup, farm_rules),
+ fulfillment_location: listing_fulfillment_location(&draft, farm_setup, farm_rules),
status: draft.status,
});
if payload.validate().is_err() {
@@ -3514,6 +3587,65 @@ impl DesktopAppRuntimeState {
})
}
+ fn order_request_publish_operation(
+ &self,
+ buyer_context: &BuyerContext,
+ order: &BuyerOrderLocalEventExport,
+ local_work: Option<&AppOrderLocalWorkPublishSource>,
+ ) -> Result<Option<PendingSyncOperation>, AppSqliteError> {
+ let Some(local_work) = local_work else {
+ return Ok(None);
+ };
+ let Some(buyer_account) = self.selected_buyer_account(buyer_context) else {
+ return Ok(None);
+ };
+ let buyer_pubkey = self.local_events_owner_pubkey(buyer_account);
+ let export = AppBuyerOrderRequestExport::from_order(order, buyer_pubkey.as_deref())?;
+ if !export.is_supported() {
+ return Ok(None);
+ }
+ let Some((currency_code, total_minor_units)) = order_currency_and_total(order)? else {
+ return Ok(None);
+ };
+ let context = AppPublishContext::new(
+ buyer_account.account.account_id.clone(),
+ "place_personal_order",
+ )
+ .with_source_local_event_id(local_work.record_id.clone());
+ let payload = AppPublishPayload::OrderRequest(AppOrderRequestPublishPayload {
+ context,
+ order_id: order.order_id,
+ farm_id: order.farm_id,
+ status: Some(order.status.clone()),
+ order_document_json: Some(local_work.payload.clone()),
+ listing_addr: export.listing_addr,
+ listing_event_id: export.listing_event_id,
+ listing_relays: vec![self.default_nostr_relay_url.clone()],
+ buyer_pubkey: export.buyer_pubkey,
+ seller_pubkey: export.seller_pubkey,
+ items: order
+ .lines
+ .iter()
+ .map(|line| AppOrderRequestItemPayload {
+ product_id: line.product_id,
+ quantity: line.quantity,
+ })
+ .collect(),
+ currency_code: Some(currency_code),
+ total_minor_units: Some(total_minor_units),
+ note: non_empty_string(order.buyer_order_note.as_str()),
+ });
+ if payload.validate().is_err() {
+ return Ok(None);
+ }
+
+ PendingSyncOperation::from_publish_payload(payload, current_utc_timestamp())
+ .map(Some)
+ .map_err(|_| AppSqliteError::InvalidProjection {
+ reason: "order request publish payload must serialize",
+ })
+ }
+
fn selected_account_id(&self) -> Result<String, DesktopAppRuntimeFarmSetupError> {
self.selected_account_for_farm_setup()
.map(|account| account.account.account_id.clone())
@@ -3803,9 +3935,9 @@ impl DesktopAppRuntimeState {
account: &radroots_app_models::SelectedAccountProjection,
projection: &FarmSetupProjection,
saved_farm: &FarmSummary,
- ) -> Result<bool, AppSqliteError> {
+ ) -> Result<Option<String>, AppSqliteError> {
let Some(shared_accounts_paths) = self.shared_accounts_paths.as_ref() else {
- return Ok(false);
+ return Ok(None);
};
let timestamp = current_runtime_time_ms()?;
let farm_d_tag = d_tag_from_uuid(saved_farm.farm_id.as_uuid());
@@ -3847,8 +3979,9 @@ impl DesktopAppRuntimeState {
},
},
});
+ let record_id = format!("app:local_work:farm:{farm_d_tag}:{}", Uuid::now_v7());
let input = LocalEventRecordInput {
- record_id: format!("app:local_work:farm:{farm_d_tag}:{}", Uuid::now_v7()),
+ record_id: record_id.clone(),
family: LocalRecordFamily::LocalWork,
status: LocalRecordStatus::LocalSaved,
source_runtime: SourceRuntime::App,
@@ -3858,7 +3991,7 @@ impl DesktopAppRuntimeState {
owner_pubkey,
farm_id: Some(farm_d_tag),
listing_addr: None,
- local_work_json: Some(payload),
+ local_work_json: Some(payload.clone()),
event_id: None,
event_kind: None,
event_pubkey: None,
@@ -3873,7 +4006,7 @@ impl DesktopAppRuntimeState {
};
self.append_app_local_work_record(shared_accounts_paths, &input)?;
- Ok(true)
+ Ok(Some(record_id))
}
fn append_app_listing_local_work_record(
@@ -3978,7 +4111,7 @@ impl DesktopAppRuntimeState {
owner_pubkey,
farm_id: Some(farm_d_tag),
listing_addr,
- local_work_json: Some(payload),
+ local_work_json: Some(payload.clone()),
event_id: None,
event_kind: None,
event_pubkey: None,
@@ -4001,13 +4134,10 @@ impl DesktopAppRuntimeState {
sqlite_store: &AppSqliteStore,
buyer_context: &BuyerContext,
order: &BuyerOrderLocalEventExport,
- ) -> Result<bool, AppSqliteError> {
+ ) -> Result<Option<AppOrderLocalWorkPublishSource>, AppSqliteError> {
let Some(shared_accounts_paths) = self.shared_accounts_paths.as_ref() else {
- return Ok(false);
+ return Ok(None);
};
- if sqlite_store.buyer_order_coordination_is_synced(buyer_context, order.order_id)? {
- return Ok(true);
- }
let timestamp = current_runtime_time_ms()?;
let record_id = buyer_order_request_local_work_record_id(
order.order_id.to_string().as_str(),
@@ -4028,6 +4158,9 @@ impl DesktopAppRuntimeState {
&export,
timestamp,
);
+ if sqlite_store.buyer_order_coordination_is_synced(buyer_context, order.order_id)? {
+ return Ok(Some(AppOrderLocalWorkPublishSource { record_id, payload }));
+ }
validate_buyer_order_request_local_work_payload(&payload).map_err(|source| {
AppSqliteError::LocalEvents {
operation: "validate app buyer order request local work payload",
@@ -4055,7 +4188,7 @@ impl DesktopAppRuntimeState {
owner_pubkey: buyer_pubkey,
farm_id: export.farm_key.clone(),
listing_addr: export.listing_addr.clone(),
- local_work_json: Some(payload),
+ local_work_json: Some(payload.clone()),
event_id: None,
event_kind: None,
event_pubkey: None,
@@ -4079,7 +4212,7 @@ impl DesktopAppRuntimeState {
return Err(error);
}
sqlite_store.mark_buyer_order_coordination_synced(buyer_context, order.order_id)?;
- Ok(true)
+ Ok(Some(AppOrderLocalWorkPublishSource { record_id, payload }))
}
fn append_app_local_work_record(
@@ -4163,6 +4296,27 @@ impl DesktopAppRuntimeState {
.map(|account| account.account.account_id.clone());
for receipt in receipts {
+ let source_record = receipt
+ .source_local_event_id
+ .as_deref()
+ .map(|source_record_id| {
+ store.get_record(source_record_id).map_err(|source| {
+ AppSqliteError::LocalEvents {
+ operation: "load app publish source record",
+ source,
+ }
+ })
+ })
+ .transpose()?
+ .flatten();
+ let farm_id = source_record
+ .as_ref()
+ .and_then(|record| record.farm_id.clone())
+ .or_else(|| signed_event_farm_id(receipt));
+ let listing_addr = source_record
+ .as_ref()
+ .and_then(|record| record.listing_addr.clone())
+ .or_else(|| signed_event_listing_addr(receipt));
let event_record = LocalEventRecordInput {
record_id: format!("app:signed_event:{}", receipt.event_id),
family: LocalRecordFamily::SignedEvent,
@@ -4172,8 +4326,8 @@ impl DesktopAppRuntimeState {
inserted_at_ms: timestamp,
owner_account_id: owner_account_id.clone(),
owner_pubkey: Some(receipt.event_pubkey.clone()),
- farm_id: None,
- listing_addr: None,
+ farm_id,
+ listing_addr,
local_work_json: None,
event_id: Some(receipt.event_id.clone()),
event_kind: Some(i64::from(receipt.event_kind)),
@@ -4195,6 +4349,12 @@ impl DesktopAppRuntimeState {
})?;
if let Some(source_record_id) = receipt.source_local_event_id.as_deref() {
+ let Some(source_record) = source_record.as_ref() else {
+ continue;
+ };
+ if source_record.family == LocalRecordFamily::LocalWork {
+ continue;
+ }
store
.update_outbox(&LocalEventRecordUpdate {
record_id: source_record_id.to_owned(),
@@ -4205,7 +4365,7 @@ impl DesktopAppRuntimeState {
updated_at_ms: timestamp,
})
.map_err(|source| AppSqliteError::LocalEvents {
- operation: "update app local work publish evidence",
+ operation: "update app publish source evidence",
source,
})?;
}
@@ -4611,6 +4771,22 @@ fn listing_primary_bin_id(listing_d_tag: &str) -> String {
format!("{listing_d_tag}:primary")
}
+fn listing_availability_window_times(
+ draft: &ProductEditorDraft,
+ farm_rules: &FarmRulesProjection,
+) -> (Option<String>, Option<String>) {
+ draft
+ .availability_window_id
+ .and_then(|window_id| {
+ farm_rules
+ .fulfillment_windows
+ .iter()
+ .find(|window| window.fulfillment_window_id == window_id)
+ })
+ .map(|window| (Some(window.starts_at.clone()), Some(window.ends_at.clone())))
+ .unwrap_or((None, None))
+}
+
fn listing_fulfillment_method(
draft: &ProductEditorDraft,
farm_setup: &FarmSetupProjection,
@@ -4812,15 +4988,7 @@ fn listing_publish_payload_to_sdk_listing(
plot: None,
discounts: None,
inventory_available: payload.stock_quantity.map(RadrootsCoreDecimal::from),
- availability: Some(RadrootsListingAvailability::Status {
- status: match payload.status {
- ProductStatus::Published => RadrootsListingStatus::Active,
- ProductStatus::Archived => RadrootsListingStatus::Sold,
- other => RadrootsListingStatus::Other {
- value: other.storage_key().to_owned(),
- },
- },
- }),
+ availability: listing_publish_payload_availability(payload)?,
delivery_method: Some(parse_app_listing_delivery_method(
payload.fulfillment_method.as_deref().unwrap_or_default(),
)?),
@@ -4841,6 +5009,54 @@ fn listing_publish_payload_to_sdk_listing(
})
}
+fn listing_publish_payload_availability(
+ payload: &AppListingPublishPayload,
+) -> Result<Option<RadrootsListingAvailability>, AppSyncTransportError> {
+ if payload.status == ProductStatus::Published {
+ let start = parse_listing_availability_timestamp(
+ payload.availability_starts_at.as_deref(),
+ "publishable listing requires availability start",
+ )?;
+ let end = parse_listing_availability_timestamp(
+ payload.availability_ends_at.as_deref(),
+ "publishable listing requires availability end",
+ )?;
+ if end <= start {
+ return Err(AppSyncTransportError::failed(
+ "publishable listing availability end must be after start",
+ ));
+ }
+ return Ok(Some(RadrootsListingAvailability::Window {
+ start: Some(start),
+ end: Some(end),
+ }));
+ }
+
+ Ok(Some(RadrootsListingAvailability::Status {
+ status: match payload.status {
+ ProductStatus::Archived => RadrootsListingStatus::Sold,
+ other => RadrootsListingStatus::Other {
+ value: other.storage_key().to_owned(),
+ },
+ },
+ }))
+}
+
+fn parse_listing_availability_timestamp(
+ value: Option<&str>,
+ missing_message: &'static str,
+) -> Result<u64, AppSyncTransportError> {
+ let value = value
+ .map(str::trim)
+ .filter(|value| !value.is_empty())
+ .ok_or_else(|| AppSyncTransportError::failed(missing_message))?;
+ let timestamp = DateTime::parse_from_rfc3339(value)
+ .map_err(|error| AppSyncTransportError::failed(error.to_string()))?
+ .timestamp();
+ u64::try_from(timestamp)
+ .map_err(|_| AppSyncTransportError::failed("listing availability timestamp is negative"))
+}
+
fn parse_app_listing_unit(value: &str) -> Result<RadrootsCoreUnit, AppSyncTransportError> {
match value.trim().to_ascii_lowercase().as_str() {
"each" | "ea" | "unit" | "units" => Ok(RadrootsCoreUnit::Each),
@@ -4915,7 +5131,7 @@ fn order_request_publish_payload_to_sdk_order(
.map_err(|error| AppSyncTransportError::failed(error.to_string()))
}
-fn app_published_operation_receipt(
+fn published_operation_receipt(
operation_key: &str,
payload: &AppPublishPayload,
receipt: SdkPublishReceipt,
@@ -4975,6 +5191,52 @@ fn d_tag_from_uuid(uuid: Uuid) -> String {
base64_url_no_pad(uuid.as_bytes())
}
+fn signed_event_farm_id(receipt: &AppPublishedOperationReceipt) -> Option<String> {
+ match receipt.event_kind {
+ 30340 => signed_event_tag_value(&receipt.event_tags_json, "d", 1),
+ 30402 => signed_event_tag_value(&receipt.event_tags_json, "a", 1)
+ .and_then(|address| signed_event_address_d_tag(address.as_str())),
+ _ => None,
+ }
+}
+
+fn signed_event_listing_addr(receipt: &AppPublishedOperationReceipt) -> Option<String> {
+ if receipt.event_kind != 30402 {
+ return None;
+ }
+ let pubkey = receipt.event_pubkey.trim();
+ if pubkey.is_empty() {
+ return None;
+ }
+ signed_event_tag_value(&receipt.event_tags_json, "d", 1)
+ .map(|d_tag| format!("30402:{pubkey}:{d_tag}"))
+}
+
+fn signed_event_tag_value(
+ tags: &serde_json::Value,
+ tag_name: &str,
+ index: usize,
+) -> Option<String> {
+ tags.as_array()?.iter().find_map(|tag| {
+ let values = tag.as_array()?;
+ (values.first()?.as_str()? == tag_name)
+ .then(|| values.get(index).and_then(serde_json::Value::as_str))
+ .flatten()
+ .map(str::trim)
+ .filter(|value| !value.is_empty())
+ .map(str::to_owned)
+ })
+}
+
+fn signed_event_address_d_tag(address: &str) -> Option<String> {
+ address
+ .rsplit(':')
+ .next()
+ .map(str::trim)
+ .filter(|value| !value.is_empty())
+ .map(str::to_owned)
+}
+
fn base64_url_no_pad(bytes: &[u8]) -> String {
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
let mut output = String::with_capacity((bytes.len() * 4).div_ceil(3));
@@ -5043,6 +5305,12 @@ struct AppBuyerOrderRequestExport {
support_issues: Vec<&'static str>,
}
+#[derive(Clone, Debug)]
+struct AppOrderLocalWorkPublishSource {
+ record_id: String,
+ payload: serde_json::Value,
+}
+
impl AppBuyerOrderRequestExport {
fn from_order(
order: &BuyerOrderLocalEventExport,
@@ -5303,6 +5571,44 @@ fn order_economics_json(
})))
}
+fn order_currency_and_total(
+ order: &BuyerOrderLocalEventExport,
+) -> Result<Option<(String, u32)>, AppSqliteError> {
+ let mut currency = None::<String>;
+ let mut total_minor_units = 0_u32;
+
+ for line in &order.lines {
+ let Some(unit_price_minor_units) = line.unit_price_minor_units else {
+ return Ok(None);
+ };
+ let line_currency = normalize_currency_code(line.price_currency.as_str());
+ if line_currency.len() != 3 || !line_currency.bytes().all(|byte| byte.is_ascii_uppercase())
+ {
+ return Ok(None);
+ }
+ if let Some(existing_currency) = currency.as_deref() {
+ if existing_currency != line_currency {
+ return Ok(None);
+ }
+ } else {
+ currency = Some(line_currency.clone());
+ }
+ let line_total = unit_price_minor_units.checked_mul(line.quantity).ok_or(
+ AppSqliteError::InvalidProjection {
+ reason: "buyer order publish line total overflowed",
+ },
+ )?;
+ total_minor_units =
+ total_minor_units
+ .checked_add(line_total)
+ .ok_or(AppSqliteError::InvalidProjection {
+ reason: "buyer order publish total overflowed",
+ })?;
+ }
+
+ Ok(currency.map(|currency| (currency, total_minor_units)))
+}
+
fn shared_optional_line_value(
lines: &[BuyerOrderLocalEventLine],
value: impl Fn(&BuyerOrderLocalEventLine) -> Option<&str>,
@@ -6634,12 +6940,12 @@ mod tests {
HomeRoute,
};
use radroots_app_sync::{
- AppFarmProfilePublishPayload, AppPublishContext, AppPublishPayload, AppSyncRequest,
- AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError,
- PendingSyncOperation, PendingSyncOperationState, RecordedAppSyncTransport,
- SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, SyncConflict,
- SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
- SyncTrigger,
+ AppFarmProfilePublishPayload, AppOrderRequestPublishPayload, AppPublishContext,
+ AppPublishPayload, AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport,
+ AppSyncTransportError, PendingSyncOperation, PendingSyncOperationState,
+ RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus,
+ SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity,
+ SyncOperationKind, SyncTrigger,
};
use radroots_identity::RadrootsIdentity;
use radroots_local_events::{
@@ -7282,6 +7588,14 @@ mod tests {
assert_eq!(payload.price_minor_units, Some(900));
assert_eq!(payload.price_currency, "USD");
assert_eq!(payload.stock_quantity, Some(11));
+ assert_eq!(
+ payload.availability_starts_at.as_deref(),
+ Some("2099-04-25T14:00:00Z")
+ );
+ assert_eq!(
+ payload.availability_ends_at.as_deref(),
+ Some("2099-04-25T18:00:00Z")
+ );
assert_eq!(payload.fulfillment_method.as_deref(), Some("pickup"));
assert_eq!(
payload.fulfillment_location.as_deref(),
@@ -9847,15 +10161,42 @@ mod tests {
.as_ref()
.expect("buyer order detail")
.farm_id;
+ let pending_payload = assert_single_order_request_publish_payload(
+ &runtime,
+ buyer_account_id.as_str(),
+ order_id,
+ order_farm_id,
+ "needs_action",
+ );
assert_eq!(
- pending_order_sync_payloads(&runtime, buyer_account_id.as_str(), order_id),
- vec![super::order_sync_payload(
- order_id,
- order_farm_id,
- "place_personal_order",
- Some("needs_action")
- )]
+ pending_payload.context.source_local_event_id.as_deref(),
+ Some(format!("app:local_work:order_request:{order_id}").as_str())
+ );
+ assert_eq!(
+ pending_payload.listing_addr.as_deref(),
+ Some(format!("30402:buyer-visible-seller-pubkey:{listing_key}").as_str())
);
+ assert_eq!(
+ pending_payload.listing_event_id.as_deref(),
+ Some("event-cli:signed_event:buyer-order-supported-listing")
+ );
+ assert_eq!(
+ pending_payload.seller_pubkey.as_deref(),
+ Some("buyer-visible-seller-pubkey")
+ );
+ assert!(
+ pending_payload
+ .buyer_pubkey
+ .as_deref()
+ .is_some_and(is_hex_64)
+ );
+ assert_eq!(pending_payload.items.len(), 1);
+ assert_eq!(pending_payload.items[0].product_id, product_id);
+ assert_eq!(pending_payload.items[0].quantity, 2);
+ assert_eq!(pending_payload.currency_code.as_deref(), Some("USD"));
+ assert_eq!(pending_payload.total_minor_units, Some(1600));
+ assert_eq!(pending_payload.note.as_deref(), Some("Leave by the cooler"));
+ assert!(pending_payload.order_document_json.is_some());
{
let state = runtime.lock_state_mut();
@@ -9888,6 +10229,7 @@ mod tests {
&order_export,
)
.expect("order local event reappend should be idempotent")
+ .is_some()
);
let coordination_after = sqlite_store
.load_buyer_order_coordination_record(&buyer_context, order_id)
@@ -9984,12 +10326,6 @@ mod tests {
fn runtime_buyer_order_shared_append_failure_is_recoverable_in_same_session() {
let (runtime, paths, buyer_account_id, order_id, order_farm_id) =
blocked_buyer_order_runtime("buyer_order_append_failure_same_session");
- let expected_order_sync_payload = super::order_sync_payload(
- order_id,
- order_farm_id,
- "place_personal_order",
- Some("needs_action"),
- );
{
let state = runtime.lock_state_mut();
let sqlite_store = state.sqlite_store.as_ref().expect("sqlite store");
@@ -10014,9 +10350,12 @@ mod tests {
.orders
.has_recoverable_coordination
);
- assert_eq!(
- pending_order_sync_payloads(&runtime, buyer_account_id.as_str(), order_id),
- vec![expected_order_sync_payload.clone()]
+ assert_single_order_request_publish_payload(
+ &runtime,
+ buyer_account_id.as_str(),
+ order_id,
+ order_farm_id,
+ "scheduled",
);
assert_eq!(
summary_after_retry
@@ -10046,7 +10385,7 @@ mod tests {
BuyerOrderStatus::Scheduled
);
assert_eq!(
- app_buyer_order_local_work_record_ids(&paths),
+ buyer_order_local_work_record_ids(&paths),
vec![format!("app:local_work:order_request:{order_id}")]
);
{
@@ -10070,9 +10409,12 @@ mod tests {
.retry_pending_personal_order_coordination()
.expect("same-session synced buyer order recovery retry should be idempotent")
);
- assert_eq!(
- pending_order_sync_payloads(&runtime, buyer_account_id.as_str(), order_id),
- vec![expected_order_sync_payload]
+ assert_single_order_request_publish_payload(
+ &runtime,
+ buyer_account_id.as_str(),
+ order_id,
+ order_farm_id,
+ "scheduled",
);
cleanup_bootstrapped_runtime_paths(&paths);
@@ -10082,18 +10424,12 @@ mod tests {
fn runtime_buyer_order_shared_append_failure_is_recoverable_after_restart() {
let (runtime, paths, buyer_account_id, order_id, order_farm_id) =
blocked_buyer_order_runtime("buyer_order_append_failure_restart");
- let expected_order_sync_payload = super::order_sync_payload(
- order_id,
- order_farm_id,
- "place_personal_order",
- Some("needs_action"),
- );
unblock_shared_local_events_database(&paths);
drop(runtime);
let restarted_runtime = restart_runtime(paths.clone());
assert_eq!(
- app_buyer_order_local_work_record_ids(&paths),
+ buyer_order_local_work_record_ids(&paths),
vec![format!("app:local_work:order_request:{order_id}")]
);
let summary = restarted_runtime.summary();
@@ -10139,9 +10475,12 @@ mod tests {
.retry_pending_personal_order_coordination()
.expect("synced buyer order recovery retry should be idempotent")
);
- assert_eq!(
- pending_order_sync_payloads(&restarted_runtime, buyer_account_id.as_str(), order_id),
- vec![expected_order_sync_payload]
+ assert_single_order_request_publish_payload(
+ &restarted_runtime,
+ buyer_account_id.as_str(),
+ order_id,
+ order_farm_id,
+ "needs_action",
);
cleanup_bootstrapped_runtime_paths(&paths);
@@ -10152,12 +10491,6 @@ mod tests {
{
let (runtime, paths, buyer_account_id, order_id, order_farm_id) =
blocked_buyer_order_runtime("buyer_order_append_failure_foreground_resume");
- let expected_order_sync_payload = super::order_sync_payload(
- order_id,
- order_farm_id,
- "place_personal_order",
- Some("needs_action"),
- );
unblock_shared_local_events_database(&paths);
assert!(
runtime
@@ -10172,12 +10505,15 @@ mod tests {
.has_recoverable_coordination
);
assert_eq!(
- app_buyer_order_local_work_record_ids(&paths),
+ buyer_order_local_work_record_ids(&paths),
vec![format!("app:local_work:order_request:{order_id}")]
);
- assert_eq!(
- pending_order_sync_payloads(&runtime, buyer_account_id.as_str(), order_id),
- vec![expected_order_sync_payload]
+ assert_single_order_request_publish_payload(
+ &runtime,
+ buyer_account_id.as_str(),
+ order_id,
+ order_farm_id,
+ "needs_action",
);
{
let state = runtime.lock_state_mut();
@@ -13696,7 +14032,47 @@ mod tests {
.collect()
}
- fn app_buyer_order_local_work_record_ids(paths: &AppDesktopRuntimePaths) -> Vec<String> {
+ fn pending_order_request_publish_payloads(
+ runtime: &DesktopAppRuntime,
+ account_id: &str,
+ order_id: OrderId,
+ ) -> Vec<AppOrderRequestPublishPayload> {
+ pending_order_sync_payloads(runtime, account_id, order_id)
+ .into_iter()
+ .map(|payload_json| {
+ match serde_json::from_str::<AppPublishPayload>(payload_json.as_str())
+ .expect("pending order payload should be typed app publish work")
+ {
+ AppPublishPayload::OrderRequest(payload) => payload,
+ payload => panic!("expected order request publish payload, got {payload:?}"),
+ }
+ })
+ .collect()
+ }
+
+ fn assert_single_order_request_publish_payload(
+ runtime: &DesktopAppRuntime,
+ account_id: &str,
+ order_id: OrderId,
+ farm_id: FarmId,
+ status: &str,
+ ) -> AppOrderRequestPublishPayload {
+ let pending_payloads =
+ pending_order_request_publish_payloads(runtime, account_id, order_id);
+ assert_eq!(pending_payloads.len(), 1);
+ let payload = pending_payloads
+ .into_iter()
+ .next()
+ .expect("single order request publish payload");
+ assert_eq!(payload.context.account_id, account_id);
+ assert_eq!(payload.context.source, "place_personal_order");
+ assert_eq!(payload.order_id, order_id);
+ assert_eq!(payload.farm_id, farm_id);
+ assert_eq!(payload.status.as_deref(), Some(status));
+ payload
+ }
+
+ fn buyer_order_local_work_record_ids(paths: &AppDesktopRuntimePaths) -> Vec<String> {
shared_local_event_records(paths)
.into_iter()
.filter(|record| {
@@ -13721,6 +14097,12 @@ mod tests {
FarmId,
) {
let (runtime, paths) = bootstrapped_runtime(label);
+ let _ = install_recorded_sync_transport(
+ &runtime,
+ RecordedAppSyncTransport::fail(AppSyncTransportError::unavailable(
+ "test sync unavailable",
+ )),
+ );
assert!(
runtime
.generate_local_account(Some("Buyer".to_owned()))
diff --git a/crates/launchers/desktop/src/source_guards.rs b/crates/launchers/desktop/src/source_guards.rs
@@ -53,12 +53,14 @@ const ALLOWED_WINDOW_LITERALS: &[&str] = &[
"buyer-order-confirm-replace",
"buyer-order-keep-current",
"buyer-order-repeat-demand",
+ "buyer-orders-retry-coordination",
"buyer.add_to_cart_failed",
"buyer.cart_remove_failed",
"buyer.checkout_place_failed",
"buyer.checkout_save_failed",
"buyer.detail_open_failed",
"buyer.order_open_failed",
+ "buyer.order_coordination_retry_failed",
"buyer.repeat_demand_failed",
"buyer.section_select_failed",
"buyer_notice",
@@ -74,6 +76,7 @@ const ALLOWED_WINDOW_LITERALS: &[&str] = &[
"failed to add buyer product to cart",
"failed to open buyer order detail",
"failed to place buyer order",
+ "failed to retry buyer order coordination",
"failed to remove buyer cart line",
"failed to reorder buyer order",
"failed to save buyer checkout draft",
diff --git a/crates/shared/sqlite/src/local_interop.rs b/crates/shared/sqlite/src/local_interop.rs
@@ -427,7 +427,7 @@ impl<'a> AppLocalInteropRepository<'a> {
self.upsert_farm_summary(&FarmSummary {
farm_id,
display_name,
- readiness: FarmReadiness::Incomplete,
+ readiness: signed_farm_readiness(&content, tags).unwrap_or(FarmReadiness::Incomplete),
})?;
Ok(Some(ProjectionRecord {
kind: "farm",
@@ -1257,6 +1257,40 @@ fn signed_listing_product_status(
}
}
+fn signed_farm_readiness(content: &Value, tags: Option<&Value>) -> Option<FarmReadiness> {
+ string_at(content, &["readiness"])
+ .or_else(|| {
+ content
+ .get("tags")?
+ .as_array()?
+ .iter()
+ .filter_map(Value::as_str)
+ .find_map(readiness_tag_value)
+ })
+ .or_else(|| {
+ tags?.as_array()?.iter().find_map(|tag| {
+ let values = tag.as_array()?;
+ (values.first()?.as_str()? == "t")
+ .then(|| values.get(1).and_then(Value::as_str))
+ .flatten()
+ .and_then(readiness_tag_value)
+ })
+ })
+ .and_then(|value| match value.as_str() {
+ "ready" => Some(FarmReadiness::Ready),
+ "incomplete" => Some(FarmReadiness::Incomplete),
+ _ => None,
+ })
+}
+
+fn readiness_tag_value(value: &str) -> Option<String> {
+ value
+ .strip_prefix("radroots:readiness:")
+ .map(str::trim)
+ .filter(|value| !value.is_empty())
+ .map(str::to_owned)
+}
+
fn signed_listing_fulfillment_method(
content: Option<&Value>,
tags: Option<&Value>,
diff --git a/crates/shared/sync/src/publish.rs b/crates/shared/sync/src/publish.rs
@@ -89,6 +89,8 @@ pub struct AppListingPublishPayload {
pub price_currency: String,
pub stock_quantity: Option<u32>,
pub availability_window_id: Option<FulfillmentWindowId>,
+ pub availability_starts_at: Option<String>,
+ pub availability_ends_at: Option<String>,
pub fulfillment_method: Option<String>,
pub fulfillment_location: Option<String>,
pub status: ProductStatus,
@@ -192,7 +194,16 @@ impl AppPublishPayload {
if payload.price_currency.trim().is_empty() {
failures.push(AppPublishValidationFailure::MissingListingCurrency);
}
- if payload.availability_window_id.is_none() {
+ if payload.availability_window_id.is_none()
+ || payload
+ .availability_starts_at
+ .as_deref()
+ .is_none_or(|value| value.trim().is_empty())
+ || payload
+ .availability_ends_at
+ .as_deref()
+ .is_none_or(|value| value.trim().is_empty())
+ {
failures.push(AppPublishValidationFailure::MissingListingAvailability);
}
if payload.stock_quantity.is_none() {
@@ -457,6 +468,8 @@ mod tests {
price_currency: String::new(),
stock_quantity: Some(4),
availability_window_id: None,
+ availability_starts_at: None,
+ availability_ends_at: None,
fulfillment_method: None,
fulfillment_location: None,
status: ProductStatus::Published,