lib

Core libraries for Radroots
git clone https://radroots.dev/git/lib.git
Log | Files | Refs | README | LICENSE

ndb.rs (41453B)


      1 use crate::config::RadrootsNostrNdbConfig;
      2 use crate::error::RadrootsNostrNdbError;
      3 use crate::filter::parse_hex_32;
      4 use crate::ingest::RadrootsNostrNdbIngestSource;
      5 use crate::query::{RadrootsNostrNdbNote, RadrootsNostrNdbProfile, RadrootsNostrNdbQuerySpec};
      6 use crate::subscription::{
      7     RadrootsNostrNdbNoteKey, RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbSubscriptionSpec,
      8     RadrootsNostrNdbSubscriptionStream,
      9 };
     10 use radroots_nostr::prelude::RadrootsNostrEvent;
     11 use std::path::Path;
     12 
     13 #[derive(Debug, Clone)]
     14 pub struct RadrootsNostrNdb {
     15     db_dir: std::path::PathBuf,
     16     pub(crate) inner: nostrdb::Ndb,
     17 }
     18 
     19 #[cfg(test)]
     20 mod test_hooks {
     21     use std::sync::atomic::{AtomicBool, Ordering};
     22 
     23     pub static FORCE_EVENT_JSON_ERROR: AtomicBool = AtomicBool::new(false);
     24     pub static FORCE_PROCESS_EVENT_ERROR: AtomicBool = AtomicBool::new(false);
     25     pub static FORCE_SUBSCRIBE_ERROR: AtomicBool = AtomicBool::new(false);
     26     pub static FORCE_UNSUBSCRIBE_ERROR: AtomicBool = AtomicBool::new(false);
     27     pub static FORCE_WAIT_ERROR: AtomicBool = AtomicBool::new(false);
     28     pub static FORCE_TRANSACTION_ERROR: AtomicBool = AtomicBool::new(false);
     29     pub static FORCE_QUERY_ERROR: AtomicBool = AtomicBool::new(false);
     30     pub static FORCE_NOTE_JSON_ERROR: AtomicBool = AtomicBool::new(false);
     31     pub static FORCE_PROFILE_QUERY_ERROR: AtomicBool = AtomicBool::new(false);
     32 
     33     pub fn take(flag: &AtomicBool) -> bool {
     34         flag.swap(false, Ordering::SeqCst)
     35     }
     36 }
     37 
     38 fn map_profile_lookup_result<T>(
     39     result: Result<T, nostrdb::Error>,
     40 ) -> Result<Option<T>, RadrootsNostrNdbError> {
     41     match result {
     42         Ok(value) => Ok(Some(value)),
     43         Err(nostrdb::Error::NotFound) => Ok(None),
     44         Err(source) => Err(source.into()),
     45     }
     46 }
     47 
     48 impl RadrootsNostrNdb {
     49     fn serialize_event(event: &RadrootsNostrEvent) -> Result<String, RadrootsNostrNdbError> {
     50         #[cfg(test)]
     51         if test_hooks::take(&test_hooks::FORCE_EVENT_JSON_ERROR) {
     52             return Err(RadrootsNostrNdbError::EventJsonEncode(
     53                 "forced event json error".into(),
     54             ));
     55         }
     56         serde_json::to_string(event).map_err(Into::into)
     57     }
     58 
     59     fn process_event_with_inner(
     60         &self,
     61         json: &str,
     62         metadata: nostrdb::IngestMetadata,
     63     ) -> Result<(), RadrootsNostrNdbError> {
     64         #[cfg(test)]
     65         if test_hooks::take(&test_hooks::FORCE_PROCESS_EVENT_ERROR) {
     66             return Err(RadrootsNostrNdbError::Ndb(
     67                 "forced process event error".into(),
     68             ));
     69         }
     70         self.inner
     71             .process_event_with(json, metadata)
     72             .map_err(Into::into)
     73     }
     74 
     75     fn subscribe_inner(
     76         &self,
     77         filters: &[nostrdb::Filter],
     78     ) -> Result<nostrdb::Subscription, RadrootsNostrNdbError> {
     79         #[cfg(test)]
     80         if test_hooks::take(&test_hooks::FORCE_SUBSCRIBE_ERROR) {
     81             return Err(RadrootsNostrNdbError::Ndb("forced subscribe error".into()));
     82         }
     83         self.inner.subscribe(filters).map_err(Into::into)
     84     }
     85 
     86     fn unsubscribe_inner(
     87         &self,
     88         subscription: nostrdb::Subscription,
     89     ) -> Result<(), RadrootsNostrNdbError> {
     90         #[cfg(test)]
     91         if test_hooks::take(&test_hooks::FORCE_UNSUBSCRIBE_ERROR) {
     92             return Err(RadrootsNostrNdbError::Ndb(
     93                 "forced unsubscribe error".into(),
     94             ));
     95         }
     96         let mut inner = self.inner.clone();
     97         inner.unsubscribe(subscription).map_err(Into::into)
     98     }
     99 
    100     #[cfg(feature = "rt")]
    101     async fn wait_for_notes_inner(
    102         &self,
    103         subscription: nostrdb::Subscription,
    104         max_notes: u32,
    105     ) -> Result<Vec<nostrdb::NoteKey>, RadrootsNostrNdbError> {
    106         #[cfg(test)]
    107         if test_hooks::take(&test_hooks::FORCE_WAIT_ERROR) {
    108             return Err(RadrootsNostrNdbError::Ndb("forced wait error".into()));
    109         }
    110         self.inner
    111             .wait_for_notes(subscription, max_notes)
    112             .await
    113             .map_err(Into::into)
    114     }
    115 
    116     fn open_txn(&self) -> Result<nostrdb::Transaction, RadrootsNostrNdbError> {
    117         #[cfg(test)]
    118         if test_hooks::take(&test_hooks::FORCE_TRANSACTION_ERROR) {
    119             return Err(RadrootsNostrNdbError::Ndb(
    120                 "forced transaction error".into(),
    121             ));
    122         }
    123         nostrdb::Transaction::new(&self.inner).map_err(Into::into)
    124     }
    125 
    126     fn query_inner<'a>(
    127         &self,
    128         txn: &'a nostrdb::Transaction,
    129         filters: &[nostrdb::Filter],
    130         max_results: i32,
    131     ) -> Result<Vec<nostrdb::QueryResult<'a>>, RadrootsNostrNdbError> {
    132         #[cfg(test)]
    133         if test_hooks::take(&test_hooks::FORCE_QUERY_ERROR) {
    134             return Err(RadrootsNostrNdbError::Ndb("forced query error".into()));
    135         }
    136         self.inner
    137             .query(txn, filters, max_results)
    138             .map_err(Into::into)
    139     }
    140 
    141     fn note_json_value(note: &nostrdb::Note) -> Result<String, RadrootsNostrNdbError> {
    142         #[cfg(test)]
    143         if test_hooks::take(&test_hooks::FORCE_NOTE_JSON_ERROR) {
    144             return Err(RadrootsNostrNdbError::Ndb("forced note json error".into()));
    145         }
    146         note.json().map_err(Into::into)
    147     }
    148 
    149     fn get_profile_record<'a>(
    150         &self,
    151         txn: &'a nostrdb::Transaction,
    152         pubkey: &[u8; 32],
    153     ) -> Result<Option<nostrdb::ProfileRecord<'a>>, RadrootsNostrNdbError> {
    154         #[cfg(test)]
    155         if test_hooks::take(&test_hooks::FORCE_PROFILE_QUERY_ERROR) {
    156             return map_profile_lookup_result(Err(nostrdb::Error::QueryError));
    157         }
    158         map_profile_lookup_result(self.inner.get_profile_by_pubkey(txn, pubkey))
    159     }
    160 
    161     pub fn open(config: RadrootsNostrNdbConfig) -> Result<Self, RadrootsNostrNdbError> {
    162         let mut inner_config = nostrdb::Config::new().skip_validation(config.skip_validation());
    163         if let Some(mapsize_bytes) = config.mapsize_bytes() {
    164             inner_config = inner_config.set_mapsize(mapsize_bytes);
    165         }
    166         if let Some(ingester_threads) = config.ingester_threads() {
    167             inner_config = inner_config.set_ingester_threads(ingester_threads);
    168         }
    169 
    170         let db_dir = config.db_dir().to_path_buf();
    171         let db_dir_str = db_dir.to_str().ok_or(RadrootsNostrNdbError::NonUtf8Path)?;
    172         let inner = nostrdb::Ndb::new(db_dir_str, &inner_config)?;
    173 
    174         Ok(Self { db_dir, inner })
    175     }
    176 
    177     pub fn db_dir(&self) -> &Path {
    178         &self.db_dir
    179     }
    180 
    181     pub fn ingest_event_json_with_source(
    182         &self,
    183         json: &str,
    184         source: RadrootsNostrNdbIngestSource,
    185     ) -> Result<(), RadrootsNostrNdbError> {
    186         let metadata = source.to_ndb_metadata();
    187         self.process_event_with_inner(json, metadata)?;
    188         Ok(())
    189     }
    190 
    191     pub fn ingest_event_json(&self, json: &str) -> Result<(), RadrootsNostrNdbError> {
    192         self.ingest_event_json_with_source(json, RadrootsNostrNdbIngestSource::default())
    193     }
    194 
    195     pub fn ingest_event(
    196         &self,
    197         event: &RadrootsNostrEvent,
    198         source: RadrootsNostrNdbIngestSource,
    199     ) -> Result<(), RadrootsNostrNdbError> {
    200         let json = Self::serialize_event(event)?;
    201         self.ingest_event_json_with_source(json.as_str(), source)
    202     }
    203 
    204     #[cfg(feature = "giftwrap")]
    205     pub fn add_giftwrap_secret_key(&self, secret_key: [u8; 32]) -> bool {
    206         self.inner.add_key(&secret_key)
    207     }
    208 
    209     #[cfg(feature = "giftwrap")]
    210     pub fn add_giftwrap_secret_key_hex(
    211         &self,
    212         secret_key_hex: &str,
    213     ) -> Result<bool, RadrootsNostrNdbError> {
    214         let secret_key = parse_hex_32(secret_key_hex, "secret_key")?;
    215         Ok(self.add_giftwrap_secret_key(secret_key))
    216     }
    217 
    218     #[cfg(feature = "giftwrap")]
    219     pub fn process_giftwraps(&self) -> Result<(), RadrootsNostrNdbError> {
    220         let txn = nostrdb::Transaction::new(&self.inner)?;
    221         self.inner.process_giftwraps(&txn);
    222         Ok(())
    223     }
    224 
    225     pub fn subscribe(
    226         &self,
    227         spec: &RadrootsNostrNdbSubscriptionSpec,
    228     ) -> Result<RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbError> {
    229         let filters = spec
    230             .filters()
    231             .iter()
    232             .map(|filter_spec| filter_spec.to_ndb_filter())
    233             .collect::<Result<Vec<_>, _>>()?;
    234         let subscription = self.subscribe_inner(filters.as_slice())?;
    235         Ok(RadrootsNostrNdbSubscriptionHandle::new(subscription.id()))
    236     }
    237 
    238     pub fn unsubscribe(
    239         &self,
    240         handle: RadrootsNostrNdbSubscriptionHandle,
    241     ) -> Result<(), RadrootsNostrNdbError> {
    242         let subscription = nostrdb::Subscription::new(handle.id());
    243         self.unsubscribe_inner(subscription)?;
    244         Ok(())
    245     }
    246 
    247     pub fn poll_for_note_keys(
    248         &self,
    249         handle: RadrootsNostrNdbSubscriptionHandle,
    250         max_notes: u32,
    251     ) -> Vec<RadrootsNostrNdbNoteKey> {
    252         self.inner
    253             .poll_for_notes(nostrdb::Subscription::new(handle.id()), max_notes)
    254             .into_iter()
    255             .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64()))
    256             .collect()
    257     }
    258 
    259     #[cfg(feature = "rt")]
    260     pub async fn wait_for_note_keys(
    261         &self,
    262         handle: RadrootsNostrNdbSubscriptionHandle,
    263         max_notes: u32,
    264     ) -> Result<Vec<RadrootsNostrNdbNoteKey>, RadrootsNostrNdbError> {
    265         let note_keys = self
    266             .wait_for_notes_inner(nostrdb::Subscription::new(handle.id()), max_notes)
    267             .await?;
    268         Ok(note_keys
    269             .into_iter()
    270             .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64()))
    271             .collect())
    272     }
    273 
    274     #[cfg(feature = "rt")]
    275     pub fn subscription_stream(
    276         &self,
    277         handle: RadrootsNostrNdbSubscriptionHandle,
    278         notes_per_await: u32,
    279     ) -> RadrootsNostrNdbSubscriptionStream {
    280         let stream = nostrdb::Subscription::new(handle.id())
    281             .stream(&self.inner)
    282             .notes_per_await(notes_per_await.max(1));
    283         RadrootsNostrNdbSubscriptionStream { inner: stream }
    284     }
    285 
    286     pub fn query_notes(
    287         &self,
    288         spec: &RadrootsNostrNdbQuerySpec,
    289     ) -> Result<Vec<RadrootsNostrNdbNote>, RadrootsNostrNdbError> {
    290         if spec.filters().is_empty() {
    291             return Ok(Vec::new());
    292         }
    293 
    294         let filters = spec
    295             .filters()
    296             .iter()
    297             .map(|filter_spec| filter_spec.to_ndb_filter())
    298             .collect::<Result<Vec<_>, _>>()?;
    299         let txn = self.open_txn()?;
    300         let query_results =
    301             self.query_inner(&txn, filters.as_slice(), spec.max_results() as i32)?;
    302 
    303         query_results
    304             .into_iter()
    305             .map(|query_result| {
    306                 let note = query_result.note;
    307                 let json = Self::note_json_value(&note)?;
    308                 Ok(RadrootsNostrNdbNote {
    309                     note_key: query_result.note_key.as_u64(),
    310                     id_hex: hex::encode(note.id()),
    311                     author_hex: hex::encode(note.pubkey()),
    312                     kind: note.kind(),
    313                     created_at_unix: note.created_at(),
    314                     content: note.content().to_owned(),
    315                     json,
    316                 })
    317             })
    318             .collect::<Result<Vec<_>, RadrootsNostrNdbError>>()
    319     }
    320 
    321     pub fn get_profile_by_pubkey_hex(
    322         &self,
    323         pubkey_hex: &str,
    324     ) -> Result<Option<RadrootsNostrNdbProfile>, RadrootsNostrNdbError> {
    325         let pubkey = parse_hex_32(pubkey_hex, "pubkey")?;
    326         let txn = self.open_txn()?;
    327         let Some(profile_record) = self.get_profile_record(&txn, &pubkey)? else {
    328             return Ok(None);
    329         };
    330 
    331         let profile = profile_record.record().profile();
    332         let profile_key = profile_record.key().map(|key| key.as_u64());
    333         Ok(profile.map(|profile| RadrootsNostrNdbProfile {
    334             profile_key,
    335             pubkey_hex: pubkey_hex.to_owned(),
    336             name: profile.name().map(ToOwned::to_owned),
    337             display_name: profile.display_name().map(ToOwned::to_owned),
    338             about: profile.about().map(ToOwned::to_owned),
    339             picture: profile.picture().map(ToOwned::to_owned),
    340             banner: profile.banner().map(ToOwned::to_owned),
    341             website: profile.website().map(ToOwned::to_owned),
    342             nip05: profile.nip05().map(ToOwned::to_owned),
    343             lud16: profile.lud16().map(ToOwned::to_owned),
    344         }))
    345     }
    346 }
    347 
    348 #[cfg(test)]
    349 mod tests {
    350     use super::*;
    351     use crate::filter::RadrootsNostrNdbFilterSpec;
    352     use crate::ingest::RadrootsNostrNdbIngestSource;
    353     use crate::query::RadrootsNostrNdbQuerySpec;
    354     use crate::test_fixtures::{FIXTURE_ALICE_EMAIL, FIXTURE_ALICE_USERNAME};
    355     use futures::StreamExt;
    356     use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKeys};
    357     use radroots_nostr::prelude::{RadrootsNostrMetadata, radroots_nostr_build_metadata_event};
    358     use std::sync::atomic::Ordering;
    359     use std::sync::{Mutex, OnceLock};
    360     use std::time::Duration;
    361     use tempfile::TempDir;
    362 
    363     fn test_hooks_lock() -> &'static Mutex<()> {
    364         static TEST_HOOKS_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
    365         TEST_HOOKS_LOCK.get_or_init(|| Mutex::new(()))
    366     }
    367 
    368     fn test_hooks_guard() -> std::sync::MutexGuard<'static, ()> {
    369         test_hooks_lock().lock().expect("test hooks lock")
    370     }
    371 
    372     fn reset_test_flags() {
    373         test_hooks::FORCE_EVENT_JSON_ERROR.store(false, Ordering::SeqCst);
    374         test_hooks::FORCE_PROCESS_EVENT_ERROR.store(false, Ordering::SeqCst);
    375         test_hooks::FORCE_SUBSCRIBE_ERROR.store(false, Ordering::SeqCst);
    376         test_hooks::FORCE_UNSUBSCRIBE_ERROR.store(false, Ordering::SeqCst);
    377         test_hooks::FORCE_WAIT_ERROR.store(false, Ordering::SeqCst);
    378         test_hooks::FORCE_TRANSACTION_ERROR.store(false, Ordering::SeqCst);
    379         test_hooks::FORCE_QUERY_ERROR.store(false, Ordering::SeqCst);
    380         test_hooks::FORCE_NOTE_JSON_ERROR.store(false, Ordering::SeqCst);
    381         test_hooks::FORCE_PROFILE_QUERY_ERROR.store(false, Ordering::SeqCst);
    382     }
    383 
    384     #[test]
    385     fn config_builder_tracks_values() {
    386         let config = RadrootsNostrNdbConfig::new("target/testdbs/nostr_ndb_config")
    387             .with_mapsize_bytes(1024 * 1024)
    388             .with_ingester_threads(2)
    389             .with_skip_validation(true);
    390 
    391         assert_eq!(config.mapsize_bytes(), Some(1024 * 1024));
    392         assert_eq!(config.ingester_threads(), Some(2));
    393         assert!(config.skip_validation());
    394     }
    395 
    396     #[test]
    397     fn map_profile_lookup_result_handles_all_error_kinds() {
    398         let success = map_profile_lookup_result::<u64>(Ok(7)).expect("ok");
    399         assert_eq!(success, Some(7));
    400 
    401         let not_found =
    402             map_profile_lookup_result::<u64>(Err(nostrdb::Error::NotFound)).expect("none");
    403         assert!(not_found.is_none());
    404 
    405         let query_error = map_profile_lookup_result::<u64>(Err(nostrdb::Error::QueryError))
    406             .expect_err("query error");
    407         assert!(query_error.to_string().starts_with("nostrdb error:"));
    408     }
    409 
    410     #[test]
    411     fn open_creates_database() {
    412         let tmp_dir = TempDir::new().expect("tempdir should open");
    413         let db_dir = tmp_dir.path().join("ndb");
    414         let config = RadrootsNostrNdbConfig::new(&db_dir)
    415             .with_mapsize_bytes(64 * 1024 * 1024)
    416             .with_ingester_threads(1);
    417 
    418         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    419         assert_eq!(ndb.db_dir(), db_dir.as_path());
    420         assert!(db_dir.exists());
    421     }
    422 
    423     #[cfg(unix)]
    424     #[test]
    425     fn open_rejects_non_utf8_path() {
    426         use std::os::unix::ffi::OsStrExt;
    427 
    428         let path = std::path::PathBuf::from(std::ffi::OsStr::from_bytes(b"ndb-\xFF"));
    429         let config = RadrootsNostrNdbConfig::new(&path);
    430         let err = RadrootsNostrNdb::open(config).expect_err("non utf8 path");
    431         assert!(err.to_string().contains("utf-8"));
    432     }
    433 
    434     #[test]
    435     fn open_reports_ndb_error_for_file_path() {
    436         let tmp_dir = TempDir::new().expect("tempdir should open");
    437         let db_dir = tmp_dir.path().join("ndb");
    438         std::fs::write(&db_dir, "not a directory").expect("write db file");
    439         let config = RadrootsNostrNdbConfig::new(&db_dir);
    440         let err = RadrootsNostrNdb::open(config).expect_err("file path should fail");
    441         assert!(err.to_string().starts_with("nostrdb error:"));
    442     }
    443 
    444     #[test]
    445     fn ingest_source_builders_track_origin() {
    446         assert_eq!(
    447             RadrootsNostrNdbIngestSource::default(),
    448             RadrootsNostrNdbIngestSource::client()
    449         );
    450         assert_eq!(
    451             RadrootsNostrNdbIngestSource::relay("wss://radroots.org"),
    452             RadrootsNostrNdbIngestSource::Relay {
    453                 relay_url: Some("wss://radroots.org".into())
    454             }
    455         );
    456         assert_eq!(
    457             RadrootsNostrNdbIngestSource::relay_unknown(),
    458             RadrootsNostrNdbIngestSource::Relay { relay_url: None }
    459         );
    460     }
    461 
    462     #[test]
    463     fn ingest_event_accepts_signed_note() {
    464         let tmp_dir = TempDir::new().expect("tempdir should open");
    465         let db_dir = tmp_dir.path().join("ndb");
    466         let config = RadrootsNostrNdbConfig::new(&db_dir);
    467         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    468 
    469         let keys = RadrootsNostrKeys::generate();
    470         let event = RadrootsNostrEventBuilder::text_note("hello from ndb")
    471             .sign_with_keys(&keys)
    472             .expect("event should sign");
    473 
    474         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    475             .expect("ingest should succeed");
    476     }
    477 
    478     #[test]
    479     fn ingest_event_json_accepts_signed_note() {
    480         let tmp_dir = TempDir::new().expect("tempdir should open");
    481         let db_dir = tmp_dir.path().join("ndb");
    482         let config = RadrootsNostrNdbConfig::new(&db_dir);
    483         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    484 
    485         let keys = RadrootsNostrKeys::generate();
    486         let event = RadrootsNostrEventBuilder::text_note("hello from ndb json")
    487             .sign_with_keys(&keys)
    488             .expect("event should sign");
    489         let json = serde_json::to_string(&event).expect("event json");
    490 
    491         ndb.ingest_event_json(&json)
    492             .expect("json ingest should succeed");
    493     }
    494 
    495     #[test]
    496     fn ingest_event_json_rejects_invalid_json() {
    497         let _guard = test_hooks_guard();
    498         reset_test_flags();
    499         let tmp_dir = TempDir::new().expect("tempdir should open");
    500         let db_dir = tmp_dir.path().join("ndb");
    501         let config = RadrootsNostrNdbConfig::new(&db_dir);
    502         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    503 
    504         test_hooks::FORCE_PROCESS_EVENT_ERROR.store(true, Ordering::SeqCst);
    505         let err = ndb
    506             .ingest_event_json_with_source("not json", RadrootsNostrNdbIngestSource::client())
    507             .expect_err("process event error");
    508         assert!(err.to_string().starts_with("nostrdb error:"));
    509     }
    510 
    511     #[test]
    512     fn ingest_event_reports_event_json_error() {
    513         let _guard = test_hooks_guard();
    514         reset_test_flags();
    515         let tmp_dir = TempDir::new().expect("tempdir should open");
    516         let db_dir = tmp_dir.path().join("ndb");
    517         let config = RadrootsNostrNdbConfig::new(&db_dir);
    518         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    519 
    520         let keys = RadrootsNostrKeys::generate();
    521         let event = RadrootsNostrEventBuilder::text_note("forced json error")
    522             .sign_with_keys(&keys)
    523             .expect("event should sign");
    524         test_hooks::FORCE_EVENT_JSON_ERROR.store(true, Ordering::SeqCst);
    525 
    526         let err = ndb
    527             .ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    528             .expect_err("forced json error");
    529         assert!(err.to_string().starts_with("event json encode failed:"));
    530     }
    531 
    532     #[test]
    533     fn subscribe_poll_and_unsubscribe_round_trip() {
    534         let _guard = test_hooks_guard();
    535         reset_test_flags();
    536         let tmp_dir = TempDir::new().expect("tempdir should open");
    537         let db_dir = tmp_dir.path().join("ndb");
    538         let config = RadrootsNostrNdbConfig::new(&db_dir);
    539         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    540         let spec = RadrootsNostrNdbSubscriptionSpec::single(
    541             RadrootsNostrNdbFilterSpec::new()
    542                 .with_kind(1)
    543                 .with_limit(10),
    544         );
    545         let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
    546 
    547         let keys = RadrootsNostrKeys::generate();
    548         let event = RadrootsNostrEventBuilder::text_note("subscription test")
    549             .sign_with_keys(&keys)
    550             .expect("event should sign");
    551         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown())
    552             .expect("ingest should succeed");
    553 
    554         let mut notes = Vec::new();
    555         for _ in 0..40 {
    556             notes = ndb.poll_for_note_keys(handle, 32);
    557             if !notes.is_empty() {
    558                 break;
    559             }
    560             std::thread::sleep(Duration::from_millis(25));
    561         }
    562 
    563         assert!(!notes.is_empty());
    564         ndb.unsubscribe(handle).expect("unsubscribe should succeed");
    565     }
    566 
    567     #[test]
    568     fn subscribe_reports_ndb_error() {
    569         let _guard = test_hooks_guard();
    570         reset_test_flags();
    571         let tmp_dir = TempDir::new().expect("tempdir should open");
    572         let db_dir = tmp_dir.path().join("ndb");
    573         let config = RadrootsNostrNdbConfig::new(&db_dir);
    574         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    575         let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
    576         test_hooks::FORCE_SUBSCRIBE_ERROR.store(true, Ordering::SeqCst);
    577 
    578         let err = ndb.subscribe(&spec).expect_err("forced subscribe error");
    579         assert!(err.to_string().starts_with("nostrdb error:"));
    580     }
    581 
    582     #[test]
    583     fn unsubscribe_reports_ndb_error() {
    584         let _guard = test_hooks_guard();
    585         reset_test_flags();
    586         let tmp_dir = TempDir::new().expect("tempdir should open");
    587         let db_dir = tmp_dir.path().join("ndb");
    588         let config = RadrootsNostrNdbConfig::new(&db_dir);
    589         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    590         let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
    591         let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
    592         test_hooks::FORCE_UNSUBSCRIBE_ERROR.store(true, Ordering::SeqCst);
    593 
    594         let err = ndb
    595             .unsubscribe(handle)
    596             .expect_err("forced unsubscribe error");
    597         assert!(err.to_string().starts_with("nostrdb error:"));
    598     }
    599 
    600     #[tokio::test]
    601     async fn wait_for_note_keys_yields_results() {
    602         let _guard = test_hooks_guard();
    603         reset_test_flags();
    604         let tmp_dir = TempDir::new().expect("tempdir should open");
    605         let db_dir = tmp_dir.path().join("ndb");
    606         let config = RadrootsNostrNdbConfig::new(&db_dir);
    607         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    608         let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
    609         let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
    610 
    611         let keys = RadrootsNostrKeys::generate();
    612         let event = RadrootsNostrEventBuilder::text_note("wait test")
    613             .sign_with_keys(&keys)
    614             .expect("event should sign");
    615         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown())
    616             .expect("ingest should succeed");
    617 
    618         let notes = ndb
    619             .wait_for_note_keys(handle, 32)
    620             .await
    621             .expect("wait should succeed");
    622         assert!(!notes.is_empty());
    623     }
    624 
    625     #[tokio::test]
    626     async fn wait_for_note_keys_reports_ndb_error() {
    627         let _guard = test_hooks_guard();
    628         reset_test_flags();
    629         let tmp_dir = TempDir::new().expect("tempdir should open");
    630         let db_dir = tmp_dir.path().join("ndb");
    631         let config = RadrootsNostrNdbConfig::new(&db_dir);
    632         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    633         let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
    634         let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
    635         test_hooks::FORCE_WAIT_ERROR.store(true, Ordering::SeqCst);
    636 
    637         let err = ndb
    638             .wait_for_note_keys(handle, 1)
    639             .await
    640             .expect_err("forced wait error");
    641         assert!(err.to_string().starts_with("nostrdb error:"));
    642     }
    643 
    644     #[test]
    645     fn query_notes_returns_ingested_results() {
    646         let _guard = test_hooks_guard();
    647         reset_test_flags();
    648         let tmp_dir = TempDir::new().expect("tempdir should open");
    649         let db_dir = tmp_dir.path().join("ndb");
    650         let config = RadrootsNostrNdbConfig::new(&db_dir);
    651         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    652 
    653         let keys = RadrootsNostrKeys::generate();
    654         let event = RadrootsNostrEventBuilder::text_note("query note")
    655             .sign_with_keys(&keys)
    656             .expect("event should sign");
    657         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    658             .expect("ingest should succeed");
    659 
    660         let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(50), None, 50);
    661         let mut notes = Vec::new();
    662         for _ in 0..40 {
    663             notes = ndb.query_notes(&query_spec).expect("query should succeed");
    664             if !notes.is_empty() {
    665                 break;
    666             }
    667             std::thread::sleep(Duration::from_millis(25));
    668         }
    669         assert!(!notes.is_empty());
    670         let note_pairs = notes
    671             .iter()
    672             .map(|note| (note.id_hex.clone(), note.content.clone()))
    673             .collect::<Vec<_>>();
    674         assert!(note_pairs.contains(&(event.id.to_hex(), "query note".to_string())));
    675     }
    676 
    677     #[test]
    678     fn query_notes_empty_filters_returns_empty() {
    679         let _guard = test_hooks_guard();
    680         reset_test_flags();
    681         let tmp_dir = TempDir::new().expect("tempdir should open");
    682         let db_dir = tmp_dir.path().join("ndb");
    683         let config = RadrootsNostrNdbConfig::new(&db_dir);
    684         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    685 
    686         let query_spec = RadrootsNostrNdbQuerySpec::new(Vec::new(), 10);
    687         let notes = ndb.query_notes(&query_spec).expect("query should succeed");
    688         assert!(notes.is_empty());
    689     }
    690 
    691     #[test]
    692     fn query_notes_rejects_invalid_filters() {
    693         let _guard = test_hooks_guard();
    694         reset_test_flags();
    695         let tmp_dir = TempDir::new().expect("tempdir should open");
    696         let db_dir = tmp_dir.path().join("ndb");
    697         let config = RadrootsNostrNdbConfig::new(&db_dir);
    698         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    699 
    700         let spec = RadrootsNostrNdbQuerySpec::single(
    701             RadrootsNostrNdbFilterSpec::new().with_author_hex("not-hex"),
    702             10,
    703         );
    704         let err = ndb.query_notes(&spec).expect_err("invalid filter");
    705         assert!(err.to_string().contains("invalid hex"));
    706     }
    707 
    708     #[test]
    709     fn query_notes_reports_transaction_error() {
    710         let _guard = test_hooks_guard();
    711         reset_test_flags();
    712         let tmp_dir = TempDir::new().expect("tempdir should open");
    713         let db_dir = tmp_dir.path().join("ndb");
    714         let config = RadrootsNostrNdbConfig::new(&db_dir);
    715         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    716         let spec = RadrootsNostrNdbQuerySpec::text_notes(Some(10), None, 10);
    717         test_hooks::FORCE_TRANSACTION_ERROR.store(true, Ordering::SeqCst);
    718 
    719         let err = ndb
    720             .query_notes(&spec)
    721             .expect_err("forced transaction error");
    722         assert!(err.to_string().starts_with("nostrdb error:"));
    723     }
    724 
    725     #[test]
    726     fn query_notes_reports_query_error() {
    727         let _guard = test_hooks_guard();
    728         reset_test_flags();
    729         let tmp_dir = TempDir::new().expect("tempdir should open");
    730         let db_dir = tmp_dir.path().join("ndb");
    731         let config = RadrootsNostrNdbConfig::new(&db_dir);
    732         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    733         let spec = RadrootsNostrNdbQuerySpec::text_notes(Some(10), None, 10);
    734         test_hooks::FORCE_QUERY_ERROR.store(true, Ordering::SeqCst);
    735 
    736         let err = ndb.query_notes(&spec).expect_err("forced query error");
    737         assert!(err.to_string().starts_with("nostrdb error:"));
    738     }
    739 
    740     #[test]
    741     fn query_notes_reports_note_json_error() {
    742         let _guard = test_hooks_guard();
    743         reset_test_flags();
    744         let tmp_dir = TempDir::new().expect("tempdir should open");
    745         let db_dir = tmp_dir.path().join("ndb");
    746         let config = RadrootsNostrNdbConfig::new(&db_dir);
    747         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    748 
    749         let keys = RadrootsNostrKeys::generate();
    750         let event = RadrootsNostrEventBuilder::text_note("note json error")
    751             .sign_with_keys(&keys)
    752             .expect("event should sign");
    753         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    754             .expect("ingest should succeed");
    755 
    756         let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(50), None, 50);
    757         for _ in 0..40 {
    758             let notes = ndb.query_notes(&query_spec).expect("query should succeed");
    759             if !notes.is_empty() {
    760                 break;
    761             }
    762             std::thread::sleep(Duration::from_millis(25));
    763         }
    764         test_hooks::FORCE_NOTE_JSON_ERROR.store(true, Ordering::SeqCst);
    765 
    766         let err = ndb
    767             .query_notes(&query_spec)
    768             .expect_err("forced note json error");
    769         assert!(err.to_string().starts_with("nostrdb error:"));
    770     }
    771 
    772     #[test]
    773     fn profile_lookup_returns_metadata_fields() {
    774         let _guard = test_hooks_guard();
    775         reset_test_flags();
    776         let tmp_dir = TempDir::new().expect("tempdir should open");
    777         let db_dir = tmp_dir.path().join("ndb");
    778         let config = RadrootsNostrNdbConfig::new(&db_dir);
    779         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    780 
    781         let keys = RadrootsNostrKeys::generate();
    782         let pubkey_hex = keys.public_key().to_hex();
    783         let metadata = RadrootsNostrMetadata::new()
    784             .name(FIXTURE_ALICE_USERNAME)
    785             .display_name(FIXTURE_ALICE_USERNAME)
    786             .about("coffee operator")
    787             .lud16(FIXTURE_ALICE_EMAIL);
    788         let metadata_event = radroots_nostr_build_metadata_event(&metadata)
    789             .sign_with_keys(&keys)
    790             .expect("metadata event should sign");
    791         ndb.ingest_event(&metadata_event, RadrootsNostrNdbIngestSource::client())
    792             .expect("ingest should succeed");
    793 
    794         let mut profile = None;
    795         for _ in 0..40 {
    796             profile = ndb
    797                 .get_profile_by_pubkey_hex(pubkey_hex.as_str())
    798                 .expect("profile lookup should succeed");
    799             if profile.is_some() {
    800                 break;
    801             }
    802             std::thread::sleep(Duration::from_millis(25));
    803         }
    804         let profile = profile.expect("profile should exist");
    805         assert_eq!(profile.pubkey_hex, pubkey_hex);
    806         assert_eq!(profile.name.as_deref(), Some(FIXTURE_ALICE_USERNAME));
    807         assert_eq!(
    808             profile.display_name.as_deref(),
    809             Some(FIXTURE_ALICE_USERNAME)
    810         );
    811         assert_eq!(profile.lud16.as_deref(), Some(FIXTURE_ALICE_EMAIL));
    812     }
    813 
    814     #[test]
    815     fn profile_lookup_returns_none_when_missing() {
    816         let _guard = test_hooks_guard();
    817         reset_test_flags();
    818         let tmp_dir = TempDir::new().expect("tempdir should open");
    819         let db_dir = tmp_dir.path().join("ndb");
    820         let config = RadrootsNostrNdbConfig::new(&db_dir);
    821         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    822 
    823         let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex();
    824         let profile = ndb
    825             .get_profile_by_pubkey_hex(pubkey_hex.as_str())
    826             .expect("profile lookup");
    827         assert!(profile.is_none());
    828     }
    829 
    830     #[test]
    831     fn profile_lookup_reports_query_error() {
    832         let _guard = test_hooks_guard();
    833         reset_test_flags();
    834         let tmp_dir = TempDir::new().expect("tempdir should open");
    835         let db_dir = tmp_dir.path().join("ndb");
    836         let config = RadrootsNostrNdbConfig::new(&db_dir);
    837         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    838         let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex();
    839         test_hooks::FORCE_PROFILE_QUERY_ERROR.store(true, Ordering::SeqCst);
    840 
    841         let err = ndb
    842             .get_profile_by_pubkey_hex(pubkey_hex.as_str())
    843             .expect_err("forced profile query error");
    844         assert!(err.to_string().starts_with("nostrdb error:"));
    845     }
    846 
    847     #[test]
    848     fn profile_lookup_reports_transaction_error() {
    849         let _guard = test_hooks_guard();
    850         reset_test_flags();
    851         let tmp_dir = TempDir::new().expect("tempdir should open");
    852         let db_dir = tmp_dir.path().join("ndb");
    853         let config = RadrootsNostrNdbConfig::new(&db_dir);
    854         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    855         let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex();
    856         test_hooks::FORCE_TRANSACTION_ERROR.store(true, Ordering::SeqCst);
    857 
    858         let err = ndb
    859             .get_profile_by_pubkey_hex(pubkey_hex.as_str())
    860             .expect_err("forced transaction error");
    861         assert!(err.to_string().starts_with("nostrdb error:"));
    862     }
    863 
    864     #[test]
    865     fn profile_lookup_returns_none_without_metadata_record() {
    866         let _guard = test_hooks_guard();
    867         reset_test_flags();
    868         let tmp_dir = TempDir::new().expect("tempdir should open");
    869         let db_dir = tmp_dir.path().join("ndb");
    870         let config = RadrootsNostrNdbConfig::new(&db_dir);
    871         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    872 
    873         let keys = RadrootsNostrKeys::generate();
    874         let pubkey_hex = keys.public_key().to_hex();
    875         let event = RadrootsNostrEventBuilder::text_note("non profile event")
    876             .sign_with_keys(&keys)
    877             .expect("event should sign");
    878         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    879             .expect("ingest should succeed");
    880 
    881         let profile = ndb
    882             .get_profile_by_pubkey_hex(pubkey_hex.as_str())
    883             .expect("profile lookup");
    884         assert!(profile.is_none());
    885     }
    886 
    887     #[test]
    888     fn profile_lookup_invalid_metadata_content_returns_none() {
    889         let _guard = test_hooks_guard();
    890         reset_test_flags();
    891         let tmp_dir = TempDir::new().expect("tempdir should open");
    892         let db_dir = tmp_dir.path().join("ndb");
    893         let config = RadrootsNostrNdbConfig::new(&db_dir);
    894         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    895 
    896         let keys = RadrootsNostrKeys::generate();
    897         let pubkey_hex = keys.public_key().to_hex();
    898         let event = RadrootsNostrEventBuilder::new(
    899             radroots_nostr::prelude::RadrootsNostrKind::Metadata,
    900             "not valid metadata json",
    901         )
    902         .sign_with_keys(&keys)
    903         .expect("event should sign");
    904         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    905             .expect("ingest should succeed");
    906 
    907         let result = ndb.get_profile_by_pubkey_hex(pubkey_hex.as_str());
    908         assert!(result.expect("profile lookup").is_none());
    909     }
    910 
    911     #[test]
    912     fn subscribe_rejects_invalid_author_hex() {
    913         let tmp_dir = TempDir::new().expect("tempdir should open");
    914         let db_dir = tmp_dir.path().join("ndb");
    915         let config = RadrootsNostrNdbConfig::new(&db_dir);
    916         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    917 
    918         let spec = RadrootsNostrNdbSubscriptionSpec::single(
    919             RadrootsNostrNdbFilterSpec::new().with_author_hex("not-hex"),
    920         );
    921         let err = ndb.subscribe(&spec).expect_err("subscribe should fail");
    922         assert!(err.to_string().contains("invalid hex for author"));
    923     }
    924 
    925     #[test]
    926     fn profile_lookup_rejects_invalid_pubkey_length() {
    927         let _guard = test_hooks_guard();
    928         reset_test_flags();
    929         let tmp_dir = TempDir::new().expect("tempdir should open");
    930         let db_dir = tmp_dir.path().join("ndb");
    931         let config = RadrootsNostrNdbConfig::new(&db_dir);
    932         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    933 
    934         let err = ndb
    935             .get_profile_by_pubkey_hex("abcd")
    936             .expect_err("lookup should fail");
    937         assert!(err.to_string().contains("invalid hex length for pubkey"));
    938     }
    939 
    940     #[tokio::test]
    941     async fn subscription_stream_yields_events() {
    942         let tmp_dir = TempDir::new().expect("tempdir should open");
    943         let db_dir = tmp_dir.path().join("ndb");
    944         let config = RadrootsNostrNdbConfig::new(&db_dir);
    945         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    946         let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None);
    947         let handle = ndb.subscribe(&spec).expect("subscribe should succeed");
    948         let mut stream = ndb.subscription_stream(handle, 0);
    949 
    950         let pending = tokio::time::timeout(Duration::from_millis(20), stream.next()).await;
    951         assert!(pending.is_err());
    952 
    953         let keys = RadrootsNostrKeys::generate();
    954         let event = RadrootsNostrEventBuilder::text_note("stream note")
    955             .sign_with_keys(&keys)
    956             .expect("event should sign");
    957         ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    958             .expect("ingest should succeed");
    959 
    960         let note_keys = tokio::time::timeout(Duration::from_secs(2), stream.next())
    961             .await
    962             .expect("stream should wake")
    963             .expect("stream should yield note keys");
    964         assert!(!note_keys.is_empty());
    965         assert!(note_keys.iter().all(|key| key.as_u64() > 0));
    966     }
    967 
    968     #[test]
    969     fn concurrent_ingest_handles_parallel_writers() {
    970         let _guard = test_hooks_guard();
    971         reset_test_flags();
    972         let tmp_dir = TempDir::new().expect("tempdir should open");
    973         let db_dir = tmp_dir.path().join("ndb");
    974         let config = RadrootsNostrNdbConfig::new(&db_dir);
    975         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
    976 
    977         let worker_count = 4usize;
    978         let notes_per_worker = 20usize;
    979         let mut handles = Vec::new();
    980 
    981         for worker in 0..worker_count {
    982             let db = ndb.clone();
    983             handles.push(std::thread::spawn(move || {
    984                 let keys = RadrootsNostrKeys::generate();
    985                 for idx in 0..notes_per_worker {
    986                     let content = format!("parallel-{worker}-{idx}");
    987                     let event = RadrootsNostrEventBuilder::text_note(content.as_str())
    988                         .sign_with_keys(&keys)
    989                         .expect("event should sign");
    990                     db.ingest_event(&event, RadrootsNostrNdbIngestSource::client())
    991                         .expect("ingest should succeed");
    992                 }
    993             }));
    994         }
    995 
    996         for handle in handles {
    997             handle.join().expect("worker should complete");
    998         }
    999 
   1000         let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(512), None, 512);
   1001         let expected = worker_count * notes_per_worker;
   1002         let mut observed = 0usize;
   1003         let mut break_threshold = expected + 1;
   1004 
   1005         for _ in 0..80 {
   1006             let notes = ndb.query_notes(&query_spec).expect("query should succeed");
   1007             observed = notes
   1008                 .iter()
   1009                 .filter(|note| note.content.starts_with("parallel-"))
   1010                 .count();
   1011             if observed >= break_threshold {
   1012                 break;
   1013             }
   1014             break_threshold = expected;
   1015             std::thread::sleep(Duration::from_millis(25));
   1016         }
   1017 
   1018         assert!(
   1019             observed >= expected,
   1020             "expected at least {expected} parallel notes, got {observed}"
   1021         );
   1022     }
   1023 
   1024     #[cfg(feature = "giftwrap")]
   1025     #[test]
   1026     fn giftwrap_secret_key_hex_validates_length() {
   1027         let tmp_dir = TempDir::new().expect("tempdir should open");
   1028         let db_dir = tmp_dir.path().join("ndb");
   1029         let config = RadrootsNostrNdbConfig::new(&db_dir);
   1030         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
   1031 
   1032         let result = ndb.add_giftwrap_secret_key_hex("abcd");
   1033         let err = result.expect_err("invalid giftwrap key");
   1034         assert!(
   1035             err.to_string()
   1036                 .contains("invalid hex length for secret_key")
   1037         );
   1038     }
   1039 
   1040     #[cfg(feature = "giftwrap")]
   1041     #[test]
   1042     fn giftwrap_process_flow_executes() {
   1043         let tmp_dir = TempDir::new().expect("tempdir should open");
   1044         let db_dir = tmp_dir.path().join("ndb");
   1045         let config = RadrootsNostrNdbConfig::new(&db_dir);
   1046         let ndb = RadrootsNostrNdb::open(config).expect("database should open");
   1047 
   1048         let secret_key = [7u8; 32];
   1049         let _ = ndb.add_giftwrap_secret_key(secret_key);
   1050         ndb.process_giftwraps()
   1051             .expect("giftwrap processing should run");
   1052     }
   1053 }