lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

fetch.rs (9756B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{RadrootsRelayOutcome, RadrootsRelayTransportError};
      4 use futures::future::BoxFuture;
      5 use nostr::JsonUtil;
      6 use radroots_event_store::{
      7     RadrootsEventContractStatus, RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation,
      8     RadrootsRelayObservationType,
      9 };
     10 use radroots_nostr::prelude::{RadrootsNostrEvent, radroots_event_from_nostr};
     11 use serde::{Deserialize, Serialize};
     12 use std::sync::{Arc, Mutex, PoisonError};
     13 
     14 #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
     15 pub enum RadrootsRelayFetchMode {
     16     Fetch,
     17     Subscription,
     18 }
     19 
     20 #[derive(Clone, Debug, PartialEq, Eq)]
     21 pub struct RadrootsRelayFetchRequest {
     22     pub mode: RadrootsRelayFetchMode,
     23     pub observed_at_ms: i64,
     24     pub max_events: usize,
     25 }
     26 
     27 impl RadrootsRelayFetchRequest {
     28     pub fn fetch(observed_at_ms: i64, max_events: usize) -> Self {
     29         Self {
     30             mode: RadrootsRelayFetchMode::Fetch,
     31             observed_at_ms,
     32             max_events,
     33         }
     34     }
     35 
     36     pub fn subscription(observed_at_ms: i64, max_events: usize) -> Self {
     37         Self {
     38             mode: RadrootsRelayFetchMode::Subscription,
     39             observed_at_ms,
     40             max_events,
     41         }
     42     }
     43 }
     44 
     45 #[derive(Clone, Debug, PartialEq, Eq)]
     46 pub enum RadrootsRelayFetchItem {
     47     Event {
     48         relay_url: String,
     49         raw_json: String,
     50         observed_at_ms: i64,
     51     },
     52     Eose {
     53         relay_url: String,
     54     },
     55     Closed {
     56         relay_url: String,
     57         message: String,
     58     },
     59     Notice {
     60         relay_url: String,
     61         message: String,
     62     },
     63 }
     64 
     65 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     66 pub enum RadrootsRelayFetchOutcomeKind {
     67     Eose,
     68     Closed,
     69     Notice,
     70 }
     71 
     72 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     73 pub struct RadrootsRelayFetchRelayOutcome {
     74     pub relay_url: String,
     75     pub kind: RadrootsRelayFetchOutcomeKind,
     76     pub relay_outcome: Option<RadrootsRelayOutcome>,
     77     pub message: Option<String>,
     78 }
     79 
     80 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     81 pub struct RadrootsRelayFetchEventReceipt {
     82     pub relay_url: String,
     83     pub event_id: Option<String>,
     84     pub inserted: bool,
     85     pub duplicate: bool,
     86     pub unsupported: bool,
     87     pub malformed: bool,
     88     pub projection_eligible: bool,
     89     pub verification_status: Option<String>,
     90     pub message: Option<String>,
     91 }
     92 
     93 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
     94 pub struct RadrootsRelayFetchReceipt {
     95     pub inserted_count: usize,
     96     pub duplicate_count: usize,
     97     pub malformed_count: usize,
     98     pub unsupported_count: usize,
     99     pub eose_count: usize,
    100     pub closed_count: usize,
    101     pub notice_count: usize,
    102     pub events: Vec<RadrootsRelayFetchEventReceipt>,
    103     pub relay_outcomes: Vec<RadrootsRelayFetchRelayOutcome>,
    104 }
    105 
    106 pub trait RadrootsRelayFetchAdapter: Send + Sync {
    107     fn fetch<'a>(
    108         &'a self,
    109         request: RadrootsRelayFetchRequest,
    110     ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>>;
    111 }
    112 
    113 pub async fn fetch_and_ingest_relay_events<A>(
    114     adapter: &A,
    115     event_store: &RadrootsEventStore,
    116     request: RadrootsRelayFetchRequest,
    117 ) -> Result<RadrootsRelayFetchReceipt, RadrootsRelayTransportError>
    118 where
    119     A: RadrootsRelayFetchAdapter,
    120 {
    121     let mode = request.mode;
    122     let max_events = request.max_events;
    123     let items = adapter.fetch(request).await?;
    124     let mut receipt = RadrootsRelayFetchReceipt {
    125         inserted_count: 0,
    126         duplicate_count: 0,
    127         malformed_count: 0,
    128         unsupported_count: 0,
    129         eose_count: 0,
    130         closed_count: 0,
    131         notice_count: 0,
    132         events: Vec::new(),
    133         relay_outcomes: Vec::new(),
    134     };
    135     let mut processed_events = 0usize;
    136     for item in items {
    137         match item {
    138             RadrootsRelayFetchItem::Event {
    139                 relay_url,
    140                 raw_json,
    141                 observed_at_ms,
    142             } => {
    143                 if processed_events >= max_events {
    144                     continue;
    145                 }
    146                 processed_events += 1;
    147                 let parsed = RadrootsNostrEvent::from_json(raw_json.as_str());
    148                 let Ok(raw_event) = parsed else {
    149                     receipt.malformed_count += 1;
    150                     receipt.events.push(RadrootsRelayFetchEventReceipt {
    151                         relay_url,
    152                         event_id: None,
    153                         inserted: false,
    154                         duplicate: false,
    155                         unsupported: false,
    156                         malformed: true,
    157                         projection_eligible: false,
    158                         verification_status: None,
    159                         message: Some("event JSON parse failed".to_owned()),
    160                     });
    161                     continue;
    162                 };
    163                 let event = radroots_event_from_nostr(&raw_event);
    164                 let observation_type = match mode {
    165                     RadrootsRelayFetchMode::Fetch => RadrootsRelayObservationType::Fetch,
    166                     RadrootsRelayFetchMode::Subscription => {
    167                         RadrootsRelayObservationType::Subscription
    168                     }
    169                 };
    170                 let ingest = RadrootsEventIngest::new(event, observed_at_ms)
    171                     .with_raw_json(raw_json)
    172                     .with_observation(RadrootsRelayObservation::new(
    173                         relay_url.clone(),
    174                         observation_type,
    175                         observed_at_ms,
    176                     ));
    177                 match event_store.ingest_event(ingest).await {
    178                     Ok(store_receipt) => {
    179                         let unsupported =
    180                             store_receipt.contract_status != RadrootsEventContractStatus::Supported;
    181                         if store_receipt.inserted {
    182                             receipt.inserted_count += 1;
    183                         } else {
    184                             receipt.duplicate_count += 1;
    185                         }
    186                         if unsupported {
    187                             receipt.unsupported_count += 1;
    188                         }
    189                         receipt.events.push(RadrootsRelayFetchEventReceipt {
    190                             relay_url,
    191                             event_id: Some(store_receipt.event_id),
    192                             inserted: store_receipt.inserted,
    193                             duplicate: !store_receipt.inserted,
    194                             unsupported,
    195                             malformed: false,
    196                             projection_eligible: store_receipt.projection_eligible,
    197                             verification_status: Some(
    198                                 store_receipt.verification_status.as_str().to_owned(),
    199                             ),
    200                             message: None,
    201                         });
    202                     }
    203                     Err(error) => {
    204                         receipt.malformed_count += 1;
    205                         receipt.events.push(RadrootsRelayFetchEventReceipt {
    206                             relay_url,
    207                             event_id: Some(raw_event.id.to_hex()),
    208                             inserted: false,
    209                             duplicate: false,
    210                             unsupported: false,
    211                             malformed: true,
    212                             projection_eligible: false,
    213                             verification_status: None,
    214                             message: Some(error.to_string()),
    215                         });
    216                     }
    217                 }
    218             }
    219             RadrootsRelayFetchItem::Eose { relay_url } => {
    220                 receipt.eose_count += 1;
    221                 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
    222                     relay_url,
    223                     kind: RadrootsRelayFetchOutcomeKind::Eose,
    224                     relay_outcome: None,
    225                     message: None,
    226                 });
    227             }
    228             RadrootsRelayFetchItem::Closed { relay_url, message } => {
    229                 receipt.closed_count += 1;
    230                 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
    231                     relay_url,
    232                     kind: RadrootsRelayFetchOutcomeKind::Closed,
    233                     relay_outcome: Some(RadrootsRelayOutcome::classify(message.as_str())),
    234                     message: Some(message),
    235                 });
    236             }
    237             RadrootsRelayFetchItem::Notice { relay_url, message } => {
    238                 receipt.notice_count += 1;
    239                 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome {
    240                     relay_url,
    241                     kind: RadrootsRelayFetchOutcomeKind::Notice,
    242                     relay_outcome: None,
    243                     message: Some(message),
    244                 });
    245             }
    246         }
    247     }
    248     Ok(receipt)
    249 }
    250 
    251 #[derive(Clone, Default)]
    252 pub struct RadrootsMockRelayFetchAdapter {
    253     items: Arc<Mutex<Vec<RadrootsRelayFetchItem>>>,
    254 }
    255 
    256 impl RadrootsMockRelayFetchAdapter {
    257     pub fn new(items: Vec<RadrootsRelayFetchItem>) -> Self {
    258         Self {
    259             items: Arc::new(Mutex::new(items)),
    260         }
    261     }
    262 }
    263 
    264 impl RadrootsRelayFetchAdapter for RadrootsMockRelayFetchAdapter {
    265     fn fetch<'a>(
    266         &'a self,
    267         _request: RadrootsRelayFetchRequest,
    268     ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>> {
    269         Box::pin(async move { Ok(self.items.lock().map_err(fetch_item_lock_error)?.clone()) })
    270     }
    271 }
    272 
    273 #[cfg_attr(coverage_nightly, coverage(off))]
    274 fn fetch_item_lock_error<T>(_error: PoisonError<T>) -> RadrootsRelayTransportError {
    275     RadrootsRelayTransportError::Transport("fetch item lock poisoned".to_owned())
    276 }