commit 4f67a9eeffa497f427b1dd0bd70435db81a82641
parent 67e4835073fa87788281d9c257bd7333dbb3e046
Author: triesap <tyson@radroots.org>
Date: Sun, 24 May 2026 17:40:36 +0000
app: add buyer order coordination recovery
- add a durable buyer order coordination checkpoint table to the app sqlite schema
- create pending coordination records inside the buyer checkout transaction
- track local-events append attempts, failures, synced state, and restart retry recovery
- cover success, failure, recovery, and idempotent retry paths in app tests
Diffstat:
5 files changed, 587 insertions(+), 9 deletions(-)
diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs
@@ -338,6 +338,11 @@ impl DesktopAppRuntime {
self.lock_state_mut().place_personal_order()
}
+ pub fn retry_pending_personal_order_coordination(&self) -> Result<bool, AppSqliteError> {
+ self.lock_state_mut()
+ .retry_pending_personal_order_coordination()
+ }
+
pub fn open_personal_order_detail(&self, order_id: OrderId) -> Result<bool, AppSqliteError> {
self.lock_state_mut().open_personal_order_detail(order_id)
}
@@ -1016,6 +1021,14 @@ impl DesktopAppRuntimeState {
startup_issue: None,
};
let _ = state.apply_selected_account_context(&selected_account_context);
+ if let Err(error) = state.retry_pending_personal_order_coordination() {
+ error!(
+ target: "buyer_order",
+ event = "buyer_order.coordination_bootstrap_retry_failed",
+ error = %error,
+ "failed to retry pending buyer order coordination during bootstrap"
+ );
+ }
Ok(state)
}
@@ -1417,7 +1430,11 @@ impl DesktopAppRuntimeState {
reason: "buyer order write did not surface in buyer order local event export",
});
};
- self.append_app_buyer_order_request_local_work_record(&buyer_context, &order_export)?;
+ self.append_app_buyer_order_request_local_work_record(
+ sqlite_store,
+ &buyer_context,
+ &order_export,
+ )?;
let personal_changed = self.mutate_personal_projection(|projection| {
let mut changed = false;
@@ -1458,6 +1475,37 @@ impl DesktopAppRuntimeState {
Ok(personal_changed || section_changed || pending_changed)
}
+ fn retry_pending_personal_order_coordination(&mut self) -> Result<bool, AppSqliteError> {
+ let Some(sqlite_store) = self.sqlite_store.as_ref() else {
+ return Ok(false);
+ };
+ let buyer_context = self.state_store.identity_projection().buyer_context();
+ let records =
+ sqlite_store.load_recoverable_buyer_order_coordination_records(&buyer_context)?;
+ let mut changed = false;
+
+ for record in records {
+ let Some(order_export) = sqlite_store
+ .load_buyer_order_local_event_export(&buyer_context, record.order_id)?
+ else {
+ let _ = sqlite_store.mark_buyer_order_coordination_failed(
+ &buyer_context,
+ record.order_id,
+ "buyer order local event export is unavailable",
+ )?;
+ continue;
+ };
+
+ changed |= self.append_app_buyer_order_request_local_work_record(
+ sqlite_store,
+ &buyer_context,
+ &order_export,
+ )?;
+ }
+
+ Ok(changed)
+ }
+
fn open_personal_order_detail(&mut self, order_id: OrderId) -> Result<bool, AppSqliteError> {
let Some(sqlite_store) = self.sqlite_store.as_ref() else {
return Ok(false);
@@ -3551,12 +3599,16 @@ impl DesktopAppRuntimeState {
fn append_app_buyer_order_request_local_work_record(
&self,
+ sqlite_store: &AppSqliteStore,
buyer_context: &BuyerContext,
order: &BuyerOrderLocalEventExport,
) -> Result<bool, AppSqliteError> {
let Some(shared_accounts_paths) = self.shared_accounts_paths.as_ref() else {
return Ok(false);
};
+ if sqlite_store.buyer_order_coordination_is_synced(buyer_context, order.order_id)? {
+ return Ok(true);
+ }
let timestamp = current_runtime_time_ms()?;
let record_id = buyer_order_request_local_work_record_id(
order.order_id.to_string().as_str(),
@@ -3583,8 +3635,18 @@ impl DesktopAppRuntimeState {
source,
}
})?;
+ let payload_json =
+ serde_json::to_string(&payload).map_err(|_| AppSqliteError::InvalidProjection {
+ reason: "buyer order request local work payload must encode",
+ })?;
+ sqlite_store.prepare_buyer_order_coordination_attempt(
+ buyer_context,
+ order.order_id,
+ record_id.as_str(),
+ payload_json.as_str(),
+ )?;
let input = LocalEventRecordInput {
- record_id,
+ record_id: record_id.clone(),
family: LocalRecordFamily::LocalWork,
status: LocalRecordStatus::LocalSaved,
source_runtime: SourceRuntime::App,
@@ -3608,7 +3670,16 @@ impl DesktopAppRuntimeState {
relay_delivery_json: None,
};
- self.append_app_local_work_record(shared_accounts_paths, &input)?;
+ if let Err(error) = self.append_app_local_work_record(shared_accounts_paths, &input) {
+ let failure_message = error.to_string();
+ let _ = sqlite_store.mark_buyer_order_coordination_failed(
+ buyer_context,
+ order.order_id,
+ failure_message.as_str(),
+ );
+ return Err(error);
+ }
+ sqlite_store.mark_buyer_order_coordination_synced(buyer_context, order.order_id)?;
Ok(true)
}
@@ -5677,7 +5748,8 @@ mod tests {
RadrootsAppRemoteSignerPendingSession, RadrootsAppRemoteSignerSessionRecord,
};
use radroots_app_sqlite::{
- AppSqliteError, AppSqliteStore, DatabaseTarget, latest_schema_version,
+ AppSqliteError, AppSqliteStore, BuyerOrderCoordinationState, DatabaseTarget,
+ latest_schema_version,
};
use radroots_app_state::{
APP_STATE_FILE_NAME, AppStateCommand, AppStatePersistenceRepository, AppStateRepository,
@@ -8444,6 +8516,7 @@ mod tests {
{
let state = runtime.lock_state_mut();
let buyer_context = state.state_store.identity_projection().buyer_context();
+ let sqlite_store = state.sqlite_store.as_ref().expect("sqlite store");
let order_export = state
.sqlite_store
.as_ref()
@@ -8451,11 +8524,32 @@ mod tests {
.load_buyer_order_local_event_export(&buyer_context, order_id)
.expect("order export should load")
.expect("order export should exist");
+ let coordination = sqlite_store
+ .load_buyer_order_coordination_record(&buyer_context, order_id)
+ .expect("order coordination should load")
+ .expect("order coordination should exist");
+ assert_eq!(coordination.state, BuyerOrderCoordinationState::Synced);
+ assert_eq!(
+ coordination.record_id.as_deref(),
+ Some(format!("app:local_work:order_request:{order_id}").as_str())
+ );
+ assert!(coordination.payload_json.is_some());
+ assert_eq!(coordination.attempt_count, 1);
+ assert_eq!(coordination.last_error_message, None);
assert!(
state
- .append_app_buyer_order_request_local_work_record(&buyer_context, &order_export)
+ .append_app_buyer_order_request_local_work_record(
+ sqlite_store,
+ &buyer_context,
+ &order_export,
+ )
.expect("order local event reappend should be idempotent")
);
+ let coordination_after = sqlite_store
+ .load_buyer_order_coordination_record(&buyer_context, order_id)
+ .expect("order coordination should reload")
+ .expect("order coordination should still exist");
+ assert_eq!(coordination_after.attempt_count, 1);
}
let records = shared_local_event_records(&paths);
@@ -8532,7 +8626,7 @@ mod tests {
}
#[test]
- fn runtime_buyer_order_shared_append_failure_blocks_visible_completion() {
+ fn runtime_buyer_order_shared_append_failure_is_recoverable_after_restart() {
let (runtime, paths) = bootstrapped_runtime("buyer_order_append_failure");
assert!(
runtime
@@ -8588,6 +8682,63 @@ mod tests {
);
assert!(summary.personal_projection.orders.list.rows.is_empty());
assert!(summary.personal_projection.orders.detail.is_none());
+ let order_id = {
+ let state = runtime.lock_state_mut();
+ let buyer_context = state.state_store.identity_projection().buyer_context();
+ let sqlite_store = state.sqlite_store.as_ref().expect("sqlite store");
+ let buyer_orders = sqlite_store
+ .load_buyer_orders(&buyer_context)
+ .expect("buyer order should persist after coordination failure");
+ assert_eq!(buyer_orders.rows.len(), 1);
+ let order_id = buyer_orders.rows[0].order_id;
+ let coordination = sqlite_store
+ .load_buyer_order_coordination_record(&buyer_context, order_id)
+ .expect("buyer order coordination should load")
+ .expect("buyer order coordination should exist");
+ assert_eq!(coordination.state, BuyerOrderCoordinationState::Failed);
+ assert_eq!(coordination.attempt_count, 1);
+ assert!(coordination.record_id.is_some());
+ assert!(coordination.payload_json.is_some());
+ assert!(coordination.last_error_message.is_some());
+ order_id
+ };
+ drop(runtime);
+ unblock_shared_local_events_database(&paths);
+
+ let restarted_runtime = restart_runtime(paths.clone());
+ let records = shared_local_event_records(&paths);
+ let order_records = records
+ .iter()
+ .filter(|record| {
+ record.source_runtime == SourceRuntime::App
+ && record
+ .local_work_json
+ .as_ref()
+ .and_then(|payload| payload["record_kind"].as_str())
+ == Some(BUYER_ORDER_REQUEST_LOCAL_WORK_RECORD_KIND)
+ })
+ .collect::<Vec<_>>();
+ assert_eq!(order_records.len(), 1);
+ assert_eq!(
+ order_records[0].record_id,
+ format!("app:local_work:order_request:{order_id}")
+ );
+ let state = restarted_runtime.lock_state_mut();
+ let buyer_context = state.state_store.identity_projection().buyer_context();
+ let sqlite_store = state.sqlite_store.as_ref().expect("sqlite store");
+ let coordination = sqlite_store
+ .load_buyer_order_coordination_record(&buyer_context, order_id)
+ .expect("buyer order coordination should reload")
+ .expect("buyer order coordination should still exist");
+ assert_eq!(coordination.state, BuyerOrderCoordinationState::Synced);
+ assert_eq!(coordination.attempt_count, 2);
+ assert_eq!(coordination.last_error_message, None);
+ drop(state);
+ assert!(
+ !restarted_runtime
+ .retry_pending_personal_order_coordination()
+ .expect("synced buyer order recovery retry should be idempotent")
+ );
cleanup_bootstrapped_runtime_paths(&paths);
}
@@ -12064,6 +12215,15 @@ mod tests {
fs::create_dir(&database_path).expect("blocking directory should create");
}
+ fn unblock_shared_local_events_database(paths: &AppDesktopRuntimePaths) {
+ let database_path = paths
+ .shared_local_events_database_path()
+ .expect("shared local events path");
+ if database_path.is_dir() {
+ fs::remove_dir_all(&database_path).expect("blocking directory should remove");
+ }
+ }
+
fn fixture_pending_session() -> RadrootsAppRemoteSignerPendingSession {
let signer_identity = RadrootsIdentity::from_secret_key_str(
"1111111111111111111111111111111111111111111111111111111111111111",
diff --git a/crates/shared/sqlite/migrations/0014_buyer_order_coordination.sql b/crates/shared/sqlite/migrations/0014_buyer_order_coordination.sql
@@ -0,0 +1,18 @@
+CREATE TABLE buyer_order_coordination_records (
+ order_id TEXT PRIMARY KEY NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
+ buyer_context_key TEXT NOT NULL,
+ record_id TEXT,
+ state TEXT NOT NULL CHECK (state IN ('pending', 'synced', 'failed')),
+ payload_json TEXT,
+ attempt_count INTEGER NOT NULL DEFAULT 0 CHECK (attempt_count >= 0),
+ last_error_message TEXT,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL,
+ synced_at TEXT
+);
+
+CREATE INDEX idx_buyer_order_coordination_context_state_updated_at
+ ON buyer_order_coordination_records(buyer_context_key, state, updated_at);
+
+CREATE INDEX idx_buyer_order_coordination_state_updated_at
+ ON buyer_order_coordination_records(state, updated_at);
diff --git a/crates/shared/sqlite/src/buyer.rs b/crates/shared/sqlite/src/buyer.rs
@@ -58,6 +58,35 @@ pub struct BuyerOrderLocalEventLine {
pub seller_pubkey: Option<String>,
}
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub enum BuyerOrderCoordinationState {
+ Pending,
+ Synced,
+ Failed,
+}
+
+impl BuyerOrderCoordinationState {
+ fn from_storage_key(field: &'static str, value: String) -> Result<Self, AppSqliteError> {
+ match value.as_str() {
+ "pending" => Ok(Self::Pending),
+ "synced" => Ok(Self::Synced),
+ "failed" => Ok(Self::Failed),
+ _ => Err(AppSqliteError::DecodeEnum { field, value }),
+ }
+ }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct BuyerOrderCoordinationRecord {
+ pub order_id: OrderId,
+ pub buyer_context_key: String,
+ pub record_id: Option<String>,
+ pub state: BuyerOrderCoordinationState,
+ pub payload_json: Option<String>,
+ pub attempt_count: u32,
+ pub last_error_message: Option<String>,
+}
+
pub struct AppBuyerRepository<'a> {
connection: &'a Connection,
}
@@ -416,6 +445,7 @@ impl<'a> AppBuyerRepository<'a> {
operation: "reset buyer cart header after checkout",
source,
})?;
+ self.insert_pending_buyer_order_coordination(context_key.as_str(), order_id)?;
Ok(order_id)
})();
@@ -437,6 +467,222 @@ impl<'a> AppBuyerRepository<'a> {
}
}
+ pub fn load_buyer_order_coordination_record(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<Option<BuyerOrderCoordinationRecord>, AppSqliteError> {
+ let context_key = context.storage_key();
+
+ self.connection
+ .query_row(
+ "select
+ order_id,
+ buyer_context_key,
+ record_id,
+ state,
+ payload_json,
+ attempt_count,
+ last_error_message
+ from buyer_order_coordination_records
+ where buyer_context_key = ?1 and order_id = ?2
+ limit 1",
+ params![context_key.as_str(), order_id.to_string()],
+ |row| {
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, Option<String>>(2)?,
+ row.get::<_, String>(3)?,
+ row.get::<_, Option<String>>(4)?,
+ row.get::<_, i64>(5)?,
+ row.get::<_, Option<String>>(6)?,
+ ))
+ },
+ )
+ .optional()
+ .map_err(|source| AppSqliteError::Query {
+ operation: "load buyer order coordination record",
+ source,
+ })?
+ .map(buyer_order_coordination_record_from_row)
+ .transpose()
+ }
+
+ pub fn load_recoverable_buyer_order_coordination_records(
+ &self,
+ context: &BuyerContext,
+ ) -> Result<Vec<BuyerOrderCoordinationRecord>, AppSqliteError> {
+ let context_key = context.storage_key();
+ let mut statement = self
+ .connection
+ .prepare(
+ "select
+ order_id,
+ buyer_context_key,
+ record_id,
+ state,
+ payload_json,
+ attempt_count,
+ last_error_message
+ from buyer_order_coordination_records
+ where buyer_context_key = ?1 and state in ('pending', 'failed')
+ order by updated_at asc, order_id asc",
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare recoverable buyer order coordination records",
+ source,
+ })?;
+ let rows = statement
+ .query_map(params![context_key.as_str()], |row| {
+ Ok((
+ row.get::<_, String>(0)?,
+ row.get::<_, String>(1)?,
+ row.get::<_, Option<String>>(2)?,
+ row.get::<_, String>(3)?,
+ row.get::<_, Option<String>>(4)?,
+ row.get::<_, i64>(5)?,
+ row.get::<_, Option<String>>(6)?,
+ ))
+ })
+ .map_err(|source| AppSqliteError::Query {
+ operation: "query recoverable buyer order coordination records",
+ source,
+ })?;
+
+ rows.map(|row| {
+ buyer_order_coordination_record_from_row(row.map_err(|source| {
+ AppSqliteError::Query {
+ operation: "read recoverable buyer order coordination record",
+ source,
+ }
+ })?)
+ })
+ .collect()
+ }
+
+ pub fn buyer_order_coordination_is_synced(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<bool, AppSqliteError> {
+ Ok(self
+ .load_buyer_order_coordination_record(context, order_id)?
+ .is_some_and(|record| record.state == BuyerOrderCoordinationState::Synced))
+ }
+
+ pub fn prepare_buyer_order_coordination_attempt(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ record_id: &str,
+ payload_json: &str,
+ ) -> Result<bool, AppSqliteError> {
+ let context_key = context.storage_key();
+ let changed = self
+ .connection
+ .execute(
+ "insert into buyer_order_coordination_records (
+ order_id,
+ buyer_context_key,
+ record_id,
+ state,
+ payload_json,
+ attempt_count,
+ last_error_message,
+ created_at,
+ updated_at,
+ synced_at
+ ) values (
+ ?1,
+ ?2,
+ ?3,
+ 'pending',
+ ?4,
+ 1,
+ null,
+ strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+ strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+ null
+ )
+ on conflict(order_id) do update set
+ record_id = excluded.record_id,
+ state = 'pending',
+ payload_json = excluded.payload_json,
+ attempt_count = buyer_order_coordination_records.attempt_count + 1,
+ last_error_message = null,
+ updated_at = excluded.updated_at,
+ synced_at = null
+ where buyer_order_coordination_records.buyer_context_key = excluded.buyer_context_key
+ and buyer_order_coordination_records.state <> 'synced'",
+ params![
+ order_id.to_string(),
+ context_key.as_str(),
+ record_id,
+ payload_json
+ ],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "prepare buyer order coordination attempt",
+ source,
+ })?;
+
+ Ok(changed == 1)
+ }
+
+ pub fn mark_buyer_order_coordination_synced(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<bool, AppSqliteError> {
+ let context_key = context.storage_key();
+ let changed = self
+ .connection
+ .execute(
+ "update buyer_order_coordination_records
+ set
+ state = 'synced',
+ last_error_message = null,
+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+ synced_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
+ where buyer_context_key = ?1 and order_id = ?2",
+ params![context_key.as_str(), order_id.to_string()],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "mark buyer order coordination synced",
+ source,
+ })?;
+
+ Ok(changed == 1)
+ }
+
+ pub fn mark_buyer_order_coordination_failed(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ error_message: &str,
+ ) -> Result<bool, AppSqliteError> {
+ let context_key = context.storage_key();
+ let changed = self
+ .connection
+ .execute(
+ "update buyer_order_coordination_records
+ set
+ state = 'failed',
+ last_error_message = ?3,
+ updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+ synced_at = null
+ where buyer_context_key = ?1 and order_id = ?2",
+ params![context_key.as_str(), order_id.to_string(), error_message],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "mark buyer order coordination failed",
+ source,
+ })?;
+
+ Ok(changed == 1)
+ }
+
pub fn load_buyer_orders(
&self,
context: &BuyerContext,
@@ -839,6 +1085,38 @@ impl<'a> AppBuyerRepository<'a> {
})
}
+ fn insert_pending_buyer_order_coordination(
+ &self,
+ context_key: &str,
+ order_id: OrderId,
+ ) -> Result<(), AppSqliteError> {
+ self.connection
+ .execute(
+ "insert into buyer_order_coordination_records (
+ order_id,
+ buyer_context_key,
+ state,
+ attempt_count,
+ created_at,
+ updated_at
+ ) values (
+ ?1,
+ ?2,
+ 'pending',
+ 0,
+ strftime('%Y-%m-%dT%H:%M:%SZ', 'now'),
+ strftime('%Y-%m-%dT%H:%M:%SZ', 'now')
+ )",
+ params![order_id.to_string(), context_key],
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "insert pending buyer order coordination record",
+ source,
+ })?;
+
+ Ok(())
+ }
+
fn load_listing_records(&self) -> Result<Vec<BuyerListingRecord>, AppSqliteError> {
let mut statement = self
.connection
@@ -2049,6 +2327,38 @@ where
.map_err(|_| AppSqliteError::DecodeId { field, value })
}
+fn buyer_order_coordination_record_from_row(
+ row: (
+ String,
+ String,
+ Option<String>,
+ String,
+ Option<String>,
+ i64,
+ Option<String>,
+ ),
+) -> Result<BuyerOrderCoordinationRecord, AppSqliteError> {
+ let (order_id, buyer_context_key, record_id, state, payload_json, attempt_count, last_error) =
+ row;
+ let attempt_count =
+ u32::try_from(attempt_count).map_err(|_| AppSqliteError::InvalidProjection {
+ reason: "buyer order coordination attempt count must be non-negative",
+ })?;
+
+ Ok(BuyerOrderCoordinationRecord {
+ order_id: parse_typed_id("buyer_order_coordination_records.order_id", order_id)?,
+ buyer_context_key,
+ record_id: record_id.and_then(empty_string_to_none),
+ state: BuyerOrderCoordinationState::from_storage_key(
+ "buyer_order_coordination_records.state",
+ state,
+ )?,
+ payload_json: payload_json.and_then(empty_string_to_none),
+ attempt_count,
+ last_error_message: last_error.and_then(empty_string_to_none),
+ })
+}
+
fn parse_optional_typed_id<T>(
field: &'static str,
value: Option<String>,
@@ -2117,7 +2427,10 @@ mod tests {
};
use rusqlite::{Connection, params};
- use crate::{AppSqliteError, AppSqliteStore, BuyerRepeatDemandApplyOutcome, DatabaseTarget};
+ use crate::{
+ AppSqliteError, AppSqliteStore, BuyerOrderCoordinationState, BuyerRepeatDemandApplyOutcome,
+ DatabaseTarget,
+ };
use super::AppBuyerRepository;
@@ -2290,6 +2603,10 @@ mod tests {
let cart_after_checkout = repository
.load_buyer_cart(&context)
.expect("buyer cart should load after checkout");
+ let coordination = repository
+ .load_buyer_order_coordination_record(&context, order_id)
+ .expect("buyer order coordination should load")
+ .expect("buyer order coordination should exist");
assert!(checkout.can_place_order);
assert_eq!(checkout.summary.line_count, 1);
@@ -2322,6 +2639,13 @@ mod tests {
"Leave by the cooler".to_owned(),
)
);
+ assert_eq!(coordination.order_id, order_id);
+ assert_eq!(coordination.buyer_context_key, "guest");
+ assert_eq!(coordination.state, BuyerOrderCoordinationState::Pending);
+ assert_eq!(coordination.record_id, None);
+ assert_eq!(coordination.payload_json, None);
+ assert_eq!(coordination.attempt_count, 0);
+ assert_eq!(coordination.last_error_message, None);
assert_eq!(row_count(connection, "local_outbox"), 0);
assert_eq!(row_count(connection, "local_conflicts"), 0);
assert_eq!(row_count(connection, "sync_checkpoints"), 0);
diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs
@@ -38,8 +38,8 @@ pub use activity::{
APP_ACTIVITY_CONTEXT_LIMIT, APP_ACTIVITY_RETENTION_LIMIT, AppActivityRepository,
};
pub use buyer::{
- AppBuyerRepository, BuyerOrderLocalEventExport, BuyerOrderLocalEventLine,
- BuyerRepeatDemandApplyOutcome,
+ AppBuyerRepository, BuyerOrderCoordinationRecord, BuyerOrderCoordinationState,
+ BuyerOrderLocalEventExport, BuyerOrderLocalEventLine, BuyerRepeatDemandApplyOutcome,
};
pub use error::AppSqliteError;
pub use farm_rules::{AppFarmRulesRepository, derive_farm_rules_readiness};
@@ -468,6 +468,62 @@ impl AppSqliteStore {
.load_buyer_order_local_event_export(context, order_id)
}
+ pub fn load_buyer_order_coordination_record(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<Option<BuyerOrderCoordinationRecord>, AppSqliteError> {
+ self.buyer_repository()
+ .load_buyer_order_coordination_record(context, order_id)
+ }
+
+ pub fn load_recoverable_buyer_order_coordination_records(
+ &self,
+ context: &BuyerContext,
+ ) -> Result<Vec<BuyerOrderCoordinationRecord>, AppSqliteError> {
+ self.buyer_repository()
+ .load_recoverable_buyer_order_coordination_records(context)
+ }
+
+ pub fn buyer_order_coordination_is_synced(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<bool, AppSqliteError> {
+ self.buyer_repository()
+ .buyer_order_coordination_is_synced(context, order_id)
+ }
+
+ pub fn prepare_buyer_order_coordination_attempt(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ record_id: &str,
+ payload_json: &str,
+ ) -> Result<bool, AppSqliteError> {
+ self.buyer_repository()
+ .prepare_buyer_order_coordination_attempt(context, order_id, record_id, payload_json)
+ }
+
+ pub fn mark_buyer_order_coordination_synced(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ ) -> Result<bool, AppSqliteError> {
+ self.buyer_repository()
+ .mark_buyer_order_coordination_synced(context, order_id)
+ }
+
+ pub fn mark_buyer_order_coordination_failed(
+ &self,
+ context: &BuyerContext,
+ order_id: OrderId,
+ error_message: &str,
+ ) -> Result<bool, AppSqliteError> {
+ self.buyer_repository()
+ .mark_buyer_order_coordination_failed(context, order_id, error_message)
+ }
+
pub fn apply_buyer_repeat_demand_to_cart(
&self,
context: &BuyerContext,
@@ -711,6 +767,7 @@ mod tests {
assert!(table_exists(connection, "reminder_schedules"));
assert!(table_exists(connection, "reminder_log_entries"));
assert!(table_exists(connection, "order_recovery_records"));
+ assert!(table_exists(connection, "buyer_order_coordination_records"));
assert!(column_exists(connection, "farms", "timezone"));
assert!(column_exists(connection, "farms", "currency_code"));
assert!(column_exists(connection, "local_outbox", "account_id"));
@@ -770,6 +827,21 @@ mod tests {
));
assert!(column_exists(
connection,
+ "buyer_order_coordination_records",
+ "state"
+ ));
+ assert!(column_exists(
+ connection,
+ "buyer_order_coordination_records",
+ "payload_json"
+ ));
+ assert!(column_exists(
+ connection,
+ "buyer_order_coordination_records",
+ "last_error_message"
+ ));
+ assert!(column_exists(
+ connection,
"order_recovery_records",
"recovery_state"
));
diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs
@@ -56,6 +56,10 @@ const MIGRATIONS: &[Migration] = &[
version: 13,
sql: include_str!("../migrations/0013_local_interop_projection_cursor.sql"),
},
+ Migration {
+ version: 14,
+ sql: include_str!("../migrations/0014_buyer_order_coordination.sql"),
+ },
];
pub fn latest_schema_version() -> u32 {