commit 8647874eec7e34409e7bc38a54bc1fcd83bd48d1
parent e049fd399ef6f6729da5c8d4ac6200b305e9fe11
Author: triesap <tyson@radroots.org>
Date: Mon, 6 Oct 2025 15:16:59 +0100
net-core: add async post event streaming with broadcast channel integration
Diffstat:
4 files changed, 103 insertions(+), 0 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -677,6 +677,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
+ "futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@@ -700,12 +701,34 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
+name = "futures-executor"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
name = "futures-io"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
+name = "futures-macro"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "futures-sink"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -726,6 +749,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-io",
+ "futures-macro",
"futures-sink",
"futures-task",
"memchr",
@@ -1699,6 +1723,7 @@ name = "radroots-net-core"
version = "0.1.0"
dependencies = [
"directories",
+ "futures",
"hex",
"nostr",
"nostr-sdk",
diff --git a/crates/net-core/Cargo.toml b/crates/net-core/Cargo.toml
@@ -42,3 +42,4 @@ tokio = { workspace = true, optional = true, features = ["rt-multi-thread"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["fmt", "env-filter"] }
tracing-appender = { workspace = true }
+futures = { workspace = true }
diff --git a/crates/net-core/src/nostr_client/inner.rs b/crates/net-core/src/nostr_client/inner.rs
@@ -2,7 +2,10 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use nostr_sdk::prelude::*;
+use radroots_events::post::models::RadrootsPostEventMetadata;
use tokio::runtime::Handle;
+use tokio::sync::broadcast;
+use tokio::task::JoinHandle;
pub(super) struct Inner {
pub client: Client,
@@ -10,12 +13,15 @@ pub(super) struct Inner {
pub statuses: Arc<Mutex<HashMap<RelayUrl, RelayStatus>>>,
pub last_error: Arc<Mutex<Option<String>>>,
pub rt: Handle,
+ pub posts_tx: broadcast::Sender<RadrootsPostEventMetadata>,
+ pub posts_stream: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl Inner {
pub fn new(keys: nostr::Keys, rt: Handle) -> Arc<Self> {
let monitor = Monitor::new(2048);
let client = Client::builder().signer(keys).monitor(monitor).build();
+ let (tx, _) = broadcast::channel(2048);
Arc::new(Self {
client,
@@ -23,6 +29,8 @@ impl Inner {
statuses: Arc::new(Mutex::new(HashMap::new())),
last_error: Arc::new(Mutex::new(None)),
rt,
+ posts_tx: tx,
+ posts_stream: Arc::new(Mutex::new(None)),
})
}
}
diff --git a/crates/net-core/src/nostr_client/manager.rs b/crates/net-core/src/nostr_client/manager.rs
@@ -18,3 +18,72 @@ impl NostrClientManager {
this
}
}
+
+impl NostrClientManager {
+ pub fn start_textnote_stream(&self, since_unix: Option<u64>) {
+ if self
+ .inner
+ .posts_stream
+ .lock()
+ .ok()
+ .is_some_and(|h| h.is_some())
+ {
+ return;
+ }
+
+ let inner = self.inner.clone();
+ let rt = inner.rt.clone();
+ let handle = rt.spawn({
+ let inner = inner.clone();
+ async move {
+ use futures::StreamExt;
+ use nostr_sdk::prelude::*;
+
+ let mut since = since_unix.unwrap_or_else(|| Timestamp::now().as_u64());
+ loop {
+ let filter = Filter::new()
+ .kind(Kind::TextNote)
+ .since(Timestamp::from(since));
+
+ let mut stream = match inner
+ .client
+ .stream_events(filter, core::time::Duration::from_secs(30))
+ .await
+ {
+ Ok(s) => s,
+ Err(_) => {
+ tokio::time::sleep(core::time::Duration::from_secs(2)).await;
+ continue;
+ }
+ };
+
+ while let Some(event) = stream.next().await {
+ let meta = radroots_nostr::event_adapters::to_post_event_metadata(&event);
+ let ts = event.created_at.as_u64();
+ since = ts.saturating_add(1);
+ let _ = inner.posts_tx.send(meta);
+ }
+ }
+ }
+ });
+
+ if let Ok(mut g) = self.inner.posts_stream.lock() {
+ *g = Some(handle);
+ }
+ }
+
+ pub fn stop_textnote_stream(&self) {
+ if let Ok(mut g) = self.inner.posts_stream.lock() {
+ if let Some(h) = g.take() {
+ h.abort();
+ }
+ }
+ }
+
+ pub fn subscribe_text_notes(
+ &self,
+ ) -> tokio::sync::broadcast::Receiver<radroots_events::post::models::RadrootsPostEventMetadata>
+ {
+ self.inner.posts_tx.subscribe()
+ }
+}