rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

rhi.rs (10465B)


      1 #![cfg_attr(coverage_nightly, coverage(off))]
      2 
      3 use std::time::{Duration, Instant};
      4 
      5 use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys};
      6 use radroots_runtime::{Backoff, BackoffConfig};
      7 
      8 use crate::features::trade_listing::state::TradeListingRuntime;
      9 use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy;
     10 
     11 #[cfg(not(test))]
     12 fn connection_wait_timeout() -> Duration {
     13     Duration::from_secs(5)
     14 }
     15 
     16 #[cfg(test)]
     17 fn connection_wait_timeout() -> Duration {
     18     Duration::from_millis(10)
     19 }
     20 
     21 #[cfg(test)]
     22 static SUBSCRIBER_RESULT_HOOK: std::sync::OnceLock<
     23     std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>>,
     24 > = std::sync::OnceLock::new();
     25 
     26 #[cfg(test)]
     27 fn subscriber_result_hook()
     28 -> &'static std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>> {
     29     SUBSCRIBER_RESULT_HOOK.get_or_init(|| std::sync::Mutex::new(std::collections::VecDeque::new()))
     30 }
     31 
     32 async fn run_subscriber_once(
     33     client: RadrootsNostrClient,
     34     keys: RadrootsNostrKeys,
     35     runtime: TradeListingRuntime,
     36     proof_policy: TradeValidationReceiptProverPolicy,
     37     stop_rx: tokio::sync::watch::Receiver<bool>,
     38 ) -> Result<(), anyhow::Error> {
     39     #[cfg(test)]
     40     if let Some(result) = subscriber_result_hook()
     41         .lock()
     42         .unwrap_or_else(std::sync::PoisonError::into_inner)
     43         .pop_front()
     44     {
     45         return result;
     46     }
     47 
     48     crate::features::trade_listing::subscriber::subscriber(
     49         client,
     50         keys,
     51         runtime,
     52         proof_policy,
     53         stop_rx,
     54     )
     55     .await
     56 }
     57 
     58 async fn wait_for_connection_or_stop(
     59     client: &RadrootsNostrClient,
     60     stop_rx: &mut tokio::sync::watch::Receiver<bool>,
     61 ) -> bool {
     62     if *stop_rx.borrow() {
     63         return false;
     64     }
     65     tokio::select! {
     66         _ = client.wait_for_connection(connection_wait_timeout()) => true,
     67         _ = stop_rx.changed() => false,
     68     }
     69 }
     70 
     71 pub struct Rhi {
     72     pub(crate) _started: Instant,
     73     pub client: RadrootsNostrClient,
     74     pub(crate) trade_listing_runtime: TradeListingRuntime,
     75     pub(crate) trade_validation_receipt_policy: TradeValidationReceiptProverPolicy,
     76 }
     77 
     78 impl Rhi {
     79     pub fn new(keys: RadrootsNostrKeys) -> Self {
     80         Self::with_trade_listing_runtime(keys, TradeListingRuntime::new())
     81     }
     82 
     83     pub fn with_trade_listing_runtime(
     84         keys: RadrootsNostrKeys,
     85         trade_listing_runtime: TradeListingRuntime,
     86     ) -> Self {
     87         Self::with_trade_listing_runtime_and_policy(
     88             keys,
     89             trade_listing_runtime,
     90             TradeValidationReceiptProverPolicy::default(),
     91         )
     92     }
     93 
     94     pub fn with_trade_listing_runtime_and_policy(
     95         keys: RadrootsNostrKeys,
     96         trade_listing_runtime: TradeListingRuntime,
     97         trade_validation_receipt_policy: TradeValidationReceiptProverPolicy,
     98     ) -> Self {
     99         let client = RadrootsNostrClient::new(keys);
    100         Self {
    101             _started: Instant::now(),
    102             client,
    103             trade_listing_runtime,
    104             trade_validation_receipt_policy,
    105         }
    106     }
    107 }
    108 
    109 use std::sync::Arc;
    110 use tokio::sync::Mutex;
    111 
    112 pub struct RhiHandle {
    113     stop_tx: Arc<Mutex<Option<tokio::sync::watch::Sender<bool>>>>,
    114     join: Option<tokio::task::JoinHandle<()>>,
    115 }
    116 
    117 impl Clone for RhiHandle {
    118     fn clone(&self) -> Self {
    119         Self {
    120             stop_tx: Arc::clone(&self.stop_tx),
    121             join: None, // don’t clone the JoinHandle!
    122         }
    123     }
    124 }
    125 
    126 impl RhiHandle {
    127     pub fn stop(&self) {
    128         if let Some(tx) = self.stop_tx.try_lock().ok().and_then(|mut opt| opt.take()) {
    129             let _ = tx.send(true);
    130         }
    131     }
    132 
    133     pub async fn stopped(mut self) {
    134         if let Some(join) = self.join.take() {
    135             let _ = join.await;
    136         }
    137     }
    138 }
    139 
    140 pub async fn start_subscriber(
    141     client: RadrootsNostrClient,
    142     keys: RadrootsNostrKeys,
    143     runtime: TradeListingRuntime,
    144     backoff_cfg: BackoffConfig,
    145 ) -> RhiHandle {
    146     start_subscriber_with_policy(
    147         client,
    148         keys,
    149         runtime,
    150         TradeValidationReceiptProverPolicy::default(),
    151         backoff_cfg,
    152     )
    153     .await
    154 }
    155 
    156 pub async fn start_subscriber_with_policy(
    157     client: RadrootsNostrClient,
    158     keys: RadrootsNostrKeys,
    159     runtime: TradeListingRuntime,
    160     proof_policy: TradeValidationReceiptProverPolicy,
    161     backoff_cfg: BackoffConfig,
    162 ) -> RhiHandle {
    163     let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false);
    164 
    165     let join = tokio::spawn(async move {
    166         let mut backoff = Backoff::new(backoff_cfg);
    167         loop {
    168             if *stop_rx.borrow() {
    169                 break;
    170             }
    171 
    172             client.connect().await;
    173             if !wait_for_connection_or_stop(&client, &mut stop_rx).await {
    174                 break;
    175             }
    176 
    177             let res = run_subscriber_once(
    178                 client.clone(),
    179                 keys.clone(),
    180                 runtime.clone(),
    181                 proof_policy.clone(),
    182                 stop_rx.clone(),
    183             )
    184             .await;
    185 
    186             let failed = res.is_err();
    187 
    188             if let Err(e) = res {
    189                 tracing::error!("Error on job request subscription: {e}");
    190             } else {
    191                 backoff.reset();
    192             }
    193 
    194             if *stop_rx.borrow() {
    195                 break;
    196             }
    197 
    198             if failed {
    199                 let delay = backoff.next_delay();
    200                 tokio::select! {
    201                     _ = tokio::time::sleep(delay) => {}
    202                     _ = stop_rx.changed() => break,
    203                 }
    204             }
    205         }
    206     });
    207 
    208     RhiHandle {
    209         stop_tx: Arc::new(Mutex::new(Some(stop_tx))),
    210         join: Some(join),
    211     }
    212 }
    213 
    214 #[cfg(test)]
    215 #[cfg_attr(coverage_nightly, coverage(off))]
    216 mod tests {
    217     use super::{
    218         Rhi, RhiHandle, start_subscriber, subscriber_result_hook, wait_for_connection_or_stop,
    219     };
    220     use crate::features::trade_listing::state::TradeListingRuntime;
    221     use anyhow::anyhow;
    222     use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys};
    223     use radroots_runtime::BackoffConfig;
    224     use std::sync::Arc;
    225     use tokio::sync::Mutex;
    226 
    227     #[tokio::test]
    228     async fn rhi_new_initializes_client_and_runtime() {
    229         let keys = RadrootsNostrKeys::generate();
    230         let rhi = Rhi::new(keys);
    231         let _ = rhi.client.clone();
    232         let state = rhi.trade_listing_runtime.state();
    233         state
    234             .lock()
    235             .await
    236             .mark_listing_validated("addr", "evt-listing-1");
    237         assert!(
    238             rhi.trade_listing_runtime
    239                 .state()
    240                 .lock()
    241                 .await
    242                 .is_listing_validated("addr")
    243         );
    244     }
    245 
    246     #[tokio::test]
    247     async fn rhi_handle_stop_and_stopped_cover_paths() {
    248         let (tx, _rx) = tokio::sync::watch::channel(false);
    249         let join = tokio::spawn(async {});
    250         let handle = RhiHandle {
    251             stop_tx: Arc::new(Mutex::new(Some(tx))),
    252             join: Some(join),
    253         };
    254         handle.stop();
    255         handle.stop();
    256         handle.clone().stopped().await;
    257         handle.stopped().await;
    258     }
    259 
    260     #[tokio::test]
    261     async fn start_subscriber_runs_with_and_without_relay() {
    262         let keys = RadrootsNostrKeys::generate();
    263         let cfg = BackoffConfig {
    264             base_ms: 1,
    265             max_ms: 2,
    266             factor: 1,
    267             jitter_ms: 0,
    268         };
    269 
    270         let client_err = RadrootsNostrClient::new(keys.clone());
    271         let handle_err = start_subscriber(
    272             client_err,
    273             keys.clone(),
    274             TradeListingRuntime::new(),
    275             cfg.clone(),
    276         )
    277         .await;
    278         tokio::time::sleep(std::time::Duration::from_millis(30)).await;
    279         handle_err.stop();
    280         handle_err.stopped().await;
    281 
    282         let client_ok = RadrootsNostrClient::new(keys.clone());
    283         let _ = client_ok.add_relay("wss://relay.example.com").await;
    284         subscriber_result_hook()
    285             .lock()
    286             .unwrap_or_else(std::sync::PoisonError::into_inner)
    287             .push_back(Ok(()));
    288         let handle_ok = start_subscriber(client_ok, keys, TradeListingRuntime::new(), cfg).await;
    289         tokio::time::sleep(std::time::Duration::from_millis(30)).await;
    290         handle_ok.stop();
    291         handle_ok.stopped().await;
    292     }
    293 
    294     #[tokio::test]
    295     async fn start_subscriber_stops_during_connection_wait_branch() {
    296         let keys = RadrootsNostrKeys::generate();
    297         let client = RadrootsNostrClient::new(keys.clone());
    298         let handle = start_subscriber(
    299             client,
    300             keys,
    301             TradeListingRuntime::new(),
    302             BackoffConfig {
    303                 base_ms: 25,
    304                 max_ms: 50,
    305                 factor: 1,
    306                 jitter_ms: 0,
    307             },
    308         )
    309         .await;
    310         tokio::time::sleep(std::time::Duration::from_millis(5)).await;
    311         handle.stop();
    312         handle.stopped().await;
    313     }
    314 
    315     #[tokio::test]
    316     async fn start_subscriber_stops_during_backoff_wait_branch() {
    317         let keys = RadrootsNostrKeys::generate();
    318         let client = RadrootsNostrClient::new(keys.clone());
    319         let _ = client.add_relay("wss://relay.example.com").await;
    320         subscriber_result_hook()
    321             .lock()
    322             .unwrap_or_else(std::sync::PoisonError::into_inner)
    323             .push_back(Err(anyhow!("forced subscriber failure")));
    324         let handle = start_subscriber(
    325             client,
    326             keys,
    327             TradeListingRuntime::new(),
    328             BackoffConfig {
    329                 base_ms: 200,
    330                 max_ms: 200,
    331                 factor: 1,
    332                 jitter_ms: 0,
    333             },
    334         )
    335         .await;
    336         tokio::time::sleep(std::time::Duration::from_millis(25)).await;
    337         handle.stop();
    338         handle.stopped().await;
    339     }
    340 
    341     #[tokio::test]
    342     async fn wait_for_connection_or_stop_covers_both_outcomes() {
    343         let keys = RadrootsNostrKeys::generate();
    344 
    345         let client_stop = RadrootsNostrClient::new(keys.clone());
    346         let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false);
    347         let _ = stop_tx.send(true);
    348         let stop_branch = wait_for_connection_or_stop(&client_stop, &mut stop_rx).await;
    349         assert!(!stop_branch);
    350 
    351         let client_wait = RadrootsNostrClient::new(keys);
    352         let (_tx, mut rx) = tokio::sync::watch::channel(false);
    353         let wait_branch = wait_for_connection_or_stop(&client_wait, &mut rx).await;
    354         assert!(wait_branch);
    355     }
    356 }