rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit af77cb3154cb018839b0ddf6ea6f580af1f98f81
parent fd4d568adc873d60da48add3ff54fde7fad9c497
Author: triesap <tyson@radroots.org>
Date:   Fri, 27 Mar 2026 20:00:21 +0000

trade listing: keep workflow state across reconnects

Diffstat:
Msrc/features/trade_listing/handlers/dvm.rs | 1650+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Msrc/features/trade_listing/state.rs | 43+++++++++++++++++++++++++++++++++++++++++--
Msrc/features/trade_listing/subscriber.rs | 171++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Msrc/lib.rs | 25+++++++++++++------------
Msrc/main.rs | 15+++++++++++----
Msrc/rhi.rs | 49++++++++++++++++++++++++++++++++++++-------------
6 files changed, 1176 insertions(+), 777 deletions(-)

diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -7,7 +7,7 @@ use radroots_events::kinds::KIND_FARM; use radroots_events::listing::RadrootsListingFarmRef; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrEventBuilder, RadrootsNostrFilter, - RadrootsNostrKind, RadrootsNostrKeys, RadrootsNostrTag, radroots_event_from_nostr, + RadrootsNostrKeys, RadrootsNostrKind, RadrootsNostrTag, radroots_event_from_nostr, radroots_nostr_build_event, radroots_nostr_build_event_job_feedback, radroots_nostr_fetch_event_by_id, radroots_nostr_parse_pubkey, radroots_nostr_send_event, }; @@ -70,8 +70,9 @@ struct DvmTestHooks { fetch_events_results: std::collections::VecDeque<Result<Vec<RadrootsNostrEvent>, TradeListingDvmError>>, send_event_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, - validate_listing_results: - std::collections::VecDeque<Result<RadrootsListingFarmRef, TradeListingValidationError>>, + validate_listing_results: std::collections::VecDeque< + Result<(String, RadrootsListingFarmRef), TradeListingValidationError>, + >, farm_validation_results: std::collections::VecDeque<Result<Vec<TradeListingValidationError>, TradeListingDvmError>>, } @@ -113,7 +114,8 @@ fn pop_send_event_hook() -> Option<Result<(), TradeListingDvmError>> { } #[cfg(test)] -fn pop_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeListingValidationError>> { +fn pop_validate_listing_hook() +-> Option<Result<(String, RadrootsListingFarmRef), TradeListingValidationError>> { dvm_test_hooks() .lock() .expect("dvm test hooks lock") @@ -122,7 +124,8 @@ fn pop_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeLis } #[cfg(test)] -fn pop_farm_validation_hook() -> Option<Result<Vec<TradeListingValidationError>, TradeListingDvmError>> { +fn pop_farm_validation_hook() +-> Option<Result<Vec<TradeListingValidationError>, TradeListingDvmError>> { dvm_test_hooks() .lock() .expect("dvm test hooks lock") @@ -164,13 +167,15 @@ fn take_send_event_hook() -> Option<Result<(), TradeListingDvmError>> { } #[cfg(test)] -fn take_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeListingValidationError>> { +fn take_validate_listing_hook() +-> Option<Result<(String, RadrootsListingFarmRef), TradeListingValidationError>> { pop_validate_listing_hook() } #[cfg(not(test))] #[cfg_attr(coverage_nightly, coverage(off))] -fn take_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeListingValidationError>> { +fn take_validate_listing_hook() +-> Option<Result<(String, RadrootsListingFarmRef), TradeListingValidationError>> { None } @@ -219,13 +224,14 @@ async fn send_event_io( #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] fn validate_listing_event_io( event: &RadrootsNostrEvent, -) -> Result<RadrootsListingFarmRef, TradeListingValidationError> { +) -> Result<(String, RadrootsListingFarmRef), TradeListingValidationError> { let hook_result = take_validate_listing_hook(); - let farm = match hook_result { + let validated = match hook_result { Some(result) => result?, - None => validate_listing_event(&radroots_event_from_nostr(event)).map(|listing| listing.listing.farm)?, + None => validate_listing_event(&radroots_event_from_nostr(event)) + .map(|listing| (listing.listing_addr, listing.listing.farm))?, }; - Ok(farm) + Ok(validated) } pub async fn handle_event( @@ -479,13 +485,19 @@ async fn handle_listing_validate_request( let errors = if let Some(event) = listing_event { match validate_listing_event_io(&event) { - Ok(farm) => { - let errors = validate_farm_dependencies(client, &farm).await?; - if errors.is_empty() { - let mut state = state.lock().await; - state.mark_listing_validated(listing_addr); + Ok((validated_listing_addr, farm)) => { + if validated_listing_addr != listing_addr { + vec![TradeListingValidationError::ListingEventNotFound { + listing_addr: listing_addr.to_string(), + }] + } else { + let errors = validate_farm_dependencies(client, &farm).await?; + if errors.is_empty() { + let mut state = state.lock().await; + state.mark_listing_validated(listing_addr); + } + errors } - errors } Err(err) => vec![err], } @@ -1268,14 +1280,14 @@ pub async fn handle_error( #[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::{ - DvmTestHooks, TradeListingDvmError, dvm_test_hooks, ensure_transition, fetch_events_io, - fetch_event_by_id_io, fetch_latest_event_by_kind, fetch_listing_by_addr, handle_answer, - handle_cancel, handle_discount_decision, handle_discount_offer, handle_discount_request, - handle_error, handle_event, - handle_fulfillment_update, handle_listing_validate_request, handle_order_request, - handle_order_response, handle_order_revision, handle_order_revision_response, - handle_question, handle_receipt, parse_payload, send_envelope, send_event_io, tag_has_value, - tag_value, validate_farm_dependencies, validate_listing_event_io, + DvmTestHooks, TradeListingDvmError, dvm_test_hooks, ensure_transition, + fetch_event_by_id_io, fetch_events_io, fetch_latest_event_by_kind, fetch_listing_by_addr, + handle_answer, handle_cancel, handle_discount_decision, handle_discount_offer, + handle_discount_request, handle_error, handle_event, handle_fulfillment_update, + handle_listing_validate_request, handle_order_request, handle_order_response, + handle_order_revision, handle_order_revision_response, handle_question, handle_receipt, + parse_payload, send_envelope, send_event_io, tag_has_value, tag_value, + validate_farm_dependencies, validate_listing_event_io, }; use crate::features::trade_listing::state::{ TradeListingState, TradeListingStateError, TradeOrderState, @@ -1286,7 +1298,7 @@ mod tests { use radroots_nostr::error::RadrootsNostrError; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrEventBuilder, RadrootsNostrFilter, - RadrootsNostrKind, RadrootsNostrKeys, RadrootsNostrTag, RadrootsNostrTagKind, + RadrootsNostrKeys, RadrootsNostrKind, RadrootsNostrTag, RadrootsNostrTagKind, RadrootsNostrTimestamp, }; use radroots_trade::listing::dvm::{ @@ -1305,8 +1317,8 @@ mod tests { }; use radroots_trade::listing::order::{ TradeAnswer, TradeDiscountDecision, TradeDiscountOffer, TradeDiscountRequest, - TradeFulfillmentStatus, TradeFulfillmentUpdate, TradeOrder, TradeOrderStatus, - TradeOrderRevision, TradeQuestion, TradeReceipt, + TradeFulfillmentStatus, TradeFulfillmentUpdate, TradeOrder, TradeOrderRevision, + TradeOrderStatus, TradeQuestion, TradeReceipt, }; use radroots_trade::listing::validation::TradeListingValidationError; use serde_json::json; @@ -1350,15 +1362,17 @@ mod tests { ))); } - fn push_validate_listing_ok(farm: RadrootsListingFarmRef) { + fn push_validate_listing_ok(listing_addr: impl Into<String>, farm: RadrootsListingFarmRef) { dvm_test_hooks() .lock() .expect("hooks") .validate_listing_results - .push_back(Ok(farm)); + .push_back(Ok((listing_addr.into(), farm))); } - fn push_farm_validation_result(result: Result<Vec<TradeListingValidationError>, TradeListingDvmError>) { + fn push_farm_validation_result( + result: Result<Vec<TradeListingValidationError>, TradeListingDvmError>, + ) { dvm_test_hooks() .lock() .expect("hooks") @@ -1375,7 +1389,10 @@ mod tests { } fn listing_addr_for_seller(seller: &RadrootsNostrKeys) -> String { - format!("30402:{}:AAAAAAAAAAAAAAAAAAAAAA", seller.public_key().to_hex()) + format!( + "30402:{}:AAAAAAAAAAAAAAAAAAAAAA", + seller.public_key().to_hex() + ) } fn make_client(keys: &RadrootsNostrKeys) -> RadrootsNostrClient { @@ -1428,7 +1445,13 @@ mod tests { let state = Arc::new(AsyncMutex::new(TradeListingState::default())); let mut locked = state.lock().await; locked.mark_listing_validated(listing_addr); - locked.insert_order(make_order_state(order_id, listing_addr, buyer, seller, status)); + locked.insert_order(make_order_state( + order_id, + listing_addr, + buyer, + seller, + status, + )); drop(locked); state } @@ -1454,10 +1477,20 @@ mod tests { order.seen_event_ids.insert(event_id); } - fn make_custom_tags(recipient: &str, listing_addr: &str, order_id: Option<&str>) -> Vec<RadrootsNostrTag> { + fn make_custom_tags( + recipient: &str, + listing_addr: &str, + order_id: Option<&str>, + ) -> Vec<RadrootsNostrTag> { let mut tags = vec![ - RadrootsNostrTag::custom(RadrootsNostrTagKind::custom("p"), vec![recipient.to_string()]), - RadrootsNostrTag::custom(RadrootsNostrTagKind::custom("a"), vec![listing_addr.to_string()]), + RadrootsNostrTag::custom( + RadrootsNostrTagKind::custom("p"), + vec![recipient.to_string()], + ), + RadrootsNostrTag::custom( + RadrootsNostrTagKind::custom("a"), + vec![listing_addr.to_string()], + ), ]; if let Some(order_id) = order_id { tags.push(RadrootsNostrTag::custom( @@ -1525,30 +1558,26 @@ mod tests { seller_pub: &str, ) -> serde_json::Value { match message_type { - TradeListingMessageType::OrderRequest => { - serde_json::to_value(make_order( - order_id, - listing_addr, - buyer_pub, - seller_pub, - TradeOrderStatus::Requested, - )) - .expect("order request payload") - } + TradeListingMessageType::OrderRequest => serde_json::to_value(make_order( + order_id, + listing_addr, + buyer_pub, + seller_pub, + TradeOrderStatus::Requested, + )) + .expect("order request payload"), TradeListingMessageType::OrderResponse => serde_json::to_value(TradeOrderResponse { accepted: true, reason: None, }) .expect("order response payload"), - TradeListingMessageType::OrderRevision => { - serde_json::to_value(TradeOrderRevision { - revision_id: "r-matrix".to_string(), - order_id: order_id.to_string(), - changes: Vec::new(), - reason: None, - }) - .expect("order revision payload") - } + TradeListingMessageType::OrderRevision => serde_json::to_value(TradeOrderRevision { + revision_id: "r-matrix".to_string(), + order_id: order_id.to_string(), + changes: Vec::new(), + reason: None, + }) + .expect("order revision payload"), TradeListingMessageType::OrderRevisionAccept => { serde_json::to_value(TradeOrderRevisionResponse { accepted: true, @@ -1586,15 +1615,13 @@ mod tests { }) .expect("discount request payload") } - TradeListingMessageType::DiscountOffer => { - serde_json::to_value(TradeDiscountOffer { - discount_id: "d-matrix".to_string(), - order_id: order_id.to_string(), - value: sample_discount_value(), - conditions: None, - }) - .expect("discount offer payload") - } + TradeListingMessageType::DiscountOffer => serde_json::to_value(TradeDiscountOffer { + discount_id: "d-matrix".to_string(), + order_id: order_id.to_string(), + value: sample_discount_value(), + conditions: None, + }) + .expect("discount offer payload"), TradeListingMessageType::DiscountAccept => { serde_json::to_value(TradeDiscountDecision::Accept { value: sample_discount_value(), @@ -1636,8 +1663,12 @@ mod tests { assert!(ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Revised).is_ok()); assert!(ensure_transition(TradeOrderStatus::Declined, TradeOrderStatus::Accepted).is_err()); - assert!(ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Requested).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok() + ); + assert!( + ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Requested).is_err() + ); assert!(ensure_transition(TradeOrderStatus::Draft, TradeOrderStatus::Draft).is_ok()); let tags = vec![ @@ -1649,7 +1680,8 @@ mod tests { assert!(tag_has_value(&tags, "p", "pk")); assert!(!tag_has_value(&tags, "p", "miss")); - let parsed: Result<TradeOrderResponse, _> = parse_payload(json!({"accepted":true,"reason":null})); + let parsed: Result<TradeOrderResponse, _> = + parse_payload(json!({"accepted":true,"reason":null})); assert!(parsed.is_ok()); let invalid: Result<TradeOrderResponse, _> = parse_payload(json!({"accepted":"true"})); assert!(invalid.is_err()); @@ -1662,28 +1694,48 @@ mod tests { assert!(ensure_transition(TradeOrderStatus::Draft, TradeOrderStatus::Requested).is_ok()); assert!(ensure_transition(TradeOrderStatus::Draft, TradeOrderStatus::Accepted).is_err()); - assert!(ensure_transition(TradeOrderStatus::Validated, TradeOrderStatus::Requested).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Validated, TradeOrderStatus::Accepted).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Validated, TradeOrderStatus::Requested).is_ok() + ); + assert!( + ensure_transition(TradeOrderStatus::Validated, TradeOrderStatus::Accepted).is_err() + ); assert!(ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Accepted).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Fulfilled).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Requested, TradeOrderStatus::Fulfilled).is_err() + ); - assert!(ensure_transition(TradeOrderStatus::Questioned, TradeOrderStatus::Requested).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Questioned, TradeOrderStatus::Accepted).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Questioned, TradeOrderStatus::Requested).is_ok() + ); + assert!( + ensure_transition(TradeOrderStatus::Questioned, TradeOrderStatus::Accepted).is_err() + ); assert!(ensure_transition(TradeOrderStatus::Revised, TradeOrderStatus::Declined).is_ok()); assert!(ensure_transition(TradeOrderStatus::Revised, TradeOrderStatus::Fulfilled).is_err()); assert!(ensure_transition(TradeOrderStatus::Accepted, TradeOrderStatus::Fulfilled).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Accepted, TradeOrderStatus::Requested).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Accepted, TradeOrderStatus::Requested).is_err() + ); assert!(ensure_transition(TradeOrderStatus::Declined, TradeOrderStatus::Accepted).is_err()); - assert!(ensure_transition(TradeOrderStatus::Cancelled, TradeOrderStatus::Requested).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Cancelled, TradeOrderStatus::Requested).is_err() + ); - assert!(ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok()); - assert!(ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Accepted).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Completed).is_ok() + ); + assert!( + ensure_transition(TradeOrderStatus::Fulfilled, TradeOrderStatus::Accepted).is_err() + ); - assert!(ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Cancelled).is_err()); + assert!( + ensure_transition(TradeOrderStatus::Completed, TradeOrderStatus::Cancelled).is_err() + ); } #[tokio::test] @@ -1702,9 +1754,13 @@ mod tests { )], ); push_fetch_events_ok(vec![event.clone()]); - let fetched = fetch_events_io(&client, RadrootsNostrFilter::new(), std::time::Duration::from_secs(1)) - .await - .expect("fetch hook"); + let fetched = fetch_events_io( + &client, + RadrootsNostrFilter::new(), + std::time::Duration::from_secs(1), + ) + .await + .expect("fetch hook"); assert_eq!(fetched.len(), 1); dvm_test_hooks() @@ -1712,7 +1768,9 @@ mod tests { .expect("hooks") .fetch_event_by_id_results .push_back(Ok(event.clone())); - let by_id = super::fetch_event_by_id_io(&client, "id").await.expect("by id"); + let by_id = super::fetch_event_by_id_io(&client, "id") + .await + .expect("by id"); assert_eq!(by_id.id, event.id); push_send_ok(); @@ -1728,9 +1786,10 @@ mod tests { pubkey: rhi_keys.public_key().to_hex(), d_tag: "farmtag".to_string(), }; - push_validate_listing_ok(farm.clone()); + push_validate_listing_ok(listing_addr.clone(), farm.clone()); let validated = validate_listing_event_io(&event).expect("validate hook"); - assert_eq!(validated.pubkey, farm.pubkey); + assert_eq!(validated.0, listing_addr); + assert_eq!(validated.1.pubkey, farm.pubkey); assert_eq!(listing_addr.contains(':',), true); } @@ -1779,14 +1838,16 @@ mod tests { ); push_fetch_events_ok(vec![profile_event]); push_fetch_events_ok(vec![record_event]); - let ok = validate_farm_dependencies(&client, &farm).await.expect("ok deps"); + let ok = validate_farm_dependencies(&client, &farm) + .await + .expect("ok deps"); assert!(ok.is_empty()); } #[tokio::test] async fn handle_listing_validate_request_paths_are_covered() { let _guard = test_guard(); - let (rhi_keys, seller_keys, _) = make_keys(); + let (rhi_keys, seller_keys, buyer_keys) = make_keys(); let client = make_client(&rhi_keys); let listing_addr = listing_addr_for_seller(&seller_keys); let state = Arc::new(AsyncMutex::new(TradeListingState::default())); @@ -1805,26 +1866,35 @@ mod tests { relays: None, }), }; - assert!(handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) - .await - .is_ok()); + assert!( + handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) + .await + .is_ok() + ); push_fetch_events_ok(Vec::new()); push_send_ok(); - let payload = TradeListingValidateRequest { listing_event: None }; - assert!(handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) - .await - .is_ok()); + let payload = TradeListingValidateRequest { + listing_event: None, + }; + assert!( + handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) + .await + .is_ok() + ); dvm_test_hooks() .lock() .expect("hooks") .fetch_event_by_id_results .push_back(Ok(event.clone())); - push_validate_listing_ok(RadrootsListingFarmRef { - pubkey: seller_keys.public_key().to_hex(), - d_tag: "farmtag".to_string(), - }); + push_validate_listing_ok( + listing_addr.clone(), + RadrootsListingFarmRef { + pubkey: seller_keys.public_key().to_hex(), + d_tag: "farmtag".to_string(), + }, + ); push_farm_validation_result(Ok(Vec::new())); push_send_ok(); let payload = TradeListingValidateRequest { @@ -1833,10 +1903,51 @@ mod tests { relays: None, }), }; - assert!(handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) - .await - .is_ok()); + assert!( + handle_listing_validate_request(&event, payload, &listing_addr, &client, &state) + .await + .is_ok() + ); assert!(state.lock().await.is_listing_validated(&listing_addr)); + + let other_listing_addr = listing_addr_for_seller(&rhi_keys); + dvm_test_hooks() + .lock() + .expect("hooks") + .fetch_event_by_id_results + .push_back(Ok(event.clone())); + push_validate_listing_ok( + other_listing_addr, + RadrootsListingFarmRef { + pubkey: seller_keys.public_key().to_hex(), + d_tag: "farmtag".to_string(), + }, + ); + push_send_ok(); + let payload = TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: event.id.to_hex(), + relays: None, + }), + }; + let mismatch_listing_addr = listing_addr_for_seller(&buyer_keys); + assert!( + handle_listing_validate_request( + &event, + payload, + &mismatch_listing_addr, + &client, + &state, + ) + .await + .is_ok() + ); + assert!( + !state + .lock() + .await + .is_listing_validated(&mismatch_listing_addr) + ); } #[tokio::test] @@ -1866,16 +1977,18 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); - assert!(handle_order_request( - &order_event, - order_payload, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_order_request( + &order_event, + order_payload, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let response_event = make_event( &seller_keys, @@ -1884,21 +1997,28 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_order_response( - &response_event, - TradeOrderResponse { - accepted: true, - reason: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_order_response( + &response_event, + TradeOrderResponse { + accepted: true, + reason: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); - state.lock().await.get_order_mut(order_id).expect("order").status = TradeOrderStatus::Requested; + state + .lock() + .await + .get_order_mut(order_id) + .expect("order") + .status = TradeOrderStatus::Requested; let revision_event = make_event( &seller_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REVISION_REQ), @@ -1906,21 +2026,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_order_revision( - &revision_event, - TradeOrderRevision { - revision_id: "r1".to_string(), - order_id: order_id.to_string(), - changes: Vec::new(), - reason: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_order_revision( + &revision_event, + TradeOrderRevision { + revision_id: "r1".to_string(), + order_id: order_id.to_string(), + changes: Vec::new(), + reason: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let revision_response_event = make_event( &buyer_keys, @@ -1929,22 +2051,29 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_order_revision_response( - &revision_response_event, - TradeListingMessageType::OrderRevisionAccept, - TradeOrderRevisionResponse { - accepted: true, - reason: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_order_revision_response( + &revision_response_event, + TradeListingMessageType::OrderRevisionAccept, + TradeOrderRevisionResponse { + accepted: true, + reason: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); - state.lock().await.get_order_mut(order_id).expect("order").status = TradeOrderStatus::Requested; + state + .lock() + .await + .get_order_mut(order_id) + .expect("order") + .status = TradeOrderStatus::Requested; let question_event = make_event( &buyer_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_QUESTION_REQ), @@ -1952,21 +2081,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_question( - &question_event, - TradeQuestion { - question_id: "q1".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - question_text: "what".to_string(), - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_question( + &question_event, + TradeQuestion { + question_id: "q1".to_string(), + order_id: Some(order_id.to_string()), + listing_addr: Some(listing_addr.clone()), + question_text: "what".to_string(), + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let answer_event = make_event( &seller_keys, @@ -1975,21 +2106,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_answer( - &answer_event, - TradeAnswer { - question_id: "q1".to_string(), - order_id: Some(order_id.to_string()), - listing_addr: Some(listing_addr.clone()), - answer_text: "ans".to_string(), - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_answer( + &answer_event, + TradeAnswer { + question_id: "q1".to_string(), + order_id: Some(order_id.to_string()), + listing_addr: Some(listing_addr.clone()), + answer_text: "ans".to_string(), + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let discount_request_event = make_event( &buyer_keys, @@ -1998,21 +2131,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_discount_request( - &discount_request_event, - TradeDiscountRequest { - discount_id: "d1".to_string(), - order_id: order_id.to_string(), - value: sample_discount_value(), - conditions: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_discount_request( + &discount_request_event, + TradeDiscountRequest { + discount_id: "d1".to_string(), + order_id: order_id.to_string(), + value: sample_discount_value(), + conditions: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let discount_offer_event = make_event( &seller_keys, @@ -2021,21 +2156,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_discount_offer( - &discount_offer_event, - TradeDiscountOffer { - discount_id: "d1".to_string(), - order_id: order_id.to_string(), - value: sample_discount_value(), - conditions: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_discount_offer( + &discount_offer_event, + TradeDiscountOffer { + discount_id: "d1".to_string(), + order_id: order_id.to_string(), + value: sample_discount_value(), + conditions: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let discount_accept_event = make_event( &buyer_keys, @@ -2044,40 +2181,54 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_discount_decision( - &discount_accept_event, - TradeListingMessageType::DiscountAccept, - TradeDiscountDecision::Accept { - value: sample_discount_value(), - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); - - state.lock().await.get_order_mut(order_id).expect("order").status = TradeOrderStatus::Requested; - let cancel_event = make_event( - &buyer_keys, - RadrootsNostrKind::Custom(KIND_TRADE_LISTING_CANCEL_REQ), - "cancel".to_string(), - Vec::new(), + assert!( + handle_discount_decision( + &discount_accept_event, + TradeListingMessageType::DiscountAccept, + TradeDiscountDecision::Accept { + value: sample_discount_value(), + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); + + state + .lock() + .await + .get_order_mut(order_id) + .expect("order") + .status = TradeOrderStatus::Requested; + let cancel_event = make_event( + &buyer_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_CANCEL_REQ), + "cancel".to_string(), + Vec::new(), ); push_send_ok(); - assert!(handle_cancel( - &cancel_event, - TradeListingCancel { reason: None }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_cancel( + &cancel_event, + TradeListingCancel { reason: None }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); - state.lock().await.get_order_mut(order_id).expect("order").status = TradeOrderStatus::Accepted; + state + .lock() + .await + .get_order_mut(order_id) + .expect("order") + .status = TradeOrderStatus::Accepted; let fulfill_event = make_event( &seller_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ), @@ -2085,21 +2236,23 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_fulfillment_update( - &fulfill_event, - TradeFulfillmentUpdate { - status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_fulfillment_update( + &fulfill_event, + TradeFulfillmentUpdate { + status: TradeFulfillmentStatus::Shipped, + tracking: None, + eta: None, + notes: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); let receipt_event = make_event( &buyer_keys, @@ -2108,20 +2261,22 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_receipt( - &receipt_event, - TradeReceipt { - acknowledged: true, - at: 1, - note: None, - }, - &listing_addr_parsed, - Some(order_id), - &client, - &state - ) - .await - .is_ok()); + assert!( + handle_receipt( + &receipt_event, + TradeReceipt { + acknowledged: true, + at: 1, + note: None, + }, + &listing_addr_parsed, + Some(order_id), + &client, + &state + ) + .await + .is_ok() + ); } #[tokio::test] @@ -2217,15 +2372,17 @@ mod tests { ), tags.clone(), ); - assert!(handle_event( - self_event, - tags.clone(), - rhi_keys.clone(), - client.clone(), - state.clone(), - ) - .await - .is_ok()); + assert!( + handle_event( + self_event, + tags.clone(), + rhi_keys.clone(), + client.clone(), + state.clone(), + ) + .await + .is_ok() + ); let kind_mismatch = make_event( &buyer_keys, @@ -2297,7 +2454,10 @@ mod tests { Err(TradeListingDvmError::TagMismatch("d")) )); - let bad_addr = format!("30403:{}:AAAAAAAAAAAAAAAAAAAAAA", seller_keys.public_key().to_hex()); + let bad_addr = format!( + "30403:{}:AAAAAAAAAAAAAAAAAAAAAA", + seller_keys.public_key().to_hex() + ); let bad_addr_tags = make_custom_tags(&rhi_pub, &bad_addr, Some(order_id)); let bad_addr_event = make_event( &buyer_keys, @@ -2323,10 +2483,22 @@ mod tests { )); let cases = vec![ - (TradeListingMessageType::ListingValidateRequest, KIND_TRADE_LISTING_VALIDATE_REQ), - (TradeListingMessageType::OrderRequest, KIND_TRADE_LISTING_ORDER_REQ), - (TradeListingMessageType::OrderResponse, KIND_TRADE_LISTING_ORDER_RES), - (TradeListingMessageType::OrderRevision, KIND_TRADE_LISTING_ORDER_REVISION_REQ), + ( + TradeListingMessageType::ListingValidateRequest, + KIND_TRADE_LISTING_VALIDATE_REQ, + ), + ( + TradeListingMessageType::OrderRequest, + KIND_TRADE_LISTING_ORDER_REQ, + ), + ( + TradeListingMessageType::OrderResponse, + KIND_TRADE_LISTING_ORDER_RES, + ), + ( + TradeListingMessageType::OrderRevision, + KIND_TRADE_LISTING_ORDER_REVISION_REQ, + ), ( TradeListingMessageType::OrderRevisionAccept, KIND_TRADE_LISTING_ORDER_REVISION_RES, @@ -2335,8 +2507,14 @@ mod tests { TradeListingMessageType::OrderRevisionDecline, KIND_TRADE_LISTING_ORDER_REVISION_RES, ), - (TradeListingMessageType::Question, KIND_TRADE_LISTING_QUESTION_REQ), - (TradeListingMessageType::Answer, KIND_TRADE_LISTING_ANSWER_RES), + ( + TradeListingMessageType::Question, + KIND_TRADE_LISTING_QUESTION_REQ, + ), + ( + TradeListingMessageType::Answer, + KIND_TRADE_LISTING_ANSWER_RES, + ), ( TradeListingMessageType::DiscountRequest, KIND_TRADE_LISTING_DISCOUNT_REQ, @@ -2353,12 +2531,18 @@ mod tests { TradeListingMessageType::DiscountDecline, KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ, ), - (TradeListingMessageType::Cancel, KIND_TRADE_LISTING_CANCEL_REQ), + ( + TradeListingMessageType::Cancel, + KIND_TRADE_LISTING_CANCEL_REQ, + ), ( TradeListingMessageType::FulfillmentUpdate, KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, ), - (TradeListingMessageType::Receipt, KIND_TRADE_LISTING_RECEIPT_REQ), + ( + TradeListingMessageType::Receipt, + KIND_TRADE_LISTING_RECEIPT_REQ, + ), ( TradeListingMessageType::ListingValidateResult, KIND_TRADE_LISTING_VALIDATE_RES, @@ -2371,7 +2555,12 @@ mod tests { push_send_ok(); } if message_type == TradeListingMessageType::Cancel { - state.lock().await.get_order_mut(order_id).expect("order").status = TradeOrderStatus::Requested; + state + .lock() + .await + .get_order_mut(order_id) + .expect("order") + .status = TradeOrderStatus::Requested; } let payload = if message_type == TradeListingMessageType::ListingValidateResult { json!({"valid": true, "errors": []}) @@ -2435,16 +2624,18 @@ mod tests { assert!(latest.is_some()); push_send_ok(); - assert!(send_envelope( - &client, - seller_keys.public_key().to_hex(), - TradeListingMessageType::ListingValidateResult, - &listing_addr_for_seller(&seller_keys), - None, - &json!({"valid":true,"errors":[]}), - ) - .await - .is_ok()); + assert!( + send_envelope( + &client, + seller_keys.public_key().to_hex(), + TradeListingMessageType::ListingValidateResult, + &listing_addr_for_seller(&seller_keys), + None, + &json!({"valid":true,"errors":[]}), + ) + .await + .is_ok() + ); let event = make_event( &seller_keys, @@ -2453,9 +2644,11 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_error(TradeListingDvmError::UnsupportedKind, &event, &client) - .await - .is_ok()); + assert!( + handle_error(TradeListingDvmError::UnsupportedKind, &event, &client) + .await + .is_ok() + ); } #[tokio::test] @@ -2480,8 +2673,14 @@ mod tests { "listing".to_string(), Vec::new(), ); - push_fetch_events_ok(vec![wrong_kind.clone(), listing_event.clone(), listing_event.clone()]); - let fetched_listing = fetch_listing_by_addr(&client, &listing_addr).await.expect("listing fetch"); + push_fetch_events_ok(vec![ + wrong_kind.clone(), + listing_event.clone(), + listing_event.clone(), + ]); + let fetched_listing = fetch_listing_by_addr(&client, &listing_addr) + .await + .expect("listing fetch"); assert!(fetched_listing.is_some()); let wrong_custom = make_event( @@ -2547,9 +2746,15 @@ mod tests { let (rhi_keys, seller_keys, _) = make_keys(); let client = make_client(&rhi_keys); assert!(fetch_event_by_id_io(&client, "invalid-id").await.is_err()); - assert!(fetch_events_io(&client, RadrootsNostrFilter::new(), std::time::Duration::from_millis(1)) + assert!( + fetch_events_io( + &client, + RadrootsNostrFilter::new(), + std::time::Duration::from_millis(1) + ) .await - .is_err()); + .is_err() + ); let builder = radroots_nostr::prelude::radroots_nostr_build_event( KIND_TRADE_LISTING_ORDER_REQ as u32, "x", @@ -2607,7 +2812,12 @@ mod tests { ) .await; - let cases: Vec<(TradeListingMessageType, u16, serde_json::Value, TradeOrderStatus)> = vec![ + let cases: Vec<( + TradeListingMessageType, + u16, + serde_json::Value, + TradeOrderStatus, + )> = vec![ ( TradeListingMessageType::OrderRequest, KIND_TRADE_LISTING_ORDER_REQ, @@ -2777,17 +2987,17 @@ mod tests { | TradeListingMessageType::FulfillmentUpdate => &seller_keys, _ => &buyer_keys, }; - let content = make_envelope_content(message_type, &listing_addr, Some(order_id), payload); + let content = + make_envelope_content(message_type, &listing_addr, Some(order_id), payload); let tags = make_custom_tags(&rhi_pub, &listing_addr, Some(order_id)); - let event = make_event(sender, RadrootsNostrKind::Custom(kind), content, tags.clone()); - let _ = handle_event( - event, - tags, - rhi_keys.clone(), - client.clone(), - state.clone(), - ) - .await; + let event = make_event( + sender, + RadrootsNostrKind::Custom(kind), + content, + tags.clone(), + ); + let _ = + handle_event(event, tags, rhi_keys.clone(), client.clone(), state.clone()).await; } } @@ -2809,7 +3019,13 @@ mod tests { ) .await; - let bad_order = make_order("bad", &listing_addr, &buyer_pub, &seller_pub, TradeOrderStatus::Requested); + let bad_order = make_order( + "bad", + &listing_addr, + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + ); let event = make_event( &buyer_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REQ), @@ -2817,14 +3033,29 @@ mod tests { Vec::new(), ); assert!(matches!( - handle_order_request(&event, bad_order, &parsed, Some("order-1"), &client, &state).await, + handle_order_request(&event, bad_order, &parsed, Some("order-1"), &client, &state) + .await, Err(TradeListingDvmError::InvalidOrder) )); let missing_state = Arc::new(AsyncMutex::new(TradeListingState::default())); - let order = make_order("order-2", &listing_addr, &buyer_pub, &seller_pub, TradeOrderStatus::Requested); + let order = make_order( + "order-2", + &listing_addr, + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + ); assert!(matches!( - handle_order_request(&event, order, &parsed, Some("order-2"), &client, &missing_state).await, + handle_order_request( + &event, + order, + &parsed, + Some("order-2"), + &client, + &missing_state + ) + .await, Err(TradeListingDvmError::ListingNotValidated) )); @@ -2842,19 +3073,21 @@ mod tests { .expect("order") .seen_event_ids .insert(seller_event.id.to_string()); - assert!(handle_order_response( - &seller_event, - TradeOrderResponse { - accepted: true, - reason: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_order_response( + &seller_event, + TradeOrderResponse { + accepted: true, + reason: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); let wrong_buyer = make_event( &seller_keys, @@ -2911,31 +3144,37 @@ mod tests { .fetch_event_by_id_results .push_back(Err(TradeListingDvmError::InvalidOrder)); push_send_ok(); - assert!(handle_listing_validate_request( - &validate_event, - TradeListingValidateRequest { - listing_event: Some(RadrootsNostrEventPtr { - id: "x".to_string(), - relays: None, - }), - }, - &listing_addr, - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_listing_validate_request( + &validate_event, + TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: "x".to_string(), + relays: None, + }), + }, + &listing_addr, + &client, + &state, + ) + .await + .is_ok() + ); push_send_ok(); - assert!(handle_listing_validate_request( - &validate_event, - TradeListingValidateRequest { listing_event: None }, - "not-a-listing-addr", - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_listing_validate_request( + &validate_event, + TradeListingValidateRequest { + listing_event: None + }, + "not-a-listing-addr", + &client, + &state, + ) + .await + .is_ok() + ); dvm_test_hooks() .lock() @@ -2948,20 +3187,22 @@ mod tests { .validate_listing_results .push_back(Err(TradeListingValidationError::MissingInventory)); push_send_ok(); - assert!(handle_listing_validate_request( - &validate_event, - TradeListingValidateRequest { - listing_event: Some(RadrootsNostrEventPtr { - id: "x".to_string(), - relays: None, - }), - }, - &listing_addr, - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_listing_validate_request( + &validate_event, + TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: "x".to_string(), + relays: None, + }), + }, + &listing_addr, + &client, + &state, + ) + .await + .is_ok() + ); let unauthorized_order = make_order( "order-3", @@ -2990,16 +3231,18 @@ mod tests { &seller_pub, TradeOrderStatus::Requested, ); - assert!(handle_order_request( - &event, - duplicate_order, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_order_request( + &event, + duplicate_order, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; let buyer_event = make_event( @@ -3026,24 +3269,26 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; push_send_ok(); - assert!(handle_order_response( - &make_event( - &seller_keys, - RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_RES), - "x".to_string(), - Vec::new(), - ), - TradeOrderResponse { - accepted: false, - reason: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_order_response( + &make_event( + &seller_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_RES), + "x".to_string(), + Vec::new(), + ), + TradeOrderResponse { + accepted: false, + reason: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( @@ -3106,21 +3351,23 @@ mod tests { .expect("order") .seen_event_ids .insert(seen_event.id.to_string()); - assert!(handle_order_revision( - &seen_event, - TradeOrderRevision { - revision_id: "r4".to_string(), - order_id: "order-1".to_string(), - changes: Vec::new(), - reason: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_order_revision( + &seen_event, + TradeOrderRevision { + revision_id: "r4".to_string(), + order_id: "order-1".to_string(), + changes: Vec::new(), + reason: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; assert!(matches!( @@ -3259,22 +3506,24 @@ mod tests { )); push_send_ok(); - assert!(handle_discount_decision( - &make_event( - &buyer_keys, - RadrootsNostrKind::Custom(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), - "x".to_string(), - Vec::new(), - ), - TradeListingMessageType::Cancel, - TradeDiscountDecision::Decline { reason: None }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_discount_decision( + &make_event( + &buyer_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ), + "x".to_string(), + Vec::new(), + ), + TradeListingMessageType::Cancel, + TradeDiscountDecision::Decline { reason: None }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; let cancel_by_seller = make_event( @@ -3284,16 +3533,18 @@ mod tests { Vec::new(), ); push_send_ok(); - assert!(handle_cancel( - &cancel_by_seller, - TradeListingCancel { reason: None }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_cancel( + &cancel_by_seller, + TradeListingCancel { reason: None }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Accepted).await; assert!(matches!( @@ -3362,7 +3613,8 @@ mod tests { .await; let mismatched_addr = listing_addr_for_seller(&buyer_keys); - let mismatched_parsed = TradeListingAddress::parse(&mismatched_addr).expect("mismatched listing"); + let mismatched_parsed = + TradeListingAddress::parse(&mismatched_addr).expect("mismatched listing"); let revision_event = make_event( &seller_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REVISION_REQ), @@ -3394,20 +3646,22 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_revision_response.id.to_string()).await; - assert!(handle_order_revision_response( - &seen_revision_response, - TradeListingMessageType::OrderRevisionAccept, - TradeOrderRevisionResponse { - accepted: true, - reason: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_order_revision_response( + &seen_revision_response, + TradeListingMessageType::OrderRevisionAccept, + TradeOrderRevisionResponse { + accepted: true, + reason: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_order_revision_response( @@ -3454,26 +3708,28 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; push_send_ok(); - assert!(handle_question( - &make_event( - &buyer_keys, - RadrootsNostrKind::Custom(KIND_TRADE_LISTING_QUESTION_REQ), - "question-ok".to_string(), - Vec::new(), - ), - TradeQuestion { - question_id: "q1".to_string(), - order_id: None, - listing_addr: Some(listing_addr.clone()), - question_text: "question".to_string(), - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_question( + &make_event( + &buyer_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_QUESTION_REQ), + "question-ok".to_string(), + Vec::new(), + ), + TradeQuestion { + question_id: "q1".to_string(), + order_id: None, + listing_addr: Some(listing_addr.clone()), + question_text: "question".to_string(), + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); let seen_question = make_event( &buyer_keys, @@ -3482,21 +3738,23 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_question.id.to_string()).await; - assert!(handle_question( - &seen_question, - TradeQuestion { - question_id: "q2".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - question_text: "question".to_string(), - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_question( + &seen_question, + TradeQuestion { + question_id: "q2".to_string(), + order_id: Some("order-1".to_string()), + listing_addr: None, + question_text: "question".to_string(), + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_question( &make_event( @@ -3522,26 +3780,28 @@ mod tests { set_order_status(&state, "order-1", TradeOrderStatus::Questioned).await; push_send_ok(); - assert!(handle_answer( - &make_event( - &seller_keys, - RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ANSWER_RES), - "answer-ok".to_string(), - Vec::new(), - ), - TradeAnswer { - question_id: "q1".to_string(), - order_id: None, - listing_addr: Some(listing_addr.clone()), - answer_text: "answer".to_string(), - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_answer( + &make_event( + &seller_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ANSWER_RES), + "answer-ok".to_string(), + Vec::new(), + ), + TradeAnswer { + question_id: "q1".to_string(), + order_id: None, + listing_addr: Some(listing_addr.clone()), + answer_text: "answer".to_string(), + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); let seen_answer = make_event( &seller_keys, @@ -3550,21 +3810,23 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_answer.id.to_string()).await; - assert!(handle_answer( - &seen_answer, - TradeAnswer { - question_id: "q1".to_string(), - order_id: Some("order-1".to_string()), - listing_addr: None, - answer_text: "answer".to_string(), - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_answer( + &seen_answer, + TradeAnswer { + question_id: "q1".to_string(), + order_id: Some("order-1".to_string()), + listing_addr: None, + answer_text: "answer".to_string(), + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_answer( &make_event( @@ -3596,21 +3858,23 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_discount_request.id.to_string()).await; - assert!(handle_discount_request( - &seen_discount_request, - TradeDiscountRequest { - discount_id: "d1".to_string(), - order_id: "order-1".to_string(), - value: sample_discount_value(), - conditions: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_discount_request( + &seen_discount_request, + TradeDiscountRequest { + discount_id: "d1".to_string(), + order_id: "order-1".to_string(), + value: sample_discount_value(), + conditions: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_discount_request( &make_event( @@ -3642,21 +3906,23 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_discount_offer.id.to_string()).await; - assert!(handle_discount_offer( - &seen_discount_offer, - TradeDiscountOffer { - discount_id: "d1".to_string(), - order_id: "order-1".to_string(), - value: sample_discount_value(), - conditions: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_discount_offer( + &seen_discount_offer, + TradeDiscountOffer { + discount_id: "d1".to_string(), + order_id: "order-1".to_string(), + value: sample_discount_value(), + conditions: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_discount_offer( &make_event( @@ -3688,17 +3954,19 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_discount_decision.id.to_string()).await; - assert!(handle_discount_decision( - &seen_discount_decision, - TradeListingMessageType::DiscountDecline, - TradeDiscountDecision::Decline { reason: None }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_discount_decision( + &seen_discount_decision, + TradeListingMessageType::DiscountDecline, + TradeDiscountDecision::Decline { reason: None }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); assert!(matches!( handle_discount_decision( &make_event( @@ -3726,16 +3994,18 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_cancel.id.to_string()).await; - assert!(handle_cancel( - &seen_cancel, - TradeListingCancel { reason: None }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_cancel( + &seen_cancel, + TradeListingCancel { reason: None }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Accepted).await; let seen_fulfillment = make_event( @@ -3745,21 +4015,23 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_fulfillment.id.to_string()).await; - assert!(handle_fulfillment_update( - &seen_fulfillment, - TradeFulfillmentUpdate { - status: TradeFulfillmentStatus::Shipped, - tracking: None, - eta: None, - notes: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_fulfillment_update( + &seen_fulfillment, + TradeFulfillmentUpdate { + status: TradeFulfillmentStatus::Shipped, + tracking: None, + eta: None, + notes: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); set_order_status(&state, "order-1", TradeOrderStatus::Fulfilled).await; let seen_receipt = make_event( @@ -3769,20 +4041,22 @@ mod tests { Vec::new(), ); mark_event_seen(&state, "order-1", seen_receipt.id.to_string()).await; - assert!(handle_receipt( - &seen_receipt, - TradeReceipt { - acknowledged: true, - at: 1, - note: None, - }, - &parsed, - Some("order-1"), - &client, - &state, - ) - .await - .is_ok()); + assert!( + handle_receipt( + &seen_receipt, + TradeReceipt { + acknowledged: true, + at: 1, + note: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await + .is_ok() + ); } #[tokio::test] @@ -3807,26 +4081,31 @@ mod tests { .expect("hooks") .fetch_event_by_id_results .push_back(Ok(validate_event.clone())); - push_validate_listing_ok(RadrootsListingFarmRef { - pubkey: seller_keys.public_key().to_hex(), - d_tag: "farmtag".to_string(), - }); + push_validate_listing_ok( + listing_addr.clone(), + RadrootsListingFarmRef { + pubkey: seller_keys.public_key().to_hex(), + d_tag: "farmtag".to_string(), + }, + ); push_farm_validation_result(Ok(vec![TradeListingValidationError::MissingFarmRecord])); push_send_ok(); - assert!(handle_listing_validate_request( - &validate_event, - TradeListingValidateRequest { - listing_event: Some(RadrootsNostrEventPtr { - id: "x".to_string(), - relays: None, - }), - }, - &listing_addr, - &client, - &state_validate, - ) - .await - .is_ok()); + assert!( + handle_listing_validate_request( + &validate_event, + TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: "x".to_string(), + relays: None, + }), + }, + &listing_addr, + &client, + &state_validate, + ) + .await + .is_ok() + ); let state = state_with_order( &listing_addr, @@ -3935,32 +4214,32 @@ mod tests { Err(TradeListingDvmError::Unauthorized) )); - let listing_event_new = RadrootsNostrEventBuilder::new( - RadrootsNostrKind::Custom(parsed.kind), - "listing-new", - ) - .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) - .sign_with_keys(&seller_keys) - .expect("listing new"); - let listing_event_old = RadrootsNostrEventBuilder::new( - RadrootsNostrKind::Custom(parsed.kind), - "listing-old", - ) - .custom_created_at(RadrootsNostrTimestamp::from(9_u64)) - .sign_with_keys(&seller_keys) - .expect("listing old"); + let listing_event_new = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(parsed.kind), "listing-new") + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("listing new"); + let listing_event_old = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(parsed.kind), "listing-old") + .custom_created_at(RadrootsNostrTimestamp::from(9_u64)) + .sign_with_keys(&seller_keys) + .expect("listing old"); push_fetch_events_ok(vec![listing_event_new, listing_event_old]); - let fetched_listing = fetch_listing_by_addr(&client, &listing_addr).await.expect("listing fetch"); + let fetched_listing = fetch_listing_by_addr(&client, &listing_addr) + .await + .expect("listing fetch"); assert!(fetched_listing.is_some()); - let metadata_event_new = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-new") - .custom_created_at(RadrootsNostrTimestamp::from(20_u64)) - .sign_with_keys(&seller_keys) - .expect("metadata new"); - let metadata_event_old = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-old") - .custom_created_at(RadrootsNostrTimestamp::from(19_u64)) - .sign_with_keys(&seller_keys) - .expect("metadata old"); + let metadata_event_new = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-new") + .custom_created_at(RadrootsNostrTimestamp::from(20_u64)) + .sign_with_keys(&seller_keys) + .expect("metadata new"); + let metadata_event_old = + RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-old") + .custom_created_at(RadrootsNostrTimestamp::from(19_u64)) + .sign_with_keys(&seller_keys) + .expect("metadata old"); push_fetch_events_ok(vec![metadata_event_new, metadata_event_old]); let latest_metadata = fetch_latest_event_by_kind( &client, @@ -4006,7 +4285,10 @@ mod tests { state.clone(), ) .await; - assert!(matches!(invalid_json_result, Err(TradeListingDvmError::Serde(_)))); + assert!(matches!( + invalid_json_result, + Err(TradeListingDvmError::Serde(_)) + )); let invalid_envelope_event = make_event( &buyer_keys, @@ -4170,8 +4452,14 @@ mod tests { )); let missing_d_cases: Vec<(TradeListingMessageType, u16)> = vec![ - (TradeListingMessageType::OrderRequest, KIND_TRADE_LISTING_ORDER_REQ), - (TradeListingMessageType::OrderResponse, KIND_TRADE_LISTING_ORDER_RES), + ( + TradeListingMessageType::OrderRequest, + KIND_TRADE_LISTING_ORDER_REQ, + ), + ( + TradeListingMessageType::OrderResponse, + KIND_TRADE_LISTING_ORDER_RES, + ), ( TradeListingMessageType::OrderRevision, KIND_TRADE_LISTING_ORDER_REVISION_REQ, @@ -4184,8 +4472,14 @@ mod tests { TradeListingMessageType::OrderRevisionDecline, KIND_TRADE_LISTING_ORDER_REVISION_RES, ), - (TradeListingMessageType::Question, KIND_TRADE_LISTING_QUESTION_REQ), - (TradeListingMessageType::Answer, KIND_TRADE_LISTING_ANSWER_RES), + ( + TradeListingMessageType::Question, + KIND_TRADE_LISTING_QUESTION_REQ, + ), + ( + TradeListingMessageType::Answer, + KIND_TRADE_LISTING_ANSWER_RES, + ), ( TradeListingMessageType::DiscountRequest, KIND_TRADE_LISTING_DISCOUNT_REQ, @@ -4202,12 +4496,18 @@ mod tests { TradeListingMessageType::DiscountDecline, KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ, ), - (TradeListingMessageType::Cancel, KIND_TRADE_LISTING_CANCEL_REQ), + ( + TradeListingMessageType::Cancel, + KIND_TRADE_LISTING_CANCEL_REQ, + ), ( TradeListingMessageType::FulfillmentUpdate, KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, ), - (TradeListingMessageType::Receipt, KIND_TRADE_LISTING_RECEIPT_REQ), + ( + TradeListingMessageType::Receipt, + KIND_TRADE_LISTING_RECEIPT_REQ, + ), ]; for (message_type, kind) in missing_d_cases { let sender = sender_for_message(message_type, &seller_keys, &buyer_keys); @@ -4236,14 +4536,14 @@ mod tests { state.clone(), ) .await; - assert!(matches!( - result, - Err(TradeListingDvmError::MissingTag("d")) - )); + assert!(matches!(result, Err(TradeListingDvmError::MissingTag("d")))); } let missing_order_cases: Vec<(TradeListingMessageType, u16)> = vec![ - (TradeListingMessageType::OrderResponse, KIND_TRADE_LISTING_ORDER_RES), + ( + TradeListingMessageType::OrderResponse, + KIND_TRADE_LISTING_ORDER_RES, + ), ( TradeListingMessageType::OrderRevision, KIND_TRADE_LISTING_ORDER_REVISION_REQ, @@ -4256,8 +4556,14 @@ mod tests { TradeListingMessageType::OrderRevisionDecline, KIND_TRADE_LISTING_ORDER_REVISION_RES, ), - (TradeListingMessageType::Question, KIND_TRADE_LISTING_QUESTION_REQ), - (TradeListingMessageType::Answer, KIND_TRADE_LISTING_ANSWER_RES), + ( + TradeListingMessageType::Question, + KIND_TRADE_LISTING_QUESTION_REQ, + ), + ( + TradeListingMessageType::Answer, + KIND_TRADE_LISTING_ANSWER_RES, + ), ( TradeListingMessageType::DiscountRequest, KIND_TRADE_LISTING_DISCOUNT_REQ, @@ -4274,12 +4580,18 @@ mod tests { TradeListingMessageType::DiscountDecline, KIND_TRADE_LISTING_DISCOUNT_DECLINE_REQ, ), - (TradeListingMessageType::Cancel, KIND_TRADE_LISTING_CANCEL_REQ), + ( + TradeListingMessageType::Cancel, + KIND_TRADE_LISTING_CANCEL_REQ, + ), ( TradeListingMessageType::FulfillmentUpdate, KIND_TRADE_LISTING_FULFILLMENT_UPDATE_REQ, ), - (TradeListingMessageType::Receipt, KIND_TRADE_LISTING_RECEIPT_REQ), + ( + TradeListingMessageType::Receipt, + KIND_TRADE_LISTING_RECEIPT_REQ, + ), ]; for (message_type, kind) in missing_order_cases { let sender = sender_for_message(message_type, &seller_keys, &buyer_keys); @@ -4310,7 +4622,9 @@ mod tests { .await; assert!(matches!( result, - Err(TradeListingDvmError::State(TradeListingStateError::MissingOrder)) + Err(TradeListingDvmError::State( + TradeListingStateError::MissingOrder + )) )); } @@ -4406,7 +4720,9 @@ mod tests { .await; assert!(matches!( result, - Err(TradeListingDvmError::State(TradeListingStateError::InvalidTransition { .. })) + Err(TradeListingDvmError::State( + TradeListingStateError::InvalidTransition { .. } + )) )); } } diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs @@ -1,8 +1,12 @@ #![forbid(unsafe_code)] use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use radroots_trade::listing::order::TradeOrderStatus; +use tokio::sync::Mutex; + +pub type SharedTradeListingState = Arc<Mutex<TradeListingState>>; #[derive(Clone, Debug)] pub struct TradeOrderState { @@ -20,6 +24,29 @@ pub struct TradeListingState { orders: HashMap<String, TradeOrderState>, } +#[derive(Clone, Debug)] +pub struct TradeListingRuntime { + state: SharedTradeListingState, +} + +impl Default for TradeListingRuntime { + fn default() -> Self { + Self { + state: Arc::new(Mutex::new(TradeListingState::default())), + } + } +} + +impl TradeListingRuntime { + pub fn new() -> Self { + Self::default() + } + + pub fn state(&self) -> SharedTradeListingState { + Arc::clone(&self.state) + } +} + impl TradeListingState { pub fn mark_listing_validated(&mut self, listing_addr: &str) { self.validated_listings.insert(listing_addr.to_string()); @@ -60,7 +87,10 @@ impl TradeListingState { #[derive(Debug, Clone, PartialEq, Eq)] pub enum TradeListingStateError { MissingOrder, - InvalidTransition { from: TradeOrderStatus, to: TradeOrderStatus }, + InvalidTransition { + from: TradeOrderStatus, + to: TradeOrderStatus, + }, } impl core::fmt::Display for TradeListingStateError { @@ -79,7 +109,7 @@ impl std::error::Error for TradeListingStateError {} #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod tests { - use super::{TradeListingState, TradeListingStateError, TradeOrderState}; + use super::{TradeListingRuntime, TradeListingState, TradeListingStateError, TradeOrderState}; use radroots_trade::listing::order::TradeOrderStatus; #[test] @@ -125,4 +155,13 @@ mod tests { "invalid order transition: Requested -> Completed" ); } + + #[tokio::test] + async fn runtime_reuses_shared_trade_listing_state() { + let runtime = TradeListingRuntime::new(); + let state = runtime.state(); + state.lock().await.mark_listing_validated("addr"); + + assert!(runtime.state().lock().await.is_listing_validated("addr")); + } } diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -1,19 +1,13 @@ #![forbid(unsafe_code)] #![cfg_attr(coverage_nightly, coverage(off))] -use std::{sync::Arc, time::Duration}; +use std::time::Duration; -use anyhow::{anyhow, Result}; +use anyhow::{Result, anyhow}; use radroots_nostr::prelude::{ - radroots_nostr_filter_new_events, - radroots_nostr_tags_resolve, - RadrootsNostrClient, - RadrootsNostrEvent, - RadrootsNostrFilter, - RadrootsNostrKind, - RadrootsNostrKeys, - RadrootsNostrRelayPoolNotification, - RadrootsNostrTag, + RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys, + RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag, + radroots_nostr_filter_new_events, radroots_nostr_tags_resolve, }; use tokio::sync::watch; use tokio::time::sleep; @@ -22,8 +16,8 @@ use tracing::{info, warn}; use radroots_trade::listing::kinds::TRADE_LISTING_KINDS; use crate::features::trade_listing::{ - handlers::dvm::{handle_error, handle_event, TradeListingDvmError}, - state::TradeListingState, + handlers::dvm::{TradeListingDvmError, handle_error, handle_event}, + state::SharedTradeListingState, }; #[cfg(test)] @@ -32,10 +26,7 @@ struct SubscriberTestHooks { notifications: std::collections::VecDeque<Result<RadrootsNostrRelayPoolNotification, ()>>, delay_before_event_handle: std::collections::VecDeque<bool>, resolve_tags_results: std::collections::VecDeque< - Result< - Vec<RadrootsNostrTag>, - radroots_nostr::error::RadrootsNostrTagsResolveError, - >, + Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, >, handle_event_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, handle_error_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, @@ -80,10 +71,8 @@ fn take_delay_hook() -> Option<bool> { } #[cfg(test)] -fn pop_resolve_tags_hook( -) -> Option< - Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, -> { +fn pop_resolve_tags_hook() +-> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { subscriber_test_hooks() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) @@ -92,19 +81,15 @@ fn pop_resolve_tags_hook( } #[cfg(test)] -fn take_resolve_tags_hook( -) -> Option< - Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, -> { +fn take_resolve_tags_hook() +-> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { pop_resolve_tags_hook() } #[cfg(not(test))] #[cfg_attr(coverage_nightly, coverage(off))] -fn take_resolve_tags_hook( -) -> Option< - Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, -> { +fn take_resolve_tags_hook() +-> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { None } @@ -160,10 +145,7 @@ fn resolve_tags_io( } fn map_notification_recv_result( - result: Result< - RadrootsNostrRelayPoolNotification, - tokio::sync::broadcast::error::RecvError, - >, + result: Result<RadrootsNostrRelayPoolNotification, tokio::sync::broadcast::error::RecvError>, ) -> Result<RadrootsNostrRelayPoolNotification, ()> { result.map_err(|_| ()) } @@ -173,7 +155,7 @@ async fn handle_event_io( resolved_tags: Vec<RadrootsNostrTag>, keys: RadrootsNostrKeys, client: RadrootsNostrClient, - state: Arc<tokio::sync::Mutex<TradeListingState>>, + state: SharedTradeListingState, ) -> Result<(), TradeListingDvmError> { let result = match take_handle_event_hook() { Some(result) => result, @@ -208,7 +190,7 @@ async fn process_event_notification( event: RadrootsNostrEvent, keys: RadrootsNostrKeys, client: RadrootsNostrClient, - state: Arc<tokio::sync::Mutex<TradeListingState>>, + state: SharedTradeListingState, ) { if should_delay_before_event_handle() { sleep(Duration::from_millis(200)).await; @@ -222,7 +204,9 @@ async fn process_event_notification( } }; - if let Err(err) = handle_event_io(event.clone(), resolved_tags, keys, client.clone(), state).await { + if let Err(err) = + handle_event_io(event.clone(), resolved_tags, keys, client.clone(), state).await + { match err { TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {} other => { @@ -234,32 +218,19 @@ async fn process_event_notification( } } -#[cfg(test)] async fn dispatch_event_processing( event: RadrootsNostrEvent, keys: RadrootsNostrKeys, client: RadrootsNostrClient, - state: Arc<tokio::sync::Mutex<TradeListingState>>, + state: SharedTradeListingState, ) { process_event_notification(event, keys, client, state).await; } -#[cfg(not(test))] -#[cfg_attr(coverage_nightly, coverage(off))] -async fn dispatch_event_processing( - event: RadrootsNostrEvent, - keys: RadrootsNostrKeys, - client: RadrootsNostrClient, - state: Arc<tokio::sync::Mutex<TradeListingState>>, -) { - tokio::spawn(async move { - process_event_notification(event, keys, client, state).await; - }); -} - pub async fn subscriber( client: RadrootsNostrClient, keys: RadrootsNostrKeys, + state: SharedTradeListingState, mut stop_rx: watch::Receiver<bool>, ) -> Result<()> { info!( @@ -278,8 +249,6 @@ pub async fn subscriber( } let subscription = client.subscribe(filter, None).await?; - - let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); let mut notifications = client.notifications(); let mut notifications_closed = false; @@ -308,7 +277,7 @@ pub async fn subscriber( let event = (*event).clone(); let keys = keys.clone(); let client = client.clone(); - let state = Arc::clone(&state); + let state = state.clone(); dispatch_event_processing(event, keys, client, state).await; } } @@ -330,7 +299,7 @@ mod tests { process_event_notification, resolve_tags_io, subscriber, subscriber_test_hooks, }; use crate::features::trade_listing::handlers::dvm::TradeListingDvmError; - use crate::features::trade_listing::state::TradeListingState; + use crate::features::trade_listing::state::{SharedTradeListingState, TradeListingState}; use radroots_nostr::error::RadrootsNostrTagsResolveError; use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrKeys, RadrootsNostrKind, @@ -343,7 +312,9 @@ mod tests { static TEST_LOCK: Mutex<()> = Mutex::new(()); fn test_guard() -> MutexGuard<'static, ()> { - let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); *subscriber_test_hooks() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = SubscriberTestHooks::default(); @@ -365,14 +336,18 @@ mod tests { RadrootsNostrRelayPoolNotification::Shutdown } + fn shared_state() -> SharedTradeListingState { + Arc::new(tokio::sync::Mutex::new(TradeListingState::default())) + } + #[test] fn notification_recv_result_mapping_covers_ok_and_err() { let keys = RadrootsNostrKeys::generate(); assert!(map_notification_recv_result(Ok(scripted_event_notification(&keys))).is_ok()); - assert!(map_notification_recv_result(Err( - tokio::sync::broadcast::error::RecvError::Closed - )) - .is_err()); + assert!( + map_notification_recv_result(Err(tokio::sync::broadcast::error::RecvError::Closed)) + .is_err() + ); } #[tokio::test] @@ -392,7 +367,7 @@ mod tests { .push_back(Ok(Vec::<RadrootsNostrTag>::new())); assert!(resolve_tags_io(&event, &keys).is_ok()); - let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + let state = shared_state(); assert!(matches!( handle_event_io( event.clone(), @@ -409,9 +384,17 @@ mod tests { .unwrap_or_else(std::sync::PoisonError::into_inner) .handle_event_results .push_back(Ok(())); - assert!(handle_event_io(event.clone(), Vec::new(), keys.clone(), client.clone(), state) + assert!( + handle_event_io( + event.clone(), + Vec::new(), + keys.clone(), + client.clone(), + state + ) .await - .is_ok()); + .is_ok() + ); let _ = handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client).await; subscriber_test_hooks() @@ -419,9 +402,11 @@ mod tests { .unwrap_or_else(std::sync::PoisonError::into_inner) .handle_error_results .push_back(Ok(())); - assert!(handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client) - .await - .is_ok()); + assert!( + handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client) + .await + .is_ok() + ); } #[tokio::test] @@ -430,7 +415,32 @@ mod tests { let keys = RadrootsNostrKeys::generate(); let client = RadrootsNostrClient::new(keys.clone()); let (_tx, rx) = watch::channel(true); - assert!(subscriber(client, keys, rx).await.is_ok()); + assert!(subscriber(client, keys, shared_state(), rx).await.is_ok()); + } + + #[tokio::test] + async fn subscriber_reuses_runtime_owned_state_across_runs() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let state = shared_state(); + state.lock().await.mark_listing_validated("addr"); + + let (_tx_first, rx_first) = watch::channel(true); + assert!( + subscriber(client.clone(), keys.clone(), state.clone(), rx_first) + .await + .is_ok() + ); + assert!(state.lock().await.is_listing_validated("addr")); + + let (_tx_second, rx_second) = watch::channel(true); + assert!( + subscriber(client, keys, state.clone(), rx_second) + .await + .is_ok() + ); + assert!(state.lock().await.is_listing_validated("addr")); } #[tokio::test] @@ -439,7 +449,9 @@ mod tests { let keys = RadrootsNostrKeys::generate(); let client = RadrootsNostrClient::new(keys.clone()); let (_tx, rx) = watch::channel(false); - let err = subscriber(client, keys, rx).await.expect_err("expected relay error"); + let err = subscriber(client, keys, shared_state(), rx) + .await + .expect_err("expected relay error"); let msg = format!("{err:#}"); assert!(msg.contains("relay")); } @@ -451,7 +463,7 @@ mod tests { let client = RadrootsNostrClient::new(keys.clone()); let _ = client.add_relay("wss://relay.example.com").await; let (tx, rx) = watch::channel(false); - let join = tokio::spawn(subscriber(client, keys, rx)); + let join = tokio::spawn(subscriber(client, keys, shared_state(), rx)); tokio::time::sleep(std::time::Duration::from_millis(20)).await; let _ = tx.send(true); let _ = join.await; @@ -469,7 +481,9 @@ mod tests { .notifications .push_back(Err(())); let (_tx, rx) = watch::channel(false); - let err = subscriber(client, keys, rx).await.expect_err("closed notifications"); + let err = subscriber(client, keys, shared_state(), rx) + .await + .expect_err("closed notifications"); let msg = format!("{err:#}"); assert!(msg.contains("notifications closed")); } @@ -488,7 +502,7 @@ mod tests { .push_back(Ok(scripted_shutdown_notification())); let (tx, rx) = watch::channel(false); - let join = tokio::spawn(subscriber(client, keys, rx)); + let join = tokio::spawn(subscriber(client, keys, shared_state(), rx)); tokio::time::sleep(std::time::Duration::from_millis(30)).await; let _ = tx.send(true); let result = join.await.expect("subscriber join"); @@ -496,7 +510,7 @@ mod tests { } #[tokio::test] - async fn subscriber_covers_event_spawn_paths() { + async fn subscriber_covers_event_processing_paths() { let _guard = test_guard(); let keys = RadrootsNostrKeys::generate(); let client = RadrootsNostrClient::new(keys.clone()); @@ -514,11 +528,10 @@ mod tests { "resolve-failed".to_string(), ))); let (tx, rx) = watch::channel(false); - let join = tokio::spawn(subscriber(client, keys, rx)); + let join = tokio::spawn(subscriber(client, keys, shared_state(), rx)); tokio::time::sleep(std::time::Duration::from_millis(30)).await; let _ = tx.send(true); let _ = join.await; - tokio::time::sleep(std::time::Duration::from_millis(50)).await; } #[tokio::test] @@ -551,11 +564,10 @@ mod tests { drop(hooks); let (tx, rx) = watch::channel(false); - let join = tokio::spawn(subscriber(client, keys, rx)); + let join = tokio::spawn(subscriber(client, keys, shared_state(), rx)); tokio::time::sleep(std::time::Duration::from_millis(40)).await; let _ = tx.send(true); let _ = join.await; - tokio::time::sleep(std::time::Duration::from_millis(50)).await; } #[tokio::test] @@ -583,10 +595,11 @@ mod tests { drop(hooks); let (_tx, rx) = watch::channel(false); - let err = subscriber(client, keys, rx).await.expect_err("notifications closed"); + let err = subscriber(client, keys, shared_state(), rx) + .await + .expect_err("notifications closed"); let msg = format!("{err:#}"); assert!(msg.contains("notifications closed")); - tokio::time::sleep(std::time::Duration::from_millis(250)).await; } #[tokio::test] @@ -625,7 +638,7 @@ mod tests { let event_err = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "err") .sign_with_keys(&keys) .expect("event err"); - let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + let state = shared_state(); let mut hooks = subscriber_test_hooks() .lock() diff --git a/src/lib.rs b/src/lib.rs @@ -26,9 +26,8 @@ static RUN_RHI_SKIP_SUBSCRIBER: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); #[cfg(test)] -static RUN_RHI_BOOTSTRAP_HOOK: std::sync::OnceLock< - std::sync::Mutex<Option<Result<(), String>>>, -> = std::sync::OnceLock::new(); +static RUN_RHI_BOOTSTRAP_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<Result<(), String>>>> = + std::sync::OnceLock::new(); #[derive(Clone, Copy)] enum RunRhiWaitOutcome { @@ -151,6 +150,7 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> let handle = start_subscriber( client.clone(), keys.clone(), + rhi.trade_listing_runtime.state(), settings.config.subscriber.backoff.clone(), ) .await; @@ -180,8 +180,8 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> #[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::{ - RUN_RHI_AUTO_STOP, RUN_RHI_SKIP_SUBSCRIBER, RunRhiWaitOutcome, bootstrap_presence, - run_rhi, run_rhi_bootstrap_hook, run_rhi_wait_hook, + RUN_RHI_AUTO_STOP, RUN_RHI_SKIP_SUBSCRIBER, RunRhiWaitOutcome, bootstrap_presence, run_rhi, + run_rhi_bootstrap_hook, run_rhi_wait_hook, }; use crate::{cli_args, config}; use std::path::PathBuf; @@ -191,7 +191,9 @@ mod tests { static TEST_LOCK: Mutex<()> = Mutex::new(()); fn test_guard() -> MutexGuard<'static, ()> { - let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + let guard = TEST_LOCK + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); *run_rhi_bootstrap_hook() @@ -278,7 +280,8 @@ mod tests { let settings_err = settings_with_relays(vec!["wss://relay.example.com".to_string()]); *run_rhi_bootstrap_hook() .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Err("presence failure".to_string())); + .unwrap_or_else(std::sync::PoisonError::into_inner) = + Some(Err("presence failure".to_string())); let err = run_rhi(&settings_err, &args_err).await; RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); @@ -290,11 +293,9 @@ mod tests { async fn bootstrap_presence_fallback_path_is_callable() { let _guard = test_guard(); let identity_path = unique_identity_path("bootstrap"); - let identity = radroots_identity::RadrootsIdentity::load_or_generate( - Some(&identity_path), - true, - ) - .expect("identity"); + let identity = + radroots_identity::RadrootsIdentity::load_or_generate(Some(&identity_path), true) + .expect("identity"); let client = radroots_nostr::prelude::RadrootsNostrClient::new(identity.keys().clone()); let metadata: radroots_nostr::prelude::RadrootsNostrMetadata = serde_json::from_str(r#"{"name":"bootstrap"}"#).expect("bootstrap metadata"); diff --git a/src/main.rs b/src/main.rs @@ -1,8 +1,8 @@ #![cfg_attr(coverage_nightly, feature(coverage_attribute))] -use anyhow::Result; #[cfg(not(test))] use anyhow::Context; +use anyhow::Result; use rhi::{cli_args, config, run_rhi}; use std::process::ExitCode; use tracing::info; @@ -74,6 +74,7 @@ async fn run() -> Result<()> { mod tests { use super::{exit_code_from_run, main, run, run_load_hook, run_rhi}; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; + use rhi::features::trade_listing::state::TradeListingRuntime; use rhi::{cli_args, config}; use std::path::PathBuf; use std::process::ExitCode; @@ -112,7 +113,9 @@ mod tests { }, }; let settings = minimal_settings(); - let err = run_rhi(&settings, &args).await.expect_err("identity should fail"); + let err = run_rhi(&settings, &args) + .await + .expect_err("identity should fail"); let msg = format!("{err:#}"); assert!(msg.contains("identity")); } @@ -133,7 +136,8 @@ mod tests { }; *run_load_hook() .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, minimal_settings()))); + .unwrap_or_else(std::sync::PoisonError::into_inner) = + Some(Ok((args, minimal_settings()))); let err = run().await.expect_err("missing identity should bubble"); let msg = format!("{err:#}"); assert!(msg.contains("identity")); @@ -144,7 +148,9 @@ mod tests { *run_load_hook() .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) = None; - let err = run().await.expect_err("loader hook should be required in test build"); + let err = run() + .await + .expect_err("loader hook should be required in test build"); let msg = format!("{err:#}"); assert!(msg.contains("run loader hook not set")); } @@ -156,6 +162,7 @@ mod tests { let handle = rhi::rhi::start_subscriber( client, keys, + TradeListingRuntime::new().state(), radroots_runtime::BackoffConfig { base_ms: 1, max_ms: 2, diff --git a/src/rhi.rs b/src/rhi.rs @@ -5,6 +5,8 @@ use std::time::{Duration, Instant}; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; use radroots_runtime::{Backoff, BackoffConfig}; +use crate::features::trade_listing::state::{SharedTradeListingState, TradeListingRuntime}; + #[cfg(not(test))] fn connection_wait_timeout() -> Duration { Duration::from_secs(5) @@ -21,13 +23,15 @@ static SUBSCRIBER_RESULT_HOOK: std::sync::OnceLock< > = std::sync::OnceLock::new(); #[cfg(test)] -fn subscriber_result_hook() -> &'static std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>> { +fn subscriber_result_hook() +-> &'static std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>> { SUBSCRIBER_RESULT_HOOK.get_or_init(|| std::sync::Mutex::new(std::collections::VecDeque::new())) } async fn run_subscriber_once( client: RadrootsNostrClient, keys: RadrootsNostrKeys, + state: SharedTradeListingState, stop_rx: tokio::sync::watch::Receiver<bool>, ) -> Result<(), anyhow::Error> { #[cfg(test)] @@ -39,7 +43,7 @@ async fn run_subscriber_once( return result; } - crate::features::trade_listing::subscriber::subscriber(client, keys, stop_rx).await + crate::features::trade_listing::subscriber::subscriber(client, keys, state, stop_rx).await } async fn wait_for_connection_or_stop( @@ -58,6 +62,7 @@ async fn wait_for_connection_or_stop( pub struct Rhi { pub(crate) _started: Instant, pub client: RadrootsNostrClient, + pub(crate) trade_listing_runtime: TradeListingRuntime, } impl Rhi { @@ -66,6 +71,7 @@ impl Rhi { Self { _started: Instant::now(), client, + trade_listing_runtime: TradeListingRuntime::new(), } } } @@ -104,6 +110,7 @@ impl RhiHandle { pub async fn start_subscriber( client: RadrootsNostrClient, keys: RadrootsNostrKeys, + state: SharedTradeListingState, backoff_cfg: BackoffConfig, ) -> RhiHandle { let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false); @@ -120,12 +127,9 @@ pub async fn start_subscriber( break; } - let res = run_subscriber_once( - client.clone(), - keys.clone(), - stop_rx.clone(), - ) - .await; + let res = + run_subscriber_once(client.clone(), keys.clone(), state.clone(), stop_rx.clone()) + .await; let failed = res.is_err(); @@ -158,20 +162,30 @@ pub async fn start_subscriber( #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod tests { - use anyhow::anyhow; use super::{ Rhi, RhiHandle, start_subscriber, subscriber_result_hook, wait_for_connection_or_stop, }; + use crate::features::trade_listing::state::TradeListingRuntime; + use anyhow::anyhow; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; use radroots_runtime::BackoffConfig; use std::sync::Arc; use tokio::sync::Mutex; - #[test] - fn rhi_new_initializes_client() { + #[tokio::test] + async fn rhi_new_initializes_client_and_runtime() { let keys = RadrootsNostrKeys::generate(); let rhi = Rhi::new(keys); let _ = rhi.client.clone(); + let state = rhi.trade_listing_runtime.state(); + state.lock().await.mark_listing_validated("addr"); + assert!( + rhi.trade_listing_runtime + .state() + .lock() + .await + .is_listing_validated("addr") + ); } #[tokio::test] @@ -199,7 +213,13 @@ mod tests { }; let client_err = RadrootsNostrClient::new(keys.clone()); - let handle_err = start_subscriber(client_err, keys.clone(), cfg.clone()).await; + let handle_err = start_subscriber( + client_err, + keys.clone(), + TradeListingRuntime::new().state(), + cfg.clone(), + ) + .await; tokio::time::sleep(std::time::Duration::from_millis(30)).await; handle_err.stop(); handle_err.stopped().await; @@ -210,7 +230,8 @@ mod tests { .lock() .unwrap_or_else(std::sync::PoisonError::into_inner) .push_back(Ok(())); - let handle_ok = start_subscriber(client_ok, keys, cfg).await; + let handle_ok = + start_subscriber(client_ok, keys, TradeListingRuntime::new().state(), cfg).await; tokio::time::sleep(std::time::Duration::from_millis(30)).await; handle_ok.stop(); handle_ok.stopped().await; @@ -223,6 +244,7 @@ mod tests { let handle = start_subscriber( client, keys, + TradeListingRuntime::new().state(), BackoffConfig { base_ms: 25, max_ms: 50, @@ -248,6 +270,7 @@ mod tests { let handle = start_subscriber( client, keys, + TradeListingRuntime::new().state(), BackoffConfig { base_ms: 200, max_ms: 200,