lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

commit 10aa5d702a09085823560f5a97838605e76b5e7f
parent b1a4b71a0beabd12d3b68b4088c67e733020fff0
Author: triesap <tyson@radroots.org>
Date:   Fri, 12 Jun 2026 23:38:48 -0700

relay: add deterministic transport

- Add radroots_relay_transport with typed relay targets, outcome receipts, and mockable adapters.
- Implement exact signed-event publish, fetch ingest, and nostr client publish handling without rebuilding events.
- Wire outbox publish integration with persisted quorum, relay failure states, and accepted-relay retry skipping.
- Cover URL policy, prefix parsing, mock publish, fetch ingest, and outbox partial-success retry behavior.

Diffstat:
MCargo.lock | 17+++++++++++++++++
MCargo.toml | 2++
Mcrates/outbox/migrations/0001_outbox.up.sql | 1+
Mcrates/outbox/src/model.rs | 1+
Mcrates/outbox/src/store.rs | 92++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Acrates/relay_transport/Cargo.toml | 49+++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/README | 3+++
Acrates/relay_transport/src/error.rs | 45+++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/src/fetch.rs | 234+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/src/lib.rs | 25+++++++++++++++++++++++++
Acrates/relay_transport/src/outbox.rs | 239+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/src/outcome.rs | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/src/publish.rs | 272+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/src/relay.rs | 144+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/relay_transport/tests/transport.rs | 389+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mspec/README.md | 1+
Mspec/manifest.toml | 1+
17 files changed, 1632 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4254,6 +4254,23 @@ dependencies = [ ] [[package]] +name = "radroots_relay_transport" +version = "0.1.0-alpha.2" +dependencies = [ + "futures", + "nostr", + "radroots_event_store", + "radroots_events", + "radroots_nostr", + "radroots_outbox", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "url", +] + +[[package]] name = "radroots_replica_db" version = "0.1.0-alpha.2" dependencies = [ diff --git a/Cargo.toml b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "crates/nostr_ndb", "crates/nostr_runtime", "crates/outbox", + "crates/relay_transport", "crates/runtime", "crates/secret_vault", "crates/simplex_agent_proto", @@ -82,6 +83,7 @@ radroots_log = { path = "crates/log", version = "0.1.0-alpha.2", default-feature radroots_net = { path = "crates/net", version = "0.1.0-alpha.2", default-features = false } radroots_nostr_runtime = { path = "crates/nostr_runtime", version = "0.1.0-alpha.2", default-features = false } radroots_outbox = { path = "crates/outbox", version = "0.1.0-alpha.2", default-features = false } +radroots_relay_transport = { path = "crates/relay_transport", version = "0.1.0-alpha.2", default-features = false } radroots_simplex_agent_proto = { path = "crates/simplex_agent_proto", version = "0.1.0-alpha.2", default-features = false } radroots_simplex_agent_runtime = { path = "crates/simplex_agent_runtime", version = "0.1.0-alpha.2", default-features = false } radroots_simplex_agent_store = { path = "crates/simplex_agent_store", version = "0.1.0-alpha.2", default-features = false } diff --git a/crates/outbox/migrations/0001_outbox.up.sql b/crates/outbox/migrations/0001_outbox.up.sql @@ -25,6 +25,7 @@ CREATE TABLE outbox_event ( signed_event_json TEXT, raw_event_json TEXT, state TEXT NOT NULL, + accepted_quorum INTEGER NOT NULL, attempt_count INTEGER NOT NULL, claim_token TEXT, claim_owner TEXT, diff --git a/crates/outbox/src/model.rs b/crates/outbox/src/model.rs @@ -170,6 +170,7 @@ pub struct RadrootsOutboxEventRecord { pub signed_event: Option<RadrootsSignedNostrEvent>, pub raw_event_json: Option<String>, pub state: RadrootsOutboxEventState, + pub accepted_quorum: i64, pub attempt_count: i64, pub claim_token: Option<String>, pub claim_owner: Option<String>, diff --git a/crates/outbox/src/store.rs b/crates/outbox/src/store.rs @@ -80,6 +80,7 @@ impl RadrootsOutbox { &input.draft, &target_relays, )?; + let accepted_quorum = target_relays.len() as i64; let mut tx = self.pool.begin().await?; if let Some(idempotency_key) = input.idempotency_key.as_deref() { @@ -126,13 +127,14 @@ impl RadrootsOutbox { let operation_id = operation.last_insert_rowid(); let draft_json = serde_json::to_string(&input.draft)?; let event = sqlx::query( - "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, state, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, 0, ?, 0, 0, ?, ?)", + "INSERT INTO outbox_event(operation_id, event_id, expected_pubkey, draft_json, state, accepted_quorum, attempt_count, next_attempt_after_ms, event_store_ingested, event_store_inserted, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?, ?, 0, ?, 0, 0, ?, ?)", ) .bind(operation_id) .bind(input.draft.expected_event_id.as_str()) .bind(input.draft.expected_pubkey.as_str()) .bind(draft_json.as_str()) .bind(RadrootsOutboxEventState::DraftQueued.as_str()) + .bind(accepted_quorum) .bind(input.created_at_ms) .bind(input.created_at_ms) .bind(input.created_at_ms) @@ -179,7 +181,7 @@ impl RadrootsOutbox { outbox_event_id: i64, ) -> Result<Option<RadrootsOutboxEventRecord>, RadrootsOutboxError> { let row = sqlx::query( - "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", + "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", ) .bind(outbox_event_id) .fetch_optional(&self.pool) @@ -456,6 +458,65 @@ impl RadrootsOutbox { Ok(()) } + pub async fn set_publish_quorum( + &self, + outbox_event_id: i64, + claim_token: &str, + accepted_quorum: i64, + now_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.ensure_claim_token(outbox_event_id, claim_token) + .await?; + sqlx::query( + "UPDATE outbox_event SET accepted_quorum = ?, updated_at_ms = ? WHERE outbox_event_id = ? AND claim_token = ?", + ) + .bind(accepted_quorum) + .bind(now_ms) + .bind(outbox_event_id) + .bind(claim_token) + .execute(&self.pool) + .await?; + Ok(()) + } + + pub async fn mark_relay_failed_retryable( + &self, + outbox_event_id: i64, + claim_token: &str, + relay_url: &str, + error: &str, + attempted_at_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.mark_relay_failed( + outbox_event_id, + claim_token, + relay_url, + RadrootsOutboxRelayStatus::FailedRetryable, + error, + attempted_at_ms, + ) + .await + } + + pub async fn mark_relay_failed_terminal( + &self, + outbox_event_id: i64, + claim_token: &str, + relay_url: &str, + error: &str, + attempted_at_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.mark_relay_failed( + outbox_event_id, + claim_token, + relay_url, + RadrootsOutboxRelayStatus::FailedTerminal, + error, + attempted_at_ms, + ) + .await + } + async fn claimed_event( &self, outbox_event_id: i64, @@ -486,6 +547,30 @@ impl RadrootsOutbox { } Ok(()) } + + async fn mark_relay_failed( + &self, + outbox_event_id: i64, + claim_token: &str, + relay_url: &str, + status: RadrootsOutboxRelayStatus, + error: &str, + attempted_at_ms: i64, + ) -> Result<(), RadrootsOutboxError> { + self.ensure_claim_token(outbox_event_id, claim_token) + .await?; + sqlx::query( + "UPDATE outbox_event_relay_status SET status = ?, attempt_count = attempt_count + 1, last_attempt_at_ms = ?, acknowledged_at_ms = NULL, last_error = ? WHERE outbox_event_id = ? AND relay_url = ?", + ) + .bind(status.as_str()) + .bind(attempted_at_ms) + .bind(error) + .bind(outbox_event_id) + .bind(relay_url) + .execute(&self.pool) + .await?; + Ok(()) + } } struct ExistingOperation { @@ -563,7 +648,7 @@ async fn event_by_id_tx( outbox_event_id: i64, ) -> Result<RadrootsOutboxEventRecord, RadrootsOutboxError> { let row = sqlx::query( - "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", + "SELECT outbox_event_id, operation_id, event_id, expected_pubkey, draft_json, signed_event_json, raw_event_json, state, accepted_quorum, attempt_count, claim_token, claim_owner, claim_expires_at_ms, next_attempt_after_ms, last_error, event_store_ingested, event_store_inserted, event_store_ingested_at_ms, created_at_ms, updated_at_ms FROM outbox_event WHERE outbox_event_id = ?", ) .bind(outbox_event_id) .fetch_one(&mut **tx) @@ -635,6 +720,7 @@ fn event_from_row( signed_event, raw_event_json: row.try_get("raw_event_json")?, state, + accepted_quorum: row.try_get("accepted_quorum")?, attempt_count: row.try_get("attempt_count")?, claim_token: row.try_get("claim_token")?, claim_owner: row.try_get("claim_owner")?, diff --git a/crates/relay_transport/Cargo.toml b/crates/relay_transport/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "radroots_relay_transport" +publish = false +version = "0.1.0-alpha.2" +edition.workspace = true +authors = ["Tyson Lupul <tyson@radroots.org>"] +rust-version.workspace = true +license.workspace = true +description = "Deterministic Nostr relay transport substrate" +repository.workspace = true +homepage.workspace = true +documentation = "https://docs.rs/radroots_relay_transport" +readme = "README" + +[features] +default = ["std", "runtime-tokio"] +std = [] +runtime-tokio = [ + "radroots_event_store/runtime-tokio", + "radroots_outbox/runtime-tokio", +] + +[dependencies] +radroots_events = { workspace = true, default-features = false, features = [ + "std", + "serde", +] } +radroots_event_store = { workspace = true, default-features = false, features = [ + "sqlite", + "runtime-tokio", +] } +radroots_nostr = { workspace = true, default-features = false, features = [ + "std", + "client", + "events", +] } +radroots_outbox = { workspace = true, default-features = false, features = [ + "sqlite", + "runtime-tokio", +] } +futures = { workspace = true } +nostr = { workspace = true } +serde = { workspace = true, features = ["derive", "std"] } +serde_json = { workspace = true, features = ["std"] } +thiserror = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt"] } diff --git a/crates/relay_transport/README b/crates/relay_transport/README @@ -0,0 +1,3 @@ +# radroots_relay_transport + +Deterministic Nostr relay transport substrate for exact signed-event publish, fetch ingest, and outbox relay status coordination. diff --git a/crates/relay_transport/src/error.rs b/crates/relay_transport/src/error.rs @@ -0,0 +1,45 @@ +#![forbid(unsafe_code)] + +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum RadrootsRelayTransportError { + #[error("Relay URL parse failed for `{url}`: {reason}")] + RelayUrlParse { url: String, reason: String }, + + #[error("Relay URL `{url}` uses ws outside local-dev policy")] + WsRequiresLocalPolicy { url: String }, + + #[error("Relay URL `{url}` has unsupported scheme `{scheme}`")] + UnsupportedRelayScheme { url: String, scheme: String }, + + #[error("Relay URL `{url}` must include a host")] + EmptyRelayHost { url: String }, + + #[error("Relay URL `{url}` must not include userinfo")] + RelayUrlUserinfo { url: String }, + + #[error("Relay URL `{url}` must not include query or fragment")] + RelayUrlQueryOrFragment { url: String }, + + #[error("Relay target set must not be empty")] + EmptyTargetSet, + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("Nostr event JSON error: {0}")] + NostrEventJson(String), + + #[error("Event store error: {0}")] + EventStore(#[from] radroots_event_store::RadrootsEventStoreError), + + #[error("Outbox error: {0}")] + Outbox(#[from] radroots_outbox::RadrootsOutboxError), + + #[error("Outbox claim {0} does not contain a signed event")] + MissingSignedOutboxEvent(i64), + + #[error("Relay transport error: {0}")] + Transport(String), +} diff --git a/crates/relay_transport/src/fetch.rs b/crates/relay_transport/src/fetch.rs @@ -0,0 +1,234 @@ +#![forbid(unsafe_code)] + +use crate::RadrootsRelayTransportError; +use futures::future::BoxFuture; +use nostr::JsonUtil; +use radroots_event_store::{ + RadrootsEventContractStatus, RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, + RadrootsRelayObservationType, +}; +use radroots_nostr::prelude::{RadrootsNostrEvent, radroots_event_from_nostr}; +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum RadrootsRelayFetchMode { + Fetch, + Subscription, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsRelayFetchRequest { + pub mode: RadrootsRelayFetchMode, + pub observed_at_ms: i64, + pub max_events: usize, +} + +impl RadrootsRelayFetchRequest { + pub fn fetch(observed_at_ms: i64, max_events: usize) -> Self { + Self { + mode: RadrootsRelayFetchMode::Fetch, + observed_at_ms, + max_events, + } + } + + pub fn subscription(observed_at_ms: i64, max_events: usize) -> Self { + Self { + mode: RadrootsRelayFetchMode::Subscription, + observed_at_ms, + max_events, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum RadrootsRelayFetchItem { + Event { + relay_url: String, + raw_json: String, + observed_at_ms: i64, + }, + Eose { + relay_url: String, + }, + Closed { + relay_url: String, + message: String, + }, + Notice { + relay_url: String, + message: String, + }, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayFetchEventReceipt { + pub relay_url: String, + pub event_id: Option<String>, + pub inserted: bool, + pub duplicate: bool, + pub unsupported: bool, + pub malformed: bool, + pub message: Option<String>, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayFetchReceipt { + pub inserted_count: usize, + pub duplicate_count: usize, + pub malformed_count: usize, + pub unsupported_count: usize, + pub eose_count: usize, + pub closed_count: usize, + pub notice_count: usize, + pub events: Vec<RadrootsRelayFetchEventReceipt>, +} + +pub trait RadrootsRelayFetchAdapter: Send + Sync { + fn fetch<'a>( + &'a self, + request: RadrootsRelayFetchRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>>; +} + +pub async fn fetch_and_ingest_relay_events<A>( + adapter: &A, + event_store: &RadrootsEventStore, + request: RadrootsRelayFetchRequest, +) -> Result<RadrootsRelayFetchReceipt, RadrootsRelayTransportError> +where + A: RadrootsRelayFetchAdapter, +{ + let mode = request.mode; + let max_events = request.max_events; + let items = adapter.fetch(request).await?; + let mut receipt = RadrootsRelayFetchReceipt { + inserted_count: 0, + duplicate_count: 0, + malformed_count: 0, + unsupported_count: 0, + eose_count: 0, + closed_count: 0, + notice_count: 0, + events: Vec::new(), + }; + let mut processed_events = 0usize; + for item in items { + match item { + RadrootsRelayFetchItem::Event { + relay_url, + raw_json, + observed_at_ms, + } => { + if processed_events >= max_events { + break; + } + processed_events += 1; + let parsed = RadrootsNostrEvent::from_json(raw_json.as_str()); + let Ok(raw_event) = parsed else { + receipt.malformed_count += 1; + receipt.events.push(RadrootsRelayFetchEventReceipt { + relay_url, + event_id: None, + inserted: false, + duplicate: false, + unsupported: false, + malformed: true, + message: Some("event JSON parse failed".to_owned()), + }); + continue; + }; + let event = radroots_event_from_nostr(&raw_event); + let observation_type = match mode { + RadrootsRelayFetchMode::Fetch => RadrootsRelayObservationType::Fetch, + RadrootsRelayFetchMode::Subscription => { + RadrootsRelayObservationType::Subscription + } + }; + let ingest = RadrootsEventIngest::verified(event, observed_at_ms) + .with_raw_json(raw_json) + .with_observation(RadrootsRelayObservation::new( + relay_url.clone(), + observation_type, + observed_at_ms, + )); + match event_store.ingest_event(ingest).await { + Ok(store_receipt) => { + let unsupported = + store_receipt.contract_status != RadrootsEventContractStatus::Supported; + if store_receipt.inserted { + receipt.inserted_count += 1; + } else { + receipt.duplicate_count += 1; + } + if unsupported { + receipt.unsupported_count += 1; + } + receipt.events.push(RadrootsRelayFetchEventReceipt { + relay_url, + event_id: Some(store_receipt.event_id), + inserted: store_receipt.inserted, + duplicate: !store_receipt.inserted, + unsupported, + malformed: false, + message: None, + }); + } + Err(error) => { + receipt.malformed_count += 1; + receipt.events.push(RadrootsRelayFetchEventReceipt { + relay_url, + event_id: Some(raw_event.id.to_hex()), + inserted: false, + duplicate: false, + unsupported: false, + malformed: true, + message: Some(error.to_string()), + }); + } + } + } + RadrootsRelayFetchItem::Eose { .. } => { + receipt.eose_count += 1; + } + RadrootsRelayFetchItem::Closed { .. } => { + receipt.closed_count += 1; + } + RadrootsRelayFetchItem::Notice { .. } => { + receipt.notice_count += 1; + } + } + } + Ok(receipt) +} + +#[derive(Clone, Default)] +pub struct RadrootsMockRelayFetchAdapter { + items: Arc<Mutex<Vec<RadrootsRelayFetchItem>>>, +} + +impl RadrootsMockRelayFetchAdapter { + pub fn new(items: Vec<RadrootsRelayFetchItem>) -> Self { + Self { + items: Arc::new(Mutex::new(items)), + } + } +} + +impl RadrootsRelayFetchAdapter for RadrootsMockRelayFetchAdapter { + fn fetch<'a>( + &'a self, + _request: RadrootsRelayFetchRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>> { + Box::pin(async move { + Ok(self + .items + .lock() + .map_err(|_| { + RadrootsRelayTransportError::Transport("fetch item lock poisoned".to_owned()) + })? + .clone()) + }) + } +} diff --git a/crates/relay_transport/src/lib.rs b/crates/relay_transport/src/lib.rs @@ -0,0 +1,25 @@ +#![forbid(unsafe_code)] + +mod error; +mod fetch; +mod outbox; +mod outcome; +mod publish; +mod relay; + +pub use error::RadrootsRelayTransportError; +pub use fetch::{ + RadrootsMockRelayFetchAdapter, RadrootsRelayFetchAdapter, RadrootsRelayFetchEventReceipt, + RadrootsRelayFetchItem, RadrootsRelayFetchMode, RadrootsRelayFetchReceipt, + RadrootsRelayFetchRequest, fetch_and_ingest_relay_events, +}; +pub use outbox::{ + RadrootsOutboxPublishPolicy, RadrootsOutboxPublishReceipt, publish_claimed_outbox_event, +}; +pub use outcome::{RadrootsRelayOutcome, RadrootsRelayOutcomeKind}; +pub use publish::{ + RadrootsMockRelayPublishAdapter, RadrootsNostrClientPublishAdapter, + RadrootsRelayPublishAdapter, RadrootsRelayPublishReceipt, RadrootsRelayPublishRelayReceipt, + RadrootsRelayPublishRequest, publish_signed_event, +}; +pub use relay::{RadrootsRelayTargetSet, RadrootsRelayUrl, RadrootsRelayUrlPolicy}; diff --git a/crates/relay_transport/src/outbox.rs b/crates/relay_transport/src/outbox.rs @@ -0,0 +1,239 @@ +#![forbid(unsafe_code)] + +use crate::{ + RadrootsRelayOutcomeKind, RadrootsRelayPublishAdapter, RadrootsRelayPublishReceipt, + RadrootsRelayPublishRequest, RadrootsRelayTargetSet, RadrootsRelayTransportError, + RadrootsRelayUrlPolicy, publish_signed_event, +}; +use radroots_event_store::{ + RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, RadrootsRelayObservationType, +}; +use radroots_events::RadrootsNostrEvent; +use radroots_events::draft::RadrootsSignedNostrEvent; +use radroots_outbox::{ + RadrootsOutbox, RadrootsOutboxClaimedEvent, RadrootsOutboxEventStoreIngestReceipt, + RadrootsOutboxRelayStatus, +}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsOutboxPublishPolicy { + pub accepted_quorum: Option<usize>, + pub next_attempt_after_ms: i64, + pub republish_accepted_relays: bool, + pub relay_url_policy: RadrootsRelayUrlPolicy, +} + +impl RadrootsOutboxPublishPolicy { + pub fn new(next_attempt_after_ms: i64) -> Self { + Self { + accepted_quorum: None, + next_attempt_after_ms, + republish_accepted_relays: false, + relay_url_policy: RadrootsRelayUrlPolicy::Public, + } + } + + pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self { + self.accepted_quorum = Some(accepted_quorum); + self + } + + pub fn republish_accepted_relays(mut self, enabled: bool) -> Self { + self.republish_accepted_relays = enabled; + self + } + + pub fn relay_url_policy(mut self, policy: RadrootsRelayUrlPolicy) -> Self { + self.relay_url_policy = policy; + self + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsOutboxPublishReceipt { + pub local_ingest: RadrootsOutboxEventStoreIngestReceipt, + pub publish: RadrootsRelayPublishReceipt, +} + +pub async fn publish_claimed_outbox_event<A>( + outbox: &RadrootsOutbox, + event_store: &RadrootsEventStore, + adapter: &A, + claimed: &RadrootsOutboxClaimedEvent, + policy: RadrootsOutboxPublishPolicy, + now_ms: i64, +) -> Result<RadrootsOutboxPublishReceipt, RadrootsRelayTransportError> +where + A: RadrootsRelayPublishAdapter, +{ + let signed_event = claimed.signed_event.clone().ok_or( + RadrootsRelayTransportError::MissingSignedOutboxEvent(claimed.outbox_event_id), + )?; + let local_ingest = outbox + .ingest_signed_event_local( + event_store, + claimed.outbox_event_id, + claimed.claim_token.as_str(), + now_ms, + ) + .await?; + let publishable = publishable_relays(outbox, claimed, policy.republish_accepted_relays).await?; + let targets = RadrootsRelayTargetSet::new(publishable.relays, policy.relay_url_policy)?; + let overall_quorum = policy + .accepted_quorum + .unwrap_or(publishable.total_target_count); + outbox + .set_publish_quorum( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + overall_quorum as i64, + now_ms, + ) + .await?; + let quorum = overall_quorum.saturating_sub(publishable.accepted_count); + let request = RadrootsRelayPublishRequest::new(signed_event.clone(), targets, now_ms) + .with_accepted_quorum(quorum); + let publish = publish_signed_event(adapter, request).await?; + + for relay in &publish.relays { + match relay.outcome.kind { + RadrootsRelayOutcomeKind::Accepted | RadrootsRelayOutcomeKind::DuplicateAccepted => { + outbox + .mark_relay_accepted( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + relay.relay_url.as_str(), + now_ms, + ) + .await?; + ingest_publish_observation( + event_store, + &signed_event, + relay.relay_url.as_str(), + relay.outcome.message.as_deref(), + now_ms, + ) + .await?; + } + _ if relay.outcome.is_retryable() => { + outbox + .mark_relay_failed_retryable( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + relay.relay_url.as_str(), + relay + .outcome + .message + .as_deref() + .unwrap_or("relay publish retryable"), + now_ms, + ) + .await?; + } + _ => { + outbox + .mark_relay_failed_terminal( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + relay.relay_url.as_str(), + relay + .outcome + .message + .as_deref() + .unwrap_or("relay publish terminal"), + now_ms, + ) + .await?; + } + } + } + + if !publish.quorum_met || publish.retryable_count > 0 || publish.terminal_count > 0 { + outbox + .mark_publish_retryable( + claimed.outbox_event_id, + claimed.claim_token.as_str(), + "relay publish incomplete", + policy.next_attempt_after_ms, + now_ms, + ) + .await?; + } + + Ok(RadrootsOutboxPublishReceipt { + local_ingest, + publish, + }) +} + +struct PublishableRelays { + relays: Vec<String>, + total_target_count: usize, + accepted_count: usize, +} + +async fn publishable_relays( + outbox: &RadrootsOutbox, + claimed: &RadrootsOutboxClaimedEvent, + republish_accepted_relays: bool, +) -> Result<PublishableRelays, RadrootsRelayTransportError> { + let statuses = outbox.relay_statuses(claimed.outbox_event_id).await?; + let mut relays = Vec::new(); + let mut total_target_count = 0usize; + let mut accepted_count = 0usize; + for status in statuses { + if !claimed + .target_relays + .iter() + .any(|relay_url| relay_url == &status.relay_url) + { + continue; + } + total_target_count += 1; + if status.status == RadrootsOutboxRelayStatus::Accepted { + accepted_count += 1; + } + if republish_accepted_relays || status.status != RadrootsOutboxRelayStatus::Accepted { + relays.push(status.relay_url); + } + } + Ok(PublishableRelays { + relays, + total_target_count, + accepted_count, + }) +} + +async fn ingest_publish_observation( + event_store: &RadrootsEventStore, + signed_event: &RadrootsSignedNostrEvent, + relay_url: &str, + message: Option<&str>, + observed_at_ms: i64, +) -> Result<(), RadrootsRelayTransportError> { + let mut observation = RadrootsRelayObservation::new( + relay_url, + RadrootsRelayObservationType::PublishAck, + observed_at_ms, + ); + if let Some(message) = message { + observation = observation.with_message(message); + } + let ingest = RadrootsEventIngest::verified(event_from_signed(signed_event), observed_at_ms) + .with_raw_json(signed_event.raw_json.clone()) + .with_observation(observation); + event_store.ingest_event(ingest).await?; + Ok(()) +} + +fn event_from_signed(signed_event: &RadrootsSignedNostrEvent) -> RadrootsNostrEvent { + RadrootsNostrEvent { + id: signed_event.id.clone(), + author: signed_event.pubkey.clone(), + created_at: signed_event.created_at, + kind: signed_event.kind, + tags: signed_event.tags.clone(), + content: signed_event.content.clone(), + sig: signed_event.sig.clone(), + } +} diff --git a/crates/relay_transport/src/outcome.rs b/crates/relay_transport/src/outcome.rs @@ -0,0 +1,120 @@ +#![forbid(unsafe_code)] + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub enum RadrootsRelayOutcomeKind { + Accepted, + DuplicateAccepted, + Blocked, + RateLimited, + Invalid, + PowRequired, + Restricted, + AuthRequired, + Error, + Timeout, + ConnectionFailed, + Unknown, +} + +impl RadrootsRelayOutcomeKind { + pub fn counts_toward_quorum(self) -> bool { + matches!(self, Self::Accepted | Self::DuplicateAccepted) + } + + pub fn is_retryable(self) -> bool { + matches!( + self, + Self::RateLimited + | Self::PowRequired + | Self::AuthRequired + | Self::Error + | Self::Timeout + | Self::ConnectionFailed + | Self::Unknown + ) + } + + pub fn is_terminal_failure(self) -> bool { + matches!(self, Self::Blocked | Self::Invalid | Self::Restricted) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayOutcome { + pub kind: RadrootsRelayOutcomeKind, + pub message: Option<String>, +} + +impl RadrootsRelayOutcome { + pub fn accepted() -> Self { + Self { + kind: RadrootsRelayOutcomeKind::Accepted, + message: None, + } + } + + pub fn duplicate_accepted(message: impl Into<String>) -> Self { + Self { + kind: RadrootsRelayOutcomeKind::DuplicateAccepted, + message: Some(message.into()), + } + } + + pub fn connection_failed(message: impl Into<String>) -> Self { + Self { + kind: RadrootsRelayOutcomeKind::ConnectionFailed, + message: Some(message.into()), + } + } + + pub fn timeout(message: impl Into<String>) -> Self { + Self { + kind: RadrootsRelayOutcomeKind::Timeout, + message: Some(message.into()), + } + } + + pub fn classify(message: impl AsRef<str>) -> Self { + let message = message.as_ref().trim(); + let lower = message.to_ascii_lowercase(); + let kind = if lower.starts_with("duplicate:") { + RadrootsRelayOutcomeKind::DuplicateAccepted + } else if lower.starts_with("blocked:") { + RadrootsRelayOutcomeKind::Blocked + } else if lower.starts_with("rate-limited:") { + RadrootsRelayOutcomeKind::RateLimited + } else if lower.starts_with("invalid:") { + RadrootsRelayOutcomeKind::Invalid + } else if lower.starts_with("pow:") { + RadrootsRelayOutcomeKind::PowRequired + } else if lower.starts_with("restricted:") { + RadrootsRelayOutcomeKind::Restricted + } else if lower.starts_with("auth-required:") { + RadrootsRelayOutcomeKind::AuthRequired + } else if lower.starts_with("error:") { + RadrootsRelayOutcomeKind::Error + } else if lower.starts_with("timeout:") { + RadrootsRelayOutcomeKind::Timeout + } else { + RadrootsRelayOutcomeKind::Unknown + }; + Self { + kind, + message: Some(message.to_owned()), + } + } + + pub fn counts_toward_quorum(&self) -> bool { + self.kind.counts_toward_quorum() + } + + pub fn is_retryable(&self) -> bool { + self.kind.is_retryable() + } + + pub fn is_terminal_failure(&self) -> bool { + self.kind.is_terminal_failure() + } +} diff --git a/crates/relay_transport/src/publish.rs b/crates/relay_transport/src/publish.rs @@ -0,0 +1,272 @@ +#![forbid(unsafe_code)] + +use crate::{ + RadrootsRelayOutcome, RadrootsRelayOutcomeKind, RadrootsRelayTargetSet, + RadrootsRelayTransportError, RadrootsRelayUrlPolicy, +}; +use futures::future::BoxFuture; +use nostr::JsonUtil; +use radroots_events::draft::RadrootsSignedNostrEvent; +use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrEvent}; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RadrootsRelayPublishRequest { + pub signed_event: RadrootsSignedNostrEvent, + pub targets: RadrootsRelayTargetSet, + pub accepted_quorum: usize, + pub now_ms: i64, +} + +impl RadrootsRelayPublishRequest { + pub fn new( + signed_event: RadrootsSignedNostrEvent, + targets: RadrootsRelayTargetSet, + now_ms: i64, + ) -> Self { + let accepted_quorum = targets.len(); + Self { + signed_event, + targets, + accepted_quorum, + now_ms, + } + } + + pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self { + self.accepted_quorum = accepted_quorum; + self + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayPublishRelayReceipt { + pub relay_url: String, + pub outcome: RadrootsRelayOutcome, + pub attempted: bool, +} + +impl RadrootsRelayPublishRelayReceipt { + pub fn attempted(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self { + Self { + relay_url: relay_url.into(), + outcome, + attempted: true, + } + } + + pub fn skipped(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self { + Self { + relay_url: relay_url.into(), + outcome, + attempted: false, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayPublishReceipt { + pub event_id: String, + pub attempted_count: usize, + pub accepted_count: usize, + pub retryable_count: usize, + pub terminal_count: usize, + pub quorum: usize, + pub quorum_met: bool, + pub relays: Vec<RadrootsRelayPublishRelayReceipt>, +} + +pub trait RadrootsRelayPublishAdapter: Send + Sync { + fn publish<'a>( + &'a self, + request: RadrootsRelayPublishRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>; +} + +pub async fn publish_signed_event<A>( + adapter: &A, + request: RadrootsRelayPublishRequest, +) -> Result<RadrootsRelayPublishReceipt, RadrootsRelayTransportError> +where + A: RadrootsRelayPublishAdapter, +{ + let event_id = request.signed_event.id.clone(); + let quorum = request.accepted_quorum; + let relays = adapter.publish(request).await?; + let attempted_count = relays.iter().filter(|receipt| receipt.attempted).count(); + let accepted_count = relays + .iter() + .filter(|receipt| receipt.outcome.counts_toward_quorum()) + .count(); + let retryable_count = relays + .iter() + .filter(|receipt| receipt.outcome.is_retryable()) + .count(); + let terminal_count = relays + .iter() + .filter(|receipt| receipt.outcome.is_terminal_failure()) + .count(); + Ok(RadrootsRelayPublishReceipt { + event_id, + attempted_count, + accepted_count, + retryable_count, + terminal_count, + quorum, + quorum_met: accepted_count >= quorum, + relays, + }) +} + +#[derive(Clone, Default)] +pub struct RadrootsMockRelayPublishAdapter { + outcomes: BTreeMap<String, RadrootsRelayOutcome>, + captured_raw_events: Arc<Mutex<Vec<String>>>, +} + +impl RadrootsMockRelayPublishAdapter { + pub fn new() -> Self { + Self::default() + } + + pub fn with_outcome( + mut self, + relay_url: impl Into<String>, + outcome: RadrootsRelayOutcome, + ) -> Self { + self.outcomes.insert(relay_url.into(), outcome); + self + } + + pub fn captured_raw_events(&self) -> Vec<String> { + self.captured_raw_events + .lock() + .expect("captured raw event lock") + .clone() + } +} + +impl RadrootsRelayPublishAdapter for RadrootsMockRelayPublishAdapter { + fn publish<'a>( + &'a self, + request: RadrootsRelayPublishRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> + { + Box::pin(async move { + self.captured_raw_events + .lock() + .map_err(|_| { + RadrootsRelayTransportError::Transport( + "captured raw event lock poisoned".to_owned(), + ) + })? + .push(request.signed_event.raw_json.clone()); + Ok(request + .targets + .relays() + .iter() + .map(|relay| { + let outcome = self + .outcomes + .get(relay.as_str()) + .cloned() + .unwrap_or_else(RadrootsRelayOutcome::accepted); + RadrootsRelayPublishRelayReceipt::attempted(relay.as_str(), outcome) + }) + .collect()) + }) + } +} + +#[derive(Clone)] +pub struct RadrootsNostrClientPublishAdapter { + client: RadrootsNostrClient, +} + +impl RadrootsNostrClientPublishAdapter { + pub fn new(client: RadrootsNostrClient) -> Self { + Self { client } + } +} + +impl RadrootsRelayPublishAdapter for RadrootsNostrClientPublishAdapter { + fn publish<'a>( + &'a self, + request: RadrootsRelayPublishRequest, + ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> + { + Box::pin(async move { + let event = RadrootsNostrEvent::from_json(request.signed_event.raw_json.as_str()) + .map_err(|error| RadrootsRelayTransportError::NostrEventJson(error.to_string()))?; + if event.id.to_hex() != request.signed_event.id { + return Err(RadrootsRelayTransportError::NostrEventJson( + "raw event JSON ID does not match signed event ID".to_owned(), + )); + } + let target_strings = request.targets.relay_strings(); + for relay_url in &target_strings { + self.client + .add_write_relay(relay_url.as_str()) + .await + .map_err(|error| RadrootsRelayTransportError::Transport(error.to_string()))?; + } + let output = match self + .client + .send_event_to(target_strings.clone(), &event) + .await + { + Ok(output) => output, + Err(error) => { + let message = error.to_string(); + return Ok(target_strings + .into_iter() + .map(|relay_url| { + RadrootsRelayPublishRelayReceipt::attempted( + relay_url, + RadrootsRelayOutcome::connection_failed(message.clone()), + ) + }) + .collect()); + } + }; + let mut receipts = Vec::new(); + for relay_url in &target_strings { + let relay = + crate::RadrootsRelayUrl::parse(relay_url, RadrootsRelayUrlPolicy::LocalDev)?; + let success = output.success.iter().any(|success_url| { + success_url.to_string().trim_end_matches('/') == relay.as_str() + }); + if success { + receipts.push(RadrootsRelayPublishRelayReceipt::attempted( + relay_url, + RadrootsRelayOutcome { + kind: RadrootsRelayOutcomeKind::Accepted, + message: Some( + "nostr-relay-pool-success-ok-message-unavailable".to_owned(), + ), + }, + )); + continue; + } + let failed = output.failed.iter().find_map(|(failed_url, message)| { + if failed_url.to_string().trim_end_matches('/') == relay.as_str() { + Some(message.clone()) + } else { + None + } + }); + let outcome = failed + .map(RadrootsRelayOutcome::classify) + .unwrap_or_else(|| { + RadrootsRelayOutcome::classify("error: relay output omitted target") + }); + receipts.push(RadrootsRelayPublishRelayReceipt::attempted( + relay_url, outcome, + )); + } + Ok(receipts) + }) + } +} diff --git a/crates/relay_transport/src/relay.rs b/crates/relay_transport/src/relay.rs @@ -0,0 +1,144 @@ +#![forbid(unsafe_code)] + +use crate::RadrootsRelayTransportError; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeSet; +use std::fmt; +use url::Url; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum RadrootsRelayUrlPolicy { + Public, + LocalDev, +} + +impl RadrootsRelayUrlPolicy { + fn accepts_ws(self) -> bool { + matches!(self, Self::LocalDev) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct RadrootsRelayUrl(String); + +impl RadrootsRelayUrl { + pub fn parse( + value: impl AsRef<str>, + policy: RadrootsRelayUrlPolicy, + ) -> Result<Self, RadrootsRelayTransportError> { + let original = value.as_ref().trim(); + let parsed = + Url::parse(original).map_err(|error| RadrootsRelayTransportError::RelayUrlParse { + url: original.to_owned(), + reason: error.to_string(), + })?; + let scheme = parsed.scheme(); + match scheme { + "wss" => {} + "ws" if policy.accepts_ws() => {} + "ws" => { + return Err(RadrootsRelayTransportError::WsRequiresLocalPolicy { + url: original.to_owned(), + }); + } + other => { + return Err(RadrootsRelayTransportError::UnsupportedRelayScheme { + url: original.to_owned(), + scheme: other.to_owned(), + }); + } + } + if !parsed.username().is_empty() || parsed.password().is_some() { + return Err(RadrootsRelayTransportError::RelayUrlUserinfo { + url: original.to_owned(), + }); + } + if parsed.host_str().is_none_or(str::is_empty) { + return Err(RadrootsRelayTransportError::EmptyRelayHost { + url: original.to_owned(), + }); + } + if parsed.query().is_some() || parsed.fragment().is_some() { + return Err(RadrootsRelayTransportError::RelayUrlQueryOrFragment { + url: original.to_owned(), + }); + } + let mut normalized = parsed.to_string(); + if parsed.path() == "/" { + normalized.pop(); + } + Ok(Self(normalized)) + } + + pub fn as_str(&self) -> &str { + self.0.as_str() + } + + pub fn into_string(self) -> String { + self.0 + } +} + +impl fmt::Display for RadrootsRelayUrl { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.0.as_str()) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RadrootsRelayTargetSet { + relays: Vec<RadrootsRelayUrl>, +} + +impl RadrootsRelayTargetSet { + pub fn new<I, S>( + relays: I, + policy: RadrootsRelayUrlPolicy, + ) -> Result<Self, RadrootsRelayTransportError> + where + I: IntoIterator<Item = S>, + S: AsRef<str>, + { + let relays = relays + .into_iter() + .map(|relay| RadrootsRelayUrl::parse(relay, policy)) + .collect::<Result<BTreeSet<_>, _>>()? + .into_iter() + .collect::<Vec<_>>(); + if relays.is_empty() { + return Err(RadrootsRelayTransportError::EmptyTargetSet); + } + Ok(Self { relays }) + } + + pub fn from_urls(relays: Vec<RadrootsRelayUrl>) -> Result<Self, RadrootsRelayTransportError> { + let relays = relays + .into_iter() + .collect::<BTreeSet<_>>() + .into_iter() + .collect::<Vec<_>>(); + if relays.is_empty() { + return Err(RadrootsRelayTransportError::EmptyTargetSet); + } + Ok(Self { relays }) + } + + pub fn relays(&self) -> &[RadrootsRelayUrl] { + &self.relays + } + + pub fn relay_strings(&self) -> Vec<String> { + self.relays + .iter() + .map(|relay| relay.as_str().to_owned()) + .collect() + } + + pub fn len(&self) -> usize { + self.relays.len() + } + + pub fn is_empty(&self) -> bool { + self.relays.is_empty() + } +} diff --git a/crates/relay_transport/tests/transport.rs b/crates/relay_transport/tests/transport.rs @@ -0,0 +1,389 @@ +use nostr::JsonUtil; +use radroots_event_store::RadrootsEventStore; +use radroots_events::draft::{RadrootsFrozenEventDraft, RadrootsSignedNostrEvent}; +use radroots_events::kinds::KIND_POST; +use radroots_nostr::prelude::{ + RadrootsNostrKeys, RadrootsNostrSecretKey, RadrootsNostrTimestamp, radroots_nostr_build_event, + radroots_nostr_sign_frozen_draft, +}; +use radroots_outbox::{ + RadrootsOutbox, RadrootsOutboxEventState, RadrootsOutboxOperationInput, + RadrootsOutboxOperationStatus, RadrootsOutboxRelayStatus, +}; +use radroots_relay_transport::{ + RadrootsMockRelayFetchAdapter, RadrootsMockRelayPublishAdapter, RadrootsOutboxPublishPolicy, + RadrootsRelayFetchItem, RadrootsRelayFetchRequest, RadrootsRelayOutcome, + RadrootsRelayOutcomeKind, RadrootsRelayTargetSet, RadrootsRelayUrl, RadrootsRelayUrlPolicy, + fetch_and_ingest_relay_events, publish_claimed_outbox_event, publish_signed_event, +}; + +const FIXTURE_ALICE_SECRET_KEY_HEX: &str = + "10c5304d6c9ae3a1a16f7860f1cc8f5e3a76225a2663b3a989a0d775919b7df5"; +const FIXTURE_ALICE_PUBLIC_KEY_HEX: &str = + "585591529da0bab31b3b1b1f986611cf5f435dca84f978c89ee8a40cca7103df"; +const RELAY_PRIMARY_WSS: &str = "wss://relay.example.com"; +const RELAY_SECONDARY_WSS: &str = "wss://relay-2.example.com"; +const RELAY_TERTIARY_WSS: &str = "wss://relay-3.example.com"; + +fn fixture_keys() -> RadrootsNostrKeys { + let secret_key = + RadrootsNostrSecretKey::from_hex(FIXTURE_ALICE_SECRET_KEY_HEX).expect("secret key"); + RadrootsNostrKeys::new(secret_key) +} + +fn signed_post(content: &str) -> RadrootsSignedNostrEvent { + let draft = RadrootsFrozenEventDraft::new( + "radroots.social.post.v1", + KIND_POST, + 1_700_000_000, + vec![vec!["t".to_owned(), "soil".to_owned()]], + content, + FIXTURE_ALICE_PUBLIC_KEY_HEX, + ) + .expect("draft"); + radroots_nostr_sign_frozen_draft(&fixture_keys(), &draft).expect("signed event") +} + +fn unsupported_raw_event() -> String { + let event = radroots_nostr_build_event(999, "unsupported", Vec::new()) + .expect("event builder") + .custom_created_at(RadrootsNostrTimestamp::from_secs(1_700_000_001)) + .sign_with_keys(&fixture_keys()) + .expect("signed unsupported event"); + event.as_json() +} + +#[test] +fn relay_url_validation_and_target_normalization() { + let relay = RadrootsRelayUrl::parse("wss://Relay.Example.com", RadrootsRelayUrlPolicy::Public) + .expect("relay"); + assert_eq!(relay.as_str(), RELAY_PRIMARY_WSS); + + assert!( + RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::Public).is_err() + ); + let local = RadrootsRelayUrl::parse("ws://127.0.0.1:7777", RadrootsRelayUrlPolicy::LocalDev) + .expect("local relay"); + assert_eq!(local.as_str(), "ws://127.0.0.1:7777"); + + assert!( + RadrootsRelayUrl::parse("https://relay.example.com", RadrootsRelayUrlPolicy::Public) + .is_err() + ); + assert!( + RadrootsRelayUrl::parse( + "wss://user@relay.example.com", + RadrootsRelayUrlPolicy::Public + ) + .is_err() + ); + assert!( + RadrootsRelayUrl::parse( + "wss://relay.example.com:bad", + RadrootsRelayUrlPolicy::Public + ) + .is_err() + ); + assert!(RadrootsRelayUrl::parse("wss://", RadrootsRelayUrlPolicy::Public).is_err()); + + let targets = RadrootsRelayTargetSet::new( + vec![ + RELAY_TERTIARY_WSS, + RELAY_PRIMARY_WSS, + RELAY_PRIMARY_WSS, + RELAY_SECONDARY_WSS, + ], + RadrootsRelayUrlPolicy::Public, + ) + .expect("targets"); + assert_eq!( + targets.relay_strings(), + vec![ + RELAY_SECONDARY_WSS.to_owned(), + RELAY_TERTIARY_WSS.to_owned(), + RELAY_PRIMARY_WSS.to_owned() + ] + ); +} + +#[test] +fn outcome_prefix_classification_covers_required_kinds() { + let cases = [ + ("blocked: policy", RadrootsRelayOutcomeKind::Blocked), + ( + "rate-limited: slow down", + RadrootsRelayOutcomeKind::RateLimited, + ), + ("invalid: bad event", RadrootsRelayOutcomeKind::Invalid), + ("pow: difficulty 24", RadrootsRelayOutcomeKind::PowRequired), + ( + "restricted: group write denied", + RadrootsRelayOutcomeKind::Restricted, + ), + ( + "auth-required: challenge", + RadrootsRelayOutcomeKind::AuthRequired, + ), + ( + "duplicate: already have it", + RadrootsRelayOutcomeKind::DuplicateAccepted, + ), + ("error: relay failed", RadrootsRelayOutcomeKind::Error), + ("timeout: no OK", RadrootsRelayOutcomeKind::Timeout), + ("strange relay text", RadrootsRelayOutcomeKind::Unknown), + ]; + + for (message, kind) in cases { + let outcome = RadrootsRelayOutcome::classify(message); + assert_eq!(outcome.kind, kind); + } + + assert!(RadrootsRelayOutcome::classify("duplicate: already have it").counts_toward_quorum()); + assert!(RadrootsRelayOutcome::classify("auth-required: challenge").is_retryable()); + assert!(RadrootsRelayOutcome::classify("restricted: denied").is_terminal_failure()); +} + +#[tokio::test] +async fn mock_publish_preserves_exact_raw_json_and_counts_outcomes() { + let signed = signed_post("hello"); + let targets = RadrootsRelayTargetSet::new( + vec![RELAY_PRIMARY_WSS, RELAY_SECONDARY_WSS, RELAY_TERTIARY_WSS], + RadrootsRelayUrlPolicy::Public, + ) + .expect("targets"); + let adapter = RadrootsMockRelayPublishAdapter::new() + .with_outcome( + RELAY_SECONDARY_WSS, + RadrootsRelayOutcome::classify("duplicate: already have it"), + ) + .with_outcome( + RELAY_TERTIARY_WSS, + RadrootsRelayOutcome::classify("auth-required: challenge"), + ); + + let receipt = publish_signed_event( + &adapter, + radroots_relay_transport::RadrootsRelayPublishRequest::new(signed.clone(), targets, 1_000) + .with_accepted_quorum(2), + ) + .await + .expect("publish"); + + assert_eq!(adapter.captured_raw_events(), vec![signed.raw_json]); + assert_eq!(receipt.attempted_count, 3); + assert_eq!(receipt.accepted_count, 2); + assert_eq!(receipt.retryable_count, 1); + assert!(receipt.quorum_met); + serde_json::to_string(&receipt).expect("receipt json"); +} + +#[tokio::test] +async fn fetch_ingests_events_and_records_relay_observations() { + let signed = signed_post("hello"); + let store = RadrootsEventStore::open_memory().await.expect("store"); + let adapter = RadrootsMockRelayFetchAdapter::new(vec![ + RadrootsRelayFetchItem::Event { + relay_url: RELAY_PRIMARY_WSS.to_owned(), + raw_json: signed.raw_json.clone(), + observed_at_ms: 1_000, + }, + RadrootsRelayFetchItem::Event { + relay_url: RELAY_PRIMARY_WSS.to_owned(), + raw_json: signed.raw_json.clone(), + observed_at_ms: 1_001, + }, + RadrootsRelayFetchItem::Event { + relay_url: RELAY_SECONDARY_WSS.to_owned(), + raw_json: unsupported_raw_event(), + observed_at_ms: 1_002, + }, + RadrootsRelayFetchItem::Event { + relay_url: RELAY_TERTIARY_WSS.to_owned(), + raw_json: "{not json".to_owned(), + observed_at_ms: 1_003, + }, + RadrootsRelayFetchItem::Eose { + relay_url: RELAY_PRIMARY_WSS.to_owned(), + }, + RadrootsRelayFetchItem::Closed { + relay_url: RELAY_SECONDARY_WSS.to_owned(), + message: "closed: done".to_owned(), + }, + RadrootsRelayFetchItem::Notice { + relay_url: RELAY_TERTIARY_WSS.to_owned(), + message: "notice: test".to_owned(), + }, + ]); + + let receipt = fetch_and_ingest_relay_events( + &adapter, + &store, + RadrootsRelayFetchRequest::fetch(1_000, 10), + ) + .await + .expect("fetch ingest"); + + assert_eq!(receipt.inserted_count, 2); + assert_eq!(receipt.duplicate_count, 1); + assert_eq!(receipt.unsupported_count, 1); + assert_eq!(receipt.malformed_count, 1); + assert_eq!(receipt.eose_count, 1); + assert_eq!(receipt.closed_count, 1); + assert_eq!(receipt.notice_count, 1); + + let observations = store + .observations_for_event(signed.id.as_str()) + .await + .expect("observations"); + assert_eq!(observations.len(), 1); + assert_eq!(observations[0].relay_url, RELAY_PRIMARY_WSS); + assert_eq!(observations[0].observation_count, 2); +} + +#[tokio::test] +async fn outbox_publish_persists_partial_success_and_skips_accepted_retry() { + let signed = signed_post("hello"); + let outbox = RadrootsOutbox::open_memory().await.expect("outbox"); + let store = RadrootsEventStore::open_memory().await.expect("store"); + let draft = RadrootsFrozenEventDraft::new( + "radroots.social.post.v1", + KIND_POST, + signed.created_at, + signed.tags.clone(), + signed.content.clone(), + signed.pubkey.as_str(), + ) + .expect("draft"); + let receipt = outbox + .enqueue_operation(RadrootsOutboxOperationInput::new( + "publish_post", + draft, + vec![ + RELAY_PRIMARY_WSS.to_owned(), + RELAY_SECONDARY_WSS.to_owned(), + RELAY_TERTIARY_WSS.to_owned(), + ], + 1_000, + )) + .await + .expect("enqueue"); + let claimed = outbox + .claim_next_ready_event("signer", "sign-a", 2_000, 1_000) + .await + .expect("claim") + .expect("claim"); + let signed = outbox + .sign_claimed_event(&claimed, &fixture_keys(), 1_100) + .await + .expect("sign"); + outbox.recover_expired_claims(2_001).await.expect("recover"); + let publish_claim = outbox + .claim_next_ready_event("publisher", "publish-a", 3_000, 2_100) + .await + .expect("claim") + .expect("publish claim"); + assert_eq!(publish_claim.state, RadrootsOutboxEventState::Publishing); + + let adapter = RadrootsMockRelayPublishAdapter::new() + .with_outcome(RELAY_PRIMARY_WSS, RadrootsRelayOutcome::accepted()) + .with_outcome( + RELAY_SECONDARY_WSS, + RadrootsRelayOutcome::timeout("timeout: no OK"), + ) + .with_outcome( + RELAY_TERTIARY_WSS, + RadrootsRelayOutcome::duplicate_accepted("duplicate: already have it"), + ); + let first = publish_claimed_outbox_event( + &outbox, + &store, + &adapter, + &publish_claim, + RadrootsOutboxPublishPolicy::new(2_500), + 2_200, + ) + .await + .expect("publish"); + + assert_eq!(first.publish.attempted_count, 3); + assert_eq!(first.publish.accepted_count, 2); + assert!(!first.publish.quorum_met); + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::PublishRetryable); + assert_eq!(event.accepted_quorum, 3); + + let statuses = outbox + .relay_statuses(receipt.outbox_event_id) + .await + .expect("statuses"); + assert_eq!( + statuses + .iter() + .find(|status| status.relay_url == RELAY_PRIMARY_WSS) + .expect("primary") + .status, + RadrootsOutboxRelayStatus::Accepted + ); + assert_eq!( + statuses + .iter() + .find(|status| status.relay_url == RELAY_SECONDARY_WSS) + .expect("secondary") + .status, + RadrootsOutboxRelayStatus::FailedRetryable + ); + assert_eq!( + statuses + .iter() + .find(|status| status.relay_url == RELAY_TERTIARY_WSS) + .expect("tertiary") + .status, + RadrootsOutboxRelayStatus::Accepted + ); + + let retry_claim = outbox + .claim_next_ready_event("publisher", "publish-b", 4_000, 2_500) + .await + .expect("claim") + .expect("retry claim"); + let retry_adapter = RadrootsMockRelayPublishAdapter::new() + .with_outcome(RELAY_SECONDARY_WSS, RadrootsRelayOutcome::accepted()); + let second = publish_claimed_outbox_event( + &outbox, + &store, + &retry_adapter, + &retry_claim, + RadrootsOutboxPublishPolicy::new(3_000), + 2_600, + ) + .await + .expect("retry publish"); + + assert_eq!(second.local_ingest.event_id, signed.id); + assert_eq!(second.publish.attempted_count, 1); + assert_eq!(retry_adapter.captured_raw_events().len(), 1); + + let event = outbox + .get_event(receipt.outbox_event_id) + .await + .expect("event") + .expect("event"); + assert_eq!(event.state, RadrootsOutboxEventState::Published); + assert_eq!(event.accepted_quorum, 3); + let operation = outbox + .get_operation(receipt.operation_id) + .await + .expect("operation") + .expect("operation"); + assert_eq!(operation.status, RadrootsOutboxOperationStatus::Complete); + + let observations = store + .observations_for_event(signed.id.as_str()) + .await + .expect("observations"); + assert_eq!(observations.len(), 3); +} diff --git a/spec/README.md b/spec/README.md @@ -98,6 +98,7 @@ The public Rust story is tiered explicitly. - `radroots_types` - `radroots_event_store` - `radroots_outbox` + - `radroots_relay_transport` - `radroots_events_codec_wasm` - `radroots_net` - `radroots_nostr_runtime` diff --git a/spec/manifest.toml b/spec/manifest.toml @@ -50,6 +50,7 @@ deferred_publication = [ "radroots_types", "radroots_event_store", "radroots_outbox", + "radroots_relay_transport", "radroots_events_codec_wasm", "radroots_net", "radroots_nostr_runtime",