publish.rs (12802B)
1 #![forbid(unsafe_code)] 2 3 use crate::{RadrootsRelayOutcome, RadrootsRelayTargetSet, RadrootsRelayTransportError}; 4 #[cfg(feature = "client")] 5 use core::time::Duration; 6 use futures::future::BoxFuture; 7 use radroots_events::draft::RadrootsSignedNostrEvent; 8 use serde::{Deserialize, Serialize}; 9 use std::collections::{BTreeMap, BTreeSet}; 10 use std::sync::{Arc, Mutex, PoisonError}; 11 12 #[cfg(feature = "client")] 13 use crate::RadrootsRelayOutcomeKind; 14 #[cfg(feature = "client")] 15 use nostr::JsonUtil; 16 #[cfg(feature = "client")] 17 use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrEvent}; 18 19 #[cfg(feature = "client")] 20 const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); 21 22 #[derive(Clone, Debug, PartialEq, Eq)] 23 pub struct RadrootsRelayPublishRequest { 24 pub signed_event: RadrootsSignedNostrEvent, 25 pub targets: RadrootsRelayTargetSet, 26 pub accepted_quorum: usize, 27 pub now_ms: i64, 28 } 29 30 impl RadrootsRelayPublishRequest { 31 pub fn new( 32 signed_event: RadrootsSignedNostrEvent, 33 targets: RadrootsRelayTargetSet, 34 now_ms: i64, 35 ) -> Self { 36 let accepted_quorum = targets.len(); 37 Self { 38 signed_event, 39 targets, 40 accepted_quorum, 41 now_ms, 42 } 43 } 44 45 pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self { 46 self.accepted_quorum = accepted_quorum; 47 self 48 } 49 } 50 51 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 52 pub struct RadrootsRelayPublishRelayReceipt { 53 pub relay_url: String, 54 pub outcome: RadrootsRelayOutcome, 55 pub attempted: bool, 56 } 57 58 impl RadrootsRelayPublishRelayReceipt { 59 pub fn attempted(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self { 60 Self { 61 relay_url: relay_url.into(), 62 outcome, 63 attempted: true, 64 } 65 } 66 67 pub fn skipped(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self { 68 Self { 69 relay_url: relay_url.into(), 70 outcome, 71 attempted: false, 72 } 73 } 74 } 75 76 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 77 pub struct RadrootsRelayPublishReceipt { 78 pub event_id: String, 79 pub attempted_count: usize, 80 pub accepted_count: usize, 81 pub retryable_count: usize, 82 pub terminal_count: usize, 83 pub quorum: usize, 84 pub quorum_met: bool, 85 pub relays: Vec<RadrootsRelayPublishRelayReceipt>, 86 } 87 88 pub trait RadrootsRelayPublishAdapter: Send + Sync { 89 fn publish<'a>( 90 &'a self, 91 request: RadrootsRelayPublishRequest, 92 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>; 93 } 94 95 pub async fn publish_signed_event<A>( 96 adapter: &A, 97 request: RadrootsRelayPublishRequest, 98 ) -> Result<RadrootsRelayPublishReceipt, RadrootsRelayTransportError> 99 where 100 A: RadrootsRelayPublishAdapter, 101 { 102 let event_id = request.signed_event.id.clone(); 103 let quorum = request.accepted_quorum; 104 let relays = adapter.publish(request).await?; 105 let attempted_count = relays.iter().filter(|receipt| receipt.attempted).count(); 106 let accepted_count = relays 107 .iter() 108 .filter(|receipt| receipt.outcome.counts_toward_quorum()) 109 .count(); 110 let retryable_count = relays 111 .iter() 112 .filter(|receipt| receipt.outcome.is_retryable()) 113 .count(); 114 let terminal_count = relays 115 .iter() 116 .filter(|receipt| receipt.outcome.is_terminal_failure()) 117 .count(); 118 Ok(RadrootsRelayPublishReceipt { 119 event_id, 120 attempted_count, 121 accepted_count, 122 retryable_count, 123 terminal_count, 124 quorum, 125 quorum_met: accepted_count >= quorum, 126 relays, 127 }) 128 } 129 130 #[derive(Clone, Default)] 131 pub struct RadrootsMockRelayPublishAdapter { 132 outcomes: BTreeMap<String, RadrootsRelayOutcome>, 133 captured_raw_events: Arc<Mutex<Vec<String>>>, 134 } 135 136 impl RadrootsMockRelayPublishAdapter { 137 pub fn new() -> Self { 138 Self::default() 139 } 140 141 pub fn with_outcome( 142 mut self, 143 relay_url: impl Into<String>, 144 outcome: RadrootsRelayOutcome, 145 ) -> Self { 146 self.outcomes.insert(relay_url.into(), outcome); 147 self 148 } 149 150 pub fn captured_raw_events(&self) -> Vec<String> { 151 self.captured_raw_events 152 .lock() 153 .expect("captured raw event lock") 154 .clone() 155 } 156 } 157 158 impl RadrootsRelayPublishAdapter for RadrootsMockRelayPublishAdapter { 159 fn publish<'a>( 160 &'a self, 161 request: RadrootsRelayPublishRequest, 162 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> 163 { 164 Box::pin(async move { 165 self.captured_raw_events 166 .lock() 167 .map_err(captured_raw_event_lock_error)? 168 .push(request.signed_event.raw_json.clone()); 169 Ok(request 170 .targets 171 .relays() 172 .iter() 173 .map(|relay| { 174 let outcome = self 175 .outcomes 176 .get(relay.as_str()) 177 .cloned() 178 .unwrap_or_else(RadrootsRelayOutcome::accepted); 179 RadrootsRelayPublishRelayReceipt::attempted(relay.as_str(), outcome) 180 }) 181 .collect()) 182 }) 183 } 184 } 185 186 #[cfg_attr(coverage_nightly, coverage(off))] 187 fn captured_raw_event_lock_error<T>(_error: PoisonError<T>) -> RadrootsRelayTransportError { 188 RadrootsRelayTransportError::Transport("captured raw event lock poisoned".to_owned()) 189 } 190 191 #[cfg(feature = "client")] 192 #[derive(Clone)] 193 pub struct RadrootsNostrClientPublishAdapter { 194 client: RadrootsNostrClient, 195 } 196 197 #[cfg(feature = "client")] 198 impl RadrootsNostrClientPublishAdapter { 199 #[cfg_attr(coverage_nightly, coverage(off))] 200 pub fn new(client: RadrootsNostrClient) -> Self { 201 Self { client } 202 } 203 } 204 205 #[cfg(feature = "client")] 206 impl RadrootsRelayPublishAdapter for RadrootsNostrClientPublishAdapter { 207 #[cfg_attr(coverage_nightly, coverage(off))] 208 fn publish<'a>( 209 &'a self, 210 request: RadrootsRelayPublishRequest, 211 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>> 212 { 213 Box::pin(async move { 214 let event = RadrootsNostrEvent::from_json(request.signed_event.raw_json.as_str()) 215 .map_err(|error| RadrootsRelayTransportError::NostrEventJson(error.to_string()))?; 216 ensure_raw_event_matches_signed_event(&event, &request.signed_event)?; 217 let target_strings = request.targets.relay_strings(); 218 for relay_url in &target_strings { 219 self.client 220 .add_write_relay(relay_url.as_str()) 221 .await 222 .map_err(|error| RadrootsRelayTransportError::Transport(error.to_string()))?; 223 } 224 let connection_output = self.client.try_connect(RELAY_CONNECT_TIMEOUT).await; 225 let target_url_set = target_strings 226 .iter() 227 .map(|relay_url| relay_url.trim_end_matches('/').to_owned()) 228 .collect::<BTreeSet<_>>(); 229 let connected_strings = self 230 .client 231 .relays() 232 .await 233 .into_values() 234 .filter(|relay| relay.is_connected()) 235 .map(|relay| relay.url().to_string()) 236 .filter(|relay_url| target_url_set.contains(relay_url.trim_end_matches('/'))) 237 .collect::<Vec<_>>(); 238 let connection_failures = connection_output 239 .failed 240 .iter() 241 .map(|(relay, reason)| { 242 ( 243 relay.to_string().trim_end_matches('/').to_owned(), 244 reason.clone(), 245 ) 246 }) 247 .collect::<BTreeMap<_, _>>(); 248 if connected_strings.is_empty() { 249 return Ok(target_strings 250 .into_iter() 251 .map(|relay_url| { 252 let target_url = relay_url.trim_end_matches('/'); 253 let reason = connection_failures 254 .get(target_url) 255 .cloned() 256 .unwrap_or_else(|| "relay did not connect".to_owned()); 257 RadrootsRelayPublishRelayReceipt::attempted( 258 relay_url, 259 RadrootsRelayOutcome::connection_failed(reason), 260 ) 261 }) 262 .collect()); 263 } 264 let output = match self.client.send_event_to(connected_strings, &event).await { 265 Ok(output) => output, 266 Err(error) => { 267 let message = error.to_string(); 268 return Ok(target_strings 269 .into_iter() 270 .map(|relay_url| { 271 RadrootsRelayPublishRelayReceipt::attempted( 272 relay_url, 273 RadrootsRelayOutcome::connection_failed(message.clone()), 274 ) 275 }) 276 .collect()); 277 } 278 }; 279 let mut receipts = Vec::new(); 280 for relay_url in &target_strings { 281 let target_url = relay_url.trim_end_matches('/'); 282 let success = output 283 .success 284 .iter() 285 .any(|success_url| success_url.to_string().trim_end_matches('/') == target_url); 286 if success { 287 receipts.push(RadrootsRelayPublishRelayReceipt::attempted( 288 relay_url, 289 RadrootsRelayOutcome { 290 kind: RadrootsRelayOutcomeKind::Accepted, 291 message: Some( 292 "nostr-relay-pool-success-ok-message-unavailable".to_owned(), 293 ), 294 }, 295 )); 296 continue; 297 } 298 if let Some(reason) = connection_failures.get(target_url) { 299 receipts.push(RadrootsRelayPublishRelayReceipt::attempted( 300 relay_url, 301 RadrootsRelayOutcome::connection_failed(reason.clone()), 302 )); 303 continue; 304 } 305 let failed = output.failed.iter().find_map(|(failed_url, message)| { 306 if failed_url.to_string().trim_end_matches('/') == target_url { 307 Some(message.clone()) 308 } else { 309 None 310 } 311 }); 312 let outcome = failed 313 .map(RadrootsRelayOutcome::classify) 314 .unwrap_or_else(|| { 315 RadrootsRelayOutcome::classify("error: relay output omitted target") 316 }); 317 receipts.push(RadrootsRelayPublishRelayReceipt::attempted( 318 relay_url, outcome, 319 )); 320 } 321 Ok(receipts) 322 }) 323 } 324 } 325 326 #[cfg(feature = "client")] 327 fn ensure_raw_event_matches_signed_event( 328 event: &RadrootsNostrEvent, 329 signed_event: &RadrootsSignedNostrEvent, 330 ) -> Result<(), RadrootsRelayTransportError> { 331 let mismatches = [ 332 ("id", event.id.to_hex(), signed_event.id.clone()), 333 ("pubkey", event.pubkey.to_hex(), signed_event.pubkey.clone()), 334 ( 335 "created_at", 336 event.created_at.as_secs().to_string(), 337 signed_event.created_at.to_string(), 338 ), 339 ( 340 "kind", 341 (event.kind.as_u16() as u32).to_string(), 342 signed_event.kind.to_string(), 343 ), 344 ( 345 "content", 346 event.content.clone(), 347 signed_event.content.clone(), 348 ), 349 ("sig", event.sig.to_string(), signed_event.sig.clone()), 350 ]; 351 for (field, raw, wrapped) in mismatches { 352 if raw != wrapped { 353 return Err(RadrootsRelayTransportError::NostrEventJson(format!( 354 "raw event JSON {field} does not match signed event {field}" 355 ))); 356 } 357 } 358 let raw_tags = event 359 .tags 360 .iter() 361 .map(|tag| tag.as_slice().to_vec()) 362 .collect::<Vec<_>>(); 363 if raw_tags != signed_event.tags { 364 return Err(RadrootsRelayTransportError::NostrEventJson( 365 "raw event JSON tags do not match signed event tags".to_owned(), 366 )); 367 } 368 Ok(()) 369 }