rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 4921ce36b83213651a48e37fcd26815a47c0854b
parent f1dc1c93e541315ad28887f0156a03a893e3b7d6
Author: triesap <tyson@radroots.org>
Date:   Sun, 29 Mar 2026 19:32:10 +0000

trade: consume public trade lane in rhi

Diffstat:
Msrc/features/trade_listing/handlers/dvm.rs | 188+++++++++++++++----------------------------------------------------------------
Msrc/features/trade_listing/subscriber.rs | 14+++++++++++---
2 files changed, 45 insertions(+), 157 deletions(-)

diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -265,16 +265,18 @@ pub async fn handle_event( } let tag_slices: Vec<Vec<String>> = tags.iter().map(|t| t.as_slice().to_vec()).collect(); - let rhi_pubkey = keys.public_key().to_string(); - if !tag_has_value(&tag_slices, "p", &rhi_pubkey) { - return Err(TradeListingDvmError::MissingRecipient); - } let envelope: TradeListingEnvelope<serde_json::Value> = serde_json::from_str(&event.content)?; envelope.validate()?; if envelope.message_type.kind() != kind { return Err(TradeListingDvmError::TagMismatch("kind")); } + if envelope.message_type.is_service() { + let rhi_pubkey = keys.public_key().to_string(); + if !tag_has_value(&tag_slices, "p", &rhi_pubkey) { + return Err(TradeListingDvmError::MissingRecipient); + } + } let listing_addr = tag_value(&tag_slices, "a").ok_or(TradeListingDvmError::MissingTag("a"))?; if listing_addr != envelope.listing_addr { @@ -648,15 +650,7 @@ async fn handle_order_request( drop(state); - send_envelope( - client, - payload.seller_pubkey.clone(), - TradeListingMessageType::OrderRequest, - &payload.listing_addr, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_order_response( @@ -664,7 +658,7 @@ async fn handle_order_response( payload: TradeOrderResponse, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -689,19 +683,9 @@ async fn handle_order_response( order.status = next_status; order.seen_event_ids.insert(event_id); - let buyer = order.buyer_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - buyer, - TradeListingMessageType::OrderResponse, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] @@ -710,7 +694,7 @@ async fn handle_order_revision( payload: TradeOrderRevision, listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -734,19 +718,9 @@ async fn handle_order_revision( ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; order.status = TradeOrderStatus::Revised; order.seen_event_ids.insert(event_id); - let buyer = order.buyer_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - buyer, - TradeListingMessageType::OrderRevision, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_order_revision_response( @@ -755,7 +729,7 @@ async fn handle_order_revision_response( payload: TradeOrderRevisionResponse, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -785,19 +759,9 @@ async fn handle_order_revision_response( ensure_transition(order.status.clone(), next_status.clone())?; order.status = next_status; order.seen_event_ids.insert(event_id); - let seller = order.seller_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - seller, - message_type, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_question( @@ -805,7 +769,7 @@ async fn handle_question( payload: TradeQuestion, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -828,19 +792,9 @@ async fn handle_question( ensure_transition(order.status.clone(), TradeOrderStatus::Questioned)?; order.status = TradeOrderStatus::Questioned; order.seen_event_ids.insert(event_id); - let seller = order.seller_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - seller, - TradeListingMessageType::Question, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] @@ -849,7 +803,7 @@ async fn handle_answer( payload: TradeAnswer, listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -874,19 +828,9 @@ async fn handle_answer( ensure_transition(order.status.clone(), TradeOrderStatus::Requested)?; order.status = TradeOrderStatus::Requested; order.seen_event_ids.insert(event_id); - let buyer = order.buyer_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - buyer, - TradeListingMessageType::Answer, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_discount_request( @@ -894,7 +838,7 @@ async fn handle_discount_request( payload: TradeDiscountRequest, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -913,19 +857,9 @@ async fn handle_discount_request( return Err(TradeListingDvmError::Unauthorized); } order.seen_event_ids.insert(event_id); - let seller = order.seller_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - seller, - TradeListingMessageType::DiscountRequest, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_discount_offer( @@ -933,7 +867,7 @@ async fn handle_discount_offer( payload: TradeDiscountOffer, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -954,19 +888,9 @@ async fn handle_discount_offer( ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; order.status = TradeOrderStatus::Revised; order.seen_event_ids.insert(event_id); - let buyer = order.buyer_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - buyer, - TradeListingMessageType::DiscountOffer, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_discount_decision( @@ -975,7 +899,7 @@ async fn handle_discount_decision( payload: TradeDiscountDecision, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -1006,27 +930,17 @@ async fn handle_discount_decision( ensure_transition(order.status.clone(), next_status.clone())?; order.status = next_status; order.seen_event_ids.insert(event_id); - let seller = order.seller_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - seller, - message_type, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_cancel( event: &RadrootsNostrEvent, - payload: TradeListingCancel, + _payload: TradeListingCancel, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -1045,23 +959,9 @@ async fn handle_cancel( ensure_transition(order.status.clone(), TradeOrderStatus::Cancelled)?; order.status = TradeOrderStatus::Cancelled; order.seen_event_ids.insert(event_id); - let recipient = if sender == order.buyer_pubkey { - order.seller_pubkey.clone() - } else { - order.buyer_pubkey.clone() - }; - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - recipient, - TradeListingMessageType::Cancel, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_fulfillment_update( @@ -1069,7 +969,7 @@ async fn handle_fulfillment_update( payload: TradeFulfillmentUpdate, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -1088,19 +988,9 @@ async fn handle_fulfillment_update( order.status = next_status; } order.seen_event_ids.insert(event_id); - let buyer = order.buyer_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - buyer, - TradeListingMessageType::FulfillmentUpdate, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } async fn handle_receipt( @@ -1108,7 +998,7 @@ async fn handle_receipt( payload: TradeReceipt, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - client: &RadrootsNostrClient, + _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; @@ -1127,19 +1017,9 @@ async fn handle_receipt( order.status = next_status; } order.seen_event_ids.insert(event_id); - let seller = order.seller_pubkey.clone(); - let listing_addr_str = order.listing_addr.clone(); drop(state); - send_envelope( - client, - seller, - TradeListingMessageType::Receipt, - &listing_addr_str, - Some(order_id), - &payload, - ) - .await + Ok(()) } #[cfg_attr(coverage_nightly, coverage(off))] @@ -2628,12 +2508,12 @@ mod tests { let missing_recipient = make_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), + custom_trade_kind(KIND_TRADE_LISTING_VALIDATE_REQ), make_envelope_content( - TradeListingMessageType::OrderRequest, + TradeListingMessageType::ListingValidateRequest, &listing_addr, - Some(order_id), - json!({}), + None, + json!({"listing_event": null}), ), Vec::new(), ); diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -5,7 +5,7 @@ use std::convert::TryFrom; use std::time::Duration; use anyhow::{Result, anyhow}; -use radroots_events::kinds::TRADE_LISTING_KINDS; +use radroots_events::kinds::{TRADE_LISTING_KINDS, is_trade_service_kind}; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys, RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag, @@ -206,6 +206,10 @@ async fn process_event_notification( }; let state = runtime.state(); + let event_kind = match event.kind { + RadrootsNostrKind::Custom(v) => Some(u32::from(v)), + _ => None, + }; if let Err(err) = handle_event_io( event.clone(), resolved_tags, @@ -218,8 +222,12 @@ async fn process_event_notification( match err { TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {} other => { - if let Err(err) = handle_error_io(other, &event, &client).await { - warn!("trade_listing: failed to send error feedback: {err}"); + if event_kind.is_some_and(is_trade_service_kind) { + if let Err(err) = handle_error_io(other, &event, &client).await { + warn!("trade_listing: failed to send error feedback: {err}"); + } + } else { + warn!("trade_listing: rejected public trade event: {other}"); } runtime.mark_processed_event(created_at).await?; }