lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

manager.rs (2705B)


      1 use std::sync::Arc;
      2 use tokio::runtime::Handle;
      3 
      4 use super::inner::Inner;
      5 use radroots_nostr::prelude::{
      6     RadrootsNostrKeys, RadrootsNostrTimestamp, radroots_nostr_post_events_filter,
      7 };
      8 
      9 #[derive(Clone)]
     10 pub struct NostrClientManager {
     11     pub(super) inner: Arc<Inner>,
     12 }
     13 
     14 impl NostrClientManager {
     15     pub fn new(keys: RadrootsNostrKeys, rt: Handle) -> Self {
     16         let inner = Inner::new(keys, rt);
     17         let this = Self {
     18             inner: inner.clone(),
     19         };
     20         this.spawn_status_watcher();
     21         this
     22     }
     23 }
     24 
     25 impl NostrClientManager {
     26     pub fn start_post_event_stream(&self, since_unix: Option<u64>) {
     27         if self
     28             .inner
     29             .post_events_stream
     30             .lock()
     31             .ok()
     32             .is_some_and(|h| h.is_some())
     33         {
     34             return;
     35         }
     36 
     37         let inner = self.inner.clone();
     38         let rt = inner.rt.clone();
     39         let handle = rt.spawn({
     40             let inner = inner.clone();
     41             async move {
     42                 use futures::StreamExt;
     43 
     44                 let mut since =
     45                     since_unix.unwrap_or_else(|| RadrootsNostrTimestamp::now().as_secs());
     46                 loop {
     47                     let filter = radroots_nostr_post_events_filter(None, Some(since));
     48 
     49                     let mut stream = match inner
     50                         .client
     51                         .stream_events(filter, core::time::Duration::from_secs(30))
     52                         .await
     53                     {
     54                         Ok(s) => s,
     55                         Err(_) => {
     56                             tokio::time::sleep(core::time::Duration::from_secs(2)).await;
     57                             continue;
     58                         }
     59                     };
     60 
     61                     while let Some(event) = stream.next().await {
     62                         let meta = radroots_nostr::event_adapters::to_post_event_metadata(&event);
     63                         let ts = event.created_at.as_secs();
     64                         since = ts.saturating_add(1);
     65                         let _ = inner.post_events_tx.send(meta);
     66                     }
     67                 }
     68             }
     69         });
     70 
     71         if let Ok(mut g) = self.inner.post_events_stream.lock() {
     72             *g = Some(handle);
     73         }
     74     }
     75 
     76     pub fn stop_post_event_stream(&self) {
     77         if let Ok(mut g) = self.inner.post_events_stream.lock()
     78             && let Some(h) = g.take()
     79         {
     80             h.abort();
     81         }
     82     }
     83 
     84     pub fn subscribe_post_events(
     85         &self,
     86     ) -> tokio::sync::broadcast::Receiver<
     87         radroots_events_codec::parsed::RadrootsParsedData<radroots_events::post::RadrootsPost>,
     88     > {
     89         self.inner.post_events_tx.subscribe()
     90     }
     91 }