rhi

Coordinated trade for connected markets
git clone https://radroots.dev/git/rhi.git
Log | Files | Refs | README | LICENSE

lib.rs (14790B)


      1 #![cfg_attr(coverage_nightly, feature(coverage_attribute))]
      2 
      3 pub mod adapters;
      4 pub mod cli;
      5 pub mod config;
      6 pub mod features;
      7 pub mod identity_storage;
      8 pub mod paths;
      9 pub mod proof_smoke;
     10 pub mod remote_prove;
     11 pub mod rhi;
     12 
     13 pub use cli::Args as cli_args;
     14 
     15 use anyhow::Result;
     16 use radroots_events::kinds::{
     17     KIND_LISTING, KIND_LISTING_DRAFT, ORDER_EVENT_KINDS, TRADE_VALIDATION_EVENT_KINDS,
     18 };
     19 use std::time::Duration;
     20 
     21 use crate::features::trade_listing::state::{TradeListingRuntime, TradeListingRuntimeConfig};
     22 use crate::identity_storage::load_service_identity;
     23 use crate::rhi::{Rhi, start_subscriber_with_policy};
     24 use radroots_identity::RadrootsIdentity;
     25 use radroots_nostr::prelude::{
     26     RadrootsNostrApplicationHandlerSpec, radroots_nostr_bootstrap_service_presence,
     27 };
     28 use tracing::{info, warn};
     29 
     30 #[cfg(test)]
     31 static RUN_RHI_AUTO_STOP: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
     32 #[cfg(test)]
     33 static RUN_RHI_SKIP_SUBSCRIBER: std::sync::atomic::AtomicBool =
     34     std::sync::atomic::AtomicBool::new(false);
     35 
     36 #[cfg(test)]
     37 static RUN_RHI_BOOTSTRAP_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<Result<(), String>>>> =
     38     std::sync::OnceLock::new();
     39 
     40 #[derive(Clone, Copy)]
     41 enum RunRhiWaitOutcome {
     42     Shutdown,
     43     Stopped,
     44 }
     45 
     46 #[cfg(test)]
     47 static RUN_RHI_WAIT_HOOK: std::sync::OnceLock<std::sync::Mutex<Option<RunRhiWaitOutcome>>> =
     48     std::sync::OnceLock::new();
     49 
     50 #[cfg(test)]
     51 fn run_rhi_bootstrap_hook() -> &'static std::sync::Mutex<Option<Result<(), String>>> {
     52     RUN_RHI_BOOTSTRAP_HOOK.get_or_init(|| std::sync::Mutex::new(None))
     53 }
     54 
     55 #[cfg(test)]
     56 fn run_rhi_wait_hook() -> &'static std::sync::Mutex<Option<RunRhiWaitOutcome>> {
     57     RUN_RHI_WAIT_HOOK.get_or_init(|| std::sync::Mutex::new(None))
     58 }
     59 
     60 #[cfg(test)]
     61 fn take_bootstrap_hook_result() -> Option<Result<(), String>> {
     62     run_rhi_bootstrap_hook()
     63         .lock()
     64         .unwrap_or_else(std::sync::PoisonError::into_inner)
     65         .take()
     66 }
     67 
     68 #[cfg(not(test))]
     69 #[cfg_attr(coverage_nightly, coverage(off))]
     70 fn take_bootstrap_hook_result() -> Option<Result<(), String>> {
     71     None
     72 }
     73 
     74 async fn bootstrap_presence(
     75     client: &radroots_nostr::prelude::RadrootsNostrClient,
     76     identity: &RadrootsIdentity,
     77     metadata: &radroots_nostr::prelude::RadrootsNostrMetadata,
     78     handler_spec: &RadrootsNostrApplicationHandlerSpec,
     79 ) -> Result<()> {
     80     let bootstrap_result: Result<()> = match take_bootstrap_hook_result() {
     81         Some(result) => result.map_err(anyhow::Error::msg),
     82         None => radroots_nostr_bootstrap_service_presence(
     83             client,
     84             identity,
     85             None,
     86             metadata,
     87             handler_spec,
     88             Duration::from_secs(5),
     89         )
     90         .await
     91         .map(|_| ())
     92         .map_err(anyhow::Error::from),
     93     };
     94     bootstrap_result?;
     95     Ok(())
     96 }
     97 
     98 #[cfg_attr(coverage_nightly, coverage(off))]
     99 async fn wait_for_shutdown_or_stopped(handle: crate::rhi::RhiHandle) -> RunRhiWaitOutcome {
    100     #[cfg(test)]
    101     if let Some(outcome) = run_rhi_wait_hook()
    102         .lock()
    103         .unwrap_or_else(std::sync::PoisonError::into_inner)
    104         .take()
    105     {
    106         return outcome;
    107     }
    108 
    109     tokio::select! {
    110         _ = radroots_runtime::shutdown_signal() => RunRhiWaitOutcome::Shutdown,
    111         _ = handle.stopped() => RunRhiWaitOutcome::Stopped,
    112     }
    113 }
    114 
    115 pub async fn run_rhi(settings: &config::Settings, args: &cli_args) -> Result<()> {
    116     let identity = load_service_identity(
    117         args.service.identity.as_deref(),
    118         args.service.allow_generate_identity,
    119     )?;
    120     let keys = identity.keys().clone();
    121     let trade_listing_runtime = TradeListingRuntime::load(TradeListingRuntimeConfig {
    122         state_path: settings.config.subscriber.state.path.clone(),
    123         replay_window_secs: settings.config.subscriber.state.replay_window_secs,
    124         replay_overlap_secs: settings.config.subscriber.state.replay_overlap_secs,
    125     })
    126     .await?;
    127 
    128     let rhi = Rhi::with_trade_listing_runtime_and_policy(
    129         keys.clone(),
    130         trade_listing_runtime,
    131         settings.config.trade_validation_receipt.clone(),
    132     );
    133     let client = rhi.client.clone();
    134     let service_cfg = settings.config.service.clone();
    135     let relays = service_cfg.relays.clone();
    136 
    137     for relay in &relays {
    138         client.add_relay(relay).await?;
    139     }
    140 
    141     let md = settings.metadata.clone();
    142 
    143     if !relays.is_empty() {
    144         let handler_kinds = [KIND_LISTING, KIND_LISTING_DRAFT]
    145             .into_iter()
    146             .chain(ORDER_EVENT_KINDS)
    147             .chain(TRADE_VALIDATION_EVENT_KINDS)
    148             .collect();
    149         let handler_spec = RadrootsNostrApplicationHandlerSpec {
    150             kinds: handler_kinds,
    151             identifier: service_cfg.nip89_identifier.clone(),
    152             metadata: Some(md.clone()),
    153             extra_tags: service_cfg.nip89_extra_tags.clone(),
    154             relays: relays.clone(),
    155             nostrconnect_url: None,
    156         };
    157         if let Err(e) = bootstrap_presence(&client, &identity, &md, &handler_spec).await {
    158             warn!("Failed to publish service presence on startup: {e}");
    159         } else {
    160             info!("Published service presence on startup");
    161         }
    162     }
    163 
    164     #[cfg(test)]
    165     if RUN_RHI_SKIP_SUBSCRIBER.load(std::sync::atomic::Ordering::Relaxed) {
    166         return Ok(());
    167     }
    168 
    169     let handle = start_subscriber_with_policy(
    170         client.clone(),
    171         keys.clone(),
    172         rhi.trade_listing_runtime.clone(),
    173         rhi.trade_validation_receipt_policy.clone(),
    174         settings.config.subscriber.backoff.clone(),
    175     )
    176     .await;
    177 
    178     let stop_handle = handle.clone();
    179 
    180     #[cfg(test)]
    181     if RUN_RHI_AUTO_STOP.load(std::sync::atomic::Ordering::Relaxed) {
    182         stop_handle.stop();
    183     }
    184 
    185     match wait_for_shutdown_or_stopped(handle).await {
    186         RunRhiWaitOutcome::Shutdown => {
    187             info!("Shutting down…");
    188             stop_handle.stop();
    189         }
    190         RunRhiWaitOutcome::Stopped => {}
    191     }
    192 
    193     client.unsubscribe_all().await;
    194     client.disconnect().await;
    195 
    196     Ok(())
    197 }
    198 
    199 #[cfg(test)]
    200 #[cfg_attr(coverage_nightly, coverage(off))]
    201 mod tests {
    202     use super::{
    203         RUN_RHI_AUTO_STOP, RUN_RHI_SKIP_SUBSCRIBER, RunRhiWaitOutcome, bootstrap_presence, run_rhi,
    204         run_rhi_bootstrap_hook, run_rhi_wait_hook,
    205     };
    206     use crate::{cli_args, config};
    207     use std::path::PathBuf;
    208     use std::sync::atomic::Ordering;
    209     use std::sync::{Mutex, MutexGuard};
    210 
    211     static TEST_LOCK: Mutex<()> = Mutex::new(());
    212 
    213     fn test_guard() -> MutexGuard<'static, ()> {
    214         let guard = TEST_LOCK
    215             .lock()
    216             .unwrap_or_else(std::sync::PoisonError::into_inner);
    217         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    218         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    219         *run_rhi_bootstrap_hook()
    220             .lock()
    221             .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
    222         *run_rhi_wait_hook()
    223             .lock()
    224             .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
    225         guard
    226     }
    227 
    228     fn settings_with_relays(relays: Vec<String>) -> config::Settings {
    229         config::Settings {
    230             metadata: serde_json::from_str(r#"{"name":"rhi-test"}"#).expect("metadata"),
    231             config: config::Configuration {
    232                 service: radroots_runtime::RadrootsNostrServiceConfig {
    233                     logs_dir: std::env::temp_dir()
    234                         .join("rhi-test-logs")
    235                         .display()
    236                         .to_string(),
    237                     relays,
    238                     nip89_identifier: Some("rhi".to_string()),
    239                     nip89_extra_tags: Vec::new(),
    240                 },
    241                 logging: config::LoggingConfig {
    242                     output_dir: std::env::temp_dir().join("rhi-test-logs"),
    243                     filter: "info".to_string(),
    244                     stdout: true,
    245                 },
    246                 subscriber: config::SubscriberConfig {
    247                     backoff: radroots_runtime::BackoffConfig {
    248                         base_ms: 1,
    249                         max_ms: 2,
    250                         factor: 1,
    251                         jitter_ms: 0,
    252                     },
    253                     state: config::SubscriberStateConfig {
    254                         path: unique_state_path("settings"),
    255                         ..Default::default()
    256                     },
    257                 },
    258                 trade_validation_receipt:
    259                     crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy::default(),
    260             },
    261         }
    262     }
    263 
    264     fn args_for_identity(path: PathBuf) -> cli_args {
    265         cli_args {
    266             command: None,
    267             service: radroots_runtime::RadrootsServiceCliArgs {
    268                 config: Some(PathBuf::from("config.toml")),
    269                 identity: Some(path),
    270                 allow_generate_identity: true,
    271             },
    272         }
    273     }
    274 
    275     fn unique_identity_path(suffix: &str) -> PathBuf {
    276         let nanos = std::time::SystemTime::now()
    277             .duration_since(std::time::UNIX_EPOCH)
    278             .expect("time")
    279             .as_nanos();
    280         std::env::temp_dir().join(format!("rhi-{suffix}-{nanos}.secret.json"))
    281     }
    282 
    283     fn cleanup_identity_artifacts(path: &std::path::Path) {
    284         let _ = std::fs::remove_file(path);
    285         let _ = std::fs::remove_file(crate::identity_storage::encrypted_identity_key_path(path));
    286     }
    287 
    288     fn unique_state_path(suffix: &str) -> PathBuf {
    289         let nanos = std::time::SystemTime::now()
    290             .duration_since(std::time::UNIX_EPOCH)
    291             .expect("time")
    292             .as_nanos();
    293         std::env::temp_dir().join(format!("rhi-state-{suffix}-{nanos}.json"))
    294     }
    295 
    296     #[tokio::test]
    297     async fn run_rhi_completes_with_auto_stop_and_empty_relays() {
    298         let _guard = test_guard();
    299         RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed);
    300         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    301         let path = unique_identity_path("empty");
    302         let args = args_for_identity(path.clone());
    303         let settings = settings_with_relays(Vec::new());
    304         let result = run_rhi(&settings, &args).await;
    305         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    306         cleanup_identity_artifacts(&path);
    307         assert!(result.is_ok());
    308     }
    309 
    310     #[tokio::test]
    311     async fn run_rhi_covers_presence_success_and_failure_branches() {
    312         let _guard = test_guard();
    313         RUN_RHI_AUTO_STOP.store(true, Ordering::Relaxed);
    314         RUN_RHI_SKIP_SUBSCRIBER.store(true, Ordering::Relaxed);
    315 
    316         let path_ok = unique_identity_path("presence-ok");
    317         let args_ok = args_for_identity(path_ok.clone());
    318         let settings_ok = settings_with_relays(vec!["wss://relay.example.com".to_string()]);
    319         *run_rhi_bootstrap_hook()
    320             .lock()
    321             .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(Ok(()));
    322         let ok = run_rhi(&settings_ok, &args_ok).await;
    323         cleanup_identity_artifacts(&path_ok);
    324         assert!(ok.is_ok());
    325 
    326         let path_err = unique_identity_path("presence-err");
    327         let args_err = args_for_identity(path_err.clone());
    328         let settings_err = settings_with_relays(vec!["wss://relay.example.com".to_string()]);
    329         *run_rhi_bootstrap_hook()
    330             .lock()
    331             .unwrap_or_else(std::sync::PoisonError::into_inner) =
    332             Some(Err("presence failure".to_string()));
    333         let err = run_rhi(&settings_err, &args_err).await;
    334         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    335         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    336         cleanup_identity_artifacts(&path_err);
    337         assert!(err.is_ok());
    338     }
    339 
    340     #[tokio::test]
    341     async fn bootstrap_presence_fallback_path_is_callable() {
    342         let _guard = test_guard();
    343         let identity_path = unique_identity_path("bootstrap");
    344         let identity = crate::identity_storage::load_service_identity(Some(&identity_path), true)
    345             .expect("identity");
    346         let client = radroots_nostr::prelude::RadrootsNostrClient::new(identity.keys().clone());
    347         let metadata: radroots_nostr::prelude::RadrootsNostrMetadata =
    348             serde_json::from_str(r#"{"name":"bootstrap"}"#).expect("bootstrap metadata");
    349         let handler_spec = radroots_nostr::prelude::RadrootsNostrApplicationHandlerSpec {
    350             kinds: vec![30402],
    351             identifier: Some("rhi".to_string()),
    352             metadata: Some(metadata.clone()),
    353             extra_tags: Vec::new(),
    354             relays: vec!["wss://relay.example.com".to_string()],
    355             nostrconnect_url: None,
    356         };
    357         let result = bootstrap_presence(&client, &identity, &metadata, &handler_spec).await;
    358         cleanup_identity_artifacts(&identity_path);
    359         assert!(result.is_err());
    360     }
    361 
    362     #[tokio::test]
    363     async fn run_rhi_covers_shutdown_wait_branch() {
    364         let _guard = test_guard();
    365         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    366         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    367         *run_rhi_wait_hook()
    368             .lock()
    369             .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(RunRhiWaitOutcome::Shutdown);
    370 
    371         let path = unique_identity_path("shutdown");
    372         let args = args_for_identity(path.clone());
    373         let settings = settings_with_relays(Vec::new());
    374         let result = run_rhi(&settings, &args).await;
    375         cleanup_identity_artifacts(&path);
    376         assert!(result.is_ok());
    377     }
    378 
    379     #[tokio::test]
    380     async fn run_rhi_returns_error_when_relay_configuration_is_invalid() {
    381         let _guard = test_guard();
    382         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    383         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    384 
    385         let path = unique_identity_path("invalid-relay");
    386         let args = args_for_identity(path.clone());
    387         let settings = settings_with_relays(vec!["not-a-relay-url".to_string()]);
    388         let result = run_rhi(&settings, &args).await;
    389         cleanup_identity_artifacts(&path);
    390         assert!(result.is_err());
    391     }
    392 
    393     #[tokio::test]
    394     async fn run_rhi_returns_error_when_identity_is_missing() {
    395         let _guard = test_guard();
    396         RUN_RHI_AUTO_STOP.store(false, Ordering::Relaxed);
    397         RUN_RHI_SKIP_SUBSCRIBER.store(false, Ordering::Relaxed);
    398 
    399         let args = cli_args {
    400             command: None,
    401             service: radroots_runtime::RadrootsServiceCliArgs {
    402                 config: Some(PathBuf::from("config.toml")),
    403                 identity: Some(PathBuf::from("/tmp/rhi-lib-missing-identity.secret.json")),
    404                 allow_generate_identity: false,
    405             },
    406         };
    407         let settings = settings_with_relays(Vec::new());
    408         let result = run_rhi(&settings, &args).await;
    409         assert!(result.is_err());
    410     }
    411 }