commit 4251f6ac71c3b9ee613b1fbbc8591978b9097df2
parent bca548659e8752709a2b9388cbca490fb0eb7584
Author: triesap <tyson@radroots.org>
Date: Wed, 17 Jun 2026 13:47:25 -0700
outbox: split relay digest and publish ordering
- preserve ordered unique target relays for claim and publish attempts
- sort and dedupe relays only for idempotency digest comparison
- keep reordered duplicate relay inputs idempotent for the same operation key
- validate with cargo fmt --all -- --check and cargo test -p radroots_outbox
Diffstat:
1 file changed, 65 insertions(+), 9 deletions(-)
diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs
@@ -74,15 +74,16 @@ impl RadrootsOutbox {
&self,
input: RadrootsOutboxOperationInput,
) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> {
- let target_relays = canonical_relays(input.target_relays);
+ let target_relays = ordered_unique_relays(input.target_relays);
if target_relays.is_empty() {
return Err(RadrootsOutboxError::EmptyTargetRelays);
}
+ let digest_relays = digest_relays(target_relays.as_slice());
let digest = idempotency_digest(
input.operation_kind.as_str(),
input.draft.expected_pubkey.as_str(),
&input.draft,
- &target_relays,
+ &digest_relays,
)?;
let accepted_quorum = target_relays.len() as i64;
let mut tx = self.pool.begin().await?;
@@ -171,15 +172,16 @@ impl RadrootsOutbox {
input: RadrootsOutboxSignedOperationInput,
) -> Result<RadrootsOutboxEnqueueReceipt, RadrootsOutboxError> {
validate_signed_nostr_event_matches_draft(&input.signed_event, &input.draft)?;
- let target_relays = canonical_relays(input.target_relays);
+ let target_relays = ordered_unique_relays(input.target_relays);
if target_relays.is_empty() {
return Err(RadrootsOutboxError::EmptyTargetRelays);
}
+ let digest_relays = digest_relays(target_relays.as_slice());
let digest = idempotency_digest(
input.operation_kind.as_str(),
input.draft.expected_pubkey.as_str(),
&input.draft,
- &target_relays,
+ &digest_relays,
)?;
let accepted_quorum = target_relays.len() as i64;
let mut tx = self.pool.begin().await?;
@@ -983,7 +985,7 @@ async fn relay_urls_for_tx(
outbox_event_id: i64,
) -> Result<Vec<String>, RadrootsOutboxError> {
let rows = sqlx::query(
- "SELECT relay_url FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY relay_url",
+ "SELECT relay_url FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid",
)
.bind(outbox_event_id)
.fetch_all(&mut **tx)
@@ -998,7 +1000,7 @@ async fn relay_statuses_for(
outbox_event_id: i64,
) -> Result<Vec<RadrootsOutboxRelayStatusRecord>, RadrootsOutboxError> {
let rows = sqlx::query(
- "SELECT outbox_event_id, relay_url, status, attempt_count, last_attempt_at_ms, acknowledged_at_ms, last_error FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY relay_url",
+ "SELECT outbox_event_id, relay_url, status, attempt_count, last_attempt_at_ms, acknowledged_at_ms, last_error FROM outbox_event_relay_status WHERE outbox_event_id = ? ORDER BY rowid",
)
.bind(outbox_event_id)
.fetch_all(pool)
@@ -1084,7 +1086,7 @@ fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEv
}
}
-fn canonical_relays(relays: Vec<String>) -> Vec<String> {
+fn ordered_unique_relays(relays: Vec<String>) -> Vec<String> {
let mut out = Vec::new();
for relay in relays {
if !out.iter().any(|existing| existing == &relay) {
@@ -1094,6 +1096,13 @@ fn canonical_relays(relays: Vec<String>) -> Vec<String> {
out
}
+fn digest_relays(relays: &[String]) -> Vec<String> {
+ let mut out = relays.to_vec();
+ out.sort();
+ out.dedup();
+ out
+}
+
#[derive(Serialize)]
struct DigestInput<'a> {
operation_kind: &'a str,
@@ -1392,6 +1401,53 @@ mod tests {
}
#[tokio::test]
+ async fn enqueue_idempotency_digest_sorts_relays_but_publish_order_is_preserved() {
+ let outbox = RadrootsOutbox::open_memory().await.expect("open");
+ let draft = post_draft(hex_64('a').as_str(), "hello");
+ let first = outbox
+ .enqueue_operation(
+ RadrootsOutboxOperationInput::new(
+ "publish_post",
+ draft.clone(),
+ vec![
+ RELAY_SECONDARY_WSS.to_owned(),
+ RELAY_PRIMARY_WSS.to_owned(),
+ RELAY_SECONDARY_WSS.to_owned(),
+ ],
+ 1_000,
+ )
+ .with_idempotency_key("idem-relay-order"),
+ )
+ .await
+ .expect("first enqueue");
+ let second = outbox
+ .enqueue_operation(
+ RadrootsOutboxOperationInput::new(
+ "publish_post",
+ draft,
+ vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
+ 1_001,
+ )
+ .with_idempotency_key("idem-relay-order"),
+ )
+ .await
+ .expect("second enqueue");
+
+ assert_eq!(second.status, RadrootsOutboxEnqueueStatus::Existing);
+ assert_eq!(first.idempotency_digest, second.idempotency_digest);
+
+ let claimed = outbox
+ .claim_next_ready_event("worker-a", "claim-a", 2_000, 1_000)
+ .await
+ .expect("claim")
+ .expect("claimed event");
+ assert_eq!(
+ claimed.target_relays,
+ vec![RELAY_SECONDARY_WSS.to_owned(), RELAY_PRIMARY_WSS.to_owned()]
+ );
+ }
+
+ #[tokio::test]
async fn enqueue_rejects_empty_target_relays_before_persistence() {
let outbox = RadrootsOutbox::open_memory().await.expect("open");
let draft = post_draft(hex_64('a').as_str(), "hello");
@@ -1530,7 +1586,7 @@ mod tests {
assert_eq!(claimed.signed_event, Some(signed_event));
assert_eq!(
claimed.target_relays,
- vec![RELAY_SECONDARY_WSS.to_owned(), RELAY_PRIMARY_WSS.to_owned()]
+ vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()]
);
}
@@ -1683,7 +1739,7 @@ mod tests {
assert_eq!(claimed.state, RadrootsOutboxEventState::Signing);
assert_eq!(
claimed.target_relays,
- vec![RELAY_SECONDARY_WSS.to_owned(), RELAY_PRIMARY_WSS.to_owned()]
+ vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()]
);
let unavailable = outbox