ndb.rs (41453B)
1 use crate::config::RadrootsNostrNdbConfig; 2 use crate::error::RadrootsNostrNdbError; 3 use crate::filter::parse_hex_32; 4 use crate::ingest::RadrootsNostrNdbIngestSource; 5 use crate::query::{RadrootsNostrNdbNote, RadrootsNostrNdbProfile, RadrootsNostrNdbQuerySpec}; 6 use crate::subscription::{ 7 RadrootsNostrNdbNoteKey, RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbSubscriptionSpec, 8 RadrootsNostrNdbSubscriptionStream, 9 }; 10 use radroots_nostr::prelude::RadrootsNostrEvent; 11 use std::path::Path; 12 13 #[derive(Debug, Clone)] 14 pub struct RadrootsNostrNdb { 15 db_dir: std::path::PathBuf, 16 pub(crate) inner: nostrdb::Ndb, 17 } 18 19 #[cfg(test)] 20 mod test_hooks { 21 use std::sync::atomic::{AtomicBool, Ordering}; 22 23 pub static FORCE_EVENT_JSON_ERROR: AtomicBool = AtomicBool::new(false); 24 pub static FORCE_PROCESS_EVENT_ERROR: AtomicBool = AtomicBool::new(false); 25 pub static FORCE_SUBSCRIBE_ERROR: AtomicBool = AtomicBool::new(false); 26 pub static FORCE_UNSUBSCRIBE_ERROR: AtomicBool = AtomicBool::new(false); 27 pub static FORCE_WAIT_ERROR: AtomicBool = AtomicBool::new(false); 28 pub static FORCE_TRANSACTION_ERROR: AtomicBool = AtomicBool::new(false); 29 pub static FORCE_QUERY_ERROR: AtomicBool = AtomicBool::new(false); 30 pub static FORCE_NOTE_JSON_ERROR: AtomicBool = AtomicBool::new(false); 31 pub static FORCE_PROFILE_QUERY_ERROR: AtomicBool = AtomicBool::new(false); 32 33 pub fn take(flag: &AtomicBool) -> bool { 34 flag.swap(false, Ordering::SeqCst) 35 } 36 } 37 38 fn map_profile_lookup_result<T>( 39 result: Result<T, nostrdb::Error>, 40 ) -> Result<Option<T>, RadrootsNostrNdbError> { 41 match result { 42 Ok(value) => Ok(Some(value)), 43 Err(nostrdb::Error::NotFound) => Ok(None), 44 Err(source) => Err(source.into()), 45 } 46 } 47 48 impl RadrootsNostrNdb { 49 fn serialize_event(event: &RadrootsNostrEvent) -> Result<String, RadrootsNostrNdbError> { 50 #[cfg(test)] 51 if test_hooks::take(&test_hooks::FORCE_EVENT_JSON_ERROR) { 52 return Err(RadrootsNostrNdbError::EventJsonEncode( 53 "forced event json error".into(), 54 )); 55 } 56 serde_json::to_string(event).map_err(Into::into) 57 } 58 59 fn process_event_with_inner( 60 &self, 61 json: &str, 62 metadata: nostrdb::IngestMetadata, 63 ) -> Result<(), RadrootsNostrNdbError> { 64 #[cfg(test)] 65 if test_hooks::take(&test_hooks::FORCE_PROCESS_EVENT_ERROR) { 66 return Err(RadrootsNostrNdbError::Ndb( 67 "forced process event error".into(), 68 )); 69 } 70 self.inner 71 .process_event_with(json, metadata) 72 .map_err(Into::into) 73 } 74 75 fn subscribe_inner( 76 &self, 77 filters: &[nostrdb::Filter], 78 ) -> Result<nostrdb::Subscription, RadrootsNostrNdbError> { 79 #[cfg(test)] 80 if test_hooks::take(&test_hooks::FORCE_SUBSCRIBE_ERROR) { 81 return Err(RadrootsNostrNdbError::Ndb("forced subscribe error".into())); 82 } 83 self.inner.subscribe(filters).map_err(Into::into) 84 } 85 86 fn unsubscribe_inner( 87 &self, 88 subscription: nostrdb::Subscription, 89 ) -> Result<(), RadrootsNostrNdbError> { 90 #[cfg(test)] 91 if test_hooks::take(&test_hooks::FORCE_UNSUBSCRIBE_ERROR) { 92 return Err(RadrootsNostrNdbError::Ndb( 93 "forced unsubscribe error".into(), 94 )); 95 } 96 let mut inner = self.inner.clone(); 97 inner.unsubscribe(subscription).map_err(Into::into) 98 } 99 100 #[cfg(feature = "rt")] 101 async fn wait_for_notes_inner( 102 &self, 103 subscription: nostrdb::Subscription, 104 max_notes: u32, 105 ) -> Result<Vec<nostrdb::NoteKey>, RadrootsNostrNdbError> { 106 #[cfg(test)] 107 if test_hooks::take(&test_hooks::FORCE_WAIT_ERROR) { 108 return Err(RadrootsNostrNdbError::Ndb("forced wait error".into())); 109 } 110 self.inner 111 .wait_for_notes(subscription, max_notes) 112 .await 113 .map_err(Into::into) 114 } 115 116 fn open_txn(&self) -> Result<nostrdb::Transaction, RadrootsNostrNdbError> { 117 #[cfg(test)] 118 if test_hooks::take(&test_hooks::FORCE_TRANSACTION_ERROR) { 119 return Err(RadrootsNostrNdbError::Ndb( 120 "forced transaction error".into(), 121 )); 122 } 123 nostrdb::Transaction::new(&self.inner).map_err(Into::into) 124 } 125 126 fn query_inner<'a>( 127 &self, 128 txn: &'a nostrdb::Transaction, 129 filters: &[nostrdb::Filter], 130 max_results: i32, 131 ) -> Result<Vec<nostrdb::QueryResult<'a>>, RadrootsNostrNdbError> { 132 #[cfg(test)] 133 if test_hooks::take(&test_hooks::FORCE_QUERY_ERROR) { 134 return Err(RadrootsNostrNdbError::Ndb("forced query error".into())); 135 } 136 self.inner 137 .query(txn, filters, max_results) 138 .map_err(Into::into) 139 } 140 141 fn note_json_value(note: &nostrdb::Note) -> Result<String, RadrootsNostrNdbError> { 142 #[cfg(test)] 143 if test_hooks::take(&test_hooks::FORCE_NOTE_JSON_ERROR) { 144 return Err(RadrootsNostrNdbError::Ndb("forced note json error".into())); 145 } 146 note.json().map_err(Into::into) 147 } 148 149 fn get_profile_record<'a>( 150 &self, 151 txn: &'a nostrdb::Transaction, 152 pubkey: &[u8; 32], 153 ) -> Result<Option<nostrdb::ProfileRecord<'a>>, RadrootsNostrNdbError> { 154 #[cfg(test)] 155 if test_hooks::take(&test_hooks::FORCE_PROFILE_QUERY_ERROR) { 156 return map_profile_lookup_result(Err(nostrdb::Error::QueryError)); 157 } 158 map_profile_lookup_result(self.inner.get_profile_by_pubkey(txn, pubkey)) 159 } 160 161 pub fn open(config: RadrootsNostrNdbConfig) -> Result<Self, RadrootsNostrNdbError> { 162 let mut inner_config = nostrdb::Config::new().skip_validation(config.skip_validation()); 163 if let Some(mapsize_bytes) = config.mapsize_bytes() { 164 inner_config = inner_config.set_mapsize(mapsize_bytes); 165 } 166 if let Some(ingester_threads) = config.ingester_threads() { 167 inner_config = inner_config.set_ingester_threads(ingester_threads); 168 } 169 170 let db_dir = config.db_dir().to_path_buf(); 171 let db_dir_str = db_dir.to_str().ok_or(RadrootsNostrNdbError::NonUtf8Path)?; 172 let inner = nostrdb::Ndb::new(db_dir_str, &inner_config)?; 173 174 Ok(Self { db_dir, inner }) 175 } 176 177 pub fn db_dir(&self) -> &Path { 178 &self.db_dir 179 } 180 181 pub fn ingest_event_json_with_source( 182 &self, 183 json: &str, 184 source: RadrootsNostrNdbIngestSource, 185 ) -> Result<(), RadrootsNostrNdbError> { 186 let metadata = source.to_ndb_metadata(); 187 self.process_event_with_inner(json, metadata)?; 188 Ok(()) 189 } 190 191 pub fn ingest_event_json(&self, json: &str) -> Result<(), RadrootsNostrNdbError> { 192 self.ingest_event_json_with_source(json, RadrootsNostrNdbIngestSource::default()) 193 } 194 195 pub fn ingest_event( 196 &self, 197 event: &RadrootsNostrEvent, 198 source: RadrootsNostrNdbIngestSource, 199 ) -> Result<(), RadrootsNostrNdbError> { 200 let json = Self::serialize_event(event)?; 201 self.ingest_event_json_with_source(json.as_str(), source) 202 } 203 204 #[cfg(feature = "giftwrap")] 205 pub fn add_giftwrap_secret_key(&self, secret_key: [u8; 32]) -> bool { 206 self.inner.add_key(&secret_key) 207 } 208 209 #[cfg(feature = "giftwrap")] 210 pub fn add_giftwrap_secret_key_hex( 211 &self, 212 secret_key_hex: &str, 213 ) -> Result<bool, RadrootsNostrNdbError> { 214 let secret_key = parse_hex_32(secret_key_hex, "secret_key")?; 215 Ok(self.add_giftwrap_secret_key(secret_key)) 216 } 217 218 #[cfg(feature = "giftwrap")] 219 pub fn process_giftwraps(&self) -> Result<(), RadrootsNostrNdbError> { 220 let txn = nostrdb::Transaction::new(&self.inner)?; 221 self.inner.process_giftwraps(&txn); 222 Ok(()) 223 } 224 225 pub fn subscribe( 226 &self, 227 spec: &RadrootsNostrNdbSubscriptionSpec, 228 ) -> Result<RadrootsNostrNdbSubscriptionHandle, RadrootsNostrNdbError> { 229 let filters = spec 230 .filters() 231 .iter() 232 .map(|filter_spec| filter_spec.to_ndb_filter()) 233 .collect::<Result<Vec<_>, _>>()?; 234 let subscription = self.subscribe_inner(filters.as_slice())?; 235 Ok(RadrootsNostrNdbSubscriptionHandle::new(subscription.id())) 236 } 237 238 pub fn unsubscribe( 239 &self, 240 handle: RadrootsNostrNdbSubscriptionHandle, 241 ) -> Result<(), RadrootsNostrNdbError> { 242 let subscription = nostrdb::Subscription::new(handle.id()); 243 self.unsubscribe_inner(subscription)?; 244 Ok(()) 245 } 246 247 pub fn poll_for_note_keys( 248 &self, 249 handle: RadrootsNostrNdbSubscriptionHandle, 250 max_notes: u32, 251 ) -> Vec<RadrootsNostrNdbNoteKey> { 252 self.inner 253 .poll_for_notes(nostrdb::Subscription::new(handle.id()), max_notes) 254 .into_iter() 255 .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64())) 256 .collect() 257 } 258 259 #[cfg(feature = "rt")] 260 pub async fn wait_for_note_keys( 261 &self, 262 handle: RadrootsNostrNdbSubscriptionHandle, 263 max_notes: u32, 264 ) -> Result<Vec<RadrootsNostrNdbNoteKey>, RadrootsNostrNdbError> { 265 let note_keys = self 266 .wait_for_notes_inner(nostrdb::Subscription::new(handle.id()), max_notes) 267 .await?; 268 Ok(note_keys 269 .into_iter() 270 .map(|note_key| RadrootsNostrNdbNoteKey::new(note_key.as_u64())) 271 .collect()) 272 } 273 274 #[cfg(feature = "rt")] 275 pub fn subscription_stream( 276 &self, 277 handle: RadrootsNostrNdbSubscriptionHandle, 278 notes_per_await: u32, 279 ) -> RadrootsNostrNdbSubscriptionStream { 280 let stream = nostrdb::Subscription::new(handle.id()) 281 .stream(&self.inner) 282 .notes_per_await(notes_per_await.max(1)); 283 RadrootsNostrNdbSubscriptionStream { inner: stream } 284 } 285 286 pub fn query_notes( 287 &self, 288 spec: &RadrootsNostrNdbQuerySpec, 289 ) -> Result<Vec<RadrootsNostrNdbNote>, RadrootsNostrNdbError> { 290 if spec.filters().is_empty() { 291 return Ok(Vec::new()); 292 } 293 294 let filters = spec 295 .filters() 296 .iter() 297 .map(|filter_spec| filter_spec.to_ndb_filter()) 298 .collect::<Result<Vec<_>, _>>()?; 299 let txn = self.open_txn()?; 300 let query_results = 301 self.query_inner(&txn, filters.as_slice(), spec.max_results() as i32)?; 302 303 query_results 304 .into_iter() 305 .map(|query_result| { 306 let note = query_result.note; 307 let json = Self::note_json_value(¬e)?; 308 Ok(RadrootsNostrNdbNote { 309 note_key: query_result.note_key.as_u64(), 310 id_hex: hex::encode(note.id()), 311 author_hex: hex::encode(note.pubkey()), 312 kind: note.kind(), 313 created_at_unix: note.created_at(), 314 content: note.content().to_owned(), 315 json, 316 }) 317 }) 318 .collect::<Result<Vec<_>, RadrootsNostrNdbError>>() 319 } 320 321 pub fn get_profile_by_pubkey_hex( 322 &self, 323 pubkey_hex: &str, 324 ) -> Result<Option<RadrootsNostrNdbProfile>, RadrootsNostrNdbError> { 325 let pubkey = parse_hex_32(pubkey_hex, "pubkey")?; 326 let txn = self.open_txn()?; 327 let Some(profile_record) = self.get_profile_record(&txn, &pubkey)? else { 328 return Ok(None); 329 }; 330 331 let profile = profile_record.record().profile(); 332 let profile_key = profile_record.key().map(|key| key.as_u64()); 333 Ok(profile.map(|profile| RadrootsNostrNdbProfile { 334 profile_key, 335 pubkey_hex: pubkey_hex.to_owned(), 336 name: profile.name().map(ToOwned::to_owned), 337 display_name: profile.display_name().map(ToOwned::to_owned), 338 about: profile.about().map(ToOwned::to_owned), 339 picture: profile.picture().map(ToOwned::to_owned), 340 banner: profile.banner().map(ToOwned::to_owned), 341 website: profile.website().map(ToOwned::to_owned), 342 nip05: profile.nip05().map(ToOwned::to_owned), 343 lud16: profile.lud16().map(ToOwned::to_owned), 344 })) 345 } 346 } 347 348 #[cfg(test)] 349 mod tests { 350 use super::*; 351 use crate::filter::RadrootsNostrNdbFilterSpec; 352 use crate::ingest::RadrootsNostrNdbIngestSource; 353 use crate::query::RadrootsNostrNdbQuerySpec; 354 use crate::test_fixtures::{FIXTURE_ALICE_EMAIL, FIXTURE_ALICE_USERNAME}; 355 use futures::StreamExt; 356 use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKeys}; 357 use radroots_nostr::prelude::{RadrootsNostrMetadata, radroots_nostr_build_metadata_event}; 358 use std::sync::atomic::Ordering; 359 use std::sync::{Mutex, OnceLock}; 360 use std::time::Duration; 361 use tempfile::TempDir; 362 363 fn test_hooks_lock() -> &'static Mutex<()> { 364 static TEST_HOOKS_LOCK: OnceLock<Mutex<()>> = OnceLock::new(); 365 TEST_HOOKS_LOCK.get_or_init(|| Mutex::new(())) 366 } 367 368 fn test_hooks_guard() -> std::sync::MutexGuard<'static, ()> { 369 test_hooks_lock().lock().expect("test hooks lock") 370 } 371 372 fn reset_test_flags() { 373 test_hooks::FORCE_EVENT_JSON_ERROR.store(false, Ordering::SeqCst); 374 test_hooks::FORCE_PROCESS_EVENT_ERROR.store(false, Ordering::SeqCst); 375 test_hooks::FORCE_SUBSCRIBE_ERROR.store(false, Ordering::SeqCst); 376 test_hooks::FORCE_UNSUBSCRIBE_ERROR.store(false, Ordering::SeqCst); 377 test_hooks::FORCE_WAIT_ERROR.store(false, Ordering::SeqCst); 378 test_hooks::FORCE_TRANSACTION_ERROR.store(false, Ordering::SeqCst); 379 test_hooks::FORCE_QUERY_ERROR.store(false, Ordering::SeqCst); 380 test_hooks::FORCE_NOTE_JSON_ERROR.store(false, Ordering::SeqCst); 381 test_hooks::FORCE_PROFILE_QUERY_ERROR.store(false, Ordering::SeqCst); 382 } 383 384 #[test] 385 fn config_builder_tracks_values() { 386 let config = RadrootsNostrNdbConfig::new("target/testdbs/nostr_ndb_config") 387 .with_mapsize_bytes(1024 * 1024) 388 .with_ingester_threads(2) 389 .with_skip_validation(true); 390 391 assert_eq!(config.mapsize_bytes(), Some(1024 * 1024)); 392 assert_eq!(config.ingester_threads(), Some(2)); 393 assert!(config.skip_validation()); 394 } 395 396 #[test] 397 fn map_profile_lookup_result_handles_all_error_kinds() { 398 let success = map_profile_lookup_result::<u64>(Ok(7)).expect("ok"); 399 assert_eq!(success, Some(7)); 400 401 let not_found = 402 map_profile_lookup_result::<u64>(Err(nostrdb::Error::NotFound)).expect("none"); 403 assert!(not_found.is_none()); 404 405 let query_error = map_profile_lookup_result::<u64>(Err(nostrdb::Error::QueryError)) 406 .expect_err("query error"); 407 assert!(query_error.to_string().starts_with("nostrdb error:")); 408 } 409 410 #[test] 411 fn open_creates_database() { 412 let tmp_dir = TempDir::new().expect("tempdir should open"); 413 let db_dir = tmp_dir.path().join("ndb"); 414 let config = RadrootsNostrNdbConfig::new(&db_dir) 415 .with_mapsize_bytes(64 * 1024 * 1024) 416 .with_ingester_threads(1); 417 418 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 419 assert_eq!(ndb.db_dir(), db_dir.as_path()); 420 assert!(db_dir.exists()); 421 } 422 423 #[cfg(unix)] 424 #[test] 425 fn open_rejects_non_utf8_path() { 426 use std::os::unix::ffi::OsStrExt; 427 428 let path = std::path::PathBuf::from(std::ffi::OsStr::from_bytes(b"ndb-\xFF")); 429 let config = RadrootsNostrNdbConfig::new(&path); 430 let err = RadrootsNostrNdb::open(config).expect_err("non utf8 path"); 431 assert!(err.to_string().contains("utf-8")); 432 } 433 434 #[test] 435 fn open_reports_ndb_error_for_file_path() { 436 let tmp_dir = TempDir::new().expect("tempdir should open"); 437 let db_dir = tmp_dir.path().join("ndb"); 438 std::fs::write(&db_dir, "not a directory").expect("write db file"); 439 let config = RadrootsNostrNdbConfig::new(&db_dir); 440 let err = RadrootsNostrNdb::open(config).expect_err("file path should fail"); 441 assert!(err.to_string().starts_with("nostrdb error:")); 442 } 443 444 #[test] 445 fn ingest_source_builders_track_origin() { 446 assert_eq!( 447 RadrootsNostrNdbIngestSource::default(), 448 RadrootsNostrNdbIngestSource::client() 449 ); 450 assert_eq!( 451 RadrootsNostrNdbIngestSource::relay("wss://radroots.org"), 452 RadrootsNostrNdbIngestSource::Relay { 453 relay_url: Some("wss://radroots.org".into()) 454 } 455 ); 456 assert_eq!( 457 RadrootsNostrNdbIngestSource::relay_unknown(), 458 RadrootsNostrNdbIngestSource::Relay { relay_url: None } 459 ); 460 } 461 462 #[test] 463 fn ingest_event_accepts_signed_note() { 464 let tmp_dir = TempDir::new().expect("tempdir should open"); 465 let db_dir = tmp_dir.path().join("ndb"); 466 let config = RadrootsNostrNdbConfig::new(&db_dir); 467 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 468 469 let keys = RadrootsNostrKeys::generate(); 470 let event = RadrootsNostrEventBuilder::text_note("hello from ndb") 471 .sign_with_keys(&keys) 472 .expect("event should sign"); 473 474 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 475 .expect("ingest should succeed"); 476 } 477 478 #[test] 479 fn ingest_event_json_accepts_signed_note() { 480 let tmp_dir = TempDir::new().expect("tempdir should open"); 481 let db_dir = tmp_dir.path().join("ndb"); 482 let config = RadrootsNostrNdbConfig::new(&db_dir); 483 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 484 485 let keys = RadrootsNostrKeys::generate(); 486 let event = RadrootsNostrEventBuilder::text_note("hello from ndb json") 487 .sign_with_keys(&keys) 488 .expect("event should sign"); 489 let json = serde_json::to_string(&event).expect("event json"); 490 491 ndb.ingest_event_json(&json) 492 .expect("json ingest should succeed"); 493 } 494 495 #[test] 496 fn ingest_event_json_rejects_invalid_json() { 497 let _guard = test_hooks_guard(); 498 reset_test_flags(); 499 let tmp_dir = TempDir::new().expect("tempdir should open"); 500 let db_dir = tmp_dir.path().join("ndb"); 501 let config = RadrootsNostrNdbConfig::new(&db_dir); 502 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 503 504 test_hooks::FORCE_PROCESS_EVENT_ERROR.store(true, Ordering::SeqCst); 505 let err = ndb 506 .ingest_event_json_with_source("not json", RadrootsNostrNdbIngestSource::client()) 507 .expect_err("process event error"); 508 assert!(err.to_string().starts_with("nostrdb error:")); 509 } 510 511 #[test] 512 fn ingest_event_reports_event_json_error() { 513 let _guard = test_hooks_guard(); 514 reset_test_flags(); 515 let tmp_dir = TempDir::new().expect("tempdir should open"); 516 let db_dir = tmp_dir.path().join("ndb"); 517 let config = RadrootsNostrNdbConfig::new(&db_dir); 518 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 519 520 let keys = RadrootsNostrKeys::generate(); 521 let event = RadrootsNostrEventBuilder::text_note("forced json error") 522 .sign_with_keys(&keys) 523 .expect("event should sign"); 524 test_hooks::FORCE_EVENT_JSON_ERROR.store(true, Ordering::SeqCst); 525 526 let err = ndb 527 .ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 528 .expect_err("forced json error"); 529 assert!(err.to_string().starts_with("event json encode failed:")); 530 } 531 532 #[test] 533 fn subscribe_poll_and_unsubscribe_round_trip() { 534 let _guard = test_hooks_guard(); 535 reset_test_flags(); 536 let tmp_dir = TempDir::new().expect("tempdir should open"); 537 let db_dir = tmp_dir.path().join("ndb"); 538 let config = RadrootsNostrNdbConfig::new(&db_dir); 539 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 540 let spec = RadrootsNostrNdbSubscriptionSpec::single( 541 RadrootsNostrNdbFilterSpec::new() 542 .with_kind(1) 543 .with_limit(10), 544 ); 545 let handle = ndb.subscribe(&spec).expect("subscribe should succeed"); 546 547 let keys = RadrootsNostrKeys::generate(); 548 let event = RadrootsNostrEventBuilder::text_note("subscription test") 549 .sign_with_keys(&keys) 550 .expect("event should sign"); 551 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown()) 552 .expect("ingest should succeed"); 553 554 let mut notes = Vec::new(); 555 for _ in 0..40 { 556 notes = ndb.poll_for_note_keys(handle, 32); 557 if !notes.is_empty() { 558 break; 559 } 560 std::thread::sleep(Duration::from_millis(25)); 561 } 562 563 assert!(!notes.is_empty()); 564 ndb.unsubscribe(handle).expect("unsubscribe should succeed"); 565 } 566 567 #[test] 568 fn subscribe_reports_ndb_error() { 569 let _guard = test_hooks_guard(); 570 reset_test_flags(); 571 let tmp_dir = TempDir::new().expect("tempdir should open"); 572 let db_dir = tmp_dir.path().join("ndb"); 573 let config = RadrootsNostrNdbConfig::new(&db_dir); 574 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 575 let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None); 576 test_hooks::FORCE_SUBSCRIBE_ERROR.store(true, Ordering::SeqCst); 577 578 let err = ndb.subscribe(&spec).expect_err("forced subscribe error"); 579 assert!(err.to_string().starts_with("nostrdb error:")); 580 } 581 582 #[test] 583 fn unsubscribe_reports_ndb_error() { 584 let _guard = test_hooks_guard(); 585 reset_test_flags(); 586 let tmp_dir = TempDir::new().expect("tempdir should open"); 587 let db_dir = tmp_dir.path().join("ndb"); 588 let config = RadrootsNostrNdbConfig::new(&db_dir); 589 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 590 let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None); 591 let handle = ndb.subscribe(&spec).expect("subscribe should succeed"); 592 test_hooks::FORCE_UNSUBSCRIBE_ERROR.store(true, Ordering::SeqCst); 593 594 let err = ndb 595 .unsubscribe(handle) 596 .expect_err("forced unsubscribe error"); 597 assert!(err.to_string().starts_with("nostrdb error:")); 598 } 599 600 #[tokio::test] 601 async fn wait_for_note_keys_yields_results() { 602 let _guard = test_hooks_guard(); 603 reset_test_flags(); 604 let tmp_dir = TempDir::new().expect("tempdir should open"); 605 let db_dir = tmp_dir.path().join("ndb"); 606 let config = RadrootsNostrNdbConfig::new(&db_dir); 607 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 608 let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None); 609 let handle = ndb.subscribe(&spec).expect("subscribe should succeed"); 610 611 let keys = RadrootsNostrKeys::generate(); 612 let event = RadrootsNostrEventBuilder::text_note("wait test") 613 .sign_with_keys(&keys) 614 .expect("event should sign"); 615 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::relay_unknown()) 616 .expect("ingest should succeed"); 617 618 let notes = ndb 619 .wait_for_note_keys(handle, 32) 620 .await 621 .expect("wait should succeed"); 622 assert!(!notes.is_empty()); 623 } 624 625 #[tokio::test] 626 async fn wait_for_note_keys_reports_ndb_error() { 627 let _guard = test_hooks_guard(); 628 reset_test_flags(); 629 let tmp_dir = TempDir::new().expect("tempdir should open"); 630 let db_dir = tmp_dir.path().join("ndb"); 631 let config = RadrootsNostrNdbConfig::new(&db_dir); 632 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 633 let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None); 634 let handle = ndb.subscribe(&spec).expect("subscribe should succeed"); 635 test_hooks::FORCE_WAIT_ERROR.store(true, Ordering::SeqCst); 636 637 let err = ndb 638 .wait_for_note_keys(handle, 1) 639 .await 640 .expect_err("forced wait error"); 641 assert!(err.to_string().starts_with("nostrdb error:")); 642 } 643 644 #[test] 645 fn query_notes_returns_ingested_results() { 646 let _guard = test_hooks_guard(); 647 reset_test_flags(); 648 let tmp_dir = TempDir::new().expect("tempdir should open"); 649 let db_dir = tmp_dir.path().join("ndb"); 650 let config = RadrootsNostrNdbConfig::new(&db_dir); 651 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 652 653 let keys = RadrootsNostrKeys::generate(); 654 let event = RadrootsNostrEventBuilder::text_note("query note") 655 .sign_with_keys(&keys) 656 .expect("event should sign"); 657 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 658 .expect("ingest should succeed"); 659 660 let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(50), None, 50); 661 let mut notes = Vec::new(); 662 for _ in 0..40 { 663 notes = ndb.query_notes(&query_spec).expect("query should succeed"); 664 if !notes.is_empty() { 665 break; 666 } 667 std::thread::sleep(Duration::from_millis(25)); 668 } 669 assert!(!notes.is_empty()); 670 let note_pairs = notes 671 .iter() 672 .map(|note| (note.id_hex.clone(), note.content.clone())) 673 .collect::<Vec<_>>(); 674 assert!(note_pairs.contains(&(event.id.to_hex(), "query note".to_string()))); 675 } 676 677 #[test] 678 fn query_notes_empty_filters_returns_empty() { 679 let _guard = test_hooks_guard(); 680 reset_test_flags(); 681 let tmp_dir = TempDir::new().expect("tempdir should open"); 682 let db_dir = tmp_dir.path().join("ndb"); 683 let config = RadrootsNostrNdbConfig::new(&db_dir); 684 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 685 686 let query_spec = RadrootsNostrNdbQuerySpec::new(Vec::new(), 10); 687 let notes = ndb.query_notes(&query_spec).expect("query should succeed"); 688 assert!(notes.is_empty()); 689 } 690 691 #[test] 692 fn query_notes_rejects_invalid_filters() { 693 let _guard = test_hooks_guard(); 694 reset_test_flags(); 695 let tmp_dir = TempDir::new().expect("tempdir should open"); 696 let db_dir = tmp_dir.path().join("ndb"); 697 let config = RadrootsNostrNdbConfig::new(&db_dir); 698 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 699 700 let spec = RadrootsNostrNdbQuerySpec::single( 701 RadrootsNostrNdbFilterSpec::new().with_author_hex("not-hex"), 702 10, 703 ); 704 let err = ndb.query_notes(&spec).expect_err("invalid filter"); 705 assert!(err.to_string().contains("invalid hex")); 706 } 707 708 #[test] 709 fn query_notes_reports_transaction_error() { 710 let _guard = test_hooks_guard(); 711 reset_test_flags(); 712 let tmp_dir = TempDir::new().expect("tempdir should open"); 713 let db_dir = tmp_dir.path().join("ndb"); 714 let config = RadrootsNostrNdbConfig::new(&db_dir); 715 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 716 let spec = RadrootsNostrNdbQuerySpec::text_notes(Some(10), None, 10); 717 test_hooks::FORCE_TRANSACTION_ERROR.store(true, Ordering::SeqCst); 718 719 let err = ndb 720 .query_notes(&spec) 721 .expect_err("forced transaction error"); 722 assert!(err.to_string().starts_with("nostrdb error:")); 723 } 724 725 #[test] 726 fn query_notes_reports_query_error() { 727 let _guard = test_hooks_guard(); 728 reset_test_flags(); 729 let tmp_dir = TempDir::new().expect("tempdir should open"); 730 let db_dir = tmp_dir.path().join("ndb"); 731 let config = RadrootsNostrNdbConfig::new(&db_dir); 732 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 733 let spec = RadrootsNostrNdbQuerySpec::text_notes(Some(10), None, 10); 734 test_hooks::FORCE_QUERY_ERROR.store(true, Ordering::SeqCst); 735 736 let err = ndb.query_notes(&spec).expect_err("forced query error"); 737 assert!(err.to_string().starts_with("nostrdb error:")); 738 } 739 740 #[test] 741 fn query_notes_reports_note_json_error() { 742 let _guard = test_hooks_guard(); 743 reset_test_flags(); 744 let tmp_dir = TempDir::new().expect("tempdir should open"); 745 let db_dir = tmp_dir.path().join("ndb"); 746 let config = RadrootsNostrNdbConfig::new(&db_dir); 747 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 748 749 let keys = RadrootsNostrKeys::generate(); 750 let event = RadrootsNostrEventBuilder::text_note("note json error") 751 .sign_with_keys(&keys) 752 .expect("event should sign"); 753 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 754 .expect("ingest should succeed"); 755 756 let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(50), None, 50); 757 for _ in 0..40 { 758 let notes = ndb.query_notes(&query_spec).expect("query should succeed"); 759 if !notes.is_empty() { 760 break; 761 } 762 std::thread::sleep(Duration::from_millis(25)); 763 } 764 test_hooks::FORCE_NOTE_JSON_ERROR.store(true, Ordering::SeqCst); 765 766 let err = ndb 767 .query_notes(&query_spec) 768 .expect_err("forced note json error"); 769 assert!(err.to_string().starts_with("nostrdb error:")); 770 } 771 772 #[test] 773 fn profile_lookup_returns_metadata_fields() { 774 let _guard = test_hooks_guard(); 775 reset_test_flags(); 776 let tmp_dir = TempDir::new().expect("tempdir should open"); 777 let db_dir = tmp_dir.path().join("ndb"); 778 let config = RadrootsNostrNdbConfig::new(&db_dir); 779 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 780 781 let keys = RadrootsNostrKeys::generate(); 782 let pubkey_hex = keys.public_key().to_hex(); 783 let metadata = RadrootsNostrMetadata::new() 784 .name(FIXTURE_ALICE_USERNAME) 785 .display_name(FIXTURE_ALICE_USERNAME) 786 .about("coffee operator") 787 .lud16(FIXTURE_ALICE_EMAIL); 788 let metadata_event = radroots_nostr_build_metadata_event(&metadata) 789 .sign_with_keys(&keys) 790 .expect("metadata event should sign"); 791 ndb.ingest_event(&metadata_event, RadrootsNostrNdbIngestSource::client()) 792 .expect("ingest should succeed"); 793 794 let mut profile = None; 795 for _ in 0..40 { 796 profile = ndb 797 .get_profile_by_pubkey_hex(pubkey_hex.as_str()) 798 .expect("profile lookup should succeed"); 799 if profile.is_some() { 800 break; 801 } 802 std::thread::sleep(Duration::from_millis(25)); 803 } 804 let profile = profile.expect("profile should exist"); 805 assert_eq!(profile.pubkey_hex, pubkey_hex); 806 assert_eq!(profile.name.as_deref(), Some(FIXTURE_ALICE_USERNAME)); 807 assert_eq!( 808 profile.display_name.as_deref(), 809 Some(FIXTURE_ALICE_USERNAME) 810 ); 811 assert_eq!(profile.lud16.as_deref(), Some(FIXTURE_ALICE_EMAIL)); 812 } 813 814 #[test] 815 fn profile_lookup_returns_none_when_missing() { 816 let _guard = test_hooks_guard(); 817 reset_test_flags(); 818 let tmp_dir = TempDir::new().expect("tempdir should open"); 819 let db_dir = tmp_dir.path().join("ndb"); 820 let config = RadrootsNostrNdbConfig::new(&db_dir); 821 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 822 823 let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex(); 824 let profile = ndb 825 .get_profile_by_pubkey_hex(pubkey_hex.as_str()) 826 .expect("profile lookup"); 827 assert!(profile.is_none()); 828 } 829 830 #[test] 831 fn profile_lookup_reports_query_error() { 832 let _guard = test_hooks_guard(); 833 reset_test_flags(); 834 let tmp_dir = TempDir::new().expect("tempdir should open"); 835 let db_dir = tmp_dir.path().join("ndb"); 836 let config = RadrootsNostrNdbConfig::new(&db_dir); 837 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 838 let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex(); 839 test_hooks::FORCE_PROFILE_QUERY_ERROR.store(true, Ordering::SeqCst); 840 841 let err = ndb 842 .get_profile_by_pubkey_hex(pubkey_hex.as_str()) 843 .expect_err("forced profile query error"); 844 assert!(err.to_string().starts_with("nostrdb error:")); 845 } 846 847 #[test] 848 fn profile_lookup_reports_transaction_error() { 849 let _guard = test_hooks_guard(); 850 reset_test_flags(); 851 let tmp_dir = TempDir::new().expect("tempdir should open"); 852 let db_dir = tmp_dir.path().join("ndb"); 853 let config = RadrootsNostrNdbConfig::new(&db_dir); 854 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 855 let pubkey_hex = RadrootsNostrKeys::generate().public_key().to_hex(); 856 test_hooks::FORCE_TRANSACTION_ERROR.store(true, Ordering::SeqCst); 857 858 let err = ndb 859 .get_profile_by_pubkey_hex(pubkey_hex.as_str()) 860 .expect_err("forced transaction error"); 861 assert!(err.to_string().starts_with("nostrdb error:")); 862 } 863 864 #[test] 865 fn profile_lookup_returns_none_without_metadata_record() { 866 let _guard = test_hooks_guard(); 867 reset_test_flags(); 868 let tmp_dir = TempDir::new().expect("tempdir should open"); 869 let db_dir = tmp_dir.path().join("ndb"); 870 let config = RadrootsNostrNdbConfig::new(&db_dir); 871 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 872 873 let keys = RadrootsNostrKeys::generate(); 874 let pubkey_hex = keys.public_key().to_hex(); 875 let event = RadrootsNostrEventBuilder::text_note("non profile event") 876 .sign_with_keys(&keys) 877 .expect("event should sign"); 878 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 879 .expect("ingest should succeed"); 880 881 let profile = ndb 882 .get_profile_by_pubkey_hex(pubkey_hex.as_str()) 883 .expect("profile lookup"); 884 assert!(profile.is_none()); 885 } 886 887 #[test] 888 fn profile_lookup_invalid_metadata_content_returns_none() { 889 let _guard = test_hooks_guard(); 890 reset_test_flags(); 891 let tmp_dir = TempDir::new().expect("tempdir should open"); 892 let db_dir = tmp_dir.path().join("ndb"); 893 let config = RadrootsNostrNdbConfig::new(&db_dir); 894 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 895 896 let keys = RadrootsNostrKeys::generate(); 897 let pubkey_hex = keys.public_key().to_hex(); 898 let event = RadrootsNostrEventBuilder::new( 899 radroots_nostr::prelude::RadrootsNostrKind::Metadata, 900 "not valid metadata json", 901 ) 902 .sign_with_keys(&keys) 903 .expect("event should sign"); 904 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 905 .expect("ingest should succeed"); 906 907 let result = ndb.get_profile_by_pubkey_hex(pubkey_hex.as_str()); 908 assert!(result.expect("profile lookup").is_none()); 909 } 910 911 #[test] 912 fn subscribe_rejects_invalid_author_hex() { 913 let tmp_dir = TempDir::new().expect("tempdir should open"); 914 let db_dir = tmp_dir.path().join("ndb"); 915 let config = RadrootsNostrNdbConfig::new(&db_dir); 916 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 917 918 let spec = RadrootsNostrNdbSubscriptionSpec::single( 919 RadrootsNostrNdbFilterSpec::new().with_author_hex("not-hex"), 920 ); 921 let err = ndb.subscribe(&spec).expect_err("subscribe should fail"); 922 assert!(err.to_string().contains("invalid hex for author")); 923 } 924 925 #[test] 926 fn profile_lookup_rejects_invalid_pubkey_length() { 927 let _guard = test_hooks_guard(); 928 reset_test_flags(); 929 let tmp_dir = TempDir::new().expect("tempdir should open"); 930 let db_dir = tmp_dir.path().join("ndb"); 931 let config = RadrootsNostrNdbConfig::new(&db_dir); 932 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 933 934 let err = ndb 935 .get_profile_by_pubkey_hex("abcd") 936 .expect_err("lookup should fail"); 937 assert!(err.to_string().contains("invalid hex length for pubkey")); 938 } 939 940 #[tokio::test] 941 async fn subscription_stream_yields_events() { 942 let tmp_dir = TempDir::new().expect("tempdir should open"); 943 let db_dir = tmp_dir.path().join("ndb"); 944 let config = RadrootsNostrNdbConfig::new(&db_dir); 945 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 946 let spec = RadrootsNostrNdbSubscriptionSpec::text_notes(Some(10), None); 947 let handle = ndb.subscribe(&spec).expect("subscribe should succeed"); 948 let mut stream = ndb.subscription_stream(handle, 0); 949 950 let pending = tokio::time::timeout(Duration::from_millis(20), stream.next()).await; 951 assert!(pending.is_err()); 952 953 let keys = RadrootsNostrKeys::generate(); 954 let event = RadrootsNostrEventBuilder::text_note("stream note") 955 .sign_with_keys(&keys) 956 .expect("event should sign"); 957 ndb.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 958 .expect("ingest should succeed"); 959 960 let note_keys = tokio::time::timeout(Duration::from_secs(2), stream.next()) 961 .await 962 .expect("stream should wake") 963 .expect("stream should yield note keys"); 964 assert!(!note_keys.is_empty()); 965 assert!(note_keys.iter().all(|key| key.as_u64() > 0)); 966 } 967 968 #[test] 969 fn concurrent_ingest_handles_parallel_writers() { 970 let _guard = test_hooks_guard(); 971 reset_test_flags(); 972 let tmp_dir = TempDir::new().expect("tempdir should open"); 973 let db_dir = tmp_dir.path().join("ndb"); 974 let config = RadrootsNostrNdbConfig::new(&db_dir); 975 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 976 977 let worker_count = 4usize; 978 let notes_per_worker = 20usize; 979 let mut handles = Vec::new(); 980 981 for worker in 0..worker_count { 982 let db = ndb.clone(); 983 handles.push(std::thread::spawn(move || { 984 let keys = RadrootsNostrKeys::generate(); 985 for idx in 0..notes_per_worker { 986 let content = format!("parallel-{worker}-{idx}"); 987 let event = RadrootsNostrEventBuilder::text_note(content.as_str()) 988 .sign_with_keys(&keys) 989 .expect("event should sign"); 990 db.ingest_event(&event, RadrootsNostrNdbIngestSource::client()) 991 .expect("ingest should succeed"); 992 } 993 })); 994 } 995 996 for handle in handles { 997 handle.join().expect("worker should complete"); 998 } 999 1000 let query_spec = RadrootsNostrNdbQuerySpec::text_notes(Some(512), None, 512); 1001 let expected = worker_count * notes_per_worker; 1002 let mut observed = 0usize; 1003 let mut break_threshold = expected + 1; 1004 1005 for _ in 0..80 { 1006 let notes = ndb.query_notes(&query_spec).expect("query should succeed"); 1007 observed = notes 1008 .iter() 1009 .filter(|note| note.content.starts_with("parallel-")) 1010 .count(); 1011 if observed >= break_threshold { 1012 break; 1013 } 1014 break_threshold = expected; 1015 std::thread::sleep(Duration::from_millis(25)); 1016 } 1017 1018 assert!( 1019 observed >= expected, 1020 "expected at least {expected} parallel notes, got {observed}" 1021 ); 1022 } 1023 1024 #[cfg(feature = "giftwrap")] 1025 #[test] 1026 fn giftwrap_secret_key_hex_validates_length() { 1027 let tmp_dir = TempDir::new().expect("tempdir should open"); 1028 let db_dir = tmp_dir.path().join("ndb"); 1029 let config = RadrootsNostrNdbConfig::new(&db_dir); 1030 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 1031 1032 let result = ndb.add_giftwrap_secret_key_hex("abcd"); 1033 let err = result.expect_err("invalid giftwrap key"); 1034 assert!( 1035 err.to_string() 1036 .contains("invalid hex length for secret_key") 1037 ); 1038 } 1039 1040 #[cfg(feature = "giftwrap")] 1041 #[test] 1042 fn giftwrap_process_flow_executes() { 1043 let tmp_dir = TempDir::new().expect("tempdir should open"); 1044 let db_dir = tmp_dir.path().join("ndb"); 1045 let config = RadrootsNostrNdbConfig::new(&db_dir); 1046 let ndb = RadrootsNostrNdb::open(config).expect("database should open"); 1047 1048 let secret_key = [7u8; 32]; 1049 let _ = ndb.add_giftwrap_secret_key(secret_key); 1050 ndb.process_giftwraps() 1051 .expect("giftwrap processing should run"); 1052 } 1053 }