rhi.rs (10465B)
1 #![cfg_attr(coverage_nightly, coverage(off))] 2 3 use std::time::{Duration, Instant}; 4 5 use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; 6 use radroots_runtime::{Backoff, BackoffConfig}; 7 8 use crate::features::trade_listing::state::TradeListingRuntime; 9 use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy; 10 11 #[cfg(not(test))] 12 fn connection_wait_timeout() -> Duration { 13 Duration::from_secs(5) 14 } 15 16 #[cfg(test)] 17 fn connection_wait_timeout() -> Duration { 18 Duration::from_millis(10) 19 } 20 21 #[cfg(test)] 22 static SUBSCRIBER_RESULT_HOOK: std::sync::OnceLock< 23 std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>>, 24 > = std::sync::OnceLock::new(); 25 26 #[cfg(test)] 27 fn subscriber_result_hook() 28 -> &'static std::sync::Mutex<std::collections::VecDeque<Result<(), anyhow::Error>>> { 29 SUBSCRIBER_RESULT_HOOK.get_or_init(|| std::sync::Mutex::new(std::collections::VecDeque::new())) 30 } 31 32 async fn run_subscriber_once( 33 client: RadrootsNostrClient, 34 keys: RadrootsNostrKeys, 35 runtime: TradeListingRuntime, 36 proof_policy: TradeValidationReceiptProverPolicy, 37 stop_rx: tokio::sync::watch::Receiver<bool>, 38 ) -> Result<(), anyhow::Error> { 39 #[cfg(test)] 40 if let Some(result) = subscriber_result_hook() 41 .lock() 42 .unwrap_or_else(std::sync::PoisonError::into_inner) 43 .pop_front() 44 { 45 return result; 46 } 47 48 crate::features::trade_listing::subscriber::subscriber( 49 client, 50 keys, 51 runtime, 52 proof_policy, 53 stop_rx, 54 ) 55 .await 56 } 57 58 async fn wait_for_connection_or_stop( 59 client: &RadrootsNostrClient, 60 stop_rx: &mut tokio::sync::watch::Receiver<bool>, 61 ) -> bool { 62 if *stop_rx.borrow() { 63 return false; 64 } 65 tokio::select! { 66 _ = client.wait_for_connection(connection_wait_timeout()) => true, 67 _ = stop_rx.changed() => false, 68 } 69 } 70 71 pub struct Rhi { 72 pub(crate) _started: Instant, 73 pub client: RadrootsNostrClient, 74 pub(crate) trade_listing_runtime: TradeListingRuntime, 75 pub(crate) trade_validation_receipt_policy: TradeValidationReceiptProverPolicy, 76 } 77 78 impl Rhi { 79 pub fn new(keys: RadrootsNostrKeys) -> Self { 80 Self::with_trade_listing_runtime(keys, TradeListingRuntime::new()) 81 } 82 83 pub fn with_trade_listing_runtime( 84 keys: RadrootsNostrKeys, 85 trade_listing_runtime: TradeListingRuntime, 86 ) -> Self { 87 Self::with_trade_listing_runtime_and_policy( 88 keys, 89 trade_listing_runtime, 90 TradeValidationReceiptProverPolicy::default(), 91 ) 92 } 93 94 pub fn with_trade_listing_runtime_and_policy( 95 keys: RadrootsNostrKeys, 96 trade_listing_runtime: TradeListingRuntime, 97 trade_validation_receipt_policy: TradeValidationReceiptProverPolicy, 98 ) -> Self { 99 let client = RadrootsNostrClient::new(keys); 100 Self { 101 _started: Instant::now(), 102 client, 103 trade_listing_runtime, 104 trade_validation_receipt_policy, 105 } 106 } 107 } 108 109 use std::sync::Arc; 110 use tokio::sync::Mutex; 111 112 pub struct RhiHandle { 113 stop_tx: Arc<Mutex<Option<tokio::sync::watch::Sender<bool>>>>, 114 join: Option<tokio::task::JoinHandle<()>>, 115 } 116 117 impl Clone for RhiHandle { 118 fn clone(&self) -> Self { 119 Self { 120 stop_tx: Arc::clone(&self.stop_tx), 121 join: None, // don’t clone the JoinHandle! 122 } 123 } 124 } 125 126 impl RhiHandle { 127 pub fn stop(&self) { 128 if let Some(tx) = self.stop_tx.try_lock().ok().and_then(|mut opt| opt.take()) { 129 let _ = tx.send(true); 130 } 131 } 132 133 pub async fn stopped(mut self) { 134 if let Some(join) = self.join.take() { 135 let _ = join.await; 136 } 137 } 138 } 139 140 pub async fn start_subscriber( 141 client: RadrootsNostrClient, 142 keys: RadrootsNostrKeys, 143 runtime: TradeListingRuntime, 144 backoff_cfg: BackoffConfig, 145 ) -> RhiHandle { 146 start_subscriber_with_policy( 147 client, 148 keys, 149 runtime, 150 TradeValidationReceiptProverPolicy::default(), 151 backoff_cfg, 152 ) 153 .await 154 } 155 156 pub async fn start_subscriber_with_policy( 157 client: RadrootsNostrClient, 158 keys: RadrootsNostrKeys, 159 runtime: TradeListingRuntime, 160 proof_policy: TradeValidationReceiptProverPolicy, 161 backoff_cfg: BackoffConfig, 162 ) -> RhiHandle { 163 let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false); 164 165 let join = tokio::spawn(async move { 166 let mut backoff = Backoff::new(backoff_cfg); 167 loop { 168 if *stop_rx.borrow() { 169 break; 170 } 171 172 client.connect().await; 173 if !wait_for_connection_or_stop(&client, &mut stop_rx).await { 174 break; 175 } 176 177 let res = run_subscriber_once( 178 client.clone(), 179 keys.clone(), 180 runtime.clone(), 181 proof_policy.clone(), 182 stop_rx.clone(), 183 ) 184 .await; 185 186 let failed = res.is_err(); 187 188 if let Err(e) = res { 189 tracing::error!("Error on job request subscription: {e}"); 190 } else { 191 backoff.reset(); 192 } 193 194 if *stop_rx.borrow() { 195 break; 196 } 197 198 if failed { 199 let delay = backoff.next_delay(); 200 tokio::select! { 201 _ = tokio::time::sleep(delay) => {} 202 _ = stop_rx.changed() => break, 203 } 204 } 205 } 206 }); 207 208 RhiHandle { 209 stop_tx: Arc::new(Mutex::new(Some(stop_tx))), 210 join: Some(join), 211 } 212 } 213 214 #[cfg(test)] 215 #[cfg_attr(coverage_nightly, coverage(off))] 216 mod tests { 217 use super::{ 218 Rhi, RhiHandle, start_subscriber, subscriber_result_hook, wait_for_connection_or_stop, 219 }; 220 use crate::features::trade_listing::state::TradeListingRuntime; 221 use anyhow::anyhow; 222 use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrKeys}; 223 use radroots_runtime::BackoffConfig; 224 use std::sync::Arc; 225 use tokio::sync::Mutex; 226 227 #[tokio::test] 228 async fn rhi_new_initializes_client_and_runtime() { 229 let keys = RadrootsNostrKeys::generate(); 230 let rhi = Rhi::new(keys); 231 let _ = rhi.client.clone(); 232 let state = rhi.trade_listing_runtime.state(); 233 state 234 .lock() 235 .await 236 .mark_listing_validated("addr", "evt-listing-1"); 237 assert!( 238 rhi.trade_listing_runtime 239 .state() 240 .lock() 241 .await 242 .is_listing_validated("addr") 243 ); 244 } 245 246 #[tokio::test] 247 async fn rhi_handle_stop_and_stopped_cover_paths() { 248 let (tx, _rx) = tokio::sync::watch::channel(false); 249 let join = tokio::spawn(async {}); 250 let handle = RhiHandle { 251 stop_tx: Arc::new(Mutex::new(Some(tx))), 252 join: Some(join), 253 }; 254 handle.stop(); 255 handle.stop(); 256 handle.clone().stopped().await; 257 handle.stopped().await; 258 } 259 260 #[tokio::test] 261 async fn start_subscriber_runs_with_and_without_relay() { 262 let keys = RadrootsNostrKeys::generate(); 263 let cfg = BackoffConfig { 264 base_ms: 1, 265 max_ms: 2, 266 factor: 1, 267 jitter_ms: 0, 268 }; 269 270 let client_err = RadrootsNostrClient::new(keys.clone()); 271 let handle_err = start_subscriber( 272 client_err, 273 keys.clone(), 274 TradeListingRuntime::new(), 275 cfg.clone(), 276 ) 277 .await; 278 tokio::time::sleep(std::time::Duration::from_millis(30)).await; 279 handle_err.stop(); 280 handle_err.stopped().await; 281 282 let client_ok = RadrootsNostrClient::new(keys.clone()); 283 let _ = client_ok.add_relay("wss://relay.example.com").await; 284 subscriber_result_hook() 285 .lock() 286 .unwrap_or_else(std::sync::PoisonError::into_inner) 287 .push_back(Ok(())); 288 let handle_ok = start_subscriber(client_ok, keys, TradeListingRuntime::new(), cfg).await; 289 tokio::time::sleep(std::time::Duration::from_millis(30)).await; 290 handle_ok.stop(); 291 handle_ok.stopped().await; 292 } 293 294 #[tokio::test] 295 async fn start_subscriber_stops_during_connection_wait_branch() { 296 let keys = RadrootsNostrKeys::generate(); 297 let client = RadrootsNostrClient::new(keys.clone()); 298 let handle = start_subscriber( 299 client, 300 keys, 301 TradeListingRuntime::new(), 302 BackoffConfig { 303 base_ms: 25, 304 max_ms: 50, 305 factor: 1, 306 jitter_ms: 0, 307 }, 308 ) 309 .await; 310 tokio::time::sleep(std::time::Duration::from_millis(5)).await; 311 handle.stop(); 312 handle.stopped().await; 313 } 314 315 #[tokio::test] 316 async fn start_subscriber_stops_during_backoff_wait_branch() { 317 let keys = RadrootsNostrKeys::generate(); 318 let client = RadrootsNostrClient::new(keys.clone()); 319 let _ = client.add_relay("wss://relay.example.com").await; 320 subscriber_result_hook() 321 .lock() 322 .unwrap_or_else(std::sync::PoisonError::into_inner) 323 .push_back(Err(anyhow!("forced subscriber failure"))); 324 let handle = start_subscriber( 325 client, 326 keys, 327 TradeListingRuntime::new(), 328 BackoffConfig { 329 base_ms: 200, 330 max_ms: 200, 331 factor: 1, 332 jitter_ms: 0, 333 }, 334 ) 335 .await; 336 tokio::time::sleep(std::time::Duration::from_millis(25)).await; 337 handle.stop(); 338 handle.stopped().await; 339 } 340 341 #[tokio::test] 342 async fn wait_for_connection_or_stop_covers_both_outcomes() { 343 let keys = RadrootsNostrKeys::generate(); 344 345 let client_stop = RadrootsNostrClient::new(keys.clone()); 346 let (stop_tx, mut stop_rx) = tokio::sync::watch::channel(false); 347 let _ = stop_tx.send(true); 348 let stop_branch = wait_for_connection_or_stop(&client_stop, &mut stop_rx).await; 349 assert!(!stop_branch); 350 351 let client_wait = RadrootsNostrClient::new(keys); 352 let (_tx, mut rx) = tokio::sync::watch::channel(false); 353 let wait_branch = wait_for_connection_or_stop(&client_wait, &mut rx).await; 354 assert!(wait_branch); 355 } 356 }