commit 933b2de1308ab748607682924f1cda22fee327fd
parent 391dfcc065bdf6b21bb28b9d90b54d7fb7402f2e
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 17:04:49 -0700
sdk: harden sync push claim resilience
- Generate runtime push-outbox claim tokens with UUID v7 instead of timestamp and loop indexes.
- Add focused coverage that immediate claim token generation is collision-resistant.
- Prove adapter transport failures produce retryable event receipts without aborting the batch.
- Include the runtime-scoped uuid dependency and lockfile edge required by the SDK crate.
Diffstat:
4 files changed, 108 insertions(+), 3 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -2002,6 +2002,7 @@ dependencies = [
"tempfile",
"tokio",
"tokio-tungstenite",
+ "uuid",
]
[[package]]
diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml
@@ -49,6 +49,7 @@ runtime = [
"dep:radroots_outbox",
"dep:radroots_relay_transport",
"dep:sha2",
+ "dep:uuid",
"radroots_authority/std",
"radroots_event_store/sqlite",
"radroots_event_store/runtime-tokio",
@@ -92,6 +93,7 @@ serde_json = { workspace = true, optional = true, default-features = false, feat
"alloc",
] }
sha2 = { workspace = true, optional = true }
+uuid = { workspace = true, optional = true }
[dev-dependencies]
futures = { workspace = true }
diff --git a/crates/sdk/src/sync_runtime.rs b/crates/sdk/src/sync_runtime.rs
@@ -221,8 +221,8 @@ impl<'sdk> SyncClient<'sdk> {
request.validate()?;
let now_ms = sdk_now_ms(self.sdk)?;
let mut receipt = PushOutboxReceipt::default();
- for index in 0..request.limit {
- let claim_token = format!("radroots-sdk-sync-{now_ms}-{index}");
+ for _ in 0..request.limit {
+ let claim_token = push_outbox_claim_token();
let Some(claimed) = self
.sdk
._outbox
@@ -276,6 +276,11 @@ fn sdk_now_ms(sdk: &crate::RadrootsSdk) -> Result<i64, RadrootsSdkError> {
}
#[cfg(feature = "runtime")]
+fn push_outbox_claim_token() -> String {
+ format!("radroots-sdk-sync-{}", uuid::Uuid::now_v7())
+}
+
+#[cfg(feature = "runtime")]
fn push_event_receipt(
outbox_event_id: i64,
final_state: PushOutboxEventState,
@@ -304,3 +309,19 @@ fn push_relay_receipt(relay: RadrootsRelayPublishRelayReceipt) -> PushOutboxRela
message: relay.outcome.message,
}
}
+
+#[cfg(all(test, feature = "runtime"))]
+mod tests {
+ use super::push_outbox_claim_token;
+ use std::collections::BTreeSet;
+
+ #[test]
+ fn push_outbox_claim_tokens_are_unique_under_immediate_generation() {
+ let mut tokens = BTreeSet::new();
+ for _ in 0..1_024 {
+ let token = push_outbox_claim_token();
+ assert!(token.starts_with("radroots-sdk-sync-"));
+ assert!(tokens.insert(token));
+ }
+ }
+}
diff --git a/crates/sdk/tests/sync_runtime.rs b/crates/sdk/tests/sync_runtime.rs
@@ -1,5 +1,6 @@
#![cfg(feature = "runtime")]
+use futures::future::BoxFuture;
use radroots_authority::{
RadrootsActorContext, RadrootsEventSigner, RadrootsSignerError, RadrootsSignerIdentity,
};
@@ -15,7 +16,10 @@ use radroots_events::{
listing::{RadrootsListing, RadrootsListingBin, RadrootsListingProduct},
};
use radroots_outbox::{RadrootsOutbox, RadrootsOutboxEventState, RadrootsOutboxOperationInput};
-use radroots_relay_transport::{RadrootsMockRelayPublishAdapter, RadrootsRelayOutcome};
+use radroots_relay_transport::{
+ RadrootsMockRelayPublishAdapter, RadrootsRelayOutcome, RadrootsRelayPublishAdapter,
+ RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, RadrootsRelayTransportError,
+};
use radroots_sdk::{
ListingEnqueuePublishRequest, ListingPreparePublishRequest, PUSH_OUTBOX_DEFAULT_LIMIT,
PUSH_OUTBOX_MAX_LIMIT, PushOutboxEventState, PushOutboxRelayOutcomeKind, PushOutboxRequest,
@@ -36,6 +40,22 @@ struct FixtureSigner {
identity: RadrootsSignerIdentity,
}
+struct TransportFailurePublishAdapter;
+
+impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter {
+ fn publish<'a>(
+ &'a self,
+ _request: RadrootsRelayPublishRequest,
+ ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
+ {
+ Box::pin(async {
+ Err(RadrootsRelayTransportError::Transport(
+ "adapter boundary unavailable".to_owned(),
+ ))
+ })
+ }
+}
+
impl FixtureSigner {
fn new(pubkey: &str) -> Self {
Self {
@@ -371,6 +391,67 @@ async fn push_outbox_preserves_retryable_and_terminal_relay_outcomes() {
}
#[tokio::test]
+async fn push_outbox_continues_after_adapter_transport_failure_and_releases_claims() {
+ let (_tempdir, sdk) = directory_sdk(&[RELAY_A, RELAY_B]).await;
+ let first_outbox_event_id =
+ enqueue_listing(&sdk, LISTING_A_D_TAG, "Coffee One", &[RELAY_A]).await;
+ let second_outbox_event_id =
+ enqueue_listing(&sdk, LISTING_B_D_TAG, "Coffee Two", &[RELAY_B]).await;
+
+ let receipt = sdk
+ .sync()
+ .push_outbox_with_adapter(
+ &TransportFailurePublishAdapter,
+ PushOutboxRequest::new().with_limit(2),
+ )
+ .await
+ .expect("push");
+
+ assert_eq!(receipt.attempted_events, 2);
+ assert_eq!(receipt.published_events, 0);
+ assert_eq!(receipt.retryable_events, 2);
+ assert_eq!(receipt.terminal_events, 0);
+ assert_eq!(
+ receipt
+ .events
+ .iter()
+ .map(|event| event.outbox_event_id)
+ .collect::<Vec<_>>(),
+ vec![first_outbox_event_id, second_outbox_event_id]
+ );
+ assert!(
+ receipt
+ .events
+ .iter()
+ .all(|event| event.final_state == PushOutboxEventState::PublishRetryable)
+ );
+ assert!(
+ receipt
+ .events
+ .iter()
+ .flat_map(|event| event.relays.iter())
+ .all(|relay| {
+ relay.attempted
+ && relay.outcome_kind == PushOutboxRelayOutcomeKind::ConnectionFailed
+ && relay.message.as_deref() == Some("adapter boundary unavailable")
+ })
+ );
+
+ let outbox = RadrootsOutbox::open_file(&sdk.storage_paths().expect("paths").outbox_path)
+ .await
+ .expect("outbox");
+ for outbox_event_id in [first_outbox_event_id, second_outbox_event_id] {
+ let stored = outbox
+ .get_event(outbox_event_id)
+ .await
+ .expect("stored")
+ .expect("stored");
+ assert_eq!(stored.state, RadrootsOutboxEventState::PublishRetryable);
+ assert!(stored.claim_token.is_none());
+ }
+}
+
+#[tokio::test]
async fn push_outbox_does_not_claim_unsigned_outbox_work() {
let (_tempdir, sdk) = directory_sdk(&[RELAY_A]).await;
let prepared = sdk