manager.rs (2705B)
1 use std::sync::Arc; 2 use tokio::runtime::Handle; 3 4 use super::inner::Inner; 5 use radroots_nostr::prelude::{ 6 RadrootsNostrKeys, RadrootsNostrTimestamp, radroots_nostr_post_events_filter, 7 }; 8 9 #[derive(Clone)] 10 pub struct NostrClientManager { 11 pub(super) inner: Arc<Inner>, 12 } 13 14 impl NostrClientManager { 15 pub fn new(keys: RadrootsNostrKeys, rt: Handle) -> Self { 16 let inner = Inner::new(keys, rt); 17 let this = Self { 18 inner: inner.clone(), 19 }; 20 this.spawn_status_watcher(); 21 this 22 } 23 } 24 25 impl NostrClientManager { 26 pub fn start_post_event_stream(&self, since_unix: Option<u64>) { 27 if self 28 .inner 29 .post_events_stream 30 .lock() 31 .ok() 32 .is_some_and(|h| h.is_some()) 33 { 34 return; 35 } 36 37 let inner = self.inner.clone(); 38 let rt = inner.rt.clone(); 39 let handle = rt.spawn({ 40 let inner = inner.clone(); 41 async move { 42 use futures::StreamExt; 43 44 let mut since = 45 since_unix.unwrap_or_else(|| RadrootsNostrTimestamp::now().as_secs()); 46 loop { 47 let filter = radroots_nostr_post_events_filter(None, Some(since)); 48 49 let mut stream = match inner 50 .client 51 .stream_events(filter, core::time::Duration::from_secs(30)) 52 .await 53 { 54 Ok(s) => s, 55 Err(_) => { 56 tokio::time::sleep(core::time::Duration::from_secs(2)).await; 57 continue; 58 } 59 }; 60 61 while let Some(event) = stream.next().await { 62 let meta = radroots_nostr::event_adapters::to_post_event_metadata(&event); 63 let ts = event.created_at.as_secs(); 64 since = ts.saturating_add(1); 65 let _ = inner.post_events_tx.send(meta); 66 } 67 } 68 } 69 }); 70 71 if let Ok(mut g) = self.inner.post_events_stream.lock() { 72 *g = Some(handle); 73 } 74 } 75 76 pub fn stop_post_event_stream(&self) { 77 if let Ok(mut g) = self.inner.post_events_stream.lock() 78 && let Some(h) = g.take() 79 { 80 h.abort(); 81 } 82 } 83 84 pub fn subscribe_post_events( 85 &self, 86 ) -> tokio::sync::broadcast::Receiver< 87 radroots_events_codec::parsed::RadrootsParsedData<radroots_events::post::RadrootsPost>, 88 > { 89 self.inner.post_events_tx.subscribe() 90 } 91 }