radrootsd

JSON-RPC bridge for Radroots event publishing
git clone https://radroots.dev/git/radrootsd.git
Log | Files | Refs | README | LICENSE

commit aef910bf5391ed04fe9f660f54d68ad33f3ec3b0
parent d0dca92049a7c37fa2197fda209a412649783877
Author: triesap <tyson@radroots.org>
Date:   Sat, 28 Mar 2026 22:09:06 +0000

bridge: reserve idempotency before signing

Diffstat:
MCargo.lock | 1+
MCargo.toml | 1+
Msrc/core/bridge/publish.rs | 18++++++++++++++++++
Msrc/core/bridge/store.rs | 139++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Msrc/transport/jsonrpc/methods/bridge/listing_publish.rs | 180+++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------
Msrc/transport/jsonrpc/methods/bridge/order_request.rs | 109+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------
Msrc/transport/jsonrpc/methods/bridge/shared.rs | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
7 files changed, 407 insertions(+), 108 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1871,6 +1871,7 @@ dependencies = [ "serde", "serde_json", "serde_qs", + "sha2", "thiserror 2.0.18", "tokio", "tower", diff --git a/Cargo.toml b/Cargo.toml @@ -41,6 +41,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus serde = { version = "1", default-features = false, features = ["derive"] } serde_json = { version = "1", default-features = false } serde_qs = { version = "1.0" } +sha2 = { version = "0.10" } tokio = { version = "1", features = ["full"] } thiserror = { version = "2" } tower = { version = "0.5.3", features = ["util"] } diff --git a/src/core/bridge/publish.rs b/src/core/bridge/publish.rs @@ -137,6 +137,24 @@ pub async fn connect_and_publish_event( .await } +pub fn failed_prepublish_execution( + settings: &BridgePublishSettings, + summary: impl Into<String>, +) -> BridgePublishExecution { + let summary = summary.into(); + BridgePublishExecution { + published: false, + relay_count: 0, + acknowledged_relay_count: 0, + required_acknowledged_relay_count: 0, + delivery_policy: settings.delivery_policy, + attempt_count: 0, + relay_outcome_summary: summary.clone(), + relay_results: Vec::new(), + attempt_summaries: vec![summary], + } +} + pub async fn publish_with_policy<T, F, Fut>( relays: &[RadrootsNostrRelayUrl], settings: &BridgePublishSettings, diff --git a/src/core/bridge/store.rs b/src/core/bridge/store.rs @@ -8,7 +8,7 @@ use thiserror::Error; use crate::app::config::BridgeDeliveryPolicy; use crate::core::bridge::publish::{BridgePublishExecution, BridgeRelayPublishResult}; -const BRIDGE_JOB_STORE_VERSION: u32 = 1; +const BRIDGE_JOB_STORE_VERSION: u32 = 2; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -64,11 +64,17 @@ pub struct BridgeJobStore { #[derive(Debug)] struct BridgeJobStoreInner { jobs: HashMap<String, BridgeJobRecord>, - idempotency: HashMap<String, String>, + idempotency: HashMap<String, BridgeIdempotencyRecord>, order: VecDeque<String>, capacity: usize, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct BridgeIdempotencyRecord { + job_id: String, + request_fingerprint: String, +} + #[derive(Debug, Clone)] struct BridgeJobStorePersistence { path: PathBuf, @@ -78,7 +84,7 @@ struct BridgeJobStorePersistence { struct PersistedBridgeJobStore { version: u32, jobs: HashMap<String, BridgeJobRecord>, - idempotency: HashMap<String, String>, + idempotency: HashMap<String, BridgeIdempotencyRecord>, order: VecDeque<String>, } @@ -92,6 +98,11 @@ pub enum BridgeJobStoreError { Io(#[from] std::io::Error), #[error("bridge job store json error: {0}")] Json(#[from] serde_json::Error), + #[error("idempotency_key `{key}` conflicts with existing bridge job `{existing_job_id}`")] + IdempotencyConflict { + key: String, + existing_job_id: String, + }, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -125,11 +136,18 @@ impl BridgeJobStore { pub fn reserve( &self, mut record: BridgeJobRecord, + request_fingerprint: String, ) -> Result<BridgeJobReservation, BridgeJobStoreError> { let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); if let Some(idempotency_key) = record.idempotency_key.as_ref() { - if let Some(job_id) = inner.idempotency.get(idempotency_key) { - if let Some(existing) = inner.jobs.get(job_id) { + if let Some(existing_idempotency) = inner.idempotency.get(idempotency_key) { + if existing_idempotency.request_fingerprint != request_fingerprint { + return Err(BridgeJobStoreError::IdempotencyConflict { + key: idempotency_key.clone(), + existing_job_id: existing_idempotency.job_id.clone(), + }); + } + if let Some(existing) = inner.jobs.get(&existing_idempotency.job_id) { return Ok(BridgeJobReservation::Duplicate(existing.clone())); } } @@ -138,9 +156,13 @@ impl BridgeJobStore { record.status = BridgeJobStatus::Accepted; inner.order.push_back(record.job_id.clone()); if let Some(idempotency_key) = record.idempotency_key.as_ref() { - inner - .idempotency - .insert(idempotency_key.clone(), record.job_id.clone()); + inner.idempotency.insert( + idempotency_key.clone(), + BridgeIdempotencyRecord { + job_id: record.job_id.clone(), + request_fingerprint, + }, + ); } inner.jobs.insert(record.job_id.clone(), record.clone()); inner.prune(); @@ -153,12 +175,16 @@ impl BridgeJobStore { pub fn complete( &self, job_id: &str, + event_id: Option<String>, execution: BridgePublishExecution, ) -> Result<Option<BridgeJobRecord>, BridgeJobStoreError> { let mut inner = self.inner.write().unwrap_or_else(|e| e.into_inner()); let Some(record) = inner.jobs.get_mut(job_id) else { return Ok(None); }; + if let Some(event_id) = event_id { + record.event_id = Some(event_id); + } record.status = if execution.published { BridgeJobStatus::Published } else { @@ -218,7 +244,12 @@ impl BridgeJobStoreInner { continue; }; if let Some(idempotency_key) = removed.idempotency_key { - if self.idempotency.get(&idempotency_key) == Some(&job_id) { + if self + .idempotency + .get(&idempotency_key) + .map(|record| record.job_id.as_str()) + == Some(job_id.as_str()) + { self.idempotency.remove(&idempotency_key); } } @@ -295,7 +326,7 @@ pub fn new_publish_job( idempotency_key: Option<String>, signer_mode: String, event_kind: u32, - event_id: String, + event_id: Option<String>, event_addr: Option<String>, delivery_policy: BridgeDeliveryPolicy, delivery_quorum: Option<usize>, @@ -309,7 +340,7 @@ pub fn new_publish_job( completed_at_unix: None, signer_mode, event_kind, - event_id: Some(event_id), + event_id, event_addr, delivery_policy, delivery_quorum, @@ -328,7 +359,7 @@ pub fn new_listing_publish_job( idempotency_key: Option<String>, signer_mode: String, event_kind: u32, - event_id: String, + event_id: Option<String>, event_addr: String, delivery_policy: BridgeDeliveryPolicy, delivery_quorum: Option<usize>, @@ -351,7 +382,7 @@ pub fn new_order_request_job( idempotency_key: Option<String>, signer_mode: String, event_kind: u32, - event_id: String, + event_id: Option<String>, listing_addr: String, delivery_policy: BridgeDeliveryPolicy, delivery_quorum: Option<usize>, @@ -394,7 +425,7 @@ mod tests { Some("same".to_string()), "embedded_service_identity".to_string(), 30402, - "event-1".to_string(), + Some("event-1".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, @@ -404,17 +435,22 @@ mod tests { Some("same".to_string()), "embedded_service_identity".to_string(), 30402, - "event-2".to_string(), + Some("event-2".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, ); assert!(matches!( - store.reserve(first.clone()).expect("reserve"), + store + .reserve(first.clone(), "fingerprint-1".to_string()) + .expect("reserve"), BridgeJobReservation::Accepted(_) )); - let existing = match store.reserve(second).expect("same idempotency key") { + let existing = match store + .reserve(second, "fingerprint-1".to_string()) + .expect("same idempotency key") + { BridgeJobReservation::Duplicate(existing) => existing, BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), }; @@ -423,6 +459,39 @@ mod tests { } #[test] + fn reserve_rejects_conflicting_idempotency_key_reuse() { + let store = BridgeJobStore::new(8); + let first = new_listing_publish_job( + "job-1".to_string(), + Some("same".to_string()), + "embedded_service_identity".to_string(), + 30402, + Some("event-1".to_string()), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + let second = new_listing_publish_job( + "job-2".to_string(), + Some("same".to_string()), + "embedded_service_identity".to_string(), + 30402, + Some("event-2".to_string()), + "30402:author:listing".to_string(), + BridgeDeliveryPolicy::Any, + None, + ); + + store + .reserve(first, "fingerprint-1".to_string()) + .expect("reserve first"); + let err = store + .reserve(second, "fingerprint-2".to_string()) + .expect_err("conflicting idempotency"); + assert!(err.to_string().contains("conflicts")); + } + + #[test] fn complete_updates_job_record() { let store = BridgeJobStore::new(8); let job = new_listing_publish_job( @@ -430,19 +499,22 @@ mod tests { None, "embedded_service_identity".to_string(), 30402, - "event-1".to_string(), + Some("event-1".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, ); assert!(matches!( - store.reserve(job).expect("reserve job"), + store + .reserve(job, "fingerprint-1".to_string()) + .expect("reserve job"), BridgeJobReservation::Accepted(_) )); let completed = store .complete( "job-1", + Some("event-1".to_string()), BridgePublishExecution { published: true, relay_count: 2, @@ -472,7 +544,7 @@ mod tests { Some("first".to_string()), "embedded_service_identity".to_string(), 30402, - "event-1".to_string(), + Some("event-1".to_string()), "30402:author:listing-1".to_string(), BridgeDeliveryPolicy::Any, None, @@ -482,18 +554,22 @@ mod tests { Some("second".to_string()), "embedded_service_identity".to_string(), 30402, - "event-2".to_string(), + Some("event-2".to_string()), "30402:author:listing-2".to_string(), BridgeDeliveryPolicy::Any, None, ); assert!(matches!( - store.reserve(first).expect("first"), + store + .reserve(first, "fingerprint-1".to_string()) + .expect("first"), BridgeJobReservation::Accepted(_) )); assert!(matches!( - store.reserve(second).expect("second"), + store + .reserve(second, "fingerprint-2".to_string()) + .expect("second"), BridgeJobReservation::Accepted(_) )); @@ -509,7 +585,7 @@ mod tests { Some("same".to_string()), "nip46_session:session-1".to_string(), 5322, - "event-1".to_string(), + Some("event-1".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, @@ -533,13 +609,15 @@ mod tests { Some("same".to_string()), "embedded_service_identity".to_string(), 30402, - "event-1".to_string(), + Some("event-1".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, ); assert!(matches!( - store.reserve(first).expect("reserve first"), + store + .reserve(first, "fingerprint-1".to_string()) + .expect("reserve first"), BridgeJobReservation::Accepted(_) )); @@ -549,12 +627,15 @@ mod tests { Some("same".to_string()), "embedded_service_identity".to_string(), 30402, - "event-2".to_string(), + Some("event-2".to_string()), "30402:author:listing".to_string(), BridgeDeliveryPolicy::Any, None, ); - let existing = match loaded.reserve(duplicate).expect("dedupe after reload") { + let existing = match loaded + .reserve(duplicate, "fingerprint-1".to_string()) + .expect("dedupe after reload") + { BridgeJobReservation::Duplicate(existing) => existing, BridgeJobReservation::Accepted(_) => panic!("expected duplicate reservation"), }; @@ -563,7 +644,7 @@ mod tests { let payload = std::fs::read_to_string(&path).expect("persisted payload"); let persisted: PersistedBridgeJobStore = serde_json::from_str(&payload).expect("persisted store"); - assert_eq!(persisted.version, 1); + assert_eq!(persisted.version, 2); let _ = std::fs::remove_file(path); } diff --git a/src/transport/jsonrpc/methods/bridge/listing_publish.rs b/src/transport/jsonrpc/methods/bridge/listing_publish.rs @@ -7,11 +7,14 @@ use radroots_trade::listing::validation::validate_listing_event; use serde::Deserialize; use uuid::Uuid; -use crate::core::bridge::publish::{BridgePublishSettings, connect_and_publish_event}; +use crate::core::bridge::publish::{ + BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, +}; use crate::core::bridge::store::new_listing_publish_job; use crate::transport::jsonrpc::auth::require_bridge_auth; use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, normalize_idempotency_key, resolve_bridge_signer, + BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, + normalize_idempotency_key, reserve_bridge_job, resolve_bridge_signer, sign_bridge_event_builder, }; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; @@ -50,52 +53,81 @@ async fn publish_listing( let signer = resolve_bridge_signer(&ctx, params.signer_session_id.as_deref(), 30402).await?; let signer_pubkey = signer.signer_pubkey_hex(); let listing = canonicalize_listing_for_signer(params.listing, signer_pubkey.as_str()); + let request_fingerprint = + fingerprint_bridge_request("bridge.listing.publish", &signer, &listing)?; let parts = to_wire_parts(&listing) .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; let builder = radroots_nostr_build_event(parts.kind, parts.content, parts.tags) .map_err(|error| RpcError::Other(format!("failed to build listing event: {error}")))?; - let event = sign_bridge_event_builder(&ctx, &signer, builder, "bridge.listing.publish").await?; - let canonical = radroots_event_from_nostr(&event); - let validated = validate_listing_event(&canonical) - .map_err(|error| RpcError::InvalidParams(format!("invalid listing contract: {error}")))?; + let listing_addr = format!("{}:{}:{}", parts.kind, signer_pubkey, listing.d_tag.trim()); - let reserved = ctx.state.bridge_jobs.reserve(new_listing_publish_job( - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - parts.kind, - event.id.to_hex(), - validated.listing_addr, - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - )); + let reserved = reserve_bridge_job( + &ctx, + new_listing_publish_job( + Uuid::new_v4().to_string(), + idempotency_key, + signer.signer_mode(), + parts.kind, + None, + listing_addr, + ctx.state.bridge_config.delivery_policy, + ctx.state.bridge_config.delivery_quorum, + ), + request_fingerprint, + "bridge listing", + )?; let job = match reserved { - Ok(crate::core::bridge::store::BridgeJobReservation::Accepted(job)) => job, - Ok(crate::core::bridge::store::BridgeJobReservation::Duplicate(existing)) => { + crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, + crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { return Ok(BridgePublishResponse { deduplicated: true, job: existing, }); } + }; + + let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); + let event = + match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.listing.publish").await { + Ok(event) => event, + Err(error) => { + let _ = ctx.state.bridge_jobs.complete( + &job.job_id, + None, + failed_prepublish_execution(&publish_settings, error.to_string()), + ); + return Err(error); + } + }; + let canonical = radroots_event_from_nostr(&event); + let validated = match validate_listing_event(&canonical) { + Ok(validated) => validated, Err(error) => { - return Err(RpcError::Other(format!( - "failed to persist bridge listing job: {error}" + let _ = ctx.state.bridge_jobs.complete( + &job.job_id, + Some(event.id.to_hex()), + failed_prepublish_execution( + &publish_settings, + format!("invalid listing contract: {error}"), + ), + ); + return Err(RpcError::InvalidParams(format!( + "invalid listing contract: {error}" ))); } }; - let execution = connect_and_publish_event( - &ctx.state.client, - &BridgePublishSettings::from_config(&ctx.state.bridge_config), - &event, - ) - .await; + let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; let job = ctx .state .bridge_jobs - .complete(&job.job_id, execution) + .complete(&job.job_id, Some(event.id.to_hex()), execution) .map_err(|error| RpcError::Other(format!("failed to persist bridge listing job: {error}")))? .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; + debug_assert_eq!( + job.event_addr.as_deref(), + Some(validated.listing_addr.as_str()) + ); Ok(BridgePublishResponse { deduplicated: false, @@ -124,8 +156,14 @@ mod tests { RadrootsListingDeliveryMethod, RadrootsListingFarmRef, RadrootsListingLocation, RadrootsListingProduct, }; + use radroots_identity::RadrootsIdentity; + use radroots_nostr::prelude::RadrootsNostrMetadata; + + use crate::app::config::{BridgeConfig, Nip46Config}; + use crate::core::Radrootsd; + use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - use super::canonicalize_listing_for_signer; + use super::{BridgeListingPublishParams, canonicalize_listing_for_signer, publish_listing}; #[test] fn canonicalize_listing_sets_missing_farm_pubkey() { @@ -133,18 +171,60 @@ mod tests { assert_eq!(listing.farm.pubkey, "abc123"); } + #[tokio::test] + async fn publish_listing_is_job_backed_and_idempotent() { + let identity = RadrootsIdentity::generate(); + let metadata: RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let state = Radrootsd::new( + identity, + metadata, + BridgeConfig { + enabled: true, + bearer_token: Some("secret".to_string()), + ..BridgeConfig::default() + }, + Nip46Config::default(), + ) + .expect("state"); + let ctx = RpcContext::new(state, MethodRegistry::default()); + let params = BridgeListingPublishParams { + listing: base_listing(), + signer_session_id: None, + idempotency_key: Some("same-key".to_string()), + }; + + let first = publish_listing(ctx.clone(), params).await.expect("first"); + assert!(!first.deduplicated); + assert_eq!(first.job.command, "bridge.listing.publish"); + assert!(first.job.event_addr.is_some()); + + let second = publish_listing( + ctx, + BridgeListingPublishParams { + listing: base_listing(), + signer_session_id: None, + idempotency_key: Some("same-key".to_string()), + }, + ) + .await + .expect("second"); + assert!(second.deduplicated); + assert_eq!(second.job.job_id, first.job.job_id); + } + fn base_listing() -> RadrootsListing { RadrootsListing { - d_tag: "fresh-carrots".to_string(), + d_tag: "AAAAAAAAAAAAAAAAAAAAAg".to_string(), farm: RadrootsListingFarmRef { pubkey: String::new(), - d_tag: "farm-1".to_string(), + d_tag: "AAAAAAAAAAAAAAAAAAAAAw".to_string(), }, product: RadrootsListingProduct { - key: "carrot".to_string(), - title: "Fresh carrots".to_string(), - category: "vegetable".to_string(), - summary: Some("Sweet carrots".to_string()), + key: "coffee".to_string(), + title: "Coffee".to_string(), + category: "coffee".to_string(), + summary: Some("Single origin coffee".to_string()), process: None, lot: None, location: None, @@ -155,19 +235,19 @@ mod tests { bins: vec![RadrootsListingBin { bin_id: "bin-1".to_string(), quantity: RadrootsCoreQuantity::new( - RadrootsCoreDecimal::from(25), - RadrootsCoreUnit::MassKg, + RadrootsCoreDecimal::from(1000u32), + RadrootsCoreUnit::MassG, ), - price_per_canonical_unit: RadrootsCoreQuantityPrice { - amount: RadrootsCoreMoney { - amount: RadrootsCoreDecimal::from(4), - currency: RadrootsCoreCurrency::USD, - }, - quantity: RadrootsCoreQuantity::new( - RadrootsCoreDecimal::from(1), - RadrootsCoreUnit::MassKg, + price_per_canonical_unit: RadrootsCoreQuantityPrice::new( + RadrootsCoreMoney::new( + RadrootsCoreDecimal::from(20u32), + RadrootsCoreCurrency::USD, ), - }, + RadrootsCoreQuantity::new( + RadrootsCoreDecimal::from(1u32), + RadrootsCoreUnit::MassG, + ), + ), display_amount: None, display_unit: None, display_label: None, @@ -177,16 +257,16 @@ mod tests { resource_area: None, plot: None, discounts: None, - inventory_available: Some(RadrootsCoreDecimal::from(25)), + inventory_available: Some(RadrootsCoreDecimal::from(5u32)), availability: Some(RadrootsListingAvailability::Status { status: radroots_events::listing::RadrootsListingStatus::Active, }), delivery_method: Some(RadrootsListingDeliveryMethod::Pickup), location: Some(RadrootsListingLocation { - primary: "Shed 1".to_string(), - city: Some("Portland".to_string()), - region: Some("OR".to_string()), - country: Some("US".to_string()), + primary: "Farm".to_string(), + city: None, + region: None, + country: None, lat: None, lng: None, geohash: None, diff --git a/src/transport/jsonrpc/methods/bridge/order_request.rs b/src/transport/jsonrpc/methods/bridge/order_request.rs @@ -12,11 +12,14 @@ use radroots_trade::listing::{ use serde::Deserialize; use uuid::Uuid; -use crate::core::bridge::publish::{BridgePublishSettings, connect_and_publish_event}; +use crate::core::bridge::publish::{ + BridgePublishSettings, connect_and_publish_event, failed_prepublish_execution, +}; use crate::core::bridge::store::new_order_request_job; use crate::transport::jsonrpc::auth::require_bridge_auth; use crate::transport::jsonrpc::methods::bridge::shared::{ - BridgePublishResponse, ensure_bridge_enabled, normalize_idempotency_key, resolve_bridge_signer, + BridgePublishResponse, ensure_bridge_enabled, fingerprint_bridge_request, + normalize_idempotency_key, reserve_bridge_job, resolve_bridge_signer, sign_bridge_event_builder, }; use crate::transport::jsonrpc::{MethodRegistry, RpcContext, RpcError}; @@ -61,6 +64,7 @@ async fn publish_order_request( .await?; let signer_pubkey = signer.signer_pubkey_hex(); let order = canonicalize_order_request_for_signer(params.order, signer_pubkey.as_str())?; + let request_fingerprint = fingerprint_bridge_request("bridge.order.request", &signer, &order)?; let envelope = TradeListingEnvelope::new( TradeListingMessageType::OrderRequest, order.listing_addr.clone(), @@ -82,43 +86,51 @@ async fn publish_order_request( .map_err(|error| { RpcError::Other(format!("failed to build order request event: {error}")) })?; - let event = sign_bridge_event_builder(&ctx, &signer, builder, "bridge.order.request").await?; - let reserved = ctx.state.bridge_jobs.reserve(new_order_request_job( - Uuid::new_v4().to_string(), - idempotency_key, - signer.signer_mode(), - u32::from(TradeListingMessageType::OrderRequest.kind()), - event.id.to_hex(), - order.listing_addr.clone(), - ctx.state.bridge_config.delivery_policy, - ctx.state.bridge_config.delivery_quorum, - )); + let reserved = reserve_bridge_job( + &ctx, + new_order_request_job( + Uuid::new_v4().to_string(), + idempotency_key, + signer.signer_mode(), + u32::from(TradeListingMessageType::OrderRequest.kind()), + None, + order.listing_addr.clone(), + ctx.state.bridge_config.delivery_policy, + ctx.state.bridge_config.delivery_quorum, + ), + request_fingerprint, + "bridge order", + )?; let job = match reserved { - Ok(crate::core::bridge::store::BridgeJobReservation::Accepted(job)) => job, - Ok(crate::core::bridge::store::BridgeJobReservation::Duplicate(existing)) => { + crate::core::bridge::store::BridgeJobReservation::Accepted(job) => job, + crate::core::bridge::store::BridgeJobReservation::Duplicate(existing) => { return Ok(BridgePublishResponse { deduplicated: true, job: existing, }); } - Err(error) => { - return Err(RpcError::Other(format!( - "failed to persist bridge order job: {error}" - ))); - } }; - let execution = connect_and_publish_event( - &ctx.state.client, - &BridgePublishSettings::from_config(&ctx.state.bridge_config), - &event, - ) - .await; + let publish_settings = BridgePublishSettings::from_config(&ctx.state.bridge_config); + let event = + match sign_bridge_event_builder(&ctx, &signer, builder, "bridge.order.request").await { + Ok(event) => event, + Err(error) => { + let _ = ctx.state.bridge_jobs.complete( + &job.job_id, + None, + failed_prepublish_execution(&publish_settings, error.to_string()), + ); + return Err(error); + } + }; + + let execution = connect_and_publish_event(&ctx.state.client, &publish_settings, &event).await; let job = ctx .state .bridge_jobs - .complete(&job.job_id, execution) + .complete(&job.job_id, Some(event.id.to_hex()), execution) .map_err(|error| RpcError::Other(format!("failed to persist bridge order job: {error}")))? .ok_or_else(|| RpcError::Other("bridge job disappeared during completion".to_string()))?; @@ -343,6 +355,49 @@ mod tests { assert_eq!(second.job.job_id, first.job.job_id); } + #[tokio::test] + async fn publish_order_request_rejects_conflicting_idempotency_key_reuse() { + let identity = RadrootsIdentity::generate(); + let metadata: RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"radrootsd-test"}"#).expect("metadata"); + let state = Radrootsd::new( + identity, + metadata, + BridgeConfig { + enabled: true, + bearer_token: Some("secret".to_string()), + ..BridgeConfig::default() + }, + Nip46Config::default(), + ) + .expect("state"); + let ctx = RpcContext::new(state, MethodRegistry::default()); + publish_order_request( + ctx.clone(), + BridgeOrderRequestParams { + order: base_order("", "", TradeOrderStatus::Requested), + signer_session_id: None, + idempotency_key: Some("same-key".to_string()), + }, + ) + .await + .expect("first"); + + let mut conflicting = base_order("", "", TradeOrderStatus::Requested); + conflicting.order_id = "order-2".to_string(); + let err = publish_order_request( + ctx, + BridgeOrderRequestParams { + order: conflicting, + signer_session_id: None, + idempotency_key: Some("same-key".to_string()), + }, + ) + .await + .expect_err("conflicting idempotency"); + assert!(err.to_string().contains("conflicts")); + } + fn base_listing_addr() -> &'static str { "30402:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb:AAAAAAAAAAAAAAAAAAAAAg" } diff --git a/src/transport/jsonrpc/methods/bridge/shared.rs b/src/transport/jsonrpc/methods/bridge/shared.rs @@ -3,8 +3,9 @@ use nostr::Event; use radroots_nostr::prelude::RadrootsNostrEventBuilder; use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; use serde::Serialize; +use sha2::{Digest, Sha256}; -use crate::core::bridge::store::BridgeJobRecord; +use crate::core::bridge::store::{BridgeJobRecord, BridgeJobReservation, BridgeJobStoreError}; use crate::transport::jsonrpc::nip46::{client as nip46_client, session as nip46_session}; use crate::transport::jsonrpc::{RpcContext, RpcError}; @@ -112,6 +113,47 @@ pub(super) fn normalize_idempotency_key(value: Option<String>) -> Result<Option< } } +#[derive(Serialize)] +struct BridgeRequestFingerprint<'a, T> { + command: &'a str, + signer_mode: &'a str, + signer_pubkey_hex: &'a str, + payload: &'a T, +} + +pub(super) fn fingerprint_bridge_request<T: Serialize>( + command: &str, + signer: &BridgeSignerSelection, + payload: &T, +) -> Result<String, RpcError> { + let payload = serde_json::to_vec(&BridgeRequestFingerprint { + command, + signer_mode: &signer.signer_mode(), + signer_pubkey_hex: &signer.signer_pubkey_hex(), + payload, + }) + .map_err(|error| RpcError::Other(format!("failed to fingerprint bridge request: {error}")))?; + let digest = Sha256::digest(payload); + Ok(format!("{digest:x}")) +} + +pub(super) fn reserve_bridge_job( + ctx: &RpcContext, + record: BridgeJobRecord, + request_fingerprint: String, + label: &str, +) -> Result<BridgeJobReservation, RpcError> { + ctx.state + .bridge_jobs + .reserve(record, request_fingerprint) + .map_err(|error| match error { + BridgeJobStoreError::IdempotencyConflict { .. } => { + RpcError::InvalidParams(error.to_string()) + } + _ => RpcError::Other(format!("failed to persist {label} job: {error}")), + }) +} + #[cfg(test)] mod tests { use radroots_identity::RadrootsIdentity; @@ -122,7 +164,7 @@ mod tests { use crate::core::nip46::session::Nip46Session; use crate::transport::jsonrpc::{MethodRegistry, RpcContext}; - use super::{normalize_idempotency_key, resolve_bridge_signer}; + use super::{fingerprint_bridge_request, normalize_idempotency_key, resolve_bridge_signer}; use std::time::Instant; #[test] @@ -176,4 +218,25 @@ mod tests { ); assert_eq!(signer.signer_mode(), "nip46_session:session-1"); } + + #[test] + fn fingerprint_bridge_request_changes_when_request_changes() { + let signer = super::BridgeSignerSelection::EmbeddedServiceIdentity { + signer_pubkey_hex: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + .to_string(), + }; + let first = fingerprint_bridge_request( + "bridge.order.request", + &signer, + &serde_json::json!({"order_id":"one"}), + ) + .expect("first"); + let second = fingerprint_bridge_request( + "bridge.order.request", + &signer, + &serde_json::json!({"order_id":"two"}), + ) + .expect("second"); + assert_ne!(first, second); + } }