commit 740a5278449c514b96a3bb19436cbbda8b5334ea
parent c50c875991d3193741f1b8e099ff5da2bc016ebc
Author: triesap <tyson@radroots.org>
Date: Mon, 25 May 2026 02:45:35 +0000
sync: keep partial publish results truthful
- return partial failed sync results after a successful publish prefix
- dequeue only acknowledged prefix operations from local outbox
- keep remaining work retryable with explicit error evidence
- cover direct transport and runtime apply behavior with focused tests
Diffstat:
3 files changed, 242 insertions(+), 34 deletions(-)
diff --git a/crates/launchers/desktop/src/runtime.rs b/crates/launchers/desktop/src/runtime.rs
@@ -52,8 +52,8 @@ use radroots_app_sync::{
AppFarmProfilePublishPayload, AppListingPublishPayload, AppOrderRequestItemPayload,
AppOrderRequestPublishPayload, AppPublishContext, AppPublishPayload,
AppPublishedOperationReceipt, AppSyncProjection, AppSyncRequest, AppSyncResult,
- AppSyncTransport, AppSyncTransportError, PendingSyncOperation, SyncAggregateRef,
- SyncCheckpointStatus, SyncConflictSeverity, SyncOperationKind, SyncTrigger,
+ AppSyncRunStatus, AppSyncTransport, AppSyncTransportError, PendingSyncOperation,
+ SyncAggregateRef, SyncCheckpointStatus, SyncConflictSeverity, SyncOperationKind, SyncTrigger,
};
use radroots_core::{
RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity,
@@ -158,6 +158,7 @@ impl SdkDirectRelayAppSyncTransport {
&self,
request: AppSyncRequest,
) -> Result<AppSyncResult, AppSyncTransportError> {
+ let run_started_at = current_utc_timestamp();
let identity = self
.accounts_manager
.default_signing_identity()
@@ -172,34 +173,20 @@ impl SdkDirectRelayAppSyncTransport {
let mut published_receipts = Vec::new();
for operation in &request.pending_operations {
- if operation.operation != SyncOperationKind::Upsert {
- return Err(AppSyncTransportError::failed(
- "direct relay app sync supports upsert publish work only",
- ));
+ match publish_pending_sync_operation(&client, &identity, operation, &relay_urls) {
+ Ok(receipt) => published_receipts.push(receipt),
+ Err(error) => {
+ if published_receipts.is_empty() {
+ return Err(error);
+ }
+ return Ok(partial_failed_sync_result(
+ &request,
+ published_receipts,
+ run_started_at,
+ error,
+ ));
+ }
}
- let publish_payload = operation.publish_payload().map_err(|error| {
- AppSyncTransportError::failed(format!(
- "pending app sync operation is not a typed publish payload: {error}"
- ))
- })?;
- publish_payload.validate().map_err(|error| {
- let reason_codes = error
- .reason_codes
- .into_iter()
- .map(|reason| reason.storage_key())
- .collect::<Vec<_>>()
- .join(",");
- AppSyncTransportError::failed(format!(
- "pending app publish work is blocked: {reason_codes}"
- ))
- })?;
- let receipt =
- publish_app_payload_sync(&client, &identity, &publish_payload, &relay_urls)?;
- published_receipts.push(published_operation_receipt(
- operation.operation_key.as_str(),
- &publish_payload,
- receipt,
- )?);
}
Ok(AppSyncResult {
@@ -217,6 +204,58 @@ impl SdkDirectRelayAppSyncTransport {
}
}
+fn publish_pending_sync_operation(
+ client: &RadrootsSdkClient,
+ identity: &RadrootsIdentity,
+ operation: &PendingSyncOperation,
+ relay_urls: &[String],
+) -> Result<AppPublishedOperationReceipt, AppSyncTransportError> {
+ if operation.operation != SyncOperationKind::Upsert {
+ return Err(AppSyncTransportError::failed(
+ "direct relay app sync supports upsert publish work only",
+ ));
+ }
+ let publish_payload = operation.publish_payload().map_err(|error| {
+ AppSyncTransportError::failed(format!(
+ "pending app sync operation is not a typed publish payload: {error}"
+ ))
+ })?;
+ publish_payload.validate().map_err(|error| {
+ let reason_codes = error
+ .reason_codes
+ .into_iter()
+ .map(|reason| reason.storage_key())
+ .collect::<Vec<_>>()
+ .join(",");
+ AppSyncTransportError::failed(format!(
+ "pending app publish work is blocked: {reason_codes}"
+ ))
+ })?;
+ let receipt = publish_app_payload_sync(client, identity, &publish_payload, relay_urls)?;
+ published_operation_receipt(operation.operation_key.as_str(), &publish_payload, receipt)
+}
+
+fn partial_failed_sync_result(
+ request: &AppSyncRequest,
+ published_receipts: Vec<AppPublishedOperationReceipt>,
+ run_started_at: String,
+ error: AppSyncTransportError,
+) -> AppSyncResult {
+ AppSyncResult {
+ run_status: radroots_app_sync::AppSyncRunStatus::Failed,
+ checkpoint: SyncCheckpointStatus::failed(
+ Some(run_started_at),
+ Some(current_utc_timestamp()),
+ request.checkpoint.last_remote_cursor.clone(),
+ error.to_string(),
+ ),
+ pushed_operation_count: published_receipts.len(),
+ pulled_record_count: 0,
+ conflicts: request.known_conflicts.clone(),
+ published_receipts,
+ }
+}
+
impl AppSyncTransport for SdkDirectRelayAppSyncTransport {
fn sync(&mut self, request: AppSyncRequest) -> Result<AppSyncResult, AppSyncTransportError> {
self.sync_with_sdk(request)
@@ -3264,6 +3303,26 @@ impl DesktopAppRuntimeState {
let _ = sqlite_store
.dequeue_pending_sync_operation(account_id, pending.operation_id.as_str())?;
}
+ if result.run_status == AppSyncRunStatus::Failed {
+ let retry_available_at = result
+ .checkpoint
+ .last_sync_completed_at
+ .clone()
+ .unwrap_or_else(current_utc_timestamp);
+ let last_error_message = result.checkpoint.last_error_message.as_deref();
+ for pending in pending_operations
+ .iter()
+ .skip(result.pushed_operation_count)
+ {
+ let _ = sqlite_store.update_pending_sync_operation_retry(
+ account_id,
+ pending.operation_id.as_str(),
+ retry_available_at.as_str(),
+ pending.operation.attempt_count.saturating_add(1),
+ last_error_message,
+ )?;
+ }
+ }
}
self.refresh_selected_account_sync()
@@ -3277,11 +3336,12 @@ impl DesktopAppRuntimeState {
started_at: &str,
error: AppSyncTransportError,
) -> Result<bool, AppSqliteError> {
+ let error_message = error.to_string();
let failed_checkpoint = SyncCheckpointStatus::failed(
Some(started_at.to_owned()),
previous_checkpoint.last_sync_completed_at.clone(),
previous_checkpoint.last_remote_cursor.clone(),
- error.to_string(),
+ error_message.clone(),
);
{
let Some(sqlite_store) = self.sqlite_store.as_ref() else {
@@ -3295,6 +3355,7 @@ impl DesktopAppRuntimeState {
pending.operation_id.as_str(),
started_at,
pending.operation.attempt_count.saturating_add(1),
+ Some(error_message.as_str()),
)?;
}
}
@@ -7146,6 +7207,54 @@ mod tests {
}
#[test]
+ fn runtime_direct_relay_transport_returns_partial_failure_after_successful_prefix() {
+ let relay = ThreadedAckRelay::spawn();
+ let manager = RadrootsNostrAccountsManager::new_in_memory();
+ let account_id = manager
+ .generate_identity(Some("Farmer".to_owned()), true)
+ .expect("local signing account should generate");
+ let farm_id = FarmId::new();
+ let payload = AppPublishPayload::FarmProfile(AppFarmProfilePublishPayload {
+ context: AppPublishContext::new(account_id.to_string(), "farm_setup"),
+ farm_id,
+ display_name: "North field farm".to_owned(),
+ readiness: Some(FarmReadiness::Ready),
+ });
+ let successful_operation =
+ PendingSyncOperation::from_publish_payload(payload, "2026-05-24T12:00:00Z")
+ .expect("typed farm publish work should serialize");
+ let unsupported_operation = PendingSyncOperation::new(
+ SyncAggregateRef::Product(ProductId::new()),
+ SyncOperationKind::Delete,
+ "{}",
+ "2026-05-24T12:01:00Z",
+ );
+ let mut transport =
+ SdkDirectRelayAppSyncTransport::with_relay_urls(manager, vec![relay.url().to_owned()]);
+
+ let result = transport
+ .sync(AppSyncRequest {
+ trigger: SyncTrigger::ManualRefresh,
+ checkpoint: SyncCheckpointStatus::never_synced(),
+ pending_operations: vec![successful_operation, unsupported_operation],
+ known_conflicts: Vec::new(),
+ })
+ .expect("successful prefix should return a partial result");
+
+ assert_eq!(result.run_status, AppSyncRunStatus::Failed);
+ assert_eq!(result.pushed_operation_count, 1);
+ assert_eq!(result.published_receipts.len(), 1);
+ assert_eq!(result.checkpoint.state, SyncCheckpointState::Failed);
+ assert!(
+ result
+ .checkpoint
+ .last_error_message
+ .as_deref()
+ .is_some_and(|message| message.contains("supports upsert"))
+ );
+ }
+
+ #[test]
fn runtime_direct_relay_transport_normalizes_configured_relay_set() {
let relay_urls = super::normalized_app_sync_relay_urls(&[
" ws://127.0.0.1:8081 ".to_owned(),
@@ -7798,6 +7907,84 @@ mod tests {
}
#[test]
+ fn runtime_partial_sync_result_dequeues_successful_prefix_only() {
+ let runtime = memory_runtime();
+ let (account_id, farm_id) = provision_ready_farmer_account(&runtime);
+ let product_id = ProductId::new();
+ runtime
+ .lock_state_mut()
+ .enqueue_selected_account_sync_operations(vec![
+ pending_sync_upsert(
+ SyncAggregateRef::Farm(farm_id),
+ farm_sync_payload(
+ farm_id,
+ "North field farm",
+ Some(FarmReadiness::Ready),
+ "partial_sync_prefix",
+ ),
+ ),
+ pending_sync_upsert(SyncAggregateRef::Product(product_id), "{}".to_owned()),
+ ])
+ .expect("pending sync should enqueue");
+
+ let recorded = install_recorded_sync_transport(
+ &runtime,
+ RecordedAppSyncTransport::succeed(AppSyncResult {
+ run_status: AppSyncRunStatus::Failed,
+ checkpoint: SyncCheckpointStatus::failed(
+ Some("2026-04-20T19:45:00Z".to_owned()),
+ Some("2026-04-20T19:45:05Z".to_owned()),
+ Some("cursor-partial".to_owned()),
+ "relay refused second operation",
+ ),
+ pushed_operation_count: 1,
+ pulled_record_count: 0,
+ conflicts: Vec::new(),
+ published_receipts: Vec::new(),
+ }),
+ );
+
+ assert!(
+ runtime
+ .sync_on_app_launch()
+ .expect("partial launch sync should apply")
+ );
+
+ let summary = runtime.summary();
+ let pending_operations = runtime
+ .lock_state()
+ .sqlite_store
+ .as_ref()
+ .expect("sqlite store")
+ .load_pending_sync_operations(account_id.as_str())
+ .expect("pending operations should load");
+
+ assert_eq!(recorded.lock().expect("recorded transport").call_count(), 1);
+ assert_eq!(summary.sync_status.pending_write_count, 1);
+ assert_eq!(
+ summary.sync_status.projection.run_status,
+ AppSyncRunStatus::Failed
+ );
+ assert_eq!(pending_operations.len(), 1);
+ assert_eq!(
+ pending_operations[0].operation.aggregate,
+ SyncAggregateRef::Product(product_id)
+ );
+ assert_eq!(
+ pending_operations[0].operation.state,
+ PendingSyncOperationState::Retryable
+ );
+ assert_eq!(pending_operations[0].operation.attempt_count, 1);
+ assert_eq!(
+ pending_operations[0]
+ .operation
+ .last_error_message
+ .as_deref(),
+ Some("relay refused second operation")
+ );
+ }
+
+ #[test]
fn runtime_foreground_resume_sync_uses_the_resume_trigger() {
let runtime = memory_runtime();
let (_, _) = provision_ready_farmer_account(&runtime);
diff --git a/crates/shared/sqlite/src/lib.rs b/crates/shared/sqlite/src/lib.rs
@@ -559,12 +559,14 @@ impl AppSqliteStore {
operation_id: &str,
available_at: &str,
attempt_count: u32,
+ last_error_message: Option<&str>,
) -> Result<bool, AppSqliteError> {
self.sync_repository().update_pending_operation_retry(
account_id,
operation_id,
available_at,
attempt_count,
+ last_error_message,
)
}
diff --git a/crates/shared/sqlite/src/sync.rs b/crates/shared/sqlite/src/sync.rs
@@ -198,6 +198,7 @@ impl<'a> AppSyncRepository<'a> {
operation_id: &str,
available_at: &str,
attempt_count: u32,
+ last_error_message: Option<&str>,
) -> Result<bool, AppSqliteError> {
let updated = self
.connection
@@ -205,13 +206,15 @@ impl<'a> AppSyncRepository<'a> {
"UPDATE local_outbox
SET available_at = ?3,
attempt_count = ?4,
- state = 'retryable'
+ state = 'retryable',
+ last_error_message = ?5
WHERE account_id = ?1 AND id = ?2",
params![
account_id,
operation_id,
available_at,
- i64::from(attempt_count)
+ i64::from(attempt_count),
+ last_error_message
],
)
.map_err(|source| AppSqliteError::Query {
@@ -758,12 +761,24 @@ mod tests {
assert!(
repository
- .update_pending_operation_retry("acct_a", &first_id, "2026-04-20T18:10:00Z", 2,)
+ .update_pending_operation_retry(
+ "acct_a",
+ &first_id,
+ "2026-04-20T18:10:00Z",
+ 2,
+ Some("relay timeout"),
+ )
.expect("retry update should succeed")
);
assert!(
!repository
- .update_pending_operation_retry("acct_b", &first_id, "2026-04-20T18:10:00Z", 3,)
+ .update_pending_operation_retry(
+ "acct_b",
+ &first_id,
+ "2026-04-20T18:10:00Z",
+ 3,
+ Some("wrong account"),
+ )
.expect("wrong-account retry update should not succeed")
);
assert!(
@@ -790,6 +805,10 @@ mod tests {
acct_a[0].operation.available_at,
"2026-04-20T18:10:00Z".to_owned()
);
+ assert_eq!(
+ acct_a[0].operation.last_error_message.as_deref(),
+ Some("relay timeout")
+ );
assert_eq!(acct_b.len(), 1);
}