direct_relay.rs (5934B)
1 use std::time::Duration; 2 3 use radroots_nostr::prelude::{ 4 RadrootsNostrClient, RadrootsNostrError, RadrootsNostrEvent, RadrootsNostrFilter, 5 RadrootsNostrOutput, 6 }; 7 8 const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); 9 const RELAY_FETCH_TIMEOUT: Duration = Duration::from_secs(10); 10 11 #[derive(Debug, Clone, PartialEq, Eq)] 12 pub struct DirectRelayFailure { 13 pub relay: String, 14 pub reason: String, 15 } 16 17 #[derive(Debug, Clone)] 18 pub struct DirectRelayFetchReceipt { 19 pub target_relays: Vec<String>, 20 pub connected_relays: Vec<String>, 21 pub failed_relays: Vec<DirectRelayFailure>, 22 pub events: Vec<RadrootsNostrEvent>, 23 } 24 25 #[derive(Debug, thiserror::Error)] 26 pub enum DirectRelayFetchError { 27 #[error("direct relay fetch requires at least one configured relay")] 28 MissingRelays, 29 #[error("failed to build async runtime for direct relay fetch: {0}")] 30 Runtime(String), 31 #[error("failed to configure relay `{relay}` for direct relay fetch: {source}")] 32 RelayConfig { 33 relay: String, 34 #[source] 35 source: RadrootsNostrError, 36 }, 37 #[error("direct relay connection failed: {reason}")] 38 Connect { 39 reason: String, 40 target_relays: Vec<String>, 41 failed_relays: Vec<DirectRelayFailure>, 42 }, 43 #[error("direct relay fetch failed: {0}")] 44 Fetch(#[source] RadrootsNostrError), 45 } 46 47 pub fn fetch_events_from_relays( 48 relay_urls: &[String], 49 filter: RadrootsNostrFilter, 50 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { 51 fetch_events_from_relays_with_timeout(relay_urls, filter, RELAY_FETCH_TIMEOUT) 52 } 53 54 pub fn fetch_events_from_relays_with_timeout( 55 relay_urls: &[String], 56 filter: RadrootsNostrFilter, 57 fetch_timeout: Duration, 58 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { 59 if relay_urls.is_empty() { 60 return Err(DirectRelayFetchError::MissingRelays); 61 } 62 63 let runtime = tokio::runtime::Builder::new_multi_thread() 64 .enable_all() 65 .build() 66 .map_err(|error| DirectRelayFetchError::Runtime(error.to_string()))?; 67 68 runtime.block_on(fetch_events_from_relays_async( 69 relay_urls, 70 filter, 71 fetch_timeout, 72 RELAY_CONNECT_TIMEOUT, 73 )) 74 } 75 76 async fn fetch_events_from_relays_async( 77 relay_urls: &[String], 78 filter: RadrootsNostrFilter, 79 fetch_timeout: Duration, 80 connect_timeout: Duration, 81 ) -> Result<DirectRelayFetchReceipt, DirectRelayFetchError> { 82 let client = RadrootsNostrClient::new_signerless(); 83 84 for relay_url in relay_urls { 85 client.add_read_relay(relay_url).await.map_err(|source| { 86 DirectRelayFetchError::RelayConfig { 87 relay: relay_url.clone(), 88 source, 89 } 90 })?; 91 } 92 93 let connection_output = client.try_connect(connect_timeout).await; 94 let failed_relays = relay_failures_from_output(&connection_output); 95 if connection_output.success.is_empty() { 96 return Err(DirectRelayFetchError::Connect { 97 reason: summarize_failures(&failed_relays), 98 target_relays: relay_urls.to_vec(), 99 failed_relays, 100 }); 101 } 102 103 let events = client 104 .fetch_events(filter, fetch_timeout) 105 .await 106 .map_err(DirectRelayFetchError::Fetch)?; 107 108 Ok(DirectRelayFetchReceipt { 109 target_relays: relay_urls.to_vec(), 110 connected_relays: connection_output 111 .success 112 .iter() 113 .map(ToString::to_string) 114 .collect(), 115 failed_relays, 116 events, 117 }) 118 } 119 120 fn relay_failures_from_output<T: std::fmt::Debug>( 121 output: &RadrootsNostrOutput<T>, 122 ) -> Vec<DirectRelayFailure> { 123 output 124 .failed 125 .iter() 126 .map(|(relay, reason)| DirectRelayFailure { 127 relay: relay.to_string(), 128 reason: reason.to_string(), 129 }) 130 .collect() 131 } 132 133 fn summarize_failures(failed_relays: &[DirectRelayFailure]) -> String { 134 if failed_relays.is_empty() { 135 return "no relay acknowledged the operation".to_owned(); 136 } 137 138 failed_relays 139 .iter() 140 .map(|failure| format!("{}: {}", failure.relay, failure.reason)) 141 .collect::<Vec<_>>() 142 .join("; ") 143 } 144 145 #[cfg(test)] 146 mod tests { 147 use std::time::Duration; 148 149 use radroots_nostr::prelude::RadrootsNostrFilter; 150 151 use super::{ 152 DirectRelayFetchError, fetch_events_from_relays_async, 153 fetch_events_from_relays_with_timeout, 154 }; 155 156 #[test] 157 fn fetch_events_requires_relays_before_runtime_work() { 158 let err = fetch_events_from_relays_with_timeout( 159 &[], 160 RadrootsNostrFilter::new(), 161 Duration::from_millis(1), 162 ) 163 .expect_err("missing relay error"); 164 165 assert!(matches!(err, DirectRelayFetchError::MissingRelays)); 166 } 167 168 #[test] 169 fn fetch_events_rejects_invalid_relay_urls() { 170 let err = fetch_events_from_relays_with_timeout( 171 &["not-a-relay".to_owned()], 172 RadrootsNostrFilter::new(), 173 Duration::from_millis(1), 174 ) 175 .expect_err("relay config error"); 176 177 assert!(matches!(err, DirectRelayFetchError::RelayConfig { .. })); 178 } 179 180 #[tokio::test] 181 async fn fetch_events_reports_connection_failure() { 182 let err = fetch_events_from_relays_async( 183 &["ws://127.0.0.1:9".to_owned()], 184 RadrootsNostrFilter::new(), 185 Duration::from_millis(1), 186 Duration::from_millis(50), 187 ) 188 .await 189 .expect_err("connection failure"); 190 191 match err { 192 DirectRelayFetchError::Connect { 193 target_relays, 194 failed_relays, 195 .. 196 } => { 197 assert_eq!(target_relays, vec!["ws://127.0.0.1:9"]); 198 assert_eq!(failed_relays.len(), 1); 199 } 200 _ => panic!("expected connection failure"), 201 } 202 } 203 }