lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 964c52cf8986f85d34798961ba43ef5377c3ae45
parent 6867a6e95211c67dd03349a5723bf17cb0e33fcf
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 13:32:27 -0700

relay_transport: treat accepted quorum as published

Diffstat:
Mcrates/relay_transport/src/outbox.rs | 28+++++++++++++++++++++++++++-
Mcrates/relay_transport/tests/transport.rs | 194+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 221 insertions(+), 1 deletion(-)

diff --git a/crates/relay_transport/src/outbox.rs b/crates/relay_transport/src/outbox.rs @@ -78,7 +78,6 @@ where ) .await?; let publishable = publishable_relays(outbox, claimed, policy.republish_accepted_relays).await?; - let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?; let overall_quorum = policy .accepted_quorum .unwrap_or(publishable.total_target_count); @@ -90,6 +89,33 @@ where now_ms, ) .await?; + if publishable.accepted_count >= overall_quorum { + outbox + .complete_publish_attempt( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + "relay publish incomplete", + "relay publish terminal", + policy.next_attempt_after_ms, + now_ms, + ) + .await?; + let publish = RadrootsRelayPublishReceipt { + event_id: signed_event.id, + attempted_count: 0, + accepted_count: publishable.accepted_count, + retryable_count: 0, + terminal_count: 0, + quorum: overall_quorum, + quorum_met: true, + relays: Vec::new(), + }; + return Ok(RadrootsOutboxPublishReceipt { + local_ingest, + publish, + }); + } + let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?; let quorum = overall_quorum.saturating_sub(publishable.accepted_count); let request = RadrootsRelayPublishRequest::new(signed_event.clone(), targets, now_ms) .with_accepted_quorum(quorum); diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs @@ -459,6 +459,200 @@ async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() { } #[tokio::test] +async fn outbox_publish_marks_published_without_adapter_when_all_relays_already_accepted() { + let signed = signed_post("already accepted"); + let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); + let store = RadrootsEventStore::open_memory().await.expect("store"); + let draft = RadrootsFrozenEventDraft::new( + "radroots.social.post.v1", + KIND_POST, + signed.created_at, + signed.tags.clone(), + signed.content.clone(), + signed.pubkey.as_str(), + ) + .expect("draft"); + let receipt = outbox + .enqueue_operation(RadrootsOutboxOperationInput::new( + "publish_post", + draft, + vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()], + 1_000, + )) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + let signed = outbox + .sign_claimed_event(&claimed, &fixture_keys(), 1_100) + .await + .expect("sign"); + outbox.recover_expired_claims(2_001).await.expect("recover"); + let publish_claim = outbox + .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) + .await + .expect("claim") + .expect("publish claim"); + outbox + .mark_relay_accepted( + publish_claim.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_PRIMARY_WSS, + 2_150, + ) + .await + .expect("primary accepted"); + outbox + .mark_relay_accepted( + publish_claim.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_SECONDARY_WSS, + 2_151, + ) + .await + .expect("secondary accepted"); + + let adapter = RadrootsMockRelayPublishAdapter::new(); + let published = publish_claimed_outbox_event( + &outbox, + &store, + &adapter, + &publish_claim, + RadrootsOutboxPublishPolicy::new(2_500), + 2_200, + ) + .await + .expect("publish"); + + assert_eq!(published.local_ingest.event_id, signed.id); + assert_eq!(published.publish.event_id, signed.id); + assert_eq!(published.publish.attempted_count, 0); + assert_eq!(published.publish.accepted_count, 2); + assert_eq!(published.publish.quorum, 2); + assert!(published.publish.quorum_met); + assert!(published.publish.relays.is_empty()); + assert!(adapter.captured_raw_events().is_empty()); + + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::Published); + assert_eq!(event.accepted_quorum, 2); + assert!(event.claim_token.is_none()); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); +} + +#[tokio::test] +async fn outbox_publish_uses_persisted_accepted_count_for_explicit_quorum() { + let signed = signed_post("explicit quorum already accepted"); + let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); + let store = RadrootsEventStore::open_memory().await.expect("store"); + let draft = RadrootsFrozenEventDraft::new( + "radroots.social.post.v1", + KIND_POST, + signed.created_at, + signed.tags.clone(), + signed.content.clone(), + signed.pubkey.as_str(), + ) + .expect("draft"); + let receipt = outbox + .enqueue_operation(RadrootsOutboxOperationInput::new( + "publish_post", + draft, + vec![ + RELAY_PRIMARY_WSS.to_owned(), + RELAY_SECONDARY_WSS.to_owned(), + RELAY_TERTIARY_WSS.to_owned(), + ], + 1_000, + )) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + outbox + .sign_claimed_event(&claimed, &fixture_keys(), 1_100) + .await + .expect("sign"); + outbox.recover_expired_claims(2_001).await.expect("recover"); + let publish_claim = outbox + .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) + .await + .expect("claim") + .expect("publish claim"); + outbox + .mark_relay_accepted( + publish_claim.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_PRIMARY_WSS, + 2_150, + ) + .await + .expect("primary accepted"); + outbox + .mark_relay_accepted( + publish_claim.outbox_event_id, + publish_claim.claim_token.as_str(), + RELAY_SECONDARY_WSS, + 2_151, + ) + .await + .expect("secondary accepted"); + + let adapter = RadrootsMockRelayPublishAdapter::new(); + let published = publish_claimed_outbox_event( + &outbox, + &store, + &adapter, + &publish_claim, + RadrootsOutboxPublishPolicy::new(2_500).with_accepted_quorum(2), + 2_200, + ) + .await + .expect("publish"); + + assert_eq!(published.publish.attempted_count, 0); + assert_eq!(published.publish.accepted_count, 2); + assert_eq!(published.publish.quorum, 2); + assert!(published.publish.quorum_met); + assert!(adapter.captured_raw_events().is_empty()); + + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::Published); + assert_eq!(event.accepted_quorum, 2); + let statuses = outbox + .relay_statuses(receipt.outbox_event_id) + .await + .expect("statuses"); + assert_eq!( + statuses + .iter() + .find(|status| status.relay_url == RELAY_TERTIARY_WSS) + .expect("tertiary") + .status, + RadrootsOutboxRelayStatus::Pending + ); +} + +#[tokio::test] async fn outbox_publish_marks_published_when_policy_quorum_is_met_with_failure_diagnostics() { let signed = signed_post("quorum"); let outbox = RadrootsOutbox::open_memory().await.expect("outbox");