rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

subscriber.rs (26646B)


      1 #![forbid(unsafe_code)]
      2 #![cfg_attr(coverage_nightly, coverage(off))]
      3 
      4 use std::convert::TryFrom;
      5 use std::time::Duration;
      6 
      7 use anyhow::{Result, anyhow};
      8 use radroots_events::kinds::{
      9     KIND_LISTING, KIND_LISTING_DRAFT, ORDER_EVENT_KINDS, TRADE_VALIDATION_EVENT_KINDS,
     10     is_trade_validation_service_event_kind,
     11 };
     12 use radroots_nostr::prelude::{
     13     RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys,
     14     RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag,
     15     radroots_nostr_tags_resolve,
     16 };
     17 use tokio::sync::watch;
     18 use tokio::time::sleep;
     19 use tracing::{info, warn};
     20 
     21 use crate::features::trade_listing::{
     22     handlers::dvm::{TradeListingDvmError, handle_error, handle_event_with_policy},
     23     state::{SharedTradeListingState, TradeListingRuntime},
     24 };
     25 use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy;
     26 
     27 #[cfg(test)]
     28 #[derive(Default)]
     29 struct SubscriberTestHooks {
     30     notifications: std::collections::VecDeque<Result<RadrootsNostrRelayPoolNotification, ()>>,
     31     delay_before_event_handle: std::collections::VecDeque<bool>,
     32     resolve_tags_results: std::collections::VecDeque<
     33         Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>,
     34     >,
     35     handle_event_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>,
     36     handle_error_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>,
     37 }
     38 
     39 #[cfg(test)]
     40 static SUBSCRIBER_TEST_HOOKS: std::sync::OnceLock<std::sync::Mutex<SubscriberTestHooks>> =
     41     std::sync::OnceLock::new();
     42 
     43 #[cfg(test)]
     44 fn subscriber_test_hooks() -> &'static std::sync::Mutex<SubscriberTestHooks> {
     45     SUBSCRIBER_TEST_HOOKS.get_or_init(|| std::sync::Mutex::new(SubscriberTestHooks::default()))
     46 }
     47 
     48 #[cfg(test)]
     49 fn pop_notification_hook() -> Option<Result<RadrootsNostrRelayPoolNotification, ()>> {
     50     subscriber_test_hooks()
     51         .lock()
     52         .unwrap_or_else(std::sync::PoisonError::into_inner)
     53         .notifications
     54         .pop_front()
     55 }
     56 
     57 #[cfg(test)]
     58 fn pop_delay_hook() -> Option<bool> {
     59     subscriber_test_hooks()
     60         .lock()
     61         .unwrap_or_else(std::sync::PoisonError::into_inner)
     62         .delay_before_event_handle
     63         .pop_front()
     64 }
     65 
     66 #[cfg(test)]
     67 fn take_delay_hook() -> Option<bool> {
     68     pop_delay_hook()
     69 }
     70 
     71 #[cfg(not(test))]
     72 #[cfg_attr(coverage_nightly, coverage(off))]
     73 fn take_delay_hook() -> Option<bool> {
     74     None
     75 }
     76 
     77 #[cfg(test)]
     78 fn pop_resolve_tags_hook()
     79 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> {
     80     subscriber_test_hooks()
     81         .lock()
     82         .unwrap_or_else(std::sync::PoisonError::into_inner)
     83         .resolve_tags_results
     84         .pop_front()
     85 }
     86 
     87 #[cfg(test)]
     88 fn take_resolve_tags_hook()
     89 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> {
     90     pop_resolve_tags_hook()
     91 }
     92 
     93 #[cfg(not(test))]
     94 #[cfg_attr(coverage_nightly, coverage(off))]
     95 fn take_resolve_tags_hook()
     96 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> {
     97     None
     98 }
     99 
    100 #[cfg(test)]
    101 fn pop_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> {
    102     subscriber_test_hooks()
    103         .lock()
    104         .unwrap_or_else(std::sync::PoisonError::into_inner)
    105         .handle_event_results
    106         .pop_front()
    107 }
    108 
    109 #[cfg(test)]
    110 fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> {
    111     pop_handle_event_hook()
    112 }
    113 
    114 #[cfg(not(test))]
    115 #[cfg_attr(coverage_nightly, coverage(off))]
    116 fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> {
    117     None
    118 }
    119 
    120 #[cfg(test)]
    121 fn pop_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> {
    122     subscriber_test_hooks()
    123         .lock()
    124         .unwrap_or_else(std::sync::PoisonError::into_inner)
    125         .handle_error_results
    126         .pop_front()
    127 }
    128 
    129 #[cfg(test)]
    130 fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> {
    131     pop_handle_error_hook()
    132 }
    133 
    134 #[cfg(not(test))]
    135 #[cfg_attr(coverage_nightly, coverage(off))]
    136 fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> {
    137     None
    138 }
    139 
    140 fn resolve_tags_io(
    141     event: &RadrootsNostrEvent,
    142     keys: &RadrootsNostrKeys,
    143 ) -> Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError> {
    144     let resolved = match take_resolve_tags_hook() {
    145         Some(result) => result?,
    146         None => return radroots_nostr_tags_resolve(event, keys),
    147     };
    148     Ok(resolved)
    149 }
    150 
    151 fn map_notification_recv_result(
    152     result: Result<RadrootsNostrRelayPoolNotification, tokio::sync::broadcast::error::RecvError>,
    153 ) -> Result<RadrootsNostrRelayPoolNotification, ()> {
    154     result.map_err(|_| ())
    155 }
    156 
    157 async fn handle_event_io(
    158     event: RadrootsNostrEvent,
    159     resolved_tags: Vec<RadrootsNostrTag>,
    160     keys: RadrootsNostrKeys,
    161     client: RadrootsNostrClient,
    162     state: SharedTradeListingState,
    163     proof_policy: TradeValidationReceiptProverPolicy,
    164 ) -> Result<(), TradeListingDvmError> {
    165     let result = match take_handle_event_hook() {
    166         Some(result) => result,
    167         None => {
    168             handle_event_with_policy(event, resolved_tags, keys, client, state, &proof_policy).await
    169         }
    170     };
    171     result?;
    172     Ok(())
    173 }
    174 
    175 async fn handle_error_io(
    176     err: TradeListingDvmError,
    177     event: &RadrootsNostrEvent,
    178     client: &RadrootsNostrClient,
    179 ) -> Result<(), TradeListingDvmError> {
    180     let result = match take_handle_error_hook() {
    181         Some(result) => result,
    182         None => handle_error(err, event, client).await,
    183     };
    184     result?;
    185     Ok(())
    186 }
    187 
    188 fn should_delay_before_event_handle() -> bool {
    189     if let Some(delay) = take_delay_hook() {
    190         return delay;
    191     }
    192     cfg!(all(debug_assertions, not(test)))
    193 }
    194 
    195 #[cfg_attr(all(not(test), coverage_nightly), coverage(off))]
    196 async fn process_event_notification(
    197     event: RadrootsNostrEvent,
    198     keys: RadrootsNostrKeys,
    199     client: RadrootsNostrClient,
    200     runtime: TradeListingRuntime,
    201     proof_policy: TradeValidationReceiptProverPolicy,
    202 ) -> Result<()> {
    203     let created_at = u32::try_from(event.created_at.as_secs()).unwrap_or(u32::MAX);
    204     if should_delay_before_event_handle() {
    205         sleep(Duration::from_millis(200)).await;
    206     }
    207 
    208     let resolved_tags = match resolve_tags_io(&event, &keys) {
    209         Ok(tags) => tags,
    210         Err(err) => {
    211             warn!("trade_listing: failed to resolve tags: {err}");
    212             return Ok(());
    213         }
    214     };
    215 
    216     let state = runtime.state();
    217     let event_kind = match event.kind {
    218         RadrootsNostrKind::Custom(v) => Some(u32::from(v)),
    219         _ => None,
    220     };
    221     if let Err(err) = handle_event_io(
    222         event.clone(),
    223         resolved_tags,
    224         keys,
    225         client.clone(),
    226         state.clone(),
    227         proof_policy,
    228     )
    229     .await
    230     {
    231         match err {
    232             TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {}
    233             other => {
    234                 if event_kind.is_some_and(is_trade_validation_service_event_kind) {
    235                     if let Err(err) = handle_error_io(other, &event, &client).await {
    236                         warn!("trade_listing: failed to send error feedback: {err}");
    237                     }
    238                 } else {
    239                     warn!("trade_listing: rejected public trade event: {other}");
    240                 }
    241                 runtime.mark_processed_event(created_at).await?;
    242             }
    243         }
    244         return Ok(());
    245     }
    246 
    247     runtime.mark_processed_event(created_at).await?;
    248     Ok(())
    249 }
    250 
    251 async fn dispatch_event_processing(
    252     event: RadrootsNostrEvent,
    253     keys: RadrootsNostrKeys,
    254     client: RadrootsNostrClient,
    255     runtime: TradeListingRuntime,
    256     proof_policy: TradeValidationReceiptProverPolicy,
    257 ) -> Result<()> {
    258     process_event_notification(event, keys, client, runtime, proof_policy).await
    259 }
    260 
    261 pub async fn subscriber(
    262     client: RadrootsNostrClient,
    263     keys: RadrootsNostrKeys,
    264     runtime: TradeListingRuntime,
    265     proof_policy: TradeValidationReceiptProverPolicy,
    266     mut stop_rx: watch::Receiver<bool>,
    267 ) -> Result<()> {
    268     let subscribed_kinds = [KIND_LISTING, KIND_LISTING_DRAFT]
    269         .into_iter()
    270         .chain(ORDER_EVENT_KINDS)
    271         .chain(TRADE_VALIDATION_EVENT_KINDS)
    272         .collect::<Vec<_>>();
    273     info!(
    274         "Starting subscriber for trade listing, order, and trade validation kinds: {:?}",
    275         subscribed_kinds
    276     );
    277 
    278     let kinds: Vec<RadrootsNostrKind> = subscribed_kinds
    279         .iter()
    280         .map(|kind| u16::try_from(*kind).expect("trade listing kinds fit in nostr custom range"))
    281         .map(RadrootsNostrKind::Custom)
    282         .collect();
    283     let filter: RadrootsNostrFilter = runtime.recovery_filter(kinds).await;
    284 
    285     if *stop_rx.borrow() {
    286         return Ok(());
    287     }
    288 
    289     let subscription = client.subscribe(filter, None).await?;
    290     let mut notifications = client.notifications();
    291 
    292     let mut notifications_closed = false;
    293 
    294     loop {
    295         tokio::select! {
    296             _ = stop_rx.changed() => {
    297                 break;
    298             }
    299             msg = async {
    300                 #[cfg(test)]
    301                 if let Some(result) = pop_notification_hook() {
    302                     return result;
    303                 }
    304                 map_notification_recv_result(notifications.recv().await)
    305             } => {
    306                 let n = match msg {
    307                     Ok(n) => n,
    308                     Err(_) => {
    309                         notifications_closed = true;
    310                         break;
    311                     }
    312                 };
    313 
    314                 if let RadrootsNostrRelayPoolNotification::Event { event, .. } = n {
    315                     let event = (*event).clone();
    316                     let keys = keys.clone();
    317                     let client = client.clone();
    318                     let runtime = runtime.clone();
    319                     let proof_policy = proof_policy.clone();
    320                     dispatch_event_processing(event, keys, client, runtime, proof_policy).await?;
    321                 }
    322             }
    323         }
    324     }
    325 
    326     client.unsubscribe(&subscription.val).await;
    327     if notifications_closed {
    328         return Err(anyhow!("trade_listing subscriber notifications closed"));
    329     }
    330     Ok(())
    331 }
    332 
    333 #[cfg(test)]
    334 #[cfg_attr(coverage_nightly, coverage(off))]
    335 mod tests {
    336     use super::{
    337         SubscriberTestHooks, handle_error_io, handle_event_io, map_notification_recv_result,
    338         process_event_notification, resolve_tags_io, subscriber, subscriber_test_hooks,
    339     };
    340     use crate::features::trade_listing::handlers::dvm::TradeListingDvmError;
    341     use crate::features::trade_listing::state::TradeListingRuntime;
    342     use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy;
    343     use radroots_nostr::error::RadrootsNostrTagsResolveError;
    344     use radroots_nostr::prelude::{
    345         RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrKeys, RadrootsNostrKind,
    346         RadrootsNostrRelayPoolNotification, RadrootsNostrRelayUrl, RadrootsNostrSubscriptionId,
    347         RadrootsNostrTag,
    348     };
    349     use tokio::sync::{Mutex, MutexGuard, watch};
    350 
    351     static TEST_LOCK: Mutex<()> = Mutex::const_new(());
    352 
    353     async fn test_guard() -> MutexGuard<'static, ()> {
    354         let guard = TEST_LOCK.lock().await;
    355         *subscriber_test_hooks()
    356             .lock()
    357             .unwrap_or_else(std::sync::PoisonError::into_inner) = SubscriberTestHooks::default();
    358         guard
    359     }
    360 
    361     fn scripted_event_notification(keys: &RadrootsNostrKeys) -> RadrootsNostrRelayPoolNotification {
    362         let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test")
    363             .sign_with_keys(keys)
    364             .expect("event");
    365         RadrootsNostrRelayPoolNotification::Event {
    366             relay_url: RadrootsNostrRelayUrl::parse("wss://relay.example.com").expect("relay"),
    367             subscription_id: RadrootsNostrSubscriptionId::new("sub-1"),
    368             event: Box::new(event),
    369         }
    370     }
    371 
    372     fn scripted_shutdown_notification() -> RadrootsNostrRelayPoolNotification {
    373         RadrootsNostrRelayPoolNotification::Shutdown
    374     }
    375 
    376     fn shared_runtime() -> TradeListingRuntime {
    377         TradeListingRuntime::new()
    378     }
    379 
    380     fn proof_policy() -> TradeValidationReceiptProverPolicy {
    381         TradeValidationReceiptProverPolicy::default()
    382     }
    383 
    384     #[test]
    385     fn notification_recv_result_mapping_covers_ok_and_err() {
    386         let keys = RadrootsNostrKeys::generate();
    387         assert!(map_notification_recv_result(Ok(scripted_event_notification(&keys))).is_ok());
    388         assert!(
    389             map_notification_recv_result(Err(tokio::sync::broadcast::error::RecvError::Closed))
    390                 .is_err()
    391         );
    392     }
    393 
    394     #[tokio::test]
    395     async fn subscriber_io_wrappers_cover_fallback_and_hook_paths() {
    396         let _guard = test_guard().await;
    397         let keys = RadrootsNostrKeys::generate();
    398         let client = RadrootsNostrClient::new(keys.clone());
    399         let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::TextNote, "test")
    400             .sign_with_keys(&keys)
    401             .expect("event");
    402 
    403         let _ = resolve_tags_io(&event, &keys);
    404         subscriber_test_hooks()
    405             .lock()
    406             .unwrap_or_else(std::sync::PoisonError::into_inner)
    407             .resolve_tags_results
    408             .push_back(Ok(Vec::<RadrootsNostrTag>::new()));
    409         assert!(resolve_tags_io(&event, &keys).is_ok());
    410 
    411         let runtime = shared_runtime();
    412         let state = runtime.state();
    413         assert!(matches!(
    414             handle_event_io(
    415                 event.clone(),
    416                 Vec::new(),
    417                 keys.clone(),
    418                 client.clone(),
    419                 state.clone(),
    420                 proof_policy()
    421             )
    422             .await,
    423             Err(TradeListingDvmError::UnsupportedKind)
    424         ));
    425         subscriber_test_hooks()
    426             .lock()
    427             .unwrap_or_else(std::sync::PoisonError::into_inner)
    428             .handle_event_results
    429             .push_back(Ok(()));
    430         assert!(
    431             handle_event_io(
    432                 event.clone(),
    433                 Vec::new(),
    434                 keys.clone(),
    435                 client.clone(),
    436                 state,
    437                 proof_policy()
    438             )
    439             .await
    440             .is_ok()
    441         );
    442 
    443         let _ = handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client).await;
    444         subscriber_test_hooks()
    445             .lock()
    446             .unwrap_or_else(std::sync::PoisonError::into_inner)
    447             .handle_error_results
    448             .push_back(Ok(()));
    449         assert!(
    450             handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client)
    451                 .await
    452                 .is_ok()
    453         );
    454     }
    455 
    456     #[tokio::test]
    457     async fn subscriber_returns_ok_when_stop_is_pre_requested() {
    458         let _guard = test_guard().await;
    459         let keys = RadrootsNostrKeys::generate();
    460         let client = RadrootsNostrClient::new(keys.clone());
    461         let (_tx, rx) = watch::channel(true);
    462         assert!(
    463             subscriber(client, keys, shared_runtime(), proof_policy(), rx)
    464                 .await
    465                 .is_ok()
    466         );
    467     }
    468 
    469     #[tokio::test]
    470     async fn subscriber_reuses_runtime_owned_state_across_runs() {
    471         let _guard = test_guard().await;
    472         let keys = RadrootsNostrKeys::generate();
    473         let client = RadrootsNostrClient::new(keys.clone());
    474         let runtime = shared_runtime();
    475         let state = runtime.state();
    476         state
    477             .lock()
    478             .await
    479             .mark_listing_validated("addr", "evt-listing-1");
    480 
    481         let (_tx_first, rx_first) = watch::channel(true);
    482         assert!(
    483             subscriber(
    484                 client.clone(),
    485                 keys.clone(),
    486                 runtime.clone(),
    487                 proof_policy(),
    488                 rx_first
    489             )
    490             .await
    491             .is_ok()
    492         );
    493         assert!(state.lock().await.is_listing_validated("addr"));
    494 
    495         let (_tx_second, rx_second) = watch::channel(true);
    496         assert!(
    497             subscriber(client, keys, runtime.clone(), proof_policy(), rx_second)
    498                 .await
    499                 .is_ok()
    500         );
    501         assert!(state.lock().await.is_listing_validated("addr"));
    502     }
    503 
    504     #[tokio::test]
    505     async fn subscriber_returns_err_when_no_relays_are_configured() {
    506         let _guard = test_guard().await;
    507         let keys = RadrootsNostrKeys::generate();
    508         let client = RadrootsNostrClient::new(keys.clone());
    509         let (_tx, rx) = watch::channel(false);
    510         let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx)
    511             .await
    512             .expect_err("expected relay error");
    513         let msg = format!("{err:#}");
    514         assert!(msg.contains("relay"));
    515     }
    516 
    517     #[tokio::test]
    518     async fn subscriber_can_stop_after_start_when_relay_is_present() {
    519         let _guard = test_guard().await;
    520         let keys = RadrootsNostrKeys::generate();
    521         let client = RadrootsNostrClient::new(keys.clone());
    522         let _ = client.add_relay("wss://relay.example.com").await;
    523         let (tx, rx) = watch::channel(false);
    524         let join = tokio::spawn(subscriber(
    525             client,
    526             keys,
    527             shared_runtime(),
    528             proof_policy(),
    529             rx,
    530         ));
    531         tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    532         let _ = tx.send(true);
    533         let _ = join.await;
    534     }
    535 
    536     #[tokio::test]
    537     async fn subscriber_covers_notification_closed_path() {
    538         let _guard = test_guard().await;
    539         let keys = RadrootsNostrKeys::generate();
    540         let client = RadrootsNostrClient::new(keys.clone());
    541         let _ = client.add_relay("wss://relay.example.com").await;
    542         subscriber_test_hooks()
    543             .lock()
    544             .unwrap_or_else(std::sync::PoisonError::into_inner)
    545             .notifications
    546             .push_back(Err(()));
    547         let (_tx, rx) = watch::channel(false);
    548         let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx)
    549             .await
    550             .expect_err("closed notifications");
    551         let msg = format!("{err:#}");
    552         assert!(msg.contains("notifications closed"));
    553     }
    554 
    555     #[tokio::test]
    556     async fn subscriber_covers_non_event_notification_and_stop_ok_path() {
    557         let _guard = test_guard().await;
    558         let keys = RadrootsNostrKeys::generate();
    559         let client = RadrootsNostrClient::new(keys.clone());
    560         let _ = client.add_relay("wss://relay.example.com").await;
    561 
    562         subscriber_test_hooks()
    563             .lock()
    564             .unwrap_or_else(std::sync::PoisonError::into_inner)
    565             .notifications
    566             .push_back(Ok(scripted_shutdown_notification()));
    567 
    568         let (tx, rx) = watch::channel(false);
    569         let join = tokio::spawn(subscriber(
    570             client,
    571             keys,
    572             shared_runtime(),
    573             proof_policy(),
    574             rx,
    575         ));
    576         tokio::time::sleep(std::time::Duration::from_millis(30)).await;
    577         let _ = tx.send(true);
    578         let result = join.await.expect("subscriber join");
    579         assert!(result.is_ok());
    580     }
    581 
    582     #[tokio::test]
    583     async fn subscriber_covers_event_processing_paths() {
    584         let _guard = test_guard().await;
    585         let keys = RadrootsNostrKeys::generate();
    586         let client = RadrootsNostrClient::new(keys.clone());
    587         let _ = client.add_relay("wss://relay.example.com").await;
    588         subscriber_test_hooks()
    589             .lock()
    590             .unwrap_or_else(std::sync::PoisonError::into_inner)
    591             .notifications
    592             .push_back(Ok(scripted_event_notification(&keys)));
    593         subscriber_test_hooks()
    594             .lock()
    595             .unwrap_or_else(std::sync::PoisonError::into_inner)
    596             .resolve_tags_results
    597             .push_back(Err(RadrootsNostrTagsResolveError::DecryptionError(
    598                 "resolve-failed".to_string(),
    599             )));
    600         let (tx, rx) = watch::channel(false);
    601         let join = tokio::spawn(subscriber(
    602             client,
    603             keys,
    604             shared_runtime(),
    605             proof_policy(),
    606             rx,
    607         ));
    608         tokio::time::sleep(std::time::Duration::from_millis(30)).await;
    609         let _ = tx.send(true);
    610         let _ = join.await;
    611     }
    612 
    613     #[tokio::test]
    614     async fn subscriber_covers_handle_event_and_error_paths() {
    615         let _guard = test_guard().await;
    616         let keys = RadrootsNostrKeys::generate();
    617         let client = RadrootsNostrClient::new(keys.clone());
    618         let _ = client.add_relay("wss://relay.example.com").await;
    619 
    620         let mut hooks = subscriber_test_hooks()
    621             .lock()
    622             .unwrap_or_else(std::sync::PoisonError::into_inner);
    623         hooks
    624             .notifications
    625             .push_back(Ok(scripted_event_notification(&keys)));
    626         hooks
    627             .notifications
    628             .push_back(Ok(scripted_event_notification(&keys)));
    629         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    630         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    631         hooks
    632             .handle_event_results
    633             .push_back(Err(TradeListingDvmError::MissingRecipient));
    634         hooks
    635             .handle_event_results
    636             .push_back(Err(TradeListingDvmError::InvalidOrder));
    637         hooks
    638             .handle_error_results
    639             .push_back(Err(TradeListingDvmError::InvalidOrder));
    640         drop(hooks);
    641 
    642         let (tx, rx) = watch::channel(false);
    643         let join = tokio::spawn(subscriber(
    644             client,
    645             keys,
    646             shared_runtime(),
    647             proof_policy(),
    648             rx,
    649         ));
    650         tokio::time::sleep(std::time::Duration::from_millis(40)).await;
    651         let _ = tx.send(true);
    652         let _ = join.await;
    653     }
    654 
    655     #[tokio::test]
    656     async fn subscriber_covers_delay_and_error_feedback_warn_path() {
    657         let _guard = test_guard().await;
    658         let keys = RadrootsNostrKeys::generate();
    659         let client = RadrootsNostrClient::new(keys.clone());
    660         let _ = client.add_relay("wss://relay.example.com").await;
    661 
    662         let mut hooks = subscriber_test_hooks()
    663             .lock()
    664             .unwrap_or_else(std::sync::PoisonError::into_inner);
    665         hooks
    666             .notifications
    667             .push_back(Ok(scripted_event_notification(&keys)));
    668         hooks.notifications.push_back(Err(()));
    669         hooks.delay_before_event_handle.push_back(true);
    670         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    671         hooks
    672             .handle_event_results
    673             .push_back(Err(TradeListingDvmError::InvalidOrder));
    674         hooks
    675             .handle_error_results
    676             .push_back(Err(TradeListingDvmError::InvalidOrder));
    677         drop(hooks);
    678 
    679         let (_tx, rx) = watch::channel(false);
    680         let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx)
    681             .await
    682             .expect_err("notifications closed");
    683         let msg = format!("{err:#}");
    684         assert!(msg.contains("notifications closed"));
    685     }
    686 
    687     #[tokio::test]
    688     async fn handled_domain_errors_advance_replay_anchor() {
    689         let _guard = test_guard().await;
    690         let keys = RadrootsNostrKeys::generate();
    691         let client = RadrootsNostrClient::new(keys.clone());
    692         let runtime = shared_runtime();
    693         let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test")
    694             .custom_created_at(1_234_u64.into())
    695             .sign_with_keys(&keys)
    696             .expect("event");
    697 
    698         let mut hooks = subscriber_test_hooks()
    699             .lock()
    700             .unwrap_or_else(std::sync::PoisonError::into_inner);
    701         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    702         hooks
    703             .handle_event_results
    704             .push_back(Err(TradeListingDvmError::InvalidOrder));
    705         hooks.handle_error_results.push_back(Ok(()));
    706         drop(hooks);
    707 
    708         process_event_notification(event, keys, client, runtime.clone(), proof_policy())
    709             .await
    710             .expect("notification");
    711 
    712         assert_eq!(
    713             runtime.state().lock().await.last_event_created_at(),
    714             Some(1_234)
    715         );
    716     }
    717 
    718     #[tokio::test]
    719     async fn subscriber_process_event_feedback_error_branches_are_covered() {
    720         let _guard = test_guard().await;
    721         let keys = RadrootsNostrKeys::generate();
    722         let client = RadrootsNostrClient::new(keys.clone());
    723         let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "event")
    724             .sign_with_keys(&keys)
    725             .expect("event");
    726         let runtime = shared_runtime();
    727 
    728         let mut hooks = subscriber_test_hooks()
    729             .lock()
    730             .unwrap_or_else(std::sync::PoisonError::into_inner);
    731         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    732         hooks
    733             .handle_event_results
    734             .push_back(Err(TradeListingDvmError::InvalidOrder));
    735         hooks
    736             .handle_error_results
    737             .push_back(Err(TradeListingDvmError::InvalidOrder));
    738         drop(hooks);
    739 
    740         process_event_notification(event, keys, client, runtime, proof_policy())
    741             .await
    742             .expect("processing");
    743     }
    744 
    745     #[tokio::test]
    746     async fn subscriber_process_event_feedback_non_error_branches_are_covered() {
    747         let _guard = test_guard().await;
    748         let keys = RadrootsNostrKeys::generate();
    749         let client = RadrootsNostrClient::new(keys.clone());
    750         let event_ok = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "ok")
    751             .sign_with_keys(&keys)
    752             .expect("event ok");
    753         let event_err = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "err")
    754             .sign_with_keys(&keys)
    755             .expect("event err");
    756         let runtime = shared_runtime();
    757 
    758         let mut hooks = subscriber_test_hooks()
    759             .lock()
    760             .unwrap_or_else(std::sync::PoisonError::into_inner);
    761         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    762         hooks.handle_event_results.push_back(Ok(()));
    763         hooks.resolve_tags_results.push_back(Ok(Vec::new()));
    764         hooks
    765             .handle_event_results
    766             .push_back(Err(TradeListingDvmError::InvalidOrder));
    767         hooks.handle_error_results.push_back(Ok(()));
    768         drop(hooks);
    769 
    770         process_event_notification(
    771             event_ok,
    772             keys.clone(),
    773             client.clone(),
    774             runtime.clone(),
    775             proof_policy(),
    776         )
    777         .await
    778         .expect("ok path");
    779         process_event_notification(event_err, keys, client, runtime, proof_policy())
    780             .await
    781             .expect("error path");
    782     }
    783 }