radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit d0dca92049a7c37fa2197fda209a412649783877
parent 4543e7e8b500dc249624aa104dcc8b9ab20413f5
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 04:13:19 +0000

bridge: add durable jobs and delegated signer selection

- persist bridge job and idempotency state at the configured bridge state path
- allow bridge listing and order publish methods to sign through requested outbound NIP-46 sessions
- keep embedded service identity as the default signer while documenting state_path and signer_session_id
- validate with cargo metadata, cargo fmt, git diff --check, cargo check, and CARGO_INCREMENTAL=0 cargo test

Diffstat:
Mconfig.toml | 1+
Msrc/app/config.rs | 8++++++++
Msrc/core/bridge/publish.rs | 5+++--
Msrc/core/bridge/store.rs | 244+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Msrc/core/state.rs | 6++++++
Msrc/transport/jsonrpc/methods/bridge/listing_publish.rs | 36++++++++++++++++++++----------------
Msrc/transport/jsonrpc/methods/bridge/order_request.rs | 51+++++++++++++++++++++++++++++++--------------------
Msrc/transport/jsonrpc/methods/bridge/shared.rs | 130++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
8 files changed, 425 insertions(+), 56 deletions(-)

diff --git a/config.toml b/config.toml @@ -21,6 +21,7 @@ addr = "127.0.0.1:7070" [config.bridge] enabled = true bearer_token = "change-me" +state_path = "state/bridge-jobs.json" delivery_policy = "any" publish_max_attempts = 2 diff --git a/src/app/config.rs b/src/app/config.rs @@ -2,6 +2,7 @@ use anyhow::{Result, bail}; use radroots_nostr::prelude::RadrootsNostrMetadata; use radroots_runtime::RadrootsNostrServiceConfig; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; fn default_rpc_addr() -> String { "127.0.0.1:7070".to_string() @@ -67,6 +68,10 @@ fn default_bridge_job_status_retention() -> usize { 256 } +fn default_bridge_state_path() -> PathBuf { + PathBuf::from("state/bridge-jobs.json") +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Nip46Config { #[serde(default = "default_nip46_session_ttl_secs")] @@ -128,6 +133,8 @@ pub struct BridgeConfig { pub publish_max_backoff_millis: u64, #[serde(default = "default_bridge_job_status_retention")] pub job_status_retention: usize, + #[serde(default = "default_bridge_state_path")] + pub state_path: PathBuf, } impl Default for BridgeConfig { @@ -142,6 +149,7 @@ impl Default for BridgeConfig { publish_initial_backoff_millis: default_bridge_publish_initial_backoff_millis(), publish_max_backoff_millis: default_bridge_publish_max_backoff_millis(), job_status_retention: default_bridge_job_status_retention(), + state_path: default_bridge_state_path(), } } } diff --git a/src/core/bridge/publish.rs b/src/core/bridge/publish.rs @@ -2,12 +2,12 @@ use std::collections::{BTreeMap, BTreeSet}; use std::time::Duration; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrOutput, RadrootsNostrRelayUrl}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; use tokio::time::sleep; use crate::app::config::{BridgeConfig, BridgeDeliveryPolicy}; -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BridgeRelayPublishResult { pub relay_url: String, pub acknowledged: bool, @@ -418,6 +418,7 @@ mod tests { publish_initial_backoff_millis: 125, publish_max_backoff_millis: 500, job_status_retention: 64, + ..BridgeConfig::default() }; assert_eq!( diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs @@ -1,12 +1,16 @@ use std::collections::{HashMap, VecDeque}; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; -use serde::Serialize; +use serde::{Deserialize, Serialize}; +use thiserror::Error; use crate::app::config::BridgeDeliveryPolicy; use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult}; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +const BRIDGE_JOB_STORE_VERSION: u32 = 1; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum BridgeJobStatus { Accepted, @@ -14,7 +18,7 @@ pub enum BridgeJobStatus { Failed, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BridgeJobRecord { pub job_id: String, pub command: String, @@ -54,6 +58,7 @@ pub struct BridgeJobStoreSnapshot { #[derive(Clone)] pub struct BridgeJobStore { inner: Arc<RwLock<BridgeJobStoreInner>>, + persistence: Option<Arc<BridgeJobStorePersistence>>, } #[derive(Debug)] @@ -64,6 +69,37 @@ struct BridgeJobStoreInner { capacity: usize, } +#[derive(Debug, Clone)] +struct BridgeJobStorePersistence { + path: PathBuf, +} + +#[derive(Debug, Serialize, Deserialize)] +struct PersistedBridgeJobStore { + version: u32, + jobs: HashMap<String, BridgeJobRecord>, + idempotency: HashMap<String, String>, + order: VecDeque<String>, +} + +#[derive(Debug, Error)] +pub enum BridgeJobStoreError { + #[error("invalid bridge job store path: {0}")] + InvalidStatePath(PathBuf), + #[error("unsupported bridge job store version: {0}")] + UnsupportedStateVersion(u32), + #[error("bridge job store io error: {0}")] + Io(#[from] std::io::Error), + #[error("bridge job store json error: {0}")] + Json(#[from] serde_json::Error), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BridgeJobReservation { + Accepted(BridgeJobRecord), + Duplicate(BridgeJobRecord), +} + impl BridgeJobStore { pub fn new(capacity: usize) -> Self { Self { @@ -73,15 +109,28 @@ impl BridgeJobStore { order: VecDeque::new(), capacity, })), + persistence: None, } } - pub fn reserve(&self, mut record: BridgeJobRecord) -> Result<BridgeJobRecord, BridgeJobRecord> { + pub fn load(path: PathBuf, capacity: usize) -> Result<Self, BridgeJobStoreError> { + let persistence = Arc::new(BridgeJobStorePersistence::new(path)); + let inner = persistence.load(capacity)?; + Ok(Self { + inner: Arc::new(RwLock::new(inner)), + persistence: Some(persistence), + }) + } + + pub fn reserve( + &self, + mut record: BridgeJobRecord, + ) -> Result<BridgeJobReservation, BridgeJobStoreError> { let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); if let Some(idempotency_key) = record.idempotency_key.as_ref() { if let Some(job_id) = inner.idempotency.get(idempotency_key) { if let Some(existing) = inner.jobs.get(job_id) { - return Err(existing.clone()); + return Ok(BridgeJobReservation::Duplicate(existing.clone())); } } } @@ -95,16 +144,21 @@ impl BridgeJobStore { } inner.jobs.insert(record.job_id.clone(), record.clone()); inner.prune(); - Ok(record) + let persisted = persisted_store_from_inner(&inner); + drop(inner); + self.persist_snapshot(&persisted)?; + Ok(BridgeJobReservation::Accepted(record)) } pub fn complete( &self, job_id: &str, execution: BridgePublishExecution, - ) -> Option<BridgeJobRecord> { + ) -> Result<Option<BridgeJobRecord>, BridgeJobStoreError> { let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); - let record = inner.jobs.get_mut(job_id)?; + let Some(record) = inner.jobs.get_mut(job_id) else { + return Ok(None); + }; record.status = if execution.published { BridgeJobStatus::Published } else { @@ -118,7 +172,11 @@ impl BridgeJobStore { record.attempt_summaries = execution.attempt_summaries; record.relay_results = execution.relay_results; record.relay_outcome_summary = execution.relay_outcome_summary; - Some(record.clone()) + let completed = record.clone(); + let persisted = persisted_store_from_inner(&inner); + drop(inner); + self.persist_snapshot(&persisted)?; + Ok(Some(completed)) } pub fn get(&self, job_id: &str) -> Option<BridgeJobRecord> { @@ -138,6 +196,16 @@ impl BridgeJobStore { capacity: inner.capacity, } } + + fn persist_snapshot( + &self, + snapshot: &PersistedBridgeJobStore, + ) -> Result<(), BridgeJobStoreError> { + let Some(persistence) = &self.persistence else { + return Ok(()); + }; + persistence.persist(snapshot) + } } impl BridgeJobStoreInner { @@ -158,10 +226,74 @@ impl BridgeJobStoreInner { } } +impl BridgeJobStorePersistence { + fn new(path: PathBuf) -> Self { + Self { path } + } + + fn load(&self, capacity: usize) -> Result<BridgeJobStoreInner, BridgeJobStoreError> { + if !self.path.exists() { + return Ok(BridgeJobStoreInner { + jobs: HashMap::new(), + idempotency: HashMap::new(), + order: VecDeque::new(), + capacity, + }); + } + + let payload = std::fs::read_to_string(&self.path)?; + let snapshot: PersistedBridgeJobStore = serde_json::from_str(&payload)?; + if snapshot.version != BRIDGE_JOB_STORE_VERSION { + return Err(BridgeJobStoreError::UnsupportedStateVersion( + snapshot.version, + )); + } + let mut inner = BridgeJobStoreInner { + jobs: snapshot.jobs, + idempotency: snapshot.idempotency, + order: snapshot.order, + capacity, + }; + inner.prune(); + Ok(inner) + } + + fn persist(&self, snapshot: &PersistedBridgeJobStore) -> Result<(), BridgeJobStoreError> { + if let Some(parent) = self.path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent)?; + } + } + + let payload = serde_json::to_vec_pretty(snapshot)?; + let temp_path = temp_store_path(&self.path)?; + std::fs::write(&temp_path, payload)?; + std::fs::rename(&temp_path, &self.path)?; + Ok(()) + } +} + +fn persisted_store_from_inner(inner: &BridgeJobStoreInner) -> PersistedBridgeJobStore { + PersistedBridgeJobStore { + version: BRIDGE_JOB_STORE_VERSION, + jobs: inner.jobs.clone(), + idempotency: inner.idempotency.clone(), + order: inner.order.clone(), + } +} + +fn temp_store_path(path: &Path) -> Result<PathBuf, BridgeJobStoreError> { + let file_name = path + .file_name() + .ok_or_else(|| BridgeJobStoreError::InvalidStatePath(path.to_path_buf()))?; + Ok(path.with_file_name(format!("{}.tmp", file_name.to_string_lossy()))) +} + pub fn new_publish_job( command: &str, job_id: String, idempotency_key: Option<String>, + signer_mode: String, event_kind: u32, event_id: String, event_addr: Option<String>, @@ -175,7 +307,7 @@ pub fn new_publish_job( status: BridgeJobStatus::Accepted, requested_at_unix: unix_timestamp_now(), completed_at_unix: None, - signer_mode: "embedded_service_identity".to_string(), + signer_mode, event_kind, event_id: Some(event_id), event_addr, @@ -194,6 +326,7 @@ pub fn new_publish_job( pub fn new_listing_publish_job( job_id: String, idempotency_key: Option<String>, + signer_mode: String, event_kind: u32, event_id: String, event_addr: String, @@ -204,6 +337,7 @@ pub fn new_listing_publish_job( "bridge.listing.publish", job_id, idempotency_key, + signer_mode, event_kind, event_id, Some(event_addr), @@ -215,6 +349,7 @@ pub fn new_listing_publish_job( pub fn new_order_request_job( job_id: String, idempotency_key: Option<String>, + signer_mode: String, event_kind: u32, event_id: String, listing_addr: String, @@ -225,6 +360,7 @@ pub fn new_order_request_job( "bridge.order.request", job_id, idempotency_key, + signer_mode, event_kind, event_id, Some(listing_addr), @@ -245,7 +381,10 @@ mod tests { use crate::app::config::BridgeDeliveryPolicy; use crate::core::bridge::publish::BridgePublishExecution; - use super::{BridgeJobStatus, BridgeJobStore, new_listing_publish_job, new_order_request_job}; + use super::{ + BridgeJobReservation, BridgeJobStatus, BridgeJobStore, PersistedBridgeJobStore, + new_listing_publish_job, new_order_request_job, + }; #[test] fn reserve_returns_existing_job_for_same_idempotency_key() { @@ -253,6 +392,7 @@ mod tests { let first = new_listing_publish_job( "job-1".to_string(), Some("same".to_string()), + "embedded_service_identity".to_string(), 30402, "event-1".to_string(), "30402:author:listing".to_string(), @@ -262,6 +402,7 @@ mod tests { let second = new_listing_publish_job( "job-2".to_string(), Some("same".to_string()), + "embedded_service_identity".to_string(), 30402, "event-2".to_string(), "30402:author:listing".to_string(), @@ -269,8 +410,14 @@ mod tests { None, ); - assert!(store.reserve(first.clone()).is_ok()); - let existing = store.reserve(second).expect_err("same idempotency key"); + assert!(matches!( + store.reserve(first.clone()).expect("reserve"), + BridgeJobReservation::Accepted(_) + )); + let existing = match store.reserve(second).expect("same idempotency key") { + BridgeJobReservation::Duplicate(existing) => existing, + BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), + }; assert_eq!(existing.job_id, first.job_id); assert_eq!(existing.status, BridgeJobStatus::Accepted); } @@ -281,13 +428,17 @@ mod tests { let job = new_listing_publish_job( "job-1".to_string(), None, + "embedded_service_identity".to_string(), 30402, "event-1".to_string(), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, ); - store.reserve(job).expect("reserve job"); + assert!(matches!( + store.reserve(job).expect("reserve job"), + BridgeJobReservation::Accepted(_) + )); let completed = store .complete( @@ -304,7 +455,8 @@ mod tests { attempt_summaries: vec!["attempt 1".to_string()], }, ) - .expect("complete job"); + .expect("complete job") + .expect("record"); assert_eq!(completed.status, BridgeJobStatus::Published); assert_eq!(completed.attempt_count, 1); @@ -318,6 +470,7 @@ mod tests { let first = new_listing_publish_job( "job-1".to_string(), Some("first".to_string()), + "embedded_service_identity".to_string(), 30402, "event-1".to_string(), "30402:author:listing-1".to_string(), @@ -327,6 +480,7 @@ mod tests { let second = new_listing_publish_job( "job-2".to_string(), Some("second".to_string()), + "embedded_service_identity".to_string(), 30402, "event-2".to_string(), "30402:author:listing-2".to_string(), @@ -334,8 +488,14 @@ mod tests { None, ); - store.reserve(first).expect("first"); - store.reserve(second).expect("second"); + assert!(matches!( + store.reserve(first).expect("first"), + BridgeJobReservation::Accepted(_) + )); + assert!(matches!( + store.reserve(second).expect("second"), + BridgeJobReservation::Accepted(_) + )); assert!(store.get("job-1").is_none()); assert!(store.get("job-2").is_some()); @@ -347,6 +507,7 @@ mod tests { let job = new_order_request_job( "job-1".to_string(), Some("same".to_string()), + "nip46_session:session-1".to_string(), 5322, "event-1".to_string(), "30402:author:listing".to_string(), @@ -356,5 +517,54 @@ mod tests { assert_eq!(job.command, "bridge.order.request"); assert_eq!(job.event_addr.as_deref(), Some("30402:author:listing")); + assert_eq!(job.signer_mode, "nip46_session:session-1"); + } + + #[test] + fn load_recovers_persisted_jobs_and_idempotency() { + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time") + .as_nanos(); + let path = std::env::temp_dir().join(format!("radrootsd-bridge-jobs-{nanos}.json")); + let store = BridgeJobStore::load(path.clone(), 8).expect("load empty store"); + let first = new_listing_publish_job( + "job-1".to_string(), + Some("same".to_string()), + "embedded_service_identity".to_string(), + 30402, + "event-1".to_string(), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + assert!(matches!( + store.reserve(first).expect("reserve first"), + BridgeJobReservation::Accepted(_) + )); + + let loaded = BridgeJobStore::load(path.clone(), 8).expect("reload store"); + let duplicate = new_listing_publish_job( + "job-2".to_string(), + Some("same".to_string()), + "embedded_service_identity".to_string(), + 30402, + "event-2".to_string(), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + let existing = match loaded.reserve(duplicate).expect("dedupe after reload") { + BridgeJobReservation::Duplicate(existing) => existing, + BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), + }; + assert_eq!(existing.job_id, "job-1"); + + let payload = std::fs::read_to_string(&path).expect("persisted payload"); + let persisted: PersistedBridgeJobStore = + serde_json::from_str(&payload).expect("persisted store"); + assert_eq!(persisted.version, 1); + + let _ = std::fs::remove_file(path); } } diff --git a/src/core/state.rs b/src/core/state.rs @@ -36,6 +36,12 @@ impl Radrootsd { "build": option_env!("GIT_HASH").unwrap_or("unknown"), }); let bridge_signer = RadrootsNostrEmbeddedSignerBackend::new_in_memory(identity)?; + #[cfg(not(test))] + let bridge_jobs = crate::core::bridge::store::BridgeJobStore::load( + bridge_config.state_path.clone(), + bridge_config.job_status_retention, + )?; + #[cfg(test)] let bridge_jobs = crate::core::bridge::store::BridgeJobStore::new(bridge_config.job_status_retention); let nip46_sessions = crate::core::nip46::session::Nip46SessionStore::new(); diff --git a/src/transport/jsonrpc/methods/bridge/listing_publish.rs b/src/transport/jsonrpc/methods/bridge/listing_publish.rs @@ -3,7 +3,6 @@ use jsonrpsee::server::RpcModule; use radroots_events::listing::RadrootsListing; use radroots_events_codec::listing::encode::to_wire_parts; use radroots_nostr::prelude::{radroots_event_from_nostr, radroots_nostr_build_event}; -use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; use radroots_trade::listing::validation::validate_listing_event; use serde::Deserialize; use uuid::Uuid; @@ -12,8 +11,8 @@ use crate::core::bridge::publish::{BridgePublishSettings, connect_and_publish_ev use crate::core::bridge::store::new_listing_publish_job; use crate::transport::jsonrpc::auth::require_bridge_auth; use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, bridge_signer_pubkey_hex, ensure_bridge_enabled, - normalize_idempotency_key, + BridgePublishResponse, ensure_bridge_enabled, normalize_idempotency_key, resolve_bridge_signer, + sign_bridge_event_builder, }; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; @@ -21,6 +20,8 @@ use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; struct BridgeListingPublishParams { listing: RadrootsListing, #[serde(default)] + signer_session_id: Option<String>, + #[serde(default)] idempotency_key: Option<String>, } @@ -46,18 +47,14 @@ async fn publish_listing( ) -> Result<BridgePublishResponse, RpcError> { ensure_bridge_enabled(&ctx)?; let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let signer_pubkey = bridge_signer_pubkey_hex(&ctx)?; - let listing = canonicalize_listing_for_embedded_signer(params.listing, signer_pubkey.as_str()); + let signer = resolve_bridge_signer(&ctx, params.signer_session_id.as_deref(), 30402).await?; + let signer_pubkey = signer.signer_pubkey_hex(); + let listing = canonicalize_listing_for_signer(params.listing, signer_pubkey.as_str()); let parts = to_wire_parts(&listing) .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) .map_err(|error| RpcError::Other(format!("failed to build listing event: {error}")))?; - let signed = ctx - .state - .bridge_signer - .sign_event_builder(builder) - .map_err(|error| RpcError::Other(format!("failed to sign listing event: {error}")))?; - let event = signed.event; + let event = sign_bridge_event_builder(&ctx, &signer, builder, "bridge.listing.publish").await?; let canonical = radroots_event_from_nostr(&event); let validated = validate_listing_event(&canonical) .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; @@ -65,6 +62,7 @@ async fn publish_listing( let reserved = ctx.state.bridge_jobs.reserve(new_listing_publish_job( Uuid::new_v4().to_string(), idempotency_key, + signer.signer_mode(), parts.kind, event.id.to_hex(), validated.listing_addr, @@ -72,13 +70,18 @@ async fn publish_listing( ctx.state.bridge_config.delivery_quorum, )); let job = match reserved { - Ok(job) => job, - Err(existing) => { + Ok(crate::core::bridge::store::BridgeJobReservation::Accepted(job)) => job, + Ok(crate::core::bridge::store::BridgeJobReservation::Duplicate(existing)) => { return Ok(BridgePublishResponse { deduplicated: true, job: existing, }); } + Err(error) => { + return Err(RpcError::Other(format!( + "failed to persist bridge listing job: {error}" + ))); + } }; let execution = connect_and_publish_event( @@ -91,6 +94,7 @@ async fn publish_listing( .state .bridge_jobs .complete(&job.job_id, execution) + .map_err(|error| RpcError::Other(format!("failed to persist bridge listing job: {error}")))? .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; Ok(BridgePublishResponse { @@ -99,7 +103,7 @@ async fn publish_listing( }) } -fn canonicalize_listing_for_embedded_signer( +fn canonicalize_listing_for_signer( mut listing: RadrootsListing, signer_pubkey: &str, ) -> RadrootsListing { @@ -121,11 +125,11 @@ mod tests { RadrootsListingProduct, }; - use super::canonicalize_listing_for_embedded_signer; + use super::canonicalize_listing_for_signer; #[test] fn canonicalize_listing_sets_missing_farm_pubkey() { - let listing = canonicalize_listing_for_embedded_signer(base_listing(), "abc123"); + let listing = canonicalize_listing_for_signer(base_listing(), "abc123"); assert_eq!(listing.farm.pubkey, "abc123"); } diff --git a/src/transport/jsonrpc/methods/bridge/order_request.rs b/src/transport/jsonrpc/methods/bridge/order_request.rs @@ -2,7 +2,6 @@ use anyhow::Result; use jsonrpsee::server::RpcModule; use radroots_events::kinds::KIND_LISTING; use radroots_nostr::prelude::{radroots_nostr_build_event, radroots_nostr_parse_pubkey}; -use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; use radroots_trade::listing::{ dvm::{ TradeListingAddress, TradeListingEnvelope, TradeListingMessageType, @@ -17,8 +16,8 @@ use crate::core::bridge::publish::{BridgePublishSettings, connect_and_publish_ev use crate::core::bridge::store::new_order_request_job; use crate::transport::jsonrpc::auth::require_bridge_auth; use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, bridge_signer_pubkey_hex, ensure_bridge_enabled, - normalize_idempotency_key, + BridgePublishResponse, ensure_bridge_enabled, normalize_idempotency_key, resolve_bridge_signer, + sign_bridge_event_builder, }; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; @@ -26,6 +25,8 @@ use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; struct BridgeOrderRequestParams { order: TradeOrder, #[serde(default)] + signer_session_id: Option<String>, + #[serde(default)] idempotency_key: Option<String>, } @@ -52,8 +53,14 @@ async fn publish_order_request( ensure_bridge_enabled(&ctx)?; let idempotency_key = normalize_idempotency_key(params.idempotency_key)?; - let signer_pubkey = bridge_signer_pubkey_hex(&ctx)?; - let order = canonicalize_order_request_for_embedded_signer(params.order, &signer_pubkey)?; + let signer = resolve_bridge_signer( + &ctx, + params.signer_session_id.as_deref(), + u32::from(TradeListingMessageType::OrderRequest.kind()), + ) + .await?; + let signer_pubkey = signer.signer_pubkey_hex(); + let order = canonicalize_order_request_for_signer(params.order, signer_pubkey.as_str())?; let envelope = TradeListingEnvelope::new( TradeListingMessageType::OrderRequest, order.listing_addr.clone(), @@ -75,16 +82,12 @@ async fn publish_order_request( .map_err(|error| { RpcError::Other(format!("failed to build order request event: {error}")) })?; - let signed = ctx - .state - .bridge_signer - .sign_event_builder(builder) - .map_err(|error| RpcError::Other(format!("failed to sign order request event: {error}")))?; - let event = signed.event; + let event = sign_bridge_event_builder(&ctx, &signer, builder, "bridge.order.request").await?; let reserved = ctx.state.bridge_jobs.reserve(new_order_request_job( Uuid::new_v4().to_string(), idempotency_key, + signer.signer_mode(), u32::from(TradeListingMessageType::OrderRequest.kind()), event.id.to_hex(), order.listing_addr.clone(), @@ -92,13 +95,18 @@ async fn publish_order_request( ctx.state.bridge_config.delivery_quorum, )); let job = match reserved { - Ok(job) => job, - Err(existing) => { + Ok(crate::core::bridge::store::BridgeJobReservation::Accepted(job)) => job, + Ok(crate::core::bridge::store::BridgeJobReservation::Duplicate(existing)) => { return Ok(BridgePublishResponse { deduplicated: true, job: existing, }); } + Err(error) => { + return Err(RpcError::Other(format!( + "failed to persist bridge order job: {error}" + ))); + } }; let execution = connect_and_publish_event( @@ -111,6 +119,7 @@ async fn publish_order_request( .state .bridge_jobs .complete(&job.job_id, execution) + .map_err(|error| RpcError::Other(format!("failed to persist bridge order job: {error}")))? .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; Ok(BridgePublishResponse { @@ -119,7 +128,7 @@ async fn publish_order_request( }) } -fn canonicalize_order_request_for_embedded_signer( +fn canonicalize_order_request_for_signer( mut order: TradeOrder, signer_pubkey: &str, ) -> Result<TradeOrder, RpcError> { @@ -147,7 +156,7 @@ fn canonicalize_order_request_for_embedded_signer( }; if buyer_pubkey != signer_pubkey { return Err(RpcError::InvalidParams( - "order.buyer_pubkey must match the bridge signer identity".to_string(), + "order.buyer_pubkey must match the requested bridge signer identity".to_string(), )); } @@ -229,13 +238,13 @@ mod tests { use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; use super::{ - BridgeOrderRequestParams, canonicalize_order_request_for_embedded_signer, - normalize_optional_string, publish_order_request, + BridgeOrderRequestParams, canonicalize_order_request_for_signer, normalize_optional_string, + publish_order_request, }; #[test] fn canonicalize_order_request_sets_missing_buyer_and_seller_pubkeys() { - let order = canonicalize_order_request_for_embedded_signer( + let order = canonicalize_order_request_for_signer( base_order("", "", TradeOrderStatus::Requested), "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", ) @@ -253,7 +262,7 @@ mod tests { #[test] fn canonicalize_order_request_rejects_non_requested_status() { - let err = canonicalize_order_request_for_embedded_signer( + let err = canonicalize_order_request_for_signer( base_order( "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "", @@ -273,7 +282,7 @@ mod tests { TradeOrderStatus::Requested, ); order.items[0].bin_count = 0; - let err = canonicalize_order_request_for_embedded_signer( + let err = canonicalize_order_request_for_signer( order, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", ) @@ -309,6 +318,7 @@ mod tests { let ctx = RpcContext::new(state, MethodRegistry::default()); let params = BridgeOrderRequestParams { order: base_order("", "", TradeOrderStatus::Requested), + signer_session_id: None, idempotency_key: Some("same-key".to_string()), }; @@ -323,6 +333,7 @@ mod tests { ctx, BridgeOrderRequestParams { order: base_order("", "", TradeOrderStatus::Requested), + signer_session_id: None, idempotency_key: Some("same-key".to_string()), }, ) diff --git a/src/transport/jsonrpc/methods/bridge/shared.rs b/src/transport/jsonrpc/methods/bridge/shared.rs @@ -1,8 +1,11 @@ use anyhow::Result; +use nostr::Event; +use radroots_nostr::prelude::RadrootsNostrEventBuilder; use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; use serde::Serialize; use crate::core::bridge::store::BridgeJobRecord; +use crate::transport::jsonrpc::nip46::{client as nip46_client, session as nip46_session}; use crate::transport::jsonrpc::{RpcContext, RpcError}; #[derive(Clone, Debug, Serialize)] @@ -18,6 +21,33 @@ pub(super) fn ensure_bridge_enabled(ctx: &RpcContext) -> Result<(), RpcError> { Ok(()) } +#[derive(Clone)] +pub(super) enum BridgeSignerSelection { + EmbeddedServiceIdentity { + signer_pubkey_hex: String, + }, + Nip46Session { + session_id: String, + session: crate::core::nip46::session::Nip46Session, + }, +} + +impl BridgeSignerSelection { + pub(super) fn signer_pubkey_hex(&self) -> String { + match self { + Self::EmbeddedServiceIdentity { signer_pubkey_hex } => signer_pubkey_hex.clone(), + Self::Nip46Session { session, .. } => session.remote_signer_pubkey.to_hex(), + } + } + + pub(super) fn signer_mode(&self) -> String { + match self { + Self::EmbeddedServiceIdentity { .. } => "embedded_service_identity".to_string(), + Self::Nip46Session { session_id, .. } => format!("nip46_session:{session_id}"), + } + } +} + pub(super) fn bridge_signer_pubkey_hex(ctx: &RpcContext) -> Result<String, RpcError> { Ok(ctx .state @@ -28,6 +58,49 @@ pub(super) fn bridge_signer_pubkey_hex(ctx: &RpcContext) -> Result<String, RpcEr .public_key_hex) } +pub(super) async fn resolve_bridge_signer( + ctx: &RpcContext, + signer_session_id: Option<&str>, + event_kind: u32, +) -> Result<BridgeSignerSelection, RpcError> { + match signer_session_id + .map(str::trim) + .filter(|value| !value.is_empty()) + { + Some(session_id) => { + let session = nip46_session::get_session(ctx, session_id).await?; + nip46_session::require_sign_event_permission(&session, event_kind)?; + Ok(BridgeSignerSelection::Nip46Session { + session_id: session_id.to_string(), + session, + }) + } + None => Ok(BridgeSignerSelection::EmbeddedServiceIdentity { + signer_pubkey_hex: bridge_signer_pubkey_hex(ctx)?, + }), + } +} + +pub(super) async fn sign_bridge_event_builder( + ctx: &RpcContext, + signer: &BridgeSignerSelection, + builder: RadrootsNostrEventBuilder, + label: &str, +) -> Result<Event, RpcError> { + match signer { + BridgeSignerSelection::EmbeddedServiceIdentity { .. } => ctx + .state + .bridge_signer + .sign_event_builder(builder) + .map(|signed| signed.event) + .map_err(|error| RpcError::Other(format!("failed to sign {label} event: {error}"))), + BridgeSignerSelection::Nip46Session { session, .. } => { + let unsigned = builder.build(session.remote_signer_pubkey); + nip46_client::sign_event(session, unsigned, label).await + } + } +} + pub(super) fn normalize_idempotency_key(value: Option<String>) -> Result<Option<String>, RpcError> { let value = value.map(|value| value.trim().to_string()); match value { @@ -41,11 +114,66 @@ pub(super) fn normalize_idempotency_key(value: Option<String>) -> Result<Option< #[cfg(test)] mod tests { - use super::normalize_idempotency_key; + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMetadata}; + + use crate::app::config::{BridgeConfig, Nip46Config}; + use crate::core::Radrootsd; + use crate::core::nip46::session::Nip46Session; + use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; + + use super::{normalize_idempotency_key, resolve_bridge_signer}; + use std::time::Instant; #[test] fn normalize_idempotency_key_rejects_empty_values() { let err = normalize_idempotency_key(Some(" ".to_string())).expect_err("empty key"); assert!(err.to_string().contains("idempotency_key")); } + + #[tokio::test] + async fn resolve_bridge_signer_prefers_requested_nip46_session() { + let identity = RadrootsIdentity::generate(); + let metadata: RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let state = Radrootsd::new( + identity.clone(), + metadata, + BridgeConfig::default(), + Nip46Config::default(), + ) + .expect("state"); + let session_keys = RadrootsNostrKeys::generate(); + state + .nip46_sessions + .insert(Nip46Session { + id: "session-1".to_string(), + client: RadrootsNostrClient::new(session_keys.clone()), + client_keys: session_keys.clone(), + client_pubkey: session_keys.public_key(), + remote_signer_pubkey: session_keys.public_key(), + user_pubkey: None, + relays: vec!["wss://relay.example.com".to_string()], + perms: vec!["sign_event".to_string()], + name: None, + url: None, + image: None, + expires_at: Some(Instant::now() + std::time::Duration::from_secs(60)), + auth_required: false, + authorized: true, + auth_url: None, + pending_request: None, + }) + .await; + let ctx = RpcContext::new(state, MethodRegistry::default()); + + let signer = resolve_bridge_signer(&ctx, Some("session-1"), 30402) + .await + .expect("session signer"); + assert_eq!( + signer.signer_pubkey_hex(), + session_keys.public_key().to_hex() + ); + assert_eq!(signer.signer_mode(), "nip46_session:session-1"); + } }