lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

publish.rs (12802B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{RadrootsRelayOutcome, RadrootsRelayTargetSet, RadrootsRelayTransportError};
      4 #[cfg(feature = "client")]
      5 use core::time::Duration;
      6 use futures::future::BoxFuture;
      7 use radroots_events::draft::RadrootsSignedNostrEvent;
      8 use serde::{Deserialize, Serialize};
      9 use std::collections::{BTreeMap, BTreeSet};
     10 use std::sync::{Arc, Mutex, PoisonError};
     11 
     12 #[cfg(feature = "client")]
     13 use crate::RadrootsRelayOutcomeKind;
     14 #[cfg(feature = "client")]
     15 use nostr::JsonUtil;
     16 #[cfg(feature = "client")]
     17 use radroots_nostr::prelude::{RadrootsNostrClient, RadrootsNostrEvent};
     18 
     /// Timeout handed to the client's `try_connect` call before a publish
     /// attempt (10 seconds). Only compiled with the `client` feature.
     #[cfg(feature = "client")]
     const RELAY_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
     21 
     /// Input to a publish attempt: the signed event, the relays it should be
     /// sent to, and the number of acceptances required for quorum.
     #[derive(Clone, Debug, PartialEq, Eq)]
     pub struct RadrootsRelayPublishRequest {
         /// Signed event to publish; the adapters in this file forward its
         /// `raw_json` payload.
         pub signed_event: RadrootsSignedNostrEvent,
         /// Relays the event should be published to.
         pub targets: RadrootsRelayTargetSet,
         /// Number of relay outcomes that must count toward quorum for the
         /// resulting receipt to report `quorum_met`.
         pub accepted_quorum: usize,
         /// Caller-supplied timestamp. Presumably milliseconds since the Unix
         /// epoch; it is not read anywhere in this file — confirm with callers.
         pub now_ms: i64,
     }
     29 
     30 impl RadrootsRelayPublishRequest {
     31     pub fn new(
     32         signed_event: RadrootsSignedNostrEvent,
     33         targets: RadrootsRelayTargetSet,
     34         now_ms: i64,
     35     ) -> Self {
     36         let accepted_quorum = targets.len();
     37         Self {
     38             signed_event,
     39             targets,
     40             accepted_quorum,
     41             now_ms,
     42         }
     43     }
     44 
     45     pub fn with_accepted_quorum(mut self, accepted_quorum: usize) -> Self {
     46         self.accepted_quorum = accepted_quorum;
     47         self
     48     }
     49 }
     50 
     /// Per-relay result of a publish attempt.
     #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     pub struct RadrootsRelayPublishRelayReceipt {
         /// URL of the relay this receipt describes.
         pub relay_url: String,
         /// Outcome recorded for the relay.
         pub outcome: RadrootsRelayOutcome,
         /// `true` when built via `attempted`, `false` when built via `skipped`.
         pub attempted: bool,
     }
     57 
     58 impl RadrootsRelayPublishRelayReceipt {
     59     pub fn attempted(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self {
     60         Self {
     61             relay_url: relay_url.into(),
     62             outcome,
     63             attempted: true,
     64         }
     65     }
     66 
     67     pub fn skipped(relay_url: impl Into<String>, outcome: RadrootsRelayOutcome) -> Self {
     68         Self {
     69             relay_url: relay_url.into(),
     70             outcome,
     71             attempted: false,
     72         }
     73     }
     74 }
     75 
     /// Aggregate result of publishing one event, as assembled by
     /// `publish_signed_event` from the per-relay receipts.
     #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     pub struct RadrootsRelayPublishReceipt {
         /// Id of the signed event that was published.
         pub event_id: String,
         /// Number of relay receipts with `attempted == true`.
         pub attempted_count: usize,
         /// Number of relay receipts whose outcome counts toward quorum.
         pub accepted_count: usize,
         /// Number of relay receipts whose outcome is retryable.
         pub retryable_count: usize,
         /// Number of relay receipts whose outcome is a terminal failure.
         pub terminal_count: usize,
         /// Quorum threshold copied from the request's `accepted_quorum`.
         pub quorum: usize,
         /// `true` when `accepted_count >= quorum`.
         pub quorum_met: bool,
         /// Per-relay receipts exactly as returned by the adapter.
         pub relays: Vec<RadrootsRelayPublishRelayReceipt>,
     }
     87 
     /// Transport abstraction used by `publish_signed_event` to deliver a signed
     /// event to relays.
     ///
     /// The implementations in this file return one receipt per target relay.
     pub trait RadrootsRelayPublishAdapter: Send + Sync {
         /// Publishes `request.signed_event` and resolves to the per-relay
         /// receipts, or to a transport error when the publish as a whole
         /// cannot be carried out.
         fn publish<'a>(
             &'a self,
             request: RadrootsRelayPublishRequest,
         ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>;
     }
     94 
     95 pub async fn publish_signed_event<A>(
     96     adapter: &A,
     97     request: RadrootsRelayPublishRequest,
     98 ) -> Result<RadrootsRelayPublishReceipt, RadrootsRelayTransportError>
     99 where
    100     A: RadrootsRelayPublishAdapter,
    101 {
    102     let event_id = request.signed_event.id.clone();
    103     let quorum = request.accepted_quorum;
    104     let relays = adapter.publish(request).await?;
    105     let attempted_count = relays.iter().filter(|receipt| receipt.attempted).count();
    106     let accepted_count = relays
    107         .iter()
    108         .filter(|receipt| receipt.outcome.counts_toward_quorum())
    109         .count();
    110     let retryable_count = relays
    111         .iter()
    112         .filter(|receipt| receipt.outcome.is_retryable())
    113         .count();
    114     let terminal_count = relays
    115         .iter()
    116         .filter(|receipt| receipt.outcome.is_terminal_failure())
    117         .count();
    118     Ok(RadrootsRelayPublishReceipt {
    119         event_id,
    120         attempted_count,
    121         accepted_count,
    122         retryable_count,
    123         terminal_count,
    124         quorum,
    125         quorum_met: accepted_count >= quorum,
    126         relays,
    127     })
    128 }
    129 
    /// In-memory publish adapter that returns preconfigured outcomes and
    /// records the raw JSON of every event it is asked to publish.
    #[derive(Clone, Default)]
    pub struct RadrootsMockRelayPublishAdapter {
        // Outcome to report per relay URL; relays without an entry are
        // reported as accepted by `publish`.
        outcomes: BTreeMap<String, RadrootsRelayOutcome>,
        // Raw event JSON captured by `publish`. Held behind `Arc`, so clones
        // of the adapter share the same capture list.
        captured_raw_events: Arc<Mutex<Vec<String>>>,
    }
    135 
    136 impl RadrootsMockRelayPublishAdapter {
    137     pub fn new() -> Self {
    138         Self::default()
    139     }
    140 
    141     pub fn with_outcome(
    142         mut self,
    143         relay_url: impl Into<String>,
    144         outcome: RadrootsRelayOutcome,
    145     ) -> Self {
    146         self.outcomes.insert(relay_url.into(), outcome);
    147         self
    148     }
    149 
    150     pub fn captured_raw_events(&self) -> Vec<String> {
    151         self.captured_raw_events
    152             .lock()
    153             .expect("captured raw event lock")
    154             .clone()
    155     }
    156 }
    157 
    158 impl RadrootsRelayPublishAdapter for RadrootsMockRelayPublishAdapter {
    159     fn publish<'a>(
    160         &'a self,
    161         request: RadrootsRelayPublishRequest,
    162     ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
    163     {
    164         Box::pin(async move {
    165             self.captured_raw_events
    166                 .lock()
    167                 .map_err(captured_raw_event_lock_error)?
    168                 .push(request.signed_event.raw_json.clone());
    169             Ok(request
    170                 .targets
    171                 .relays()
    172                 .iter()
    173                 .map(|relay| {
    174                     let outcome = self
    175                         .outcomes
    176                         .get(relay.as_str())
    177                         .cloned()
    178                         .unwrap_or_else(RadrootsRelayOutcome::accepted);
    179                     RadrootsRelayPublishRelayReceipt::attempted(relay.as_str(), outcome)
    180                 })
    181                 .collect())
    182         })
    183     }
    184 }
    185 
    186 #[cfg_attr(coverage_nightly, coverage(off))]
    187 fn captured_raw_event_lock_error<T>(_error: PoisonError<T>) -> RadrootsRelayTransportError {
    188     RadrootsRelayTransportError::Transport("captured raw event lock poisoned".to_owned())
    189 }
    190 
    /// Publish adapter backed by a `RadrootsNostrClient`. Only compiled with
    /// the `client` feature.
    #[cfg(feature = "client")]
    #[derive(Clone)]
    pub struct RadrootsNostrClientPublishAdapter {
        // Client used to register relays, connect, and send the event.
        client: RadrootsNostrClient,
    }
    196 
    #[cfg(feature = "client")]
    impl RadrootsNostrClientPublishAdapter {
        /// Wraps `client` so it can be used as a relay publish adapter.
        #[cfg_attr(coverage_nightly, coverage(off))]
        pub fn new(client: RadrootsNostrClient) -> Self {
            Self { client }
        }
    }
    204 
    #[cfg(feature = "client")]
    impl RadrootsRelayPublishAdapter for RadrootsNostrClientPublishAdapter {
        /// Publishes the signed event to the request's target relays through
        /// the wrapped client and returns one attempted receipt per target.
        ///
        /// Returns `Err` only when the raw JSON cannot be parsed, when it does
        /// not match the signed event's fields, or when a target cannot be
        /// registered as a write relay. Connection and send failures are
        /// reported per relay inside the `Ok` receipts.
        #[cfg_attr(coverage_nightly, coverage(off))]
        fn publish<'a>(
            &'a self,
            request: RadrootsRelayPublishRequest,
        ) -> BoxFuture<'a, Result<Vec<RadrootsRelayPublishRelayReceipt>, RadrootsRelayTransportError>>
        {
            Box::pin(async move {
                // Parse the raw JSON and make sure it describes the same event
                // as the structured fields before anything is sent.
                let event = RadrootsNostrEvent::from_json(request.signed_event.raw_json.as_str())
                    .map_err(|error| RadrootsRelayTransportError::NostrEventJson(error.to_string()))?;
                ensure_raw_event_matches_signed_event(&event, &request.signed_event)?;
                let target_strings = request.targets.relay_strings();
                // Register every target as a write relay; the first failure
                // aborts the whole publish with a transport error.
                for relay_url in &target_strings {
                    self.client
                        .add_write_relay(relay_url.as_str())
                        .await
                        .map_err(|error| RadrootsRelayTransportError::Transport(error.to_string()))?;
                }
                let connection_output = self.client.try_connect(RELAY_CONNECT_TIMEOUT).await;
                // URLs are compared with trailing '/' removed on both sides.
                let target_url_set = target_strings
                    .iter()
                    .map(|relay_url| relay_url.trim_end_matches('/').to_owned())
                    .collect::<BTreeSet<_>>();
                // Connected relays restricted to this request's targets; the
                // filter suggests the client may also know other relays.
                let connected_strings = self
                    .client
                    .relays()
                    .await
                    .into_values()
                    .filter(|relay| relay.is_connected())
                    .map(|relay| relay.url().to_string())
                    .filter(|relay_url| target_url_set.contains(relay_url.trim_end_matches('/')))
                    .collect::<Vec<_>>();
                // Normalized relay URL -> connection failure reason.
                let connection_failures = connection_output
                    .failed
                    .iter()
                    .map(|(relay, reason)| {
                        (
                            relay.to_string().trim_end_matches('/').to_owned(),
                            reason.clone(),
                        )
                    })
                    .collect::<BTreeMap<_, _>>();
                // No target connected: report every target as a connection
                // failure without trying to send.
                if connected_strings.is_empty() {
                    return Ok(target_strings
                        .into_iter()
                        .map(|relay_url| {
                            let target_url = relay_url.trim_end_matches('/');
                            let reason = connection_failures
                                .get(target_url)
                                .cloned()
                                .unwrap_or_else(|| "relay did not connect".to_owned());
                            RadrootsRelayPublishRelayReceipt::attempted(
                                relay_url,
                                RadrootsRelayOutcome::connection_failed(reason),
                            )
                        })
                        .collect());
                }
                // A send error is reported as a connection failure on every
                // target, carrying the error's message.
                let output = match self.client.send_event_to(connected_strings, &event).await {
                    Ok(output) => output,
                    Err(error) => {
                        let message = error.to_string();
                        return Ok(target_strings
                            .into_iter()
                            .map(|relay_url| {
                                RadrootsRelayPublishRelayReceipt::attempted(
                                    relay_url,
                                    RadrootsRelayOutcome::connection_failed(message.clone()),
                                )
                            })
                            .collect());
                    }
                };
                // Classify each target in priority order: send success, then
                // recorded connection failure, then send failure message, then
                // a fallback for targets missing from the send output.
                let mut receipts = Vec::new();
                for relay_url in &target_strings {
                    let target_url = relay_url.trim_end_matches('/');
                    let success = output
                        .success
                        .iter()
                        .any(|success_url| success_url.to_string().trim_end_matches('/') == target_url);
                    if success {
                        receipts.push(RadrootsRelayPublishRelayReceipt::attempted(
                            relay_url,
                            RadrootsRelayOutcome {
                                kind: RadrootsRelayOutcomeKind::Accepted,
                                // Fixed placeholder; judging by its text, the
                                // relay's own OK message is not available here.
                                message: Some(
                                    "nostr-relay-pool-success-ok-message-unavailable".to_owned(),
                                ),
                            },
                        ));
                        continue;
                    }
                    if let Some(reason) = connection_failures.get(target_url) {
                        receipts.push(RadrootsRelayPublishRelayReceipt::attempted(
                            relay_url,
                            RadrootsRelayOutcome::connection_failed(reason.clone()),
                        ));
                        continue;
                    }
                    let failed = output.failed.iter().find_map(|(failed_url, message)| {
                        if failed_url.to_string().trim_end_matches('/') == target_url {
                            Some(message.clone())
                        } else {
                            None
                        }
                    });
                    // NOTE(review): a target that never connected and has no
                    // entry in `connection_failures` ends up in this fallback,
                    // whereas the no-connection path above reports it as
                    // connection_failed("relay did not connect") — confirm
                    // the difference is intended.
                    let outcome = failed
                        .map(RadrootsRelayOutcome::classify)
                        .unwrap_or_else(|| {
                            RadrootsRelayOutcome::classify("error: relay output omitted target")
                        });
                    receipts.push(RadrootsRelayPublishRelayReceipt::attempted(
                        relay_url, outcome,
                    ));
                }
                Ok(receipts)
            })
        }
    }
    325 
    /// Verifies that the event parsed from the raw JSON carries the same
    /// values as the structured signed event.
    ///
    /// Fields are checked in the order `id`, `pubkey`, `created_at`, `kind`,
    /// `content`, `sig`, and finally the tags.
    ///
    /// # Errors
    ///
    /// Returns `RadrootsRelayTransportError::NostrEventJson` naming the first
    /// field that differs.
    #[cfg(feature = "client")]
    fn ensure_raw_event_matches_signed_event(
        event: &RadrootsNostrEvent,
        signed_event: &RadrootsSignedNostrEvent,
    ) -> Result<(), RadrootsRelayTransportError> {
        // Each entry pairs a field name with whether the raw and wrapped
        // values agree; numeric fields are compared by their decimal text.
        let field_checks = [
            ("id", event.id.to_hex() == signed_event.id),
            ("pubkey", event.pubkey.to_hex() == signed_event.pubkey),
            (
                "created_at",
                event.created_at.as_secs().to_string() == signed_event.created_at.to_string(),
            ),
            (
                "kind",
                (event.kind.as_u16() as u32).to_string() == signed_event.kind.to_string(),
            ),
            ("content", event.content == signed_event.content),
            ("sig", event.sig.to_string() == signed_event.sig),
        ];
        let first_mismatch = field_checks
            .iter()
            .find_map(|&(field, matches)| if matches { None } else { Some(field) });
        if let Some(field) = first_mismatch {
            return Err(RadrootsRelayTransportError::NostrEventJson(format!(
                "raw event JSON {field} does not match signed event {field}"
            )));
        }

        let raw_tags: Vec<_> = event
            .tags
            .iter()
            .map(|tag| tag.as_slice().to_vec())
            .collect();
        if raw_tags == signed_event.tags {
            Ok(())
        } else {
            Err(RadrootsRelayTransportError::NostrEventJson(
                "raw event JSON tags do not match signed event tags".to_owned(),
            ))
        }
    }
    369 }