subscriber.rs (26646B)
1 #![forbid(unsafe_code)] 2 #![cfg_attr(coverage_nightly, coverage(off))] 3 4 use std::convert::TryFrom; 5 use std::time::Duration; 6 7 use anyhow::{Result, anyhow}; 8 use radroots_events::kinds::{ 9 KIND_LISTING, KIND_LISTING_DRAFT, ORDER_EVENT_KINDS, TRADE_VALIDATION_EVENT_KINDS, 10 is_trade_validation_service_event_kind, 11 }; 12 use radroots_nostr::prelude::{ 13 RadrootsNostrClient, RadrootsNostrEvent, RadrootsNostrFilter, RadrootsNostrKeys, 14 RadrootsNostrKind, RadrootsNostrRelayPoolNotification, RadrootsNostrTag, 15 radroots_nostr_tags_resolve, 16 }; 17 use tokio::sync::watch; 18 use tokio::time::sleep; 19 use tracing::{info, warn}; 20 21 use crate::features::trade_listing::{ 22 handlers::dvm::{TradeListingDvmError, handle_error, handle_event_with_policy}, 23 state::{SharedTradeListingState, TradeListingRuntime}, 24 }; 25 use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy; 26 27 #[cfg(test)] 28 #[derive(Default)] 29 struct SubscriberTestHooks { 30 notifications: std::collections::VecDeque<Result<RadrootsNostrRelayPoolNotification, ()>>, 31 delay_before_event_handle: std::collections::VecDeque<bool>, 32 resolve_tags_results: std::collections::VecDeque< 33 Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>, 34 >, 35 handle_event_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, 36 handle_error_results: std::collections::VecDeque<Result<(), TradeListingDvmError>>, 37 } 38 39 #[cfg(test)] 40 static SUBSCRIBER_TEST_HOOKS: std::sync::OnceLock<std::sync::Mutex<SubscriberTestHooks>> = 41 std::sync::OnceLock::new(); 42 43 #[cfg(test)] 44 fn subscriber_test_hooks() -> &'static std::sync::Mutex<SubscriberTestHooks> { 45 SUBSCRIBER_TEST_HOOKS.get_or_init(|| std::sync::Mutex::new(SubscriberTestHooks::default())) 46 } 47 48 #[cfg(test)] 49 fn pop_notification_hook() -> Option<Result<RadrootsNostrRelayPoolNotification, ()>> { 50 subscriber_test_hooks() 51 .lock() 52 .unwrap_or_else(std::sync::PoisonError::into_inner) 53 .notifications 54 .pop_front() 55 } 56 57 #[cfg(test)] 58 fn pop_delay_hook() -> Option<bool> { 59 subscriber_test_hooks() 60 .lock() 61 .unwrap_or_else(std::sync::PoisonError::into_inner) 62 .delay_before_event_handle 63 .pop_front() 64 } 65 66 #[cfg(test)] 67 fn take_delay_hook() -> Option<bool> { 68 pop_delay_hook() 69 } 70 71 #[cfg(not(test))] 72 #[cfg_attr(coverage_nightly, coverage(off))] 73 fn take_delay_hook() -> Option<bool> { 74 None 75 } 76 77 #[cfg(test)] 78 fn pop_resolve_tags_hook() 79 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { 80 subscriber_test_hooks() 81 .lock() 82 .unwrap_or_else(std::sync::PoisonError::into_inner) 83 .resolve_tags_results 84 .pop_front() 85 } 86 87 #[cfg(test)] 88 fn take_resolve_tags_hook() 89 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { 90 pop_resolve_tags_hook() 91 } 92 93 #[cfg(not(test))] 94 #[cfg_attr(coverage_nightly, coverage(off))] 95 fn take_resolve_tags_hook() 96 -> Option<Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError>> { 97 None 98 } 99 100 #[cfg(test)] 101 fn pop_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { 102 subscriber_test_hooks() 103 .lock() 104 .unwrap_or_else(std::sync::PoisonError::into_inner) 105 .handle_event_results 106 .pop_front() 107 } 108 109 #[cfg(test)] 110 fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { 111 pop_handle_event_hook() 112 } 113 114 #[cfg(not(test))] 115 #[cfg_attr(coverage_nightly, coverage(off))] 116 fn take_handle_event_hook() -> Option<Result<(), TradeListingDvmError>> { 117 None 118 } 119 120 #[cfg(test)] 121 fn pop_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { 122 subscriber_test_hooks() 123 .lock() 124 .unwrap_or_else(std::sync::PoisonError::into_inner) 125 .handle_error_results 126 .pop_front() 127 } 128 129 #[cfg(test)] 130 fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { 131 pop_handle_error_hook() 132 } 133 134 #[cfg(not(test))] 135 #[cfg_attr(coverage_nightly, coverage(off))] 136 fn take_handle_error_hook() -> Option<Result<(), TradeListingDvmError>> { 137 None 138 } 139 140 fn resolve_tags_io( 141 event: &RadrootsNostrEvent, 142 keys: &RadrootsNostrKeys, 143 ) -> Result<Vec<RadrootsNostrTag>, radroots_nostr::error::RadrootsNostrTagsResolveError> { 144 let resolved = match take_resolve_tags_hook() { 145 Some(result) => result?, 146 None => return radroots_nostr_tags_resolve(event, keys), 147 }; 148 Ok(resolved) 149 } 150 151 fn map_notification_recv_result( 152 result: Result<RadrootsNostrRelayPoolNotification, tokio::sync::broadcast::error::RecvError>, 153 ) -> Result<RadrootsNostrRelayPoolNotification, ()> { 154 result.map_err(|_| ()) 155 } 156 157 async fn handle_event_io( 158 event: RadrootsNostrEvent, 159 resolved_tags: Vec<RadrootsNostrTag>, 160 keys: RadrootsNostrKeys, 161 client: RadrootsNostrClient, 162 state: SharedTradeListingState, 163 proof_policy: TradeValidationReceiptProverPolicy, 164 ) -> Result<(), TradeListingDvmError> { 165 let result = match take_handle_event_hook() { 166 Some(result) => result, 167 None => { 168 handle_event_with_policy(event, resolved_tags, keys, client, state, &proof_policy).await 169 } 170 }; 171 result?; 172 Ok(()) 173 } 174 175 async fn handle_error_io( 176 err: TradeListingDvmError, 177 event: &RadrootsNostrEvent, 178 client: &RadrootsNostrClient, 179 ) -> Result<(), TradeListingDvmError> { 180 let result = match take_handle_error_hook() { 181 Some(result) => result, 182 None => handle_error(err, event, client).await, 183 }; 184 result?; 185 Ok(()) 186 } 187 188 fn should_delay_before_event_handle() -> bool { 189 if let Some(delay) = take_delay_hook() { 190 return delay; 191 } 192 cfg!(all(debug_assertions, not(test))) 193 } 194 195 #[cfg_attr(all(not(test), coverage_nightly), coverage(off))] 196 async fn process_event_notification( 197 event: RadrootsNostrEvent, 198 keys: RadrootsNostrKeys, 199 client: RadrootsNostrClient, 200 runtime: TradeListingRuntime, 201 proof_policy: TradeValidationReceiptProverPolicy, 202 ) -> Result<()> { 203 let created_at = u32::try_from(event.created_at.as_secs()).unwrap_or(u32::MAX); 204 if should_delay_before_event_handle() { 205 sleep(Duration::from_millis(200)).await; 206 } 207 208 let resolved_tags = match resolve_tags_io(&event, &keys) { 209 Ok(tags) => tags, 210 Err(err) => { 211 warn!("trade_listing: failed to resolve tags: {err}"); 212 return Ok(()); 213 } 214 }; 215 216 let state = runtime.state(); 217 let event_kind = match event.kind { 218 RadrootsNostrKind::Custom(v) => Some(u32::from(v)), 219 _ => None, 220 }; 221 if let Err(err) = handle_event_io( 222 event.clone(), 223 resolved_tags, 224 keys, 225 client.clone(), 226 state.clone(), 227 proof_policy, 228 ) 229 .await 230 { 231 match err { 232 TradeListingDvmError::MissingRecipient | TradeListingDvmError::UnsupportedKind => {} 233 other => { 234 if event_kind.is_some_and(is_trade_validation_service_event_kind) { 235 if let Err(err) = handle_error_io(other, &event, &client).await { 236 warn!("trade_listing: failed to send error feedback: {err}"); 237 } 238 } else { 239 warn!("trade_listing: rejected public trade event: {other}"); 240 } 241 runtime.mark_processed_event(created_at).await?; 242 } 243 } 244 return Ok(()); 245 } 246 247 runtime.mark_processed_event(created_at).await?; 248 Ok(()) 249 } 250 251 async fn dispatch_event_processing( 252 event: RadrootsNostrEvent, 253 keys: RadrootsNostrKeys, 254 client: RadrootsNostrClient, 255 runtime: TradeListingRuntime, 256 proof_policy: TradeValidationReceiptProverPolicy, 257 ) -> Result<()> { 258 process_event_notification(event, keys, client, runtime, proof_policy).await 259 } 260 261 pub async fn subscriber( 262 client: RadrootsNostrClient, 263 keys: RadrootsNostrKeys, 264 runtime: TradeListingRuntime, 265 proof_policy: TradeValidationReceiptProverPolicy, 266 mut stop_rx: watch::Receiver<bool>, 267 ) -> Result<()> { 268 let subscribed_kinds = [KIND_LISTING, KIND_LISTING_DRAFT] 269 .into_iter() 270 .chain(ORDER_EVENT_KINDS) 271 .chain(TRADE_VALIDATION_EVENT_KINDS) 272 .collect::<Vec<_>>(); 273 info!( 274 "Starting subscriber for trade listing, order, and trade validation kinds: {:?}", 275 subscribed_kinds 276 ); 277 278 let kinds: Vec<RadrootsNostrKind> = subscribed_kinds 279 .iter() 280 .map(|kind| u16::try_from(*kind).expect("trade listing kinds fit in nostr custom range")) 281 .map(RadrootsNostrKind::Custom) 282 .collect(); 283 let filter: RadrootsNostrFilter = runtime.recovery_filter(kinds).await; 284 285 if *stop_rx.borrow() { 286 return Ok(()); 287 } 288 289 let subscription = client.subscribe(filter, None).await?; 290 let mut notifications = client.notifications(); 291 292 let mut notifications_closed = false; 293 294 loop { 295 tokio::select! { 296 _ = stop_rx.changed() => { 297 break; 298 } 299 msg = async { 300 #[cfg(test)] 301 if let Some(result) = pop_notification_hook() { 302 return result; 303 } 304 map_notification_recv_result(notifications.recv().await) 305 } => { 306 let n = match msg { 307 Ok(n) => n, 308 Err(_) => { 309 notifications_closed = true; 310 break; 311 } 312 }; 313 314 if let RadrootsNostrRelayPoolNotification::Event { event, .. } = n { 315 let event = (*event).clone(); 316 let keys = keys.clone(); 317 let client = client.clone(); 318 let runtime = runtime.clone(); 319 let proof_policy = proof_policy.clone(); 320 dispatch_event_processing(event, keys, client, runtime, proof_policy).await?; 321 } 322 } 323 } 324 } 325 326 client.unsubscribe(&subscription.val).await; 327 if notifications_closed { 328 return Err(anyhow!("trade_listing subscriber notifications closed")); 329 } 330 Ok(()) 331 } 332 333 #[cfg(test)] 334 #[cfg_attr(coverage_nightly, coverage(off))] 335 mod tests { 336 use super::{ 337 SubscriberTestHooks, handle_error_io, handle_event_io, map_notification_recv_result, 338 process_event_notification, resolve_tags_io, subscriber, subscriber_test_hooks, 339 }; 340 use crate::features::trade_listing::handlers::dvm::TradeListingDvmError; 341 use crate::features::trade_listing::state::TradeListingRuntime; 342 use crate::features::trade_validation_receipt::TradeValidationReceiptProverPolicy; 343 use radroots_nostr::error::RadrootsNostrTagsResolveError; 344 use radroots_nostr::prelude::{ 345 RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrKeys, RadrootsNostrKind, 346 RadrootsNostrRelayPoolNotification, RadrootsNostrRelayUrl, RadrootsNostrSubscriptionId, 347 RadrootsNostrTag, 348 }; 349 use tokio::sync::{Mutex, MutexGuard, watch}; 350 351 static TEST_LOCK: Mutex<()> = Mutex::const_new(()); 352 353 async fn test_guard() -> MutexGuard<'static, ()> { 354 let guard = TEST_LOCK.lock().await; 355 *subscriber_test_hooks() 356 .lock() 357 .unwrap_or_else(std::sync::PoisonError::into_inner) = SubscriberTestHooks::default(); 358 guard 359 } 360 361 fn scripted_event_notification(keys: &RadrootsNostrKeys) -> RadrootsNostrRelayPoolNotification { 362 let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test") 363 .sign_with_keys(keys) 364 .expect("event"); 365 RadrootsNostrRelayPoolNotification::Event { 366 relay_url: RadrootsNostrRelayUrl::parse("wss://relay.example.com").expect("relay"), 367 subscription_id: RadrootsNostrSubscriptionId::new("sub-1"), 368 event: Box::new(event), 369 } 370 } 371 372 fn scripted_shutdown_notification() -> RadrootsNostrRelayPoolNotification { 373 RadrootsNostrRelayPoolNotification::Shutdown 374 } 375 376 fn shared_runtime() -> TradeListingRuntime { 377 TradeListingRuntime::new() 378 } 379 380 fn proof_policy() -> TradeValidationReceiptProverPolicy { 381 TradeValidationReceiptProverPolicy::default() 382 } 383 384 #[test] 385 fn notification_recv_result_mapping_covers_ok_and_err() { 386 let keys = RadrootsNostrKeys::generate(); 387 assert!(map_notification_recv_result(Ok(scripted_event_notification(&keys))).is_ok()); 388 assert!( 389 map_notification_recv_result(Err(tokio::sync::broadcast::error::RecvError::Closed)) 390 .is_err() 391 ); 392 } 393 394 #[tokio::test] 395 async fn subscriber_io_wrappers_cover_fallback_and_hook_paths() { 396 let _guard = test_guard().await; 397 let keys = RadrootsNostrKeys::generate(); 398 let client = RadrootsNostrClient::new(keys.clone()); 399 let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::TextNote, "test") 400 .sign_with_keys(&keys) 401 .expect("event"); 402 403 let _ = resolve_tags_io(&event, &keys); 404 subscriber_test_hooks() 405 .lock() 406 .unwrap_or_else(std::sync::PoisonError::into_inner) 407 .resolve_tags_results 408 .push_back(Ok(Vec::<RadrootsNostrTag>::new())); 409 assert!(resolve_tags_io(&event, &keys).is_ok()); 410 411 let runtime = shared_runtime(); 412 let state = runtime.state(); 413 assert!(matches!( 414 handle_event_io( 415 event.clone(), 416 Vec::new(), 417 keys.clone(), 418 client.clone(), 419 state.clone(), 420 proof_policy() 421 ) 422 .await, 423 Err(TradeListingDvmError::UnsupportedKind) 424 )); 425 subscriber_test_hooks() 426 .lock() 427 .unwrap_or_else(std::sync::PoisonError::into_inner) 428 .handle_event_results 429 .push_back(Ok(())); 430 assert!( 431 handle_event_io( 432 event.clone(), 433 Vec::new(), 434 keys.clone(), 435 client.clone(), 436 state, 437 proof_policy() 438 ) 439 .await 440 .is_ok() 441 ); 442 443 let _ = handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client).await; 444 subscriber_test_hooks() 445 .lock() 446 .unwrap_or_else(std::sync::PoisonError::into_inner) 447 .handle_error_results 448 .push_back(Ok(())); 449 assert!( 450 handle_error_io(TradeListingDvmError::InvalidOrder, &event, &client) 451 .await 452 .is_ok() 453 ); 454 } 455 456 #[tokio::test] 457 async fn subscriber_returns_ok_when_stop_is_pre_requested() { 458 let _guard = test_guard().await; 459 let keys = RadrootsNostrKeys::generate(); 460 let client = RadrootsNostrClient::new(keys.clone()); 461 let (_tx, rx) = watch::channel(true); 462 assert!( 463 subscriber(client, keys, shared_runtime(), proof_policy(), rx) 464 .await 465 .is_ok() 466 ); 467 } 468 469 #[tokio::test] 470 async fn subscriber_reuses_runtime_owned_state_across_runs() { 471 let _guard = test_guard().await; 472 let keys = RadrootsNostrKeys::generate(); 473 let client = RadrootsNostrClient::new(keys.clone()); 474 let runtime = shared_runtime(); 475 let state = runtime.state(); 476 state 477 .lock() 478 .await 479 .mark_listing_validated("addr", "evt-listing-1"); 480 481 let (_tx_first, rx_first) = watch::channel(true); 482 assert!( 483 subscriber( 484 client.clone(), 485 keys.clone(), 486 runtime.clone(), 487 proof_policy(), 488 rx_first 489 ) 490 .await 491 .is_ok() 492 ); 493 assert!(state.lock().await.is_listing_validated("addr")); 494 495 let (_tx_second, rx_second) = watch::channel(true); 496 assert!( 497 subscriber(client, keys, runtime.clone(), proof_policy(), rx_second) 498 .await 499 .is_ok() 500 ); 501 assert!(state.lock().await.is_listing_validated("addr")); 502 } 503 504 #[tokio::test] 505 async fn subscriber_returns_err_when_no_relays_are_configured() { 506 let _guard = test_guard().await; 507 let keys = RadrootsNostrKeys::generate(); 508 let client = RadrootsNostrClient::new(keys.clone()); 509 let (_tx, rx) = watch::channel(false); 510 let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx) 511 .await 512 .expect_err("expected relay error"); 513 let msg = format!("{err:#}"); 514 assert!(msg.contains("relay")); 515 } 516 517 #[tokio::test] 518 async fn subscriber_can_stop_after_start_when_relay_is_present() { 519 let _guard = test_guard().await; 520 let keys = RadrootsNostrKeys::generate(); 521 let client = RadrootsNostrClient::new(keys.clone()); 522 let _ = client.add_relay("wss://relay.example.com").await; 523 let (tx, rx) = watch::channel(false); 524 let join = tokio::spawn(subscriber( 525 client, 526 keys, 527 shared_runtime(), 528 proof_policy(), 529 rx, 530 )); 531 tokio::time::sleep(std::time::Duration::from_millis(20)).await; 532 let _ = tx.send(true); 533 let _ = join.await; 534 } 535 536 #[tokio::test] 537 async fn subscriber_covers_notification_closed_path() { 538 let _guard = test_guard().await; 539 let keys = RadrootsNostrKeys::generate(); 540 let client = RadrootsNostrClient::new(keys.clone()); 541 let _ = client.add_relay("wss://relay.example.com").await; 542 subscriber_test_hooks() 543 .lock() 544 .unwrap_or_else(std::sync::PoisonError::into_inner) 545 .notifications 546 .push_back(Err(())); 547 let (_tx, rx) = watch::channel(false); 548 let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx) 549 .await 550 .expect_err("closed notifications"); 551 let msg = format!("{err:#}"); 552 assert!(msg.contains("notifications closed")); 553 } 554 555 #[tokio::test] 556 async fn subscriber_covers_non_event_notification_and_stop_ok_path() { 557 let _guard = test_guard().await; 558 let keys = RadrootsNostrKeys::generate(); 559 let client = RadrootsNostrClient::new(keys.clone()); 560 let _ = client.add_relay("wss://relay.example.com").await; 561 562 subscriber_test_hooks() 563 .lock() 564 .unwrap_or_else(std::sync::PoisonError::into_inner) 565 .notifications 566 .push_back(Ok(scripted_shutdown_notification())); 567 568 let (tx, rx) = watch::channel(false); 569 let join = tokio::spawn(subscriber( 570 client, 571 keys, 572 shared_runtime(), 573 proof_policy(), 574 rx, 575 )); 576 tokio::time::sleep(std::time::Duration::from_millis(30)).await; 577 let _ = tx.send(true); 578 let result = join.await.expect("subscriber join"); 579 assert!(result.is_ok()); 580 } 581 582 #[tokio::test] 583 async fn subscriber_covers_event_processing_paths() { 584 let _guard = test_guard().await; 585 let keys = RadrootsNostrKeys::generate(); 586 let client = RadrootsNostrClient::new(keys.clone()); 587 let _ = client.add_relay("wss://relay.example.com").await; 588 subscriber_test_hooks() 589 .lock() 590 .unwrap_or_else(std::sync::PoisonError::into_inner) 591 .notifications 592 .push_back(Ok(scripted_event_notification(&keys))); 593 subscriber_test_hooks() 594 .lock() 595 .unwrap_or_else(std::sync::PoisonError::into_inner) 596 .resolve_tags_results 597 .push_back(Err(RadrootsNostrTagsResolveError::DecryptionError( 598 "resolve-failed".to_string(), 599 ))); 600 let (tx, rx) = watch::channel(false); 601 let join = tokio::spawn(subscriber( 602 client, 603 keys, 604 shared_runtime(), 605 proof_policy(), 606 rx, 607 )); 608 tokio::time::sleep(std::time::Duration::from_millis(30)).await; 609 let _ = tx.send(true); 610 let _ = join.await; 611 } 612 613 #[tokio::test] 614 async fn subscriber_covers_handle_event_and_error_paths() { 615 let _guard = test_guard().await; 616 let keys = RadrootsNostrKeys::generate(); 617 let client = RadrootsNostrClient::new(keys.clone()); 618 let _ = client.add_relay("wss://relay.example.com").await; 619 620 let mut hooks = subscriber_test_hooks() 621 .lock() 622 .unwrap_or_else(std::sync::PoisonError::into_inner); 623 hooks 624 .notifications 625 .push_back(Ok(scripted_event_notification(&keys))); 626 hooks 627 .notifications 628 .push_back(Ok(scripted_event_notification(&keys))); 629 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 630 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 631 hooks 632 .handle_event_results 633 .push_back(Err(TradeListingDvmError::MissingRecipient)); 634 hooks 635 .handle_event_results 636 .push_back(Err(TradeListingDvmError::InvalidOrder)); 637 hooks 638 .handle_error_results 639 .push_back(Err(TradeListingDvmError::InvalidOrder)); 640 drop(hooks); 641 642 let (tx, rx) = watch::channel(false); 643 let join = tokio::spawn(subscriber( 644 client, 645 keys, 646 shared_runtime(), 647 proof_policy(), 648 rx, 649 )); 650 tokio::time::sleep(std::time::Duration::from_millis(40)).await; 651 let _ = tx.send(true); 652 let _ = join.await; 653 } 654 655 #[tokio::test] 656 async fn subscriber_covers_delay_and_error_feedback_warn_path() { 657 let _guard = test_guard().await; 658 let keys = RadrootsNostrKeys::generate(); 659 let client = RadrootsNostrClient::new(keys.clone()); 660 let _ = client.add_relay("wss://relay.example.com").await; 661 662 let mut hooks = subscriber_test_hooks() 663 .lock() 664 .unwrap_or_else(std::sync::PoisonError::into_inner); 665 hooks 666 .notifications 667 .push_back(Ok(scripted_event_notification(&keys))); 668 hooks.notifications.push_back(Err(())); 669 hooks.delay_before_event_handle.push_back(true); 670 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 671 hooks 672 .handle_event_results 673 .push_back(Err(TradeListingDvmError::InvalidOrder)); 674 hooks 675 .handle_error_results 676 .push_back(Err(TradeListingDvmError::InvalidOrder)); 677 drop(hooks); 678 679 let (_tx, rx) = watch::channel(false); 680 let err = subscriber(client, keys, shared_runtime(), proof_policy(), rx) 681 .await 682 .expect_err("notifications closed"); 683 let msg = format!("{err:#}"); 684 assert!(msg.contains("notifications closed")); 685 } 686 687 #[tokio::test] 688 async fn handled_domain_errors_advance_replay_anchor() { 689 let _guard = test_guard().await; 690 let keys = RadrootsNostrKeys::generate(); 691 let client = RadrootsNostrClient::new(keys.clone()); 692 let runtime = shared_runtime(); 693 let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "test") 694 .custom_created_at(1_234_u64.into()) 695 .sign_with_keys(&keys) 696 .expect("event"); 697 698 let mut hooks = subscriber_test_hooks() 699 .lock() 700 .unwrap_or_else(std::sync::PoisonError::into_inner); 701 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 702 hooks 703 .handle_event_results 704 .push_back(Err(TradeListingDvmError::InvalidOrder)); 705 hooks.handle_error_results.push_back(Ok(())); 706 drop(hooks); 707 708 process_event_notification(event, keys, client, runtime.clone(), proof_policy()) 709 .await 710 .expect("notification"); 711 712 assert_eq!( 713 runtime.state().lock().await.last_event_created_at(), 714 Some(1_234) 715 ); 716 } 717 718 #[tokio::test] 719 async fn subscriber_process_event_feedback_error_branches_are_covered() { 720 let _guard = test_guard().await; 721 let keys = RadrootsNostrKeys::generate(); 722 let client = RadrootsNostrClient::new(keys.clone()); 723 let event = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "event") 724 .sign_with_keys(&keys) 725 .expect("event"); 726 let runtime = shared_runtime(); 727 728 let mut hooks = subscriber_test_hooks() 729 .lock() 730 .unwrap_or_else(std::sync::PoisonError::into_inner); 731 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 732 hooks 733 .handle_event_results 734 .push_back(Err(TradeListingDvmError::InvalidOrder)); 735 hooks 736 .handle_error_results 737 .push_back(Err(TradeListingDvmError::InvalidOrder)); 738 drop(hooks); 739 740 process_event_notification(event, keys, client, runtime, proof_policy()) 741 .await 742 .expect("processing"); 743 } 744 745 #[tokio::test] 746 async fn subscriber_process_event_feedback_non_error_branches_are_covered() { 747 let _guard = test_guard().await; 748 let keys = RadrootsNostrKeys::generate(); 749 let client = RadrootsNostrClient::new(keys.clone()); 750 let event_ok = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "ok") 751 .sign_with_keys(&keys) 752 .expect("event ok"); 753 let event_err = RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(6000), "err") 754 .sign_with_keys(&keys) 755 .expect("event err"); 756 let runtime = shared_runtime(); 757 758 let mut hooks = subscriber_test_hooks() 759 .lock() 760 .unwrap_or_else(std::sync::PoisonError::into_inner); 761 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 762 hooks.handle_event_results.push_back(Ok(())); 763 hooks.resolve_tags_results.push_back(Ok(Vec::new())); 764 hooks 765 .handle_event_results 766 .push_back(Err(TradeListingDvmError::InvalidOrder)); 767 hooks.handle_error_results.push_back(Ok(())); 768 drop(hooks); 769 770 process_event_notification( 771 event_ok, 772 keys.clone(), 773 client.clone(), 774 runtime.clone(), 775 proof_policy(), 776 ) 777 .await 778 .expect("ok path"); 779 process_event_notification(event_err, keys, client, runtime, proof_policy()) 780 .await 781 .expect("error path"); 782 } 783 }