sdk

Radroots SDK and bindings
git clone https://radroots.dev/git/sdk.git
Log | Files | Refs | README

commit 47fc2e0501103b8eca9f0090550349a62944b548
parent 2a77aee91492d052e1b22aa009266cf223dbe7b8
Author: triesap <tyson@radroots.org>
Date:   Wed, 17 Jun 2026 14:03:43 -0700

sync: add SDK status and push policy contracts

- add sync status request/receipt DTOs over SDK canonical event-store and outbox summaries
- expose push relay URL policy, auth detection policy, claim TTL, and retry delay controls
- default push to public relay policy and require explicit localhost policy for ws targets
- compute publish timing per claim iteration and preserve ordered relay attempts in receipts
- validate with SDK fmt, feature checks, package tests, xtask, WASM generation, and pnpm lanes

Diffstat:
Mcrates/sdk/src/lib.rs | 7+++++--
Mcrates/sdk/src/relay_targets.rs | 2+-
Mcrates/sdk/src/sync_runtime.rs | 191+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/sdk/tests/sync_runtime.rs | 291++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
4 files changed, 463 insertions(+), 28 deletions(-)

diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs @@ -70,6 +70,9 @@ pub use crate::runtime::{ }; #[cfg(feature = "runtime")] pub use crate::sync_runtime::{ - PUSH_OUTBOX_DEFAULT_LIMIT, PUSH_OUTBOX_MAX_LIMIT, PushOutboxEventReceipt, PushOutboxEventState, - PushOutboxReceipt, PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, PushOutboxRequest, + PUSH_OUTBOX_DEFAULT_CLAIM_TTL_MS, PUSH_OUTBOX_DEFAULT_LIMIT, + PUSH_OUTBOX_DEFAULT_NEXT_ATTEMPT_DELAY_MS, PUSH_OUTBOX_MAX_LIMIT, PushOutboxEventReceipt, + PushOutboxEventState, PushOutboxReceipt, PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, + PushOutboxRequest, SdkRelayAuthPolicy, SyncEventStoreStatus, SyncOutboxStatus, + SyncRelayTargetSummary, SyncStatusReceipt, SyncStatusRequest, SyncStatusSource, }; diff --git a/crates/sdk/src/relay_targets.rs b/crates/sdk/src/relay_targets.rs @@ -14,7 +14,7 @@ pub enum SdkRelayUrlPolicy { } impl SdkRelayUrlPolicy { - fn relay_transport_policy(self) -> RadrootsRelayUrlPolicy { + pub(crate) fn relay_transport_policy(self) -> RadrootsRelayUrlPolicy { match self { Self::Public => RadrootsRelayUrlPolicy::Public, Self::Localhost => RadrootsRelayUrlPolicy::Localhost, diff --git a/crates/sdk/src/sync_runtime.rs b/crates/sdk/src/sync_runtime.rs @@ -1,37 +1,145 @@ #[cfg(feature = "runtime")] -use crate::{RadrootsSdkError, SyncClient, runtime::sdk_now_ms}; +use crate::{RadrootsSdkError, SdkRelayUrlPolicy, SyncClient, runtime::sdk_now_ms}; +#[cfg(feature = "runtime")] +use radroots_event_store::RadrootsEventStoreStatusSummary; #[cfg(feature = "runtime")] use radroots_events::ids::RadrootsEventId; #[cfg(all(feature = "runtime", feature = "relay-runtime"))] use radroots_nostr::prelude::RadrootsNostrClient; #[cfg(feature = "runtime")] -use radroots_outbox::RadrootsOutboxEventState; +use radroots_outbox::{RadrootsOutboxEventState, RadrootsOutboxStatusSummary}; #[cfg(all(feature = "runtime", feature = "relay-runtime"))] use radroots_relay_transport::RadrootsNostrClientPublishAdapter; #[cfg(feature = "runtime")] use radroots_relay_transport::{ RadrootsOutboxPublishPolicy, RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter, - RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, RadrootsRelayUrlPolicy, - publish_claimed_outbox_event, + RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, publish_claimed_outbox_event, }; #[cfg(feature = "runtime")] pub const PUSH_OUTBOX_DEFAULT_LIMIT: usize = 20; #[cfg(feature = "runtime")] pub const PUSH_OUTBOX_MAX_LIMIT: usize = 100; +#[cfg(feature = "runtime")] +pub const PUSH_OUTBOX_DEFAULT_CLAIM_TTL_MS: i64 = 30_000; +#[cfg(feature = "runtime")] +pub const PUSH_OUTBOX_DEFAULT_NEXT_ATTEMPT_DELAY_MS: i64 = 60_000; #[cfg(feature = "runtime")] const CLAIM_OWNER: &str = "radroots_sdk.sync.push_outbox"; + +#[cfg(feature = "runtime")] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Serialize)] +pub struct SyncStatusRequest {} + +#[cfg(feature = "runtime")] +impl SyncStatusRequest { + pub fn new() -> Self { + Self::default() + } +} + +#[cfg(feature = "runtime")] +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct SyncStatusReceipt { + pub source: SyncStatusSource, + pub observed_at_ms: i64, + pub event_store: SyncEventStoreStatus, + pub outbox: SyncOutboxStatus, + pub relay_targets: SyncRelayTargetSummary, +} + +#[cfg(feature = "runtime")] +#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)] +#[serde(rename_all = "snake_case")] +#[non_exhaustive] +pub enum SyncStatusSource { + SdkCanonicalStores, +} + +#[cfg(feature = "runtime")] +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct SyncEventStoreStatus { + pub total_events: i64, + pub projection_eligible_events: i64, + pub relay_observations: i64, + pub last_event_seq: Option<i64>, + pub last_event_updated_at_ms: Option<i64>, +} + +#[cfg(feature = "runtime")] +impl From<RadrootsEventStoreStatusSummary> for SyncEventStoreStatus { + fn from(summary: RadrootsEventStoreStatusSummary) -> Self { + Self { + total_events: summary.total_events, + projection_eligible_events: summary.projection_eligible_events, + relay_observations: summary.relay_observations, + last_event_seq: summary.last_event_seq, + last_event_updated_at_ms: summary.last_event_updated_at_ms, + } + } +} + +#[cfg(feature = "runtime")] +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct SyncOutboxStatus { + pub total_events: i64, + pub pending_events: i64, + pub retryable_events: i64, + pub terminal_events: i64, + pub ready_signed_events: i64, + pub publishing_events: i64, + pub last_attempt_at_ms: Option<i64>, + pub last_error: Option<String>, +} + +#[cfg(feature = "runtime")] +impl From<RadrootsOutboxStatusSummary> for SyncOutboxStatus { + fn from(summary: RadrootsOutboxStatusSummary) -> Self { + Self { + total_events: summary.total_events, + pending_events: summary.pending_events, + retryable_events: summary.retryable_events, + terminal_events: summary.terminal_events, + ready_signed_events: summary.ready_signed_events, + publishing_events: summary.publishing_events, + last_attempt_at_ms: summary.last_attempt_at_ms, + last_error: summary.last_error, + } + } +} + #[cfg(feature = "runtime")] -const CLAIM_TTL_MS: i64 = 30_000; +#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] +pub struct SyncRelayTargetSummary { + pub configured_count: usize, + pub configured_relays: Vec<String>, +} + #[cfg(feature = "runtime")] -const NEXT_ATTEMPT_DELAY_MS: i64 = 60_000; +#[derive(Clone, Copy, Debug, PartialEq, Eq, serde::Serialize)] +#[serde(rename_all = "snake_case")] +#[non_exhaustive] +pub enum SdkRelayAuthPolicy { + DetectOnly, +} + +#[cfg(feature = "runtime")] +impl Default for SdkRelayAuthPolicy { + fn default() -> Self { + Self::DetectOnly + } +} #[cfg(feature = "runtime")] #[derive(Clone, Debug, PartialEq, Eq, serde::Serialize)] pub struct PushOutboxRequest { pub limit: usize, pub republish_accepted_relays: bool, + pub relay_url_policy: SdkRelayUrlPolicy, + pub auth_policy: SdkRelayAuthPolicy, + pub claim_ttl_ms: i64, + pub next_attempt_delay_ms: i64, } #[cfg(feature = "runtime")] @@ -40,6 +148,10 @@ impl Default for PushOutboxRequest { Self { limit: PUSH_OUTBOX_DEFAULT_LIMIT, republish_accepted_relays: false, + relay_url_policy: SdkRelayUrlPolicy::Public, + auth_policy: SdkRelayAuthPolicy::DetectOnly, + claim_ttl_ms: PUSH_OUTBOX_DEFAULT_CLAIM_TTL_MS, + next_attempt_delay_ms: PUSH_OUTBOX_DEFAULT_NEXT_ATTEMPT_DELAY_MS, } } } @@ -60,12 +172,42 @@ impl PushOutboxRequest { self } + pub fn with_relay_url_policy(mut self, policy: SdkRelayUrlPolicy) -> Self { + self.relay_url_policy = policy; + self + } + + pub fn with_auth_policy(mut self, policy: SdkRelayAuthPolicy) -> Self { + self.auth_policy = policy; + self + } + + pub fn with_claim_ttl_ms(mut self, claim_ttl_ms: i64) -> Self { + self.claim_ttl_ms = claim_ttl_ms; + self + } + + pub fn with_next_attempt_delay_ms(mut self, next_attempt_delay_ms: i64) -> Self { + self.next_attempt_delay_ms = next_attempt_delay_ms; + self + } + fn validate(&self) -> Result<(), RadrootsSdkError> { if self.limit == 0 || self.limit > PUSH_OUTBOX_MAX_LIMIT { return Err(RadrootsSdkError::InvalidRequest { message: format!("push_outbox limit must be between 1 and {PUSH_OUTBOX_MAX_LIMIT}"), }); } + if self.claim_ttl_ms <= 0 { + return Err(RadrootsSdkError::InvalidRequest { + message: "push_outbox claim TTL must be positive".to_owned(), + }); + } + if self.next_attempt_delay_ms <= 0 { + return Err(RadrootsSdkError::InvalidRequest { + message: "push_outbox next attempt delay must be positive".to_owned(), + }); + } Ok(()) } } @@ -192,6 +334,25 @@ impl From<RadrootsRelayOutcomeKind> for PushOutboxRelayOutcomeKind { #[cfg(feature = "runtime")] impl<'sdk> SyncClient<'sdk> { + pub async fn status( + &self, + _request: SyncStatusRequest, + ) -> Result<SyncStatusReceipt, RadrootsSdkError> { + let observed_at_ms = sdk_now_ms(self.sdk)?; + let event_store = self.sdk._event_store.status_summary().await?; + let outbox = self.sdk._outbox.status_summary(observed_at_ms).await?; + Ok(SyncStatusReceipt { + source: SyncStatusSource::SdkCanonicalStores, + observed_at_ms, + event_store: event_store.into(), + outbox: outbox.into(), + relay_targets: SyncRelayTargetSummary { + configured_count: self.sdk.relay_urls().len(), + configured_relays: self.sdk.relay_urls().to_vec(), + }, + }) + } + pub async fn push_outbox( &self, request: PushOutboxRequest, @@ -222,9 +383,9 @@ impl<'sdk> SyncClient<'sdk> { A: RadrootsRelayPublishAdapter, { request.validate()?; - let now_ms = sdk_now_ms(self.sdk)?; let mut receipt = PushOutboxReceipt::default(); for _ in 0..request.limit { + let claim_now_ms = sdk_now_ms(self.sdk)?; let claim_token = push_outbox_claim_token(); let Some(claimed) = self .sdk @@ -232,24 +393,26 @@ impl<'sdk> SyncClient<'sdk> { .claim_next_ready_signed_event( CLAIM_OWNER, claim_token.as_str(), - now_ms.saturating_add(CLAIM_TTL_MS), - now_ms, + claim_now_ms.saturating_add(request.claim_ttl_ms), + claim_now_ms, ) .await? else { break; }; - let policy = - RadrootsOutboxPublishPolicy::new(now_ms.saturating_add(NEXT_ATTEMPT_DELAY_MS)) - .republish_accepted_relays(request.republish_accepted_relays) - .relay_url_policy(RadrootsRelayUrlPolicy::Localhost); + let publish_now_ms = sdk_now_ms(self.sdk)?; + let policy = RadrootsOutboxPublishPolicy::new( + publish_now_ms.saturating_add(request.next_attempt_delay_ms), + ) + .republish_accepted_relays(request.republish_accepted_relays) + .relay_url_policy(request.relay_url_policy.relay_transport_policy()); let publish = publish_claimed_outbox_event( &self.sdk._outbox, &self.sdk._event_store, adapter, &claimed, policy, - now_ms, + publish_now_ms, ) .await?; let outbox_event = self diff --git a/crates/sdk/tests/sync_runtime.rs b/crates/sdk/tests/sync_runtime.rs @@ -21,12 +21,15 @@ use radroots_relay_transport::{ RadrootsRelayPublishRelayReceipt, RadrootsRelayPublishRequest, RadrootsRelayTransportError, }; use radroots_sdk::{ - ListingEnqueuePublishRequest, ListingPreparePublishRequest, PUSH_OUTBOX_DEFAULT_LIMIT, - PUSH_OUTBOX_MAX_LIMIT, PushOutboxEventReceipt, PushOutboxEventState, PushOutboxReceipt, - PushOutboxRelayOutcomeKind, PushOutboxRelayReceipt, PushOutboxRequest, RadrootsSdk, - RadrootsSdkError, RadrootsSdkTimestamp, SdkRelayTargetPolicy, SdkRelayUrlPolicy, + ListingEnqueuePublishRequest, ListingPreparePublishRequest, PUSH_OUTBOX_DEFAULT_CLAIM_TTL_MS, + PUSH_OUTBOX_DEFAULT_LIMIT, PUSH_OUTBOX_DEFAULT_NEXT_ATTEMPT_DELAY_MS, PUSH_OUTBOX_MAX_LIMIT, + PushOutboxEventReceipt, PushOutboxEventState, PushOutboxReceipt, PushOutboxRelayOutcomeKind, + PushOutboxRelayReceipt, PushOutboxRequest, RadrootsSdk, RadrootsSdkError, RadrootsSdkTimestamp, + SdkRelayAuthPolicy, SdkRelayTargetPolicy, SdkRelayUrlPolicy, SyncStatusRequest, + SyncStatusSource, }; -use std::collections::BTreeSet; +use std::sync::{Arc, Mutex}; +use std::time::Duration; const SELLER: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; const FARM_D_TAG: &str = "AAAAAAAAAAAAAAAAAAAAAA"; @@ -49,6 +52,13 @@ struct FixtureSigner { struct TransportFailurePublishAdapter; +#[derive(Clone)] +struct RecordingPublishAdapter { + delay: Duration, + raw_events: Arc<Mutex<Vec<String>>>, + request_times_ms: Arc<Mutex<Vec<i64>>>, +} + impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter { fn publish<'a>( &'a self, @@ -63,6 +73,60 @@ impl RadrootsRelayPublishAdapter for TransportFailurePublishAdapter { } } +impl RecordingPublishAdapter { + fn new(delay: Duration) -> Self { + Self { + delay, + raw_events: Arc::new(Mutex::new(Vec::new())), + request_times_ms: Arc::new(Mutex::new(Vec::new())), + } + } + + fn captured_raw_events(&self) -> Vec<String> { + self.raw_events.lock().expect("raw event lock").clone() + } + + fn request_times_ms(&self) -> Vec<i64> { + self.request_times_ms + .lock() + .expect("request time lock") + .clone() + } +} + +impl RadrootsRelayPublishAdapter for RecordingPublishAdapter { + fn publish<'a>( + &'a self, + request: RadrootsRelayPublishRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> + { + Box::pin(async move { + if !self.delay.is_zero() { + tokio::time::sleep(self.delay).await; + } + self.raw_events + .lock() + .expect("raw event lock") + .push(request.signed_event.raw_json.clone()); + self.request_times_ms + .lock() + .expect("request time lock") + .push(request.now_ms); + Ok(request + .targets + .relays() + .iter() + .map(|relay| { + RadrootsRelayPublishRelayReceipt::attempted( + relay.as_str(), + RadrootsRelayOutcome::accepted(), + ) + }) + .collect()) + }) + } +} + impl FixtureSigner { fn new(pubkey: &str) -> Self { Self { @@ -181,6 +245,16 @@ async fn directory_sdk(relays: &[&str]) -> (tempfile::TempDir, RadrootsSdk) { (tempdir, sdk) } +async fn system_clock_directory_sdk(relays: &[&str]) -> (tempfile::TempDir, RadrootsSdk) { + let tempdir = tempfile::tempdir().expect("tempdir"); + let mut builder = RadrootsSdk::builder().directory_storage(tempdir.path().join("sdk")); + for relay in relays { + builder = builder.relay_url(*relay); + } + let sdk = builder.build().await.expect("sdk"); + (tempdir, sdk) +} + async fn enqueue_listing(sdk: &RadrootsSdk, d_tag: &str, title: &str, relays: &[&str]) -> i64 { enqueue_listing_with_policy(sdk, d_tag, title, relays, SdkRelayUrlPolicy::Public).await } @@ -209,6 +283,103 @@ async fn enqueue_listing_with_policy( } #[tokio::test] +async fn sync_status_empty_store_reports_canonical_sources_and_configured_relays() { + let (_tempdir, sdk) = directory_sdk(&[RELAY_B, RELAY_A]).await; + + let receipt = sdk + .sync() + .status(SyncStatusRequest::new()) + .await + .expect("status"); + + assert_eq!(receipt.source, SyncStatusSource::SdkCanonicalStores); + assert_eq!(receipt.observed_at_ms, 1_700_000_000_000); + assert_eq!(receipt.event_store.total_events, 0); + assert_eq!(receipt.event_store.projection_eligible_events, 0); + assert_eq!(receipt.event_store.relay_observations, 0); + assert_eq!(receipt.event_store.last_event_seq, None); + assert_eq!(receipt.outbox.total_events, 0); + assert_eq!(receipt.outbox.pending_events, 0); + assert_eq!(receipt.outbox.retryable_events, 0); + assert_eq!(receipt.outbox.terminal_events, 0); + assert_eq!(receipt.outbox.ready_signed_events, 0); + assert_eq!(receipt.relay_targets.configured_count, 2); + assert_eq!( + receipt.relay_targets.configured_relays, + vec![RELAY_B.to_owned(), RELAY_A.to_owned()] + ); + assert_eq!( + serde_json::to_value(&receipt).expect("status json"), + serde_json::json!({ + "source": "sdk_canonical_stores", + "observed_at_ms": 1700000000000i64, + "event_store": { + "total_events": 0, + "projection_eligible_events": 0, + "relay_observations": 0, + "last_event_seq": null, + "last_event_updated_at_ms": null + }, + "outbox": { + "total_events": 0, + "pending_events": 0, + "retryable_events": 0, + "terminal_events": 0, + "ready_signed_events": 0, + "publishing_events": 0, + "last_attempt_at_ms": null, + "last_error": null + }, + "relay_targets": { + "configured_count": 2, + "configured_relays": [RELAY_B, RELAY_A] + } + }) + ); +} + +#[tokio::test] +async fn sync_status_reports_pending_retryable_terminal_and_last_attempt_metadata() { + let (_tempdir, sdk) = directory_sdk(&[RELAY_A, RELAY_B, RELAY_C]).await; + enqueue_listing(&sdk, LISTING_A_D_TAG, "Retryable Coffee", &[RELAY_A]).await; + sdk.sync() + .push_outbox_with_adapter( + &TransportFailurePublishAdapter, + PushOutboxRequest::new().with_limit(1), + ) + .await + .expect("retryable push"); + enqueue_listing(&sdk, LISTING_B_D_TAG, "Published Coffee", &[RELAY_B]).await; + sdk.sync() + .push_outbox_with_adapter( + &RadrootsMockRelayPublishAdapter::new(), + PushOutboxRequest::new().with_limit(1), + ) + .await + .expect("published push"); + enqueue_listing(&sdk, LISTING_C_D_TAG, "Pending Coffee", &[RELAY_C]).await; + + let receipt = sdk + .sync() + .status(SyncStatusRequest::new()) + .await + .expect("status"); + + assert_eq!(receipt.event_store.total_events, 3); + assert_eq!(receipt.outbox.total_events, 3); + assert_eq!(receipt.outbox.pending_events, 1); + assert_eq!(receipt.outbox.retryable_events, 1); + assert_eq!(receipt.outbox.terminal_events, 1); + assert_eq!(receipt.outbox.ready_signed_events, 1); + assert_eq!(receipt.outbox.publishing_events, 0); + assert_eq!(receipt.outbox.last_attempt_at_ms, Some(1_700_000_000_000)); + assert_eq!( + receipt.outbox.last_error.as_deref(), + Some("relay publish incomplete") + ); +} + +#[tokio::test] async fn push_outbox_empty_queue_returns_zero_counts() { let (_tempdir, sdk) = directory_sdk(&[]).await; let adapter = RadrootsMockRelayPublishAdapter::new(); @@ -231,14 +402,24 @@ async fn push_outbox_empty_queue_returns_zero_counts() { fn push_outbox_contract_dtos_serialize_deterministically() { let request = PushOutboxRequest::new() .with_limit(2) - .republish_accepted_relays(true); + .republish_accepted_relays(true) + .with_relay_url_policy(SdkRelayUrlPolicy::Localhost) + .with_auth_policy(SdkRelayAuthPolicy::DetectOnly) + .with_claim_ttl_ms(1_000) + .with_next_attempt_delay_ms(2_000); assert_eq!( serde_json::to_value(&request).expect("request json"), serde_json::json!({ "limit": 2, - "republish_accepted_relays": true + "republish_accepted_relays": true, + "relay_url_policy": "localhost", + "auth_policy": "detect_only", + "claim_ttl_ms": 1000, + "next_attempt_delay_ms": 2000 }) ); + assert_eq!(PUSH_OUTBOX_DEFAULT_CLAIM_TTL_MS, 30_000); + assert_eq!(PUSH_OUTBOX_DEFAULT_NEXT_ATTEMPT_DELAY_MS, 60_000); let receipt = PushOutboxReceipt { attempted_events: 1, @@ -341,9 +522,27 @@ async fn push_outbox_rejects_invalid_limits_before_claiming() { ) .await .expect_err("too large"); + let zero_ttl = sdk + .sync() + .push_outbox_with_adapter(&adapter, PushOutboxRequest::new().with_claim_ttl_ms(0)) + .await + .expect_err("zero ttl"); + let zero_delay = sdk + .sync() + .push_outbox_with_adapter( + &adapter, + PushOutboxRequest::new().with_next_attempt_delay_ms(0), + ) + .await + .expect_err("zero delay"); assert!(matches!(zero, RadrootsSdkError::InvalidRequest { .. })); assert!(matches!(too_large, RadrootsSdkError::InvalidRequest { .. })); + assert!(matches!(zero_ttl, RadrootsSdkError::InvalidRequest { .. })); + assert!(matches!( + zero_delay, + RadrootsSdkError::InvalidRequest { .. } + )); assert!(adapter.captured_raw_events().is_empty()); } @@ -392,7 +591,30 @@ async fn push_outbox_with_adapter_uses_queued_targets_without_builder_relays() { } #[tokio::test] -async fn push_outbox_with_adapter_accepts_queued_localhost_ws_targets() { +async fn push_outbox_default_public_policy_rejects_queued_localhost_ws_targets() { + let (_tempdir, sdk) = directory_sdk(&[]).await; + enqueue_listing_with_policy( + &sdk, + LISTING_A_D_TAG, + "Local Coffee", + &[LOCAL_RELAY_A], + SdkRelayUrlPolicy::Localhost, + ) + .await; + let adapter = RadrootsMockRelayPublishAdapter::new(); + + let error = sdk + .sync() + .push_outbox_with_adapter(&adapter, PushOutboxRequest::new().with_limit(1)) + .await + .expect_err("public push should reject ws target"); + + assert!(matches!(error, RadrootsSdkError::InvalidRelayUrl { .. })); + assert!(adapter.captured_raw_events().is_empty()); +} + +#[tokio::test] +async fn push_outbox_with_adapter_accepts_explicit_queued_localhost_ws_targets() { let (_tempdir, sdk) = directory_sdk(&[]).await; let outbox_event_id = enqueue_listing_with_policy( &sdk, @@ -406,7 +628,12 @@ async fn push_outbox_with_adapter_accepts_queued_localhost_ws_targets() { let receipt = sdk .sync() - .push_outbox_with_adapter(&adapter, PushOutboxRequest::new().with_limit(1)) + .push_outbox_with_adapter( + &adapter, + PushOutboxRequest::new() + .with_limit(1) + .with_relay_url_policy(SdkRelayUrlPolicy::Localhost), + ) .await .expect("push"); @@ -435,10 +662,10 @@ async fn push_outbox_with_adapter_accepts_queued_localhost_ws_targets() { .relays .iter() .map(|relay| relay.relay_url.as_str()) - .collect::<BTreeSet<_>>(); + .collect::<Vec<_>>(); assert_eq!( relay_urls, - BTreeSet::from([LOCAL_RELAY_A, LOCAL_RELAY_B, LOCAL_RELAY_C]) + vec![LOCAL_RELAY_A, LOCAL_RELAY_B, LOCAL_RELAY_C] ); assert_eq!(adapter.captured_raw_events().len(), 1); } @@ -597,6 +824,48 @@ async fn push_outbox_continues_after_adapter_transport_failure_and_releases_clai } #[tokio::test] +async fn concurrent_push_outbox_claims_do_not_publish_the_same_event_twice() { + let (_tempdir, sdk) = directory_sdk(&[RELAY_A]).await; + enqueue_listing(&sdk, LISTING_A_D_TAG, "Coffee", &[RELAY_A]).await; + let adapter = RecordingPublishAdapter::new(Duration::from_millis(50)); + let request = PushOutboxRequest::new().with_limit(1); + let sync = sdk.sync(); + + let (left, right) = tokio::join!( + sync.push_outbox_with_adapter(&adapter, request.clone()), + sync.push_outbox_with_adapter(&adapter, request) + ); + let left = left.expect("left push"); + let right = right.expect("right push"); + + assert_eq!(left.attempted_events + right.attempted_events, 1); + assert_eq!(left.published_events + right.published_events, 1); + assert_eq!(adapter.captured_raw_events().len(), 1); +} + +#[tokio::test] +async fn push_outbox_computes_publish_time_for_each_iteration() { + let (_tempdir, sdk) = system_clock_directory_sdk(&[RELAY_A, RELAY_B]).await; + enqueue_listing(&sdk, LISTING_A_D_TAG, "Coffee One", &[RELAY_A]).await; + enqueue_listing(&sdk, LISTING_B_D_TAG, "Coffee Two", &[RELAY_B]).await; + let adapter = RecordingPublishAdapter::new(Duration::from_millis(1_200)); + + let receipt = sdk + .sync() + .push_outbox_with_adapter(&adapter, PushOutboxRequest::new().with_limit(2)) + .await + .expect("push"); + + assert_eq!(receipt.attempted_events, 2); + let request_times_ms = adapter.request_times_ms(); + assert_eq!(request_times_ms.len(), 2); + assert!( + request_times_ms[1] > request_times_ms[0], + "request publish times should advance between iterations: {request_times_ms:?}" + ); +} + +#[tokio::test] async fn push_outbox_returns_fatal_error_for_malformed_signed_event_data() { let (_tempdir, sdk) = directory_sdk(&[RELAY_A, RELAY_B]).await; let corrupt_outbox_event_id =