commit ecd53f39ab8f8cc01de3cadfb0027b29b6038c3a
parent 886223ac7672fbac7582d4dc353dfbb523ef64ae
Author: triesap <tyson@radroots.org>
Date: Mon, 15 Jun 2026 17:04:42 -0700
relay_transport: release retryable adapter transport failures
- Convert publish adapter transport errors into retryable relay receipts for claimed outbox events.
- Preserve storage, malformed event, and relay-target failures as transport errors.
- Add coverage that failed transport claims are released and retryable at the configured time.
- Keep relay outcome counts and connection-failed diagnostics structured for product callers.
Diffstat:
2 files changed, 158 insertions(+), 7 deletions(-)
diff --git a/crates/relay_transport/src/outbox.rs b/crates/relay_transport/src/outbox.rs
@@ -1,9 +1,10 @@
#![forbid(unsafe_code)]
use crate::{
- RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter, RadrootsRelayPublishReceipt,
- RadrootsRelayPublishRequest, RadrootsRelayTargetSet, RadrootsRelayTransportError,
- RadrootsRelayUrlPolicy, publish_signed_event,
+ RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter,
+ RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest,
+ RadrootsRelayTargetSet, RadrootsRelayTransportError, RadrootsRelayUrlPolicy,
+ publish_signed_event,
};
use radroots_event_store::{
RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, RadrootsRelayObservationType,
@@ -116,10 +117,20 @@ where
});
}
let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?;
+ let target_strings = targets.relay_strings();
let quorum = overall_quorum.saturating_sub(publishable.accepted_count);
let request = RadrootsRelayPublishRequest::new(signed_event.clone(), targets, now_ms)
.with_accepted_quorum(quorum);
- let publish = publish_signed_event(adapter, request).await?;
+ let publish = match publish_signed_event(adapter, request).await {
+ Ok(receipt) => receipt,
+ Err(RadrootsRelayTransportError::Transport(message)) => adapter_transport_failure_receipt(
+ signed_event.id.clone(),
+ target_strings,
+ quorum,
+ message,
+ ),
+ Err(error) => return Err(error),
+ };
for relay in &publish.relays {
match relay.outcome.kind {
@@ -191,6 +202,33 @@ where
})
}
+fn adapter_transport_failure_receipt(
+ event_id: String,
+ relay_urls: Vec<String>,
+ quorum: usize,
+ message: String,
+) -> RadrootsRelayPublishReceipt {
+ let relays = relay_urls
+ .into_iter()
+ .map(|relay_url| {
+ RadrootsRelayPublishRelayReceipt::attempted(
+ relay_url,
+ RadrootsRelayOutcome::connection_failed(message.clone()),
+ )
+ })
+ .collect::<Vec<_>>();
+ RadrootsRelayPublishReceipt {
+ event_id,
+ attempted_count: relays.len(),
+ accepted_count: 0,
+ retryable_count: relays.len(),
+ terminal_count: 0,
+ quorum,
+ quorum_met: false,
+ relays,
+ }
+}
+
struct PublishableRelays {
relays: Vec<String>,
total_target_count: usize,
diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs
@@ -1,3 +1,4 @@
+use futures::future::BoxFuture;
use nostr::JsonUtil;
use radroots_event_store::{RadrootsEventStore, RadrootsEventVerificationStatus};
use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent};
@@ -13,9 +14,10 @@ use radroots_outbox::{
use radroots_relay_transport::{
RadrootsMockRelayFetchAdapter, RadrootsMockRelayPublishAdapter, RadrootsOutboxPublishPolicy,
RadrootsRelayFetchItem, RadrootsRelayFetchOutcomeKind, RadrootsRelayFetchRequest,
- RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayTargetSet, RadrootsRelayUrl,
- RadrootsRelayUrlPolicy, fetch_and_ingest_relay_events, publish_claimed_outbox_event,
- publish_signed_event,
+ RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter,
+ RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, RadrootsRelayTargetSet,
+ RadrootsRelayTransportError, RadrootsRelayUrl, RadrootsRelayUrlPolicy,
+ fetch_and_ingest_relay_events, publish_claimed_outbox_event, publish_signed_event,
};
const FIXTURE_ALICE_SECRET_KEY_HEX: &str =
@@ -26,6 +28,22 @@ const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com";
const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com";
const RELAY_TERTIARY_WSS: &str = "wss://relay-3.example.com";
+struct TransportFailurePublishAdapter;
+
+impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter {
+ fn publish<'a>(
+ &'a self,
+ _request: RadrootsRelayPublishRequest,
+ ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
+ {
+ Box::pin(async {
+ Err(RadrootsRelayTransportError::Transport(
+ "adapter boundary unavailable".to_owned(),
+ ))
+ })
+ }
+}
+
fn fixture_keys() -> RadrootsNostrKeys {
let secret_key =
RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key");
@@ -548,6 +566,101 @@ async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() {
}
#[tokio::test]
+async fn outbox_publish_transport_failure_releases_retryable_claim() {
+ let signed = signed_post("adapter transport failure");
+ let outbox = RadrootsOutbox::open_memory().await.expect("outbox");
+ let store = RadrootsEventStore::open_memory().await.expect("store");
+ let draft = RadrootsFrozenEventDraft::new(
+ "radroots.social.post.v1",
+ KIND_POST,
+ signed.created_at,
+ signed.tags.clone(),
+ signed.content.clone(),
+ signed.pubkey.as_str(),
+ )
+ .expect("draft");
+ let receipt = outbox
+ .enqueue_operation(RadrootsOutboxOperationInput::new(
+ "publish_post",
+ draft,
+ vec![RELAY_PRIMARY_WSS.to_owned(), RELAY_SECONDARY_WSS.to_owned()],
+ 1_000,
+ ))
+ .await
+ .expect("enqueue");
+ let claimed = outbox
+ .claim_next_ready_event("signer", "sign-a", 2_000, 1_000)
+ .await
+ .expect("claim")
+ .expect("claim");
+ complete_claimed_signing(&outbox, &claimed, 1_100).await;
+ outbox.recover_expired_claims(2_001).await.expect("recover");
+ let publish_claim = outbox
+ .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100)
+ .await
+ .expect("claim")
+ .expect("publish claim");
+
+ let published = publish_claimed_outbox_event(
+ &outbox,
+ &store,
+ &TransportFailurePublishAdapter,
+ &publish_claim,
+ RadrootsOutboxPublishPolicy::new(2_500),
+ 2_200,
+ )
+ .await
+ .expect("publish");
+
+ assert_eq!(published.publish.attempted_count, 2);
+ assert_eq!(published.publish.accepted_count, 0);
+ assert_eq!(published.publish.retryable_count, 2);
+ assert_eq!(published.publish.terminal_count, 0);
+ assert!(!published.publish.quorum_met);
+ assert!(
+ published
+ .publish
+ .relays
+ .iter()
+ .all(|relay| relay.outcome.kind == RadrootsRelayOutcomeKind::ConnectionFailed)
+ );
+
+ let event = outbox
+ .get_event(receipt.outbox_event_id)
+ .await
+ .expect("event")
+ .expect("event");
+ assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable);
+ assert!(event.claim_token.is_none());
+ assert_eq!(event.next_attempt_after_ms, 2_500);
+
+ let statuses = outbox
+ .relay_statuses(receipt.outbox_event_id)
+ .await
+ .expect("statuses");
+ assert_eq!(statuses.len(), 2);
+ assert!(
+ statuses
+ .iter()
+ .all(|status| status.status == RadrootsOutboxRelayStatus::FailedRetryable)
+ );
+ assert!(
+ outbox
+ .claim_next_ready_event("publisher", "publish-b", 4_000, 2_499)
+ .await
+ .expect("early claim")
+ .is_none()
+ );
+ let retry_claim = outbox
+ .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500)
+ .await
+ .expect("retry claim")
+ .expect("retry claim");
+ assert_eq!(retry_claim.outbox_event_id, receipt.outbox_event_id);
+ assert_eq!(retry_claim.state, RadrootsOutboxEventState::Publishing);
+}
+
+#[tokio::test]
async fn outbox_publish_marks_published_without_adapter_when_all_relays_already_accepted() {
let signed = signed_post("already accepted");
let outbox = RadrootsOutbox::open_memory().await.expect("outbox");