commit 0ab8382c26f3011c2239c7c9f226c49f96e73642
parent 4fc669d02c8bf516c53cf5ad436cdbed5a715f2f
Author: triesap <tyson@radroots.org>
Date: Tue, 23 Jun 2026 21:06:02 +0000
sync: scope proxy idempotency to attempts
- include outbox attempt count in radrootsd proxy idempotency keys
- add a two-request local proxy server for retry proofs
- verify retry attempts reuse the event with a new daemon key
- keep same-attempt request shape deterministic for daemon dedupe
Diffstat:
2 files changed, 250 insertions(+), 80 deletions(-)
diff --git a/crates/sdk/src/sync_runtime.rs b/crates/sdk/src/sync_runtime.rs
@@ -527,6 +527,7 @@ async fn push_proxy_claimed_outbox_event(
delivery_policy: proxy_delivery_policy(claimed.target_relays.len()),
idempotency_key: Some(proxy_outbox_idempotency_key(
claimed.outbox_event_id,
+ claimed.attempt_count,
signed_event.id.as_str(),
)),
timeout_ms: adapter.config().request_timeout_ms,
@@ -562,8 +563,12 @@ fn proxy_delivery_policy(target_count: usize) -> PublishDeliveryPolicy {
}
#[cfg(all(feature = "runtime", feature = "radrootsd-proxy"))]
-fn proxy_outbox_idempotency_key(outbox_event_id: i64, event_id: &str) -> String {
- format!("radroots-sdk-outbox-{outbox_event_id}-{event_id}")
+fn proxy_outbox_idempotency_key(
+ outbox_event_id: i64,
+ attempt_count: i64,
+ event_id: &str,
+) -> String {
+ format!("radroots-sdk-outbox-{outbox_event_id}-{attempt_count}-{event_id}")
}
#[cfg(all(feature = "runtime", feature = "radrootsd-proxy"))]
diff --git a/crates/sdk/tests/sync_runtime.rs b/crates/sdk/tests/sync_runtime.rs
@@ -35,7 +35,7 @@ use radroots_sdk::{SdkPublishTransport, adapters::radrootsd::RadrootsdProxyConfi
#[cfg(feature = "radrootsd-proxy")]
use std::io::{Read, Write};
#[cfg(feature = "radrootsd-proxy")]
-use std::net::TcpListener;
+use std::net::{TcpListener, TcpStream};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
#[cfg(feature = "radrootsd-proxy")]
@@ -68,6 +68,13 @@ struct RecordedProxyRequest {
body: String,
}
+#[cfg(feature = "radrootsd-proxy")]
+#[derive(Clone, Copy)]
+enum ProxyResponseMode {
+ Accepted,
+ Retryable,
+}
+
#[derive(Clone)]
struct RecordingPublishAdapter {
delay: Duration,
@@ -81,88 +88,148 @@ fn spawn_publish_proxy_server() -> (String, JoinHandle<RecordedProxyRequest>) {
let endpoint = format!("http://{}/rpc", listener.local_addr().expect("addr"));
let handle = std::thread::spawn(move || {
let (mut stream, _) = listener.accept().expect("accept");
- let mut request = Vec::new();
- let mut buffer = [0u8; 1024];
- loop {
- let read = stream.read(&mut buffer).expect("read request");
- if read == 0 {
- break;
- }
- request.extend_from_slice(&buffer[..read]);
- if request.windows(4).any(|window| window == b"\r\n\r\n") {
- let headers_end = request
- .windows(4)
- .position(|window| window == b"\r\n\r\n")
- .expect("headers end")
- + 4;
- let header_text = String::from_utf8_lossy(&request[..headers_end]);
- let content_length = header_text
- .lines()
- .find_map(|line| {
- let (name, value) = line.split_once(':')?;
- name.eq_ignore_ascii_case("content-length")
- .then(|| value.trim().parse::<usize>().expect("content length"))
- })
- .unwrap_or(0);
- while request.len() < headers_end + content_length {
- let read = stream.read(&mut buffer).expect("read body");
- if read == 0 {
- break;
- }
- request.extend_from_slice(&buffer[..read]);
+ let body = read_proxy_request_body(&mut stream);
+ write_proxy_response(&mut stream, body.as_str(), ProxyResponseMode::Accepted, 1);
+ RecordedProxyRequest { body }
+ });
+ (endpoint, handle)
+}
+
+#[cfg(feature = "radrootsd-proxy")]
+fn spawn_publish_proxy_sequence_server(
+ responses: Vec<ProxyResponseMode>,
+) -> (String, JoinHandle<Vec<RecordedProxyRequest>>) {
+ let listener = TcpListener::bind("127.0.0.1:0").expect("bind proxy server");
+ let endpoint = format!("http://{}/rpc", listener.local_addr().expect("addr"));
+ let handle = std::thread::spawn(move || {
+ responses
+ .into_iter()
+ .enumerate()
+ .map(|(index, mode)| {
+ let (mut stream, _) = listener.accept().expect("accept");
+ let body = read_proxy_request_body(&mut stream);
+ write_proxy_response(&mut stream, body.as_str(), mode, index + 1);
+ RecordedProxyRequest { body }
+ })
+ .collect()
+ });
+ (endpoint, handle)
+}
+
+#[cfg(feature = "radrootsd-proxy")]
+fn read_proxy_request_body(stream: &mut TcpStream) -> String {
+ let mut request = Vec::new();
+ let mut buffer = [0u8; 1024];
+ loop {
+ let read = stream.read(&mut buffer).expect("read request");
+ if read == 0 {
+ break;
+ }
+ request.extend_from_slice(&buffer[..read]);
+ if request.windows(4).any(|window| window == b"\r\n\r\n") {
+ let headers_end = request
+ .windows(4)
+ .position(|window| window == b"\r\n\r\n")
+ .expect("headers end")
+ + 4;
+ let header_text = String::from_utf8_lossy(&request[..headers_end]);
+ let content_length = header_text
+ .lines()
+ .find_map(|line| {
+ let (name, value) = line.split_once(':')?;
+ name.eq_ignore_ascii_case("content-length")
+ .then(|| value.trim().parse::<usize>().expect("content length"))
+ })
+ .unwrap_or(0);
+ while request.len() < headers_end + content_length {
+ let read = stream.read(&mut buffer).expect("read body");
+ if read == 0 {
+ break;
}
- break;
+ request.extend_from_slice(&buffer[..read]);
}
+ break;
}
- let request_text = String::from_utf8_lossy(&request);
- let (_, body) = request_text.split_once("\r\n\r\n").expect("request body");
- let body_json: serde_json::Value = serde_json::from_str(body).expect("body json");
- let event = &body_json["params"]["event"];
- let response_body = serde_json::json!({
- "jsonrpc": "2.0",
- "id": body_json["id"],
- "result": {
- "deduplicated": false,
- "job": {
- "job_id": "job-1",
- "status": "delivery_satisfied",
- "terminal": true,
- "delivery_satisfied": true,
- "event_id": event["id"],
- "pubkey": event["pubkey"],
- "event_kind": event["kind"],
- "relay_policy": body_json["params"]["relay_policy"],
- "delivery_policy": body_json["params"]["delivery_policy"],
- "relay_count": 1,
- "acknowledged_count": 1,
- "retryable_count": 0,
- "terminal_count": 0,
- "requested_at_ms": 1700000000000i64,
- "completed_at_ms": 1700000000100i64,
- "relays": [{
- "relay_url": "wss://daemon-resolved.example.com",
- "source": "daemon_default",
- "attempted": true,
- "outcome_kind": "accepted",
- "message": "accepted"
- }]
- }
+ }
+ let request_text = String::from_utf8_lossy(&request);
+ let (_, body) = request_text.split_once("\r\n\r\n").expect("request body");
+ body.to_owned()
+}
+
+#[cfg(feature = "radrootsd-proxy")]
+fn write_proxy_response(
+ stream: &mut TcpStream,
+ body: &str,
+ mode: ProxyResponseMode,
+ job_number: usize,
+) {
+ let body_json: serde_json::Value = serde_json::from_str(body).expect("body json");
+ let event = &body_json["params"]["event"];
+ let (status, terminal, delivery_satisfied, acknowledged_count, retryable_count, relay) =
+ match mode {
+ ProxyResponseMode::Accepted => (
+ "delivery_satisfied",
+ true,
+ true,
+ 1,
+ 0,
+ serde_json::json!({
+ "relay_url": "wss://daemon-resolved.example.com",
+ "source": "daemon_default",
+ "attempted": true,
+ "outcome_kind": "accepted",
+ "message": "accepted"
+ }),
+ ),
+ ProxyResponseMode::Retryable => (
+ "delivery_unsatisfied_retryable",
+ false,
+ false,
+ 0,
+ 1,
+ serde_json::json!({
+ "relay_url": "wss://daemon-resolved.example.com",
+ "source": "daemon_default",
+ "attempted": false,
+ "outcome_kind": "connection_failed",
+ "message": "dns lookup failed"
+ }),
+ ),
+ };
+ let response_body = serde_json::json!({
+ "jsonrpc": "2.0",
+ "id": body_json["id"],
+ "result": {
+ "deduplicated": false,
+ "job": {
+ "job_id": format!("job-{job_number}"),
+ "status": status,
+ "terminal": terminal,
+ "delivery_satisfied": delivery_satisfied,
+ "event_id": event["id"],
+ "pubkey": event["pubkey"],
+ "event_kind": event["kind"],
+ "relay_policy": body_json["params"]["relay_policy"],
+ "delivery_policy": body_json["params"]["delivery_policy"],
+ "relay_count": 1,
+ "acknowledged_count": acknowledged_count,
+ "retryable_count": retryable_count,
+ "terminal_count": 0,
+ "requested_at_ms": 1700000000000i64,
+ "completed_at_ms": 1700000000100i64,
+ "relays": [relay]
}
- })
- .to_string();
- let response = format!(
- "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
- response_body.len(),
- response_body
- );
- stream
- .write_all(response.as_bytes())
- .expect("write response");
- RecordedProxyRequest {
- body: body.to_owned(),
}
- });
- (endpoint, handle)
+ })
+ .to_string();
+ let response = format!(
+ "HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
+ response_body.len(),
+ response_body
+ );
+ stream
+ .write_all(response.as_bytes())
+ .expect("write response");
}
impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter {
@@ -1361,6 +1428,104 @@ async fn product_push_outbox_uses_radrootsd_proxy_transport_with_daemon_resolved
assert_eq!(status.outbox.ready_signed_events, 0);
}
+#[cfg(feature = "radrootsd-proxy")]
+#[tokio::test]
+async fn product_push_outbox_radrootsd_proxy_idempotency_is_attempt_scoped() {
+ let (endpoint, handle) = spawn_publish_proxy_sequence_server(vec![
+ ProxyResponseMode::Retryable,
+ ProxyResponseMode::Accepted,
+ ]);
+ let tempdir = tempfile::tempdir().expect("tempdir");
+ let storage = tempdir.path().join("sdk");
+ let transport = SdkPublishTransport::RadrootsdProxy(RadrootsdProxyConfig::new(endpoint));
+ let sdk = RadrootsSdk::builder()
+ .directory_storage(storage.clone())
+ .fixed_clock(RadrootsSdkTimestamp::from_unix_seconds(1_700_000_000))
+ .publish_transport(transport.clone())
+ .build()
+ .await
+ .expect("sdk");
+
+ let enqueue = sdk
+ .listings()
+ .enqueue_publish(
+ ListingEnqueuePublishRequest::new(
+ actor(),
+ listing(LISTING_A_D_TAG, "Retry Coffee"),
+ SdkRelayTargetPolicy::use_publish_transport(),
+ ),
+ &FixtureSigner::new(SELLER),
+ )
+ .await
+ .expect("enqueue");
+
+ let first = sdk
+ .sync()
+ .push_outbox(
+ PushOutboxRequest::new()
+ .with_limit(1)
+ .with_next_attempt_delay_ms(1),
+ )
+ .await
+ .expect("first proxy push");
+
+ assert_eq!(first.attempted_events, 1);
+ assert_eq!(first.retryable_events, 1);
+ assert_eq!(first.events[0].outbox_event_id, enqueue.outbox_event_id);
+ assert_eq!(
+ first.events[0].final_state,
+ PushOutboxEventState::PublishRetryable
+ );
+
+ drop(sdk);
+ let sdk = RadrootsSdk::builder()
+ .directory_storage(storage)
+ .fixed_clock(RadrootsSdkTimestamp::from_unix_seconds(1_700_000_001))
+ .publish_transport(transport)
+ .build()
+ .await
+ .expect("reopened sdk");
+ let second = sdk
+ .sync()
+ .push_outbox(PushOutboxRequest::new().with_limit(1))
+ .await
+ .expect("second proxy push");
+
+ assert_eq!(second.attempted_events, 1);
+ assert_eq!(second.published_events, 1);
+ assert_eq!(second.events[0].outbox_event_id, enqueue.outbox_event_id);
+ assert_eq!(
+ second.events[0].final_state,
+ PushOutboxEventState::Published
+ );
+
+ let recorded = handle.join().expect("proxy requests");
+ assert_eq!(recorded.len(), 2);
+ let first_body: serde_json::Value =
+ serde_json::from_str(recorded[0].body.as_str()).expect("first body");
+ let second_body: serde_json::Value =
+ serde_json::from_str(recorded[1].body.as_str()).expect("second body");
+ let first_key = first_body["params"]["idempotency_key"]
+ .as_str()
+ .expect("first idempotency key");
+ let second_key = second_body["params"]["idempotency_key"]
+ .as_str()
+ .expect("second idempotency key");
+ assert_ne!(first_key, second_key);
+ assert_eq!(
+ first_body["params"]["event"]["id"],
+ second_body["params"]["event"]["id"]
+ );
+ assert!(
+ first_key
+ .starts_with(format!("radroots-sdk-outbox-{}-1-", enqueue.outbox_event_id).as_str())
+ );
+ assert!(
+ second_key
+ .starts_with(format!("radroots-sdk-outbox-{}-2-", enqueue.outbox_event_id).as_str())
+ );
+}
+
#[test]
fn push_outbox_contract_dtos_serialize_deterministically() {
let request = PushOutboxRequest::new()