commit 43ea1dd89f4b3ee36547307e825f984ae8526dab
parent 3bf52ddbe2a0136790dbe20e1ff5dfbc7816e6cf
Author: triesap <tyson@radroots.org>
Date: Sun, 24 May 2026 23:54:11 +0000
sync: make app outbox deterministic
Diffstat:
7 files changed, 431 insertions(+), 65 deletions(-)
diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs
@@ -5838,27 +5838,23 @@ fn current_utc_timestamp() -> String {
fn pending_sync_upsert(aggregate: SyncAggregateRef, payload_json: String) -> PendingSyncOperation {
let created_at = current_utc_timestamp();
- PendingSyncOperation {
+ PendingSyncOperation::new(
aggregate,
- operation: SyncOperationKind::Upsert,
+ SyncOperationKind::Upsert,
payload_json,
- created_at: created_at.clone(),
- available_at: created_at,
- attempt_count: 0,
- }
+ created_at,
+ )
}
fn pending_sync_delete(aggregate: SyncAggregateRef, payload_json: String) -> PendingSyncOperation {
let created_at = current_utc_timestamp();
- PendingSyncOperation {
+ PendingSyncOperation::new(
aggregate,
- operation: SyncOperationKind::Delete,
+ SyncOperationKind::Delete,
payload_json,
- created_at: created_at.clone(),
- available_at: created_at,
- attempt_count: 0,
- }
+ created_at,
+ )
}
fn farm_sync_payload(
@@ -5984,9 +5980,10 @@ mod tests {
};
use radroots_app_sync::{
AppSyncRequest, AppSyncResult, AppSyncRunStatus, AppSyncTransport, AppSyncTransportError,
- PendingSyncOperation, RecordedAppSyncTransport, SyncAggregateRef, SyncCheckpointState,
- SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus,
- SyncConflictSeverity, SyncOperationKind, SyncTrigger,
+ PendingSyncOperation, PendingSyncOperationState, RecordedAppSyncTransport,
+ SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus, SyncConflict,
+ SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
+ SyncTrigger,
};
use radroots_identity::RadrootsIdentity;
use radroots_local_events::{
@@ -6225,14 +6222,12 @@ mod tests {
sqlite_store
.enqueue_pending_sync_operation(
&account_id,
- &PendingSyncOperation {
- aggregate: SyncAggregateRef::Farm(farm_id),
- operation: SyncOperationKind::Upsert,
- payload_json: "{\"farm\":\"queued\"}".to_owned(),
- created_at: "2026-04-20T19:02:00Z".to_owned(),
- available_at: "2026-04-20T19:02:00Z".to_owned(),
- attempt_count: 0,
- },
+ &PendingSyncOperation::new(
+ SyncAggregateRef::Farm(farm_id),
+ SyncOperationKind::Upsert,
+ "{\"farm\":\"queued\"}",
+ "2026-04-20T19:02:00Z",
+ ),
)
.expect("pending sync operation should save");
}
@@ -6278,6 +6273,89 @@ mod tests {
}
#[test]
+ fn runtime_outbox_repeated_product_save_deduplicates_active_pending_sync() {
+ let runtime = memory_runtime();
+ let (account_id, farm_id) = provision_ready_farmer_account(&runtime);
+
+ assert!(
+ runtime
+ .open_new_product_editor()
+ .expect("new product editor should open")
+ );
+ let product_id = match runtime.summary().products_projection.editor {
+ radroots_app_state::ProductEditorState::Open(session) => session
+ .selected_product_id
+ .expect("open product editor should select a product"),
+ radroots_app_state::ProductEditorState::Closed => {
+ panic!("product editor should be open")
+ }
+ };
+ let first_draft = ProductEditorDraft {
+ title: "Salad mix".to_owned(),
+ subtitle: "Spring blend".to_owned(),
+ unit_label: "bag".to_owned(),
+ price_minor_units: Some(700),
+ price_currency: "USD".to_owned(),
+ stock_quantity: Some(8),
+ availability_window_id: None,
+ status: ProductStatus::Draft,
+ };
+ let second_draft = ProductEditorDraft {
+ title: "Winter greens".to_owned(),
+ subtitle: "Cut this morning".to_owned(),
+ unit_label: "bag".to_owned(),
+ price_minor_units: Some(900),
+ price_currency: "USD".to_owned(),
+ stock_quantity: Some(11),
+ availability_window_id: None,
+ status: ProductStatus::Published,
+ };
+
+ assert!(
+ runtime
+ .save_product_editor_draft(first_draft)
+ .expect("first product editor save should succeed")
+ );
+ assert!(
+ runtime
+ .save_product_editor_draft(second_draft.clone())
+ .expect("second product editor save should succeed")
+ );
+
+ let pending_operations = runtime
+ .lock_state()
+ .sqlite_store
+ .as_ref()
+ .expect("sqlite store")
+ .load_pending_sync_operations(account_id.as_str())
+ .expect("pending sync operations should load");
+ let expected_payload = super::product_sync_payload(
+ product_id,
+ Some(farm_id),
+ "save_product_editor_draft",
+ Some(&second_draft),
+ second_draft.stock_quantity,
+ None,
+ );
+
+ assert_eq!(pending_operations.len(), 1);
+ assert_eq!(
+ pending_operations[0].operation.operation_key,
+ format!("product:{product_id}:upsert")
+ );
+ assert_eq!(
+ pending_operations[0].operation.payload_json,
+ expected_payload
+ );
+ assert_eq!(
+ pending_operations[0].operation.state,
+ PendingSyncOperationState::Pending
+ );
+ assert_eq!(pending_operations[0].operation.attempt_count, 0);
+ assert_eq!(pending_operations[0].operation.last_error_message, None);
+ }
+
+ #[test]
fn runtime_local_product_mutations_enqueue_pending_sync_without_transport_calls() {
let runtime = memory_runtime();
let (account_id, _) = provision_ready_farmer_account(&runtime);
diff --git a/crates/shared/sqlite/migrations/0016_deterministic_outbox.sql b/crates/shared/sqlite/migrations/0016_deterministic_outbox.sql
@@ -0,0 +1,77 @@
+ALTER TABLE local_outbox RENAME TO local_outbox_legacy;
+
+CREATE TABLE local_outbox (
+ id TEXT PRIMARY KEY NOT NULL,
+ account_id TEXT NOT NULL,
+ operation_key TEXT NOT NULL,
+ aggregate_kind TEXT NOT NULL CHECK (
+ aggregate_kind IN ('farm', 'fulfillment_window', 'product', 'order')
+ ),
+ aggregate_id TEXT NOT NULL,
+ operation_kind TEXT NOT NULL CHECK (operation_kind IN ('upsert', 'delete')),
+ payload_json TEXT NOT NULL,
+ created_at TEXT NOT NULL,
+ available_at TEXT NOT NULL,
+ attempt_count INTEGER NOT NULL DEFAULT 0,
+ state TEXT NOT NULL DEFAULT 'pending' CHECK (
+ state IN (
+ 'pending',
+ 'in_progress',
+ 'succeeded',
+ 'failed',
+ 'blocked',
+ 'retryable'
+ )
+ ),
+ last_error_message TEXT
+);
+
+CREATE UNIQUE INDEX idx_local_outbox_account_operation_key_active ON local_outbox(
+ account_id,
+ operation_key
+)
+WHERE state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable');
+
+INSERT OR REPLACE INTO local_outbox (
+ id,
+ account_id,
+ operation_key,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count,
+ state,
+ last_error_message
+)
+SELECT
+ id,
+ account_id,
+ aggregate_kind || ':' || aggregate_id || ':' || operation_kind,
+ aggregate_kind,
+ aggregate_id,
+ operation_kind,
+ payload_json,
+ created_at,
+ available_at,
+ attempt_count,
+ 'pending',
+ NULL
+FROM local_outbox_legacy
+ORDER BY available_at ASC, created_at ASC, id ASC;
+
+DROP TABLE local_outbox_legacy;
+
+CREATE INDEX idx_local_outbox_account_available_at ON local_outbox(
+ account_id,
+ available_at,
+ created_at,
+ id
+);
+CREATE INDEX idx_local_outbox_account_aggregate ON local_outbox(
+ account_id,
+ aggregate_kind,
+ aggregate_id
+);
diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs
@@ -771,6 +771,13 @@ mod tests {
assert!(column_exists(connection, "farms", "timezone"));
assert!(column_exists(connection, "farms", "currency_code"));
assert!(column_exists(connection, "local_outbox", "account_id"));
+ assert!(column_exists(connection, "local_outbox", "operation_key"));
+ assert!(column_exists(connection, "local_outbox", "state"));
+ assert!(column_exists(
+ connection,
+ "local_outbox",
+ "last_error_message"
+ ));
assert!(column_exists(connection, "local_conflicts", "account_id"));
assert!(column_exists(connection, "local_conflicts", "severity"));
assert!(column_exists(
@@ -939,6 +946,8 @@ mod tests {
latest_schema_version()
);
assert!(column_exists(connection, "local_outbox", "account_id"));
+ assert!(column_exists(connection, "local_outbox", "operation_key"));
+ assert!(column_exists(connection, "local_outbox", "state"));
assert!(column_exists(connection, "local_conflicts", "severity"));
assert!(column_exists(
connection,
diff --git a/crates/shared/sqlite/src/migrations.rs b/crates/shared/sqlite/src/migrations.rs
@@ -64,6 +64,10 @@ const MIGRATIONS: &[Migration] = &[
version: 15,
sql: include_str!("../migrations/0015_buyer_order_listing_identity.sql"),
},
+ Migration {
+ version: 16,
+ sql: include_str!("../migrations/0016_deterministic_outbox.sql"),
+ },
];
pub fn latest_schema_version() -> u32 {
diff --git a/crates/shared/sqlite/src/sync.rs b/crates/shared/sqlite/src/sync.rs
@@ -1,8 +1,8 @@
use radroots_app_models::{FarmId, FulfillmentWindowId, OrderId, ProductId};
use radroots_app_sync::{
- PendingSyncOperation, SyncAggregateRef, SyncCheckpointState, SyncCheckpointStatus,
- SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity,
- SyncOperationKind,
+ PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncCheckpointState,
+ SyncCheckpointStatus, SyncConflict, SyncConflictKind, SyncConflictResolutionStatus,
+ SyncConflictSeverity, SyncOperationKind,
};
use rusqlite::{Connection, OptionalExtension, params};
use uuid::Uuid;
@@ -42,24 +42,42 @@ impl<'a> AppSyncRepository<'a> {
"INSERT INTO local_outbox (
id,
account_id,
+ operation_key,
aggregate_kind,
aggregate_id,
operation_kind,
payload_json,
created_at,
available_at,
- attempt_count
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
+ attempt_count,
+ state,
+ last_error_message
+ ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
+ ON CONFLICT(account_id, operation_key)
+ WHERE state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
+ DO UPDATE SET
+ aggregate_kind = excluded.aggregate_kind,
+ aggregate_id = excluded.aggregate_id,
+ operation_kind = excluded.operation_kind,
+ payload_json = excluded.payload_json,
+ created_at = excluded.created_at,
+ available_at = excluded.available_at,
+ attempt_count = 0,
+ state = 'pending',
+ last_error_message = NULL",
params![
operation_id,
account_id,
+ operation.operation_key.as_str(),
operation.aggregate.aggregate_kind(),
aggregate_id_value(&operation.aggregate),
operation.operation.storage_key(),
- operation.payload_json,
- operation.created_at,
- operation.available_at,
+ operation.payload_json.as_str(),
+ operation.created_at.as_str(),
+ operation.available_at.as_str(),
i64::from(operation.attempt_count),
+ operation.state.storage_key(),
+ operation.last_error_message.as_deref(),
],
)
.map_err(|source| AppSqliteError::Query {
@@ -67,7 +85,21 @@ impl<'a> AppSyncRepository<'a> {
source,
})?;
- Ok(operation_id)
+ self.connection
+ .query_row(
+ "SELECT id
+ FROM local_outbox
+ WHERE account_id = ?1
+ AND operation_key = ?2
+ AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
+ LIMIT 1",
+ params![account_id, operation.operation_key.as_str()],
+ |row| row.get::<_, String>(0),
+ )
+ .map_err(|source| AppSqliteError::Query {
+ operation: "load pending sync operation id after enqueue",
+ source,
+ })
}
pub fn load_pending_operations(
@@ -79,15 +111,19 @@ impl<'a> AppSyncRepository<'a> {
.prepare(
"SELECT
id,
+ operation_key,
aggregate_kind,
aggregate_id,
operation_kind,
payload_json,
created_at,
available_at,
- attempt_count
+ attempt_count,
+ state,
+ last_error_message
FROM local_outbox
WHERE account_id = ?1
+ AND state IN ('pending', 'in_progress', 'failed', 'blocked', 'retryable')
ORDER BY available_at ASC, created_at ASC, id ASC",
)
.map_err(|source| AppSqliteError::Query {
@@ -104,7 +140,10 @@ impl<'a> AppSyncRepository<'a> {
row.get::<_, String>(4)?,
row.get::<_, String>(5)?,
row.get::<_, String>(6)?,
- row.get::<_, u32>(7)?,
+ row.get::<_, String>(7)?,
+ row.get::<_, u32>(8)?,
+ row.get::<_, String>(9)?,
+ row.get::<_, Option<String>>(10)?,
))
})
.map_err(|source| AppSqliteError::Query {
@@ -115,6 +154,7 @@ impl<'a> AppSyncRepository<'a> {
rows.map(|row| {
let (
operation_id,
+ operation_key,
aggregate_kind,
aggregate_id,
operation_kind,
@@ -122,6 +162,8 @@ impl<'a> AppSyncRepository<'a> {
created_at,
available_at,
attempt_count,
+ state,
+ last_error_message,
) = row.map_err(|source| AppSqliteError::Query {
operation: "read pending sync operation row",
source,
@@ -130,6 +172,7 @@ impl<'a> AppSyncRepository<'a> {
Ok(StoredPendingSyncOperation {
operation_id,
operation: PendingSyncOperation {
+ operation_key,
aggregate: parse_sync_aggregate_ref(
"local_outbox.aggregate_kind",
"local_outbox.aggregate_id",
@@ -141,6 +184,8 @@ impl<'a> AppSyncRepository<'a> {
created_at,
available_at,
attempt_count,
+ state: parse_pending_sync_operation_state(state)?,
+ last_error_message,
},
})
})
@@ -158,7 +203,9 @@ impl<'a> AppSyncRepository<'a> {
.connection
.execute(
"UPDATE local_outbox
- SET available_at = ?3, attempt_count = ?4
+ SET available_at = ?3,
+ attempt_count = ?4,
+ state = 'retryable'
WHERE account_id = ?1 AND id = ?2",
params![
account_id,
@@ -539,6 +586,23 @@ fn parse_sync_operation_kind(value: String) -> Result<SyncOperationKind, AppSqli
}
}
+fn parse_pending_sync_operation_state(
+ value: String,
+) -> Result<PendingSyncOperationState, AppSqliteError> {
+ match value.as_str() {
+ "pending" => Ok(PendingSyncOperationState::Pending),
+ "in_progress" => Ok(PendingSyncOperationState::InProgress),
+ "succeeded" => Ok(PendingSyncOperationState::Succeeded),
+ "failed" => Ok(PendingSyncOperationState::Failed),
+ "blocked" => Ok(PendingSyncOperationState::Blocked),
+ "retryable" => Ok(PendingSyncOperationState::Retryable),
+ _ => Err(AppSqliteError::DecodeEnum {
+ field: "local_outbox.state",
+ value,
+ }),
+ }
+}
+
fn parse_sync_conflict_kind(value: String) -> Result<SyncConflictKind, AppSqliteError> {
match value.as_str() {
"revision_mismatch" => Ok(SyncConflictKind::RevisionMismatch),
@@ -619,8 +683,9 @@ fn sync_conflict_resolution_status_value(resolution: SyncConflictResolutionStatu
mod tests {
use radroots_app_models::{FarmId, ProductId};
use radroots_app_sync::{
- PendingSyncOperation, SyncAggregateRef, SyncCheckpointStatus, SyncConflict,
- SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity, SyncOperationKind,
+ PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncCheckpointStatus,
+ SyncConflict, SyncConflictKind, SyncConflictResolutionStatus, SyncConflictSeverity,
+ SyncOperationKind,
};
use crate::{AppSqliteStore, DatabaseTarget};
@@ -661,22 +726,18 @@ mod tests {
fn pending_operations_are_account_scoped_and_retryable() {
let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
let repository = store.sync_repository();
- let first = PendingSyncOperation {
- aggregate: SyncAggregateRef::Farm(FarmId::new()),
- operation: SyncOperationKind::Upsert,
- payload_json: "{\"farm\":\"a\"}".to_owned(),
- created_at: "2026-04-20T18:00:00Z".to_owned(),
- available_at: "2026-04-20T18:00:00Z".to_owned(),
- attempt_count: 0,
- };
- let second = PendingSyncOperation {
- aggregate: SyncAggregateRef::Product(ProductId::new()),
- operation: SyncOperationKind::Delete,
- payload_json: "{\"product\":\"b\"}".to_owned(),
- created_at: "2026-04-20T18:05:00Z".to_owned(),
- available_at: "2026-04-20T18:05:00Z".to_owned(),
- attempt_count: 0,
- };
+ let first = PendingSyncOperation::new(
+ SyncAggregateRef::Farm(FarmId::new()),
+ SyncOperationKind::Upsert,
+ "{\"farm\":\"a\"}",
+ "2026-04-20T18:00:00Z",
+ );
+ let second = PendingSyncOperation::new(
+ SyncAggregateRef::Product(ProductId::new()),
+ SyncOperationKind::Delete,
+ "{\"product\":\"b\"}",
+ "2026-04-20T18:05:00Z",
+ );
let first_id = repository
.enqueue_pending_operation("acct_a", &first)
@@ -722,6 +783,10 @@ mod tests {
assert_eq!(acct_a[0].operation_id, first_id);
assert_eq!(acct_a[0].operation.attempt_count, 2);
assert_eq!(
+ acct_a[0].operation.state,
+ PendingSyncOperationState::Retryable
+ );
+ assert_eq!(
acct_a[0].operation.available_at,
"2026-04-20T18:10:00Z".to_owned()
);
@@ -729,6 +794,54 @@ mod tests {
}
#[test]
+ fn outbox_enqueue_upserts_active_operation_by_deterministic_key() {
+ let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
+ let repository = store.sync_repository();
+ let product_id = ProductId::new();
+ let first = PendingSyncOperation::new(
+ SyncAggregateRef::Product(product_id),
+ SyncOperationKind::Upsert,
+ "{\"title\":\"greens\"}",
+ "2026-04-20T18:00:00Z",
+ );
+ let mut replacement = PendingSyncOperation::new(
+ SyncAggregateRef::Product(product_id),
+ SyncOperationKind::Upsert,
+ "{\"title\":\"winter greens\"}",
+ "2026-04-20T18:05:00Z",
+ );
+ replacement.attempt_count = 3;
+ replacement.state = PendingSyncOperationState::Failed;
+ replacement.last_error_message = Some("stale relay state".to_owned());
+
+ let first_id = repository
+ .enqueue_pending_operation("acct_a", &first)
+ .expect("first operation should save");
+ let replacement_id = repository
+ .enqueue_pending_operation("acct_a", &replacement)
+ .expect("replacement operation should upsert");
+
+ let pending = repository
+ .load_pending_operations("acct_a")
+ .expect("pending operations should load");
+
+ assert_eq!(replacement_id, first_id);
+ assert_eq!(pending.len(), 1);
+ assert_eq!(pending[0].operation_id, first_id);
+ assert_eq!(pending[0].operation.operation_key, first.operation_key);
+ assert_eq!(
+ pending[0].operation.payload_json,
+ "{\"title\":\"winter greens\"}"
+ );
+ assert_eq!(pending[0].operation.attempt_count, 0);
+ assert_eq!(
+ pending[0].operation.state,
+ PendingSyncOperationState::Pending
+ );
+ assert_eq!(pending[0].operation.last_error_message, None);
+ }
+
+ #[test]
fn conflicts_are_account_scoped_and_resolvable() {
let store = AppSqliteStore::open(DatabaseTarget::InMemory).expect("store should open");
let repository = store.sync_repository();
diff --git a/crates/shared/sync/src/lib.rs b/crates/shared/sync/src/lib.rs
@@ -35,6 +35,15 @@ impl SyncAggregateRef {
Self::Order(_) => "order",
}
}
+
+ pub fn aggregate_id(&self) -> String {
+ match self {
+ Self::Farm(farm_id) => farm_id.to_string(),
+ Self::FulfillmentWindow(fulfillment_window_id) => fulfillment_window_id.to_string(),
+ Self::Product(product_id) => product_id.to_string(),
+ Self::Order(order_id) => order_id.to_string(),
+ }
+ }
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
@@ -63,17 +72,81 @@ impl SyncOperationKind {
}
}
+#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum PendingSyncOperationState {
+ Pending,
+ InProgress,
+ Succeeded,
+ Failed,
+ Blocked,
+ Retryable,
+}
+
+impl PendingSyncOperationState {
+ pub const fn storage_key(self) -> &'static str {
+ match self {
+ Self::Pending => "pending",
+ Self::InProgress => "in_progress",
+ Self::Succeeded => "succeeded",
+ Self::Failed => "failed",
+ Self::Blocked => "blocked",
+ Self::Retryable => "retryable",
+ }
+ }
+
+ pub const fn is_active(self) -> bool {
+ !matches!(self, Self::Succeeded)
+ }
+}
+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PendingSyncOperation {
+ pub operation_key: String,
pub aggregate: SyncAggregateRef,
pub operation: SyncOperationKind,
pub payload_json: String,
pub created_at: String,
pub available_at: String,
pub attempt_count: u32,
+ pub state: PendingSyncOperationState,
+ pub last_error_message: Option<String>,
}
impl PendingSyncOperation {
+ pub fn new(
+ aggregate: SyncAggregateRef,
+ operation: SyncOperationKind,
+ payload_json: impl Into<String>,
+ created_at: impl Into<String>,
+ ) -> Self {
+ let operation_key = Self::deterministic_operation_key(&aggregate, operation);
+ let created_at = created_at.into();
+ Self {
+ operation_key,
+ aggregate,
+ operation,
+ payload_json: payload_json.into(),
+ created_at: created_at.clone(),
+ available_at: created_at,
+ attempt_count: 0,
+ state: PendingSyncOperationState::Pending,
+ last_error_message: None,
+ }
+ }
+
+ pub fn deterministic_operation_key(
+ aggregate: &SyncAggregateRef,
+ operation: SyncOperationKind,
+ ) -> String {
+ format!(
+ "{}:{}:{}",
+ aggregate.aggregate_kind(),
+ aggregate.aggregate_id(),
+ operation.storage_key()
+ )
+ }
+
pub const fn is_retry(&self) -> bool {
self.attempt_count > 0
}
@@ -469,14 +542,13 @@ mod tests {
#[test]
fn request_and_result_surface_conflict_status_through_typed_contracts() {
- let pending_operation = PendingSyncOperation {
- aggregate: SyncAggregateRef::Product(ProductId::new()),
- operation: SyncOperationKind::Upsert,
- payload_json: "{\"title\":\"greens\"}".to_owned(),
- created_at: "2026-04-17T19:32:00Z".to_owned(),
- available_at: "2026-04-17T19:32:00Z".to_owned(),
- attempt_count: 1,
- };
+ let mut pending_operation = PendingSyncOperation::new(
+ SyncAggregateRef::Product(ProductId::new()),
+ SyncOperationKind::Upsert,
+ "{\"title\":\"greens\"}",
+ "2026-04-17T19:32:00Z",
+ );
+ pending_operation.attempt_count = 1;
let conflict = SyncConflict {
aggregate: SyncAggregateRef::Product(ProductId::new()),
kind: SyncConflictKind::RevisionMismatch,
diff --git a/crates/shared/sync/src/publish.rs b/crates/shared/sync/src/publish.rs
@@ -5,7 +5,7 @@ use radroots_sdk::SdkTransportMode;
use serde::{Deserialize, Serialize};
use thiserror::Error;
-use crate::{PendingSyncOperation, SyncAggregateRef, SyncOperationKind};
+use crate::{PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncOperationKind};
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -313,13 +313,18 @@ impl PendingSyncOperation {
created_at: impl Into<String>,
) -> Result<Self, AppPublishPayloadJsonError> {
let created_at = created_at.into();
+ let aggregate = payload.aggregate_ref();
+ let operation = payload.operation_kind();
Ok(Self {
- aggregate: payload.aggregate_ref(),
- operation: payload.operation_kind(),
+ operation_key: PendingSyncOperation::deterministic_operation_key(&aggregate, operation),
+ aggregate,
+ operation,
payload_json: payload.to_payload_json()?,
created_at: created_at.clone(),
available_at: created_at,
attempt_count: 0,
+ state: PendingSyncOperationState::Pending,
+ last_error_message: None,
})
}
@@ -339,7 +344,9 @@ mod tests {
AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload,
AppPublishValidationFailure,
};
- use crate::{PendingSyncOperation, SyncAggregateRef, SyncOperationKind};
+ use crate::{
+ PendingSyncOperation, PendingSyncOperationState, SyncAggregateRef, SyncOperationKind,
+ };
use radroots_app_models::{FarmId, FarmReadiness, OrderId, ProductId, ProductStatus};
#[test]
@@ -369,7 +376,10 @@ mod tests {
.expect("typed publish payload should serialize");
assert_eq!(operation.aggregate, SyncAggregateRef::Farm(farm_id));
+ assert_eq!(operation.operation_key, format!("farm:{farm_id}:upsert"));
assert_eq!(operation.operation, SyncOperationKind::Upsert);
+ assert_eq!(operation.state, PendingSyncOperationState::Pending);
+ assert_eq!(operation.last_error_message, None);
assert_eq!(operation.created_at, operation.available_at);
assert_eq!(
operation.publish_payload().expect("payload should parse"),
@@ -460,12 +470,15 @@ mod tests {
#[test]
fn existing_raw_payload_outbox_work_remains_local_save_compatible() {
let pending_operation = PendingSyncOperation {
+ operation_key: "product:greens:upsert".to_owned(),
aggregate: SyncAggregateRef::Product(ProductId::new()),
operation: SyncOperationKind::Upsert,
payload_json: "{\"title\":\"greens\"}".to_owned(),
created_at: "2026-04-17T19:32:00Z".to_owned(),
available_at: "2026-04-17T19:32:00Z".to_owned(),
attempt_count: 0,
+ state: PendingSyncOperationState::Pending,
+ last_error_message: None,
};
assert!(!pending_operation.is_retry());