commit cade95a66cf37159071c9e59b836081bc3881b4b
parent a5788d4103ac4fd3497b6e2bf75b68f60d7c710f
Author: triesap <tyson@radroots.org>
Date: Mon, 13 Apr 2026 01:13:09 +0000
sdk: formalize relay-direct publish adapter
Diffstat:
6 files changed, 499 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -2644,6 +2644,8 @@ dependencies = [
name = "radroots_sdk"
version = "0.1.0-alpha.2"
dependencies = [
+ "futures",
+ "nostr",
"radroots_core",
"radroots_events",
"radroots_events_codec",
@@ -2658,6 +2660,7 @@ dependencies = [
"radroots_trade",
"tempfile",
"tokio",
+ "tokio-tungstenite",
]
[[package]]
diff --git a/crates/sdk/Cargo.toml b/crates/sdk/Cargo.toml
@@ -51,6 +51,8 @@ radroots_nostr_connect = { workspace = true, optional = true }
radroots_nostr_signer = { workspace = true, optional = true, default-features = false }
[dev-dependencies]
+futures = { workspace = true }
+nostr = { workspace = true }
radroots_core = { workspace = true, default-features = false, features = ["std"] }
radroots_replica_db = { workspace = true, default-features = false, features = ["native"] }
radroots_replica_db_schema = { workspace = true }
@@ -58,3 +60,4 @@ radroots_replica_sync = { workspace = true, features = ["std"] }
radroots_sql_core = { workspace = true, features = ["native"] }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
+tokio-tungstenite = "0.26.2"
diff --git a/crates/sdk/src/adapters/relay.rs b/crates/sdk/src/adapters/relay.rs
@@ -1,3 +1,5 @@
+use core::time::Duration;
+
use crate::WireEventParts;
use crate::adapters::signing::{SignedNostrEvent, event_builder_from_parts};
use crate::identity::RadrootsIdentity;
@@ -26,6 +28,29 @@ pub fn client_from_identity(identity: &RadrootsIdentity) -> RelayClient {
RelayClient::from_identity(identity)
}
+pub async fn configure_write_relays(
+ client: &RelayClient,
+ relay_urls: &[String],
+ connect_timeout: Duration,
+) -> Result<(), RelayError> {
+ for relay_url in relay_urls {
+ client.add_write_relay(relay_url).await?;
+ }
+ client.connect().await;
+ client.wait_for_connection(connect_timeout).await;
+ Ok(())
+}
+
+pub async fn connected_client_from_identity(
+ identity: &RadrootsIdentity,
+ relay_urls: &[String],
+ connect_timeout: Duration,
+) -> Result<RelayClient, RelayError> {
+ let client = client_from_identity(identity);
+ configure_write_relays(&client, relay_urls, connect_timeout).await?;
+ Ok(client)
+}
+
pub async fn publish_parts(
client: &RelayClient,
parts: WireEventParts,
diff --git a/crates/sdk/src/client.rs b/crates/sdk/src/client.rs
@@ -3,12 +3,112 @@ use alloc::{string::String, vec::Vec};
#[cfg(feature = "std")]
use std::{string::String, vec::Vec};
+#[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+use crate::adapters::relay;
+#[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+use crate::identity::RadrootsIdentity;
use crate::config::{RadrootsSdkConfig, SdkConfigError, SdkTransportMode};
use crate::{
NostrTags, RadrootsNostrEvent, RadrootsNostrEventPtr, RadrootsProfile, RadrootsProfileType,
RadrootsTradeEnvelope, TradeListingValidateResult, WireEventParts, farm, listing, profile,
trade,
};
+#[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+use core::time::Duration;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SdkPublishReceipt {
+ pub transport: SdkTransportMode,
+ pub event_kind: u32,
+ pub event_id: String,
+ pub transport_receipt: SdkTransportReceipt,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum SdkTransportReceipt {
+ RelayDirect(SdkRelayPublishReceipt),
+ Radrootsd(SdkRadrootsdPublishReceipt),
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct SdkRelayPublishReceipt {
+ pub acknowledged_relays: Vec<String>,
+ pub failed_relays: Vec<SdkRelayFailure>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SdkRelayFailure {
+ pub relay_url: String,
+ pub error: String,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct SdkRadrootsdPublishReceipt {
+ pub accepted: bool,
+ pub job_id: Option<String>,
+ pub details: Option<String>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum SdkPublishError {
+ Config(SdkConfigError),
+ Encode(String),
+ UnsupportedTransport {
+ transport: SdkTransportMode,
+ operation: &'static str,
+ },
+ Relay(String),
+ RelayNotAcknowledged {
+ transport: SdkTransportMode,
+ failed_relays: Vec<SdkRelayFailure>,
+ },
+}
+
+impl From<SdkConfigError> for SdkPublishError {
+ fn from(value: SdkConfigError) -> Self {
+ Self::Config(value)
+ }
+}
+
+impl core::fmt::Display for SdkPublishError {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+ match self {
+ Self::Config(err) => write!(f, "{err}"),
+ Self::Encode(message) => write!(f, "{message}"),
+ Self::UnsupportedTransport {
+ transport,
+ operation,
+ } => {
+ write!(
+ f,
+ "{operation} requires a different sdk transport mode than {transport:?}"
+ )
+ }
+ Self::Relay(message) => write!(f, "{message}"),
+ Self::RelayNotAcknowledged {
+ transport,
+ failed_relays,
+ } => {
+ if failed_relays.is_empty() {
+ write!(f, "{transport:?} publish was not acknowledged by any relay")
+ } else {
+ let summary = failed_relays
+ .iter()
+ .map(|failure| format!("{}: {}", failure.relay_url, failure.error))
+ .collect::<Vec<_>>()
+ .join(", ");
+ write!(
+ f,
+ "{transport:?} publish was not acknowledged by any relay: {summary}"
+ )
+ }
+ }
+ }
+ }
+}
+
+#[cfg(feature = "std")]
+impl std::error::Error for SdkPublishError {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RadrootsSdkClient {
@@ -53,6 +153,35 @@ impl RadrootsSdkClient {
pub fn trade(&self) -> TradeClient<'_> {
TradeClient { client: self }
}
+
+ #[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+ async fn publish_parts_via_relay_with_identity(
+ &self,
+ identity: &RadrootsIdentity,
+ parts: WireEventParts,
+ operation: &'static str,
+ ) -> Result<SdkPublishReceipt, SdkPublishError> {
+ if self.transport() != SdkTransportMode::RelayDirect {
+ return Err(SdkPublishError::UnsupportedTransport {
+ transport: self.transport(),
+ operation,
+ });
+ }
+
+ let event_kind = u32::from(parts.kind);
+ let relay_urls = self.resolved_relay_urls()?;
+ let client = relay::connected_client_from_identity(
+ identity,
+ &relay_urls,
+ Duration::from_millis(self.config.network.timeout_ms),
+ )
+ .await
+ .map_err(|err| SdkPublishError::Relay(err.to_string()))?;
+ let output = relay::publish_parts(&client, parts)
+ .await
+ .map_err(|err| SdkPublishError::Relay(err.to_string()))?;
+ sdk_publish_receipt_from_relay_output(event_kind, output)
+ }
}
#[derive(Debug, Clone, Copy)]
@@ -138,6 +267,22 @@ impl<'a> ListingClient<'a> {
) -> Result<listing::RadrootsListing, listing::RadrootsTradeListingParseError> {
listing::parse_event(event)
}
+
+ #[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+ pub async fn publish_with_identity(
+ &self,
+ identity: &RadrootsIdentity,
+ listing_value: &listing::RadrootsListing,
+ ) -> Result<SdkPublishReceipt, SdkPublishError> {
+ let parts = listing::build_draft(listing_value)
+ .map_err(|err| SdkPublishError::Encode(err.to_string()))?;
+ self.client.publish_parts_via_relay_with_identity(
+ identity,
+ parts,
+ "listing.publish_with_identity",
+ )
+ .await
+ }
}
#[derive(Debug, Clone, Copy)]
@@ -203,3 +348,108 @@ impl<'a> TradeClient<'a> {
trade::validate_listing_event(event)
}
}
+
+#[cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+fn sdk_publish_receipt_from_relay_output(
+ event_kind: u32,
+ output: relay::RelayOutput<relay::RelayEventId>,
+) -> Result<SdkPublishReceipt, SdkPublishError> {
+ let mut acknowledged_relays = output
+ .success
+ .into_iter()
+ .map(|relay| relay.to_string())
+ .collect::<Vec<_>>();
+ acknowledged_relays.sort();
+
+ let mut failed_relays = output
+ .failed
+ .into_iter()
+ .map(|(relay_url, error)| SdkRelayFailure {
+ relay_url: relay_url.to_string(),
+ error,
+ })
+ .collect::<Vec<_>>();
+ failed_relays.sort_by(|left, right| left.relay_url.cmp(&right.relay_url));
+
+ if acknowledged_relays.is_empty() {
+ return Err(SdkPublishError::RelayNotAcknowledged {
+ transport: SdkTransportMode::RelayDirect,
+ failed_relays,
+ });
+ }
+
+ Ok(SdkPublishReceipt {
+ transport: SdkTransportMode::RelayDirect,
+ event_kind,
+ event_id: output.val.to_string(),
+ transport_receipt: SdkTransportReceipt::RelayDirect(SdkRelayPublishReceipt {
+ acknowledged_relays,
+ failed_relays,
+ }),
+ })
+}
+
+#[cfg(all(test, feature = "identity-models", feature = "relay-client", feature = "signing"))]
+mod tests {
+ use super::{
+ SdkPublishError, SdkRelayFailure, SdkTransportMode, sdk_publish_receipt_from_relay_output,
+ };
+ use crate::adapters::relay::RelayOutput;
+ use radroots_nostr::prelude::RadrootsNostrEventId;
+ use std::collections::{HashMap, HashSet};
+
+ #[test]
+ fn relay_output_maps_to_normalized_publish_receipt() {
+ let output = RelayOutput {
+ val: RadrootsNostrEventId::parse(
+ "5f3cf27d85c9571a2dca28269f6547f625364a7e06e5e853ee1bc74d2c4aa3d4",
+ )
+ .expect("event id"),
+ success: HashSet::from([
+ nostr::RelayUrl::parse("ws://127.0.0.1:8080").expect("relay a"),
+ nostr::RelayUrl::parse("ws://127.0.0.1:8081").expect("relay b"),
+ ]),
+ failed: HashMap::from([(
+ nostr::RelayUrl::parse("ws://127.0.0.1:8082").expect("relay c"),
+ "timeout".to_owned(),
+ )]),
+ };
+
+ let receipt = sdk_publish_receipt_from_relay_output(30402, output).expect("receipt");
+
+ assert_eq!(receipt.transport, SdkTransportMode::RelayDirect);
+ assert_eq!(receipt.event_kind, 30402);
+ assert_eq!(
+ receipt.event_id,
+ "5f3cf27d85c9571a2dca28269f6547f625364a7e06e5e853ee1bc74d2c4aa3d4"
+ );
+ }
+
+ #[test]
+ fn relay_output_without_acknowledgement_is_rejected() {
+ let output = RelayOutput {
+ val: RadrootsNostrEventId::parse(
+ "5f3cf27d85c9571a2dca28269f6547f625364a7e06e5e853ee1bc74d2c4aa3d4",
+ )
+ .expect("event id"),
+ success: HashSet::new(),
+ failed: HashMap::from([(
+ nostr::RelayUrl::parse("ws://127.0.0.1:8082").expect("relay c"),
+ "blocked".to_owned(),
+ )]),
+ };
+
+ let error = sdk_publish_receipt_from_relay_output(30402, output).expect_err("error");
+
+ assert_eq!(
+ error,
+ SdkPublishError::RelayNotAcknowledged {
+ transport: SdkTransportMode::RelayDirect,
+ failed_relays: vec![SdkRelayFailure {
+ relay_url: "ws://127.0.0.1:8082".to_owned(),
+ error: "blocked".to_owned(),
+ }],
+ }
+ );
+ }
+}
diff --git a/crates/sdk/src/lib.rs b/crates/sdk/src/lib.rs
@@ -32,7 +32,9 @@ pub use crate::config::{
SdkTransportMode, SignerConfig,
};
pub use crate::client::{
- FarmClient, ListingClient, ProfileClient, RadrootsSdkClient, TradeClient,
+ FarmClient, ListingClient, ProfileClient, RadrootsSdkClient, SdkPublishError,
+ SdkPublishReceipt, SdkRadrootsdPublishReceipt, SdkRelayFailure, SdkRelayPublishReceipt,
+ SdkTransportReceipt, TradeClient,
};
pub use radroots_events::{
RadrootsNostrEvent, RadrootsNostrEventPtr, RadrootsNostrEventRef,
diff --git a/crates/sdk/tests/relay_direct.rs b/crates/sdk/tests/relay_direct.rs
@@ -0,0 +1,215 @@
+#![cfg(all(feature = "identity-models", feature = "relay-client", feature = "signing"))]
+
+use futures::{SinkExt, StreamExt};
+use nostr::{ClientMessage, JsonUtil, RelayMessage};
+use radroots_core::{
+ RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity,
+ RadrootsCoreQuantityPrice, RadrootsCoreUnit,
+};
+use radroots_sdk::{
+ RadrootsSdkClient, RadrootsSdkConfig, RadrootsdAuth, RadrootsdConfig, RelayConfig,
+ SdkEnvironment, SdkPublishError, SdkTransportMode, SdkTransportReceipt,
+};
+use radroots_sdk::identity::RadrootsIdentity;
+use radroots_sdk::listing::{
+ RadrootsListing, RadrootsListingAvailability, RadrootsListingBin,
+ RadrootsListingDeliveryMethod, RadrootsListingFarmRef, RadrootsListingLocation,
+ RadrootsListingProduct, RadrootsListingStatus,
+};
+use tokio::net::TcpListener;
+use tokio::sync::oneshot;
+use tokio_tungstenite::tungstenite::Message;
+
+type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
+
+struct AckRelay {
+ url: String,
+ shutdown_tx: Option<oneshot::Sender<()>>,
+}
+
+impl AckRelay {
+ async fn spawn() -> TestResult<Self> {
+ let listener = TcpListener::bind("127.0.0.1:0").await?;
+ let addr = listener.local_addr()?;
+ let url = format!("ws://{addr}");
+ let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
+
+ tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = &mut shutdown_rx => break,
+ accept = listener.accept() => {
+ let Ok((stream, _)) = accept else {
+ break;
+ };
+ tokio::spawn(async move {
+ let Ok(websocket) = tokio_tungstenite::accept_async(stream).await else {
+ return;
+ };
+ let (mut writer, mut reader) = websocket.split();
+ while let Some(message) = reader.next().await {
+ let Ok(message) = message else {
+ break;
+ };
+ let Message::Text(text) = message else {
+ continue;
+ };
+ let Ok(client_message) = ClientMessage::from_json(text.as_str()) else {
+ continue;
+ };
+ if let ClientMessage::Event(event) = client_message {
+ let relay_message =
+ RelayMessage::ok(event.id, true, "").as_json();
+ if writer
+ .send(Message::Text(relay_message.into()))
+ .await
+ .is_err()
+ {
+ break;
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+ });
+
+ Ok(Self {
+ url,
+ shutdown_tx: Some(shutdown_tx),
+ })
+ }
+
+ fn url(&self) -> &str {
+ self.url.as_str()
+ }
+}
+
+impl Drop for AckRelay {
+ fn drop(&mut self) {
+ if let Some(shutdown_tx) = self.shutdown_tx.take() {
+ let _ = shutdown_tx.send(());
+ }
+ }
+}
+
+fn sample_listing() -> RadrootsListing {
+ RadrootsListing {
+ d_tag: "AAAAAAAAAAAAAAAAAAAAAg".into(),
+ farm: RadrootsListingFarmRef {
+ pubkey: "seller".into(),
+ d_tag: "AAAAAAAAAAAAAAAAAAAAAA".into(),
+ },
+ product: RadrootsListingProduct {
+ key: "coffee".into(),
+ title: "Coffee".into(),
+ category: "coffee".into(),
+ summary: Some("Single origin coffee".into()),
+ process: None,
+ lot: None,
+ location: None,
+ profile: None,
+ year: None,
+ },
+ primary_bin_id: "bin-1".into(),
+ bins: vec![RadrootsListingBin {
+ bin_id: "bin-1".into(),
+ quantity: RadrootsCoreQuantity::new(
+ RadrootsCoreDecimal::from(1000u32),
+ RadrootsCoreUnit::MassG,
+ ),
+ price_per_canonical_unit: RadrootsCoreQuantityPrice {
+ amount: RadrootsCoreMoney::new(
+ RadrootsCoreDecimal::from(20u32),
+ RadrootsCoreCurrency::USD,
+ ),
+ quantity: RadrootsCoreQuantity::new(
+ RadrootsCoreDecimal::from(1u32),
+ RadrootsCoreUnit::MassG,
+ ),
+ },
+ display_amount: None,
+ display_unit: None,
+ display_label: None,
+ display_price: None,
+ display_price_unit: None,
+ }],
+ resource_area: None,
+ plot: None,
+ discounts: None,
+ inventory_available: Some(RadrootsCoreDecimal::from(5u32)),
+ availability: Some(RadrootsListingAvailability::Status {
+ status: RadrootsListingStatus::Active,
+ }),
+ delivery_method: Some(RadrootsListingDeliveryMethod::Pickup),
+ location: Some(RadrootsListingLocation {
+ primary: "North Farm".into(),
+ city: None,
+ region: None,
+ country: None,
+ lat: None,
+ lng: None,
+ geohash: None,
+ }),
+ images: None,
+ }
+}
+
+#[tokio::test]
+async fn relay_direct_listing_publish_returns_normalized_receipt() -> TestResult<()> {
+ let relay = AckRelay::spawn().await?;
+ let identity = RadrootsIdentity::generate();
+ let mut config = RadrootsSdkConfig::for_environment(SdkEnvironment::Custom);
+ config.transport = SdkTransportMode::RelayDirect;
+ config.relay = RelayConfig {
+ urls: vec![relay.url().to_owned()],
+ };
+ config.radrootsd = RadrootsdConfig {
+ endpoint: Some("https://rpc.radroots.org/jsonrpc".into()),
+ auth: RadrootsdAuth::None,
+ };
+ let client = RadrootsSdkClient::from_config(config)?;
+
+ let receipt = client
+ .listing()
+ .publish_with_identity(&identity, &sample_listing())
+ .await?;
+
+ assert_eq!(receipt.transport, SdkTransportMode::RelayDirect);
+ assert_eq!(receipt.event_kind, 30402);
+ assert!(!receipt.event_id.is_empty());
+ match receipt.transport_receipt {
+ SdkTransportReceipt::RelayDirect(relay_receipt) => {
+ assert_eq!(relay_receipt.acknowledged_relays, vec![relay.url().to_owned()]);
+ assert!(relay_receipt.failed_relays.is_empty());
+ }
+ SdkTransportReceipt::Radrootsd(_) => panic!("unexpected radrootsd receipt"),
+ }
+
+ Ok(())
+}
+
+#[tokio::test]
+async fn relay_direct_publish_rejects_radrootsd_transport_mode() -> TestResult<()> {
+ let identity = RadrootsIdentity::generate();
+ let mut config = RadrootsSdkConfig::production();
+ config.transport = SdkTransportMode::Radrootsd;
+ let client = RadrootsSdkClient::from_config(config)?;
+
+ let error = client
+ .listing()
+ .publish_with_identity(&identity, &sample_listing())
+ .await
+ .expect_err("unsupported transport");
+
+ assert!(matches!(
+ error,
+ SdkPublishError::UnsupportedTransport {
+ transport: SdkTransportMode::Radrootsd,
+ operation: "listing.publish_with_identity",
+ }
+ ));
+
+ Ok(())
+}