commit 41060be766604e8019fd965b07bc2fcf1fb9a193
parent ec5e59445a0e475ef3b41040d98cdaaaa1f498b0
Author: triesap <tyson@radroots.org>
Date: Thu, 19 Feb 2026 18:34:00 +0000
nostr-ndb: add subscription poll and stream api
- add filter spec conversion into nostrdb query filter objects
- add subscription handles with poll, wait, and stream accessors
- add note key wrappers for stable external subscription results
- add unit and async tests for subscribe, poll, wait, and unsubscribe flows
Diffstat:
7 files changed, 389 insertions(+), 0 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -2237,6 +2237,7 @@ name = "radroots-nostr-ndb"
version = "0.1.0"
dependencies = [
"futures",
+ "hex",
"nostrdb",
"radroots-nostr",
"radroots-nostr-runtime",
diff --git a/nostr-ndb/Cargo.toml b/nostr-ndb/Cargo.toml
@@ -18,6 +18,7 @@ giftwrap = ["std", "ndb"]
radroots-nostr = { workspace = true, default-features = false, features = ["std"] }
radroots-nostr-runtime = { workspace = true, optional = true, default-features = false, features = ["std", "rt", "nostr-client"] }
futures = { workspace = true, optional = true }
+hex = { workspace = true }
nostrdb = { version = "0.9.0", optional = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
diff --git a/nostr-ndb/src/error.rs b/nostr-ndb/src/error.rs
@@ -5,6 +5,19 @@ pub enum RadrootsNostrNdbError {
#[error("database path must be utf-8")]
NonUtf8Path,
+ #[error("invalid hex for {field}: {reason}")]
+ InvalidHex {
+ field: &'static str,
+ reason: String,
+ },
+
+ #[error("invalid hex length for {field}: expected {expected} bytes, got {actual}")]
+ InvalidHexLength {
+ field: &'static str,
+ expected: usize,
+ actual: usize,
+ },
+
#[error("event json encode failed: {0}")]
EventJsonEncode(String),
diff --git a/nostr-ndb/src/filter.rs b/nostr-ndb/src/filter.rs
@@ -0,0 +1,155 @@
+use crate::error::RadrootsNostrNdbError;
+
+#[derive(Debug, Clone, Eq, PartialEq, Default)]
+pub struct RadrootsNostrNdbFilterSpec {
+ event_ids_hex: Vec<String>,
+ authors_hex: Vec<String>,
+ kinds: Vec<u16>,
+ since_unix: Option<u64>,
+ until_unix: Option<u64>,
+ limit: Option<u64>,
+ search: Option<String>,
+}
+
+impl RadrootsNostrNdbFilterSpec {
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn text_notes(limit: Option<u64>, since_unix: Option<u64>) -> Self {
+ let mut filter = Self::new().with_kind(1);
+ if let Some(limit) = limit {
+ filter = filter.with_limit(limit);
+ }
+ if let Some(since_unix) = since_unix {
+ filter = filter.with_since_unix(since_unix);
+ }
+ filter
+ }
+
+ pub fn with_event_id_hex(mut self, id_hex: impl Into<String>) -> Self {
+ self.event_ids_hex.push(id_hex.into());
+ self
+ }
+
+ pub fn with_author_hex(mut self, author_hex: impl Into<String>) -> Self {
+ self.authors_hex.push(author_hex.into());
+ self
+ }
+
+ pub fn with_kind(mut self, kind: u16) -> Self {
+ self.kinds.push(kind);
+ self
+ }
+
+ pub fn with_since_unix(mut self, since_unix: u64) -> Self {
+ self.since_unix = Some(since_unix);
+ self
+ }
+
+ pub fn with_until_unix(mut self, until_unix: u64) -> Self {
+ self.until_unix = Some(until_unix);
+ self
+ }
+
+ pub fn with_limit(mut self, limit: u64) -> Self {
+ self.limit = Some(limit);
+ self
+ }
+
+ pub fn with_search(mut self, search: impl Into<String>) -> Self {
+ self.search = Some(search.into());
+ self
+ }
+
+ pub fn event_ids_hex(&self) -> &[String] {
+ &self.event_ids_hex
+ }
+
+ pub fn authors_hex(&self) -> &[String] {
+ &self.authors_hex
+ }
+
+ pub fn kinds(&self) -> &[u16] {
+ &self.kinds
+ }
+
+ pub fn since_unix(&self) -> Option<u64> {
+ self.since_unix
+ }
+
+ pub fn until_unix(&self) -> Option<u64> {
+ self.until_unix
+ }
+
+ pub fn limit(&self) -> Option<u64> {
+ self.limit
+ }
+
+ pub fn search(&self) -> Option<&str> {
+ self.search.as_deref()
+ }
+
+ pub(crate) fn to_ndb_filter(&self) -> Result<nostrdb::Filter, RadrootsNostrNdbError> {
+ let mut builder = nostrdb::Filter::new();
+
+ if !self.event_ids_hex.is_empty() {
+ let event_ids = self
+ .event_ids_hex
+ .iter()
+ .map(|hex_value| parse_hex_32(hex_value, "event_id"))
+ .collect::<Result<Vec<_>, _>>()?;
+ builder = builder.ids(event_ids.iter());
+ }
+
+ if !self.authors_hex.is_empty() {
+ let authors = self
+ .authors_hex
+ .iter()
+ .map(|hex_value| parse_hex_32(hex_value, "author"))
+ .collect::<Result<Vec<_>, _>>()?;
+ builder = builder.authors(authors.iter());
+ }
+
+ if !self.kinds.is_empty() {
+ builder = builder.kinds(self.kinds.iter().map(|kind| *kind as u64));
+ }
+
+ if let Some(since_unix) = self.since_unix {
+ builder = builder.since(since_unix);
+ }
+
+ if let Some(until_unix) = self.until_unix {
+ builder = builder.until(until_unix);
+ }
+
+ if let Some(limit) = self.limit {
+ builder = builder.limit(limit);
+ }
+
+ if let Some(search) = self.search() {
+ builder = builder.search(search);
+ }
+
+ Ok(builder.build())
+ }
+}
+
+fn parse_hex_32(value: &str, field: &'static str) -> Result<[u8; 32], RadrootsNostrNdbError> {
+ let bytes = hex::decode(value).map_err(|source| RadrootsNostrNdbError::InvalidHex {
+ field,
+ reason: source.to_string(),
+ })?;
+
+ if bytes.len() != 32 {
+ return Err(RadrootsNostrNdbError::InvalidHexLength {
+ field,
+ expected: 32,
+ actual: bytes.len(),
+ });
+ }
+
+ let mut out = [0u8; 32];
+ out.copy_from_slice(bytes.as_slice());
+ Ok(out)
+}
diff --git a/nostr-ndb/src/lib.rs b/nostr-ndb/src/lib.rs
@@ -10,20 +10,33 @@ extern crate alloc;
pub mod config;
pub mod error;
#[cfg(feature = "ndb")]
+pub mod filter;
+#[cfg(feature = "ndb")]
pub mod ingest;
#[cfg(feature = "ndb")]
pub mod ndb;
#[cfg(all(feature = "ndb", feature = "runtime-adapter"))]
pub mod runtime_adapter;
+#[cfg(feature = "ndb")]
+pub mod subscription;
pub mod prelude {
#[cfg(feature = "ndb")]
pub use crate::config::RadrootsNostrNdbConfig;
pub use crate::error::RadrootsNostrNdbError;
#[cfg(feature = "ndb")]
+ pub use crate::filter::RadrootsNostrNdbFilterSpec;
+ #[cfg(feature = "ndb")]
pub use crate::ingest::RadrootsNostrNdbIngestSource;
#[cfg(feature = "ndb")]
pub use crate::ndb::RadrootsNostrNdb;
#[cfg(all(feature = "ndb", feature = "runtime-adapter"))]
pub use crate::runtime_adapter::RadrootsNostrNdbEventStoreAdapter;
+ #[cfg(all(feature = "ndb", feature = "rt"))]
+ pub use crate::subscription::RadrootsNostrNdbSubscriptionStream;
+ #[cfg(feature = "ndb")]
+ pub use crate::subscription::{
+ RadrootsNostrNdbNoteKey, RadrootsNostrNdbSubscriptionHandle,
+ RadrootsNostrNdbSubscriptionSpec,
+ };
}
diff --git a/nostr-ndb/src/ndb.rs b/nostr-ndb/src/ndb.rs
@@ -1,6 +1,10 @@
use crate::config::RadrootsNostrNdbConfig;
use crate::error::RadrootsNostrNdbError;
use crate::ingest::RadrootsNostrNdbIngestSource;
+use crate::subscription::{
+ RadrootsNostrNdbNoteKey, RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbSubscriptionSpec,
+ RadrootsNostrNdbSubscriptionStream,
+};
use radroots_nostr::prelude::RadrootsNostrEvent;
use std::path::Path;
@@ -54,13 +58,77 @@ impl RadrootsNostrNdb {
.map_err(|source| RadrootsNostrNdbError::EventJsonEncode(source.to_string()))?;
self.ingest_event_json_with_source(json.as_str(), source)
}
+
+ pub fn subscribe(
+ &self,
+ spec: &RadrootsNostrNdbSubscriptionSpec,
+ ) -> Result<RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbError> {
+ let filters = spec
+ .filters()
+ .iter()
+ .map(|filter_spec| filter_spec.to_ndb_filter())
+ .collect::<Result<Vec<_>, _>>()?;
+ let subscription = self.inner.subscribe(filters.as_slice())?;
+ Ok(RadrootsNostrNdbSubscriptionHandle::new(subscription.id()))
+ }
+
+ pub fn unsubscribe(
+ &self,
+ handle: RadrootsNostrNdbSubscriptionHandle,
+ ) -> Result<(), RadrootsNostrNdbError> {
+ let mut inner = self.inner.clone();
+ inner.unsubscribe(nostrdb::Subscription::new(handle.id()))?;
+ Ok(())
+ }
+
+ pub fn poll_for_note_keys(
+ &self,
+ handle: RadrootsNostrNdbSubscriptionHandle,
+ max_notes: u32,
+ ) -> Vec<RadrootsNostrNdbNoteKey> {
+ self.inner
+ .poll_for_notes(nostrdb::Subscription::new(handle.id()), max_notes)
+ .into_iter()
+ .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64()))
+ .collect()
+ }
+
+ #[cfg(feature = "rt")]
+ pub async fn wait_for_note_keys(
+ &self,
+ handle: RadrootsNostrNdbSubscriptionHandle,
+ max_notes: u32,
+ ) -> Result<Vec<RadrootsNostrNdbNoteKey>, RadrootsNostrNdbError> {
+ let note_keys = self
+ .inner
+ .wait_for_notes(nostrdb::Subscription::new(handle.id()), max_notes)
+ .await?;
+ Ok(note_keys
+ .into_iter()
+ .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64()))
+ .collect())
+ }
+
+ #[cfg(feature = "rt")]
+ pub fn subscription_stream(
+ &self,
+ handle: RadrootsNostrNdbSubscriptionHandle,
+ notes_per_await: u32,
+ ) -> RadrootsNostrNdbSubscriptionStream {
+ let stream = nostrdb::Subscription::new(handle.id())
+ .stream(&self.inner)
+ .notes_per_await(notes_per_await.max(1));
+ RadrootsNostrNdbSubscriptionStream { inner: stream }
+ }
}
#[cfg(test)]
mod tests {
use super::*;
+ use crate::filter::RadrootsNostrNdbFilterSpec;
use crate::ingest::RadrootsNostrNdbIngestSource;
use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKeys};
+ use std::time::Duration;
use tempfile::TempDir;
#[test]
@@ -121,4 +189,60 @@ mod tests {
ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
.expect("ingest should succeed");
}
+
+ #[test]
+ fn subscribe_poll_and_unsubscribe_round_trip() {
+ let tmp_dir = TempDir::new().expect("tempdir should open");
+ let db_dir = tmp_dir.path().join("ndb");
+ let config = RadrootsNostrNdbConfig::new(&db_dir);
+ let ndb = RadrootsNostrNdb::open(config).expect("database should open");
+ let spec = RadrootsNostrNdbSubscriptionSpec::single(
+ RadrootsNostrNdbFilterSpec::new()
+ .with_kind(1)
+ .with_limit(10),
+ );
+ let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
+
+ let keys = RadrootsNostrKeys::generate();
+ let event = RadrootsNostrEventBuilder::text_note("subscription test")
+ .sign_with_keys(&keys)
+ .expect("event should sign");
+ ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown())
+ .expect("ingest should succeed");
+
+ let mut notes = Vec::new();
+ for _ in 0..40 {
+ notes = ndb.poll_for_note_keys(handle, 32);
+ if !notes.is_empty() {
+ break;
+ }
+ std::thread::sleep(Duration::from_millis(25));
+ }
+
+ assert!(!notes.is_empty());
+ ndb.unsubscribe(handle).expect("unsubscribe should succeed");
+ }
+
+ #[tokio::test]
+ async fn wait_for_note_keys_yields_results() {
+ let tmp_dir = TempDir::new().expect("tempdir should open");
+ let db_dir = tmp_dir.path().join("ndb");
+ let config = RadrootsNostrNdbConfig::new(&db_dir);
+ let ndb = RadrootsNostrNdb::open(config).expect("database should open");
+ let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
+ let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
+
+ let keys = RadrootsNostrKeys::generate();
+ let event = RadrootsNostrEventBuilder::text_note("wait test")
+ .sign_with_keys(&keys)
+ .expect("event should sign");
+ ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown())
+ .expect("ingest should succeed");
+
+ let notes = ndb
+ .wait_for_note_keys(handle, 32)
+ .await
+ .expect("wait should succeed");
+ assert!(!notes.is_empty());
+ }
}
diff --git a/nostr-ndb/src/subscription.rs b/nostr-ndb/src/subscription.rs
@@ -0,0 +1,82 @@
+use crate::filter::RadrootsNostrNdbFilterSpec;
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+pub struct RadrootsNostrNdbSubscriptionHandle {
+ id: u64,
+}
+
+impl RadrootsNostrNdbSubscriptionHandle {
+ pub(crate) fn new(id: u64) -> Self {
+ Self { id }
+ }
+
+ pub fn id(self) -> u64 {
+ self.id
+ }
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+pub struct RadrootsNostrNdbNoteKey {
+ key: u64,
+}
+
+impl RadrootsNostrNdbNoteKey {
+ pub(crate) fn new(key: u64) -> Self {
+ Self { key }
+ }
+
+ pub fn as_u64(self) -> u64 {
+ self.key
+ }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct RadrootsNostrNdbSubscriptionSpec {
+ filters: Vec<RadrootsNostrNdbFilterSpec>,
+}
+
+impl RadrootsNostrNdbSubscriptionSpec {
+ pub fn new(filters: Vec<RadrootsNostrNdbFilterSpec>) -> Self {
+ Self { filters }
+ }
+
+ pub fn single(filter: RadrootsNostrNdbFilterSpec) -> Self {
+ Self {
+ filters: vec![filter],
+ }
+ }
+
+ pub fn text_notes(limit: Option<u64>, since_unix: Option<u64>) -> Self {
+ Self::single(RadrootsNostrNdbFilterSpec::text_notes(limit, since_unix))
+ }
+
+ pub fn filters(&self) -> &[RadrootsNostrNdbFilterSpec] {
+ &self.filters
+ }
+}
+
+#[cfg(feature = "rt")]
+pub struct RadrootsNostrNdbSubscriptionStream {
+ pub(crate) inner: nostrdb::SubscriptionStream,
+}
+
+#[cfg(feature = "rt")]
+impl futures::Stream for RadrootsNostrNdbSubscriptionStream {
+ type Item = Vec<RadrootsNostrNdbNoteKey>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ match std::pin::Pin::new(&mut self.inner).poll_next(cx) {
+ std::task::Poll::Ready(Some(note_keys)) => std::task::Poll::Ready(Some(
+ note_keys
+ .into_iter()
+ .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64()))
+ .collect(),
+ )),
+ std::task::Poll::Ready(None) => std::task::Poll::Ready(None),
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ }
+ }
+}