lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

runtime.rs (18091B)


      1 use crate::error::RadrootsNostrRuntimeError;
      2 use crate::sink::RadrootsNostrEventSink;
      3 use crate::types::{
      4     RadrootsNostrConnectionSnapshot, RadrootsNostrRuntimeEvent, RadrootsNostrSubscriptionHandle,
      5     RadrootsNostrSubscriptionPolicy, RadrootsNostrSubscriptionSpec, RadrootsNostrTrafficLight,
      6 };
      7 use alloc::string::{String, ToString};
      8 use alloc::sync::Arc;
      9 use alloc::vec::Vec;
     10 use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
     11 use core::time::Duration;
     12 use futures::StreamExt;
     13 use radroots_nostr::prelude::{
     14     RadrootsNostrClient, RadrootsNostrKeys, RadrootsNostrMonitor, RadrootsNostrMonitorNotification,
     15     RadrootsNostrRelayStatus, RadrootsNostrRelayUrl, RadrootsNostrTimestamp,
     16 };
     17 use std::collections::HashMap;
     18 use std::sync::Mutex;
     19 use tokio::sync::mpsc;
     20 use tokio::task::JoinHandle;
     21 
     22 #[derive(Clone)]
     23 pub struct RadrootsNostrRuntimeBuilder {
     24     keys: Option<RadrootsNostrKeys>,
     25     relays: Vec<String>,
     26     queue_capacity: usize,
     27     monitor_capacity: usize,
     28     event_sink: Option<Arc<dyn RadrootsNostrEventSink>>,
     29 }
     30 
     31 impl RadrootsNostrRuntimeBuilder {
     32     pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048;
     33     pub const DEFAULT_MONITOR_CAPACITY: usize = 2_048;
     34 
     35     pub fn new() -> Self {
     36         Self {
     37             keys: None,
     38             relays: Vec::new(),
     39             queue_capacity: Self::DEFAULT_QUEUE_CAPACITY,
     40             monitor_capacity: Self::DEFAULT_MONITOR_CAPACITY,
     41             event_sink: None,
     42         }
     43     }
     44 
     45     pub fn keys(mut self, keys: RadrootsNostrKeys) -> Self {
     46         self.keys = Some(keys);
     47         self
     48     }
     49 
     50     pub fn relays(mut self, relays: Vec<String>) -> Self {
     51         self.relays = relays;
     52         self
     53     }
     54 
     55     pub fn add_relay(mut self, relay: impl Into<String>) -> Self {
     56         self.relays.push(relay.into());
     57         self
     58     }
     59 
     60     pub fn queue_capacity(mut self, capacity: usize) -> Self {
     61         self.queue_capacity = capacity;
     62         self
     63     }
     64 
     65     pub fn monitor_capacity(mut self, capacity: usize) -> Self {
     66         self.monitor_capacity = capacity;
     67         self
     68     }
     69 
     70     pub fn event_sink(mut self, sink: Arc<dyn RadrootsNostrEventSink>) -> Self {
     71         self.event_sink = Some(sink);
     72         self
     73     }
     74 
     75     pub fn build(self) -> Result<RadrootsNostrRuntime, RadrootsNostrRuntimeError> {
     76         let keys = self
     77             .keys
     78             .ok_or(RadrootsNostrRuntimeError::MissingConfig("keys"))?;
     79         if self.relays.is_empty() {
     80             return Err(RadrootsNostrRuntimeError::MissingConfig("relays"));
     81         }
     82         if self.queue_capacity == 0 {
     83             return Err(RadrootsNostrRuntimeError::InvalidConfig("queue_capacity"));
     84         }
     85         if self.monitor_capacity == 0 {
     86             return Err(RadrootsNostrRuntimeError::InvalidConfig("monitor_capacity"));
     87         }
     88 
     89         let monitor = RadrootsNostrMonitor::new(self.monitor_capacity);
     90         let client = RadrootsNostrClient::new_with_monitor(keys, monitor);
     91         let (queue_tx, queue_rx) = mpsc::channel(self.queue_capacity);
     92 
     93         let inner = Arc::new(RadrootsNostrRuntimeInner {
     94             client,
     95             relays: Mutex::new(self.relays),
     96             queue_tx,
     97             queue_rx: Mutex::new(queue_rx),
     98             statuses: Mutex::new(HashMap::new()),
     99             last_error: Mutex::new(None),
    100             monitor_task: Mutex::new(None),
    101             subscription_tasks: Mutex::new(HashMap::new()),
    102             started: AtomicBool::new(false),
    103             shutting_down: AtomicBool::new(false),
    104             next_subscription_id: AtomicU64::new(1),
    105             event_sink: self.event_sink,
    106         });
    107 
    108         Ok(RadrootsNostrRuntime { inner })
    109     }
    110 }
    111 
    112 impl Default for RadrootsNostrRuntimeBuilder {
    113     fn default() -> Self {
    114         Self::new()
    115     }
    116 }
    117 
    118 #[derive(Clone)]
    119 pub struct RadrootsNostrRuntime {
    120     inner: Arc<RadrootsNostrRuntimeInner>,
    121 }
    122 
    123 struct RadrootsNostrRuntimeInner {
    124     client: RadrootsNostrClient,
    125     relays: Mutex<Vec<String>>,
    126     queue_tx: mpsc::Sender<RadrootsNostrRuntimeEvent>,
    127     queue_rx: Mutex<mpsc::Receiver<RadrootsNostrRuntimeEvent>>,
    128     statuses: Mutex<HashMap<RadrootsNostrRelayUrl, RadrootsNostrRelayStatus>>,
    129     last_error: Mutex<Option<String>>,
    130     monitor_task: Mutex<Option<JoinHandle<()>>>,
    131     subscription_tasks: Mutex<HashMap<String, JoinHandle<()>>>,
    132     started: AtomicBool,
    133     shutting_down: AtomicBool,
    134     next_subscription_id: AtomicU64,
    135     event_sink: Option<Arc<dyn RadrootsNostrEventSink>>,
    136 }
    137 
    138 impl RadrootsNostrRuntime {
    139     pub async fn start(&self) -> Result<(), RadrootsNostrRuntimeError> {
    140         if self.inner.started.swap(true, Ordering::SeqCst) {
    141             return Err(RadrootsNostrRuntimeError::RuntimeAlreadyStarted);
    142         }
    143         self.inner.shutting_down.store(false, Ordering::SeqCst);
    144 
    145         let relays = self.relays();
    146         for relay in relays {
    147             if let Err(source) = self.inner.client.add_relay(relay.as_str()).await {
    148                 let message = source.to_string();
    149                 self.record_error(message.clone());
    150                 let _ = self
    151                     .inner
    152                     .queue_tx
    153                     .send(RadrootsNostrRuntimeEvent::Error { message })
    154                     .await;
    155             }
    156         }
    157 
    158         self.spawn_monitor_watcher();
    159         self.inner.client.connect().await;
    160         let _ = self
    161             .inner
    162             .queue_tx
    163             .send(RadrootsNostrRuntimeEvent::RuntimeStarted)
    164             .await;
    165 
    166         Ok(())
    167     }
    168 
    169     pub async fn shutdown(&self) -> Result<(), RadrootsNostrRuntimeError> {
    170         if !self.inner.started.swap(false, Ordering::SeqCst) {
    171             return Err(RadrootsNostrRuntimeError::RuntimeNotStarted);
    172         }
    173         self.inner.shutting_down.store(true, Ordering::SeqCst);
    174 
    175         if let Ok(mut guard) = self.inner.subscription_tasks.lock() {
    176             for (_, handle) in guard.drain() {
    177                 handle.abort();
    178             }
    179         }
    180 
    181         if let Ok(mut guard) = self.inner.monitor_task.lock()
    182             && let Some(handle) = guard.take()
    183         {
    184             handle.abort();
    185         }
    186 
    187         let _ = self
    188             .inner
    189             .queue_tx
    190             .send(RadrootsNostrRuntimeEvent::RuntimeStopped)
    191             .await;
    192 
    193         Ok(())
    194     }
    195 
    196     pub async fn subscribe(
    197         &self,
    198         spec: RadrootsNostrSubscriptionSpec,
    199     ) -> Result<RadrootsNostrSubscriptionHandle, RadrootsNostrRuntimeError> {
    200         if !self.inner.started.load(Ordering::SeqCst) {
    201             return Err(RadrootsNostrRuntimeError::RuntimeNotStarted);
    202         }
    203 
    204         let sequence = self
    205             .inner
    206             .next_subscription_id
    207             .fetch_add(1, Ordering::SeqCst);
    208         let id = alloc::format!("sub-{sequence}");
    209         let handle = RadrootsNostrSubscriptionHandle {
    210             id: id.clone(),
    211             name: spec.name.clone(),
    212         };
    213 
    214         let worker = spawn_subscription_worker(self.inner.clone(), id.clone(), spec);
    215         self.inner
    216             .subscription_tasks
    217             .lock()
    218             .map_err(|_| RadrootsNostrRuntimeError::Runtime("subscription lock poisoned".into()))?
    219             .insert(id, worker);
    220 
    221         Ok(handle)
    222     }
    223 
    224     pub async fn unsubscribe(
    225         &self,
    226         handle: &RadrootsNostrSubscriptionHandle,
    227     ) -> Result<(), RadrootsNostrRuntimeError> {
    228         let removed = self
    229             .inner
    230             .subscription_tasks
    231             .lock()
    232             .map_err(|_| RadrootsNostrRuntimeError::Runtime("subscription lock poisoned".into()))?
    233             .remove(handle.id.as_str());
    234 
    235         let task = removed.ok_or_else(|| {
    236             RadrootsNostrRuntimeError::SubscriptionNotFound(handle.id.to_string())
    237         })?;
    238         task.abort();
    239         let _ = self
    240             .inner
    241             .queue_tx
    242             .send(RadrootsNostrRuntimeEvent::SubscriptionClosed {
    243                 id: handle.id.clone(),
    244             })
    245             .await;
    246 
    247         Ok(())
    248     }
    249 
    250     pub fn set_relays(&self, relays: Vec<String>) -> Result<(), RadrootsNostrRuntimeError> {
    251         if relays.is_empty() {
    252             return Err(RadrootsNostrRuntimeError::InvalidConfig("relays"));
    253         }
    254         self.inner
    255             .relays
    256             .lock()
    257             .map_err(|_| RadrootsNostrRuntimeError::Runtime("relays lock poisoned".into()))
    258             .map(|mut guard| {
    259                 *guard = relays;
    260             })
    261     }
    262 
    263     pub fn relays(&self) -> Vec<String> {
    264         self.inner
    265             .relays
    266             .lock()
    267             .map(|guard| guard.clone())
    268             .unwrap_or_default()
    269     }
    270 
    271     pub fn drain_events(&self, max: usize) -> Vec<RadrootsNostrRuntimeEvent> {
    272         if max == 0 {
    273             return Vec::new();
    274         }
    275 
    276         let mut out = Vec::with_capacity(max);
    277         let mut guard = match self.inner.queue_rx.lock() {
    278             Ok(guard) => guard,
    279             Err(_) => return out,
    280         };
    281 
    282         for _ in 0..max {
    283             match guard.try_recv() {
    284                 Ok(event) => out.push(event),
    285                 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
    286                 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
    287             }
    288         }
    289 
    290         out
    291     }
    292 
    293     pub fn snapshot(&self) -> RadrootsNostrConnectionSnapshot {
    294         let statuses = self
    295             .inner
    296             .statuses
    297             .lock()
    298             .map(|guard| guard.clone())
    299             .unwrap_or_default();
    300         let last_error = self
    301             .inner
    302             .last_error
    303             .lock()
    304             .ok()
    305             .and_then(|guard| guard.clone());
    306 
    307         let mut connected = 0usize;
    308         let mut connecting = 0usize;
    309         for (_, status) in statuses.iter() {
    310             match status {
    311                 RadrootsNostrRelayStatus::Connected => connected += 1,
    312                 RadrootsNostrRelayStatus::Connecting => connecting += 1,
    313                 _ => {}
    314             }
    315         }
    316 
    317         let light = if connected > 0 {
    318             RadrootsNostrTrafficLight::Green
    319         } else if connecting > 0 {
    320             RadrootsNostrTrafficLight::Yellow
    321         } else {
    322             RadrootsNostrTrafficLight::Red
    323         };
    324 
    325         RadrootsNostrConnectionSnapshot {
    326             light,
    327             connected,
    328             connecting,
    329             last_error,
    330         }
    331     }
    332 
    333     fn spawn_monitor_watcher(&self) {
    334         let inner = self.inner.clone();
    335         let handle = tokio::spawn(async move {
    336             if let Some(monitor) = inner.client.monitor() {
    337                 let mut rx = monitor.subscribe();
    338                 while let Ok(notification) = rx.recv().await {
    339                     match notification {
    340                         RadrootsNostrMonitorNotification::StatusChanged { relay_url, status } => {
    341                             if let Ok(mut map) = inner.statuses.lock() {
    342                                 map.insert(relay_url, status);
    343                             }
    344                         }
    345                     }
    346                 }
    347             }
    348         });
    349 
    350         if let Ok(mut guard) = self.inner.monitor_task.lock()
    351             && let Some(existing) = guard.replace(handle)
    352         {
    353             existing.abort();
    354         }
    355     }
    356 
    357     fn record_error(&self, message: String) {
    358         if let Ok(mut guard) = self.inner.last_error.lock() {
    359             *guard = Some(message);
    360         }
    361     }
    362 }
    363 
    364 fn spawn_subscription_worker(
    365     inner: Arc<RadrootsNostrRuntimeInner>,
    366     id: String,
    367     spec: RadrootsNostrSubscriptionSpec,
    368 ) -> JoinHandle<()> {
    369     tokio::spawn(async move {
    370         let _ = inner
    371             .queue_tx
    372             .send(RadrootsNostrRuntimeEvent::SubscriptionOpened { id: id.clone() })
    373             .await;
    374 
    375         let timeout = Duration::from_secs(spec.stream_timeout_secs.max(1));
    376         let reconnect_delay = Duration::from_millis(spec.reconnect_delay_millis.max(1));
    377         let mut since_unix: Option<u64> = None;
    378 
    379         loop {
    380             if inner.shutting_down.load(Ordering::SeqCst) {
    381                 break;
    382             }
    383 
    384             let mut filter = spec.filter.clone();
    385             if let Some(since) = since_unix {
    386                 filter = filter.since(RadrootsNostrTimestamp::from(since));
    387             }
    388 
    389             let mut stream = match inner.client.stream_events(filter, timeout).await {
    390                 Ok(stream) => stream,
    391                 Err(source) => {
    392                     let message = source.to_string();
    393                     if let Ok(mut guard) = inner.last_error.lock() {
    394                         *guard = Some(message.clone());
    395                     }
    396                     let _ = inner
    397                         .queue_tx
    398                         .send(RadrootsNostrRuntimeEvent::Error { message })
    399                         .await;
    400 
    401                     if matches!(spec.policy, RadrootsNostrSubscriptionPolicy::OneShotOnEose) {
    402                         break;
    403                     }
    404 
    405                     tokio::time::sleep(reconnect_delay).await;
    406                     continue;
    407                 }
    408             };
    409 
    410             while let Some(event) = stream.next().await {
    411                 let event_id = event.id.to_hex();
    412                 let author = event.pubkey.to_hex();
    413                 let kind = event.kind.as_u16();
    414                 since_unix = Some(event.created_at.as_secs().saturating_add(1));
    415 
    416                 if let Some(sink) = inner.event_sink.as_ref()
    417                     && let Err(message) = sink.ingest_event(&event)
    418                 {
    419                     if let Ok(mut guard) = inner.last_error.lock() {
    420                         *guard = Some(message.clone());
    421                     }
    422                     let _ = inner
    423                         .queue_tx
    424                         .send(RadrootsNostrRuntimeEvent::Error { message })
    425                         .await;
    426                 }
    427 
    428                 let _ = inner
    429                     .queue_tx
    430                     .send(RadrootsNostrRuntimeEvent::Note {
    431                         subscription_id: id.clone(),
    432                         id: event_id,
    433                         author,
    434                         kind,
    435                         relay: None,
    436                     })
    437                     .await;
    438 
    439                 if inner.shutting_down.load(Ordering::SeqCst) {
    440                     break;
    441                 }
    442             }
    443 
    444             if matches!(spec.policy, RadrootsNostrSubscriptionPolicy::OneShotOnEose) {
    445                 break;
    446             }
    447 
    448             tokio::time::sleep(reconnect_delay).await;
    449         }
    450 
    451         let _ = inner
    452             .queue_tx
    453             .send(RadrootsNostrRuntimeEvent::SubscriptionClosed { id })
    454             .await;
    455     })
    456 }
    457 
    458 #[cfg(test)]
    459 mod tests {
    460     use super::*;
    461     use crate::sink::RadrootsNostrInMemoryEventSink;
    462     use alloc::sync::Arc;
    463     use radroots_nostr::prelude::RadrootsNostrFilter;
    464 
    465     fn sample_runtime() -> RadrootsNostrRuntime {
    466         RadrootsNostrRuntimeBuilder::new()
    467             .keys(RadrootsNostrKeys::generate())
    468             .add_relay("wss://relay.example.com")
    469             .build()
    470             .expect("runtime should build")
    471     }
    472 
    473     #[test]
    474     fn build_requires_keys() {
    475         let result = RadrootsNostrRuntimeBuilder::new()
    476             .add_relay("wss://relay.example.com")
    477             .build();
    478         assert!(matches!(
    479             result,
    480             Err(RadrootsNostrRuntimeError::MissingConfig("keys"))
    481         ));
    482     }
    483 
    484     #[test]
    485     fn build_requires_relays() {
    486         let result = RadrootsNostrRuntimeBuilder::new()
    487             .keys(RadrootsNostrKeys::generate())
    488             .build();
    489         assert!(matches!(
    490             result,
    491             Err(RadrootsNostrRuntimeError::MissingConfig("relays"))
    492         ));
    493     }
    494 
    495     #[test]
    496     fn queue_capacity_must_be_positive() {
    497         let result = RadrootsNostrRuntimeBuilder::new()
    498             .keys(RadrootsNostrKeys::generate())
    499             .add_relay("wss://relay.example.com")
    500             .queue_capacity(0)
    501             .build();
    502         assert!(matches!(
    503             result,
    504             Err(RadrootsNostrRuntimeError::InvalidConfig("queue_capacity"))
    505         ));
    506     }
    507 
    508     #[test]
    509     fn monitor_capacity_must_be_positive() {
    510         let result = RadrootsNostrRuntimeBuilder::new()
    511             .keys(RadrootsNostrKeys::generate())
    512             .add_relay("wss://relay.example.com")
    513             .monitor_capacity(0)
    514             .build();
    515         assert!(matches!(
    516             result,
    517             Err(RadrootsNostrRuntimeError::InvalidConfig("monitor_capacity"))
    518         ));
    519     }
    520 
    521     #[test]
    522     fn build_accepts_event_sink() {
    523         let sink = Arc::new(RadrootsNostrInMemoryEventSink::new());
    524         let result = RadrootsNostrRuntimeBuilder::new()
    525             .keys(RadrootsNostrKeys::generate())
    526             .add_relay("wss://relay.example.com")
    527             .event_sink(sink)
    528             .build();
    529         assert!(result.is_ok());
    530     }
    531 
    532     #[test]
    533     fn set_relays_rejects_empty_input() {
    534         let runtime = sample_runtime();
    535         let result = runtime.set_relays(Vec::new());
    536         assert!(matches!(
    537             result,
    538             Err(RadrootsNostrRuntimeError::InvalidConfig("relays"))
    539         ));
    540     }
    541 
    542     #[test]
    543     fn drain_events_zero_returns_empty() {
    544         let runtime = sample_runtime();
    545         assert!(runtime.drain_events(0).is_empty());
    546     }
    547 
    548     #[tokio::test]
    549     async fn subscribe_requires_started_runtime() {
    550         let runtime = sample_runtime();
    551         let spec = RadrootsNostrSubscriptionSpec::streaming(RadrootsNostrFilter::new());
    552         let result = runtime.subscribe(spec).await;
    553         assert!(matches!(
    554             result,
    555             Err(RadrootsNostrRuntimeError::RuntimeNotStarted)
    556         ));
    557     }
    558 
    559     #[tokio::test]
    560     async fn unsubscribe_requires_existing_subscription() {
    561         let runtime = sample_runtime();
    562         let handle = RadrootsNostrSubscriptionHandle {
    563             id: "sub-999".into(),
    564             name: None,
    565         };
    566         let result = runtime.unsubscribe(&handle).await;
    567         assert!(matches!(
    568             result,
    569             Err(RadrootsNostrRuntimeError::SubscriptionNotFound(_))
    570         ));
    571     }
    572 }