fetch.rs (9756B)
1 #![forbid(unsafe_code)] 2 3 use crate::{RadrootsRelayOutcome, RadrootsRelayTransportError}; 4 use futures::future::BoxFuture; 5 use nostr::JsonUtil; 6 use radroots_event_store::{ 7 RadrootsEventContractStatus, RadrootsEventIngest, RadrootsEventStore, RadrootsRelayObservation, 8 RadrootsRelayObservationType, 9 }; 10 use radroots_nostr::prelude::{RadrootsNostrEvent, radroots_event_from_nostr}; 11 use serde::{Deserialize, Serialize}; 12 use std::sync::{Arc, Mutex, PoisonError}; 13 14 #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] 15 pub enum RadrootsRelayFetchMode { 16 Fetch, 17 Subscription, 18 } 19 20 #[derive(Clone, Debug, PartialEq, Eq)] 21 pub struct RadrootsRelayFetchRequest { 22 pub mode: RadrootsRelayFetchMode, 23 pub observed_at_ms: i64, 24 pub max_events: usize, 25 } 26 27 impl RadrootsRelayFetchRequest { 28 pub fn fetch(observed_at_ms: i64, max_events: usize) -> Self { 29 Self { 30 mode: RadrootsRelayFetchMode::Fetch, 31 observed_at_ms, 32 max_events, 33 } 34 } 35 36 pub fn subscription(observed_at_ms: i64, max_events: usize) -> Self { 37 Self { 38 mode: RadrootsRelayFetchMode::Subscription, 39 observed_at_ms, 40 max_events, 41 } 42 } 43 } 44 45 #[derive(Clone, Debug, PartialEq, Eq)] 46 pub enum RadrootsRelayFetchItem { 47 Event { 48 relay_url: String, 49 raw_json: String, 50 observed_at_ms: i64, 51 }, 52 Eose { 53 relay_url: String, 54 }, 55 Closed { 56 relay_url: String, 57 message: String, 58 }, 59 Notice { 60 relay_url: String, 61 message: String, 62 }, 63 } 64 65 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 66 pub enum RadrootsRelayFetchOutcomeKind { 67 Eose, 68 Closed, 69 Notice, 70 } 71 72 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 73 pub struct RadrootsRelayFetchRelayOutcome { 74 pub relay_url: String, 75 pub kind: RadrootsRelayFetchOutcomeKind, 76 pub relay_outcome: Option<RadrootsRelayOutcome>, 77 pub message: Option<String>, 78 } 79 80 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 81 pub struct RadrootsRelayFetchEventReceipt { 82 pub relay_url: String, 83 pub event_id: Option<String>, 84 pub inserted: bool, 85 pub duplicate: bool, 86 pub unsupported: bool, 87 pub malformed: bool, 88 pub projection_eligible: bool, 89 pub verification_status: Option<String>, 90 pub message: Option<String>, 91 } 92 93 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] 94 pub struct RadrootsRelayFetchReceipt { 95 pub inserted_count: usize, 96 pub duplicate_count: usize, 97 pub malformed_count: usize, 98 pub unsupported_count: usize, 99 pub eose_count: usize, 100 pub closed_count: usize, 101 pub notice_count: usize, 102 pub events: Vec<RadrootsRelayFetchEventReceipt>, 103 pub relay_outcomes: Vec<RadrootsRelayFetchRelayOutcome>, 104 } 105 106 pub trait RadrootsRelayFetchAdapter: Send + Sync { 107 fn fetch<'a>( 108 &'a self, 109 request: RadrootsRelayFetchRequest, 110 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>>; 111 } 112 113 pub async fn fetch_and_ingest_relay_events<A>( 114 adapter: &A, 115 event_store: &RadrootsEventStore, 116 request: RadrootsRelayFetchRequest, 117 ) -> Result<RadrootsRelayFetchReceipt, RadrootsRelayTransportError> 118 where 119 A: RadrootsRelayFetchAdapter, 120 { 121 let mode = request.mode; 122 let max_events = request.max_events; 123 let items = adapter.fetch(request).await?; 124 let mut receipt = RadrootsRelayFetchReceipt { 125 inserted_count: 0, 126 duplicate_count: 0, 127 malformed_count: 0, 128 unsupported_count: 0, 129 eose_count: 0, 130 closed_count: 0, 131 notice_count: 0, 132 events: Vec::new(), 133 relay_outcomes: Vec::new(), 134 }; 135 let mut processed_events = 0usize; 136 for item in items { 137 match item { 138 RadrootsRelayFetchItem::Event { 139 relay_url, 140 raw_json, 141 observed_at_ms, 142 } => { 143 if processed_events >= max_events { 144 continue; 145 } 146 processed_events += 1; 147 let parsed = RadrootsNostrEvent::from_json(raw_json.as_str()); 148 let Ok(raw_event) = parsed else { 149 receipt.malformed_count += 1; 150 receipt.events.push(RadrootsRelayFetchEventReceipt { 151 relay_url, 152 event_id: None, 153 inserted: false, 154 duplicate: false, 155 unsupported: false, 156 malformed: true, 157 projection_eligible: false, 158 verification_status: None, 159 message: Some("event JSON parse failed".to_owned()), 160 }); 161 continue; 162 }; 163 let event = radroots_event_from_nostr(&raw_event); 164 let observation_type = match mode { 165 RadrootsRelayFetchMode::Fetch => RadrootsRelayObservationType::Fetch, 166 RadrootsRelayFetchMode::Subscription => { 167 RadrootsRelayObservationType::Subscription 168 } 169 }; 170 let ingest = RadrootsEventIngest::new(event, observed_at_ms) 171 .with_raw_json(raw_json) 172 .with_observation(RadrootsRelayObservation::new( 173 relay_url.clone(), 174 observation_type, 175 observed_at_ms, 176 )); 177 match event_store.ingest_event(ingest).await { 178 Ok(store_receipt) => { 179 let unsupported = 180 store_receipt.contract_status != RadrootsEventContractStatus::Supported; 181 if store_receipt.inserted { 182 receipt.inserted_count += 1; 183 } else { 184 receipt.duplicate_count += 1; 185 } 186 if unsupported { 187 receipt.unsupported_count += 1; 188 } 189 receipt.events.push(RadrootsRelayFetchEventReceipt { 190 relay_url, 191 event_id: Some(store_receipt.event_id), 192 inserted: store_receipt.inserted, 193 duplicate: !store_receipt.inserted, 194 unsupported, 195 malformed: false, 196 projection_eligible: store_receipt.projection_eligible, 197 verification_status: Some( 198 store_receipt.verification_status.as_str().to_owned(), 199 ), 200 message: None, 201 }); 202 } 203 Err(error) => { 204 receipt.malformed_count += 1; 205 receipt.events.push(RadrootsRelayFetchEventReceipt { 206 relay_url, 207 event_id: Some(raw_event.id.to_hex()), 208 inserted: false, 209 duplicate: false, 210 unsupported: false, 211 malformed: true, 212 projection_eligible: false, 213 verification_status: None, 214 message: Some(error.to_string()), 215 }); 216 } 217 } 218 } 219 RadrootsRelayFetchItem::Eose { relay_url } => { 220 receipt.eose_count += 1; 221 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome { 222 relay_url, 223 kind: RadrootsRelayFetchOutcomeKind::Eose, 224 relay_outcome: None, 225 message: None, 226 }); 227 } 228 RadrootsRelayFetchItem::Closed { relay_url, message } => { 229 receipt.closed_count += 1; 230 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome { 231 relay_url, 232 kind: RadrootsRelayFetchOutcomeKind::Closed, 233 relay_outcome: Some(RadrootsRelayOutcome::classify(message.as_str())), 234 message: Some(message), 235 }); 236 } 237 RadrootsRelayFetchItem::Notice { relay_url, message } => { 238 receipt.notice_count += 1; 239 receipt.relay_outcomes.push(RadrootsRelayFetchRelayOutcome { 240 relay_url, 241 kind: RadrootsRelayFetchOutcomeKind::Notice, 242 relay_outcome: None, 243 message: Some(message), 244 }); 245 } 246 } 247 } 248 Ok(receipt) 249 } 250 251 #[derive(Clone, Default)] 252 pub struct RadrootsMockRelayFetchAdapter { 253 items: Arc<Mutex<Vec<RadrootsRelayFetchItem>>>, 254 } 255 256 impl RadrootsMockRelayFetchAdapter { 257 pub fn new(items: Vec<RadrootsRelayFetchItem>) -> Self { 258 Self { 259 items: Arc::new(Mutex::new(items)), 260 } 261 } 262 } 263 264 impl RadrootsRelayFetchAdapter for RadrootsMockRelayFetchAdapter { 265 fn fetch<'a>( 266 &'a self, 267 _request: RadrootsRelayFetchRequest, 268 ) -> BoxFuture<'a, Result<Vec<RadrootsRelayFetchItem>, RadrootsRelayTransportError>> { 269 Box::pin(async move { Ok(self.items.lock().map_err(fetch_item_lock_error)?.clone()) }) 270 } 271 } 272 273 #[cfg_attr(coverage_nightly, coverage(off))] 274 fn fetch_item_lock_error<T>(_error: PoisonError<T>) -> RadrootsRelayTransportError { 275 RadrootsRelayTransportError::Transport("fetch item lock poisoned".to_owned()) 276 }