lib.rs (14790B)
1 #![cfg_attr(coverage_nightly, feature(coverage_attribute))] 2 3 pub mod adapters; 4 pub mod cli; 5 pub mod config; 6 pub mod features; 7 pub mod identity_storage; 8 pub mod paths; 9 pub mod proof_smoke; 10 pub mod remote_prove; 11 pub mod rhi; 12 13 pub use cli::Args as cli_args; 14 15 use anyhow::Result; 16 use radroots_events::kinds::{ 17 KIND_LISTING, KIND_LISTING_DRAFT, ORDER_EVENT_KINDS, TRADE_VALIDATION_EVENT_KINDS, 18 }; 19 use std::time::Duration; 20 21 use crate::features::trade_listing::state::{TradeListingRuntime, TradeListingRuntimeConfig}; 22 use crate::identity_storage::load_service_identity; 23 use crate::rhi::{Rhi, start_subscriber_with_policy}; 24 use radroots_identity::RadrootsIdentity; 25 use radroots_nostr::prelude::{ 26 RadrootsNostrApplicationHandlerSpec, radroots_nostr_bootstrap_service_presence, 27 }; 28 use tracing::{info, warn}; 29 30 #[cfg(test)] 31 static RUN_RHI_AUTO_STOP: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); 32 #[cfg(test)] 33 static RUN_RHI_SKIP_SUBSCRIBER: std::sync::atomic::AtomicBool = 34 std::sync::atomic::AtomicBool::new(false); 35 36 #[cfg(test)] 37 static RUN_RHI_BOOTSTRAP_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<Result<(), String>>>> = 38 std::sync::OnceLock::new(); 39 40 #[derive(Clone, Copy)] 41 enum RunRhiWaitOutcome { 42 Shutdown, 43 Stopped, 44 } 45 46 #[cfg(test)] 47 static RUN_RHI_WAIT_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<RunRhiWaitOutcome>>> = 48 std::sync::OnceLock::new(); 49 50 #[cfg(test)] 51 fn run_rhi_bootstrap_hook() -> &'static std::sync::Mutex<Option<Result<(), String>>> { 52 RUN_RHI_BOOTSTRAP_HOOK.get_or_init(|| std::sync::Mutex::new(None)) 53 } 54 55 #[cfg(test)] 56 fn run_rhi_wait_hook() -> &'static std::sync::Mutex<Option<RunRhiWaitOutcome>> { 57 RUN_RHI_WAIT_HOOK.get_or_init(|| std::sync::Mutex::new(None)) 58 } 59 60 #[cfg(test)] 61 fn take_bootstrap_hook_result() -> Option<Result<(), String>> { 62 run_rhi_bootstrap_hook() 63 .lock() 64 .unwrap_or_else(std::sync::PoisonError::into_inner) 65 .take() 66 } 67 68 #[cfg(not(test))] 69 #[cfg_attr(coverage_nightly, coverage(off))] 70 fn take_bootstrap_hook_result() -> Option<Result<(), String>> { 71 None 72 } 73 74 async fn bootstrap_presence( 75 client: &radroots_nostr::prelude::RadrootsNostrClient, 76 identity: &RadrootsIdentity, 77 metadata: &radroots_nostr::prelude::RadrootsNostrMetadata, 78 handler_spec: &RadrootsNostrApplicationHandlerSpec, 79 ) -> Result<()> { 80 let bootstrap_result: Result<()> = match take_bootstrap_hook_result() { 81 Some(result) => result.map_err(anyhow::Error::msg), 82 None => radroots_nostr_bootstrap_service_presence( 83 client, 84 identity, 85 None, 86 metadata, 87 handler_spec, 88 Duration::from_secs(5), 89 ) 90 .await 91 .map(|_| ()) 92 .map_err(anyhow::Error::from), 93 }; 94 bootstrap_result?; 95 Ok(()) 96 } 97 98 #[cfg_attr(coverage_nightly, coverage(off))] 99 async fn wait_for_shutdown_or_stopped(handle: crate::rhi::RhiHandle) -> RunRhiWaitOutcome { 100 #[cfg(test)] 101 if let Some(outcome) = run_rhi_wait_hook() 102 .lock() 103 .unwrap_or_else(std::sync::PoisonError::into_inner) 104 .take() 105 { 106 return outcome; 107 } 108 109 tokio::select! { 110 _ = radroots_runtime::shutdown_signal() => RunRhiWaitOutcome::Shutdown, 111 _ = handle.stopped() => RunRhiWaitOutcome::Stopped, 112 } 113 } 114 115 pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> { 116 let identity = load_service_identity( 117 args.service.identity.as_deref(), 118 args.service.allow_generate_identity, 119 )?; 120 let keys = identity.keys().clone(); 121 let trade_listing_runtime = TradeListingRuntime::load(TradeListingRuntimeConfig { 122 state_path: settings.config.subscriber.state.path.clone(), 123 replay_window_secs: settings.config.subscriber.state.replay_window_secs, 124 replay_overlap_secs: settings.config.subscriber.state.replay_overlap_secs, 125 }) 126 .await?; 127 128 let rhi = Rhi::with_trade_listing_runtime_and_policy( 129 keys.clone(), 130 trade_listing_runtime, 131 settings.config.trade_validation_receipt.clone(), 132 ); 133 let client = rhi.client.clone(); 134 let service_cfg = settings.config.service.clone(); 135 let relays = service_cfg.relays.clone(); 136 137 for relay in &relays { 138 client.add_relay(relay).await?; 139 } 140 141 let md = settings.metadata.clone(); 142 143 if !relays.is_empty() { 144 let handler_kinds = [KIND_LISTING, KIND_LISTING_DRAFT] 145 .into_iter() 146 .chain(ORDER_EVENT_KINDS) 147 .chain(TRADE_VALIDATION_EVENT_KINDS) 148 .collect(); 149 let handler_spec = RadrootsNostrApplicationHandlerSpec { 150 kinds: handler_kinds, 151 identifier: service_cfg.nip89_identifier.clone(), 152 metadata: Some(md.clone()), 153 extra_tags: service_cfg.nip89_extra_tags.clone(), 154 relays: relays.clone(), 155 nostrconnect_url: None, 156 }; 157 if let Err(e) = bootstrap_presence(&client, &identity, &md, &handler_spec).await { 158 warn!("Failed to publish service presence on startup: {e}"); 159 } else { 160 info!("Published service presence on startup"); 161 } 162 } 163 164 #[cfg(test)] 165 if RUN_RHI_SKIP_SUBSCRIBER.load(std::sync::atomic::Ordering::Relaxed) { 166 return Ok(()); 167 } 168 169 let handle = start_subscriber_with_policy( 170 client.clone(), 171 keys.clone(), 172 rhi.trade_listing_runtime.clone(), 173 rhi.trade_validation_receipt_policy.clone(), 174 settings.config.subscriber.backoff.clone(), 175 ) 176 .await; 177 178 let stop_handle = handle.clone(); 179 180 #[cfg(test)] 181 if RUN_RHI_AUTO_STOP.load(std::sync::atomic::Ordering::Relaxed) { 182 stop_handle.stop(); 183 } 184 185 match wait_for_shutdown_or_stopped(handle).await { 186 RunRhiWaitOutcome::Shutdown => { 187 info!("Shutting down…"); 188 stop_handle.stop(); 189 } 190 RunRhiWaitOutcome::Stopped => {} 191 } 192 193 client.unsubscribe_all().await; 194 client.disconnect().await; 195 196 Ok(()) 197 } 198 199 #[cfg(test)] 200 #[cfg_attr(coverage_nightly, coverage(off))] 201 mod tests { 202 use super::{ 203 RUN_RHI_AUTO_STOP, RUN_RHI_SKIP_SUBSCRIBER, RunRhiWaitOutcome, bootstrap_presence, run_rhi, 204 run_rhi_bootstrap_hook, run_rhi_wait_hook, 205 }; 206 use crate::{cli_args, config}; 207 use std::path::PathBuf; 208 use std::sync::atomic::Ordering; 209 use std::sync::{Mutex, MutexGuard}; 210 211 static TEST_LOCK: Mutex<()> = Mutex::new(()); 212 213 fn test_guard() -> MutexGuard<'static, ()> { 214 let guard = TEST_LOCK 215 .lock() 216 .unwrap_or_else(std::sync::PoisonError::into_inner); 217 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 218 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 219 *run_rhi_bootstrap_hook() 220 .lock() 221 .unwrap_or_else(std::sync::PoisonError::into_inner) = None; 222 *run_rhi_wait_hook() 223 .lock() 224 .unwrap_or_else(std::sync::PoisonError::into_inner) = None; 225 guard 226 } 227 228 fn settings_with_relays(relays: Vec<String>) -> config::Settings { 229 config::Settings { 230 metadata: serde_json::from_str(r#"{"name":"rhi-test"}"#).expect("metadata"), 231 config: config::Configuration { 232 service: radroots_runtime::RadrootsNostrServiceConfig { 233 logs_dir: std::env::temp_dir() 234 .join("rhi-test-logs") 235 .display() 236 .to_string(), 237 relays, 238 nip89_identifier: Some("rhi".to_string()), 239 nip89_extra_tags: Vec::new(), 240 }, 241 logging: config::LoggingConfig { 242 output_dir: std::env::temp_dir().join("rhi-test-logs"), 243 filter: "info".to_string(), 244 stdout: true, 245 }, 246 subscriber: config::SubscriberConfig { 247 backoff: radroots_runtime::BackoffConfig { 248 base_ms: 1, 249 max_ms: 2, 250 factor: 1, 251 jitter_ms: 0, 252 }, 253 state: config::SubscriberStateConfig { 254 path: unique_state_path("settings"), 255 ..Default::default() 256 }, 257 }, 258 trade_validation_receipt: 259 crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy::default(), 260 }, 261 } 262 } 263 264 fn args_for_identity(path: PathBuf) -> cli_args { 265 cli_args { 266 command: None, 267 service: radroots_runtime::RadrootsServiceCliArgs { 268 config: Some(PathBuf::from("config.toml")), 269 identity: Some(path), 270 allow_generate_identity: true, 271 }, 272 } 273 } 274 275 fn unique_identity_path(suffix: &str) -> PathBuf { 276 let nanos = std::time::SystemTime::now() 277 .duration_since(std::time::UNIX_EPOCH) 278 .expect("time") 279 .as_nanos(); 280 std::env::temp_dir().join(format!("rhi-{suffix}-{nanos}.secret.json")) 281 } 282 283 fn cleanup_identity_artifacts(path: &std::path::Path) { 284 let _ = std::fs::remove_file(path); 285 let _ = std::fs::remove_file(crate::identity_storage::encrypted_identity_key_path(path)); 286 } 287 288 fn unique_state_path(suffix: &str) -> PathBuf { 289 let nanos = std::time::SystemTime::now() 290 .duration_since(std::time::UNIX_EPOCH) 291 .expect("time") 292 .as_nanos(); 293 std::env::temp_dir().join(format!("rhi-state-{suffix}-{nanos}.json")) 294 } 295 296 #[tokio::test] 297 async fn run_rhi_completes_with_auto_stop_and_empty_relays() { 298 let _guard = test_guard(); 299 RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed); 300 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 301 let path = unique_identity_path("empty"); 302 let args = args_for_identity(path.clone()); 303 let settings = settings_with_relays(Vec::new()); 304 let result = run_rhi(&settings, &args).await; 305 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 306 cleanup_identity_artifacts(&path); 307 assert!(result.is_ok()); 308 } 309 310 #[tokio::test] 311 async fn run_rhi_covers_presence_success_and_failure_branches() { 312 let _guard = test_guard(); 313 RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed); 314 RUN_RHI_SKIP_SUBSCRIBER.store(true, Ordering::Relaxed); 315 316 let path_ok = unique_identity_path("presence-ok"); 317 let args_ok = args_for_identity(path_ok.clone()); 318 let settings_ok = settings_with_relays(vec!["wss://relay.example.com".to_string()]); 319 *run_rhi_bootstrap_hook() 320 .lock() 321 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(())); 322 let ok = run_rhi(&settings_ok, &args_ok).await; 323 cleanup_identity_artifacts(&path_ok); 324 assert!(ok.is_ok()); 325 326 let path_err = unique_identity_path("presence-err"); 327 let args_err = args_for_identity(path_err.clone()); 328 let settings_err = settings_with_relays(vec!["wss://relay.example.com".to_string()]); 329 *run_rhi_bootstrap_hook() 330 .lock() 331 .unwrap_or_else(std::sync::PoisonError::into_inner) = 332 Some(Err("presence failure".to_string())); 333 let err = run_rhi(&settings_err, &args_err).await; 334 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 335 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 336 cleanup_identity_artifacts(&path_err); 337 assert!(err.is_ok()); 338 } 339 340 #[tokio::test] 341 async fn bootstrap_presence_fallback_path_is_callable() { 342 let _guard = test_guard(); 343 let identity_path = unique_identity_path("bootstrap"); 344 let identity = crate::identity_storage::load_service_identity(Some(&identity_path), true) 345 .expect("identity"); 346 let client = radroots_nostr::prelude::RadrootsNostrClient::new(identity.keys().clone()); 347 let metadata: radroots_nostr::prelude::RadrootsNostrMetadata = 348 serde_json::from_str(r#"{"name":"bootstrap"}"#).expect("bootstrap metadata"); 349 let handler_spec = radroots_nostr::prelude::RadrootsNostrApplicationHandlerSpec { 350 kinds: vec![30402], 351 identifier: Some("rhi".to_string()), 352 metadata: Some(metadata.clone()), 353 extra_tags: Vec::new(), 354 relays: vec!["wss://relay.example.com".to_string()], 355 nostrconnect_url: None, 356 }; 357 let result = bootstrap_presence(&client, &identity, &metadata, &handler_spec).await; 358 cleanup_identity_artifacts(&identity_path); 359 assert!(result.is_err()); 360 } 361 362 #[tokio::test] 363 async fn run_rhi_covers_shutdown_wait_branch() { 364 let _guard = test_guard(); 365 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 366 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 367 *run_rhi_wait_hook() 368 .lock() 369 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(RunRhiWaitOutcome::Shutdown); 370 371 let path = unique_identity_path("shutdown"); 372 let args = args_for_identity(path.clone()); 373 let settings = settings_with_relays(Vec::new()); 374 let result = run_rhi(&settings, &args).await; 375 cleanup_identity_artifacts(&path); 376 assert!(result.is_ok()); 377 } 378 379 #[tokio::test] 380 async fn run_rhi_returns_error_when_relay_configuration_is_invalid() { 381 let _guard = test_guard(); 382 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 383 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 384 385 let path = unique_identity_path("invalid-relay"); 386 let args = args_for_identity(path.clone()); 387 let settings = settings_with_relays(vec!["not-a-relay-url".to_string()]); 388 let result = run_rhi(&settings, &args).await; 389 cleanup_identity_artifacts(&path); 390 assert!(result.is_err()); 391 } 392 393 #[tokio::test] 394 async fn run_rhi_returns_error_when_identity_is_missing() { 395 let _guard = test_guard(); 396 RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed); 397 RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed); 398 399 let args = cli_args { 400 command: None, 401 service: radroots_runtime::RadrootsServiceCliArgs { 402 config: Some(PathBuf::from("config.toml")), 403 identity: Some(PathBuf::from("/tmp/rhi-lib-missing-identity.secret.json")), 404 allow_generate_identity: false, 405 }, 406 }; 407 let settings = settings_with_relays(Vec::new()); 408 let result = run_rhi(&settings, &args).await; 409 assert!(result.is_err()); 410 } 411 }