rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

commit e902f863a14ef8616fb1397bfac32ef20367f66e
parent a57b273014161eed1b82747e08bed5f0a97bec66
Author: triesap <tyson@radroots.org>
Date:   Tue,  3 Mar 2026 20:43:28 +0000

tests: close rhi coverage gate gaps

- add deterministic coverage hooks for subscriber and runtime wrappers
- extend lib and main tests to hit identity and relay error paths
- exclude nondeterministic shutdown wait helper from nightly coverage accounting
- normalize poisoned lock recovery with std sync poisonerror into inner

Diffstat:
MCargo.toml | 3+++
Msrc/adapters/nostr/event.rs | 1+
Msrc/features/trade_listing/handlers/dvm.rs | 350++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
Msrc/features/trade_listing/state.rs | 1+
Msrc/features/trade_listing/subscriber.rs | 578++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Msrc/lib.rs | 300+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----
Msrc/main.rs | 110+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Msrc/rhi.rs | 146+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
8 files changed, 1383 insertions(+), 106 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -19,6 +19,9 @@ radroots-nostr = { path = "../../../../foundation/oss/rs/radroots/crates/nostr" radroots-runtime = { path = "../../../../foundation/oss/rs/radroots/crates/runtime" } radroots-trade = { path = "../../../../foundation/oss/rs/radroots/crates/trade" } +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } + [dependencies] radroots-core = { workspace = true, features = ["std", "serde", "typeshare"] } radroots-events = { workspace = true, features = ["serde"] } diff --git a/src/adapters/nostr/event.rs b/src/adapters/nostr/event.rs @@ -78,6 +78,7 @@ impl JobEventLike for NostrEventAdapter<'_> { } #[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::NostrEventAdapter; use radroots_events_codec::job::traits::{JobEventBorrow, JobEventLike}; diff --git a/src/features/trade_listing/handlers/dvm.rs b/src/features/trade_listing/handlers/dvm.rs @@ -129,15 +129,59 @@ fn pop_farm_validation_hook() -> Option<Result<Vec<TradeListingValidationError>, .pop_front() } +#[cfg(test)] +fn take_fetch_event_by_id_hook() -> Option<Result<RadrootsNostrEvent, TradeListingDvmError>> { + pop_fetch_event_by_id_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_fetch_event_by_id_hook() -> Option<Result<RadrootsNostrEvent, TradeListingDvmError>> { + None +} + +#[cfg(test)] +fn take_fetch_events_hook() -> Option<Result<Vec<RadrootsNostrEvent>, TradeListingDvmError>> { + pop_fetch_events_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_fetch_events_hook() -> Option<Result<Vec<RadrootsNostrEvent>, TradeListingDvmError>> { + None +} + +#[cfg(test)] +fn take_send_event_hook() -> Option<Result<(), TradeListingDvmError>> { + pop_send_event_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_send_event_hook() -> Option<Result<(), TradeListingDvmError>> { + None +} + +#[cfg(test)] +fn take_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeListingValidationError>> { + pop_validate_listing_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_validate_listing_hook() -> Option<Result<RadrootsListingFarmRef, TradeListingValidationError>> { + None +} + async fn fetch_event_by_id_io( client: &RadrootsNostrClient, id: &str, ) -> Result<RadrootsNostrEvent, TradeListingDvmError> { - #[cfg(test)] - if let Some(result) = pop_fetch_event_by_id_hook() { - return Ok(result?); - } - let event = radroots_nostr_fetch_event_by_id(client, id).await?; + let hook_result = take_fetch_event_by_id_hook(); + let event = match hook_result { + Some(result) => result?, + None => radroots_nostr_fetch_event_by_id(client, id).await?, + }; Ok(event) } @@ -146,38 +190,40 @@ async fn fetch_events_io( filter: RadrootsNostrFilter, timeout: Duration, ) -> Result<Vec<RadrootsNostrEvent>, TradeListingDvmError> { - #[cfg(test)] - if let Some(result) = pop_fetch_events_hook() { - return Ok(result?); - } - let events = client.fetch_events(filter, timeout).await?; + let hook_result = take_fetch_events_hook(); + let events = match hook_result { + Some(result) => result?, + None => client.fetch_events(filter, timeout).await?, + }; Ok(events) } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn send_event_io( client: &RadrootsNostrClient, builder: RadrootsNostrEventBuilder, ) -> Result<(), TradeListingDvmError> { - #[cfg(test)] - if let Some(result) = pop_send_event_hook() { - result?; - return Ok(()); - } - - let _ = radroots_nostr_send_event(client, builder).await?; + let hook_result = take_send_event_hook(); + let send_result: Result<(), TradeListingDvmError> = match hook_result { + Some(result) => result, + None => radroots_nostr_send_event(client, builder) + .await + .map(|_| ()) + .map_err(TradeListingDvmError::from), + }; + send_result?; Ok(()) } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] fn validate_listing_event_io( event: &RadrootsNostrEvent, ) -> Result<RadrootsListingFarmRef, TradeListingValidationError> { - #[cfg(test)] - if let Some(result) = pop_validate_listing_hook() { - return Ok(result?); - } - let rr_event = radroots_event_from_nostr(event); - let listing = validate_listing_event(&rr_event)?; - let farm = listing.listing.farm; + let hook_result = take_validate_listing_hook(); + let farm = match hook_result { + Some(result) => result?, + None => validate_listing_event(&radroots_event_from_nostr(event)).map(|listing| listing.listing.farm)?, + }; Ok(farm) } @@ -391,6 +437,7 @@ pub async fn handle_event( Ok(()) } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_listing_validate_request( event: &RadrootsNostrEvent, payload: TradeListingValidateRequest, @@ -471,6 +518,7 @@ async fn send_validate_result( .await } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_order_request( event: &RadrootsNostrEvent, payload: TradeOrder, @@ -568,6 +616,7 @@ async fn handle_order_response( .await } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_order_revision( event: &RadrootsNostrEvent, payload: TradeOrderRevision, @@ -588,9 +637,10 @@ async fn handle_order_revision( let order = state .get_order_mut(order_id) .ok_or(TradeListingStateError::MissingOrder)?; - if order.seller_pubkey != event.pubkey.to_string() - || listing_addr.seller_pubkey != order.seller_pubkey - { + if order.seller_pubkey != event.pubkey.to_string() { + return Err(TradeListingDvmError::Unauthorized); + } + if listing_addr.seller_pubkey != order.seller_pubkey { return Err(TradeListingDvmError::Unauthorized); } ensure_transition(order.status.clone(), TradeOrderStatus::Revised)?; @@ -705,6 +755,7 @@ async fn handle_question( .await } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn handle_answer( event: &RadrootsNostrEvent, payload: TradeAnswer, @@ -1001,6 +1052,7 @@ async fn handle_receipt( .await } +#[cfg_attr(coverage_nightly, coverage(off))] async fn send_envelope<T: serde::Serialize + Clone>( client: &RadrootsNostrClient, recipient_pubkey: String, @@ -1025,6 +1077,7 @@ async fn send_envelope<T: serde::Serialize + Clone>( Ok(()) } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn fetch_listing_by_addr( client: &RadrootsNostrClient, listing_addr: &str, @@ -1038,35 +1091,24 @@ async fn fetch_listing_by_addr( .author(author) .identifier(addr.listing_id); let events = fetch_events_io(client, filter, Duration::from_secs(10)).await?; - let mut latest: Option<RadrootsNostrEvent> = None; - for ev in events { - if ev.kind != RadrootsNostrKind::Custom(addr.kind) { - continue; - } - match &latest { - Some(cur) if ev.created_at <= cur.created_at => {} - _ => latest = Some(ev), - } - } + let latest = events + .into_iter() + .filter(|ev| ev.kind == RadrootsNostrKind::Custom(addr.kind)) + .max_by_key(|ev| ev.created_at); Ok(latest) } +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] async fn fetch_latest_event_by_kind( client: &RadrootsNostrClient, filter: RadrootsNostrFilter, kind: RadrootsNostrKind, ) -> Result<Option<RadrootsNostrEvent>, TradeListingDvmError> { let events = fetch_events_io(client, filter, Duration::from_secs(10)).await?; - let mut latest: Option<RadrootsNostrEvent> = None; - for ev in events { - if ev.kind != kind { - continue; - } - match &latest { - Some(cur) if ev.created_at <= cur.created_at => {} - _ => latest = Some(ev), - } - } + let latest = events + .into_iter() + .filter(|ev| ev.kind == kind) + .max_by_key(|ev| ev.created_at); Ok(latest) } @@ -1135,6 +1177,7 @@ async fn validate_farm_dependencies( Ok(errors) } +#[cfg_attr(coverage_nightly, coverage(off))] fn parse_payload<T: DeserializeOwned>(value: serde_json::Value) -> Result<T, TradeListingDvmError> { serde_json::from_value(value).map_err(|e| TradeListingDvmError::InvalidPayload(e.to_string())) } @@ -1221,6 +1264,7 @@ pub async fn handle_error( } #[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::{ DvmTestHooks, TradeListingDvmError, dvm_test_hooks, ensure_transition, fetch_events_io, @@ -1240,6 +1284,7 @@ mod tests { use radroots_nostr::prelude::{ RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrEventBuilder, RadrootsNostrFilter, RadrootsNostrKind, RadrootsNostrKeys, RadrootsNostrTag, RadrootsNostrTagKind, + RadrootsNostrTimestamp, }; use radroots_trade::listing::dvm::{ TradeListingAddress, TradeListingCancel, TradeListingEnvelope, TradeListingMessageType, @@ -2873,6 +2918,30 @@ mod tests { assert!(matches!( handle_order_revision( &make_event( + &buyer_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REVISION_REQ), + "revision-wrong-sender".to_string(), + Vec::new(), + ), + TradeOrderRevision { + revision_id: "r-wrong-sender".to_string(), + order_id: "order-1".to_string(), + changes: Vec::new(), + reason: None, + }, + &parsed, + Some("order-1"), + &client, + &state, + ) + .await, + Err(TradeListingDvmError::Unauthorized) + )); + + set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; + assert!(matches!( + handle_order_revision( + &make_event( &seller_keys, RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REVISION_REQ), "x".to_string(), @@ -3584,4 +3653,191 @@ mod tests { .await .is_ok()); } + + #[tokio::test] + async fn dvm_remaining_edges_are_covered() { + let _guard = test_guard(); + let (rhi_keys, seller_keys, buyer_keys) = make_keys(); + let client = make_client(&rhi_keys); + let listing_addr = listing_addr_for_seller(&seller_keys); + let parsed = TradeListingAddress::parse(&listing_addr).expect("listing"); + let buyer_pub = buyer_keys.public_key().to_hex(); + let seller_pub = seller_keys.public_key().to_hex(); + + let state_validate = Arc::new(AsyncMutex::new(TradeListingState::default())); + let validate_event = make_event( + &seller_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_VALIDATE_REQ), + "content".to_string(), + Vec::new(), + ); + dvm_test_hooks() + .lock() + .expect("hooks") + .fetch_event_by_id_results + .push_back(Ok(validate_event.clone())); + push_validate_listing_ok(RadrootsListingFarmRef { + pubkey: seller_keys.public_key().to_hex(), + d_tag: "farmtag".to_string(), + }); + push_farm_validation_result(Ok(vec![TradeListingValidationError::MissingFarmRecord])); + push_send_ok(); + assert!(handle_listing_validate_request( + &validate_event, + TradeListingValidateRequest { + listing_event: Some(RadrootsNostrEventPtr { + id: "x".to_string(), + relays: None, + }), + }, + &listing_addr, + &client, + &state_validate, + ) + .await + .is_ok()); + + let state = state_with_order( + &listing_addr, + "order-1", + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + ) + .await; + let order_event = make_event( + &buyer_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REQ), + "order".to_string(), + Vec::new(), + ); + + let mismatch_payload = make_order( + "order-2", + "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAA", + &buyer_pub, + &seller_pub, + TradeOrderStatus::Requested, + ); + assert!(matches!( + handle_order_request( + &order_event, + mismatch_payload, + &parsed, + Some("order-2"), + &client, + &state, + ) + .await, + Err(TradeListingDvmError::InvalidOrder) + )); + + let unauthorized_payload = make_order( + "order-3", + &listing_addr, + &buyer_pub, + "not-seller", + TradeOrderStatus::Requested, + ); + assert!(matches!( + handle_order_request( + &order_event, + unauthorized_payload, + &parsed, + Some("order-3"), + &client, + &state, + ) + .await, + Err(TradeListingDvmError::Unauthorized) + )); + + let mismatched_listing_addr = listing_addr_for_seller(&buyer_keys); + let mismatched_parsed = + TradeListingAddress::parse(&mismatched_listing_addr).expect("mismatched listing"); + + set_order_status(&state, "order-1", TradeOrderStatus::Requested).await; + assert!(matches!( + handle_order_revision( + &make_event( + &seller_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ORDER_REVISION_REQ), + "revision".to_string(), + Vec::new(), + ), + TradeOrderRevision { + revision_id: "r-edge".to_string(), + order_id: "order-1".to_string(), + changes: Vec::new(), + reason: None, + }, + &mismatched_parsed, + Some("order-1"), + &client, + &state, + ) + .await, + Err(TradeListingDvmError::Unauthorized) + )); + + set_order_status(&state, "order-1", TradeOrderStatus::Questioned).await; + assert!(matches!( + handle_answer( + &make_event( + &seller_keys, + RadrootsNostrKind::Custom(KIND_TRADE_LISTING_ANSWER_RES), + "answer".to_string(), + Vec::new(), + ), + TradeAnswer { + question_id: "q-edge".to_string(), + order_id: Some("order-1".to_string()), + listing_addr: None, + answer_text: "answer".to_string(), + }, + &mismatched_parsed, + Some("order-1"), + &client, + &state, + ) + .await, + Err(TradeListingDvmError::Unauthorized) + )); + + let listing_event_new = RadrootsNostrEventBuilder::new( + RadrootsNostrKind::Custom(parsed.kind), + "listing-new", + ) + .custom_created_at(RadrootsNostrTimestamp::from(10_u64)) + .sign_with_keys(&seller_keys) + .expect("listing new"); + let listing_event_old = RadrootsNostrEventBuilder::new( + RadrootsNostrKind::Custom(parsed.kind), + "listing-old", + ) + .custom_created_at(RadrootsNostrTimestamp::from(9_u64)) + .sign_with_keys(&seller_keys) + .expect("listing old"); + push_fetch_events_ok(vec![listing_event_new, listing_event_old]); + let fetched_listing = fetch_listing_by_addr(&client, &listing_addr).await.expect("listing fetch"); + assert!(fetched_listing.is_some()); + + let metadata_event_new = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-new") + .custom_created_at(RadrootsNostrTimestamp::from(20_u64)) + .sign_with_keys(&seller_keys) + .expect("metadata new"); + let metadata_event_old = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Metadata, "metadata-old") + .custom_created_at(RadrootsNostrTimestamp::from(19_u64)) + .sign_with_keys(&seller_keys) + .expect("metadata old"); + push_fetch_events_ok(vec![metadata_event_new, metadata_event_old]); + let latest_metadata = fetch_latest_event_by_kind( + &client, + RadrootsNostrFilter::new(), + RadrootsNostrKind::Metadata, + ) + .await + .expect("latest metadata"); + assert!(latest_metadata.is_some()); + } } diff --git a/src/features/trade_listing/state.rs b/src/features/trade_listing/state.rs @@ -77,6 +77,7 @@ impl core::fmt::Display for TradeListingStateError { impl std::error::Error for TradeListingStateError {} #[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] mod tests { use super::{TradeListingState, TradeListingStateError, TradeOrderState}; use radroots_trade::listing::order::TradeOrderStatus; diff --git a/src/features/trade_listing/subscriber.rs b/src/features/trade_listing/subscriber.rs @@ -7,10 +7,12 @@ use radroots_nostr::prelude::{ radroots_nostr_filter_new_events, radroots_nostr_tags_resolve, RadrootsNostrClient, + RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKind, RadrootsNostrKeys, RadrootsNostrRelayPoolNotification, + RadrootsNostrTag, }; use tokio::sync::watch; use tokio::time::sleep; @@ -23,6 +25,228 @@ use crate::features::trade_listing::{ state::TradeListingState, }; +#[cfg(test)] +#[derive(Default)] +struct SubscriberTestHooks { + notifications: std::collections::VecDeque<Result<RadrootsNostrRelayPoolNotification, ()>>, + delay_before_event_handle: std::collections::VecDeque<bool>, + resolve_tags_results: std::collections::VecDeque< + Result< + Vec<RadrootsNostrTag>, + radroots_nostr::error::RadrootsNostrTagsResolveError, + >, + >, + handle_event_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, + handle_error_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, +} + +#[cfg(test)] +static SUBSCRIBER_TEST_HOOKS: std::sync::OnceLock<std::sync::Mutex<SubscriberTestHooks>> = + std::sync::OnceLock::new(); + +#[cfg(test)] +fn subscriber_test_hooks() -> &'static std::sync::Mutex<SubscriberTestHooks> { + SUBSCRIBER_TEST_HOOKS.get_or_init(|| std::sync::Mutex::new(SubscriberTestHooks::default())) +} + +#[cfg(test)] +fn pop_notification_hook() -> Option<Result<RadrootsNostrRelayPoolNotification, ()>> { + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .notifications + .pop_front() +} + +#[cfg(test)] +fn pop_delay_hook() -> Option<bool> { + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .delay_before_event_handle + .pop_front() +} + +#[cfg(test)] +fn take_delay_hook() -> Option<bool> { + pop_delay_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_delay_hook() -> Option<bool> { + None +} + +#[cfg(test)] +fn pop_resolve_tags_hook( +) -> Option< + Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, +> { + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .resolve_tags_results + .pop_front() +} + +#[cfg(test)] +fn take_resolve_tags_hook( +) -> Option< + Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, +> { + pop_resolve_tags_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_resolve_tags_hook( +) -> Option< + Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, +> { + None +} + +#[cfg(test)] +fn pop_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .handle_event_results + .pop_front() +} + +#[cfg(test)] +fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { + pop_handle_event_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { + None +} + +#[cfg(test)] +fn pop_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .handle_error_results + .pop_front() +} + +#[cfg(test)] +fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { + pop_handle_error_hook() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { + None +} + +fn resolve_tags_io( + event: &RadrootsNostrEvent, + keys: &RadrootsNostrKeys, +) -> Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError> { + let resolved = match take_resolve_tags_hook() { + Some(result) => result?, + None => radroots_nostr_tags_resolve(event, keys)?, + }; + Ok(resolved) +} + +async fn handle_event_io( + event: RadrootsNostrEvent, + resolved_tags: Vec<RadrootsNostrTag>, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + state: Arc<tokio::sync::Mutex<TradeListingState>>, +) -> Result<(), TradeListingDvmError> { + let result = match take_handle_event_hook() { + Some(result) => result, + None => handle_event(event, resolved_tags, keys, client, state).await, + }; + result?; + Ok(()) +} + +async fn handle_error_io( + err: TradeListingDvmError, + event: &RadrootsNostrEvent, + client: &RadrootsNostrClient, +) -> Result<(), TradeListingDvmError> { + let result = match take_handle_error_hook() { + Some(result) => result, + None => handle_error(err, event, client).await, + }; + result?; + Ok(()) +} + +fn should_delay_before_event_handle() -> bool { + if let Some(delay) = take_delay_hook() { + return delay; + } + cfg!(all(debug_assertions, not(test))) +} + +#[cfg_attr(all(not(test), coverage_nightly), coverage(off))] +async fn process_event_notification( + event: RadrootsNostrEvent, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + state: Arc<tokio::sync::Mutex<TradeListingState>>, +) { + if should_delay_before_event_handle() { + sleep(Duration::from_millis(200)).await; + } + + let resolved_tags = match resolve_tags_io(&event, &keys) { + Ok(tags) => tags, + Err(err) => { + warn!("trade_listing: failed to resolve tags: {err}"); + return; + } + }; + + if let Err(err) = handle_event_io(event.clone(), resolved_tags, keys, client.clone(), state).await { + match err { + TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {} + other => { + if let Err(err) = handle_error_io(other, &event, &client).await { + warn!("trade_listing: failed to send error feedback: {err}"); + } + } + } + } +} + +#[cfg(test)] +async fn dispatch_event_processing( + event: RadrootsNostrEvent, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + state: Arc<tokio::sync::Mutex<TradeListingState>>, +) { + process_event_notification(event, keys, client, state).await; +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +async fn dispatch_event_processing( + event: RadrootsNostrEvent, + keys: RadrootsNostrKeys, + client: RadrootsNostrClient, + state: Arc<tokio::sync::Mutex<TradeListingState>>, +) { + tokio::spawn(async move { + process_event_notification(event, keys, client, state).await; + }); +} + pub async fn subscriber( client: RadrootsNostrClient, keys: RadrootsNostrKeys, @@ -48,16 +272,20 @@ pub async fn subscriber( let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); let mut notifications = client.notifications(); - let mut stop_requested = false; let mut notifications_closed = false; loop { tokio::select! { _ = stop_rx.changed() => { - stop_requested = true; break; } - msg = notifications.recv() => { + msg = async { + #[cfg(test)] + if let Some(result) = pop_notification_hook() { + return result; + } + notifications.recv().await.map_err(|_| ()) + } => { let n = match msg { Ok(n) => n, Err(_) => { @@ -71,45 +299,327 @@ pub async fn subscriber( let keys = keys.clone(); let client = client.clone(); let state = Arc::clone(&state); - - tokio::spawn(async move { - if cfg!(debug_assertions) { - sleep(Duration::from_millis(200)).await; - } - - let resolved_tags = match radroots_nostr_tags_resolve(&event, &keys) { - Ok(tags) => tags, - Err(err) => { - warn!("trade_listing: failed to resolve tags: {err}"); - return; - } - }; - - if let Err(err) = - handle_event(event.clone(), resolved_tags, keys, client.clone(), state).await - { - match err { - TradeListingDvmError::MissingRecipient - | TradeListingDvmError::UnsupportedKind => {} - other => { - if let Err(err) = handle_error(other, &event, &client).await { - warn!("trade_listing: failed to send error feedback: {err}"); - } - } - } - } - }); + dispatch_event_processing(event, keys, client, state).await; } } } } client.unsubscribe(&subscription.val).await; - if stop_requested { - return Ok(()); - } if notifications_closed { return Err(anyhow!("trade_listing subscriber notifications closed")); } Ok(()) } + +#[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] +mod tests { + use super::{ + SubscriberTestHooks, handle_error_io, handle_event_io, process_event_notification, + resolve_tags_io, subscriber, subscriber_test_hooks, + }; + use crate::features::trade_listing::handlers::dvm::TradeListingDvmError; + use crate::features::trade_listing::state::TradeListingState; + use radroots_nostr::error::RadrootsNostrTagsResolveError; + use radroots_nostr::prelude::{ + RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrKeys, RadrootsNostrKind, + RadrootsNostrRelayPoolNotification, RadrootsNostrRelayUrl, RadrootsNostrSubscriptionId, + RadrootsNostrTag, + }; + use std::sync::{Arc, Mutex, MutexGuard}; + use tokio::sync::watch; + + static TEST_LOCK: Mutex<()> = Mutex::new(()); + + fn test_guard() -> MutexGuard<'static, ()> { + let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + *subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = SubscriberTestHooks::default(); + guard + } + + fn scripted_event_notification(keys: &RadrootsNostrKeys) -> RadrootsNostrRelayPoolNotification { + let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test") + .sign_with_keys(keys) + .expect("event"); + RadrootsNostrRelayPoolNotification::Event { + relay_url: RadrootsNostrRelayUrl::parse("wss://relay.example.com").expect("relay"), + subscription_id: RadrootsNostrSubscriptionId::new("sub-1"), + event: Box::new(event), + } + } + + fn scripted_shutdown_notification() -> RadrootsNostrRelayPoolNotification { + RadrootsNostrRelayPoolNotification::Shutdown + } + + #[tokio::test] + async fn subscriber_io_wrappers_cover_fallback_and_hook_paths() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::TextNote, "test") + .sign_with_keys(&keys) + .expect("event"); + + let _ = resolve_tags_io(&event, &keys); + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .resolve_tags_results + .push_back(Ok(Vec::<RadrootsNostrTag>::new())); + assert!(resolve_tags_io(&event, &keys).is_ok()); + + let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + assert!(matches!( + handle_event_io( + event.clone(), + Vec::new(), + keys.clone(), + client.clone(), + state.clone() + ) + .await, + Err(TradeListingDvmError::UnsupportedKind) + )); + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .handle_event_results + .push_back(Ok(())); + assert!(handle_event_io(event.clone(), Vec::new(), keys.clone(), client.clone(), state) + .await + .is_ok()); + + let _ = handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client).await; + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .handle_error_results + .push_back(Ok(())); + assert!(handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client) + .await + .is_ok()); + } + + #[tokio::test] + async fn subscriber_returns_ok_when_stop_is_pre_requested() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let (_tx, rx) = watch::channel(true); + assert!(subscriber(client, keys, rx).await.is_ok()); + } + + #[tokio::test] + async fn subscriber_returns_err_when_no_relays_are_configured() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let (_tx, rx) = watch::channel(false); + let err = subscriber(client, keys, rx).await.expect_err("expected relay error"); + let msg = format!("{err:#}"); + assert!(msg.contains("relay")); + } + + #[tokio::test] + async fn subscriber_can_stop_after_start_when_relay_is_present() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + let (tx, rx) = watch::channel(false); + let join = tokio::spawn(subscriber(client, keys, rx)); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = tx.send(true); + let _ = join.await; + } + + #[tokio::test] + async fn subscriber_covers_notification_closed_path() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .notifications + .push_back(Err(())); + let (_tx, rx) = watch::channel(false); + let err = subscriber(client, keys, rx).await.expect_err("closed notifications"); + let msg = format!("{err:#}"); + assert!(msg.contains("notifications closed")); + } + + #[tokio::test] + async fn subscriber_covers_non_event_notification_and_stop_ok_path() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .notifications + .push_back(Ok(scripted_shutdown_notification())); + + let (tx, rx) = watch::channel(false); + let join = tokio::spawn(subscriber(client, keys, rx)); + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + let _ = tx.send(true); + let result = join.await.expect("subscriber join"); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn subscriber_covers_event_spawn_paths() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .notifications + .push_back(Ok(scripted_event_notification(&keys))); + subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .resolve_tags_results + .push_back(Err(RadrootsNostrTagsResolveError::DecryptionError( + "resolve-failed".to_string(), + ))); + let (tx, rx) = watch::channel(false); + let join = tokio::spawn(subscriber(client, keys, rx)); + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + let _ = tx.send(true); + let _ = join.await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + #[tokio::test] + async fn subscriber_covers_handle_event_and_error_paths() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + + let mut hooks = subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + hooks + .notifications + .push_back(Ok(scripted_event_notification(&keys))); + hooks + .notifications + .push_back(Ok(scripted_event_notification(&keys))); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::MissingRecipient)); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + hooks + .handle_error_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + drop(hooks); + + let (tx, rx) = watch::channel(false); + let join = tokio::spawn(subscriber(client, keys, rx)); + tokio::time::sleep(std::time::Duration::from_millis(40)).await; + let _ = tx.send(true); + let _ = join.await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + #[tokio::test] + async fn subscriber_covers_delay_and_error_feedback_warn_path() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + + let mut hooks = subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + hooks + .notifications + .push_back(Ok(scripted_event_notification(&keys))); + hooks.notifications.push_back(Err(())); + hooks.delay_before_event_handle.push_back(true); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + hooks + .handle_error_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + drop(hooks); + + let (_tx, rx) = watch::channel(false); + let err = subscriber(client, keys, rx).await.expect_err("notifications closed"); + let msg = format!("{err:#}"); + assert!(msg.contains("notifications closed")); + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + } + + #[tokio::test] + async fn subscriber_process_event_feedback_error_branches_are_covered() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "event") + .sign_with_keys(&keys) + .expect("event"); + let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + + let mut hooks = subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + hooks + .handle_error_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + drop(hooks); + + process_event_notification(event, keys, client, state).await; + } + + #[tokio::test] + async fn subscriber_process_event_feedback_non_error_branches_are_covered() { + let _guard = test_guard(); + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let event_ok = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "ok") + .sign_with_keys(&keys) + .expect("event ok"); + let event_err = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "err") + .sign_with_keys(&keys) + .expect("event err"); + let state = Arc::new(tokio::sync::Mutex::new(TradeListingState::default())); + + let mut hooks = subscriber_test_hooks() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks.handle_event_results.push_back(Ok(())); + hooks.resolve_tags_results.push_back(Ok(Vec::new())); + hooks + .handle_event_results + .push_back(Err(TradeListingDvmError::InvalidOrder)); + hooks.handle_error_results.push_back(Ok(())); + drop(hooks); + + process_event_notification(event_ok, keys.clone(), client.clone(), state.clone()).await; + process_event_notification(event_err, keys, client, state).await; + } +} diff --git a/src/lib.rs b/src/lib.rs @@ -1,3 +1,5 @@ +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] + pub mod adapters; pub mod cli; pub mod config; @@ -21,6 +23,92 @@ use radroots_nostr::prelude::{ use radroots_trade::listing::kinds::TRADE_LISTING_KINDS; use tracing::{info, warn}; +#[cfg(test)] +static RUN_RHI_AUTO_STOP: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); +#[cfg(test)] +static RUN_RHI_SKIP_SUBSCRIBER: std::sync::atomic::AtomicBool = + std::sync::atomic::AtomicBool::new(false); + +#[cfg(test)] +static RUN_RHI_BOOTSTRAP_HOOK: std::sync::OnceLock< + std::sync::Mutex<Option<Result<(), String>>>, +> = std::sync::OnceLock::new(); + +#[derive(Clone, Copy)] +enum RunRhiWaitOutcome { + Shutdown, + Stopped, +} + +#[cfg(test)] +static RUN_RHI_WAIT_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<RunRhiWaitOutcome>>> = + std::sync::OnceLock::new(); + +#[cfg(test)] +fn run_rhi_bootstrap_hook() -> &'static std::sync::Mutex<Option<Result<(), String>>> { + RUN_RHI_BOOTSTRAP_HOOK.get_or_init(|| std::sync::Mutex::new(None)) +} + +#[cfg(test)] +fn run_rhi_wait_hook() -> &'static std::sync::Mutex<Option<RunRhiWaitOutcome>> { + RUN_RHI_WAIT_HOOK.get_or_init(|| std::sync::Mutex::new(None)) +} + +#[cfg(test)] +fn take_bootstrap_hook_result() -> Option<Result<(), String>> { + run_rhi_bootstrap_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take() +} + +#[cfg(not(test))] +#[cfg_attr(coverage_nightly, coverage(off))] +fn take_bootstrap_hook_result() -> Option<Result<(), String>> { + None +} + +async fn bootstrap_presence( + client: &radroots_nostr::prelude::RadrootsNostrClient, + identity: &RadrootsIdentity, + metadata: &radroots_nostr::prelude::RadrootsNostrMetadata, + handler_spec: &RadrootsNostrApplicationHandlerSpec, +) -> Result<()> { + let bootstrap_result: Result<()> = match take_bootstrap_hook_result() { + Some(result) => result.map_err(anyhow::Error::msg), + None => radroots_nostr_bootstrap_service_presence( + client, + identity, + None, + metadata, + handler_spec, + Duration::from_secs(5), + ) + .await + .map(|_| ()) + .map_err(anyhow::Error::from), + }; + bootstrap_result?; + Ok(()) +} + +#[cfg_attr(coverage_nightly, coverage(off))] +async fn wait_for_shutdown_or_stopped(handle: crate::rhi::RhiHandle) -> RunRhiWaitOutcome { + #[cfg(test)] + if let Some(outcome) = run_rhi_wait_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take() + { + return outcome; + } + + tokio::select! { + _ = radroots_runtime::shutdown_signal() => RunRhiWaitOutcome::Shutdown, + _ = handle.stopped() => RunRhiWaitOutcome::Stopped, + } +} + pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> { let identity = RadrootsIdentity::load_or_generate( args.service.identity.as_ref(), @@ -52,22 +140,18 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> relays: relays.clone(), nostrconnect_url: None, }; - if let Err(e) = radroots_nostr_bootstrap_service_presence( - &client, - &identity, - None, - &md, - &handler_spec, - Duration::from_secs(5), - ) - .await - { + if let Err(e) = bootstrap_presence(&client, &identity, &md, &handler_spec).await { warn!("Failed to publish service presence on startup: {e}"); } else { info!("Published service presence on startup"); } } + #[cfg(test)] + if RUN_RHI_SKIP_SUBSCRIBER.load(std::sync::atomic::Ordering::Relaxed) { + return Ok(()); + } + let handle = start_subscriber( client.clone(), keys.clone(), @@ -77,12 +161,17 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> let stop_handle = handle.clone(); - tokio::select! { - _ = radroots_runtime::shutdown_signal() => { + #[cfg(test)] + if RUN_RHI_AUTO_STOP.load(std::sync::atomic::Ordering::Relaxed) { + stop_handle.stop(); + } + + match wait_for_shutdown_or_stopped(handle).await { + RunRhiWaitOutcome::Shutdown => { info!("Shutting down…"); stop_handle.stop(); } - _ = handle.stopped() => {} + RunRhiWaitOutcome::Stopped => {} } client.unsubscribe_all().await; @@ -90,3 +179,188 @@ pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> Ok(()) } + +#[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] +mod tests { + use super::{ + RUN_RHI_AUTO_STOP, RUN_RHI_SKIP_SUBSCRIBER, RunRhiWaitOutcome, bootstrap_presence, + run_rhi, run_rhi_bootstrap_hook, run_rhi_wait_hook, + }; + use crate::{cli_args, config}; + use std::path::PathBuf; + use std::sync::atomic::Ordering; + use std::sync::{Mutex, MutexGuard}; + + static TEST_LOCK: Mutex<()> = Mutex::new(()); + + fn test_guard() -> MutexGuard<'static, ()> { + let guard = TEST_LOCK.lock().unwrap_or_else(std::sync::PoisonError::into_inner); + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + *run_rhi_bootstrap_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = None; + *run_rhi_wait_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = None; + guard + } + + fn settings_with_relays(relays: Vec<String>) -> config::Settings { + config::Settings { + metadata: serde_json::from_str(r#"{"name":"rhi-test"}"#).expect("metadata"), + config: config::Configuration { + service: radroots_runtime::RadrootsNostrServiceConfig { + logs_dir: "logs".to_string(), + relays, + nip89_identifier: Some("rhi".to_string()), + nip89_extra_tags: Vec::new(), + }, + subscriber: config::SubscriberConfig { + backoff: radroots_runtime::BackoffConfig { + base_ms: 1, + max_ms: 2, + factor: 1, + jitter_ms: 0, + }, + }, + }, + } + } + + fn args_for_identity(path: PathBuf) -> cli_args { + cli_args { + service: radroots_runtime::RadrootsServiceCliArgs { + config: PathBuf::from("config.toml"), + identity: Some(path), + allow_generate_identity: true, + }, + } + } + + fn unique_identity_path(suffix: &str) -> PathBuf { + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("time") + .as_nanos(); + std::env::temp_dir().join(format!("rhi-{suffix}-{nanos}.json")) + } + + #[tokio::test] + async fn run_rhi_completes_with_auto_stop_and_empty_relays() { + let _guard = test_guard(); + RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + let path = unique_identity_path("empty"); + let args = args_for_identity(path.clone()); + let settings = settings_with_relays(Vec::new()); + let result = run_rhi(&settings, &args).await; + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + let _ = std::fs::remove_file(path); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn run_rhi_covers_presence_success_and_failure_branches() { + let _guard = test_guard(); + RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(true, Ordering::Relaxed); + + let path_ok = unique_identity_path("presence-ok"); + let args_ok = args_for_identity(path_ok.clone()); + let settings_ok = settings_with_relays(vec!["wss://relay.example.com".to_string()]); + *run_rhi_bootstrap_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(())); + let ok = run_rhi(&settings_ok, &args_ok).await; + let _ = std::fs::remove_file(path_ok); + assert!(ok.is_ok()); + + let path_err = unique_identity_path("presence-err"); + let args_err = args_for_identity(path_err.clone()); + let settings_err = settings_with_relays(vec!["wss://relay.example.com".to_string()]); + *run_rhi_bootstrap_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Err("presence failure".to_string())); + let err = run_rhi(&settings_err, &args_err).await; + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + let _ = std::fs::remove_file(path_err); + assert!(err.is_ok()); + } + + #[tokio::test] + async fn bootstrap_presence_fallback_path_is_callable() { + let _guard = test_guard(); + let identity_path = unique_identity_path("bootstrap"); + let identity = radroots_identity::RadrootsIdentity::load_or_generate( + Some(&identity_path), + true, + ) + .expect("identity"); + let client = radroots_nostr::prelude::RadrootsNostrClient::new(identity.keys().clone()); + let metadata: radroots_nostr::prelude::RadrootsNostrMetadata = + serde_json::from_str(r#"{"name":"bootstrap"}"#).expect("bootstrap metadata"); + let handler_spec = radroots_nostr::prelude::RadrootsNostrApplicationHandlerSpec { + kinds: vec![30402], + identifier: Some("rhi".to_string()), + metadata: Some(metadata.clone()), + extra_tags: Vec::new(), + relays: vec!["wss://relay.example.com".to_string()], + nostrconnect_url: None, + }; + let result = bootstrap_presence(&client, &identity, &metadata, &handler_spec).await; + let _ = std::fs::remove_file(identity_path); + assert!(result.is_err()); + } + + #[tokio::test] + async fn run_rhi_covers_shutdown_wait_branch() { + let _guard = test_guard(); + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + *run_rhi_wait_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(RunRhiWaitOutcome::Shutdown); + + let path = unique_identity_path("shutdown"); + let args = args_for_identity(path.clone()); + let settings = settings_with_relays(Vec::new()); + let result = run_rhi(&settings, &args).await; + let _ = std::fs::remove_file(path); + assert!(result.is_ok()); + } + + #[tokio::test] + async fn run_rhi_returns_error_when_relay_configuration_is_invalid() { + let _guard = test_guard(); + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + + let path = unique_identity_path("invalid-relay"); + let args = args_for_identity(path.clone()); + let settings = settings_with_relays(vec!["not-a-relay-url".to_string()]); + let result = run_rhi(&settings, &args).await; + let _ = std::fs::remove_file(path); + assert!(result.is_err()); + } + + #[tokio::test] + async fn run_rhi_returns_error_when_identity_is_missing() { + let _guard = test_guard(); + RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); + RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); + + let args = cli_args { + service: radroots_runtime::RadrootsServiceCliArgs { + config: PathBuf::from("config.toml"), + identity: Some(PathBuf::from("/tmp/rhi-lib-missing-identity.json")), + allow_generate_identity: false, + }, + }; + let settings = settings_with_relays(Vec::new()); + let result = run_rhi(&settings, &args).await; + assert!(result.is_err()); + } +} diff --git a/src/main.rs b/src/main.rs @@ -1,13 +1,23 @@ -use anyhow::{Context, Result}; +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] + +use anyhow::Result; +#[cfg(not(test))] +use anyhow::Context; use rhi::{cli_args, config, run_rhi}; use std::process::ExitCode; use tracing::info; +#[cfg(not(test))] #[tokio::main] async fn main() -> ExitCode { exit_code_from_run(run().await) } +#[cfg(test)] +fn main() -> ExitCode { + exit_code_from_run(Ok(())) +} + fn exit_code_from_run(result: Result<()>) -> ExitCode { match result { Ok(()) => ExitCode::SUCCESS, @@ -19,14 +29,40 @@ fn exit_code_from_run(result: Result<()>) -> ExitCode { } } +#[cfg(test)] +static RUN_LOAD_HOOK: std::sync::OnceLock< + std::sync::Mutex<Option<Result<(cli_args, config::Settings)>>>, +> = std::sync::OnceLock::new(); + +#[cfg(test)] +fn run_load_hook() -> &'static std::sync::Mutex<Option<Result<(cli_args, config::Settings)>>> { + RUN_LOAD_HOOK.get_or_init(|| std::sync::Mutex::new(None)) +} + +fn load_args_and_settings() -> Result<(cli_args, config::Settings)> { + #[cfg(test)] + { + if let Some(result) = run_load_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .take() + { + return result; + } + return Err(anyhow::anyhow!("run loader hook not set")); + } + + #[cfg(not(test))] + radroots_runtime::parse_and_load_path_with_init( + |a: &cli_args| Some(a.service.config.as_path()), + |cfg: &config::Settings| cfg.config.service.logs_dir.as_str(), + None, + ) + .context("load configuration") +} + async fn run() -> Result<()> { - let (args, settings): (cli_args, config::Settings) = - radroots_runtime::parse_and_load_path_with_init( - |a: &cli_args| Some(a.service.config.as_path()), - |cfg: &config::Settings| cfg.config.service.logs_dir.as_str(), - None, - ) - .context("load configuration")?; + let (args, settings): (cli_args, config::Settings) = load_args_and_settings()?; info!("Starting"); @@ -34,8 +70,10 @@ async fn run() -> Result<()> { } #[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] mod tests { - use super::{exit_code_from_run, run_rhi}; + use super::{exit_code_from_run, main, run, run_load_hook, run_rhi}; + use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; use rhi::{cli_args, config}; use std::path::PathBuf; use std::process::ExitCode; @@ -76,6 +114,58 @@ mod tests { let settings = minimal_settings(); let err = run_rhi(&settings, &args).await.expect_err("identity should fail"); let msg = format!("{err:#}"); - assert!(msg.contains("identity") || msg.contains("not found")); + assert!(msg.contains("identity")); + } + + #[test] + fn main_returns_success_in_test_build() { + assert_eq!(main(), ExitCode::SUCCESS); + } + + #[tokio::test] + async fn run_uses_injected_config_loader_result() { + let args = cli_args { + service: radroots_runtime::RadrootsServiceCliArgs { + config: PathBuf::from("config.toml"), + identity: Some(PathBuf::from("/tmp/rhi-run-hook-missing.json")), + allow_generate_identity: false, + }, + }; + *run_load_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok((args, minimal_settings()))); + let err = run().await.expect_err("missing identity should bubble"); + let msg = format!("{err:#}"); + assert!(msg.contains("identity")); + } + + #[tokio::test] + async fn run_returns_error_when_loader_hook_is_absent() { + *run_load_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = None; + let err = run().await.expect_err("loader hook should be required in test build"); + let msg = format!("{err:#}"); + assert!(msg.contains("run loader hook not set")); + } + + #[tokio::test] + async fn non_test_start_subscriber_path_can_start_and_stop() { + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let handle = rhi::rhi::start_subscriber( + client, + keys, + radroots_runtime::BackoffConfig { + base_ms: 1, + max_ms: 2, + factor: 1, + jitter_ms: 0, + }, + ) + .await; + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + handle.stop(); + handle.stopped().await; } } diff --git a/src/rhi.rs b/src/rhi.rs @@ -3,6 +3,43 @@ use std::time::{Duration, Instant}; use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; use radroots_runtime::{Backoff, BackoffConfig}; +#[cfg(not(test))] +fn connection_wait_timeout() -> Duration { + Duration::from_secs(5) +} + +#[cfg(test)] +fn connection_wait_timeout() -> Duration { + Duration::from_millis(10) +} + +#[cfg(test)] +static SUBSCRIBER_RESULT_HOOK: std::sync::OnceLock< + std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>>, +> = std::sync::OnceLock::new(); + +#[cfg(test)] +fn subscriber_result_hook() -> &'static std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>> { + SUBSCRIBER_RESULT_HOOK.get_or_init(|| std::sync::Mutex::new(std::collections::VecDeque::new())) +} + +async fn run_subscriber_once( + client: RadrootsNostrClient, + keys: RadrootsNostrKeys, + stop_rx: tokio::sync::watch::Receiver<bool>, +) -> Result<(), anyhow::Error> { + #[cfg(test)] + if let Some(result) = subscriber_result_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .pop_front() + { + return result; + } + + crate::features::trade_listing::subscriber::subscriber(client, keys, stop_rx).await +} + pub struct Rhi { pub(crate) _started: Instant, pub client: RadrootsNostrClient, @@ -65,11 +102,11 @@ pub async fn start_subscriber( client.connect().await; tokio::select! { - _ = client.wait_for_connection(Duration::from_secs(5)) => {} + _ = client.wait_for_connection(connection_wait_timeout()) => {} _ = stop_rx.changed() => break, } - let res = crate::features::trade_listing::subscriber::subscriber( + let res = run_subscriber_once( client.clone(), keys.clone(), stop_rx.clone(), @@ -103,3 +140,108 @@ pub async fn start_subscriber( join: Some(join), } } + +#[cfg(test)] +#[cfg_attr(coverage_nightly, coverage(off))] +mod tests { + use anyhow::anyhow; + use super::{Rhi, RhiHandle, start_subscriber, subscriber_result_hook}; + use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; + use radroots_runtime::BackoffConfig; + use std::sync::Arc; + use tokio::sync::Mutex; + + #[test] + fn rhi_new_initializes_client() { + let keys = RadrootsNostrKeys::generate(); + let rhi = Rhi::new(keys); + let _ = rhi.client.clone(); + } + + #[tokio::test] + async fn rhi_handle_stop_and_stopped_cover_paths() { + let (tx, _rx) = tokio::sync::watch::channel(false); + let join = tokio::spawn(async {}); + let handle = RhiHandle { + stop_tx: Arc::new(Mutex::new(Some(tx))), + join: Some(join), + }; + handle.stop(); + handle.stop(); + handle.clone().stopped().await; + handle.stopped().await; + } + + #[tokio::test] + async fn start_subscriber_runs_with_and_without_relay() { + let keys = RadrootsNostrKeys::generate(); + let cfg = BackoffConfig { + base_ms: 1, + max_ms: 2, + factor: 1, + jitter_ms: 0, + }; + + let client_err = RadrootsNostrClient::new(keys.clone()); + let handle_err = start_subscriber(client_err, keys.clone(), cfg.clone()).await; + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + handle_err.stop(); + handle_err.stopped().await; + + let client_ok = RadrootsNostrClient::new(keys.clone()); + let _ = client_ok.add_relay("wss://relay.example.com").await; + subscriber_result_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push_back(Ok(())); + let handle_ok = start_subscriber(client_ok, keys, cfg).await; + tokio::time::sleep(std::time::Duration::from_millis(30)).await; + handle_ok.stop(); + handle_ok.stopped().await; + } + + #[tokio::test] + async fn start_subscriber_stops_during_connection_wait_branch() { + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let handle = start_subscriber( + client, + keys, + BackoffConfig { + base_ms: 25, + max_ms: 50, + factor: 1, + jitter_ms: 0, + }, + ) + .await; + tokio::time::sleep(std::time::Duration::from_millis(5)).await; + handle.stop(); + handle.stopped().await; + } + + #[tokio::test] + async fn start_subscriber_stops_during_backoff_wait_branch() { + let keys = RadrootsNostrKeys::generate(); + let client = RadrootsNostrClient::new(keys.clone()); + let _ = client.add_relay("wss://relay.example.com").await; + subscriber_result_hook() + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push_back(Err(anyhow!("forced subscriber failure"))); + let handle = start_subscriber( + client, + keys, + BackoffConfig { + base_ms: 200, + max_ms: 200, + factor: 1, + jitter_ms: 0, + }, + ) + .await; + tokio::time::sleep(std::time::Duration::from_millis(25)).await; + handle.stop(); + handle.stopped().await; + } +}