radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit 8331eb5bbfbd21e4de06f2673235fd1559ac7f07
parent 3a8a2ef73f2b99bbe9695d473f408194927206c2
Author: triesap <tyson@radroots.org>
Date:   Fri, 27 Mar 2026 19:45:35 +0000

bridge: add listing publish workflow and job status

Diffstat:
Mconfig.toml | 5+++++
Msrc/app/config.rs | 99++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/app/runtime.rs | 60++++++++++++++++++++++++++++++++++++++++++++++++++----------
Asrc/core/bridge/mod.rs | 2++
Asrc/core/bridge/publish.rs | 570+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/core/bridge/store.rs | 301+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/core/mod.rs | 1+
Msrc/core/nip46/session.rs | 51+++++++++++++++++++++------------------------------
Msrc/core/state.rs | 29++++++++++++++++++++++-------
Msrc/main.rs | 4+++-
Asrc/transport/jsonrpc/methods/bridge/job_status.rs | 30++++++++++++++++++++++++++++++
Asrc/transport/jsonrpc/methods/bridge/listing_publish.rs | 206+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/transport/jsonrpc/methods/bridge/mod.rs | 16++++++++++++++++
Asrc/transport/jsonrpc/methods/bridge/status.rs | 48++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/transport/jsonrpc/methods/mod.rs | 2++
Msrc/transport/jsonrpc/methods/nip46/connect.rs | 65++++++++++++++++++++++++-----------------------------------------
Msrc/transport/jsonrpc/methods/nip46/get_public_key.rs | 9++++++---
Msrc/transport/jsonrpc/methods/nip46/mod.rs | 2+-
Msrc/transport/jsonrpc/methods/nip46/nip04.rs | 14++++++--------
Msrc/transport/jsonrpc/methods/nip46/nip44.rs | 14++++++--------
Msrc/transport/jsonrpc/methods/nip46/session_close.rs | 4+---
Msrc/transport/jsonrpc/methods/nip46/session_require_auth.rs | 5++++-
Msrc/transport/jsonrpc/methods/nip46/sign_event.rs | 5+----
Msrc/transport/jsonrpc/nip46/client.rs | 37++++++++++++++++++-------------------
Msrc/transport/jsonrpc/nip46/connection.rs | 10++++------
Msrc/transport/jsonrpc/nip46/session.rs | 17++++-------------
Msrc/transport/nostr/listener.rs | 88+++++++++++++++++++++++++++++++++++++++++--------------------------------------
27 files changed, 1496 insertions(+), 198 deletions(-)

diff --git a/config.toml b/config.toml @@ -18,6 +18,11 @@ relays = [ [config.rpc] addr = "127.0.0.1:7070" +[config.bridge] +enabled = true +delivery_policy = "any" +publish_max_attempts = 2 + [config.nip46] session_ttl_secs = 900 perms = [] diff --git a/src/app/config.rs b/src/app/config.rs @@ -34,6 +34,34 @@ fn default_nip46_perms() -> Vec<String> { Vec::new() } +fn default_bridge_enabled() -> bool { + false +} + +fn default_bridge_connect_timeout_secs() -> u64 { + 10 +} + +fn default_bridge_delivery_policy() -> BridgeDeliveryPolicy { + BridgeDeliveryPolicy::Any +} + +fn default_bridge_publish_max_attempts() -> usize { + 1 +} + +fn default_bridge_publish_initial_backoff_millis() -> u64 { + 250 +} + +fn default_bridge_publish_max_backoff_millis() -> u64 { + 2_000 +} + +fn default_bridge_job_status_retention() -> usize { + 256 +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Nip46Config { #[serde(default = "default_nip46_session_ttl_secs")] @@ -54,6 +82,59 @@ impl Default for Nip46Config { } } +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum BridgeDeliveryPolicy { + Any, + Quorum, + All, +} + +impl BridgeDeliveryPolicy { + pub fn as_str(self) -> &'static str { + match self { + Self::Any => "any", + Self::Quorum => "quorum", + Self::All => "all", + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct BridgeConfig { + #[serde(default = "default_bridge_enabled")] + pub enabled: bool, + #[serde(default = "default_bridge_connect_timeout_secs")] + pub connect_timeout_secs: u64, + #[serde(default = "default_bridge_delivery_policy")] + pub delivery_policy: BridgeDeliveryPolicy, + #[serde(default)] + pub delivery_quorum: Option<usize>, + #[serde(default = "default_bridge_publish_max_attempts")] + pub publish_max_attempts: usize, + #[serde(default = "default_bridge_publish_initial_backoff_millis")] + pub publish_initial_backoff_millis: u64, + #[serde(default = "default_bridge_publish_max_backoff_millis")] + pub publish_max_backoff_millis: u64, + #[serde(default = "default_bridge_job_status_retention")] + pub job_status_retention: usize, +} + +impl Default for BridgeConfig { + fn default() -> Self { + Self { + enabled: default_bridge_enabled(), + connect_timeout_secs: default_bridge_connect_timeout_secs(), + delivery_policy: default_bridge_delivery_policy(), + delivery_quorum: None, + publish_max_attempts: default_bridge_publish_max_attempts(), + publish_initial_backoff_millis: default_bridge_publish_initial_backoff_millis(), + publish_max_backoff_millis: default_bridge_publish_max_backoff_millis(), + job_status_retention: default_bridge_job_status_retention(), + } + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct RpcConfig { #[serde(default = "default_rpc_addr")] @@ -96,6 +177,8 @@ pub struct Configuration { pub rpc_addr: Option<String>, #[serde(default)] pub nip46: Nip46Config, + #[serde(default)] + pub bridge: BridgeConfig, } impl Configuration { @@ -112,7 +195,7 @@ pub struct Settings { #[cfg(test)] mod tests { - use super::{Configuration, Nip46Config, RpcConfig}; + use super::{BridgeConfig, BridgeDeliveryPolicy, Configuration, Nip46Config, RpcConfig}; use radroots_runtime::RadrootsNostrServiceConfig; fn service_config() -> RadrootsNostrServiceConfig { @@ -145,6 +228,19 @@ mod tests { } #[test] + fn bridge_defaults_are_expected() { + let cfg = BridgeConfig::default(); + assert!(!cfg.enabled); + assert_eq!(cfg.connect_timeout_secs, 10); + assert_eq!(cfg.delivery_policy, BridgeDeliveryPolicy::Any); + assert_eq!(cfg.delivery_quorum, None); + assert_eq!(cfg.publish_max_attempts, 1); + assert_eq!(cfg.publish_initial_backoff_millis, 250); + assert_eq!(cfg.publish_max_backoff_millis, 2_000); + assert_eq!(cfg.job_status_retention, 256); + } + + #[test] fn rpc_addr_prefers_override() { let mut cfg = Configuration { service: service_config(), @@ -154,6 +250,7 @@ mod tests { }, rpc_addr: None, nip46: Nip46Config::default(), + bridge: BridgeConfig::default(), }; assert_eq!(cfg.rpc_addr(), "127.0.0.1:1111"); cfg.rpc_addr = Some("127.0.0.1:2222".to_string()); diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -9,13 +9,14 @@ use crate::core::Radrootsd; use crate::transport::jsonrpc; #[cfg(not(test))] use crate::transport::nostr::listener::spawn_nip46_listener; +#[cfg(not(test))] +use anyhow::Context; +use radroots_events::kinds::KIND_LISTING; use radroots_events::profile::RadrootsProfileType; use radroots_nostr::prelude::{ RadrootsNostrApplicationHandlerSpec, RadrootsNostrKind, radroots_nostr_bootstrap_service_presence, }; -#[cfg(not(test))] -use anyhow::Context; #[cfg(test)] static RUN_LOAD_HOOK: std::sync::OnceLock< @@ -42,8 +43,8 @@ enum RunWaitOutcome { } #[cfg(test)] -fn run_load_hook( -) -> &'static std::sync::Mutex<Option<Result<(cli::Args, config::Settings), String>>> { +fn run_load_hook() +-> &'static std::sync::Mutex<Option<Result<(cli::Args, config::Settings), String>>> { RUN_LOAD_HOOK.get_or_init(|| std::sync::Mutex::new(None)) } @@ -148,11 +149,12 @@ async fn publish_service_presence( identity: RadrootsIdentity, metadata: radroots_nostr::prelude::RadrootsNostrMetadata, service_cfg: radroots_runtime::RadrootsNostrServiceConfig, + bridge_config: config::BridgeConfig, nip46_config: config::Nip46Config, ) -> Result<()> { - let nip46_kind = RadrootsNostrKind::NostrConnect.as_u16() as u32; + let kinds = service_presence_kinds(&bridge_config); let handler_spec = RadrootsNostrApplicationHandlerSpec { - kinds: vec![nip46_kind], + kinds, identifier: service_cfg.nip89_identifier.clone(), metadata: Some(metadata.clone()), extra_tags: service_cfg.nip89_extra_tags.clone(), @@ -168,6 +170,7 @@ async fn maybe_publish_service_presence( identity: RadrootsIdentity, metadata: radroots_nostr::prelude::RadrootsNostrMetadata, service_cfg: radroots_runtime::RadrootsNostrServiceConfig, + bridge_config: config::BridgeConfig, nip46_config: config::Nip46Config, ) { #[cfg(test)] @@ -177,6 +180,7 @@ async fn maybe_publish_service_presence( identity, metadata, service_cfg, + bridge_config, nip46_config, ) .await; @@ -195,6 +199,7 @@ async fn maybe_publish_service_presence( identity, metadata, service_cfg, + bridge_config, nip46_config, ) .await; @@ -268,6 +273,7 @@ pub async fn run() -> Result<()> { let radrootsd = Radrootsd::new( keys, settings.metadata.clone(), + settings.config.bridge.clone(), settings.config.nip46.clone(), ); @@ -281,6 +287,7 @@ pub async fn run() -> Result<()> { identity.clone(), settings.metadata.clone(), settings.config.service.clone(), + settings.config.bridge.clone(), settings.config.nip46.clone(), ) .await; @@ -305,6 +312,16 @@ pub async fn run() -> Result<()> { Ok(()) } +fn service_presence_kinds(bridge_config: &config::BridgeConfig) -> Vec<u32> { + let mut kinds = vec![RadrootsNostrKind::NostrConnect.as_u16() as u32]; + if bridge_config.enabled { + kinds.push(KIND_LISTING); + } + kinds.sort_unstable(); + kinds.dedup(); + kinds +} + #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod tests { @@ -314,6 +331,7 @@ mod tests { use crate::app::{cli, config}; use crate::core::Radrootsd; use crate::transport::jsonrpc; + use radroots_events::kinds::KIND_LISTING; use radroots_nostr::prelude::{RadrootsNostrKeys, RadrootsNostrMetadata}; use std::path::PathBuf; use std::sync::{Mutex, MutexGuard}; @@ -321,7 +339,9 @@ mod tests { static TEST_LOCK: Mutex<()> = Mutex::new(()); fn test_guard() -> MutexGuard<'static, ()> { - let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); *run_load_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = None; @@ -372,6 +392,7 @@ mod tests { ..config::RpcConfig::default() }, rpc_addr: Some("127.0.0.1:0".to_string()), + bridge: config::BridgeConfig::default(), nip46: config::Nip46Config::default(), }, } @@ -382,6 +403,7 @@ mod tests { let state = Radrootsd::new( keys, settings.metadata.clone(), + settings.config.bridge.clone(), settings.config.nip46.clone(), ); jsonrpc::start_rpc( @@ -423,7 +445,8 @@ mod tests { let handle = make_handle(&settings).await; *run_load_hook() .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, settings.clone()))); + .unwrap_or_else(std::sync::PoisonError::into_inner) = + Some(Ok((args, settings.clone()))); *run_start_rpc_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(handle)); @@ -447,7 +470,8 @@ mod tests { let _ = handle.stop(); *run_load_hook() .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, settings.clone()))); + .unwrap_or_else(std::sync::PoisonError::into_inner) = + Some(Ok((args, settings.clone()))); *run_start_rpc_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(handle)); @@ -470,7 +494,8 @@ mod tests { let handle = make_handle(&settings).await; *run_load_hook() .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, settings.clone()))); + .unwrap_or_else(std::sync::PoisonError::into_inner) = + Some(Ok((args, settings.clone()))); *run_start_rpc_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(handle)); @@ -564,4 +589,19 @@ mod tests { assert!(run().await.is_ok()); let _ = std::fs::remove_file(path); } + + #[test] + fn service_presence_kinds_include_listing_when_bridge_is_enabled() { + let mut bridge = config::BridgeConfig::default(); + bridge.enabled = true; + + let kinds = super::service_presence_kinds(&bridge); + + assert!( + kinds.contains( + &(radroots_nostr::prelude::RadrootsNostrKind::NostrConnect.as_u16() as u32) + ) + ); + assert!(kinds.contains(&KIND_LISTING)); + } } diff --git a/src/core/bridge/mod.rs b/src/core/bridge/mod.rs @@ -0,0 +1,2 @@ +pub mod publish; +pub mod store; diff --git a/src/core/bridge/publish.rs b/src/core/bridge/publish.rs @@ -0,0 +1,570 @@ +use std::collections::{BTreeMap, BTreeSet}; +use std::time::Duration; + +use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrOutput, RadrootsNostrRelayUrl}; +use serde::Serialize; +use tokio::time::sleep; + +use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct BridgeRelayPublishResult { + pub relay_url: String, + pub acknowledged: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub detail: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct BridgePublishExecution { + pub published: bool, + pub relay_count: usize, + pub acknowledged_relay_count: usize, + pub required_acknowledged_relay_count: usize, + pub delivery_policy: BridgeDeliveryPolicy, + pub attempt_count: usize, + pub relay_outcome_summary: String, + pub relay_results: Vec<BridgeRelayPublishResult>, + pub attempt_summaries: Vec<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BridgePublishSettings { + pub connect_timeout_secs: u64, + pub delivery_policy: BridgeDeliveryPolicy, + pub delivery_quorum: Option<usize>, + pub publish_max_attempts: usize, + pub publish_initial_backoff_millis: u64, + pub publish_max_backoff_millis: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct BridgePublishAttemptResult { + attempt_number: usize, + acknowledged_relay_count: usize, + relay_outcome_summary: String, + relay_results: Vec<BridgeRelayPublishResult>, +} + +impl BridgePublishSettings { + pub fn from_config(config: &BridgeConfig) -> Self { + Self { + connect_timeout_secs: config.connect_timeout_secs, + delivery_policy: config.delivery_policy, + delivery_quorum: config.delivery_quorum, + publish_max_attempts: config.publish_max_attempts, + publish_initial_backoff_millis: config.publish_initial_backoff_millis, + publish_max_backoff_millis: config.publish_max_backoff_millis, + } + } + + fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, String> { + if relay_count == 0 { + return Err("cannot publish without at least one relay".to_string()); + } + if self.connect_timeout_secs == 0 { + return Err("bridge.connect_timeout_secs must be greater than zero".to_string()); + } + if self.publish_max_attempts == 0 { + return Err("bridge.publish_max_attempts must be greater than zero".to_string()); + } + if self.publish_initial_backoff_millis == 0 { + return Err( + "bridge.publish_initial_backoff_millis must be greater than zero".to_string(), + ); + } + if self.publish_max_backoff_millis == 0 { + return Err("bridge.publish_max_backoff_millis must be greater than zero".to_string()); + } + if self.publish_initial_backoff_millis > self.publish_max_backoff_millis { + return Err( + "bridge.publish_max_backoff_millis must be greater than or equal to bridge.publish_initial_backoff_millis" + .to_string(), + ); + } + + match self.delivery_policy { + BridgeDeliveryPolicy::Any => Ok(1), + BridgeDeliveryPolicy::All => Ok(relay_count), + BridgeDeliveryPolicy::Quorum => { + let delivery_quorum = self.delivery_quorum.ok_or_else(|| { + "bridge.delivery_quorum must be set when bridge.delivery_policy is `quorum`" + .to_string() + })?; + if delivery_quorum == 0 { + return Err("bridge.delivery_quorum must be greater than zero".to_string()); + } + if delivery_quorum > relay_count { + return Err(format!( + "bridge.delivery_quorum `{delivery_quorum}` cannot be satisfied by `{relay_count}` target relays" + )); + } + Ok(delivery_quorum) + } + } + } + + fn backoff_for_attempt(&self, completed_attempt_number: usize) -> u64 { + let exponent = completed_attempt_number.saturating_sub(1) as u32; + let scaled = self + .publish_initial_backoff_millis + .saturating_mul(2_u64.saturating_pow(exponent)); + scaled.min(self.publish_max_backoff_millis) + } +} + +pub async fn connect_and_publish_event( + client: &RadrootsNostrClient, + settings: &BridgePublishSettings, + event: &radroots_nostr::prelude::RadrootsNostrEvent, +) -> BridgePublishExecution { + let relays = client + .relays() + .await + .keys() + .cloned() + .collect::<Vec<RadrootsNostrRelayUrl>>(); + publish_with_policy(&relays, settings, || async { + client.connect().await; + client + .wait_for_connection(Duration::from_secs(settings.connect_timeout_secs)) + .await; + client + .send_event(event) + .await + .map_err(|error| error.to_string()) + }) + .await +} + +pub async fn publish_with_policy<T, F, Fut>( + relays: &[RadrootsNostrRelayUrl], + settings: &BridgePublishSettings, + mut send_attempt: F, +) -> BridgePublishExecution +where + T: std::fmt::Debug, + F: FnMut() -> Fut, + Fut: std::future::Future<Output = Result<RadrootsNostrOutput<T>, String>>, +{ + let relay_count = relays.len(); + let required_acknowledged_relay_count = + match settings.required_acknowledged_relay_count(relay_count) { + Ok(required) => required, + Err(error) => { + let relay_results = relays + .iter() + .map(|relay| BridgeRelayPublishResult { + relay_url: relay.to_string(), + acknowledged: false, + detail: Some(error.clone()), + }) + .collect::<Vec<_>>(); + return BridgePublishExecution { + published: false, + relay_count, + acknowledged_relay_count: 0, + required_acknowledged_relay_count: 0, + delivery_policy: settings.delivery_policy, + attempt_count: 0, + relay_outcome_summary: error.clone(), + relay_results, + attempt_summaries: vec![error], + }; + } + }; + let mut attempt_results = Vec::new(); + + for attempt_number in 1..=settings.publish_max_attempts { + let attempt = match send_attempt().await { + Ok(output) => build_publish_attempt_result(relays, attempt_number, &output), + Err(error) => build_failed_publish_attempt_result(relays, attempt_number, error), + }; + let threshold_reached = + attempt.acknowledged_relay_count >= required_acknowledged_relay_count; + attempt_results.push(attempt); + + if threshold_reached { + let final_attempt = attempt_results + .last() + .expect("publish attempt results contain the successful attempt"); + return BridgePublishExecution { + published: true, + relay_count, + acknowledged_relay_count: final_attempt.acknowledged_relay_count, + required_acknowledged_relay_count, + delivery_policy: settings.delivery_policy, + attempt_count: attempt_results.len(), + relay_outcome_summary: summarize_delivery_policy_result( + settings.delivery_policy, + required_acknowledged_relay_count, + &attempt_results, + ), + relay_results: final_attempt.relay_results.clone(), + attempt_summaries: attempt_results + .iter() + .map(|attempt| attempt.relay_outcome_summary.clone()) + .collect(), + }; + } + + if attempt_number < settings.publish_max_attempts { + sleep(Duration::from_millis( + settings.backoff_for_attempt(attempt_number), + )) + .await; + } + } + + let final_attempt = attempt_results + .last() + .expect("publish attempt results contain at least one attempt"); + BridgePublishExecution { + published: false, + relay_count, + acknowledged_relay_count: final_attempt.acknowledged_relay_count, + required_acknowledged_relay_count, + delivery_policy: settings.delivery_policy, + attempt_count: attempt_results.len(), + relay_outcome_summary: summarize_delivery_policy_result( + settings.delivery_policy, + required_acknowledged_relay_count, + &attempt_results, + ), + relay_results: final_attempt.relay_results.clone(), + attempt_summaries: attempt_results + .iter() + .map(|attempt| attempt.relay_outcome_summary.clone()) + .collect(), + } +} + +fn build_publish_relay_results<T>( + relays: &[RadrootsNostrRelayUrl], + output: &RadrootsNostrOutput<T>, +) -> Vec<BridgeRelayPublishResult> +where + T: std::fmt::Debug, +{ + let acknowledged_relays = output + .success + .iter() + .map(ToString::to_string) + .collect::<BTreeSet<_>>(); + let failed_relays = output + .failed + .iter() + .map(|(relay, error)| (relay.to_string(), error.to_string())) + .collect::<BTreeMap<_, _>>(); + + relays + .iter() + .map(|relay| { + let relay_url = relay.to_string(); + if acknowledged_relays.contains(&relay_url) { + BridgeRelayPublishResult { + relay_url, + acknowledged: true, + detail: None, + } + } else { + BridgeRelayPublishResult { + relay_url: relay_url.clone(), + acknowledged: false, + detail: Some( + failed_relays + .get(&relay_url) + .cloned() + .unwrap_or_else(|| "no relay acknowledgement reported".to_owned()), + ), + } + } + }) + .collect() +} + +fn build_publish_attempt_result<T>( + relays: &[RadrootsNostrRelayUrl], + attempt_number: usize, + output: &RadrootsNostrOutput<T>, +) -> BridgePublishAttemptResult +where + T: std::fmt::Debug, +{ + let relay_results = build_publish_relay_results(relays, output); + let acknowledged_relay_count = relay_results + .iter() + .filter(|result| result.acknowledged) + .count(); + BridgePublishAttemptResult { + attempt_number, + acknowledged_relay_count, + relay_outcome_summary: summarize_publish_results(&relay_results), + relay_results, + } +} + +fn build_failed_publish_attempt_result( + relays: &[RadrootsNostrRelayUrl], + attempt_number: usize, + error: String, +) -> BridgePublishAttemptResult { + let relay_results = relays + .iter() + .map(|relay| BridgeRelayPublishResult { + relay_url: relay.to_string(), + acknowledged: false, + detail: Some(error.clone()), + }) + .collect::<Vec<_>>(); + BridgePublishAttemptResult { + attempt_number, + acknowledged_relay_count: 0, + relay_outcome_summary: summarize_publish_results(&relay_results), + relay_results, + } +} + +fn summarize_publish_results(relay_results: &[BridgeRelayPublishResult]) -> String { + let relay_count = relay_results.len(); + let acknowledged_relay_count = relay_results + .iter() + .filter(|result| result.acknowledged) + .count(); + if relay_count == 0 { + return "no relay acknowledged the publish".to_owned(); + } + + let mut summary = + format!("{acknowledged_relay_count}/{relay_count} relays acknowledged publish"); + let acknowledged = relay_results + .iter() + .filter(|result| result.acknowledged) + .map(|result| result.relay_url.clone()) + .collect::<Vec<_>>(); + if !acknowledged.is_empty() { + summary.push_str("; acknowledged: "); + summary.push_str(&acknowledged.join(", ")); + } + let failures = relay_results + .iter() + .filter(|result| !result.acknowledged) + .map(|result| match result.detail.as_deref() { + Some(detail) => format!("{}: {detail}", result.relay_url), + None => result.relay_url.clone(), + }) + .collect::<Vec<_>>(); + if !failures.is_empty() { + summary.push_str("; failures: "); + summary.push_str(&failures.join("; ")); + } + summary +} + +fn summarize_delivery_policy_result( + delivery_policy: BridgeDeliveryPolicy, + required_acknowledged_relay_count: usize, + attempt_results: &[BridgePublishAttemptResult], +) -> String { + let attempt_count = attempt_results.len(); + let final_attempt = attempt_results + .last() + .expect("delivery policy summary requires at least one attempt"); + let mut summary = format!( + "delivery policy {} required {required_acknowledged_relay_count} acknowledgements across {attempt_count} attempt(s); final attempt {}: {}", + delivery_policy.as_str(), + final_attempt.attempt_number, + final_attempt.relay_outcome_summary, + ); + if attempt_results.len() > 1 { + let attempt_summaries = attempt_results + .iter() + .map(|attempt| { + format!( + "attempt {}: {}", + attempt.attempt_number, attempt.relay_outcome_summary + ) + }) + .collect::<Vec<_>>(); + summary.push_str("; "); + summary.push_str(&attempt_summaries.join(" | ")); + } + summary +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + use std::sync::{Arc, Mutex}; + + use radroots_nostr::prelude::{ + RadrootsNostrEventId, RadrootsNostrOutput, RadrootsNostrRelayUrl, + }; + use tokio::time::Instant; + + use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy}; + + use super::{BridgePublishSettings, publish_with_policy}; + + #[test] + fn publish_settings_from_config_copies_values() { + let config = BridgeConfig { + enabled: true, + connect_timeout_secs: 15, + delivery_policy: BridgeDeliveryPolicy::Quorum, + delivery_quorum: Some(2), + publish_max_attempts: 3, + publish_initial_backoff_millis: 125, + publish_max_backoff_millis: 500, + job_status_retention: 64, + }; + + assert_eq!( + BridgePublishSettings::from_config(&config), + BridgePublishSettings { + connect_timeout_secs: 15, + delivery_policy: BridgeDeliveryPolicy::Quorum, + delivery_quorum: Some(2), + publish_max_attempts: 3, + publish_initial_backoff_millis: 125, + publish_max_backoff_millis: 500, + } + ); + } + + #[tokio::test] + async fn publish_with_policy_retries_until_threshold_is_met() { + let relays = vec![ + RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"), + RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"), + ]; + let settings = BridgePublishSettings { + connect_timeout_secs: 10, + delivery_policy: BridgeDeliveryPolicy::All, + delivery_quorum: None, + publish_max_attempts: 2, + publish_initial_backoff_millis: 10, + publish_max_backoff_millis: 10, + }; + let attempts = Arc::new(Mutex::new(vec![ + publish_output( + "1111111111111111111111111111111111111111111111111111111111111111", + &["wss://relay-a.example.com"], + &[("wss://relay-b.example.com", "blocked")], + ), + publish_output( + "2222222222222222222222222222222222222222222222222222222222222222", + &["wss://relay-a.example.com", "wss://relay-b.example.com"], + &[], + ), + ])); + + let start = Instant::now(); + let outcome = publish_with_policy(&relays, &settings, || { + let attempts = Arc::clone(&attempts); + async move { + let output = attempts.lock().expect("attempts lock").remove(0); + Ok(output) + } + }) + .await; + + assert!(outcome.published); + assert_eq!(outcome.delivery_policy, BridgeDeliveryPolicy::All); + assert_eq!(outcome.required_acknowledged_relay_count, 2); + assert_eq!(outcome.attempt_count, 2); + assert_eq!(outcome.acknowledged_relay_count, 2); + assert_eq!(outcome.relay_results.len(), 2); + assert_eq!(outcome.attempt_summaries.len(), 2); + assert!( + outcome + .relay_outcome_summary + .contains("delivery policy all") + ); + assert!(outcome.relay_outcome_summary.contains("attempt 1")); + assert!(start.elapsed() >= std::time::Duration::from_millis(10)); + } + + #[tokio::test] + async fn publish_with_policy_reports_threshold_failure() { + let relays = vec![ + RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"), + RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"), + ]; + let settings = BridgePublishSettings { + connect_timeout_secs: 10, + delivery_policy: BridgeDeliveryPolicy::Quorum, + delivery_quorum: Some(2), + publish_max_attempts: 2, + publish_initial_backoff_millis: 1, + publish_max_backoff_millis: 1, + }; + + let outcome = + publish_with_policy::<RadrootsNostrEventId, _, _>(&relays, &settings, || async { + Ok(publish_output( + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + &["wss://relay-a.example.com"], + &[("wss://relay-b.example.com", "blocked")], + )) + }) + .await; + + assert!(!outcome.published); + assert_eq!(outcome.delivery_policy, BridgeDeliveryPolicy::Quorum); + assert_eq!(outcome.required_acknowledged_relay_count, 2); + assert_eq!(outcome.attempt_count, 2); + assert!( + outcome + .relay_outcome_summary + .contains("delivery policy quorum") + ); + } + + #[tokio::test] + async fn publish_with_policy_reports_configuration_failure_without_attempts() { + let settings = BridgePublishSettings { + connect_timeout_secs: 0, + delivery_policy: BridgeDeliveryPolicy::Any, + delivery_quorum: None, + publish_max_attempts: 1, + publish_initial_backoff_millis: 10, + publish_max_backoff_millis: 10, + }; + + let outcome = publish_with_policy::<RadrootsNostrEventId, _, _>(&[], &settings, || async { + unreachable!("configuration failure should short-circuit") + }) + .await; + + assert!(!outcome.published); + assert_eq!(outcome.attempt_count, 0); + assert!(outcome.relay_outcome_summary.contains("cannot publish")); + } + + fn publish_output( + event_id_hex: &str, + succeeded_relays: &[&str], + failed_relays: &[(&str, &str)], + ) -> RadrootsNostrOutput<RadrootsNostrEventId> { + let success = succeeded_relays + .iter() + .map(|relay| RadrootsNostrRelayUrl::parse(*relay).expect("success relay")) + .collect::<HashSet<_>>(); + let failed = failed_relays + .iter() + .map(|(relay, error)| { + ( + RadrootsNostrRelayUrl::parse(*relay).expect("failed relay"), + (*error).to_owned(), + ) + }) + .collect::<HashMap<_, _>>(); + + RadrootsNostrOutput { + val: RadrootsNostrEventId::parse(event_id_hex).expect("event id"), + success, + failed, + } + } +} diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs @@ -0,0 +1,301 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, RwLock}; + +use serde::Serialize; + +use crate::app::config::BridgeDeliveryPolicy; +use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum BridgeJobStatus { + Accepted, + Published, + Failed, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct BridgeJobRecord { + pub job_id: String, + pub command: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub idempotency_key: Option<String>, + pub status: BridgeJobStatus, + pub requested_at_unix: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub completed_at_unix: Option<u64>, + pub signer_mode: String, + pub event_kind: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub event_addr: Option<String>, + pub delivery_policy: BridgeDeliveryPolicy, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub delivery_quorum: Option<usize>, + pub relay_count: usize, + pub acknowledged_relay_count: usize, + pub required_acknowledged_relay_count: usize, + pub attempt_count: usize, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub attempt_summaries: Vec<String>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub relay_results: Vec<BridgeRelayPublishResult>, + pub relay_outcome_summary: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +pub struct BridgeJobStoreSnapshot { + pub retained_jobs: usize, + pub retained_idempotency_keys: usize, + pub capacity: usize, +} + +#[derive(Clone)] +pub struct BridgeJobStore { + inner: Arc<RwLock<BridgeJobStoreInner>>, +} + +#[derive(Debug)] +struct BridgeJobStoreInner { + jobs: HashMap<String, BridgeJobRecord>, + idempotency: HashMap<String, String>, + order: VecDeque<String>, + capacity: usize, +} + +impl BridgeJobStore { + pub fn new(capacity: usize) -> Self { + Self { + inner: Arc::new(RwLock::new(BridgeJobStoreInner { + jobs: HashMap::new(), + idempotency: HashMap::new(), + order: VecDeque::new(), + capacity, + })), + } + } + + pub fn reserve(&self, mut record: BridgeJobRecord) -> Result<BridgeJobRecord, BridgeJobRecord> { + let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); + if let Some(idempotency_key) = record.idempotency_key.as_ref() { + if let Some(job_id) = inner.idempotency.get(idempotency_key) { + if let Some(existing) = inner.jobs.get(job_id) { + return Err(existing.clone()); + } + } + } + + record.status = BridgeJobStatus::Accepted; + inner.order.push_back(record.job_id.clone()); + if let Some(idempotency_key) = record.idempotency_key.as_ref() { + inner + .idempotency + .insert(idempotency_key.clone(), record.job_id.clone()); + } + inner.jobs.insert(record.job_id.clone(), record.clone()); + inner.prune(); + Ok(record) + } + + pub fn complete( + &self, + job_id: &str, + execution: BridgePublishExecution, + ) -> Option<BridgeJobRecord> { + let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); + let record = inner.jobs.get_mut(job_id)?; + record.status = if execution.published { + BridgeJobStatus::Published + } else { + BridgeJobStatus::Failed + }; + record.completed_at_unix = Some(unix_timestamp_now()); + record.relay_count = execution.relay_count; + record.acknowledged_relay_count = execution.acknowledged_relay_count; + record.required_acknowledged_relay_count = execution.required_acknowledged_relay_count; + record.attempt_count = execution.attempt_count; + record.attempt_summaries = execution.attempt_summaries; + record.relay_results = execution.relay_results; + record.relay_outcome_summary = execution.relay_outcome_summary; + Some(record.clone()) + } + + pub fn get(&self, job_id: &str) -> Option<BridgeJobRecord> { + self.inner + .read() + .unwrap_or_else(|e| e.into_inner()) + .jobs + .get(job_id) + .cloned() + } + + pub fn snapshot(&self) -> BridgeJobStoreSnapshot { + let inner = self.inner.read().unwrap_or_else(|e| e.into_inner()); + BridgeJobStoreSnapshot { + retained_jobs: inner.jobs.len(), + retained_idempotency_keys: inner.idempotency.len(), + capacity: inner.capacity, + } + } +} + +impl BridgeJobStoreInner { + fn prune(&mut self) { + while self.jobs.len() > self.capacity { + let Some(job_id) = self.order.pop_front() else { + break; + }; + let Some(removed) = self.jobs.remove(&job_id) else { + continue; + }; + if let Some(idempotency_key) = removed.idempotency_key { + if self.idempotency.get(&idempotency_key) == Some(&job_id) { + self.idempotency.remove(&idempotency_key); + } + } + } + } +} + +pub fn new_listing_publish_job( + job_id: String, + idempotency_key: Option<String>, + event_kind: u32, + event_id: String, + event_addr: String, + delivery_policy: BridgeDeliveryPolicy, + delivery_quorum: Option<usize>, +) -> BridgeJobRecord { + BridgeJobRecord { + job_id, + command: "bridge.listing.publish".to_string(), + idempotency_key, + status: BridgeJobStatus::Accepted, + requested_at_unix: unix_timestamp_now(), + completed_at_unix: None, + signer_mode: "embedded_service_identity".to_string(), + event_kind, + event_id: Some(event_id), + event_addr: Some(event_addr), + delivery_policy, + delivery_quorum, + relay_count: 0, + acknowledged_relay_count: 0, + required_acknowledged_relay_count: 0, + attempt_count: 0, + attempt_summaries: Vec::new(), + relay_results: Vec::new(), + relay_outcome_summary: "accepted".to_string(), + } +} + +fn unix_timestamp_now() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_else(|_| std::time::Duration::from_secs(0)) + .as_secs() +} + +#[cfg(test)] +mod tests { + use crate::app::config::BridgeDeliveryPolicy; + use crate::core::bridge::publish::BridgePublishExecution; + + use super::{BridgeJobStatus, BridgeJobStore, new_listing_publish_job}; + + #[test] + fn reserve_returns_existing_job_for_same_idempotency_key() { + let store = BridgeJobStore::new(8); + let first = new_listing_publish_job( + "job-1".to_string(), + Some("same".to_string()), + 30402, + "event-1".to_string(), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + let second = new_listing_publish_job( + "job-2".to_string(), + Some("same".to_string()), + 30402, + "event-2".to_string(), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + + assert!(store.reserve(first.clone()).is_ok()); + let existing = store.reserve(second).expect_err("same idempotency key"); + assert_eq!(existing.job_id, first.job_id); + assert_eq!(existing.status, BridgeJobStatus::Accepted); + } + + #[test] + fn complete_updates_job_record() { + let store = BridgeJobStore::new(8); + let job = new_listing_publish_job( + "job-1".to_string(), + None, + 30402, + "event-1".to_string(), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + store.reserve(job).expect("reserve job"); + + let completed = store + .complete( + "job-1", + BridgePublishExecution { + published: true, + relay_count: 2, + acknowledged_relay_count: 1, + required_acknowledged_relay_count: 1, + delivery_policy: BridgeDeliveryPolicy::Any, + attempt_count: 1, + relay_outcome_summary: "1/2 relays acknowledged publish".to_string(), + relay_results: Vec::new(), + attempt_summaries: vec!["attempt 1".to_string()], + }, + ) + .expect("complete job"); + + assert_eq!(completed.status, BridgeJobStatus::Published); + assert_eq!(completed.attempt_count, 1); + assert_eq!(completed.acknowledged_relay_count, 1); + assert!(completed.completed_at_unix.is_some()); + } + + #[test] + fn reserve_prunes_oldest_jobs_when_capacity_is_exceeded() { + let store = BridgeJobStore::new(1); + let first = new_listing_publish_job( + "job-1".to_string(), + Some("first".to_string()), + 30402, + "event-1".to_string(), + "30402:author:listing-1".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + let second = new_listing_publish_job( + "job-2".to_string(), + Some("second".to_string()), + 30402, + "event-2".to_string(), + "30402:author:listing-2".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + + store.reserve(first).expect("first"); + store.reserve(second).expect("second"); + + assert!(store.get("job-1").is_none()); + assert!(store.get("job-2").is_some()); + assert_eq!(store.snapshot().retained_jobs, 1); + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs @@ -1,3 +1,4 @@ +pub mod bridge; pub mod nip46; pub mod state; diff --git a/src/core/nip46/session.rs b/src/core/nip46/session.rs @@ -1,18 +1,14 @@ #![forbid(unsafe_code)] use std::collections::{HashMap, HashSet}; -use std::time::{Duration, Instant}; use std::sync::Arc; +use std::time::{Duration, Instant}; use serde::Serialize; use tokio::sync::Mutex; -use radroots_nostr::prelude::{ - RadrootsNostrClient, - RadrootsNostrKeys, - RadrootsNostrPublicKey, -}; use nostr::nips::nip46::NostrConnectRequest; +use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrPublicKey}; #[derive(Clone)] pub struct Nip46SessionStore { @@ -107,11 +103,7 @@ impl Nip46SessionStore { sessions.remove(session_id).is_some() } - pub async fn set_user_pubkey( - &self, - session_id: &str, - pubkey: RadrootsNostrPublicKey, - ) -> bool { + pub async fn set_user_pubkey(&self, session_id: &str, pubkey: RadrootsNostrPublicKey) -> bool { let mut sessions = self.inner.lock().await; match sessions.get_mut(session_id) { Some(session) => { @@ -237,7 +229,9 @@ fn remaining_secs(expires_at: Instant) -> u64 { if expires_at <= Instant::now() { 0 } else { - expires_at.saturating_duration_since(Instant::now()).as_secs() + expires_at + .saturating_duration_since(Instant::now()) + .as_secs() } } @@ -308,10 +302,7 @@ mod tests { #[tokio::test] async fn session_store_removes_expired() { let store = Nip46SessionStore::new(); - let session = build_session( - "expired", - Some(Instant::now() - Duration::from_secs(1)), - ); + let session = build_session("expired", Some(Instant::now() - Duration::from_secs(1))); store.insert(session).await; let found = store.get("expired").await; assert!(found.is_none()); @@ -372,10 +363,7 @@ mod tests { #[tokio::test] async fn session_store_keeps_active() { let store = Nip46SessionStore::new(); - let session = build_session( - "active", - Some(Instant::now() + Duration::from_secs(60)), - ); + let session = build_session("active", Some(Instant::now() + Duration::from_secs(60))); store.insert(session).await; let found = store.get("active").await; assert!(found.is_some()); @@ -473,7 +461,11 @@ mod tests { Some(Instant::now() - Duration::from_secs(1)), ); store.insert(session).await; - assert!(!store.set_user_pubkey("expired-user", keys.public_key()).await); + assert!( + !store + .set_user_pubkey("expired-user", keys.public_key()) + .await + ); } #[tokio::test] @@ -494,10 +486,7 @@ mod tests { #[tokio::test] async fn session_store_require_auth_sets_flags_and_clears_pending() { let store = Nip46SessionStore::new(); - let mut session = build_session( - "auth", - Some(Instant::now() + Duration::from_secs(30)), - ); + let mut session = build_session("auth", Some(Instant::now() + Duration::from_secs(30))); let keys = RadrootsNostrKeys::generate(); session.pending_request = Some(PendingNostrRequest { request_id: "req-1".to_string(), @@ -517,7 +506,11 @@ mod tests { #[tokio::test] async fn session_store_require_auth_handles_missing_and_expired() { let store = Nip46SessionStore::new(); - assert!(!store.require_auth("missing", "https://auth".to_string()).await); + assert!( + !store + .require_auth("missing", "https://auth".to_string()) + .await + ); store .insert(build_session( @@ -535,10 +528,8 @@ mod tests { #[tokio::test] async fn session_store_authorize_returns_pending() { let store = Nip46SessionStore::new(); - let mut session = build_session( - "authorize", - Some(Instant::now() + Duration::from_secs(30)), - ); + let mut session = + build_session("authorize", Some(Instant::now() + Duration::from_secs(30))); let keys = RadrootsNostrKeys::generate(); session.pending_request = Some(PendingNostrRequest { request_id: "req-2".to_string(), diff --git a/src/core/state.rs b/src/core/state.rs @@ -1,11 +1,8 @@ use radroots_nostr::prelude::{ - RadrootsNostrClient, - RadrootsNostrKeys, - RadrootsNostrMetadata, - RadrootsNostrPublicKey, + RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata, RadrootsNostrPublicKey, }; -use crate::app::config::Nip46Config; +use crate::app::config::{BridgeConfig, Nip46Config}; #[derive(Clone)] pub struct Radrootsd { @@ -14,6 +11,8 @@ pub struct Radrootsd { pub pubkey: RadrootsNostrPublicKey, pub metadata: RadrootsNostrMetadata, pub info: serde_json::Value, + pub(crate) bridge_jobs: crate::core::bridge::store::BridgeJobStore, + pub bridge_config: BridgeConfig, pub(crate) nip46_sessions: crate::core::nip46::session::Nip46SessionStore, pub nip46_config: Nip46Config, } @@ -22,6 +21,7 @@ impl Radrootsd { pub fn new( keys: RadrootsNostrKeys, metadata: RadrootsNostrMetadata, + bridge_config: BridgeConfig, nip46_config: Nip46Config, ) -> Self { let pubkey = keys.public_key(); @@ -30,6 +30,8 @@ impl Radrootsd { "version": env!("CARGO_PKG_VERSION"), "build": option_env!("GIT_HASH").unwrap_or("unknown"), }); + let bridge_jobs = + crate::core::bridge::store::BridgeJobStore::new(bridge_config.job_status_retention); let nip46_sessions = crate::core::nip46::session::Nip46SessionStore::new(); Self { @@ -38,6 +40,8 @@ impl Radrootsd { pubkey, metadata, info, + bridge_jobs, + bridge_config, nip46_sessions, nip46_config, } @@ -47,7 +51,7 @@ impl Radrootsd { #[cfg(test)] mod tests { use super::Radrootsd; - use crate::app::config::Nip46Config; + use crate::app::config::{BridgeConfig, Nip46Config}; use radroots_nostr::prelude::{RadrootsNostrKeys, RadrootsNostrMetadata}; #[test] @@ -55,11 +59,22 @@ mod tests { let keys = RadrootsNostrKeys::generate(); let metadata: RadrootsNostrMetadata = serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let bridge_cfg = BridgeConfig::default(); let cfg = Nip46Config::default(); - let state = Radrootsd::new(keys.clone(), metadata.clone(), cfg.clone()); + let state = Radrootsd::new( + keys.clone(), + metadata.clone(), + bridge_cfg.clone(), + cfg.clone(), + ); assert_eq!(state.pubkey, keys.public_key()); assert_eq!(state.metadata, metadata); + assert_eq!(state.bridge_config.enabled, bridge_cfg.enabled); + assert_eq!( + state.bridge_jobs.snapshot().capacity, + bridge_cfg.job_status_retention + ); assert_eq!(state.nip46_config.session_ttl_secs, cfg.session_ttl_secs); assert_eq!(state.nip46_config.perms, cfg.perms); assert_eq!(state.info["version"], env!("CARGO_PKG_VERSION")); diff --git a/src/main.rs b/src/main.rs @@ -69,7 +69,9 @@ mod tests { static TEST_LOCK: Mutex<()> = Mutex::new(()); fn test_guard() -> std::sync::MutexGuard<'static, ()> { - let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); *run_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = None; diff --git a/src/transport/jsonrpc/methods/bridge/job_status.rs b/src/transport/jsonrpc/methods/bridge/job_status.rs @@ -0,0 +1,30 @@ +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::Deserialize; + +use crate::core::bridge::store::BridgeJobRecord; +use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; + +#[derive(Debug, Deserialize)] +struct BridgeJobStatusParams { + job_id: String, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("bridge.job.status"); + m.register_async_method("bridge.job.status", |params, ctx, _| async move { + let params: BridgeJobStatusParams = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + let job_id = params.job_id.trim(); + if job_id.is_empty() { + return Err(RpcError::InvalidParams("missing job_id".to_string())); + } + ctx.state + .bridge_jobs + .get(job_id) + .ok_or_else(|| RpcError::Other(format!("unknown bridge job: {job_id}"))) + .map(|job| -> BridgeJobRecord { job }) + })?; + Ok(()) +} diff --git a/src/transport/jsonrpc/methods/bridge/listing_publish.rs b/src/transport/jsonrpc/methods/bridge/listing_publish.rs @@ -0,0 +1,206 @@ +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use radroots_events::listing::RadrootsListing; +use radroots_events_codec::listing::encode::to_wire_parts; +use radroots_nostr::prelude::{radroots_event_from_nostr, radroots_nostr_build_event}; +use radroots_trade::listing::validation::validate_listing_event; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::core::bridge::publish::{BridgePublishSettings, connect_and_publish_event}; +use crate::core::bridge::store::{BridgeJobRecord, new_listing_publish_job}; +use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; + +#[derive(Debug, Deserialize)] +struct BridgeListingPublishParams { + listing: RadrootsListing, + #[serde(default)] + idempotency_key: Option<String>, +} + +#[derive(Clone, Debug, Serialize)] +struct BridgeListingPublishResponse { + deduplicated: bool, + job: BridgeJobRecord, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("bridge.listing.publish"); + m.register_async_method("bridge.listing.publish", |params, ctx, _| async move { + let params: BridgeListingPublishParams = params + .parse() + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + let response = publish_listing(ctx.as_ref().clone(), params).await?; + Ok::<BridgeListingPublishResponse, RpcError>(response) + })?; + Ok(()) +} + +async fn publish_listing( + ctx: RpcContext, + params: BridgeListingPublishParams, +) -> Result<BridgeListingPublishResponse, RpcError> { + if !ctx.state.bridge_config.enabled { + return Err(RpcError::Other("bridge ingress is disabled".to_string())); + } + + let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; + let listing = + canonicalize_listing_for_embedded_signer(params.listing, &ctx.state.pubkey.to_hex()); + let parts = to_wire_parts(&listing) + .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; + let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) + .map_err(|error| RpcError::Other(format!("failed to build listing event: {error}")))?; + let event = builder + .sign_with_keys(&ctx.state.keys) + .map_err(|error| RpcError::Other(format!("failed to sign listing event: {error}")))?; + let canonical = radroots_event_from_nostr(&event); + let validated = validate_listing_event(&canonical) + .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; + + let reserved = ctx.state.bridge_jobs.reserve(new_listing_publish_job( + Uuid::new_v4().to_string(), + idempotency_key, + parts.kind, + event.id.to_hex(), + validated.listing_addr, + ctx.state.bridge_config.delivery_policy, + ctx.state.bridge_config.delivery_quorum, + )); + let job = match reserved { + Ok(job) => job, + Err(existing) => { + return Ok(BridgeListingPublishResponse { + deduplicated: true, + job: existing, + }); + } + }; + + let execution = connect_and_publish_event( + &ctx.state.client, + &BridgePublishSettings::from_config(&ctx.state.bridge_config), + &event, + ) + .await; + let job = ctx + .state + .bridge_jobs + .complete(&job.job_id, execution) + .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; + + Ok(BridgeListingPublishResponse { + deduplicated: false, + job, + }) +} + +fn normalize_idempotency_key(value: Option<String>) -> Result<Option<String>, RpcError> { + let value = value.map(|value| value.trim().to_string()); + match value { + Some(value) if value.is_empty() => Err(RpcError::InvalidParams( + "idempotency_key cannot be empty".to_string(), + )), + Some(value) => Ok(Some(value)), + None => Ok(None), + } +} + +fn canonicalize_listing_for_embedded_signer( + mut listing: RadrootsListing, + signer_pubkey: &str, +) -> RadrootsListing { + if listing.farm.pubkey.trim().is_empty() { + listing.farm.pubkey = signer_pubkey.to_string(); + } + listing +} + +#[cfg(test)] +mod tests { + use radroots_core::{ + RadrootsCoreCurrency, RadrootsCoreDecimal, RadrootsCoreMoney, RadrootsCoreQuantity, + RadrootsCoreQuantityPrice, RadrootsCoreUnit, + }; + use radroots_events::listing::{ + RadrootsListing, RadrootsListingAvailability, RadrootsListingBin, + RadrootsListingDeliveryMethod, RadrootsListingFarmRef, RadrootsListingLocation, + RadrootsListingProduct, + }; + + use super::{canonicalize_listing_for_embedded_signer, normalize_idempotency_key}; + + #[test] + fn normalize_idempotency_key_rejects_empty_values() { + let err = normalize_idempotency_key(Some(" ".to_string())).expect_err("empty key"); + assert!(err.to_string().contains("idempotency_key")); + } + + #[test] + fn canonicalize_listing_sets_missing_farm_pubkey() { + let listing = canonicalize_listing_for_embedded_signer(base_listing(), "abc123"); + assert_eq!(listing.farm.pubkey, "abc123"); + } + + fn base_listing() -> RadrootsListing { + RadrootsListing { + d_tag: "fresh-carrots".to_string(), + farm: RadrootsListingFarmRef { + pubkey: String::new(), + d_tag: "farm-1".to_string(), + }, + product: RadrootsListingProduct { + key: "carrot".to_string(), + title: "Fresh carrots".to_string(), + category: "vegetable".to_string(), + summary: Some("Sweet carrots".to_string()), + process: None, + lot: None, + location: None, + profile: None, + year: None, + }, + primary_bin_id: "bin-1".to_string(), + bins: vec![RadrootsListingBin { + bin_id: "bin-1".to_string(), + quantity: RadrootsCoreQuantity::new( + RadrootsCoreDecimal::from(25), + RadrootsCoreUnit::MassKg, + ), + price_per_canonical_unit: RadrootsCoreQuantityPrice { + amount: RadrootsCoreMoney { + amount: RadrootsCoreDecimal::from(4), + currency: RadrootsCoreCurrency::USD, + }, + quantity: RadrootsCoreQuantity::new( + RadrootsCoreDecimal::from(1), + RadrootsCoreUnit::MassKg, + ), + }, + display_amount: None, + display_unit: None, + display_label: None, + display_price: None, + display_price_unit: None, + }], + resource_area: None, + plot: None, + discounts: None, + inventory_available: Some(RadrootsCoreDecimal::from(25)), + availability: Some(RadrootsListingAvailability::Status { + status: radroots_events::listing::RadrootsListingStatus::Active, + }), + delivery_method: Some(RadrootsListingDeliveryMethod::Pickup), + location: Some(RadrootsListingLocation { + primary: "Shed 1".to_string(), + city: Some("Portland".to_string()), + region: Some("OR".to_string()), + country: Some("US".to_string()), + lat: None, + lng: None, + geohash: None, + }), + images: None, + } + } +} diff --git a/src/transport/jsonrpc/methods/bridge/mod.rs b/src/transport/jsonrpc/methods/bridge/mod.rs @@ -0,0 +1,16 @@ +use anyhow::Result; +use jsonrpsee::server::RpcModule; + +use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; + +mod job_status; +mod listing_publish; +mod status; + +pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { + let mut m = RpcModule::new(ctx); + status::register(&mut m, &registry)?; + job_status::register(&mut m, &registry)?; + listing_publish::register(&mut m, &registry)?; + Ok(m) +} diff --git a/src/transport/jsonrpc/methods/bridge/status.rs b/src/transport/jsonrpc/methods/bridge/status.rs @@ -0,0 +1,48 @@ +use anyhow::Result; +use jsonrpsee::server::RpcModule; +use serde::Serialize; + +use crate::app::config::BridgeDeliveryPolicy; +use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; + +#[derive(Clone, Debug, Serialize)] +struct BridgeStatusResponse { + enabled: bool, + ready: bool, + signer_mode: String, + relay_count: usize, + delivery_policy: BridgeDeliveryPolicy, + #[serde(default, skip_serializing_if = "Option::is_none")] + delivery_quorum: Option<usize>, + publish_max_attempts: usize, + publish_initial_backoff_millis: u64, + publish_max_backoff_millis: u64, + job_status_retention: usize, + retained_jobs: usize, + retained_idempotency_keys: usize, + methods: Vec<String>, +} + +pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { + registry.track("bridge.status"); + m.register_async_method("bridge.status", |_params, ctx, _| async move { + let relay_count = ctx.state.client.relays().await.len(); + let snapshot = ctx.state.bridge_jobs.snapshot(); + Ok::<BridgeStatusResponse, RpcError>(BridgeStatusResponse { + enabled: ctx.state.bridge_config.enabled, + ready: ctx.state.bridge_config.enabled && relay_count > 0, + signer_mode: "embedded_service_identity".to_string(), + relay_count, + delivery_policy: ctx.state.bridge_config.delivery_policy, + delivery_quorum: ctx.state.bridge_config.delivery_quorum, + publish_max_attempts: ctx.state.bridge_config.publish_max_attempts, + publish_initial_backoff_millis: ctx.state.bridge_config.publish_initial_backoff_millis, + publish_max_backoff_millis: ctx.state.bridge_config.publish_max_backoff_millis, + job_status_retention: ctx.state.bridge_config.job_status_retention, + retained_jobs: snapshot.retained_jobs, + retained_idempotency_keys: snapshot.retained_idempotency_keys, + methods: ctx.methods.list(), + }) + })?; + Ok(()) +} diff --git a/src/transport/jsonrpc/methods/mod.rs b/src/transport/jsonrpc/methods/mod.rs @@ -5,6 +5,7 @@ use jsonrpsee::server::RpcModule; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; +pub mod bridge; pub mod nip46; pub fn register_all( @@ -12,6 +13,7 @@ pub fn register_all( ctx: RpcContext, registry: MethodRegistry, ) -> Result<()> { + root.merge(bridge::module(ctx.clone(), registry.clone())?)?; root.merge(nip46::module(ctx, registry)?)?; Ok(()) } diff --git a/src/transport/jsonrpc/methods/nip46/connect.rs b/src/transport/jsonrpc/methods/nip46/connect.rs @@ -7,30 +7,20 @@ use tokio::sync::broadcast; use tokio::time::sleep; use uuid::Uuid; -use crate::core::nip46::session::{filter_perms, session_expires_at, Nip46Session}; +use crate::core::nip46::session::{Nip46Session, filter_perms, session_expires_at}; use crate::transport::jsonrpc::nip46::connection::{ - parse_connect_url, - Nip46ConnectInfo, - Nip46ConnectMode, + Nip46ConnectInfo, Nip46ConnectMode, parse_connect_url, }; use crate::transport::jsonrpc::params::DEFAULT_TIMEOUT_SECS; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; +use nostr::JsonUtil; +use nostr::nips::{nip44, nip46::NostrConnectMessage, nip46::NostrConnectRequest}; use radroots_nostr::prelude::{ - radroots_nostr_filter_tag, - radroots_nostr_parse_pubkey, - RadrootsNostrClient, - RadrootsNostrEventBuilder, - RadrootsNostrFilter, - RadrootsNostrKind, - RadrootsNostrKeys, - RadrootsNostrPublicKey, - RadrootsNostrSecretKey, - RadrootsNostrRelayPoolNotification, - RadrootsNostrSubscriptionId, - RadrootsNostrTimestamp, + RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrFilter, RadrootsNostrKeys, + RadrootsNostrKind, RadrootsNostrPublicKey, RadrootsNostrRelayPoolNotification, + RadrootsNostrSecretKey, RadrootsNostrSubscriptionId, RadrootsNostrTimestamp, + radroots_nostr_filter_tag, radroots_nostr_parse_pubkey, }; -use nostr::nips::{nip44, nip46::NostrConnectMessage, nip46::NostrConnectRequest}; -use nostr::JsonUtil; #[derive(Debug, Deserialize)] struct Nip46ConnectParams { @@ -70,9 +60,7 @@ async fn connect_nip46( let info = parse_connect_url(&url)?; match info.mode { Nip46ConnectMode::Bunker => connect_bunker(ctx, info).await, - Nip46ConnectMode::Nostrconnect => { - connect_nostrconnect(ctx, info, client_secret_key).await - } + Nip46ConnectMode::Nostrconnect => connect_nostrconnect(ctx, info, client_secret_key).await, } } @@ -84,9 +72,10 @@ async fn connect_bunker( return Err(RpcError::InvalidParams("missing relay".to_string())); } - let remote_signer_raw = info.remote_signer_pubkey.as_ref().ok_or_else(|| { - RpcError::InvalidParams("missing remote signer pubkey".to_string()) - })?; + let remote_signer_raw = info + .remote_signer_pubkey + .as_ref() + .ok_or_else(|| RpcError::InvalidParams("missing remote signer pubkey".to_string()))?; let remote_signer_pubkey = radroots_nostr_parse_pubkey(remote_signer_raw) .map_err(|e| RpcError::InvalidParams(format!("invalid remote signer: {e}")))?; @@ -117,8 +106,8 @@ async fn connect_bunker( .await .map_err(|e| RpcError::Other(format!("nip46 connect failed: {e}")))?; - if let Err(error) = send_connect_request(&client, &client_keys, &remote_signer_pubkey, message) - .await + if let Err(error) = + send_connect_request(&client, &client_keys, &remote_signer_pubkey, message).await { client.unsubscribe(&subscription.val).await; return Err(error); @@ -190,9 +179,10 @@ async fn connect_nostrconnect( .map_err(|e| RpcError::InvalidParams(format!("invalid client_secret_key: {e}")))?; let client_keys = RadrootsNostrKeys::new(client_secret_key); let client_pubkey = client_keys.public_key(); - let client_pubkey_raw = info.client_pubkey.as_ref().ok_or_else(|| { - RpcError::InvalidParams("missing client pubkey".to_string()) - })?; + let client_pubkey_raw = info + .client_pubkey + .as_ref() + .ok_or_else(|| RpcError::InvalidParams("missing client pubkey".to_string()))?; let expected_pubkey = radroots_nostr_parse_pubkey(client_pubkey_raw) .map_err(|e| RpcError::InvalidParams(format!("invalid client pubkey: {e}")))?; if expected_pubkey != client_pubkey { @@ -208,13 +198,8 @@ async fn connect_nostrconnect( .wait_for_connection(Duration::from_secs(DEFAULT_TIMEOUT_SECS)) .await; - let (remote_signer_pubkey, response) = wait_for_nostrconnect_response( - &client, - &client_keys, - &client_pubkey, - secret, - ) - .await?; + let (remote_signer_pubkey, response) = + wait_for_nostrconnect_response(&client, &client_keys, &client_pubkey, secret).await?; validate_nostrconnect_response(&response, secret)?; claim_secret(&ctx, info.secret.as_deref()).await?; @@ -369,7 +354,7 @@ fn validate_connect_response( _ => { return Err(RpcError::Other( "nip46 connect response invalid".to_string(), - )) + )); } }; @@ -403,7 +388,7 @@ fn validate_nostrconnect_response( _ => { return Err(RpcError::Other( "nip46 connect response invalid".to_string(), - )) + )); } }; @@ -412,9 +397,7 @@ fn validate_nostrconnect_response( } let Some(value) = result.as_deref() else { - return Err(RpcError::Other( - "nip46 connect missing result".to_string(), - )); + return Err(RpcError::Other("nip46 connect missing result".to_string())); }; if value == secret { diff --git a/src/transport/jsonrpc/methods/nip46/get_public_key.rs b/src/transport/jsonrpc/methods/nip46/get_public_key.rs @@ -62,15 +62,18 @@ async fn request_get_public_key( Some(_) => { return Err(RpcError::Other( "nip46 get_public_key unexpected response".to_string(), - )) + )); } None => { return Err(RpcError::Other( "nip46 get_public_key missing response".to_string(), - )) + )); } }; - let updated = session.user_pubkey.map(|existing| existing != pubkey).unwrap_or(true); + let updated = session + .user_pubkey + .map(|existing| existing != pubkey) + .unwrap_or(true); Ok((pubkey, updated)) } diff --git a/src/transport/jsonrpc/methods/nip46/mod.rs b/src/transport/jsonrpc/methods/nip46/mod.rs @@ -10,12 +10,12 @@ pub mod get_public_key; pub mod nip04; pub mod nip44; pub mod ping; -pub mod sign_event; pub mod session_authorize; pub mod session_close; pub mod session_list; pub mod session_require_auth; pub mod session_status; +pub mod sign_event; pub mod status; pub fn module(ctx: RpcContext, registry: MethodRegistry) -> Result<RpcModule<RpcContext>> { diff --git a/src/transport/jsonrpc/methods/nip46/nip04.rs b/src/transport/jsonrpc/methods/nip46/nip04.rs @@ -45,8 +45,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res let public_key = radroots_nostr::prelude::radroots_nostr_parse_pubkey(&public_key) .map_err(|e| RpcError::InvalidParams(format!("invalid public_key: {e}")))?; let req = NostrConnectRequest::Nip04Encrypt { public_key, text }; - let response = - client::request(&session, req, "nip04_encrypt").await?; + let response = client::request(&session, req, "nip04_encrypt").await?; let response = response .to_response(NostrConnectMethod::Nip04Encrypt) .map_err(|e| RpcError::Other(format!("nip46 nip04_encrypt failed: {e}")))?; @@ -60,12 +59,12 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res Some(_) => { return Err(RpcError::Other( "nip46 nip04_encrypt unexpected response".to_string(), - )) + )); } None => { return Err(RpcError::Other( "nip46 nip04_encrypt missing response".to_string(), - )) + )); } }; Ok::<Nip46Nip04EncryptResponse, RpcError>(Nip46Nip04EncryptResponse { ciphertext }) @@ -88,8 +87,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res public_key, ciphertext, }; - let response = - client::request(&session, req, "nip04_decrypt").await?; + let response = client::request(&session, req, "nip04_decrypt").await?; let response = response .to_response(NostrConnectMethod::Nip04Decrypt) .map_err(|e| RpcError::Other(format!("nip46 nip04_decrypt failed: {e}")))?; @@ -103,12 +101,12 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res Some(_) => { return Err(RpcError::Other( "nip46 nip04_decrypt unexpected response".to_string(), - )) + )); } None => { return Err(RpcError::Other( "nip46 nip04_decrypt missing response".to_string(), - )) + )); } }; Ok::<Nip46Nip04DecryptResponse, RpcError>(Nip46Nip04DecryptResponse { plaintext }) diff --git a/src/transport/jsonrpc/methods/nip46/nip44.rs b/src/transport/jsonrpc/methods/nip46/nip44.rs @@ -45,8 +45,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res let public_key = radroots_nostr::prelude::radroots_nostr_parse_pubkey(&public_key) .map_err(|e| RpcError::InvalidParams(format!("invalid public_key: {e}")))?; let req = NostrConnectRequest::Nip44Encrypt { public_key, text }; - let response = - client::request(&session, req, "nip44_encrypt").await?; + let response = client::request(&session, req, "nip44_encrypt").await?; let response = response .to_response(NostrConnectMethod::Nip44Encrypt) .map_err(|e| RpcError::Other(format!("nip46 nip44_encrypt failed: {e}")))?; @@ -60,12 +59,12 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res Some(_) => { return Err(RpcError::Other( "nip46 nip44_encrypt unexpected response".to_string(), - )) + )); } None => { return Err(RpcError::Other( "nip46 nip44_encrypt missing response".to_string(), - )) + )); } }; Ok::<Nip46Nip44EncryptResponse, RpcError>(Nip46Nip44EncryptResponse { ciphertext }) @@ -88,8 +87,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res public_key, ciphertext, }; - let response = - client::request(&session, req, "nip44_decrypt").await?; + let response = client::request(&session, req, "nip44_decrypt").await?; let response = response .to_response(NostrConnectMethod::Nip44Decrypt) .map_err(|e| RpcError::Other(format!("nip46 nip44_decrypt failed: {e}")))?; @@ -103,12 +101,12 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res Some(_) => { return Err(RpcError::Other( "nip46 nip44_decrypt unexpected response".to_string(), - )) + )); } None => { return Err(RpcError::Other( "nip46 nip44_decrypt missing response".to_string(), - )) + )); } }; Ok::<Nip46Nip44DecryptResponse, RpcError>(Nip46Nip44DecryptResponse { plaintext }) diff --git a/src/transport/jsonrpc/methods/nip46/session_close.rs b/src/transport/jsonrpc/methods/nip46/session_close.rs @@ -21,9 +21,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res .parse() .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let closed = ctx.state.nip46_sessions.remove(&session_id).await; - Ok::<Nip46SessionCloseResponse, RpcError>(Nip46SessionCloseResponse { - closed, - }) + Ok::<Nip46SessionCloseResponse, RpcError>(Nip46SessionCloseResponse { closed }) })?; Ok(()) } diff --git a/src/transport/jsonrpc/methods/nip46/session_require_auth.rs b/src/transport/jsonrpc/methods/nip46/session_require_auth.rs @@ -20,7 +20,10 @@ struct Nip46SessionRequireAuthResponse { pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Result<()> { registry.track("nip46.session.require_auth"); m.register_async_method("nip46.session.require_auth", |params, ctx, _| async move { - let Nip46SessionRequireAuthParams { session_id, auth_url } = params + let Nip46SessionRequireAuthParams { + session_id, + auth_url, + } = params .parse() .map_err(|e| RpcError::InvalidParams(e.to_string()))?; if auth_url.trim().is_empty() { diff --git a/src/transport/jsonrpc/methods/nip46/sign_event.rs b/src/transport/jsonrpc/methods/nip46/sign_event.rs @@ -24,10 +24,7 @@ pub fn register(m: &mut RpcModule<RpcContext>, registry: &MethodRegistry) -> Res .parse() .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let session = session::get_session(ctx.as_ref(), &session_id).await?; - session::require_sign_event_permission( - &session, - u32::from(event.kind.as_u16()), - )?; + session::require_sign_event_permission(&session, u32::from(event.kind.as_u16()))?; if event.pubkey != session.remote_signer_pubkey { return Err(RpcError::InvalidParams( "event pubkey does not match remote signer".to_string(), diff --git a/src/transport/jsonrpc/nip46/client.rs b/src/transport/jsonrpc/nip46/client.rs @@ -3,22 +3,18 @@ use std::time::Duration; use crate::core::nip46::session::Nip46Session; -use crate::transport::jsonrpc::{params::DEFAULT_TIMEOUT_SECS, RpcError}; -use radroots_nostr::prelude::{ - radroots_nostr_filter_tag, - RadrootsNostrEventBuilder, - RadrootsNostrFilter, - RadrootsNostrKind, - RadrootsNostrRelayPoolNotification, - RadrootsNostrSubscriptionId, - RadrootsNostrTimestamp, -}; +use crate::transport::jsonrpc::{RpcError, params::DEFAULT_TIMEOUT_SECS}; +use nostr::JsonUtil; +use nostr::UnsignedEvent; use nostr::nips::{ nip44, nip46::{NostrConnectMessage, NostrConnectMethod, NostrConnectRequest, ResponseResult}, }; -use nostr::JsonUtil; -use nostr::UnsignedEvent; +use radroots_nostr::prelude::{ + RadrootsNostrEventBuilder, RadrootsNostrFilter, RadrootsNostrKind, + RadrootsNostrRelayPoolNotification, RadrootsNostrSubscriptionId, RadrootsNostrTimestamp, + radroots_nostr_filter_tag, +}; use tokio::sync::broadcast; use tokio::time::sleep; @@ -42,13 +38,9 @@ pub async fn sign_event( Some(_) => { return Err(RpcError::Other(format!( "nip46 {label} unexpected response" - ))) - } - None => { - return Err(RpcError::Other(format!( - "nip46 {label} missing response" - ))) + ))); } + None => return Err(RpcError::Other(format!("nip46 {label} missing response"))), }; event @@ -95,7 +87,14 @@ pub async fn request( return Err(error); } - wait_for_response(session, &request_id, label, notifications, &subscription.val).await + wait_for_response( + session, + &request_id, + label, + notifications, + &subscription.val, + ) + .await } fn response_filter( diff --git a/src/transport/jsonrpc/nip46/connection.rs b/src/transport/jsonrpc/nip46/connection.rs @@ -51,9 +51,8 @@ pub fn parse_connect_url(raw: &str) -> Result<Nip46ConnectInfo, RpcError> { fn parse_bunker_url(url: &Url) -> Result<Nip46ConnectInfo, RpcError> { let remote_signer_pubkey = url.host_str().map(|host| host.to_string()); - let query: Nip46ConnectQuery = - serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + let query: Nip46ConnectQuery = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let relays = relay_list(query.relay); let perms = parse_perms(query.perms); @@ -77,9 +76,8 @@ fn parse_nostrconnect_url(url: &Url) -> Result<Nip46ConnectInfo, RpcError> { .ok_or_else(|| RpcError::InvalidParams("missing client pubkey".to_string()))?; radroots_nostr_parse_pubkey(&client_pubkey) .map_err(|e| RpcError::InvalidParams(format!("invalid client pubkey: {e}")))?; - let query: Nip46ConnectQuery = - serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| RpcError::InvalidParams(e.to_string()))?; + let query: Nip46ConnectQuery = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let relays = relay_list(query.relay); let perms = parse_perms(query.perms); diff --git a/src/transport/jsonrpc/nip46/session.rs b/src/transport/jsonrpc/nip46/session.rs @@ -1,10 +1,7 @@ -use crate::core::nip46::session::{sign_event_allowed, Nip46Session}; +use crate::core::nip46::session::{Nip46Session, sign_event_allowed}; use crate::transport::jsonrpc::{RpcContext, RpcError}; -pub async fn get_session( - ctx: &RpcContext, - session_id: &str, -) -> Result<Nip46Session, RpcError> { +pub async fn get_session(ctx: &RpcContext, session_id: &str) -> Result<Nip46Session, RpcError> { ctx.state .nip46_sessions .get(session_id) @@ -23,10 +20,7 @@ pub fn require_permission(session: &Nip46Session, perm: &str) -> Result<(), RpcE } } -pub fn require_sign_event_permission( - session: &Nip46Session, - kind: u32, -) -> Result<(), RpcError> { +pub fn require_sign_event_permission(session: &Nip46Session, kind: u32) -> Result<(), RpcError> { if session.auth_required && !session.authorized { return Err(auth_required_error(session)); } @@ -38,9 +32,6 @@ pub fn require_sign_event_permission( } fn auth_required_error(session: &Nip46Session) -> RpcError { - let url = session - .auth_url - .as_deref() - .unwrap_or("auth required"); + let url = session.auth_url.as_deref().unwrap_or("auth required"); RpcError::Other(format!("auth_url:{url}")) } diff --git a/src/transport/nostr/listener.rs b/src/transport/nostr/listener.rs @@ -1,32 +1,22 @@ use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{Result, anyhow}; +use nostr::JsonUtil; use nostr::nips::nip04; use nostr::nips::nip44; use nostr::nips::nip46::{ - NostrConnectMessage, - NostrConnectRequest, - NostrConnectResponse, - ResponseResult, + NostrConnectMessage, NostrConnectRequest, NostrConnectResponse, ResponseResult, }; -use nostr::JsonUtil; use tokio::sync::broadcast; use tracing::{info, warn}; use crate::core::nip46::session::{ - session_expires_at, - sign_event_allowed, - Nip46Session, - PendingNostrRequest, + Nip46Session, PendingNostrRequest, session_expires_at, sign_event_allowed, }; use crate::core::state::Radrootsd; use radroots_nostr::prelude::{ - radroots_nostr_filter_tag, - RadrootsNostrEventBuilder, - RadrootsNostrFilter, - RadrootsNostrKind, - RadrootsNostrRelayPoolNotification, - RadrootsNostrTimestamp, + RadrootsNostrEventBuilder, RadrootsNostrFilter, RadrootsNostrKind, + RadrootsNostrRelayPoolNotification, RadrootsNostrTimestamp, radroots_nostr_filter_tag, }; const DEFAULT_TIMEOUT_SECS: u64 = 10; @@ -49,8 +39,7 @@ async fn run_nip46_listener(radrootsd: Radrootsd) -> Result<()> { let filter = RadrootsNostrFilter::new() .kind(RadrootsNostrKind::NostrConnect) .since(RadrootsNostrTimestamp::now()); - let filter = - radroots_nostr_filter_tag(filter, "p", vec![radrootsd.pubkey.to_hex()])?; + let filter = radroots_nostr_filter_tag(filter, "p", vec![radrootsd.pubkey.to_hex()])?; let mut notifications = radrootsd.client.notifications(); let subscription = radrootsd.client.subscribe(filter, None).await?; @@ -72,17 +61,14 @@ async fn run_nip46_listener(radrootsd: Radrootsd) -> Result<()> { continue; } - let decrypted = match nip44::decrypt( - radrootsd.keys.secret_key(), - &event.pubkey, - &event.content, - ) { - Ok(value) => value, - Err(err) => { - warn!("NIP-46 decrypt failed: {err}"); - continue; - } - }; + let decrypted = + match nip44::decrypt(radrootsd.keys.secret_key(), &event.pubkey, &event.content) { + Ok(value) => value, + Err(err) => { + warn!("NIP-46 decrypt failed: {err}"); + continue; + } + }; let message = match NostrConnectMessage::from_json(&decrypted) { Ok(value) => value, Err(err) => { @@ -102,8 +88,7 @@ async fn run_nip46_listener(radrootsd: Radrootsd) -> Result<()> { continue; } }; - let response = - handle_request(&radrootsd, &event.pubkey, &request_id, request).await; + let response = handle_request(&radrootsd, &event.pubkey, &request_id, request).await; let response_message = NostrConnectMessage::response(request_id, response); let response_event = RadrootsNostrEventBuilder::nostr_connect( &radrootsd.keys, @@ -139,8 +124,7 @@ pub(crate) async fn handle_request( } } let session_id = client_pubkey.to_hex(); - let expires_at = - session_expires_at(radrootsd.nip46_config.session_ttl_secs); + let expires_at = session_expires_at(radrootsd.nip46_config.session_ttl_secs); let session = Nip46Session { id: session_id, client: radrootsd.client.clone(), @@ -188,7 +172,9 @@ pub(crate) async fn handle_request( return NostrConnectResponse::with_error("pubkey mismatch"); } match unsigned.sign_with_keys(&radrootsd.keys) { - Ok(event) => NostrConnectResponse::with_result(ResponseResult::SignEvent(Box::new(event))), + Ok(event) => { + NostrConnectResponse::with_result(ResponseResult::SignEvent(Box::new(event))) + } Err(err) => NostrConnectResponse::with_error(format!("sign_event failed: {err}")), } } @@ -218,10 +204,15 @@ pub(crate) async fn handle_request( Ok(ciphertext) => { NostrConnectResponse::with_result(ResponseResult::Nip04Encrypt { ciphertext }) } - Err(err) => NostrConnectResponse::with_error(format!("nip04_encrypt failed: {err}")), + Err(err) => { + NostrConnectResponse::with_error(format!("nip04_encrypt failed: {err}")) + } } } - NostrConnectRequest::Nip04Decrypt { public_key, ciphertext } => { + NostrConnectRequest::Nip04Decrypt { + public_key, + ciphertext, + } => { let session = match session_for_client(radrootsd, client_pubkey).await { Ok(session) => session, Err(response) => return response, @@ -247,7 +238,9 @@ pub(crate) async fn handle_request( Ok(plaintext) => { NostrConnectResponse::with_result(ResponseResult::Nip04Decrypt { plaintext }) } - Err(err) => NostrConnectResponse::with_error(format!("nip04_decrypt failed: {err}")), + Err(err) => { + NostrConnectResponse::with_error(format!("nip04_decrypt failed: {err}")) + } } } NostrConnectRequest::Nip44Encrypt { public_key, text } => { @@ -272,15 +265,24 @@ pub(crate) async fn handle_request( { return response; } - match nip44::encrypt(radrootsd.keys.secret_key(), &public_key, text, nip44::Version::V2) - { + match nip44::encrypt( + radrootsd.keys.secret_key(), + &public_key, + text, + nip44::Version::V2, + ) { Ok(ciphertext) => { NostrConnectResponse::with_result(ResponseResult::Nip44Encrypt { ciphertext }) } - Err(err) => NostrConnectResponse::with_error(format!("nip44_encrypt failed: {err}")), + Err(err) => { + NostrConnectResponse::with_error(format!("nip44_encrypt failed: {err}")) + } } } - NostrConnectRequest::Nip44Decrypt { public_key, ciphertext } => { + NostrConnectRequest::Nip44Decrypt { + public_key, + ciphertext, + } => { let session = match session_for_client(radrootsd, client_pubkey).await { Ok(session) => session, Err(response) => return response, @@ -306,7 +308,9 @@ pub(crate) async fn handle_request( Ok(plaintext) => { NostrConnectResponse::with_result(ResponseResult::Nip44Decrypt { plaintext }) } - Err(err) => NostrConnectResponse::with_error(format!("nip44_decrypt failed: {err}")), + Err(err) => { + NostrConnectResponse::with_error(format!("nip44_decrypt failed: {err}")) + } } } NostrConnectRequest::Ping => NostrConnectResponse::with_result(ResponseResult::Pong),