rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit 3616100d243a6f653d3b3295ad506164196c4162
parent 9afdd98d28278cc08660214ede012d6ef821bb4e
Author: triesap <tyson@radroots.org>
Date:   Thu,  2 Apr 2026 21:17:31 +0000

trade: rebuild rhi from public listing and trade chain

Diffstat:
Msrc/features/trade_listing/handlers/dvm.rs | 2283++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
Msrc/features/trade_listing/state.rs | 49++++++++++++++++++++++++++++++++++++++++++++++---
Msrc/features/trade_listing/subscriber.rs | 14++++++++++----
Msrc/lib.rs | 8++++----
4 files changed, 1498 insertions(+), 856 deletions(-)

diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration}; -use radroots_events::kinds::{KIND_FARM, is_trade_listing_kind}; +use radroots_events::kinds::{KIND_FARM, is_listing_kind, is_trade_kind}; use radroots_events::listing::RadrootsListingFarmRef; use radroots_events::trade::{ RadrootsTradeAnswer as TradeAnswer, RadrootsTradeDiscountDecision as TradeDiscountDecision, @@ -17,6 +17,7 @@ use radroots_events::trade::{ RadrootsTradeListingValidateRequest as TradeListingValidateRequest, RadrootsTradeListingValidateResult as TradeListingValidateResult, RadrootsTradeListingValidationError as TradeListingValidationError, + RadrootsTradeMessagePayload as TradeListingMessagePayload, RadrootsTradeMessageType as TradeListingMessageType, RadrootsTradeOrder as TradeOrder, RadrootsTradeOrderResponse as TradeOrderResponse, RadrootsTradeOrderRevision as TradeOrderRevision, @@ -25,7 +26,9 @@ use radroots_events::trade::{ RadrootsTradeReceipt as TradeReceipt, }; use radroots_events_codec::trade::{ + RadrootsTradeEnvelopeParseError as TradeListingEnvelopeParseError, RadrootsTradeListingAddress as TradeListingAddress, + trade_envelope_from_event, trade_envelope_event_build as trade_listing_envelope_event_build, }; use radroots_nostr::prelude::{ @@ -34,7 +37,9 @@ use radroots_nostr::prelude::{ radroots_nostr_build_event, radroots_nostr_build_event_job_feedback, radroots_nostr_fetch_event_by_id, radroots_nostr_parse_pubkey, radroots_nostr_send_event, }; +use radroots_trade::listing::projection::RadrootsTradeOrderWorkflowMessage; use radroots_trade::listing::validation::validate_listing_event; +#[cfg(test)] use serde::de::DeserializeOwned; use std::convert::TryFrom; use thiserror::Error; @@ -247,7 +252,7 @@ fn validate_listing_event_io( pub async fn handle_event( event: RadrootsNostrEvent, - tags: Vec<RadrootsNostrTag>, + _tags: Vec<RadrootsNostrTag>, keys: RadrootsNostrKeys, client: RadrootsNostrClient, state: Arc<tokio::sync::Mutex<TradeListingState>>, @@ -256,7 +261,11 @@ pub async fn handle_event( RadrootsNostrKind::Custom(v) => u32::from(v), _ => return Err(TradeListingDvmError::UnsupportedKind), }; - if !is_trade_listing_kind(kind) { + if is_listing_kind(kind) { + handle_listing_event(&event, &state).await?; + return Ok(()); + } + if !is_trade_kind(kind) { return Err(TradeListingDvmError::UnsupportedKind); } @@ -264,48 +273,43 @@ pub async fn handle_event( return Ok(()); } - let tag_slices: Vec<Vec<String>> = tags.iter().map(|t| t.as_slice().to_vec()).collect(); - - let envelope: TradeListingEnvelope<serde_json::Value> = serde_json::from_str(&event.content)?; - envelope.validate()?; - if envelope.message_type.kind() != kind { + let envelope_hint: TradeListingEnvelope<serde_json::Value> = serde_json::from_str(&event.content) + .map_err(|error| TradeListingDvmError::InvalidPayload(error.to_string()))?; + if envelope_hint.message_type.kind() != kind { return Err(TradeListingDvmError::TagMismatch("kind")); } - if envelope.message_type.is_service() { + + let tag_slices: Vec<Vec<String>> = event.tags.iter().map(|t| t.as_slice().to_vec()).collect(); + if envelope_hint.message_type.is_service() { let rhi_pubkey = keys.public_key().to_string(); if !tag_has_value(&tag_slices, "p", &rhi_pubkey) { return Err(TradeListingDvmError::MissingRecipient); } } - let listing_addr = tag_value(&tag_slices, "a").ok_or(TradeListingDvmError::MissingTag("a"))?; - if listing_addr != envelope.listing_addr { - return Err(TradeListingDvmError::TagMismatch("a")); + let envelope: TradeListingEnvelope<TradeListingMessagePayload> = + trade_envelope_from_event(&radroots_event_from_nostr(&event)) + .map_err(map_trade_envelope_parse_error)?; + if envelope.payload.message_type() != envelope.message_type { + return Err(TradeListingDvmError::InvalidPayload( + "trade envelope payload does not match message type".to_string(), + )); } let order_id = envelope.order_id.as_deref(); - if envelope.message_type.requires_order_id() { - let tag_order_id = - tag_value(&tag_slices, "d").ok_or(TradeListingDvmError::MissingTag("d"))?; - if Some(tag_order_id.as_str()) != order_id { - return Err(TradeListingDvmError::TagMismatch("d")); - } - } - + let listing_addr = envelope.listing_addr.clone(); let listing_addr_parsed = TradeListingAddress::parse(&listing_addr) .map_err(|_| TradeListingDvmError::InvalidListingAddr)?; - if listing_addr_parsed.kind != 30402 { + if !is_listing_kind(listing_addr_parsed.kind) { return Err(TradeListingDvmError::InvalidListingAddr); } - match envelope.message_type { - TradeListingMessageType::ListingValidateRequest => { - let payload: TradeListingValidateRequest = parse_payload(envelope.payload)?; + match envelope.payload { + TradeListingMessagePayload::ListingValidateRequest(payload) => { handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) .await?; } - TradeListingMessageType::OrderRequest => { - let payload: TradeOrder = parse_payload(envelope.payload)?; + TradeListingMessagePayload::OrderRequest(payload) => { handle_order_request( &event, payload, @@ -316,8 +320,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::OrderResponse => { - let payload: TradeOrderResponse = parse_payload(envelope.payload)?; + TradeListingMessagePayload::OrderResponse(payload) => { handle_order_response( &event, payload, @@ -328,8 +331,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::OrderRevision => { - let payload: TradeOrderRevision = parse_payload(envelope.payload)?; + TradeListingMessagePayload::OrderRevision(payload) => { handle_order_revision( &event, payload, @@ -340,9 +342,8 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::OrderRevisionAccept - | TradeListingMessageType::OrderRevisionDecline => { - let payload: TradeOrderRevisionResponse = parse_payload(envelope.payload)?; + TradeListingMessagePayload::OrderRevisionAccept(payload) + | TradeListingMessagePayload::OrderRevisionDecline(payload) => { handle_order_revision_response( &event, envelope.message_type, @@ -354,8 +355,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::Question => { - let payload: TradeQuestion = parse_payload(envelope.payload)?; + TradeListingMessagePayload::Question(payload) => { handle_question( &event, payload, @@ -366,8 +366,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::Answer => { - let payload: TradeAnswer = parse_payload(envelope.payload)?; + TradeListingMessagePayload::Answer(payload) => { handle_answer( &event, payload, @@ -378,8 +377,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::DiscountRequest => { - let payload: TradeDiscountRequest = parse_payload(envelope.payload)?; + TradeListingMessagePayload::DiscountRequest(payload) => { handle_discount_request( &event, payload, @@ -390,8 +388,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::DiscountOffer => { - let payload: TradeDiscountOffer = parse_payload(envelope.payload)?; + TradeListingMessagePayload::DiscountOffer(payload) => { handle_discount_offer( &event, payload, @@ -402,8 +399,8 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::DiscountAccept | TradeListingMessageType::DiscountDecline => { - let payload: TradeDiscountDecision = parse_payload(envelope.payload)?; + TradeListingMessagePayload::DiscountAccept(payload) + | TradeListingMessagePayload::DiscountDecline(payload) => { handle_discount_decision( &event, envelope.message_type, @@ -415,8 +412,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::Cancel => { - let payload: TradeListingCancel = parse_payload(envelope.payload)?; + TradeListingMessagePayload::Cancel(payload) => { handle_cancel( &event, payload, @@ -427,8 +423,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::FulfillmentUpdate => { - let payload: TradeFulfillmentUpdate = parse_payload(envelope.payload)?; + TradeListingMessagePayload::FulfillmentUpdate(payload) => { handle_fulfillment_update( &event, payload, @@ -439,8 +434,7 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::Receipt => { - let payload: TradeReceipt = parse_payload(envelope.payload)?; + TradeListingMessagePayload::Receipt(payload) => { handle_receipt( &event, payload, @@ -451,9 +445,58 @@ pub async fn handle_event( ) .await?; } - TradeListingMessageType::ListingValidateResult => {} + TradeListingMessagePayload::ListingValidateResult(_) => {} + } + + Ok(()) +} + +fn map_trade_envelope_parse_error(error: TradeListingEnvelopeParseError) -> TradeListingDvmError { + match error { + TradeListingEnvelopeParseError::InvalidKind(_) => TradeListingDvmError::UnsupportedKind, + TradeListingEnvelopeParseError::InvalidJson + | TradeListingEnvelopeParseError::InvalidTag(_) => { + TradeListingDvmError::InvalidPayload(error.to_string()) + } + TradeListingEnvelopeParseError::InvalidEnvelope(inner) => { + TradeListingDvmError::InvalidEnvelope(inner) + } + TradeListingEnvelopeParseError::MessageTypeKindMismatch { .. } => { + TradeListingDvmError::TagMismatch("kind") + } + TradeListingEnvelopeParseError::MissingTag(tag) => TradeListingDvmError::MissingTag(tag), + TradeListingEnvelopeParseError::ListingAddrTagMismatch => { + TradeListingDvmError::TagMismatch("a") + } + TradeListingEnvelopeParseError::OrderIdTagMismatch => TradeListingDvmError::TagMismatch("d"), + TradeListingEnvelopeParseError::InvalidListingAddr(_) => { + TradeListingDvmError::InvalidListingAddr + } + } +} + +async fn handle_listing_event( + event: &RadrootsNostrEvent, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let event_id = event.id.to_string(); + { + let state = state.lock().await; + if state.is_non_order_event_seen(&event_id) { + return Ok(()); + } } + let validated = validate_listing_event(&radroots_event_from_nostr(event)) + .map_err(|error| TradeListingDvmError::InvalidPayload(error.to_string()))?; + let kind = match event.kind { + RadrootsNostrKind::Custom(value) => u32::from(value), + _ => return Err(TradeListingDvmError::UnsupportedKind), + }; + + let mut state = state.lock().await; + state.upsert_listing_event(&validated.listing_addr, &event_id, kind); + state.mark_non_order_event_seen(&event_id); Ok(()) } @@ -561,10 +604,10 @@ async fn send_validate_result( listing_addr: &str, errors: Vec<TradeListingValidationError>, ) -> Result<(), TradeListingDvmError> { - let payload = TradeListingValidateResult { + let payload = TradeListingMessagePayload::ListingValidateResult(TradeListingValidateResult { valid: errors.is_empty(), errors, - }; + }); send_envelope( client, event.pubkey.to_string(), @@ -576,6 +619,81 @@ async fn send_validate_result( .await } +fn workflow_message_from_event( + event: &RadrootsNostrEvent, +) -> Result<RadrootsTradeOrderWorkflowMessage, TradeListingDvmError> { + RadrootsTradeOrderWorkflowMessage::from_event(&radroots_event_from_nostr(event)) + .map_err(|error| TradeListingDvmError::InvalidPayload(error.to_string())) +} + +fn ensure_order_counterparty( + actual: &str, + expected: &str, +) -> Result<(), TradeListingDvmError> { + if actual == expected { + Ok(()) + } else { + Err(TradeListingDvmError::Unauthorized) + } +} + +fn ensure_trade_chain( + order: &TradeOrderState, + message: &RadrootsTradeOrderWorkflowMessage, +) -> Result<(), TradeListingDvmError> { + let root_event_id = message + .root_event_id + .as_deref() + .ok_or(TradeListingDvmError::MissingTag("e:root"))?; + if order.root_event_id.as_deref() != Some(root_event_id) { + return Err(TradeListingDvmError::InvalidOrder); + } + + let prev_event_id = message + .prev_event_id + .as_deref() + .ok_or(TradeListingDvmError::MissingTag("e:prev"))?; + if order.last_event_id.as_deref() != Some(prev_event_id) { + return Err(TradeListingDvmError::InvalidOrder); + } + + Ok(()) +} + +async fn ensure_listing_snapshot( + message: &RadrootsTradeOrderWorkflowMessage, + client: &RadrootsNostrClient, + state: &Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<String, TradeListingDvmError> { + let listing_event = message + .listing_event + .as_ref() + .ok_or(TradeListingDvmError::MissingTag("listing_event"))?; + let snapshot_id = listing_event.id.clone(); + + { + let state = state.lock().await; + if state.listing_event_id(&message.listing_addr) == Some(snapshot_id.as_str()) { + return Ok(snapshot_id); + } + } + + let snapshot_event = fetch_event_by_id_io(client, &snapshot_id).await?; + let validated = validate_listing_event_io(&snapshot_event) + .map_err(|error| TradeListingDvmError::InvalidPayload(error.to_string()))?; + if validated.0 != message.listing_addr { + return Err(TradeListingDvmError::InvalidOrder); + } + let snapshot_kind = match snapshot_event.kind { + RadrootsNostrKind::Custom(value) => u32::from(value), + _ => return Err(TradeListingDvmError::InvalidListingAddr), + }; + + let mut state = state.lock().await; + state.upsert_listing_event(&message.listing_addr, &snapshot_id, snapshot_kind); + Ok(snapshot_id) +} + #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_order_request( event: &RadrootsNostrEvent, @@ -585,13 +703,17 @@ async fn handle_order_request( client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; if payload.order_id != order_id || payload.listing_addr != listing_addr.as_str() { return Err(TradeListingDvmError::InvalidOrder); } - if payload.status != TradeOrderStatus::Requested { - return Err(TradeListingDvmError::InvalidOrder); + if payload.buyer_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &payload.seller_pubkey)?; + let listing_snapshot_event_id = ensure_listing_snapshot(&message, client, state).await?; + let event_id = event.id.to_string(); { let state = state.lock().await; @@ -600,31 +722,6 @@ async fn handle_order_request( } } - let validated_event_id = { - let state = state.lock().await; - state - .validated_listing_event_id(&payload.listing_addr) - .map(str::to_owned) - } - .ok_or(TradeListingDvmError::ListingNotValidated)?; - - let latest_listing_event = fetch_listing_by_addr(client, &payload.listing_addr).await?; - let Some(latest_listing_event) = latest_listing_event else { - state - .lock() - .await - .clear_listing_validation(&payload.listing_addr); - return Err(TradeListingDvmError::ListingNotValidated); - }; - - if latest_listing_event.id.to_string() != validated_event_id { - state - .lock() - .await - .clear_listing_validation(&payload.listing_addr); - return Err(TradeListingDvmError::ListingNotValidated); - } - let mut state = state.lock().await; if state.order_exists(order_id) { return Ok(()); @@ -637,7 +734,7 @@ async fn handle_order_request( } let mut seen = std::collections::HashSet::new(); - seen.insert(event.id.to_string()); + seen.insert(event_id.clone()); state.insert_order(TradeOrderState { order_id: order_id.to_string(), @@ -645,6 +742,9 @@ async fn handle_order_request( buyer_pubkey: payload.buyer_pubkey.clone(), seller_pubkey: payload.seller_pubkey.clone(), status: TradeOrderStatus::Requested, + listing_snapshot_event_id: Some(listing_snapshot_event_id), + root_event_id: Some(event_id.clone()), + last_event_id: Some(event_id.clone()), seen_event_ids: seen, }); @@ -661,6 +761,7 @@ async fn handle_order_response( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -673,6 +774,8 @@ async fn handle_order_response( if order.seller_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.buyer_pubkey)?; + ensure_trade_chain(order, &message)?; let next_status = if payload.accepted { TradeOrderStatus::Accepted @@ -681,6 +784,7 @@ async fn handle_order_response( }; ensure_transition(order.status.clone(), next_status.clone())?; order.status = next_status; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -691,16 +795,15 @@ async fn handle_order_response( #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_order_revision( event: &RadrootsNostrEvent, - payload: TradeOrderRevision, + _payload: TradeOrderRevision, listing_addr: &TradeListingAddress, order_id: Option<&str>, - _client: &RadrootsNostrClient, + client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; + let listing_snapshot_event_id = ensure_listing_snapshot(&message, client, state).await?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; - if payload.order_id != order_id { - return Err(TradeListingDvmError::InvalidOrder); - } let mut state = state.lock().await; let event_id = event.id.to_string(); if state.is_event_seen(order_id, &event_id) { @@ -712,11 +815,15 @@ async fn handle_order_revision( if order.seller_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.buyer_pubkey)?; + ensure_trade_chain(order, &message)?; if listing_addr.seller_pubkey != order.seller_pubkey { return Err(TradeListingDvmError::Unauthorized); } ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; order.status = TradeOrderStatus::Revised; + order.listing_snapshot_event_id = Some(listing_snapshot_event_id); + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -732,6 +839,7 @@ async fn handle_order_revision_response( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -744,6 +852,8 @@ async fn handle_order_revision_response( if order.buyer_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.seller_pubkey)?; + ensure_trade_chain(order, &message)?; if message_type == TradeListingMessageType::OrderRevisionAccept && !payload.accepted { return Err(TradeListingDvmError::InvalidOrder); } @@ -758,6 +868,7 @@ async fn handle_order_revision_response( }; ensure_transition(order.status.clone(), next_status.clone())?; order.status = next_status; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -766,18 +877,14 @@ async fn handle_order_revision_response( async fn handle_question( event: &RadrootsNostrEvent, - payload: TradeQuestion, + _payload: TradeQuestion, _listing_addr: &TradeListingAddress, order_id: Option<&str>, _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; - if let Some(ref payload_order_id) = payload.order_id { - if payload_order_id != order_id { - return Err(TradeListingDvmError::InvalidOrder); - } - } let mut state = state.lock().await; let event_id = event.id.to_string(); if state.is_event_seen(order_id, &event_id) { @@ -789,8 +896,11 @@ async fn handle_question( if order.buyer_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.seller_pubkey)?; + ensure_trade_chain(order, &message)?; ensure_transition(order.status.clone(), TradeOrderStatus::Questioned)?; order.status = TradeOrderStatus::Questioned; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -800,18 +910,14 @@ async fn handle_question( #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_answer( event: &RadrootsNostrEvent, - payload: TradeAnswer, + _payload: TradeAnswer, listing_addr: &TradeListingAddress, order_id: Option<&str>, _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; - if let Some(ref payload_order_id) = payload.order_id { - if payload_order_id != order_id { - return Err(TradeListingDvmError::InvalidOrder); - } - } let mut state = state.lock().await; let event_id = event.id.to_string(); if state.is_event_seen(order_id, &event_id) { @@ -825,8 +931,11 @@ async fn handle_answer( { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.buyer_pubkey)?; + ensure_trade_chain(order, &message)?; ensure_transition(order.status.clone(), TradeOrderStatus::Requested)?; order.status = TradeOrderStatus::Requested; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -835,16 +944,15 @@ async fn handle_answer( async fn handle_discount_request( event: &RadrootsNostrEvent, - payload: TradeDiscountRequest, + _payload: TradeDiscountRequest, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - _client: &RadrootsNostrClient, + client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; + let listing_snapshot_event_id = ensure_listing_snapshot(&message, client, state).await?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; - if payload.order_id != order_id { - return Err(TradeListingDvmError::InvalidOrder); - } let mut state = state.lock().await; let event_id = event.id.to_string(); if state.is_event_seen(order_id, &event_id) { @@ -856,6 +964,10 @@ async fn handle_discount_request( if order.buyer_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.seller_pubkey)?; + ensure_trade_chain(order, &message)?; + order.listing_snapshot_event_id = Some(listing_snapshot_event_id); + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -864,16 +976,15 @@ async fn handle_discount_request( async fn handle_discount_offer( event: &RadrootsNostrEvent, - payload: TradeDiscountOffer, + _payload: TradeDiscountOffer, _listing_addr: &TradeListingAddress, order_id: Option<&str>, - _client: &RadrootsNostrClient, + client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; + let listing_snapshot_event_id = ensure_listing_snapshot(&message, client, state).await?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; - if payload.order_id != order_id { - return Err(TradeListingDvmError::InvalidOrder); - } let mut state = state.lock().await; let event_id = event.id.to_string(); if state.is_event_seen(order_id, &event_id) { @@ -885,8 +996,12 @@ async fn handle_discount_offer( if order.seller_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.buyer_pubkey)?; + ensure_trade_chain(order, &message)?; ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; order.status = TradeOrderStatus::Revised; + order.listing_snapshot_event_id = Some(listing_snapshot_event_id); + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -902,6 +1017,7 @@ async fn handle_discount_decision( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -914,6 +1030,8 @@ async fn handle_discount_decision( if order.buyer_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.seller_pubkey)?; + ensure_trade_chain(order, &message)?; let payload_is_accept = matches!(payload, TradeDiscountDecision::Accept { .. }); let payload_is_decline = matches!(payload, TradeDiscountDecision::Decline { .. }); if message_type == TradeListingMessageType::DiscountAccept && !payload_is_accept { @@ -929,6 +1047,7 @@ async fn handle_discount_decision( }; ensure_transition(order.status.clone(), next_status.clone())?; order.status = next_status; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -943,6 +1062,7 @@ async fn handle_cancel( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -956,8 +1076,16 @@ async fn handle_cancel( if sender != order.buyer_pubkey && sender != order.seller_pubkey { return Err(TradeListingDvmError::Unauthorized); } + let expected_counterparty = if sender == order.buyer_pubkey { + order.seller_pubkey.as_str() + } else { + order.buyer_pubkey.as_str() + }; + ensure_order_counterparty(&message.counterparty_pubkey, expected_counterparty)?; + ensure_trade_chain(order, &message)?; ensure_transition(order.status.clone(), TradeOrderStatus::Cancelled)?; order.status = TradeOrderStatus::Cancelled; + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -972,6 +1100,7 @@ async fn handle_fulfillment_update( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -984,9 +1113,12 @@ async fn handle_fulfillment_update( if order.seller_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.buyer_pubkey)?; + ensure_trade_chain(order, &message)?; if let Some(next_status) = next_status_for_fulfillment_update(&order.status, &payload.status)? { order.status = next_status; } + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -1001,6 +1133,7 @@ async fn handle_receipt( _client: &RadrootsNostrClient, state: &Arc<tokio::sync::Mutex<TradeListingState>>, ) -> Result<(), TradeListingDvmError> { + let message = workflow_message_from_event(event)?; let order_id = order_id.ok_or(TradeListingDvmError::MissingTag("d"))?; let mut state = state.lock().await; let event_id = event.id.to_string(); @@ -1013,9 +1146,12 @@ async fn handle_receipt( if order.buyer_pubkey != event.pubkey.to_string() { return Err(TradeListingDvmError::Unauthorized); } + ensure_order_counterparty(&message.counterparty_pubkey, &order.seller_pubkey)?; + ensure_trade_chain(order, &message)?; if let Some(next_status) = next_status_for_receipt(&order.status, payload.acknowledged)? { order.status = next_status; } + order.last_event_id = Some(event_id.clone()); order.seen_event_ids.insert(event_id); drop(state); @@ -1023,21 +1159,25 @@ async fn handle_receipt( } #[cfg_attr(coverage_nightly, coverage(off))] -async fn send_envelope<T: serde::Serialize + Clone>( +async fn send_envelope( client: &RadrootsNostrClient, recipient_pubkey: String, message_type: TradeListingMessageType, listing_addr: &str, order_id: Option<&str>, - payload: &T, + payload: &TradeListingMessagePayload, ) -> Result<(), TradeListingDvmError> { let envelope_event = trade_listing_envelope_event_build( recipient_pubkey, message_type, listing_addr, order_id.map(|value| value.to_string()), + None, + None, + None, payload, - )?; + ) + .map_err(|error| TradeListingDvmError::InvalidPayload(error.to_string()))?; let builder = radroots_nostr_build_event( envelope_event.kind as u32, envelope_event.content, @@ -1148,11 +1288,13 @@ async fn validate_farm_dependencies( Ok(errors) } +#[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] fn parse_payload<T: DeserializeOwned>(value: serde_json::Value) -> Result<T, TradeListingDvmError> { serde_json::from_value(value).map_err(|e| TradeListingDvmError::InvalidPayload(e.to_string())) } +#[cfg(test)] fn tag_value(tags: &[Vec<String>], key: &str) -> Option<String> { tags.iter().find_map(|t| { if t.get(0).map(|k| k.as_str()) == Some(key) { @@ -1319,6 +1461,8 @@ mod tests { RadrootsTradeFulfillmentUpdate as TradeFulfillmentUpdate, RadrootsTradeListingCancel as TradeListingCancel, RadrootsTradeListingValidateRequest as TradeListingValidateRequest, + RadrootsTradeListingValidateResult as TradeListingValidateResult, + RadrootsTradeMessagePayload as TradeListingMessagePayload, RadrootsTradeMessageType as TradeListingMessageType, RadrootsTradeOrder as TradeOrder, RadrootsTradeOrderResponse as TradeOrderResponse, RadrootsTradeOrderRevision as TradeOrderRevision, @@ -1339,6 +1483,7 @@ mod tests { use tokio::sync::Mutex as AsyncMutex; static TEST_LOCK: Mutex<()> = Mutex::new(()); + const TEST_LISTING_EVENT_ID: &str = "listing-event"; fn test_guard() -> MutexGuard<'static, ()> { let guard = TEST_LOCK.lock().unwrap_or_else(|err| err.into_inner()); @@ -1423,7 +1568,7 @@ mod tests { listing_addr: &str, buyer: &str, seller: &str, - status: TradeOrderStatus, + _status: TradeOrderStatus, ) -> TradeOrder { TradeOrder { order_id: order_id.to_string(), @@ -1432,8 +1577,6 @@ mod tests { seller_pubkey: seller.to_string(), items: Vec::new(), discounts: None, - notes: None, - status, } } @@ -1450,6 +1593,9 @@ mod tests { buyer_pubkey: buyer.to_string(), seller_pubkey: seller.to_string(), status, + listing_snapshot_event_id: Some("listing-event".to_string()), + root_event_id: Some(format!("{order_id}:root")), + last_event_id: Some(format!("{order_id}:root")), seen_event_ids: HashSet::new(), } } @@ -1464,6 +1610,7 @@ mod tests { let state = Arc::new(AsyncMutex::new(TradeListingState::default())); let mut locked = state.lock().await; locked.mark_listing_validated(listing_addr, "validated-listing-event"); + locked.upsert_listing_event(listing_addr, TEST_LISTING_EVENT_ID, 30402); locked.insert_order(make_order_state( order_id, listing_addr, @@ -1501,6 +1648,17 @@ mod tests { listing_addr: &str, order_id: Option<&str>, ) -> Vec<RadrootsNostrTag> { + make_workflow_tags(recipient, listing_addr, order_id, None, None, None) + } + + fn make_workflow_tags( + recipient: &str, + listing_addr: &str, + order_id: Option<&str>, + listing_event_id: Option<&str>, + root_event_id: Option<&str>, + prev_event_id: Option<&str>, + ) -> Vec<RadrootsNostrTag> { let mut tags = vec![ RadrootsNostrTag::custom( RadrootsNostrTagKind::custom("p"), @@ -1517,6 +1675,24 @@ mod tests { vec![order_id.to_string()], )); } + if let Some(listing_event_id) = listing_event_id { + tags.push(RadrootsNostrTag::custom( + RadrootsNostrTagKind::custom("listing_event"), + vec![listing_event_id.to_string()], + )); + } + if let Some(root_event_id) = root_event_id { + tags.push(RadrootsNostrTag::custom( + RadrootsNostrTagKind::custom("e_root"), + vec![root_event_id.to_string()], + )); + } + if let Some(prev_event_id) = prev_event_id { + tags.push(RadrootsNostrTag::custom( + RadrootsNostrTagKind::custom("e_prev"), + vec![prev_event_id.to_string()], + )); + } tags } @@ -1547,182 +1723,362 @@ mod tests { .expect("envelope") } - fn sample_discount_value() -> RadrootsCoreDiscountValue { - RadrootsCoreDiscountValue::MoneyPerBin(RadrootsCoreMoney::from_minor_units_u32( - 100, - RadrootsCoreCurrency::USD, - )) - } - - fn sender_for_message<'a>( + fn make_canonical_envelope_content( message_type: TradeListingMessageType, - seller_keys: &'a RadrootsNostrKeys, - buyer_keys: &'a RadrootsNostrKeys, - ) -> &'a RadrootsNostrKeys { - match message_type { - TradeListingMessageType::OrderResponse - | TradeListingMessageType::OrderRevision - | TradeListingMessageType::Answer - | TradeListingMessageType::DiscountOffer - | TradeListingMessageType::FulfillmentUpdate => seller_keys, - _ => buyer_keys, - } + listing_addr: &str, + order_id: Option<&str>, + payload: TradeListingMessagePayload, + ) -> String { + serde_json::to_string(&TradeListingEnvelope::new( + message_type, + listing_addr.to_string(), + order_id.map(|value| value.to_string()), + payload, + )) + .expect("canonical envelope") } - fn payload_for_message( + fn payload_enum_for_message( message_type: TradeListingMessageType, order_id: &str, listing_addr: &str, buyer_pub: &str, seller_pub: &str, - ) -> serde_json::Value { + ) -> TradeListingMessagePayload { match message_type { - TradeListingMessageType::OrderRequest => serde_json::to_value(make_order( - order_id, - listing_addr, - buyer_pub, - seller_pub, - TradeOrderStatus::Requested, - )) - .expect("order request payload"), - TradeListingMessageType::OrderResponse => serde_json::to_value(TradeOrderResponse { - accepted: true, - reason: None, - }) - .expect("order response payload"), - TradeListingMessageType::OrderRevision => serde_json::to_value(TradeOrderRevision { - revision_id: "r-matrix".to_string(), - order_id: order_id.to_string(), - changes: Vec::new(), - reason: None, - }) - .expect("order revision payload"), + TradeListingMessageType::ListingValidateRequest => { + TradeListingMessagePayload::ListingValidateRequest(TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: TEST_LISTING_EVENT_ID.to_string(), + relays: None, + }), + }) + } + TradeListingMessageType::ListingValidateResult => { + TradeListingMessagePayload::ListingValidateResult(TradeListingValidateResult { + valid: true, + errors: Vec::new(), + }) + } + TradeListingMessageType::OrderRequest => TradeListingMessagePayload::OrderRequest( + make_order( + order_id, + listing_addr, + buyer_pub, + seller_pub, + TradeOrderStatus::Requested, + ), + ), + TradeListingMessageType::OrderResponse => { + TradeListingMessagePayload::OrderResponse(TradeOrderResponse { + accepted: true, + reason: None, + }) + } + TradeListingMessageType::OrderRevision => { + TradeListingMessagePayload::OrderRevision(TradeOrderRevision { + revision_id: "r-matrix".to_string(), + changes: Vec::new(), + }) + } TradeListingMessageType::OrderRevisionAccept => { - serde_json::to_value(TradeOrderRevisionResponse { + TradeListingMessagePayload::OrderRevisionAccept(TradeOrderRevisionResponse { accepted: true, reason: None, }) - .expect("order revision accept payload") } TradeListingMessageType::OrderRevisionDecline => { - serde_json::to_value(TradeOrderRevisionResponse { + TradeListingMessagePayload::OrderRevisionDecline(TradeOrderRevisionResponse { accepted: false, reason: None, }) - .expect("order revision decline payload") } - TradeListingMessageType::Question => serde_json::to_value(TradeQuestion { - question_id: "q-matrix".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.to_string()), - question_text: "question".to_string(), - }) - .expect("question payload"), - TradeListingMessageType::Answer => serde_json::to_value(TradeAnswer { + TradeListingMessageType::Question => { + TradeListingMessagePayload::Question(TradeQuestion { + question_id: "q-matrix".to_string(), + }) + } + TradeListingMessageType::Answer => TradeListingMessagePayload::Answer(TradeAnswer { question_id: "q-matrix".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.to_string()), - answer_text: "answer".to_string(), - }) - .expect("answer payload"), + }), TradeListingMessageType::DiscountRequest => { - serde_json::to_value(TradeDiscountRequest { + TradeListingMessagePayload::DiscountRequest(TradeDiscountRequest { + discount_id: "d-matrix".to_string(), + value: sample_discount_value(), + }) + } + TradeListingMessageType::DiscountOffer => { + TradeListingMessagePayload::DiscountOffer(TradeDiscountOffer { discount_id: "d-matrix".to_string(), - order_id: order_id.to_string(), value: sample_discount_value(), - conditions: None, }) - .expect("discount request payload") } - TradeListingMessageType::DiscountOffer => serde_json::to_value(TradeDiscountOffer { - discount_id: "d-matrix".to_string(), - order_id: order_id.to_string(), - value: sample_discount_value(), - conditions: None, - }) - .expect("discount offer payload"), TradeListingMessageType::DiscountAccept => { - serde_json::to_value(TradeDiscountDecision::Accept { + TradeListingMessagePayload::DiscountAccept(TradeDiscountDecision::Accept { value: sample_discount_value(), }) - .expect("discount accept payload") } TradeListingMessageType::DiscountDecline => { - serde_json::to_value(TradeDiscountDecision::Decline { reason: None }) - .expect("discount decline payload") + TradeListingMessagePayload::DiscountDecline(TradeDiscountDecision::Decline { + reason: None, + }) } TradeListingMessageType::Cancel => { - serde_json::to_value(TradeListingCancel { reason: None }).expect("cancel payload") + TradeListingMessagePayload::Cancel(TradeListingCancel { reason: None }) } TradeListingMessageType::FulfillmentUpdate => { - serde_json::to_value(TradeFulfillmentUpdate { + TradeListingMessagePayload::FulfillmentUpdate(TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, }) - .expect("fulfillment payload") } - TradeListingMessageType::Receipt => serde_json::to_value(TradeReceipt { + TradeListingMessageType::Receipt => TradeListingMessagePayload::Receipt(TradeReceipt { acknowledged: true, at: 1, - note: None, - }) - .expect("receipt payload"), - TradeListingMessageType::ListingValidateRequest => { - json!({"listing_event":{"id":"listing-event","relays":null}}) - } - TradeListingMessageType::ListingValidateResult => json!({"valid": true, "errors": []}), + }), } } - #[test] - fn transition_matrix_and_tag_helpers_are_covered() { - let _guard = test_guard(); + fn recipient_for_message<'a>( + message_type: TradeListingMessageType, + buyer_pub: &'a str, + seller_pub: &'a str, + ) -> &'a str { + match message_type { + TradeListingMessageType::OrderResponse + | TradeListingMessageType::OrderRevision + | TradeListingMessageType::Answer + | TradeListingMessageType::DiscountOffer + | TradeListingMessageType::FulfillmentUpdate => buyer_pub, + _ => seller_pub, + } + } - assert!(ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Revised).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Declined, TradeOrderStatus::Accepted).is_err()); - assert!( - ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok() - ); - assert!( - ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Requested).is_err() - ); - assert!(ensure_transition(TradeOrderStatus::Draft, TradeOrderStatus::Draft).is_ok()); - assert_eq!( - next_status_for_fulfillment_update( - &TradeOrderStatus::Accepted, - &TradeFulfillmentStatus::Shipped - ) - .expect("shipped keeps accepted"), - None - ); - assert_eq!( - next_status_for_fulfillment_update( - &TradeOrderStatus::Accepted, - &TradeFulfillmentStatus::Delivered - ) - .expect("delivered fulfills"), - Some(TradeOrderStatus::Fulfilled) - ); - assert_eq!( - next_status_for_fulfillment_update( - &TradeOrderStatus::Accepted, - &TradeFulfillmentStatus::Cancelled - ) - .expect("cancelled cancels"), - Some(TradeOrderStatus::Cancelled) - ); - assert!( - next_status_for_fulfillment_update( - &TradeOrderStatus::Requested, - &TradeFulfillmentStatus::Shipped + async fn workflow_state_refs( + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: &str, + state: &Arc<AsyncMutex<TradeListingState>>, + ) -> (Option<String>, Option<String>, Option<String>) { + let mut locked = state.lock().await; + let listing_event_id = if message_type.requires_listing_snapshot() { + Some( + locked + .listing_event_id(listing_addr) + .unwrap_or(TEST_LISTING_EVENT_ID) + .to_string(), ) - .is_err() - ); - assert_eq!( - next_status_for_receipt(&TradeOrderStatus::Fulfilled, false) - .expect("unacknowledged receipt keeps fulfilled"), + } else { + None + }; + let (root_event_id, prev_event_id) = if message_type.requires_trade_chain() { + if let Some(order) = locked.get_order_mut(order_id) { + (order.root_event_id.clone(), order.last_event_id.clone()) + } else { + ( + Some(format!("{order_id}:root")), + Some(format!("{order_id}:prev")), + ) + } + } else { + (None, None) + }; + (listing_event_id, root_event_id, prev_event_id) + } + + async fn make_public_trade_event( + sender: &RadrootsNostrKeys, + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: &str, + buyer_pub: &str, + seller_pub: &str, + state: Option<&Arc<AsyncMutex<TradeListingState>>>, + ) -> RadrootsNostrEvent { + let (listing_event_id, root_event_id, prev_event_id) = if let Some(state) = state { + workflow_state_refs(message_type, listing_addr, order_id, state).await + } else { + let listing_event_id = message_type + .requires_listing_snapshot() + .then(|| TEST_LISTING_EVENT_ID.to_string()); + (listing_event_id, None, None) + }; + + let payload = + payload_enum_for_message(message_type, order_id, listing_addr, buyer_pub, seller_pub); + make_public_trade_event_with_payload( + sender, + message_type, + listing_addr, + order_id, + buyer_pub, + seller_pub, + payload, + listing_event_id, + root_event_id, + prev_event_id, + ) + } + + fn make_public_trade_event_with_payload( + sender: &RadrootsNostrKeys, + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: &str, + buyer_pub: &str, + seller_pub: &str, + payload: TradeListingMessagePayload, + listing_event_id: Option<String>, + root_event_id: Option<String>, + prev_event_id: Option<String>, + ) -> RadrootsNostrEvent { + let recipient = recipient_for_message(message_type, buyer_pub, seller_pub); + make_trade_event_with_payload_and_recipient( + sender, + recipient, + message_type, + listing_addr, + order_id, + payload, + listing_event_id, + root_event_id, + prev_event_id, + ) + } + + fn make_trade_event_with_payload_and_recipient( + sender: &RadrootsNostrKeys, + recipient: &str, + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: &str, + payload: TradeListingMessagePayload, + listing_event_id: Option<String>, + root_event_id: Option<String>, + prev_event_id: Option<String>, + ) -> RadrootsNostrEvent { + let listing_event = listing_event_id.map(|id| RadrootsNostrEventPtr { id, relays: None }); + let envelope_event = super::trade_listing_envelope_event_build( + recipient.to_string(), + message_type, + listing_addr.to_string(), + Some(order_id.to_string()), + listing_event.as_ref(), + root_event_id.as_deref(), + prev_event_id.as_deref(), + &payload, + ) + .expect("build trade event"); + let builder = radroots_nostr::prelude::radroots_nostr_build_event( + envelope_event.kind, + envelope_event.content, + envelope_event.tags, + ) + .expect("event builder"); + builder.sign_with_keys(sender).expect("event") + } + + async fn make_handle_event_trade_event( + sender: &RadrootsNostrKeys, + message_type: TradeListingMessageType, + listing_addr: &str, + order_id: &str, + buyer_pub: &str, + seller_pub: &str, + state: Option<&Arc<AsyncMutex<TradeListingState>>>, + ) -> (RadrootsNostrEvent, Vec<RadrootsNostrTag>) { + let (listing_event_id, root_event_id, prev_event_id) = if let Some(state) = state { + workflow_state_refs(message_type, listing_addr, order_id, state).await + } else { + let listing_event_id = message_type + .requires_listing_snapshot() + .then(|| TEST_LISTING_EVENT_ID.to_string()); + (listing_event_id, None, None) + }; + let event = make_public_trade_event_with_payload( + sender, + message_type, + listing_addr, + order_id, + buyer_pub, + seller_pub, + payload_enum_for_message(message_type, order_id, listing_addr, buyer_pub, seller_pub), + listing_event_id, + root_event_id, + prev_event_id, + ); + let tags = event.tags.iter().cloned().collect(); + (event, tags) + } + + fn sample_discount_value() -> RadrootsCoreDiscountValue { + RadrootsCoreDiscountValue::MoneyPerBin(RadrootsCoreMoney::from_minor_units_u32( + 100, + RadrootsCoreCurrency::USD, + )) + } + + fn sender_for_message<'a>( + message_type: TradeListingMessageType, + seller_keys: &'a RadrootsNostrKeys, + buyer_keys: &'a RadrootsNostrKeys, + ) -> &'a RadrootsNostrKeys { + match message_type { + TradeListingMessageType::OrderResponse + | TradeListingMessageType::OrderRevision + | TradeListingMessageType::Answer + | TradeListingMessageType::DiscountOffer + | TradeListingMessageType::FulfillmentUpdate => seller_keys, + _ => buyer_keys, + } + } + + #[test] + fn transition_matrix_and_tag_helpers_are_covered() { + let _guard = test_guard(); + + assert!(ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Revised).is_ok()); + assert!(ensure_transition(TradeOrderStatus::Declined, TradeOrderStatus::Accepted).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok() + ); + assert!( + ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Requested).is_err() + ); + assert!(ensure_transition(TradeOrderStatus::Draft, TradeOrderStatus::Draft).is_ok()); + assert_eq!( + next_status_for_fulfillment_update( + &TradeOrderStatus::Accepted, + &TradeFulfillmentStatus::Shipped + ) + .expect("shipped keeps accepted"), + None + ); + assert_eq!( + next_status_for_fulfillment_update( + &TradeOrderStatus::Accepted, + &TradeFulfillmentStatus::Delivered + ) + .expect("delivered fulfills"), + Some(TradeOrderStatus::Fulfilled) + ); + assert_eq!( + next_status_for_fulfillment_update( + &TradeOrderStatus::Accepted, + &TradeFulfillmentStatus::Cancelled + ) + .expect("cancelled cancels"), + Some(TradeOrderStatus::Cancelled) + ); + assert!( + next_status_for_fulfillment_update( + &TradeOrderStatus::Requested, + &TradeFulfillmentStatus::Shipped + ) + .is_err() + ); + assert_eq!( + next_status_for_receipt(&TradeOrderStatus::Fulfilled, false) + .expect("unacknowledged receipt keeps fulfilled"), None ); assert_eq!( @@ -2143,24 +2499,21 @@ mod tests { let seller_pub = seller_keys.public_key().to_hex(); let buyer_pub = buyer_keys.public_key().to_hex(); let state = Arc::new(AsyncMutex::new(TradeListingState::default())); - let validated_listing_event = - RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") - .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) - .sign_with_keys(&seller_keys) - .expect("listing event"); state .lock() .await - .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); + .upsert_listing_event(&listing_addr, TEST_LISTING_EVENT_ID, 30402); - let order_event = make_event( + let order_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - "order".to_string(), - Vec::new(), - ); - push_fetch_events_ok(vec![validated_listing_event.clone()]); - push_send_ok(); + TradeListingMessageType::OrderRequest, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; let order_payload = make_order( order_id, &listing_addr, @@ -2181,13 +2534,16 @@ mod tests { .is_ok() ); - let response_event = make_event( + let response_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_RES), - "resp".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::OrderResponse, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_order_response( &response_event, @@ -2210,21 +2566,22 @@ mod tests { .get_order_mut(order_id) .expect("order") .status = TradeOrderStatus::Requested; - let revision_event = make_event( + let revision_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "rev".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::OrderRevision, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_order_revision( &revision_event, TradeOrderRevision { revision_id: "r1".to_string(), - order_id: order_id.to_string(), changes: Vec::new(), - reason: None, }, &listing_addr_parsed, Some(order_id), @@ -2235,13 +2592,16 @@ mod tests { .is_ok() ); - let revision_response_event = make_event( + let revision_response_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_RES), - "revresp".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::OrderRevisionAccept, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_order_revision_response( &revision_response_event, @@ -2265,21 +2625,21 @@ mod tests { .get_order_mut(order_id) .expect("order") .status = TradeOrderStatus::Requested; - let question_event = make_event( + let question_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_QUESTION_REQ), - "q".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::Question, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_question( &question_event, TradeQuestion { question_id: "q1".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - question_text: "what".to_string(), }, &listing_addr_parsed, Some(order_id), @@ -2290,21 +2650,21 @@ mod tests { .is_ok() ); - let answer_event = make_event( + let answer_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "a".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::Answer, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_answer( &answer_event, TradeAnswer { question_id: "q1".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - answer_text: "ans".to_string(), }, &listing_addr_parsed, Some(order_id), @@ -2315,21 +2675,22 @@ mod tests { .is_ok() ); - let discount_request_event = make_event( + let discount_request_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_REQ), - "dr".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::DiscountRequest, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_discount_request( &discount_request_event, TradeDiscountRequest { discount_id: "d1".to_string(), - order_id: order_id.to_string(), value: sample_discount_value(), - conditions: None, }, &listing_addr_parsed, Some(order_id), @@ -2340,21 +2701,22 @@ mod tests { .is_ok() ); - let discount_offer_event = make_event( + let discount_offer_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_OFFER_RES), - "do".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::DiscountOffer, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_discount_offer( &discount_offer_event, TradeDiscountOffer { discount_id: "d1".to_string(), - order_id: order_id.to_string(), value: sample_discount_value(), - conditions: None, }, &listing_addr_parsed, Some(order_id), @@ -2365,13 +2727,16 @@ mod tests { .is_ok() ); - let discount_accept_event = make_event( + let discount_accept_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_ACCEPT_REQ), - "da".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::DiscountAccept, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_discount_decision( &discount_accept_event, @@ -2394,13 +2759,16 @@ mod tests { .get_order_mut(order_id) .expect("order") .status = TradeOrderStatus::Requested; - let cancel_event = make_event( + let cancel_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_CANCEL_REQ), - "cancel".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::Cancel, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_cancel( &cancel_event, @@ -2420,21 +2788,21 @@ mod tests { .get_order_mut(order_id) .expect("order") .status = TradeOrderStatus::Accepted; - let fulfill_event = make_event( + let fulfill_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "fulfill".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_fulfillment_update( &fulfill_event, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Delivered, - tracking: None, - eta: None, - notes: None, }, &listing_addr_parsed, Some(order_id), @@ -2445,20 +2813,22 @@ mod tests { .is_ok() ); - let receipt_event = make_event( + let receipt_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_RECEIPT_REQ), - "receipt".to_string(), - Vec::new(), - ); - push_send_ok(); + TradeListingMessageType::Receipt, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!( handle_receipt( &receipt_event, TradeReceipt { acknowledged: true, at: 1, - note: None, }, &listing_addr_parsed, Some(order_id), @@ -2476,14 +2846,16 @@ mod tests { let (rhi_keys, seller_keys, buyer_keys) = make_keys(); let client = make_client(&rhi_keys); let rhi_pub = rhi_keys.public_key().to_hex(); + let buyer_pub = buyer_keys.public_key().to_hex(); + let seller_pub = seller_keys.public_key().to_hex(); let listing_addr = listing_addr_for_seller(&seller_keys); let order_id = "order-1"; let tags = make_custom_tags(&rhi_pub, &listing_addr, Some(order_id)); let state = state_with_order( &listing_addr, order_id, - &buyer_keys.public_key().to_hex(), - &seller_keys.public_key().to_hex(), + &buyer_pub, + &seller_pub, TradeOrderStatus::Requested, ) .await; @@ -2509,11 +2881,13 @@ mod tests { let missing_recipient = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_VALIDATE_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::ListingValidateRequest, &listing_addr, None, - json!({"listing_event": null}), + TradeListingMessagePayload::ListingValidateRequest(TradeListingValidateRequest { + listing_event: None, + }), ), Vec::new(), ); @@ -2601,13 +2975,19 @@ mod tests { let a_mismatch_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::OrderRequest, "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAA", Some(order_id), - json!({}), - ), - tags.clone(), + payload_enum_for_message( + TradeListingMessageType::OrderRequest, + order_id, + "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAA", + &buyer_pub, + &seller_pub, + ), + ), + tags.clone(), ); assert!(matches!( handle_event( @@ -2625,11 +3005,17 @@ mod tests { let d_mismatch_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::OrderRequest, &listing_addr, Some(order_id), - json!({}), + payload_enum_for_message( + TradeListingMessageType::OrderRequest, + order_id, + &listing_addr, + &buyer_pub, + &seller_pub, + ), ), d_mismatch_tags.clone(), ); @@ -2646,18 +3032,31 @@ mod tests { )); let bad_addr = format!( - "30403:{}:AAAAAAAAAAAAAAAAAAAAAA", + "30404:{}:AAAAAAAAAAAAAAAAAAAAAA", seller_keys.public_key().to_hex() ); - let bad_addr_tags = make_custom_tags(&rhi_pub, &bad_addr, Some(order_id)); + let bad_addr_tags = make_workflow_tags( + &rhi_pub, + &bad_addr, + Some(order_id), + Some(TEST_LISTING_EVENT_ID), + None, + None, + ); let bad_addr_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::OrderRequest, &bad_addr, Some(order_id), - json!({}), + payload_enum_for_message( + TradeListingMessageType::OrderRequest, + order_id, + &bad_addr, + &buyer_pub, + &seller_pub, + ), ), bad_addr_tags.clone(), ); @@ -2817,7 +3216,10 @@ mod tests { TradeListingMessageType::ListingValidateResult, &listing_addr_for_seller(&seller_keys), None, - &json!({"valid":true,"errors":[]}), + &TradeListingMessagePayload::ListingValidateResult(TradeListingValidateResult { + valid: true, + errors: Vec::new(), + }), ) .await .is_ok() @@ -3032,9 +3434,7 @@ mod tests { KIND_TRADE_LISTING_ORDER_REVISION_REQ, serde_json::to_value(TradeOrderRevision { revision_id: "r2".to_string(), - order_id: order_id.to_string(), changes: Vec::new(), - reason: None, }) .expect("order revision"), TradeOrderStatus::Requested, @@ -3064,9 +3464,6 @@ mod tests { KIND_TRADE_LISTING_QUESTION_REQ, serde_json::to_value(TradeQuestion { question_id: "qx".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - question_text: "question".to_string(), }) .expect("question"), TradeOrderStatus::Requested, @@ -3076,9 +3473,6 @@ mod tests { KIND_TRADE_LISTING_ANSWER_RES, serde_json::to_value(TradeAnswer { question_id: "qx".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - answer_text: "answer".to_string(), }) .expect("answer"), TradeOrderStatus::Questioned, @@ -3088,9 +3482,7 @@ mod tests { KIND_TRADE_LISTING_DISCOUNT_REQ, serde_json::to_value(TradeDiscountRequest { discount_id: "d2".to_string(), - order_id: order_id.to_string(), value: sample_discount_value(), - conditions: None, }) .expect("discount request"), TradeOrderStatus::Requested, @@ -3100,9 +3492,7 @@ mod tests { KIND_TRADE_LISTING_DISCOUNT_OFFER_RES, serde_json::to_value(TradeDiscountOffer { discount_id: "d2".to_string(), - order_id: order_id.to_string(), value: sample_discount_value(), - conditions: None, }) .expect("discount offer"), TradeOrderStatus::Requested, @@ -3134,9 +3524,6 @@ mod tests { KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, serde_json::to_value(TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, }) .expect("fulfillment"), TradeOrderStatus::Accepted, @@ -3147,7 +3534,6 @@ mod tests { serde_json::to_value(TradeReceipt { acknowledged: true, at: 1, - note: None, }) .expect("receipt"), TradeOrderStatus::Fulfilled, @@ -3207,12 +3593,16 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); - let event = make_event( + let event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_request(&event, bad_order, &parsed, Some("order-1"), &client, &state) .await, @@ -3227,62 +3617,126 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); - assert!(matches!( + let fetched_snapshot_event = make_event( + &seller_keys, + RadrootsNostrKind::Custom(30402), + "listing-fetch".to_string(), + Vec::new(), + ); + let fetched_snapshot_id = fetched_snapshot_event.id.to_string(); + dvm_test_hooks() + .lock() + .expect("hooks") + .fetch_event_by_id_results + .push_back(Ok(fetched_snapshot_event)); + push_validate_listing_ok( + listing_addr.clone(), + RadrootsListingFarmRef { + pubkey: seller_pub.clone(), + d_tag: "farmtag".to_string(), + }, + ); + let fetched_order_event = make_public_trade_event_with_payload( + &buyer_keys, + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-2", + &buyer_pub, + &seller_pub, + TradeListingMessagePayload::OrderRequest(make_order( + "order-2", + &listing_addr, + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + )), + Some(fetched_snapshot_id), + None, + None, + ); + assert!( handle_order_request( - &event, + &fetched_order_event, order, &parsed, Some("order-2"), &client, &missing_state ) - .await, - Err(TradeListingDvmError::ListingNotValidated) - )); + .await + .is_ok() + ); + assert!(missing_state.lock().await.order_exists("order-2")); - let stale_listing_event = - RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing-stale") - .custom_created_at(RadrootsNostrTimestamp::from(9_u64)) - .sign_with_keys(&seller_keys) - .expect("stale listing event"); - state + let mismatched_snapshot_state = Arc::new(AsyncMutex::new(TradeListingState::default())); + let mismatched_snapshot_event = make_event( + &seller_keys, + RadrootsNostrKind::Custom(30402), + "listing-mismatch".to_string(), + Vec::new(), + ); + let mismatched_snapshot_id = mismatched_snapshot_event.id.to_string(); + dvm_test_hooks() .lock() - .await - .mark_listing_validated(&listing_addr, &stale_listing_event.id.to_string()); - let latest_listing_event = - RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing-latest") - .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) - .sign_with_keys(&seller_keys) - .expect("latest listing event"); - push_fetch_events_ok(vec![latest_listing_event]); - let stale_order = make_order( + .expect("hooks") + .fetch_event_by_id_results + .push_back(Ok(mismatched_snapshot_event)); + push_validate_listing_ok( + listing_addr_for_seller(&buyer_keys), + RadrootsListingFarmRef { + pubkey: buyer_pub.clone(), + d_tag: "farmtag".to_string(), + }, + ); + let mismatched_snapshot_order = make_order( "order-3", &listing_addr, &buyer_pub, &seller_pub, TradeOrderStatus::Requested, ); + let mismatched_snapshot_event = make_public_trade_event_with_payload( + &buyer_keys, + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-3", + &buyer_pub, + &seller_pub, + TradeListingMessagePayload::OrderRequest(make_order( + "order-3", + &listing_addr, + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + )), + Some(mismatched_snapshot_id), + None, + None, + ); assert!(matches!( handle_order_request( - &event, - stale_order, + &mismatched_snapshot_event, + mismatched_snapshot_order, &parsed, Some("order-3"), &client, - &state + &mismatched_snapshot_state ) .await, - Err(TradeListingDvmError::ListingNotValidated) + Err(TradeListingDvmError::InvalidOrder) )); - assert!(!state.lock().await.is_listing_validated(&listing_addr)); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let seller_event = make_event( + let seller_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_RES), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderResponse, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; state .lock() .await @@ -3306,12 +3760,16 @@ mod tests { .is_ok() ); - let wrong_buyer = make_event( + let wrong_buyer = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_RES), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderRevisionAccept, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_revision_response( &wrong_buyer, @@ -3330,12 +3788,16 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let wrong_sender = make_event( + let wrong_sender = make_public_trade_event( &rhi_keys, - custom_trade_kind(KIND_TRADE_LISTING_CANCEL_REQ), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::Cancel, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_cancel( &wrong_sender, @@ -3421,35 +3883,35 @@ mod tests { .is_ok() ); - let validated_listing_event = - RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") - .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) - .sign_with_keys(&seller_keys) - .expect("listing event"); - state - .lock() - .await - .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); - let invalid_status_order = make_order( - "order-4", + let duplicate_order = make_order( + "order-1", &listing_addr, &buyer_pub, &seller_pub, - TradeOrderStatus::Accepted, + TradeOrderStatus::Requested, ); - push_fetch_events_ok(vec![validated_listing_event.clone()]); - assert!(matches!( + let duplicate_order_event = make_public_trade_event( + &buyer_keys, + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; + assert!( handle_order_request( - &event, - invalid_status_order, + &duplicate_order_event, + duplicate_order, &parsed, - Some("order-4"), + Some("order-1"), &client, &state, ) - .await, - Err(TradeListingDvmError::InvalidOrder) - )); + .await + .is_ok() + ); let unauthorized_order = make_order( "order-3", @@ -3458,10 +3920,19 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); - push_fetch_events_ok(vec![validated_listing_event]); + let unauthorized_order_event = make_public_trade_event( + &buyer_keys, + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-3", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_request( - &event, + &unauthorized_order_event, unauthorized_order, &parsed, Some("order-3"), @@ -3493,12 +3964,16 @@ mod tests { ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let buyer_event = make_event( + let buyer_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_RES), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderResponse, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_response( &buyer_event, @@ -3519,12 +3994,16 @@ mod tests { push_send_ok(); assert!( handle_order_response( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_RES), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::OrderResponse, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeOrderResponse { accepted: false, reason: None, @@ -3541,17 +4020,19 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_order_revision( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "revision-wrong-sender".to_string(), - Vec::new(), - ), + TradeListingMessageType::OrderRevision, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeOrderRevision { revision_id: "r-wrong-sender".to_string(), - order_id: "order-1".to_string(), changes: Vec::new(), - reason: None, }, &parsed, Some("order-1"), @@ -3565,17 +4046,27 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_order_revision( - &make_event( + &make_public_trade_event_with_payload( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "x".to_string(), - Vec::new(), + TradeListingMessageType::OrderRevision, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + payload_enum_for_message( + TradeListingMessageType::OrderRevision, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + Some(TEST_LISTING_EVENT_ID.to_string()), + Some("wrong-root".to_string()), + Some("wrong-prev".to_string()), ), TradeOrderRevision { revision_id: "r3".to_string(), - order_id: "other".to_string(), changes: Vec::new(), - reason: None, }, &parsed, Some("order-1"), @@ -3586,12 +4077,16 @@ mod tests { Err(TradeListingDvmError::InvalidOrder) )); - let seen_event = make_event( + let seen_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "x".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderRevision, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; state .lock() .await @@ -3604,9 +4099,7 @@ mod tests { &seen_event, TradeOrderRevision { revision_id: "r4".to_string(), - order_id: "order-1".to_string(), changes: Vec::new(), - reason: None, }, &parsed, Some("order-1"), @@ -3620,17 +4113,26 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_question( - &make_event( + &make_public_trade_event_with_payload( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_QUESTION_REQ), - "x".to_string(), - Vec::new(), + TradeListingMessageType::Question, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + payload_enum_for_message( + TradeListingMessageType::Question, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + None, + Some("wrong-root".to_string()), + Some("wrong-prev".to_string()), ), TradeQuestion { question_id: "q".to_string(), - order_id: Some("other".to_string()), - listing_addr: None, - question_text: "q".to_string(), }, &parsed, Some("order-1"), @@ -3644,17 +4146,26 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Questioned).await; assert!(matches!( handle_answer( - &make_event( + &make_public_trade_event_with_payload( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "x".to_string(), - Vec::new(), + TradeListingMessageType::Answer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + payload_enum_for_message( + TradeListingMessageType::Answer, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + None, + Some("wrong-root".to_string()), + Some("wrong-prev".to_string()), ), TradeAnswer { question_id: "q".to_string(), - order_id: Some("other".to_string()), - listing_addr: None, - answer_text: "a".to_string(), }, &parsed, Some("order-1"), @@ -3668,17 +4179,27 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_discount_request( - &make_event( + &make_public_trade_event_with_payload( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_REQ), - "x".to_string(), - Vec::new(), + TradeListingMessageType::DiscountRequest, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + payload_enum_for_message( + TradeListingMessageType::DiscountRequest, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + Some(TEST_LISTING_EVENT_ID.to_string()), + Some("wrong-root".to_string()), + Some("wrong-prev".to_string()), ), TradeDiscountRequest { discount_id: "d".to_string(), - order_id: "other".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -3692,17 +4213,27 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_discount_offer( - &make_event( + &make_public_trade_event_with_payload( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_OFFER_RES), - "x".to_string(), - Vec::new(), + TradeListingMessageType::DiscountOffer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + payload_enum_for_message( + TradeListingMessageType::DiscountOffer, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + Some(TEST_LISTING_EVENT_ID.to_string()), + Some("wrong-root".to_string()), + Some("wrong-prev".to_string()), ), TradeDiscountOffer { discount_id: "d".to_string(), - order_id: "other".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -3716,12 +4247,16 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Revised).await; assert!(matches!( handle_discount_decision( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_ACCEPT_REQ), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountAccept, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeListingMessageType::DiscountAccept, TradeDiscountDecision::Decline { reason: None }, &parsed, @@ -3734,12 +4269,16 @@ mod tests { )); assert!(matches!( handle_discount_decision( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountDecline, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeListingMessageType::DiscountDecline, TradeDiscountDecision::Accept { value: sample_discount_value(), @@ -3756,12 +4295,16 @@ mod tests { push_send_ok(); assert!( handle_discount_decision( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountDecline, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeListingMessageType::Cancel, TradeDiscountDecision::Decline { reason: None }, &parsed, @@ -3774,11 +4317,30 @@ mod tests { ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let cancel_by_seller = make_event( + let (cancel_root_event_id, cancel_prev_event_id) = { + let mut locked = state.lock().await; + let order = locked.get_order_mut("order-1").expect("order"); + ( + order.root_event_id.clone().expect("root event"), + order.last_event_id.clone().expect("prev event"), + ) + }; + let cancel_by_seller = make_trade_event_with_payload_and_recipient( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_CANCEL_REQ), - "x".to_string(), - Vec::new(), + &buyer_pub, + TradeListingMessageType::Cancel, + &listing_addr, + "order-1", + payload_enum_for_message( + TradeListingMessageType::Cancel, + "order-1", + &listing_addr, + &buyer_pub, + &seller_pub, + ), + None, + Some(cancel_root_event_id), + Some(cancel_prev_event_id), ); push_send_ok(); assert!( @@ -3797,17 +4359,18 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Accepted).await; assert!(matches!( handle_fulfillment_update( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, }, &parsed, Some("order-1"), @@ -3821,16 +4384,19 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Fulfilled).await; assert!(matches!( handle_receipt( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_RECEIPT_REQ), - "x".to_string(), - Vec::new(), - ), + TradeListingMessageType::Receipt, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeReceipt { acknowledged: true, at: 1, - note: None, }, &parsed, Some("order-1"), @@ -3863,20 +4429,22 @@ mod tests { let mismatched_addr = listing_addr_for_seller(&buyer_keys); let mismatched_parsed = TradeListingAddress::parse(&mismatched_addr).expect("mismatched listing"); - let revision_event = make_event( + let revision_event = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "revision".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderRevision, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_revision( &revision_event, TradeOrderRevision { revision_id: "r1".to_string(), - order_id: "order-1".to_string(), changes: Vec::new(), - reason: None, }, &mismatched_parsed, Some("order-1"), @@ -3887,12 +4455,16 @@ mod tests { Err(TradeListingDvmError::Unauthorized) )); - let seen_revision_response = make_event( + let seen_revision_response = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_RES), - "seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::OrderRevisionAccept, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_revision_response.id.to_string()).await; assert!( handle_order_revision_response( @@ -3911,13 +4483,31 @@ mod tests { .is_ok() ); + let (listing_event_id, root_event_id, prev_event_id) = workflow_state_refs( + TradeListingMessageType::OrderRevisionAccept, + &listing_addr, + "order-1", + &state, + ) + .await; assert!(matches!( handle_order_revision_response( - &make_event( + &make_public_trade_event_with_payload( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_RES), - "accept-invalid".to_string(), - Vec::new(), + TradeListingMessageType::OrderRevisionAccept, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + TradeListingMessagePayload::OrderRevisionAccept( + TradeOrderRevisionResponse { + accepted: false, + reason: None, + }, + ), + listing_event_id, + root_event_id, + prev_event_id, ), TradeListingMessageType::OrderRevisionAccept, TradeOrderRevisionResponse { @@ -3932,13 +4522,31 @@ mod tests { .await, Err(TradeListingDvmError::InvalidOrder) )); + let (listing_event_id, root_event_id, prev_event_id) = workflow_state_refs( + TradeListingMessageType::OrderRevisionDecline, + &listing_addr, + "order-1", + &state, + ) + .await; assert!(matches!( handle_order_revision_response( - &make_event( + &make_public_trade_event_with_payload( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_RES), - "decline-invalid".to_string(), - Vec::new(), + TradeListingMessageType::OrderRevisionDecline, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + TradeListingMessagePayload::OrderRevisionDecline( + TradeOrderRevisionResponse { + accepted: true, + reason: None, + }, + ), + listing_event_id, + root_event_id, + prev_event_id, ), TradeListingMessageType::OrderRevisionDecline, TradeOrderRevisionResponse { @@ -3955,20 +4563,20 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - push_send_ok(); assert!( handle_question( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_QUESTION_REQ), - "question-ok".to_string(), - Vec::new(), - ), + TradeListingMessageType::Question, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeQuestion { question_id: "q1".to_string(), - order_id: None, - listing_addr: Some(listing_addr.clone()), - question_text: "question".to_string(), }, &parsed, Some("order-1"), @@ -3979,21 +4587,22 @@ mod tests { .is_ok() ); - let seen_question = make_event( + let seen_question = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_QUESTION_REQ), - "question-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::Question, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_question.id.to_string()).await; assert!( handle_question( &seen_question, TradeQuestion { question_id: "q2".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - question_text: "question".to_string(), }, &parsed, Some("order-1"), @@ -4005,17 +4614,18 @@ mod tests { ); assert!(matches!( handle_question( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_QUESTION_REQ), - "question-unauthorized".to_string(), - Vec::new(), - ), + TradeListingMessageType::Question, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeQuestion { question_id: "q3".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - question_text: "question".to_string(), }, &parsed, Some("order-1"), @@ -4027,20 +4637,20 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Questioned).await; - push_send_ok(); assert!( handle_answer( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "answer-ok".to_string(), - Vec::new(), - ), + TradeListingMessageType::Answer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeAnswer { question_id: "q1".to_string(), - order_id: None, - listing_addr: Some(listing_addr.clone()), - answer_text: "answer".to_string(), }, &parsed, Some("order-1"), @@ -4051,21 +4661,22 @@ mod tests { .is_ok() ); - let seen_answer = make_event( + let seen_answer = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "answer-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::Answer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_answer.id.to_string()).await; assert!( handle_answer( &seen_answer, TradeAnswer { question_id: "q1".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - answer_text: "answer".to_string(), }, &parsed, Some("order-1"), @@ -4077,17 +4688,18 @@ mod tests { ); assert!(matches!( handle_answer( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "answer-unauthorized".to_string(), - Vec::new(), - ), + TradeListingMessageType::Answer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeAnswer { question_id: "q1".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - answer_text: "answer".to_string(), }, &parsed, Some("order-1"), @@ -4099,21 +4711,23 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let seen_discount_request = make_event( + let seen_discount_request = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_REQ), - "discount-request-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::DiscountRequest, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_discount_request.id.to_string()).await; assert!( handle_discount_request( &seen_discount_request, TradeDiscountRequest { discount_id: "d1".to_string(), - order_id: "order-1".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -4125,17 +4739,19 @@ mod tests { ); assert!(matches!( handle_discount_request( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_REQ), - "discount-request-unauthorized".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountRequest, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeDiscountRequest { discount_id: "d2".to_string(), - order_id: "order-1".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -4147,21 +4763,23 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let seen_discount_offer = make_event( + let seen_discount_offer = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_OFFER_RES), - "discount-offer-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::DiscountOffer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_discount_offer.id.to_string()).await; assert!( handle_discount_offer( &seen_discount_offer, TradeDiscountOffer { discount_id: "d1".to_string(), - order_id: "order-1".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -4173,17 +4791,19 @@ mod tests { ); assert!(matches!( handle_discount_offer( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_OFFER_RES), - "discount-offer-unauthorized".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountOffer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeDiscountOffer { discount_id: "d2".to_string(), - order_id: "order-1".to_string(), value: sample_discount_value(), - conditions: None, }, &parsed, Some("order-1"), @@ -4195,12 +4815,16 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Revised).await; - let seen_discount_decision = make_event( + let seen_discount_decision = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), - "discount-decision-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::DiscountDecline, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_discount_decision.id.to_string()).await; assert!( handle_discount_decision( @@ -4217,12 +4841,16 @@ mod tests { ); assert!(matches!( handle_discount_decision( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), - "discount-decision-unauthorized".to_string(), - Vec::new(), - ), + TradeListingMessageType::DiscountDecline, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeListingMessageType::DiscountDecline, TradeDiscountDecision::Decline { reason: None }, &parsed, @@ -4235,12 +4863,16 @@ mod tests { )); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; - let seen_cancel = make_event( + let seen_cancel = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_CANCEL_REQ), - "cancel-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::Cancel, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_cancel.id.to_string()).await; assert!( handle_cancel( @@ -4256,21 +4888,22 @@ mod tests { ); set_order_status(&state, "order-1", TradeOrderStatus::Accepted).await; - let seen_fulfillment = make_event( + let seen_fulfillment = make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "fulfillment-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_fulfillment.id.to_string()).await; assert!( handle_fulfillment_update( &seen_fulfillment, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, }, &parsed, Some("order-1"), @@ -4282,12 +4915,16 @@ mod tests { ); set_order_status(&state, "order-1", TradeOrderStatus::Fulfilled).await; - let seen_receipt = make_event( + let seen_receipt = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_RECEIPT_REQ), - "receipt-seen".to_string(), - Vec::new(), - ); + TradeListingMessageType::Receipt, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; mark_event_seen(&state, "order-1", seen_receipt.id.to_string()).await; assert!( handle_receipt( @@ -4295,7 +4932,6 @@ mod tests { TradeReceipt { acknowledged: true, at: 1, - note: None, }, &parsed, Some("order-1"), @@ -4325,20 +4961,20 @@ mod tests { ) .await; - push_send_ok(); assert!( handle_fulfillment_update( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "fulfillment-shipped".to_string(), - Vec::new(), - ), + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, }, &parsed, Some("order-1"), @@ -4358,20 +4994,20 @@ mod tests { TradeOrderStatus::Accepted ); - push_send_ok(); assert!( handle_fulfillment_update( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "fulfillment-delivered".to_string(), - Vec::new(), - ), + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Delivered, - tracking: None, - eta: None, - notes: None, }, &parsed, Some("order-1"), @@ -4391,19 +5027,21 @@ mod tests { TradeOrderStatus::Fulfilled ); - push_send_ok(); assert!( handle_receipt( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_RECEIPT_REQ), - "receipt-unacknowledged".to_string(), - Vec::new(), - ), + TradeListingMessageType::Receipt, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeReceipt { acknowledged: false, at: 1, - note: None, }, &parsed, Some("order-1"), @@ -4423,19 +5061,21 @@ mod tests { TradeOrderStatus::Fulfilled ); - push_send_ok(); assert!( handle_receipt( - &make_event( + &make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_RECEIPT_REQ), - "receipt-acknowledged".to_string(), - Vec::new(), - ), + TradeListingMessageType::Receipt, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeReceipt { acknowledged: true, at: 2, - note: None, }, &parsed, Some("order-1"), @@ -4463,20 +5103,20 @@ mod tests { TradeOrderStatus::Accepted, ) .await; - push_send_ok(); assert!( handle_fulfillment_update( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), - "fulfillment-cancelled".to_string(), - Vec::new(), - ), + TradeListingMessageType::FulfillmentUpdate, + &listing_addr, + "order-2", + &buyer_pub, + &seller_pub, + Some(&cancelled_state), + ) + .await, TradeFulfillmentUpdate { status: TradeFulfillmentStatus::Cancelled, - tracking: None, - eta: None, - notes: None, }, &parsed, Some("order-2"), @@ -4553,21 +5193,16 @@ mod tests { TradeOrderStatus::Requested, ) .await; - let order_event = make_event( + let order_event = make_public_trade_event( &buyer_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - "order".to_string(), - Vec::new(), - ); - let validated_listing_event = - RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(30402), "listing") - .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) - .sign_with_keys(&seller_keys) - .expect("listing event"); - state - .lock() - .await - .mark_listing_validated(&listing_addr, &validated_listing_event.id.to_string()); + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-2", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; let mismatch_payload = make_order( "order-2", @@ -4596,10 +5231,19 @@ mod tests { "not-seller", TradeOrderStatus::Requested, ); - push_fetch_events_ok(vec![validated_listing_event]); + let unauthorized_order_event = make_public_trade_event( + &buyer_keys, + TradeListingMessageType::OrderRequest, + &listing_addr, + "order-3", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; assert!(matches!( handle_order_request( - &order_event, + &unauthorized_order_event, unauthorized_payload, &parsed, Some("order-3"), @@ -4617,17 +5261,19 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( handle_order_revision( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ORDER_REVISION_REQ), - "revision".to_string(), - Vec::new(), - ), + TradeListingMessageType::OrderRevision, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeOrderRevision { revision_id: "r-edge".to_string(), - order_id: "order-1".to_string(), changes: Vec::new(), - reason: None, }, &mismatched_parsed, Some("order-1"), @@ -4641,17 +5287,18 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Questioned).await; assert!(matches!( handle_answer( - &make_event( + &make_public_trade_event( &seller_keys, - custom_trade_kind(KIND_TRADE_LISTING_ANSWER_RES), - "answer".to_string(), - Vec::new(), - ), + TradeListingMessageType::Answer, + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await, TradeAnswer { question_id: "q-edge".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - answer_text: "answer".to_string(), }, &mismatched_parsed, Some("order-1"), @@ -4735,7 +5382,7 @@ mod tests { .await; assert!(matches!( invalid_json_result, - Err(TradeListingDvmError::Serde(_)) + Err(TradeListingDvmError::InvalidPayload(_)) )); let invalid_envelope_event = make_event( @@ -4760,6 +5407,7 @@ mod tests { assert!(matches!( invalid_envelope_result, Err(TradeListingDvmError::InvalidEnvelope(_)) + | Err(TradeListingDvmError::InvalidPayload(_)) )); let missing_a_tags = vec![RadrootsNostrTag::custom( @@ -4769,11 +5417,11 @@ mod tests { let missing_a_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::OrderRequest, &listing_addr, Some(order_id), - payload_for_message( + payload_enum_for_message( TradeListingMessageType::OrderRequest, order_id, &listing_addr, @@ -4801,11 +5449,11 @@ mod tests { let invalid_addr_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_ORDER_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::OrderRequest, invalid_addr, Some(order_id), - payload_for_message( + payload_enum_for_message( TradeListingMessageType::OrderRequest, order_id, invalid_addr, @@ -4856,11 +5504,16 @@ mod tests { let listing_validate_send_err_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_VALIDATE_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::ListingValidateRequest, &listing_addr, None, - json!({"listing_event":{"id":"missing","relays":null}}), + TradeListingMessagePayload::ListingValidateRequest(TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: "missing".to_string(), + relays: None, + }), + }), ), make_custom_tags(&rhi_pub, &listing_addr, None), ); @@ -4879,11 +5532,13 @@ mod tests { let listing_validate_fetch_err_event = make_event( &buyer_keys, custom_trade_kind(KIND_TRADE_LISTING_VALIDATE_REQ), - make_envelope_content( + make_canonical_envelope_content( TradeListingMessageType::ListingValidateRequest, &listing_addr, None, - json!({"listing_event": null}), + TradeListingMessagePayload::ListingValidateRequest(TradeListingValidateRequest { + listing_event: None, + }), ), make_custom_tags(&rhi_pub, &listing_addr, None), ); @@ -4962,11 +5617,11 @@ mod tests { let event = make_event( sender, custom_trade_kind(kind), - make_envelope_content( + make_canonical_envelope_content( message_type, &listing_addr, Some(order_id), - payload_for_message( + payload_enum_for_message( message_type, order_id, &listing_addr, @@ -4987,180 +5642,118 @@ mod tests { assert!(matches!(result, Err(TradeListingDvmError::MissingTag("d")))); } - let missing_order_cases: Vec<(TradeListingMessageType, u32)> = vec![ - ( - TradeListingMessageType::OrderResponse, - KIND_TRADE_LISTING_ORDER_RES, - ), - ( - TradeListingMessageType::OrderRevision, - KIND_TRADE_LISTING_ORDER_REVISION_REQ, - ), - ( - TradeListingMessageType::OrderRevisionAccept, - KIND_TRADE_LISTING_ORDER_REVISION_RES, - ), - ( - TradeListingMessageType::OrderRevisionDecline, - KIND_TRADE_LISTING_ORDER_REVISION_RES, - ), - ( - TradeListingMessageType::Question, - KIND_TRADE_LISTING_QUESTION_REQ, - ), - ( - TradeListingMessageType::Answer, - KIND_TRADE_LISTING_ANSWER_RES, - ), - ( - TradeListingMessageType::DiscountRequest, - KIND_TRADE_LISTING_DISCOUNT_REQ, - ), - ( - TradeListingMessageType::DiscountOffer, - KIND_TRADE_LISTING_DISCOUNT_OFFER_RES, - ), - ( - TradeListingMessageType::DiscountAccept, - KIND_TRADE_LISTING_DISCOUNT_ACCEPT_REQ, - ), - ( - TradeListingMessageType::DiscountDecline, - KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ, - ), - ( - TradeListingMessageType::Cancel, - KIND_TRADE_LISTING_CANCEL_REQ, - ), - ( - TradeListingMessageType::FulfillmentUpdate, - KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, - ), - ( - TradeListingMessageType::Receipt, - KIND_TRADE_LISTING_RECEIPT_REQ, - ), + let missing_order_cases: Vec<TradeListingMessageType> = vec![ + TradeListingMessageType::OrderResponse, + TradeListingMessageType::OrderRevision, + TradeListingMessageType::OrderRevisionAccept, + TradeListingMessageType::OrderRevisionDecline, + TradeListingMessageType::Question, + TradeListingMessageType::Answer, + TradeListingMessageType::DiscountRequest, + TradeListingMessageType::DiscountOffer, + TradeListingMessageType::DiscountAccept, + TradeListingMessageType::DiscountDecline, + TradeListingMessageType::Cancel, + TradeListingMessageType::FulfillmentUpdate, + TradeListingMessageType::Receipt, ]; - for (message_type, kind) in missing_order_cases { + for message_type in missing_order_cases { let sender = sender_for_message(message_type, &seller_keys, &buyer_keys); - let event = make_event( + let (event, tags) = make_handle_event_trade_event( sender, - custom_trade_kind(kind), - make_envelope_content( - message_type, - &listing_addr, - Some(missing_order_id), - payload_for_message( - message_type, - missing_order_id, - &listing_addr, - &buyer_pub, - &seller_pub, - ), - ), - make_custom_tags(&rhi_pub, &listing_addr, Some(missing_order_id)), - ); + message_type, + &listing_addr, + missing_order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; let result = handle_event( event, - make_custom_tags(&rhi_pub, &listing_addr, Some(missing_order_id)), + tags, rhi_keys.clone(), client.clone(), state.clone(), ) .await; - assert!(matches!( - result, - Err(TradeListingDvmError::State( - TradeListingStateError::MissingOrder - )) - )); + assert!( + matches!( + result, + Err(TradeListingDvmError::State( + TradeListingStateError::MissingOrder + )) + ), + "{message_type:?}: {result:?}" + ); } - let transition_cases: Vec<(TradeListingMessageType, u32, TradeOrderStatus)> = vec![ + let transition_cases: Vec<(TradeListingMessageType, TradeOrderStatus)> = vec![ ( TradeListingMessageType::OrderResponse, - KIND_TRADE_LISTING_ORDER_RES, TradeOrderStatus::Completed, ), ( TradeListingMessageType::OrderRevision, - KIND_TRADE_LISTING_ORDER_REVISION_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::OrderRevisionAccept, - KIND_TRADE_LISTING_ORDER_REVISION_RES, TradeOrderStatus::Completed, ), ( TradeListingMessageType::OrderRevisionDecline, - KIND_TRADE_LISTING_ORDER_REVISION_RES, TradeOrderStatus::Completed, ), ( TradeListingMessageType::Question, - KIND_TRADE_LISTING_QUESTION_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::Answer, - KIND_TRADE_LISTING_ANSWER_RES, TradeOrderStatus::Completed, ), ( TradeListingMessageType::DiscountOffer, - KIND_TRADE_LISTING_DISCOUNT_OFFER_RES, TradeOrderStatus::Completed, ), ( TradeListingMessageType::DiscountAccept, - KIND_TRADE_LISTING_DISCOUNT_ACCEPT_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::DiscountDecline, - KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::Cancel, - KIND_TRADE_LISTING_CANCEL_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::FulfillmentUpdate, - KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, TradeOrderStatus::Completed, ), ( TradeListingMessageType::Receipt, - KIND_TRADE_LISTING_RECEIPT_REQ, TradeOrderStatus::Requested, ), ]; - for (message_type, kind, status_before) in transition_cases { + for (message_type, status_before) in transition_cases { set_order_status(&state, order_id, status_before).await; let sender = sender_for_message(message_type, &seller_keys, &buyer_keys); - let event = make_event( + let (event, tags) = make_handle_event_trade_event( sender, - custom_trade_kind(kind), - make_envelope_content( - message_type, - &listing_addr, - Some(order_id), - payload_for_message( - message_type, - order_id, - &listing_addr, - &buyer_pub, - &seller_pub, - ), - ), - make_custom_tags(&rhi_pub, &listing_addr, Some(order_id)), - ); + message_type, + &listing_addr, + order_id, + &buyer_pub, + &seller_pub, + Some(&state), + ) + .await; let result = handle_event( event, - make_custom_tags(&rhi_pub, &listing_addr, Some(order_id)), + tags, rhi_keys.clone(), client.clone(), state.clone(), diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs @@ -21,6 +21,12 @@ pub struct TradeOrderState { pub buyer_pubkey: String, pub seller_pubkey: String, pub status: TradeOrderStatus, + #[serde(default)] + pub listing_snapshot_event_id: Option<String>, + #[serde(default)] + pub root_event_id: Option<String>, + #[serde(default)] + pub last_event_id: Option<String>, pub seen_event_ids: HashSet<String>, } @@ -29,6 +35,12 @@ pub struct ValidatedListingState { pub event_id: String, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ListingEventState { + pub event_id: String, + pub kind: u32, +} + #[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct TradeListingState { #[serde(default)] @@ -36,6 +48,8 @@ pub struct TradeListingState { #[serde(default)] validated_listing_events: HashMap<String, ValidatedListingState>, #[serde(default)] + listing_events: HashMap<String, ListingEventState>, + #[serde(default)] seen_non_order_event_ids: HashSet<String>, orders: HashMap<String, TradeOrderState>, last_event_created_at: Option<u32>, @@ -140,6 +154,22 @@ impl TradeListingRuntime { } impl TradeListingState { + pub fn upsert_listing_event(&mut self, listing_addr: &str, event_id: &str, kind: u32) { + self.listing_events.insert( + listing_addr.to_string(), + ListingEventState { + event_id: event_id.to_string(), + kind, + }, + ); + } + + pub fn listing_event_id(&self, listing_addr: &str) -> Option<&str> { + self.listing_events + .get(listing_addr) + .map(|listing| listing.event_id.as_str()) + } + pub fn mark_listing_validated(&mut self, listing_addr: &str, event_id: &str) { self.validated_listings.insert(listing_addr.to_string()); self.validated_listing_events.insert( @@ -308,9 +338,9 @@ pub enum TradeListingRuntimeError { #[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::{ - PersistedTradeListingState, TradeListingRuntime, TradeListingRuntimeConfig, - TradeListingRuntimeError, TradeListingState, TradeListingStateError, TradeOrderState, - ValidatedListingState, + ListingEventState, PersistedTradeListingState, TradeListingRuntime, + TradeListingRuntimeConfig, TradeListingRuntimeError, TradeListingState, + TradeListingStateError, TradeOrderState, ValidatedListingState, }; use radroots_events::trade::RadrootsTradeOrderStatus as TradeOrderStatus; use std::collections::{HashMap, HashSet}; @@ -340,6 +370,9 @@ mod tests { buyer_pubkey: "buyer".into(), seller_pubkey: "seller".into(), status: TradeOrderStatus::Requested, + listing_snapshot_event_id: Some("evt-listing-1".into()), + root_event_id: Some("evt-root-1".into()), + last_event_id: Some("evt-root-1".into()), seen_event_ids: Default::default(), }; state.insert_order(order); @@ -349,6 +382,8 @@ mod tests { assert!(!state.is_non_order_event_seen("evt-non-order")); assert!(state.mark_non_order_event_seen("evt-non-order")); assert!(state.is_non_order_event_seen("evt-non-order")); + state.upsert_listing_event("addr", "evt-listing-1", 30402); + assert_eq!(state.listing_event_id("addr"), Some("evt-listing-1")); assert_eq!(state.replay_since(1_000, 300, 60), 700); state.observe_event_created_at(900); @@ -461,6 +496,7 @@ mod tests { state: TradeListingState { validated_listings: ["addr".to_string()].into_iter().collect(), validated_listing_events: HashMap::new(), + listing_events: HashMap::new(), seen_non_order_event_ids: HashSet::new(), orders: HashMap::new(), last_event_created_at: Some(321), @@ -496,6 +532,13 @@ mod tests { event_id: "evt-listing-1".to_string(), }, )]), + listing_events: HashMap::from([( + "addr".to_string(), + ListingEventState { + event_id: "evt-listing-1".to_string(), + kind: 30402, + }, + )]), seen_non_order_event_ids: HashSet::new(), orders: HashMap::new(), last_event_created_at: None, diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -5,7 +5,9 @@ use std::convert::TryFrom; use std::time::Duration; use anyhow::{Result, anyhow}; -use radroots_events::kinds::{TRADE_LISTING_KINDS, is_trade_service_kind}; +use radroots_events::kinds::{ + KIND_LISTING, KIND_LISTING_DRAFT, TRADE_LISTING_KINDS, is_trade_service_kind, +}; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys, RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag, @@ -254,12 +256,16 @@ pub async fn subscriber( runtime: TradeListingRuntime, mut stop_rx: watch::Receiver<bool>, ) -> Result<()> { + let subscribed_kinds = [KIND_LISTING, KIND_LISTING_DRAFT] + .into_iter() + .chain(TRADE_LISTING_KINDS) + .collect::<Vec<_>>(); info!( - "Starting subscriber for trade listing DVM kinds: {:?}", - TRADE_LISTING_KINDS + "Starting subscriber for trade listing and public trade kinds: {:?}", + subscribed_kinds ); - let kinds: Vec<RadrootsNostrKind> = TRADE_LISTING_KINDS + let kinds: Vec<RadrootsNostrKind> = subscribed_kinds .iter() .map(|kind| u16::try_from(*kind).expect("trade listing kinds fit in nostr custom range")) .map(RadrootsNostrKind::Custom) diff --git a/src/lib.rs b/src/lib.rs @@ -9,7 +9,7 @@ pub mod rhi; pub use cli::Args as cli_args; use anyhow::Result; -use radroots_events::kinds::TRADE_LISTING_KINDS; +use radroots_events::kinds::{KIND_LISTING, KIND_LISTING_DRAFT, TRADE_LISTING_KINDS}; use std::time::Duration; use crate::features::trade_listing::state::{TradeListingRuntime, TradeListingRuntimeConfig}; @@ -130,9 +130,9 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> let md = settings.metadata.clone(); if !relays.is_empty() { - let handler_kinds = TRADE_LISTING_KINDS - .iter() - .map(|kind| *kind as u32) + let handler_kinds = [KIND_LISTING, KIND_LISTING_DRAFT] + .into_iter() + .chain(TRADE_LISTING_KINDS) .collect(); let handler_spec = RadrootsNostrApplicationHandlerSpec { kinds: handler_kinds,