nip46_e2e.rs (146062B)
1 use std::collections::{HashMap, VecDeque}; 2 use std::net::TcpListener as StdTcpListener; 3 use std::sync::Arc; 4 use std::time::Duration; 5 6 use futures_util::{SinkExt, StreamExt}; 7 use myc::control; 8 use myc::{ 9 MycActiveIdentity, MycConfig, MycConnectionApproval, MycDeliveryOutboxKind, 10 MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, MycDiscoveryContext, MycDiscoveryLiveStatus, 11 MycDiscoveryRelayFetchStatus, MycDiscoveryRepairOutcome, MycOperationAuditKind, 12 MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, MycRuntimeAuditBackend, 13 MycSignerStateBackend, MycTransportDeliveryPolicy, diff_live_nip89, fetch_live_nip89, 14 publish_nip89_event, refresh_nip89, 15 }; 16 use nostr::filter::MatchEventOptions; 17 use nostr::nips::nip44; 18 use nostr::nips::nip44::Version; 19 use nostr::nips::nip46::{ 20 NostrConnectMessage as ExternalNostrConnectMessage, 21 NostrConnectMethod as ExternalNostrConnectMethod, 22 NostrConnectRequest as ExternalNostrConnectRequest, 23 NostrConnectResponse as ExternalNostrConnectResponse, ResponseResult as ExternalResponseResult, 24 }; 25 use nostr::{ 26 ClientMessage, Event, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage, 27 SecretKey, SubscriptionId, Tag, Timestamp, UnsignedEvent, 28 }; 29 use radroots_identity::RadrootsIdentity; 30 use radroots_nostr::prelude::{ 31 RadrootsNostrApplicationHandlerSpec, RadrootsNostrClient, RadrootsNostrEventBuilder, 32 RadrootsNostrKind, RadrootsNostrMetadata, RadrootsNostrRelayUrl, RadrootsNostrTag, 33 radroots_nostr_build_application_handler_event, 34 }; 35 use radroots_nostr_connect::prelude::{ 36 RADROOTS_NOSTR_CONNECT_RPC_KIND, RadrootsNostrConnectClientMetadata, 37 RadrootsNostrConnectClientUri, RadrootsNostrConnectRequest, RadrootsNostrConnectRequestMessage, 38 RadrootsNostrConnectResponse, RadrootsNostrConnectResponseEnvelope, RadrootsNostrConnectUri, 39 }; 40 use radroots_nostr_signer::prelude::{ 41 RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerAuthState, 42 RadrootsNostrSignerConnectionDraft, 43 }; 44 use tempfile::TempDir; 45 use tokio::net::{TcpListener, TcpStream}; 46 use tokio::sync::{Mutex, Notify, mpsc, oneshot}; 47 use tokio::time::{Instant, sleep, timeout}; 48 use tokio_tungstenite::tungstenite::Message; 49 50 type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>; 51 52 const RELAY_EVENT_TIMEOUT: Duration = Duration::from_secs(15); 53 const EXTERNAL_RESPONSE_TIMEOUT: Duration = Duration::from_secs(15); 54 const RUNTIME_STATE_TIMEOUT: Duration = Duration::from_secs(15); 55 const POLL_INTERVAL: Duration = Duration::from_millis(25); 56 57 #[derive(Clone)] 58 struct RelaySubscription { 59 connection_id: usize, 60 subscription_id: SubscriptionId, 61 filters: Vec<Filter>, 62 } 63 64 #[derive(Default)] 65 struct RelayState { 66 next_connection_id: usize, 67 senders: HashMap<usize, mpsc::UnboundedSender<Message>>, 68 subscriptions: Vec<RelaySubscription>, 69 published_events: Vec<Event>, 70 publish_outcomes_by_pubkey: HashMap<String, VecDeque<bool>>, 71 } 72 73 struct TestRelay { 74 url: String, 75 state: Arc<Mutex<RelayState>>, 76 notify: Arc<Notify>, 77 shutdown_tx: Option<oneshot::Sender<()>>, 78 } 79 80 impl TestRelay { 81 async fn spawn() -> TestResult<Self> { 82 let listener = TcpListener::bind("127.0.0.1:0").await?; 83 let addr = listener.local_addr()?; 84 let url = format!("ws://{addr}"); 85 let state = Arc::new(Mutex::new(RelayState::default())); 86 let notify = Arc::new(Notify::new()); 87 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 88 let relay_state = Arc::clone(&state); 89 let relay_notify = Arc::clone(¬ify); 90 91 tokio::spawn(async move { 92 loop { 93 tokio::select! { 94 _ = &mut shutdown_rx => break, 95 accept = listener.accept() => { 96 let Ok((stream, _)) = accept else { 97 break; 98 }; 99 let state = Arc::clone(&relay_state); 100 let notify = Arc::clone(&relay_notify); 101 tokio::spawn(async move { 102 let _ = handle_relay_connection(stream, state, notify).await; 103 }); 104 } 105 } 106 } 107 }); 108 109 Ok(Self { 110 url, 111 state, 112 notify, 113 shutdown_tx: Some(shutdown_tx), 114 }) 115 } 116 117 fn url(&self) -> &str { 118 self.url.as_str() 119 } 120 121 async fn queue_publish_outcomes(&self, public_key: PublicKey, outcomes: &[bool]) { 122 let mut state = self.state.lock().await; 123 state 124 .publish_outcomes_by_pubkey 125 .insert(public_key.to_hex(), outcomes.iter().copied().collect()); 126 } 127 128 async fn wait_for_subscription_count(&self, expected: usize) -> TestResult<()> { 129 timeout(RELAY_EVENT_TIMEOUT, async { 130 loop { 131 if self.state.lock().await.subscriptions.len() >= expected { 132 return; 133 } 134 self.notify.notified().await; 135 } 136 }) 137 .await?; 138 Ok(()) 139 } 140 141 async fn wait_for_published_events_by_author( 142 &self, 143 public_key: PublicKey, 144 expected: usize, 145 ) -> TestResult<Vec<Event>> { 146 timeout(RELAY_EVENT_TIMEOUT, async { 147 loop { 148 let events = self.published_events_by_author(public_key).await; 149 if events.len() >= expected { 150 return events; 151 } 152 self.notify.notified().await; 153 } 154 }) 155 .await 156 .map_err(Into::into) 157 } 158 159 async fn published_events_by_author(&self, public_key: PublicKey) -> Vec<Event> { 160 self.state 161 .lock() 162 .await 163 .published_events 164 .iter() 165 .filter(|event| event.pubkey == public_key) 166 .cloned() 167 .collect() 168 } 169 } 170 171 impl Drop for TestRelay { 172 fn drop(&mut self) { 173 if let Some(shutdown_tx) = self.shutdown_tx.take() { 174 let _ = shutdown_tx.send(()); 175 } 176 } 177 } 178 179 struct HangingRelay { 180 url: String, 181 shutdown_tx: Option<oneshot::Sender<()>>, 182 } 183 184 impl HangingRelay { 185 async fn spawn(hold_open_for: Duration) -> TestResult<Self> { 186 let listener = TcpListener::bind("127.0.0.1:0").await?; 187 let addr = listener.local_addr()?; 188 let url = format!("ws://{addr}"); 189 let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); 190 191 tokio::spawn(async move { 192 loop { 193 tokio::select! { 194 _ = &mut shutdown_rx => break, 195 accept = listener.accept() => { 196 let Ok((stream, _)) = accept else { 197 break; 198 }; 199 tokio::spawn(async move { 200 sleep(hold_open_for).await; 201 drop(stream); 202 }); 203 } 204 } 205 } 206 }); 207 208 Ok(Self { 209 url, 210 shutdown_tx: Some(shutdown_tx), 211 }) 212 } 213 214 fn url(&self) -> &str { 215 self.url.as_str() 216 } 217 } 218 219 impl Drop for HangingRelay { 220 fn drop(&mut self) { 221 if let Some(shutdown_tx) = self.shutdown_tx.take() { 222 let _ = shutdown_tx.send(()); 223 } 224 } 225 } 226 227 async fn handle_relay_connection( 228 stream: TcpStream, 229 state: Arc<Mutex<RelayState>>, 230 notify: Arc<Notify>, 231 ) -> TestResult<()> { 232 let websocket = tokio_tungstenite::accept_async(stream).await?; 233 let (mut writer, mut reader) = websocket.split(); 234 let (tx, mut rx) = mpsc::unbounded_channel::<Message>(); 235 let connection_id = { 236 let mut state = state.lock().await; 237 let connection_id = state.next_connection_id; 238 state.next_connection_id += 1; 239 state.senders.insert(connection_id, tx); 240 notify.notify_waiters(); 241 connection_id 242 }; 243 244 let writer_task = tokio::spawn(async move { 245 while let Some(message) = rx.recv().await { 246 if writer.send(message).await.is_err() { 247 break; 248 } 249 } 250 }); 251 252 while let Some(message) = reader.next().await { 253 let message = message?; 254 let Message::Text(text) = message else { 255 continue; 256 }; 257 let client_message = ClientMessage::from_json(text.as_str())?; 258 handle_client_message(connection_id, client_message, &state, ¬ify).await?; 259 } 260 261 writer_task.abort(); 262 let mut state = state.lock().await; 263 state.senders.remove(&connection_id); 264 state 265 .subscriptions 266 .retain(|subscription| subscription.connection_id != connection_id); 267 notify.notify_waiters(); 268 Ok(()) 269 } 270 271 async fn handle_client_message( 272 connection_id: usize, 273 client_message: ClientMessage<'_>, 274 state: &Arc<Mutex<RelayState>>, 275 notify: &Arc<Notify>, 276 ) -> TestResult<()> { 277 match client_message { 278 ClientMessage::Req { 279 subscription_id, 280 filters, 281 } => { 282 let (sender, matching_events) = { 283 let mut state = state.lock().await; 284 let matching_events = state 285 .published_events 286 .iter() 287 .filter(|event| { 288 filters 289 .iter() 290 .any(|filter| filter.match_event(event, MatchEventOptions::new())) 291 }) 292 .cloned() 293 .collect::<Vec<_>>(); 294 state.subscriptions.push(RelaySubscription { 295 connection_id, 296 subscription_id: subscription_id.as_ref().clone(), 297 filters: filters 298 .into_iter() 299 .map(|filter| filter.into_owned()) 300 .collect(), 301 }); 302 notify.notify_waiters(); 303 (state.senders.get(&connection_id).cloned(), matching_events) 304 }; 305 if let Some(sender) = sender { 306 for event in matching_events { 307 let message = 308 RelayMessage::event(subscription_id.as_ref().clone(), event).as_json(); 309 let _ = sender.send(Message::Text(message.into())); 310 } 311 let eose = RelayMessage::eose(subscription_id.as_ref().clone()).as_json(); 312 let _ = sender.send(Message::Text(eose.into())); 313 } 314 } 315 ClientMessage::Close(subscription_id) => { 316 let mut state = state.lock().await; 317 state.subscriptions.retain(|subscription| { 318 subscription.connection_id != connection_id 319 || subscription.subscription_id != *subscription_id 320 }); 321 notify.notify_waiters(); 322 } 323 ClientMessage::Event(event) => { 324 let event = event.into_owned(); 325 let (ok_message, subscriber_messages) = 326 accept_published_event(connection_id, event, state, notify).await?; 327 if let Some((sender, message)) = ok_message { 328 let _ = sender.send(message); 329 } 330 for (sender, message) in subscriber_messages { 331 let _ = sender.send(message); 332 } 333 } 334 _ => {} 335 } 336 337 Ok(()) 338 } 339 340 async fn accept_published_event( 341 connection_id: usize, 342 event: Event, 343 state: &Arc<Mutex<RelayState>>, 344 notify: &Arc<Notify>, 345 ) -> TestResult<( 346 Option<(mpsc::UnboundedSender<Message>, Message)>, 347 Vec<(mpsc::UnboundedSender<Message>, Message)>, 348 )> { 349 let event_id = event.id; 350 let event_pubkey_hex = event.pubkey.to_hex(); 351 let mut subscriber_messages = Vec::new(); 352 let mut ok_message = None; 353 354 { 355 let mut state = state.lock().await; 356 let publish_status = state 357 .publish_outcomes_by_pubkey 358 .get_mut(&event_pubkey_hex) 359 .and_then(|outcomes| outcomes.pop_front()) 360 .unwrap_or(true); 361 362 if let Some(sender) = state.senders.get(&connection_id).cloned() { 363 let message = if publish_status { 364 RelayMessage::ok(event_id, true, "").as_json() 365 } else { 366 RelayMessage::ok(event_id, false, "blocked by test relay").as_json() 367 }; 368 ok_message = Some((sender, Message::Text(message.into()))); 369 } 370 371 if publish_status { 372 state.published_events.push(event.clone()); 373 for subscription in &state.subscriptions { 374 if subscription 375 .filters 376 .iter() 377 .any(|filter| filter.match_event(&event, MatchEventOptions::new())) 378 { 379 if let Some(sender) = state.senders.get(&subscription.connection_id).cloned() { 380 let message = RelayMessage::event( 381 subscription.subscription_id.clone(), 382 event.clone(), 383 ) 384 .as_json(); 385 subscriber_messages.push((sender, Message::Text(message.into()))); 386 } 387 } 388 } 389 notify.notify_waiters(); 390 } 391 } 392 393 Ok((ok_message, subscriber_messages)) 394 } 395 396 struct MycTestRuntime { 397 _temp: TempDir, 398 runtime: MycRuntime, 399 } 400 401 impl MycTestRuntime { 402 fn new(relay_url: &str, approval: MycConnectionApproval) -> Self { 403 Self::new_with_transport_relays(&[relay_url], approval) 404 } 405 406 fn new_with_transport_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self { 407 Self::new_with_transport_config(relay_urls, approval, |_| {}) 408 } 409 410 fn new_with_transport_config<F>( 411 relay_urls: &[&str], 412 approval: MycConnectionApproval, 413 configure: F, 414 ) -> Self 415 where 416 F: FnOnce(&mut MycConfig), 417 { 418 let temp = tempfile::tempdir().expect("tempdir"); 419 let mut config = MycConfig::default(); 420 config.paths.state_dir = temp.path().join("state"); 421 config.paths.signer_identity_path = temp.path().join("signer.json"); 422 config.paths.user_identity_path = temp.path().join("user.json"); 423 config.policy.connection_approval = approval; 424 config.transport.enabled = true; 425 config.transport.connect_timeout_secs = 1; 426 config.transport.relays = relay_urls.iter().map(|relay| (*relay).to_owned()).collect(); 427 configure(&mut config); 428 write_identity( 429 &config.paths.signer_identity_path, 430 "1111111111111111111111111111111111111111111111111111111111111111", 431 ); 432 write_identity( 433 &config.paths.user_identity_path, 434 "2222222222222222222222222222222222222222222222222222222222222222", 435 ); 436 437 Self { 438 runtime: MycRuntime::bootstrap(config).expect("runtime"), 439 _temp: temp, 440 } 441 } 442 443 fn new_with_discovery(relay_url: &str, approval: MycConnectionApproval) -> Self { 444 Self::new_with_discovery_relays(&[relay_url], approval) 445 } 446 447 fn new_with_discovery_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self { 448 Self::new_with_discovery_relays_and_timeout(relay_urls, approval, 1) 449 } 450 451 fn new_with_discovery_relays_and_timeout( 452 relay_urls: &[&str], 453 approval: MycConnectionApproval, 454 connect_timeout_secs: u64, 455 ) -> Self { 456 let temp = tempfile::tempdir().expect("tempdir"); 457 let mut config = MycConfig::default(); 458 config.paths.state_dir = temp.path().join("state"); 459 config.paths.signer_identity_path = temp.path().join("signer.json"); 460 config.paths.user_identity_path = temp.path().join("user.json"); 461 config.policy.connection_approval = approval; 462 config.transport.connect_timeout_secs = connect_timeout_secs; 463 config.discovery.enabled = true; 464 config.discovery.domain = Some("signer.example.com".to_owned()); 465 config.discovery.public_relays = 466 relay_urls.iter().map(|relay| (*relay).to_owned()).collect(); 467 config.discovery.publish_relays = 468 relay_urls.iter().map(|relay| (*relay).to_owned()).collect(); 469 config.discovery.nostrconnect_url_template = 470 Some("https://signer.example.com/connect?uri=<nostrconnect>".to_owned()); 471 config.discovery.app_identity_path = Some(temp.path().join("app.json")); 472 write_identity( 473 &config.paths.signer_identity_path, 474 "1111111111111111111111111111111111111111111111111111111111111111", 475 ); 476 write_identity( 477 &config.paths.user_identity_path, 478 "2222222222222222222222222222222222222222222222222222222222222222", 479 ); 480 write_identity( 481 config 482 .discovery 483 .app_identity_path 484 .as_ref() 485 .expect("app identity path"), 486 "6666666666666666666666666666666666666666666666666666666666666666", 487 ); 488 489 Self { 490 runtime: MycRuntime::bootstrap(config).expect("runtime"), 491 _temp: temp, 492 } 493 } 494 } 495 496 fn write_identity(path: &std::path::Path, secret_key: &str) { 497 let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity"); 498 myc::identity_files::store_encrypted_identity(path, &identity).expect("save identity"); 499 } 500 501 fn identity(secret_key: &str) -> RadrootsIdentity { 502 RadrootsIdentity::from_secret_key_str(secret_key).expect("identity") 503 } 504 505 fn unavailable_relay_url() -> TestResult<String> { 506 let listener = StdTcpListener::bind("127.0.0.1:0")?; 507 let addr = listener.local_addr()?; 508 drop(listener); 509 Ok(format!("ws://{addr}")) 510 } 511 512 async fn publish_handler_event( 513 relay_url: &str, 514 identity: &RadrootsIdentity, 515 spec: &RadrootsNostrApplicationHandlerSpec, 516 ) -> TestResult<Event> { 517 let event = radroots_nostr_build_application_handler_event(spec)? 518 .sign_with_keys(identity.keys()) 519 .map_err(|error| format!("failed to sign handler event: {error}"))?; 520 let client = RadrootsNostrClient::from_identity(identity); 521 let _ = client.add_relay(relay_url).await?; 522 client.connect().await; 523 client.wait_for_connection(Duration::from_secs(1)).await; 524 let output = client.send_event(&event).await?; 525 assert!( 526 !output.success.is_empty(), 527 "handler event publish did not succeed: {:?}", 528 output.failed 529 ); 530 Ok(event) 531 } 532 533 async fn publish_signed_event( 534 relay_url: &str, 535 identity: &RadrootsIdentity, 536 event: &Event, 537 ) -> TestResult<()> { 538 let client = RadrootsNostrClient::from_identity(identity); 539 let _ = client.add_relay(relay_url).await?; 540 client.connect().await; 541 client.wait_for_connection(Duration::from_secs(1)).await; 542 let output = client.send_event(event).await?; 543 assert!( 544 !output.success.is_empty(), 545 "signed event publish did not succeed: {:?}", 546 output.failed 547 ); 548 Ok(()) 549 } 550 551 fn connect_request_message( 552 request_id: &str, 553 signer_public_key: PublicKey, 554 secret: &str, 555 ) -> RadrootsNostrConnectRequestMessage { 556 RadrootsNostrConnectRequestMessage::new( 557 request_id, 558 RadrootsNostrConnectRequest::Connect { 559 remote_signer_public_key: signer_public_key, 560 secret: Some(secret.to_owned()), 561 requested_permissions: Default::default(), 562 }, 563 ) 564 } 565 566 fn ping_request_message(request_id: &str) -> RadrootsNostrConnectRequestMessage { 567 RadrootsNostrConnectRequestMessage::new(request_id, RadrootsNostrConnectRequest::Ping) 568 } 569 570 fn build_request_event( 571 client_identity: &RadrootsIdentity, 572 signer_public_key: PublicKey, 573 request_message: RadrootsNostrConnectRequestMessage, 574 created_at_unix: u64, 575 ) -> Event { 576 let payload = serde_json::to_string(&request_message).expect("request payload"); 577 let ciphertext = nip44::encrypt( 578 client_identity.keys().secret_key(), 579 &signer_public_key, 580 payload, 581 Version::V2, 582 ) 583 .expect("encrypt request"); 584 EventBuilder::new(Kind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), ciphertext) 585 .tags([Tag::public_key(signer_public_key)]) 586 .custom_created_at(Timestamp::from(created_at_unix)) 587 .sign_with_keys(client_identity.keys()) 588 .expect("sign request event") 589 } 590 591 fn build_external_request_message( 592 request_id: &str, 593 request: &ExternalNostrConnectRequest, 594 ) -> ExternalNostrConnectMessage { 595 ExternalNostrConnectMessage::Request { 596 id: request_id.to_owned(), 597 method: request.method(), 598 params: request.params(), 599 } 600 } 601 602 fn build_external_request_event( 603 client_identity: &RadrootsIdentity, 604 signer_public_key: PublicKey, 605 request_message: &ExternalNostrConnectMessage, 606 created_at_unix: u64, 607 ) -> Event { 608 let payload = request_message.as_json(); 609 let ciphertext = nip44::encrypt( 610 client_identity.keys().secret_key(), 611 &signer_public_key, 612 payload, 613 Version::V2, 614 ) 615 .expect("encrypt external request"); 616 EventBuilder::new(Kind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), ciphertext) 617 .tags([Tag::public_key(signer_public_key)]) 618 .custom_created_at(Timestamp::from(created_at_unix)) 619 .sign_with_keys(client_identity.keys()) 620 .expect("sign external request event") 621 } 622 623 fn build_signer_noise_event(signer_identity: &MycActiveIdentity, created_at_unix: u64) -> Event { 624 signer_identity 625 .sign_event_builder( 626 RadrootsNostrEventBuilder::new( 627 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), 628 "non-nip44-signer-noise", 629 ) 630 .custom_created_at(Timestamp::from(created_at_unix)), 631 "signer noise event", 632 ) 633 .expect("sign noise event") 634 } 635 636 fn decrypt_response( 637 client_identity: &RadrootsIdentity, 638 signer_public_key: PublicKey, 639 response_event: &Event, 640 ) -> RadrootsNostrConnectResponseEnvelope { 641 let plaintext = nip44::decrypt( 642 client_identity.keys().secret_key(), 643 &signer_public_key, 644 &response_event.content, 645 ) 646 .expect("decrypt response"); 647 serde_json::from_str(&plaintext).expect("response envelope") 648 } 649 650 async fn wait_for_external_response( 651 relay: &TestRelay, 652 client_identity: &RadrootsIdentity, 653 signer_public_key: PublicKey, 654 request_id: &str, 655 method: ExternalNostrConnectMethod, 656 ) -> TestResult<(Event, ExternalNostrConnectResponse)> { 657 timeout(EXTERNAL_RESPONSE_TIMEOUT, async { 658 loop { 659 let events = relay.published_events_by_author(signer_public_key).await; 660 for event in events { 661 let Ok(plaintext) = nip44::decrypt( 662 client_identity.keys().secret_key(), 663 &signer_public_key, 664 &event.content, 665 ) else { 666 continue; 667 }; 668 let Ok(message) = ExternalNostrConnectMessage::from_json(&plaintext) else { 669 continue; 670 }; 671 if message.id() != request_id { 672 continue; 673 } 674 let response = message.to_response(method)?; 675 return Ok((event, response)); 676 } 677 sleep(POLL_INTERVAL).await; 678 } 679 }) 680 .await? 681 } 682 683 async fn publish_external_request_and_wait_for_response( 684 relay: &TestRelay, 685 client_identity: &RadrootsIdentity, 686 signer_public_key: PublicKey, 687 request_id: &str, 688 request: ExternalNostrConnectRequest, 689 created_at_unix: u64, 690 ) -> TestResult<(Event, ExternalNostrConnectResponse)> { 691 let method = request.method(); 692 let request_message = build_external_request_message(request_id, &request); 693 let event = build_external_request_event( 694 client_identity, 695 signer_public_key, 696 &request_message, 697 created_at_unix, 698 ); 699 publish_event(relay.url(), &event).await?; 700 wait_for_external_response( 701 relay, 702 client_identity, 703 signer_public_key, 704 request_id, 705 method, 706 ) 707 .await 708 } 709 710 fn register_external_client_session( 711 runtime: &MycRuntime, 712 client_public_key: PublicKey, 713 relay_url: &str, 714 permissions: &str, 715 ) -> TestResult<()> { 716 let manager = runtime.signer_manager()?; 717 let requested_permissions: radroots_nostr_connect::prelude::RadrootsNostrConnectPermissions = 718 if permissions.trim().is_empty() { 719 Default::default() 720 } else { 721 permissions.parse()? 722 }; 723 let connection = manager.register_connection( 724 RadrootsNostrSignerConnectionDraft::new(client_public_key, runtime.user_public_identity()) 725 .with_requested_permissions(requested_permissions.clone()) 726 .with_relays(vec![relay_url.parse()?]) 727 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 728 )?; 729 let _ = manager.set_granted_permissions(&connection.connection_id, requested_permissions)?; 730 Ok(()) 731 } 732 733 async fn publish_event(relay_url: &str, event: &Event) -> TestResult<()> { 734 let (mut websocket, _) = tokio_tungstenite::connect_async(relay_url).await?; 735 websocket 736 .send(Message::Text( 737 ClientMessage::event(event.clone()).as_json().into(), 738 )) 739 .await?; 740 741 while let Some(message) = websocket.next().await { 742 let message = message?; 743 let Message::Text(text) = message else { 744 continue; 745 }; 746 let relay_message = RelayMessage::from_json(text.as_str())?; 747 if let RelayMessage::Ok { 748 event_id, 749 status, 750 message, 751 } = relay_message 752 { 753 assert_eq!(event_id, event.id); 754 assert!(status, "client publish rejected: {message}"); 755 return Ok(()); 756 } 757 } 758 759 Err("relay connection closed before OK".into()) 760 } 761 762 async fn wait_for_connection_count(runtime: &MycRuntime, expected: usize) -> TestResult<()> { 763 timeout(RUNTIME_STATE_TIMEOUT, async { 764 loop { 765 if runtime 766 .signer_manager() 767 .expect("manager") 768 .list_connections() 769 .expect("connections") 770 .len() 771 >= expected 772 { 773 return; 774 } 775 sleep(POLL_INTERVAL).await; 776 } 777 }) 778 .await?; 779 Ok(()) 780 } 781 782 async fn wait_for_connect_secret_consumed(runtime: &MycRuntime) -> TestResult<()> { 783 timeout(RUNTIME_STATE_TIMEOUT, async { 784 loop { 785 let consumed = runtime 786 .signer_manager() 787 .expect("manager") 788 .list_connections() 789 .expect("connections") 790 .into_iter() 791 .any(|connection| connection.connect_secret_is_consumed()); 792 if consumed { 793 return; 794 } 795 sleep(POLL_INTERVAL).await; 796 } 797 }) 798 .await?; 799 Ok(()) 800 } 801 802 async fn wait_for_operation_audit_count( 803 runtime: &MycRuntime, 804 expected: usize, 805 ) -> TestResult<Vec<MycOperationAuditRecord>> { 806 timeout(RUNTIME_STATE_TIMEOUT, async { 807 loop { 808 let records = runtime 809 .operation_audit_store() 810 .list() 811 .expect("operation audit"); 812 if records.len() >= expected { 813 return records; 814 } 815 sleep(POLL_INTERVAL).await; 816 } 817 }) 818 .await 819 .map_err(Into::into) 820 } 821 822 async fn wait_for_delivery_outbox_records<F>( 823 runtime: &MycRuntime, 824 predicate: F, 825 ) -> TestResult<Vec<MycDeliveryOutboxRecord>> 826 where 827 F: Fn(&[MycDeliveryOutboxRecord]) -> bool, 828 { 829 timeout(RUNTIME_STATE_TIMEOUT, async { 830 loop { 831 let records = runtime 832 .delivery_outbox_store() 833 .list_all() 834 .expect("delivery outbox"); 835 if predicate(&records) { 836 return records; 837 } 838 sleep(POLL_INTERVAL).await; 839 } 840 }) 841 .await 842 .map_err(Into::into) 843 } 844 845 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 846 async fn live_listener_rejects_denied_clients_without_registering_connection() -> TestResult<()> { 847 let relay = TestRelay::spawn().await?; 848 let client_identity = 849 identity("7777777777777777777777777777777777777777777777777777777777777777"); 850 let test_runtime = MycTestRuntime::new_with_transport_config( 851 &[relay.url()], 852 MycConnectionApproval::ExplicitUser, 853 |config| { 854 config.policy.denied_client_pubkeys = vec![client_identity.public_key().to_hex()]; 855 }, 856 ); 857 let runtime = test_runtime.runtime.clone(); 858 let signer_public_key = runtime.signer_identity().public_key(); 859 860 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 861 let service_runtime = runtime.clone(); 862 let listener_task = tokio::spawn(async move { 863 service_runtime 864 .run_until(async { 865 let _ = shutdown_rx.await; 866 }) 867 .await 868 }); 869 870 relay.wait_for_subscription_count(1).await?; 871 872 let request_event = build_request_event( 873 &client_identity, 874 signer_public_key, 875 connect_request_message("denied-connect", signer_public_key, "denied-secret"), 876 Timestamp::now().as_secs(), 877 ); 878 publish_event(relay.url(), &request_event).await?; 879 880 let response_events = relay 881 .wait_for_published_events_by_author(signer_public_key, 1) 882 .await?; 883 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 884 assert_eq!(response.id, "denied-connect"); 885 let parsed = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope( 886 &RadrootsNostrConnectRequest::Connect { 887 remote_signer_public_key: signer_public_key, 888 secret: Some("denied-secret".to_owned()), 889 requested_permissions: Default::default(), 890 } 891 .method(), 892 response, 893 )?; 894 assert_eq!( 895 parsed, 896 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::Error { 897 result: None, 898 error: "client public key denied by policy".to_owned(), 899 } 900 ); 901 assert!(runtime.signer_manager()?.list_connections()?.is_empty()); 902 903 let _ = shutdown_tx.send(()); 904 listener_task.await??; 905 Ok(()) 906 } 907 908 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 909 async fn external_nostr_client_compatibility_covers_connect_and_base_methods() -> TestResult<()> { 910 let relay = TestRelay::spawn().await?; 911 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 912 let runtime = test_runtime.runtime.clone(); 913 let signer_public_key = runtime.signer_identity().public_key(); 914 let user_public_key = runtime.user_identity().public_key(); 915 let client_identity = 916 identity("3333333333333333333333333333333333333333333333333333333333333333"); 917 let base_created_at = Timestamp::now().as_secs(); 918 919 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 920 let service_runtime = runtime.clone(); 921 let listener_task = tokio::spawn(async move { 922 service_runtime 923 .run_until(async { 924 let _ = shutdown_rx.await; 925 }) 926 .await 927 }); 928 929 relay.wait_for_subscription_count(1).await?; 930 931 let (_, connect_response) = publish_external_request_and_wait_for_response( 932 &relay, 933 &client_identity, 934 signer_public_key, 935 "external-connect", 936 ExternalNostrConnectRequest::Connect { 937 remote_signer_public_key: signer_public_key, 938 secret: None, 939 }, 940 base_created_at, 941 ) 942 .await?; 943 assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack)); 944 assert_eq!(connect_response.error, None); 945 946 wait_for_connection_count(&runtime, 1).await?; 947 948 let (_, get_public_key_response) = publish_external_request_and_wait_for_response( 949 &relay, 950 &client_identity, 951 signer_public_key, 952 "external-get-public-key", 953 ExternalNostrConnectRequest::GetPublicKey, 954 base_created_at + 1, 955 ) 956 .await?; 957 assert_eq!( 958 get_public_key_response.result, 959 Some(ExternalResponseResult::GetPublicKey(user_public_key)) 960 ); 961 assert_eq!(get_public_key_response.error, None); 962 963 let (_, ping_response) = publish_external_request_and_wait_for_response( 964 &relay, 965 &client_identity, 966 signer_public_key, 967 "external-ping", 968 ExternalNostrConnectRequest::Ping, 969 base_created_at + 2, 970 ) 971 .await?; 972 assert_eq!(ping_response.result, Some(ExternalResponseResult::Pong)); 973 assert_eq!(ping_response.error, None); 974 975 let _ = shutdown_tx.send(()); 976 listener_task.await??; 977 Ok(()) 978 } 979 980 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 981 async fn external_nostr_client_compatibility_covers_signed_and_crypto_methods() -> TestResult<()> { 982 let relay = TestRelay::spawn().await?; 983 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 984 let runtime = test_runtime.runtime.clone(); 985 let signer_public_key = runtime.signer_identity().public_key(); 986 let user_public_key = runtime.user_identity().public_key(); 987 let client_identity = 988 identity("3333333333333333333333333333333333333333333333333333333333333333"); 989 let peer_identity = 990 identity("4444444444444444444444444444444444444444444444444444444444444444"); 991 let base_created_at = Timestamp::now().as_secs(); 992 993 register_external_client_session( 994 &runtime, 995 client_identity.public_key(), 996 relay.url(), 997 "sign_event:1,nip04_encrypt,nip04_decrypt,nip44_encrypt,nip44_decrypt", 998 )?; 999 1000 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1001 let service_runtime = runtime.clone(); 1002 let listener_task = tokio::spawn(async move { 1003 service_runtime 1004 .run_until(async { 1005 let _ = shutdown_rx.await; 1006 }) 1007 .await 1008 }); 1009 1010 relay.wait_for_subscription_count(1).await?; 1011 1012 let unsigned_event: UnsignedEvent = serde_json::from_value(serde_json::json!({ 1013 "pubkey": user_public_key.to_hex(), 1014 "created_at": base_created_at, 1015 "kind": 1, 1016 "tags": [], 1017 "content": "hello from an external nostr client" 1018 }))?; 1019 let (_, sign_event_response) = publish_external_request_and_wait_for_response( 1020 &relay, 1021 &client_identity, 1022 signer_public_key, 1023 "external-sign-event", 1024 ExternalNostrConnectRequest::SignEvent(unsigned_event.clone()), 1025 base_created_at, 1026 ) 1027 .await?; 1028 let signed_event = sign_event_response 1029 .result 1030 .expect("sign_event result") 1031 .to_sign_event()?; 1032 assert_eq!(signed_event.pubkey, user_public_key); 1033 assert_eq!(signed_event.kind, unsigned_event.kind); 1034 assert_eq!(signed_event.content, unsigned_event.content); 1035 signed_event.verify()?; 1036 1037 let (_, nip04_encrypt_response) = publish_external_request_and_wait_for_response( 1038 &relay, 1039 &client_identity, 1040 signer_public_key, 1041 "external-nip04-encrypt", 1042 ExternalNostrConnectRequest::Nip04Encrypt { 1043 public_key: peer_identity.public_key(), 1044 text: "hello via nip04".to_owned(), 1045 }, 1046 base_created_at + 1, 1047 ) 1048 .await?; 1049 let nip04_ciphertext = nip04_encrypt_response 1050 .result 1051 .expect("nip04 encrypt result") 1052 .to_nip04_encrypt()?; 1053 let nip04_plaintext = nostr::nips::nip04::decrypt( 1054 peer_identity.keys().secret_key(), 1055 &user_public_key, 1056 nip04_ciphertext.clone(), 1057 )?; 1058 assert_eq!(nip04_plaintext, "hello via nip04"); 1059 1060 let nip04_reply_ciphertext = nostr::nips::nip04::encrypt( 1061 peer_identity.keys().secret_key(), 1062 &user_public_key, 1063 "reply via nip04".to_owned(), 1064 )?; 1065 let (_, nip04_decrypt_response) = publish_external_request_and_wait_for_response( 1066 &relay, 1067 &client_identity, 1068 signer_public_key, 1069 "external-nip04-decrypt", 1070 ExternalNostrConnectRequest::Nip04Decrypt { 1071 public_key: peer_identity.public_key(), 1072 ciphertext: nip04_reply_ciphertext, 1073 }, 1074 base_created_at + 2, 1075 ) 1076 .await?; 1077 assert_eq!( 1078 nip04_decrypt_response 1079 .result 1080 .expect("nip04 decrypt result") 1081 .to_nip04_decrypt()?, 1082 "reply via nip04" 1083 ); 1084 1085 let (_, nip44_encrypt_response) = publish_external_request_and_wait_for_response( 1086 &relay, 1087 &client_identity, 1088 signer_public_key, 1089 "external-nip44-encrypt", 1090 ExternalNostrConnectRequest::Nip44Encrypt { 1091 public_key: peer_identity.public_key(), 1092 text: "hello via nip44".to_owned(), 1093 }, 1094 base_created_at + 3, 1095 ) 1096 .await?; 1097 let nip44_ciphertext = nip44_encrypt_response 1098 .result 1099 .expect("nip44 encrypt result") 1100 .to_nip44_encrypt()?; 1101 let nip44_plaintext = nip44::decrypt( 1102 peer_identity.keys().secret_key(), 1103 &user_public_key, 1104 &nip44_ciphertext, 1105 )?; 1106 assert_eq!(nip44_plaintext, "hello via nip44"); 1107 1108 let nip44_reply_ciphertext = nip44::encrypt( 1109 peer_identity.keys().secret_key(), 1110 &user_public_key, 1111 "reply via nip44".to_owned(), 1112 Version::V2, 1113 )?; 1114 let (_, nip44_decrypt_response) = publish_external_request_and_wait_for_response( 1115 &relay, 1116 &client_identity, 1117 signer_public_key, 1118 "external-nip44-decrypt", 1119 ExternalNostrConnectRequest::Nip44Decrypt { 1120 public_key: peer_identity.public_key(), 1121 ciphertext: nip44_reply_ciphertext, 1122 }, 1123 base_created_at + 4, 1124 ) 1125 .await?; 1126 assert_eq!( 1127 nip44_decrypt_response 1128 .result 1129 .expect("nip44 decrypt result") 1130 .to_nip44_decrypt()?, 1131 "reply via nip44" 1132 ); 1133 1134 let _ = shutdown_tx.send(()); 1135 listener_task.await??; 1136 Ok(()) 1137 } 1138 1139 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1140 async fn external_nostr_client_surfaces_pending_approval_state() -> TestResult<()> { 1141 let relay = TestRelay::spawn().await?; 1142 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::ExplicitUser); 1143 let runtime = test_runtime.runtime.clone(); 1144 let signer_public_key = runtime.signer_identity().public_key(); 1145 let client_identity = 1146 identity("8888888888888888888888888888888888888888888888888888888888888888"); 1147 let base_created_at = Timestamp::now().as_secs(); 1148 1149 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1150 let service_runtime = runtime.clone(); 1151 let listener_task = tokio::spawn(async move { 1152 service_runtime 1153 .run_until(async { 1154 let _ = shutdown_rx.await; 1155 }) 1156 .await 1157 }); 1158 1159 relay.wait_for_subscription_count(1).await?; 1160 1161 let (_, connect_response) = publish_external_request_and_wait_for_response( 1162 &relay, 1163 &client_identity, 1164 signer_public_key, 1165 "external-explicit-connect", 1166 ExternalNostrConnectRequest::Connect { 1167 remote_signer_public_key: signer_public_key, 1168 secret: None, 1169 }, 1170 base_created_at, 1171 ) 1172 .await?; 1173 assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack)); 1174 1175 wait_for_connection_count(&runtime, 1).await?; 1176 1177 let (_, pending_response) = publish_external_request_and_wait_for_response( 1178 &relay, 1179 &client_identity, 1180 signer_public_key, 1181 "external-pending-get-public-key", 1182 ExternalNostrConnectRequest::GetPublicKey, 1183 base_created_at + 1, 1184 ) 1185 .await?; 1186 assert_eq!(pending_response.result, None); 1187 assert_eq!( 1188 pending_response.error.as_deref(), 1189 Some("connection is pending") 1190 ); 1191 1192 let _ = shutdown_tx.send(()); 1193 listener_task.await??; 1194 Ok(()) 1195 } 1196 1197 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1198 async fn external_nostr_client_surfaces_auth_challenge_state() -> TestResult<()> { 1199 let relay = TestRelay::spawn().await?; 1200 let client_identity = 1201 identity("8989898989898989898989898989898989898989898989898989898989898989"); 1202 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 1203 let runtime = test_runtime.runtime.clone(); 1204 let signer_public_key = runtime.signer_identity().public_key(); 1205 let base_created_at = Timestamp::now().as_secs(); 1206 1207 register_external_client_session(&runtime, client_identity.public_key(), relay.url(), "")?; 1208 let connection_id = runtime 1209 .signer_manager()? 1210 .list_connections()? 1211 .into_iter() 1212 .find(|connection| connection.client_public_key == client_identity.public_key()) 1213 .expect("active connection") 1214 .connection_id; 1215 let _ = runtime 1216 .signer_manager()? 1217 .require_auth_challenge(&connection_id, "https://auth.example/challenge")?; 1218 1219 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1220 let service_runtime = runtime.clone(); 1221 let listener_task = tokio::spawn(async move { 1222 service_runtime 1223 .run_until(async { 1224 let _ = shutdown_rx.await; 1225 }) 1226 .await 1227 }); 1228 1229 relay.wait_for_subscription_count(1).await?; 1230 1231 let (_, connect_response) = publish_external_request_and_wait_for_response( 1232 &relay, 1233 &client_identity, 1234 signer_public_key, 1235 "external-auth-ping", 1236 ExternalNostrConnectRequest::Ping, 1237 base_created_at, 1238 ) 1239 .await?; 1240 assert_eq!( 1241 connect_response.result, 1242 Some(ExternalResponseResult::AuthUrl) 1243 ); 1244 assert_eq!( 1245 connect_response.error.as_deref(), 1246 Some("https://auth.example/challenge") 1247 ); 1248 1249 let _ = shutdown_tx.send(()); 1250 listener_task.await??; 1251 Ok(()) 1252 } 1253 1254 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1255 async fn external_nostr_client_ignores_unrelated_signer_events_before_response() -> TestResult<()> { 1256 let relay = TestRelay::spawn().await?; 1257 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 1258 let runtime = test_runtime.runtime.clone(); 1259 let signer_identity = runtime.signer_identity(); 1260 let signer_public_key = signer_identity.public_key(); 1261 let client_identity = 1262 identity("5656565656565656565656565656565656565656565656565656565656565656"); 1263 let base_created_at = Timestamp::now().as_secs(); 1264 1265 register_external_client_session(&runtime, client_identity.public_key(), relay.url(), "")?; 1266 1267 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1268 let service_runtime = runtime.clone(); 1269 let listener_task = tokio::spawn(async move { 1270 service_runtime 1271 .run_until(async { 1272 let _ = shutdown_rx.await; 1273 }) 1274 .await 1275 }); 1276 1277 relay.wait_for_subscription_count(1).await?; 1278 1279 let noise_event = build_signer_noise_event(&signer_identity, base_created_at); 1280 publish_event(relay.url(), &noise_event).await?; 1281 1282 let (_, ping_response) = publish_external_request_and_wait_for_response( 1283 &relay, 1284 &client_identity, 1285 signer_public_key, 1286 "external-noise-ping", 1287 ExternalNostrConnectRequest::Ping, 1288 base_created_at + 1, 1289 ) 1290 .await?; 1291 assert_eq!(ping_response.result, Some(ExternalResponseResult::Pong)); 1292 assert_eq!(ping_response.error, None); 1293 1294 let _ = shutdown_tx.send(()); 1295 listener_task.await??; 1296 Ok(()) 1297 } 1298 1299 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1300 async fn live_listener_consumes_connect_secret_only_after_successful_publish() -> TestResult<()> { 1301 let relay = TestRelay::spawn().await?; 1302 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 1303 let runtime = test_runtime.runtime.clone(); 1304 let signer_public_key = runtime.signer_identity().public_key(); 1305 let client_identity = 1306 identity("3333333333333333333333333333333333333333333333333333333333333333"); 1307 let base_created_at = Timestamp::now().as_secs(); 1308 1309 relay 1310 .queue_publish_outcomes(signer_public_key, &[false, true]) 1311 .await; 1312 1313 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1314 let service_runtime = runtime.clone(); 1315 let listener_task = tokio::spawn(async move { 1316 service_runtime 1317 .run_until(async { 1318 let _ = shutdown_rx.await; 1319 }) 1320 .await 1321 }); 1322 1323 relay.wait_for_subscription_count(1).await?; 1324 1325 let request_one = build_request_event( 1326 &client_identity, 1327 signer_public_key, 1328 connect_request_message("connect-1", signer_public_key, "shared-secret"), 1329 base_created_at, 1330 ); 1331 publish_event(relay.url(), &request_one).await?; 1332 wait_for_connection_count(&runtime, 1).await?; 1333 sleep(Duration::from_millis(100)).await; 1334 1335 assert!( 1336 relay 1337 .published_events_by_author(signer_public_key) 1338 .await 1339 .is_empty() 1340 ); 1341 let initial_connection = runtime 1342 .signer_manager()? 1343 .list_connections()? 1344 .into_iter() 1345 .next() 1346 .expect("stored connection"); 1347 assert!(!initial_connection.connect_secret_is_consumed()); 1348 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 1349 assert_eq!(operation_audit.len(), 1); 1350 assert_eq!( 1351 operation_audit[0].operation, 1352 MycOperationAuditKind::ListenerResponsePublish 1353 ); 1354 assert_eq!( 1355 operation_audit[0].outcome, 1356 MycOperationAuditOutcome::Rejected 1357 ); 1358 assert_eq!( 1359 operation_audit[0].connection_id.as_deref(), 1360 Some(initial_connection.connection_id.as_str()) 1361 ); 1362 assert_eq!(operation_audit[0].request_id.as_deref(), Some("connect-1")); 1363 assert_eq!(operation_audit[0].relay_count, 1); 1364 assert_eq!(operation_audit[0].acknowledged_relay_count, 0); 1365 assert!( 1366 operation_audit[0] 1367 .relay_outcome_summary 1368 .contains("blocked by test relay") 1369 ); 1370 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 1371 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed 1372 }) 1373 .await?; 1374 assert_eq!( 1375 outbox_records[0].kind, 1376 MycDeliveryOutboxKind::ListenerResponsePublish 1377 ); 1378 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed); 1379 assert_eq!( 1380 outbox_records[0] 1381 .connection_id 1382 .as_ref() 1383 .map(|value| value.as_str()), 1384 Some(initial_connection.connection_id.as_str()) 1385 ); 1386 assert_eq!(outbox_records[0].request_id.as_deref(), Some("connect-1")); 1387 assert!(outbox_records[0].signer_publish_workflow_id.is_some()); 1388 assert!( 1389 runtime 1390 .signer_manager()? 1391 .list_publish_workflows()? 1392 .is_empty() 1393 ); 1394 1395 let request_two = build_request_event( 1396 &client_identity, 1397 signer_public_key, 1398 connect_request_message("connect-2", signer_public_key, "shared-secret"), 1399 base_created_at + 1, 1400 ); 1401 publish_event(relay.url(), &request_two).await?; 1402 1403 let response_events = relay 1404 .wait_for_published_events_by_author(signer_public_key, 1) 1405 .await?; 1406 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 1407 assert_eq!(response.id, "connect-2"); 1408 assert_eq!( 1409 response.result, 1410 Some(serde_json::Value::String("shared-secret".to_owned())) 1411 ); 1412 1413 wait_for_connect_secret_consumed(&runtime).await?; 1414 let consumed_connection = runtime 1415 .signer_manager()? 1416 .list_connections()? 1417 .into_iter() 1418 .next() 1419 .expect("stored connection"); 1420 assert!(consumed_connection.connect_secret_is_consumed()); 1421 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 1422 records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized 1423 }) 1424 .await?; 1425 assert_eq!( 1426 outbox_records[1].kind, 1427 MycDeliveryOutboxKind::ListenerResponsePublish 1428 ); 1429 assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized); 1430 assert_eq!(outbox_records[1].request_id.as_deref(), Some("connect-2")); 1431 assert!(outbox_records[1].published_at_unix.is_some()); 1432 assert!(outbox_records[1].finalized_at_unix.is_some()); 1433 assert!(outbox_records[1].signer_publish_workflow_id.is_some()); 1434 assert!( 1435 runtime 1436 .signer_manager()? 1437 .list_publish_workflows()? 1438 .is_empty() 1439 ); 1440 1441 let request_three = build_request_event( 1442 &client_identity, 1443 signer_public_key, 1444 connect_request_message("connect-3", signer_public_key, "shared-secret"), 1445 base_created_at + 2, 1446 ); 1447 publish_event(relay.url(), &request_three).await?; 1448 sleep(Duration::from_millis(300)).await; 1449 1450 assert_eq!( 1451 relay 1452 .published_events_by_author(signer_public_key) 1453 .await 1454 .len(), 1455 1 1456 ); 1457 let operation_audit = runtime.operation_audit_store().list()?; 1458 assert_eq!(operation_audit.len(), 2); 1459 assert_eq!( 1460 operation_audit[1].operation, 1461 MycOperationAuditKind::ListenerResponsePublish 1462 ); 1463 assert_eq!( 1464 operation_audit[1].outcome, 1465 MycOperationAuditOutcome::Succeeded 1466 ); 1467 assert_eq!(operation_audit[1].request_id.as_deref(), Some("connect-2")); 1468 1469 let _ = shutdown_tx.send(()); 1470 listener_task.await??; 1471 Ok(()) 1472 } 1473 1474 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1475 async fn live_listener_works_with_sqlite_signer_state_and_runtime_audit() -> TestResult<()> { 1476 let relay = TestRelay::spawn().await?; 1477 let test_runtime = MycTestRuntime::new_with_transport_config( 1478 &[relay.url()], 1479 MycConnectionApproval::NotRequired, 1480 |config| { 1481 config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite; 1482 config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite; 1483 }, 1484 ); 1485 let runtime = test_runtime.runtime.clone(); 1486 let signer_public_key = runtime.signer_identity().public_key(); 1487 let client_identity = 1488 identity("5353535353535353535353535353535353535353535353535353535353535353"); 1489 let base_created_at = Timestamp::now().as_secs(); 1490 1491 assert_eq!( 1492 runtime 1493 .paths() 1494 .signer_state_path 1495 .file_name() 1496 .and_then(|name| name.to_str()), 1497 Some("signer-state.sqlite") 1498 ); 1499 assert_eq!( 1500 runtime 1501 .paths() 1502 .runtime_audit_path 1503 .file_name() 1504 .and_then(|name| name.to_str()), 1505 Some("operations.sqlite") 1506 ); 1507 1508 relay 1509 .queue_publish_outcomes(signer_public_key, &[false, true]) 1510 .await; 1511 1512 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1513 let service_runtime = runtime.clone(); 1514 let listener_task = tokio::spawn(async move { 1515 service_runtime 1516 .run_until(async { 1517 let _ = shutdown_rx.await; 1518 }) 1519 .await 1520 }); 1521 1522 relay.wait_for_subscription_count(1).await?; 1523 1524 let request_one = build_request_event( 1525 &client_identity, 1526 signer_public_key, 1527 connect_request_message("sqlite-connect-1", signer_public_key, "sqlite-secret"), 1528 base_created_at, 1529 ); 1530 publish_event(relay.url(), &request_one).await?; 1531 wait_for_connection_count(&runtime, 1).await?; 1532 sleep(Duration::from_millis(100)).await; 1533 1534 assert!( 1535 relay 1536 .published_events_by_author(signer_public_key) 1537 .await 1538 .is_empty() 1539 ); 1540 let initial_connection = runtime 1541 .signer_manager()? 1542 .list_connections()? 1543 .into_iter() 1544 .next() 1545 .expect("stored connection"); 1546 assert!(!initial_connection.connect_secret_is_consumed()); 1547 1548 let request_two = build_request_event( 1549 &client_identity, 1550 signer_public_key, 1551 connect_request_message("sqlite-connect-2", signer_public_key, "sqlite-secret"), 1552 base_created_at + 1, 1553 ); 1554 publish_event(relay.url(), &request_two).await?; 1555 1556 let response_events = relay 1557 .wait_for_published_events_by_author(signer_public_key, 1) 1558 .await?; 1559 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 1560 assert_eq!(response.id, "sqlite-connect-2"); 1561 assert_eq!( 1562 response.result, 1563 Some(serde_json::Value::String("sqlite-secret".to_owned())) 1564 ); 1565 1566 wait_for_connect_secret_consumed(&runtime).await?; 1567 let consumed_connection = runtime 1568 .signer_manager()? 1569 .list_connections()? 1570 .into_iter() 1571 .next() 1572 .expect("stored connection"); 1573 assert!(consumed_connection.connect_secret_is_consumed()); 1574 let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?; 1575 assert_eq!( 1576 operation_audit 1577 .iter() 1578 .filter(|record| record.outcome == MycOperationAuditOutcome::Rejected) 1579 .count(), 1580 1 1581 ); 1582 assert_eq!( 1583 operation_audit 1584 .iter() 1585 .filter(|record| record.outcome == MycOperationAuditOutcome::Succeeded) 1586 .count(), 1587 1 1588 ); 1589 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 1590 records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized 1591 }) 1592 .await?; 1593 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed); 1594 assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized); 1595 1596 let restarted_runtime = MycRuntime::bootstrap(runtime.config().clone())?; 1597 assert_eq!( 1598 restarted_runtime 1599 .signer_manager()? 1600 .list_connections()? 1601 .len(), 1602 1 1603 ); 1604 assert_eq!( 1605 restarted_runtime.operation_audit_store().list_all()?.len(), 1606 2 1607 ); 1608 let restarted_outbox = restarted_runtime.delivery_outbox_store().list_all()?; 1609 assert_eq!(restarted_outbox.len(), 2); 1610 assert_eq!(restarted_outbox[0].status, MycDeliveryOutboxStatus::Failed); 1611 assert_eq!( 1612 restarted_outbox[1].status, 1613 MycDeliveryOutboxStatus::Finalized 1614 ); 1615 assert_eq!( 1616 restarted_outbox[0].request_id.as_deref(), 1617 Some("sqlite-connect-1") 1618 ); 1619 assert_eq!( 1620 restarted_outbox[1].request_id.as_deref(), 1621 Some("sqlite-connect-2") 1622 ); 1623 assert!(restarted_outbox[0].signer_publish_workflow_id.is_some()); 1624 assert!(restarted_outbox[1].signer_publish_workflow_id.is_some()); 1625 assert!( 1626 restarted_runtime 1627 .signer_manager()? 1628 .list_publish_workflows()? 1629 .is_empty() 1630 ); 1631 assert!( 1632 restarted_runtime 1633 .signer_manager()? 1634 .list_connections()? 1635 .into_iter() 1636 .next() 1637 .expect("persisted connection") 1638 .connect_secret_is_consumed() 1639 ); 1640 1641 let _ = shutdown_tx.send(()); 1642 listener_task.await??; 1643 Ok(()) 1644 } 1645 1646 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1647 async fn external_nostr_client_recovers_connect_response_after_restart() -> TestResult<()> { 1648 let relay = TestRelay::spawn().await?; 1649 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 1650 let MycTestRuntime { 1651 _temp: _tempdir, 1652 runtime, 1653 } = test_runtime; 1654 let config = runtime.config().clone(); 1655 let signer_public_key = runtime.signer_identity().public_key(); 1656 let user_public_key = runtime.user_identity().public_key(); 1657 let client_identity = 1658 identity("5757575757575757575757575757575757575757575757575757575757575757"); 1659 let base_created_at = Timestamp::now().as_secs(); 1660 let connect_request_id = "external-recovery-connect"; 1661 let connect_request = ExternalNostrConnectRequest::Connect { 1662 remote_signer_public_key: signer_public_key, 1663 secret: None, 1664 }; 1665 let request_message = build_external_request_message(connect_request_id, &connect_request); 1666 let request_event = build_external_request_event( 1667 &client_identity, 1668 signer_public_key, 1669 &request_message, 1670 base_created_at, 1671 ); 1672 publish_event(relay.url(), &request_event).await?; 1673 1674 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 1675 let manager = runtime.signer_manager()?; 1676 let connection = manager.register_connection( 1677 RadrootsNostrSignerConnectionDraft::new( 1678 client_identity.public_key(), 1679 runtime.user_public_identity(), 1680 ) 1681 .with_relays(vec![relay_url.clone()]) 1682 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1683 )?; 1684 let response_envelope = 1685 RadrootsNostrConnectResponse::ConnectAcknowledged.into_envelope(connect_request_id)?; 1686 let response_payload = serde_json::to_string(&response_envelope)?; 1687 let signer_identity = 1688 identity("1111111111111111111111111111111111111111111111111111111111111111"); 1689 let response_ciphertext = nip44::encrypt( 1690 signer_identity.keys().secret_key(), 1691 &client_identity.public_key(), 1692 response_payload, 1693 Version::V2, 1694 )?; 1695 let response_event = runtime.signer_identity().sign_event_builder( 1696 RadrootsNostrEventBuilder::new( 1697 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), 1698 response_ciphertext, 1699 ) 1700 .tags(vec![RadrootsNostrTag::public_key( 1701 client_identity.public_key(), 1702 )]), 1703 "external recovery queued connect response", 1704 )?; 1705 let queued_record = MycDeliveryOutboxRecord::new( 1706 MycDeliveryOutboxKind::ListenerResponsePublish, 1707 response_event, 1708 vec![relay_url], 1709 )? 1710 .with_connection_id(&connection.connection_id) 1711 .with_request_id(connect_request_id); 1712 runtime.delivery_outbox_store().enqueue(&queued_record)?; 1713 assert_eq!( 1714 queued_record.kind, 1715 MycDeliveryOutboxKind::ListenerResponsePublish 1716 ); 1717 1718 let restarted_runtime = MycRuntime::bootstrap(config.clone())?; 1719 let persisted_queued_record = restarted_runtime 1720 .delivery_outbox_store() 1721 .list_all()? 1722 .into_iter() 1723 .find(|record| record.request_id.as_deref() == Some(connect_request_id)) 1724 .expect("persisted queued external connect record"); 1725 assert_eq!( 1726 persisted_queued_record.status, 1727 MycDeliveryOutboxStatus::Queued 1728 ); 1729 1730 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 1731 let service_runtime = restarted_runtime.clone(); 1732 let restarted_listener_task = tokio::spawn(async move { 1733 service_runtime 1734 .run_until(async { 1735 let _ = shutdown_rx.await; 1736 }) 1737 .await 1738 }); 1739 1740 let (_, connect_response) = wait_for_external_response( 1741 &relay, 1742 &client_identity, 1743 signer_public_key, 1744 connect_request_id, 1745 ExternalNostrConnectMethod::Connect, 1746 ) 1747 .await?; 1748 assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack)); 1749 assert_eq!(connect_response.error, None); 1750 1751 let (_, get_public_key_response) = publish_external_request_and_wait_for_response( 1752 &relay, 1753 &client_identity, 1754 signer_public_key, 1755 "external-recovery-get-public-key", 1756 ExternalNostrConnectRequest::GetPublicKey, 1757 base_created_at + 1, 1758 ) 1759 .await?; 1760 assert_eq!( 1761 get_public_key_response.result, 1762 Some(ExternalResponseResult::GetPublicKey(user_public_key)) 1763 ); 1764 assert_eq!(get_public_key_response.error, None); 1765 1766 let _ = shutdown_tx.send(()); 1767 restarted_listener_task.await??; 1768 1769 let finalized_runtime = MycRuntime::bootstrap(config)?; 1770 let finalized_record = finalized_runtime 1771 .delivery_outbox_store() 1772 .list_all()? 1773 .into_iter() 1774 .find(|record| record.request_id.as_deref() == Some(connect_request_id)) 1775 .expect("finalized external connect recovery record"); 1776 assert_eq!(finalized_record.status, MycDeliveryOutboxStatus::Finalized); 1777 assert!(finalized_record.published_at_unix.is_some()); 1778 assert!(finalized_record.finalized_at_unix.is_some()); 1779 1780 Ok(()) 1781 } 1782 1783 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1784 async fn startup_recovery_republishes_queued_listener_connect_secret_job() -> TestResult<()> { 1785 let relay = TestRelay::spawn().await?; 1786 let test_runtime = MycTestRuntime::new_with_transport_relays( 1787 &[relay.url()], 1788 MycConnectionApproval::NotRequired, 1789 ); 1790 let MycTestRuntime { 1791 _temp: _tempdir, 1792 runtime, 1793 } = test_runtime; 1794 let signer_public_key = runtime.signer_identity().public_key(); 1795 let config = runtime.config().clone(); 1796 let client_identity = 1797 identity("5454545454545454545454545454545454545454545454545454545454545454"); 1798 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 1799 1800 let manager = runtime.signer_manager()?; 1801 let connection = manager.register_connection( 1802 RadrootsNostrSignerConnectionDraft::new( 1803 client_identity.public_key(), 1804 runtime.user_public_identity(), 1805 ) 1806 .with_connect_secret("startup-recovery-secret") 1807 .with_relays(vec![relay_url.clone()]) 1808 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1809 )?; 1810 let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?; 1811 let event = runtime 1812 .signer_identity() 1813 .sign_event_builder( 1814 RadrootsNostrEventBuilder::new( 1815 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), 1816 "startup-recovery", 1817 ), 1818 "startup recovery", 1819 ) 1820 .map_err(|error| format!("failed to sign startup recovery event: {error}"))?; 1821 let outbox_record = MycDeliveryOutboxRecord::new( 1822 MycDeliveryOutboxKind::ListenerResponsePublish, 1823 event, 1824 vec![relay_url], 1825 )? 1826 .with_connection_id(&connection.connection_id) 1827 .with_request_id("startup-recovery-connect") 1828 .with_signer_publish_workflow_id(&workflow.workflow_id); 1829 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 1830 1831 runtime.run_until(async {}).await?; 1832 1833 let published = relay 1834 .wait_for_published_events_by_author(signer_public_key, 1) 1835 .await?; 1836 assert_eq!(published.len(), 1); 1837 1838 let restarted_runtime = MycRuntime::bootstrap(config)?; 1839 let recovered_connection = restarted_runtime 1840 .signer_manager()? 1841 .get_connection(&connection.connection_id)? 1842 .expect("persisted connection"); 1843 assert!(recovered_connection.connect_secret_is_consumed()); 1844 assert!( 1845 restarted_runtime 1846 .signer_manager()? 1847 .list_publish_workflows()? 1848 .is_empty() 1849 ); 1850 let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?; 1851 assert_eq!(outbox_records.len(), 1); 1852 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 1853 assert_eq!( 1854 outbox_records[0].request_id.as_deref(), 1855 Some("startup-recovery-connect") 1856 ); 1857 assert!(outbox_records[0].published_at_unix.is_some()); 1858 assert!(outbox_records[0].finalized_at_unix.is_some()); 1859 let audit_records = restarted_runtime.operation_audit_store().list_all()?; 1860 assert_eq!(audit_records.len(), 2); 1861 assert_eq!( 1862 audit_records[0].operation, 1863 MycOperationAuditKind::ListenerResponsePublish 1864 ); 1865 assert_eq!( 1866 audit_records[0].outcome, 1867 MycOperationAuditOutcome::Succeeded 1868 ); 1869 assert_eq!( 1870 audit_records[0].request_id.as_deref(), 1871 Some("startup-recovery-connect") 1872 ); 1873 assert_eq!( 1874 audit_records[1].operation, 1875 MycOperationAuditKind::DeliveryRecovery 1876 ); 1877 assert_eq!( 1878 audit_records[1].outcome, 1879 MycOperationAuditOutcome::Succeeded 1880 ); 1881 1882 Ok(()) 1883 } 1884 1885 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1886 async fn startup_recovery_republishes_queued_connect_accept_job() -> TestResult<()> { 1887 let relay = TestRelay::spawn().await?; 1888 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 1889 let MycTestRuntime { 1890 _temp: _tempdir, 1891 runtime, 1892 } = test_runtime; 1893 let signer_public_key = runtime.signer_identity().public_key(); 1894 let config = runtime.config().clone(); 1895 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 1896 let client_identity = 1897 identity("4343434343434343434343434343434343434343434343434343434343434343"); 1898 1899 let manager = runtime.signer_manager()?; 1900 let connection = manager.register_connection( 1901 RadrootsNostrSignerConnectionDraft::new( 1902 client_identity.public_key(), 1903 runtime.user_public_identity(), 1904 ) 1905 .with_connect_secret("startup-connect-accept-secret") 1906 .with_relays(vec![relay_url.clone()]) 1907 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1908 )?; 1909 let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?; 1910 let event = runtime 1911 .signer_identity() 1912 .sign_event_builder( 1913 RadrootsNostrEventBuilder::new( 1914 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), 1915 "startup-recovery-connect-accept", 1916 ), 1917 "startup recovery connect accept", 1918 ) 1919 .map_err(|error| { 1920 format!("failed to sign startup recovery connect-accept event: {error}") 1921 })?; 1922 let outbox_record = MycDeliveryOutboxRecord::new( 1923 MycDeliveryOutboxKind::ConnectAcceptPublish, 1924 event, 1925 vec![relay_url], 1926 )? 1927 .with_connection_id(&connection.connection_id) 1928 .with_request_id("startup-recovery-connect-accept") 1929 .with_signer_publish_workflow_id(&workflow.workflow_id); 1930 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 1931 1932 runtime.run_until(async {}).await?; 1933 1934 let published = relay 1935 .wait_for_published_events_by_author(signer_public_key, 1) 1936 .await?; 1937 assert_eq!(published.len(), 1); 1938 1939 let restarted_runtime = MycRuntime::bootstrap(config)?; 1940 let recovered_connection = restarted_runtime 1941 .signer_manager()? 1942 .get_connection(&connection.connection_id)? 1943 .expect("persisted connection"); 1944 assert!(recovered_connection.connect_secret_is_consumed()); 1945 assert!( 1946 restarted_runtime 1947 .signer_manager()? 1948 .list_publish_workflows()? 1949 .is_empty() 1950 ); 1951 let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?; 1952 assert_eq!(outbox_records.len(), 1); 1953 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 1954 assert_eq!( 1955 outbox_records[0].request_id.as_deref(), 1956 Some("startup-recovery-connect-accept") 1957 ); 1958 assert!(outbox_records[0].published_at_unix.is_some()); 1959 assert!(outbox_records[0].finalized_at_unix.is_some()); 1960 let audit_records = restarted_runtime.operation_audit_store().list_all()?; 1961 assert_eq!(audit_records.len(), 2); 1962 assert_eq!( 1963 audit_records[0].operation, 1964 MycOperationAuditKind::ConnectAcceptPublish 1965 ); 1966 assert_eq!( 1967 audit_records[0].outcome, 1968 MycOperationAuditOutcome::Succeeded 1969 ); 1970 assert_eq!( 1971 audit_records[0].request_id.as_deref(), 1972 Some("startup-recovery-connect-accept") 1973 ); 1974 assert_eq!( 1975 audit_records[1].operation, 1976 MycOperationAuditKind::DeliveryRecovery 1977 ); 1978 assert_eq!( 1979 audit_records[1].outcome, 1980 MycOperationAuditOutcome::Succeeded 1981 ); 1982 1983 Ok(()) 1984 } 1985 1986 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 1987 async fn startup_recovery_republishes_queued_auth_replay_job() -> TestResult<()> { 1988 let relay = TestRelay::spawn().await?; 1989 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::ExplicitUser); 1990 let MycTestRuntime { 1991 _temp: _tempdir, 1992 runtime, 1993 } = test_runtime; 1994 let signer_public_key = runtime.signer_identity().public_key(); 1995 let config = runtime.config().clone(); 1996 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 1997 let client_identity = 1998 identity("5353535353535353535353535353535353535353535353535353535353535353"); 1999 2000 let manager = runtime.signer_manager()?; 2001 let connection = manager.register_connection( 2002 RadrootsNostrSignerConnectionDraft::new( 2003 client_identity.public_key(), 2004 runtime.user_public_identity(), 2005 ) 2006 .with_relays(vec![relay_url.clone()]) 2007 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::ExplicitUser), 2008 )?; 2009 let _ = manager.require_auth_challenge(&connection.connection_id, "https://auth.example")?; 2010 let _ = manager.set_pending_request( 2011 &connection.connection_id, 2012 ping_request_message("startup-recovery-auth"), 2013 )?; 2014 let workflow = manager.begin_auth_replay_publish_finalization(&connection.connection_id)?; 2015 let event = runtime 2016 .signer_identity() 2017 .sign_event_builder( 2018 RadrootsNostrEventBuilder::new( 2019 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), 2020 "startup-recovery-auth-replay", 2021 ), 2022 "startup recovery auth replay", 2023 ) 2024 .map_err(|error| format!("failed to sign startup recovery auth-replay event: {error}"))?; 2025 let outbox_record = MycDeliveryOutboxRecord::new( 2026 MycDeliveryOutboxKind::AuthReplayPublish, 2027 event, 2028 vec![relay_url], 2029 )? 2030 .with_connection_id(&connection.connection_id) 2031 .with_request_id("startup-recovery-auth") 2032 .with_signer_publish_workflow_id(&workflow.workflow_id); 2033 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 2034 2035 runtime.run_until(async {}).await?; 2036 2037 let published = relay 2038 .wait_for_published_events_by_author(signer_public_key, 1) 2039 .await?; 2040 assert_eq!(published.len(), 1); 2041 2042 let restarted_runtime = MycRuntime::bootstrap(config)?; 2043 let recovered_connection = restarted_runtime 2044 .signer_manager()? 2045 .get_connection(&connection.connection_id)? 2046 .expect("persisted connection"); 2047 assert_eq!( 2048 recovered_connection.auth_state, 2049 RadrootsNostrSignerAuthState::Authorized 2050 ); 2051 assert!(recovered_connection.pending_request.is_none()); 2052 assert!(recovered_connection.last_authenticated_at_unix.is_some()); 2053 assert!( 2054 restarted_runtime 2055 .signer_manager()? 2056 .list_publish_workflows()? 2057 .is_empty() 2058 ); 2059 let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?; 2060 assert_eq!(outbox_records.len(), 1); 2061 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 2062 assert_eq!( 2063 outbox_records[0].request_id.as_deref(), 2064 Some("startup-recovery-auth") 2065 ); 2066 assert!(outbox_records[0].published_at_unix.is_some()); 2067 assert!(outbox_records[0].finalized_at_unix.is_some()); 2068 let audit_records = restarted_runtime.operation_audit_store().list_all()?; 2069 assert_eq!(audit_records.len(), 2); 2070 assert_eq!( 2071 audit_records[0].operation, 2072 MycOperationAuditKind::AuthReplayPublish 2073 ); 2074 assert_eq!( 2075 audit_records[0].outcome, 2076 MycOperationAuditOutcome::Succeeded 2077 ); 2078 assert_eq!( 2079 audit_records[0].request_id.as_deref(), 2080 Some("startup-recovery-auth") 2081 ); 2082 assert_eq!( 2083 audit_records[1].operation, 2084 MycOperationAuditKind::DeliveryRecovery 2085 ); 2086 assert_eq!( 2087 audit_records[1].outcome, 2088 MycOperationAuditOutcome::Succeeded 2089 ); 2090 2091 Ok(()) 2092 } 2093 2094 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2095 async fn trusted_client_reauths_after_authorized_ttl() -> TestResult<()> { 2096 let relay = TestRelay::spawn().await?; 2097 let client_identity = 2098 identity("7878787878787878787878787878787878787878787878787878787878787878"); 2099 let test_runtime = MycTestRuntime::new_with_transport_config( 2100 &[relay.url()], 2101 MycConnectionApproval::ExplicitUser, 2102 |config| { 2103 config.policy.trusted_client_pubkeys = vec![client_identity.public_key().to_hex()]; 2104 config.policy.permission_ceiling = "sign_event:1".parse().expect("permission ceiling"); 2105 config.policy.allowed_sign_event_kinds = vec![1]; 2106 config.policy.auth_url = Some("https://auth.example/challenge".to_owned()); 2107 config.policy.auth_authorized_ttl_secs = Some(1); 2108 }, 2109 ); 2110 let runtime = test_runtime.runtime.clone(); 2111 let signer_public_key = runtime.signer_identity().public_key(); 2112 2113 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 2114 let service_runtime = runtime.clone(); 2115 let listener_task = tokio::spawn(async move { 2116 service_runtime 2117 .run_until(async { 2118 let _ = shutdown_rx.await; 2119 }) 2120 .await 2121 }); 2122 2123 relay.wait_for_subscription_count(1).await?; 2124 2125 let connect_request = build_request_event( 2126 &client_identity, 2127 signer_public_key, 2128 RadrootsNostrConnectRequestMessage::new( 2129 "trusted-connect", 2130 RadrootsNostrConnectRequest::Connect { 2131 remote_signer_public_key: signer_public_key, 2132 secret: None, 2133 requested_permissions: "sign_event:1".parse().expect("requested permissions"), 2134 }, 2135 ), 2136 Timestamp::now().as_secs(), 2137 ); 2138 publish_event(relay.url(), &connect_request).await?; 2139 let response_events = relay 2140 .wait_for_published_events_by_author(signer_public_key, 1) 2141 .await?; 2142 let connect_response = 2143 decrypt_response(&client_identity, signer_public_key, &response_events[0]); 2144 let connect_parsed = 2145 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope( 2146 &RadrootsNostrConnectRequest::Connect { 2147 remote_signer_public_key: signer_public_key, 2148 secret: None, 2149 requested_permissions: "sign_event:1".parse().expect("requested permissions"), 2150 } 2151 .method(), 2152 connect_response, 2153 )?; 2154 assert_eq!( 2155 connect_parsed, 2156 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::ConnectAcknowledged 2157 ); 2158 2159 let sign_request = |request_id: &str, created_at_unix| { 2160 build_request_event( 2161 &client_identity, 2162 signer_public_key, 2163 RadrootsNostrConnectRequestMessage::new( 2164 request_id, 2165 RadrootsNostrConnectRequest::SignEvent( 2166 serde_json::from_value(serde_json::json!({ 2167 "pubkey": runtime.user_identity().public_key().to_hex(), 2168 "created_at": created_at_unix, 2169 "kind": 1, 2170 "tags": [], 2171 "content": request_id 2172 })) 2173 .expect("unsigned event"), 2174 ), 2175 ), 2176 created_at_unix, 2177 ) 2178 }; 2179 2180 publish_event( 2181 relay.url(), 2182 &sign_request("trusted-sign-1", Timestamp::now().as_secs()), 2183 ) 2184 .await?; 2185 let response_events = relay 2186 .wait_for_published_events_by_author(signer_public_key, 2) 2187 .await?; 2188 let first_auth = decrypt_response(&client_identity, signer_public_key, &response_events[1]); 2189 let first_auth = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope( 2190 &RadrootsNostrConnectRequest::SignEvent( 2191 serde_json::from_value(serde_json::json!({ 2192 "pubkey": runtime.user_identity().public_key().to_hex(), 2193 "created_at": Timestamp::from(1).as_secs(), 2194 "kind": 1, 2195 "tags": [], 2196 "content": "trusted-sign-1" 2197 })) 2198 .expect("unsigned event"), 2199 ) 2200 .method(), 2201 first_auth, 2202 )?; 2203 assert_eq!( 2204 first_auth, 2205 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::AuthUrl( 2206 "https://auth.example/challenge".to_owned() 2207 ) 2208 ); 2209 2210 let connection = runtime 2211 .signer_manager()? 2212 .list_connections()? 2213 .into_iter() 2214 .next() 2215 .expect("connection"); 2216 let replayed = control::authorize_auth_challenge(&runtime, &connection.connection_id).await?; 2217 assert_eq!( 2218 replayed.replayed_request_id.as_deref(), 2219 Some("trusted-sign-1") 2220 ); 2221 2222 let response_events = relay 2223 .wait_for_published_events_by_author(signer_public_key, 3) 2224 .await?; 2225 let replay_response = 2226 decrypt_response(&client_identity, signer_public_key, &response_events[2]); 2227 let replay_parsed = 2228 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope( 2229 &RadrootsNostrConnectRequest::SignEvent( 2230 serde_json::from_value(serde_json::json!({ 2231 "pubkey": runtime.user_identity().public_key().to_hex(), 2232 "created_at": Timestamp::from(1).as_secs(), 2233 "kind": 1, 2234 "tags": [], 2235 "content": "trusted-sign-1" 2236 })) 2237 .expect("unsigned event"), 2238 ) 2239 .method(), 2240 replay_response, 2241 )?; 2242 assert!(matches!( 2243 replay_parsed, 2244 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::SignedEvent(_) 2245 )); 2246 2247 sleep(Duration::from_secs(2)).await; 2248 2249 publish_event( 2250 relay.url(), 2251 &sign_request("trusted-sign-2", Timestamp::now().as_secs()), 2252 ) 2253 .await?; 2254 let response_events = relay 2255 .wait_for_published_events_by_author(signer_public_key, 4) 2256 .await?; 2257 let second_auth = decrypt_response(&client_identity, signer_public_key, &response_events[3]); 2258 let second_auth = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope( 2259 &RadrootsNostrConnectRequest::SignEvent( 2260 serde_json::from_value(serde_json::json!({ 2261 "pubkey": runtime.user_identity().public_key().to_hex(), 2262 "created_at": Timestamp::from(1).as_secs(), 2263 "kind": 1, 2264 "tags": [], 2265 "content": "trusted-sign-2" 2266 })) 2267 .expect("unsigned event"), 2268 ) 2269 .method(), 2270 second_auth, 2271 )?; 2272 assert_eq!( 2273 second_auth, 2274 radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::AuthUrl( 2275 "https://auth.example/challenge".to_owned() 2276 ) 2277 ); 2278 2279 let _ = shutdown_tx.send(()); 2280 listener_task.await??; 2281 Ok(()) 2282 } 2283 2284 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2285 async fn connect_accept_retries_without_consuming_secret_until_publish_succeeds() -> TestResult<()> 2286 { 2287 let relay = TestRelay::spawn().await?; 2288 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 2289 let runtime = test_runtime.runtime; 2290 let signer_public_key = runtime.signer_identity().public_key(); 2291 let client_identity = 2292 identity("4444444444444444444444444444444444444444444444444444444444444444"); 2293 2294 relay 2295 .queue_publish_outcomes(signer_public_key, &[false, true]) 2296 .await; 2297 2298 let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri { 2299 client_public_key: client_identity.public_key(), 2300 relays: vec![nostr::RelayUrl::parse(relay.url())?], 2301 secret: "client-secret".to_owned(), 2302 metadata: RadrootsNostrConnectClientMetadata::default(), 2303 }) 2304 .to_string(); 2305 2306 let failed = control::accept_client_uri(&runtime, &client_uri) 2307 .await 2308 .expect_err("first publish should fail"); 2309 assert!(failed.to_string().contains("Nostr publish failed")); 2310 2311 let stored_after_failure = runtime 2312 .signer_manager()? 2313 .list_connections()? 2314 .into_iter() 2315 .next() 2316 .expect("stored connection"); 2317 assert!(!stored_after_failure.connect_secret_is_consumed()); 2318 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 2319 assert_eq!( 2320 operation_audit[0].operation, 2321 MycOperationAuditKind::ConnectAcceptPublish 2322 ); 2323 assert_eq!( 2324 operation_audit[0].outcome, 2325 MycOperationAuditOutcome::Rejected 2326 ); 2327 assert_eq!( 2328 operation_audit[0].connection_id.as_deref(), 2329 Some(stored_after_failure.connection_id.as_str()) 2330 ); 2331 assert!(operation_audit[0].request_id.is_some()); 2332 assert_eq!(operation_audit[0].relay_count, 1); 2333 assert_eq!(operation_audit[0].acknowledged_relay_count, 0); 2334 assert!( 2335 operation_audit[0] 2336 .relay_outcome_summary 2337 .contains("blocked by test relay") 2338 ); 2339 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2340 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed 2341 }) 2342 .await?; 2343 assert_eq!( 2344 outbox_records[0].kind, 2345 MycDeliveryOutboxKind::ConnectAcceptPublish 2346 ); 2347 assert_eq!( 2348 outbox_records[0].request_id.as_deref(), 2349 operation_audit[0].request_id.as_deref() 2350 ); 2351 assert!(outbox_records[0].signer_publish_workflow_id.is_some()); 2352 assert!( 2353 runtime 2354 .signer_manager()? 2355 .list_publish_workflows()? 2356 .is_empty() 2357 ); 2358 2359 let accepted = control::accept_client_uri(&runtime, &client_uri).await?; 2360 assert_eq!(accepted.response_request_id.len(), 36); 2361 2362 let response_events = relay 2363 .wait_for_published_events_by_author(signer_public_key, 1) 2364 .await?; 2365 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 2366 assert_eq!(response.id, accepted.response_request_id); 2367 assert_eq!( 2368 response.result, 2369 Some(serde_json::Value::String("client-secret".to_owned())) 2370 ); 2371 2372 let stored_after_success = runtime 2373 .signer_manager()? 2374 .list_connections()? 2375 .into_iter() 2376 .next() 2377 .expect("stored connection"); 2378 assert!(stored_after_success.connect_secret_is_consumed()); 2379 let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?; 2380 assert_eq!( 2381 operation_audit[1].operation, 2382 MycOperationAuditKind::ConnectAcceptPublish 2383 ); 2384 assert_eq!( 2385 operation_audit[1].outcome, 2386 MycOperationAuditOutcome::Succeeded 2387 ); 2388 assert_eq!( 2389 operation_audit[1].connection_id.as_deref(), 2390 Some(stored_after_success.connection_id.as_str()) 2391 ); 2392 assert_eq!( 2393 operation_audit[1].request_id.as_deref(), 2394 Some(accepted.response_request_id.as_str()) 2395 ); 2396 assert_eq!(operation_audit[1].relay_count, 1); 2397 assert_eq!(operation_audit[1].acknowledged_relay_count, 1); 2398 assert!( 2399 operation_audit[1] 2400 .relay_outcome_summary 2401 .contains("1/1 relays acknowledged publish") 2402 ); 2403 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2404 records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized 2405 }) 2406 .await?; 2407 assert_eq!( 2408 outbox_records[1].kind, 2409 MycDeliveryOutboxKind::ConnectAcceptPublish 2410 ); 2411 assert_eq!( 2412 outbox_records[1].request_id.as_deref(), 2413 Some(accepted.response_request_id.as_str()) 2414 ); 2415 assert!(outbox_records[1].published_at_unix.is_some()); 2416 assert!(outbox_records[1].finalized_at_unix.is_some()); 2417 assert!(outbox_records[1].signer_publish_workflow_id.is_some()); 2418 assert!( 2419 runtime 2420 .signer_manager()? 2421 .list_publish_workflows()? 2422 .is_empty() 2423 ); 2424 2425 let consumed = control::accept_client_uri(&runtime, &client_uri) 2426 .await 2427 .expect_err("consumed secret should be rejected"); 2428 assert!( 2429 consumed 2430 .to_string() 2431 .contains("connect secret has already been consumed") 2432 ); 2433 2434 Ok(()) 2435 } 2436 2437 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2438 async fn connect_accept_succeeds_with_any_delivery_policy_when_one_relay_acknowledges() 2439 -> TestResult<()> { 2440 let relay_a = TestRelay::spawn().await?; 2441 let relay_b = TestRelay::spawn().await?; 2442 let test_runtime = MycTestRuntime::new_with_transport_config( 2443 &[relay_a.url(), relay_b.url()], 2444 MycConnectionApproval::NotRequired, 2445 |config| { 2446 config.transport.delivery_policy = MycTransportDeliveryPolicy::Any; 2447 config.transport.publish_max_attempts = 1; 2448 }, 2449 ); 2450 let runtime = test_runtime.runtime; 2451 let signer_public_key = runtime.signer_identity().public_key(); 2452 let client_identity = 2453 identity("5555555555555555555555555555555555555555555555555555555555555555"); 2454 2455 relay_a 2456 .queue_publish_outcomes(signer_public_key, &[false]) 2457 .await; 2458 relay_b 2459 .queue_publish_outcomes(signer_public_key, &[true]) 2460 .await; 2461 2462 let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri { 2463 client_public_key: client_identity.public_key(), 2464 relays: vec![ 2465 nostr::RelayUrl::parse(relay_a.url())?, 2466 nostr::RelayUrl::parse(relay_b.url())?, 2467 ], 2468 secret: "delivery-any-secret".to_owned(), 2469 metadata: RadrootsNostrConnectClientMetadata::default(), 2470 }) 2471 .to_string(); 2472 2473 let accepted = control::accept_client_uri(&runtime, &client_uri).await?; 2474 assert_eq!(accepted.response_relays.len(), 2); 2475 let stored = runtime 2476 .signer_manager()? 2477 .list_connections()? 2478 .into_iter() 2479 .find(|connection| connection.connection_id == accepted.connection.connection_id) 2480 .expect("stored connection"); 2481 assert!(stored.connect_secret_is_consumed()); 2482 2483 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 2484 assert_eq!( 2485 operation_audit[0].operation, 2486 MycOperationAuditKind::ConnectAcceptPublish 2487 ); 2488 assert_eq!( 2489 operation_audit[0].outcome, 2490 MycOperationAuditOutcome::Succeeded 2491 ); 2492 assert_eq!(operation_audit[0].relay_count, 2); 2493 assert_eq!(operation_audit[0].acknowledged_relay_count, 1); 2494 assert_eq!( 2495 operation_audit[0].delivery_policy, 2496 Some(MycTransportDeliveryPolicy::Any) 2497 ); 2498 assert_eq!( 2499 operation_audit[0].required_acknowledged_relay_count, 2500 Some(1) 2501 ); 2502 assert_eq!(operation_audit[0].publish_attempt_count, Some(1)); 2503 assert!( 2504 operation_audit[0] 2505 .relay_outcome_summary 2506 .contains("delivery policy any") 2507 ); 2508 2509 Ok(()) 2510 } 2511 2512 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2513 async fn connect_accept_rejects_when_quorum_delivery_policy_is_not_met() -> TestResult<()> { 2514 let relay_a = TestRelay::spawn().await?; 2515 let relay_b = TestRelay::spawn().await?; 2516 let test_runtime = MycTestRuntime::new_with_transport_config( 2517 &[relay_a.url(), relay_b.url()], 2518 MycConnectionApproval::NotRequired, 2519 |config| { 2520 config.transport.delivery_policy = MycTransportDeliveryPolicy::Quorum; 2521 config.transport.delivery_quorum = Some(2); 2522 config.transport.publish_max_attempts = 1; 2523 }, 2524 ); 2525 let runtime = test_runtime.runtime; 2526 let signer_public_key = runtime.signer_identity().public_key(); 2527 let client_identity = 2528 identity("6666666666666666666666666666666666666666666666666666666666666665"); 2529 2530 relay_a 2531 .queue_publish_outcomes(signer_public_key, &[true]) 2532 .await; 2533 relay_b 2534 .queue_publish_outcomes(signer_public_key, &[false]) 2535 .await; 2536 2537 let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri { 2538 client_public_key: client_identity.public_key(), 2539 relays: vec![ 2540 nostr::RelayUrl::parse(relay_a.url())?, 2541 nostr::RelayUrl::parse(relay_b.url())?, 2542 ], 2543 secret: "delivery-quorum-secret".to_owned(), 2544 metadata: RadrootsNostrConnectClientMetadata::default(), 2545 }) 2546 .to_string(); 2547 2548 let error = control::accept_client_uri(&runtime, &client_uri) 2549 .await 2550 .expect_err("quorum publish should fail"); 2551 assert!( 2552 error 2553 .to_string() 2554 .contains("delivery policy quorum requiring 2 acknowledgements") 2555 ); 2556 assert_eq!( 2557 error.publish_delivery_policy(), 2558 Some(MycTransportDeliveryPolicy::Quorum) 2559 ); 2560 assert_eq!(error.publish_required_acknowledged_relay_count(), Some(2)); 2561 assert_eq!(error.publish_attempt_count(), Some(1)); 2562 2563 let stored = runtime 2564 .signer_manager()? 2565 .list_connections()? 2566 .into_iter() 2567 .next() 2568 .expect("stored connection"); 2569 assert!(!stored.connect_secret_is_consumed()); 2570 2571 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 2572 assert_eq!( 2573 operation_audit[0].operation, 2574 MycOperationAuditKind::ConnectAcceptPublish 2575 ); 2576 assert_eq!( 2577 operation_audit[0].outcome, 2578 MycOperationAuditOutcome::Rejected 2579 ); 2580 assert_eq!(operation_audit[0].relay_count, 2); 2581 assert_eq!(operation_audit[0].acknowledged_relay_count, 1); 2582 assert_eq!( 2583 operation_audit[0].delivery_policy, 2584 Some(MycTransportDeliveryPolicy::Quorum) 2585 ); 2586 assert_eq!( 2587 operation_audit[0].required_acknowledged_relay_count, 2588 Some(2) 2589 ); 2590 assert_eq!(operation_audit[0].publish_attempt_count, Some(1)); 2591 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2592 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed 2593 }) 2594 .await?; 2595 assert_eq!( 2596 outbox_records[0].kind, 2597 MycDeliveryOutboxKind::ConnectAcceptPublish 2598 ); 2599 assert!(outbox_records[0].signer_publish_workflow_id.is_some()); 2600 assert!( 2601 runtime 2602 .signer_manager()? 2603 .list_publish_workflows()? 2604 .is_empty() 2605 ); 2606 2607 Ok(()) 2608 } 2609 2610 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2611 async fn live_listener_retries_until_all_delivery_policy_is_met() -> TestResult<()> { 2612 let relay_a = TestRelay::spawn().await?; 2613 let relay_b = TestRelay::spawn().await?; 2614 let test_runtime = MycTestRuntime::new_with_transport_config( 2615 &[relay_a.url(), relay_b.url()], 2616 MycConnectionApproval::NotRequired, 2617 |config| { 2618 config.transport.delivery_policy = MycTransportDeliveryPolicy::All; 2619 config.transport.publish_max_attempts = 2; 2620 config.transport.publish_initial_backoff_millis = 10; 2621 config.transport.publish_max_backoff_millis = 10; 2622 }, 2623 ); 2624 let runtime = test_runtime.runtime.clone(); 2625 let signer_public_key = runtime.signer_identity().public_key(); 2626 let client_identity = 2627 identity("7777777777777777777777777777777777777777777777777777777777777777"); 2628 let base_created_at = Timestamp::now().as_secs(); 2629 2630 relay_a 2631 .queue_publish_outcomes(signer_public_key, &[true, true]) 2632 .await; 2633 relay_b 2634 .queue_publish_outcomes(signer_public_key, &[false, true]) 2635 .await; 2636 2637 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); 2638 let service_runtime = runtime.clone(); 2639 let listener_task = tokio::spawn(async move { 2640 service_runtime 2641 .run_until(async { 2642 let _ = shutdown_rx.await; 2643 }) 2644 .await 2645 }); 2646 2647 relay_a.wait_for_subscription_count(1).await?; 2648 relay_b.wait_for_subscription_count(1).await?; 2649 2650 let request = build_request_event( 2651 &client_identity, 2652 signer_public_key, 2653 connect_request_message("connect-all-1", signer_public_key, "shared-secret-all"), 2654 base_created_at, 2655 ); 2656 publish_event(relay_a.url(), &request).await?; 2657 2658 let response_events = relay_b 2659 .wait_for_published_events_by_author(signer_public_key, 1) 2660 .await?; 2661 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 2662 assert_eq!(response.id, "connect-all-1"); 2663 assert_eq!( 2664 response.result, 2665 Some(serde_json::Value::String("shared-secret-all".to_owned())) 2666 ); 2667 2668 wait_for_connect_secret_consumed(&runtime).await?; 2669 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 2670 assert_eq!( 2671 operation_audit[0].operation, 2672 MycOperationAuditKind::ListenerResponsePublish 2673 ); 2674 assert_eq!( 2675 operation_audit[0].outcome, 2676 MycOperationAuditOutcome::Succeeded 2677 ); 2678 assert_eq!(operation_audit[0].relay_count, 2); 2679 assert_eq!(operation_audit[0].acknowledged_relay_count, 2); 2680 assert_eq!( 2681 operation_audit[0].delivery_policy, 2682 Some(MycTransportDeliveryPolicy::All) 2683 ); 2684 assert_eq!( 2685 operation_audit[0].required_acknowledged_relay_count, 2686 Some(2) 2687 ); 2688 assert_eq!(operation_audit[0].publish_attempt_count, Some(2)); 2689 assert!( 2690 operation_audit[0] 2691 .relay_outcome_summary 2692 .contains("attempt 1: 1/2 relays acknowledged publish") 2693 ); 2694 2695 let _ = shutdown_tx.send(()); 2696 listener_task.await??; 2697 Ok(()) 2698 } 2699 2700 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2701 async fn auth_replay_restores_pending_request_until_publish_succeeds() -> TestResult<()> { 2702 let relay = TestRelay::spawn().await?; 2703 let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired); 2704 let runtime = test_runtime.runtime; 2705 let signer_public_key = runtime.signer_identity().public_key(); 2706 let client_public_key = Keys::new(SecretKey::from_hex( 2707 "5555555555555555555555555555555555555555555555555555555555555555", 2708 )?) 2709 .public_key(); 2710 2711 relay 2712 .queue_publish_outcomes(signer_public_key, &[false, true]) 2713 .await; 2714 2715 let manager = runtime.signer_manager()?; 2716 let connection = manager.register_connection( 2717 RadrootsNostrSignerConnectionDraft::new(client_public_key, runtime.user_public_identity()) 2718 .with_relays(vec![nostr::RelayUrl::parse(relay.url())?]) 2719 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 2720 )?; 2721 manager.require_auth_challenge(&connection.connection_id, "https://auth.example/flow")?; 2722 manager.set_pending_request(&connection.connection_id, ping_request_message("auth-ping"))?; 2723 2724 let first_attempt = control::authorize_auth_challenge(&runtime, &connection.connection_id) 2725 .await 2726 .expect_err("first replay publish should fail"); 2727 assert!(first_attempt.to_string().contains("Nostr publish failed")); 2728 2729 let restored = runtime 2730 .signer_manager()? 2731 .get_connection(&connection.connection_id)? 2732 .expect("restored connection"); 2733 assert_eq!(restored.auth_state, RadrootsNostrSignerAuthState::Pending); 2734 assert_eq!( 2735 restored 2736 .pending_request 2737 .as_ref() 2738 .expect("pending request") 2739 .request_id() 2740 .as_str(), 2741 "auth-ping" 2742 ); 2743 assert_eq!( 2744 restored 2745 .auth_challenge 2746 .as_ref() 2747 .expect("auth challenge") 2748 .authorized_at_unix, 2749 None 2750 ); 2751 let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?; 2752 assert_eq!( 2753 operation_audit[0].operation, 2754 MycOperationAuditKind::AuthReplayPublish 2755 ); 2756 assert_eq!( 2757 operation_audit[0].outcome, 2758 MycOperationAuditOutcome::Rejected 2759 ); 2760 assert_eq!( 2761 operation_audit[0].connection_id.as_deref(), 2762 Some(connection.connection_id.as_str()) 2763 ); 2764 assert_eq!(operation_audit[0].request_id.as_deref(), Some("auth-ping")); 2765 assert_eq!(operation_audit[0].relay_count, 1); 2766 assert_eq!(operation_audit[0].acknowledged_relay_count, 0); 2767 assert!( 2768 operation_audit[0] 2769 .relay_outcome_summary 2770 .contains("blocked by test relay") 2771 ); 2772 assert_eq!( 2773 operation_audit[1].operation, 2774 MycOperationAuditKind::AuthReplayRestore 2775 ); 2776 assert_eq!( 2777 operation_audit[1].outcome, 2778 MycOperationAuditOutcome::Restored 2779 ); 2780 assert_eq!( 2781 operation_audit[1].connection_id.as_deref(), 2782 Some(connection.connection_id.as_str()) 2783 ); 2784 assert_eq!(operation_audit[1].request_id.as_deref(), Some("auth-ping")); 2785 assert_eq!(operation_audit[1].relay_count, 1); 2786 assert_eq!(operation_audit[1].acknowledged_relay_count, 0); 2787 assert!( 2788 operation_audit[1] 2789 .relay_outcome_summary 2790 .contains("preserved pending auth challenge") 2791 ); 2792 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2793 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed 2794 }) 2795 .await?; 2796 assert_eq!( 2797 outbox_records[0].kind, 2798 MycDeliveryOutboxKind::AuthReplayPublish 2799 ); 2800 assert_eq!(outbox_records[0].request_id.as_deref(), Some("auth-ping")); 2801 assert!(outbox_records[0].signer_publish_workflow_id.is_some()); 2802 assert!( 2803 runtime 2804 .signer_manager()? 2805 .list_publish_workflows()? 2806 .is_empty() 2807 ); 2808 2809 let replayed = control::authorize_auth_challenge(&runtime, &connection.connection_id).await?; 2810 assert_eq!(replayed.replayed_request_id.as_deref(), Some("auth-ping")); 2811 2812 let client_identity = 2813 identity("5555555555555555555555555555555555555555555555555555555555555555"); 2814 let response_events = relay 2815 .wait_for_published_events_by_author(signer_public_key, 1) 2816 .await?; 2817 let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]); 2818 assert_eq!(response.id, "auth-ping"); 2819 assert_eq!( 2820 response.result, 2821 Some(serde_json::Value::String("pong".to_owned())) 2822 ); 2823 2824 let authorized = runtime 2825 .signer_manager()? 2826 .get_connection(&connection.connection_id)? 2827 .expect("authorized connection"); 2828 assert_eq!( 2829 authorized.auth_state, 2830 RadrootsNostrSignerAuthState::Authorized 2831 ); 2832 assert!(authorized.pending_request.is_none()); 2833 assert!(authorized.last_authenticated_at_unix.is_some()); 2834 let operation_audit = wait_for_operation_audit_count(&runtime, 3).await?; 2835 assert_eq!( 2836 operation_audit[2].operation, 2837 MycOperationAuditKind::AuthReplayPublish 2838 ); 2839 assert_eq!( 2840 operation_audit[2].outcome, 2841 MycOperationAuditOutcome::Succeeded 2842 ); 2843 assert_eq!( 2844 operation_audit[2].connection_id.as_deref(), 2845 Some(connection.connection_id.as_str()) 2846 ); 2847 assert_eq!(operation_audit[2].request_id.as_deref(), Some("auth-ping")); 2848 assert_eq!(operation_audit[2].relay_count, 1); 2849 assert_eq!(operation_audit[2].acknowledged_relay_count, 1); 2850 assert!( 2851 operation_audit[2] 2852 .relay_outcome_summary 2853 .contains("1/1 relays acknowledged publish") 2854 ); 2855 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2856 records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized 2857 }) 2858 .await?; 2859 assert_eq!( 2860 outbox_records[1].kind, 2861 MycDeliveryOutboxKind::AuthReplayPublish 2862 ); 2863 assert_eq!(outbox_records[1].request_id.as_deref(), Some("auth-ping")); 2864 assert!(outbox_records[1].published_at_unix.is_some()); 2865 assert!(outbox_records[1].finalized_at_unix.is_some()); 2866 assert!(outbox_records[1].signer_publish_workflow_id.is_some()); 2867 assert!( 2868 runtime 2869 .signer_manager()? 2870 .list_publish_workflows()? 2871 .is_empty() 2872 ); 2873 2874 Ok(()) 2875 } 2876 2877 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2878 async fn explicit_nip89_publish_uses_app_identity_and_records_audit() -> TestResult<()> { 2879 let relay = TestRelay::spawn().await?; 2880 let test_runtime = 2881 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 2882 let runtime = test_runtime.runtime; 2883 let app_identity = myc::identity_files::load_encrypted_identity( 2884 runtime 2885 .config() 2886 .discovery 2887 .app_identity_path 2888 .as_ref() 2889 .expect("app identity path"), 2890 )?; 2891 2892 relay 2893 .queue_publish_outcomes(app_identity.public_key(), &[true]) 2894 .await; 2895 2896 let published = publish_nip89_event(&runtime).await?; 2897 let published_event_id = published.event.id.to_hex(); 2898 let published_events = relay 2899 .wait_for_published_events_by_author(app_identity.public_key(), 1) 2900 .await?; 2901 let event = &published_events[0]; 2902 let event_json = event.as_json(); 2903 2904 assert_eq!( 2905 published.author_public_key_hex, 2906 app_identity.public_key_hex() 2907 ); 2908 assert_eq!( 2909 published.signer_public_key_hex, 2910 runtime.signer_identity().public_key_hex() 2911 ); 2912 assert_eq!(event.kind.as_u16(), 31_990); 2913 assert!(event_json.contains("\"24133\"")); 2914 assert!(event_json.contains("\"relay\"")); 2915 assert!(event_json.contains("\"nostrconnect_url\"")); 2916 assert_eq!(published.relay_count, 1); 2917 assert_eq!(published.acknowledged_relay_count, 1); 2918 2919 let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?; 2920 assert_eq!( 2921 operation_audit[0].operation, 2922 MycOperationAuditKind::DiscoveryHandlerPublish 2923 ); 2924 assert_eq!( 2925 operation_audit[0].outcome, 2926 MycOperationAuditOutcome::Succeeded 2927 ); 2928 assert!(operation_audit[0].connection_id.is_none()); 2929 assert_eq!( 2930 operation_audit[0].request_id.as_deref(), 2931 Some(published_event_id.as_str()) 2932 ); 2933 assert_eq!(operation_audit[0].relay_count, 1); 2934 assert_eq!(operation_audit[0].acknowledged_relay_count, 1); 2935 assert!( 2936 operation_audit[0] 2937 .relay_outcome_summary 2938 .contains("1/1 relays acknowledged publish") 2939 ); 2940 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 2941 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized 2942 }) 2943 .await?; 2944 assert_eq!( 2945 outbox_records[0].kind, 2946 MycDeliveryOutboxKind::DiscoveryHandlerPublish 2947 ); 2948 assert_eq!( 2949 outbox_records[0].request_id.as_deref(), 2950 Some(published_event_id.as_str()) 2951 ); 2952 assert!(outbox_records[0].attempt_id.is_none()); 2953 assert!(outbox_records[0].signer_publish_workflow_id.is_none()); 2954 assert!(outbox_records[0].published_at_unix.is_some()); 2955 assert!(outbox_records[0].finalized_at_unix.is_some()); 2956 2957 Ok(()) 2958 } 2959 2960 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 2961 async fn startup_recovery_republishes_queued_discovery_publish_job() -> TestResult<()> { 2962 let relay = TestRelay::spawn().await?; 2963 let test_runtime = 2964 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 2965 let MycTestRuntime { 2966 _temp: _tempdir, 2967 runtime, 2968 } = test_runtime; 2969 let config = runtime.config().clone(); 2970 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 2971 let context = MycDiscoveryContext::from_runtime(&runtime)?; 2972 let app_public_key = context.app_identity().public_key(); 2973 let event = context.build_signed_handler_event()?; 2974 let event_id = event.id.to_hex(); 2975 let outbox_record = MycDeliveryOutboxRecord::new( 2976 MycDeliveryOutboxKind::DiscoveryHandlerPublish, 2977 event, 2978 vec![relay_url], 2979 )? 2980 .with_request_id(event_id.as_str()); 2981 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 2982 2983 runtime.run_until(async {}).await?; 2984 2985 let published = relay 2986 .wait_for_published_events_by_author(app_public_key, 1) 2987 .await?; 2988 assert_eq!(published.len(), 1); 2989 assert_eq!(published[0].id.to_hex(), event_id); 2990 2991 let restarted_runtime = MycRuntime::bootstrap(config)?; 2992 let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?; 2993 assert_eq!(outbox_records.len(), 1); 2994 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 2995 assert_eq!( 2996 outbox_records[0].request_id.as_deref(), 2997 Some(event_id.as_str()) 2998 ); 2999 assert!(outbox_records[0].published_at_unix.is_some()); 3000 assert!(outbox_records[0].finalized_at_unix.is_some()); 3001 let audit_records = restarted_runtime.operation_audit_store().list_all()?; 3002 assert_eq!(audit_records.len(), 2); 3003 assert_eq!( 3004 audit_records[0].operation, 3005 MycOperationAuditKind::DiscoveryHandlerPublish 3006 ); 3007 assert_eq!( 3008 audit_records[0].outcome, 3009 MycOperationAuditOutcome::Succeeded 3010 ); 3011 assert_eq!( 3012 audit_records[0].request_id.as_deref(), 3013 Some(event_id.as_str()) 3014 ); 3015 assert_eq!( 3016 audit_records[1].operation, 3017 MycOperationAuditKind::DeliveryRecovery 3018 ); 3019 assert_eq!( 3020 audit_records[1].outcome, 3021 MycOperationAuditOutcome::Succeeded 3022 ); 3023 3024 Ok(()) 3025 } 3026 3027 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3028 async fn startup_recovery_finalizes_published_discovery_publish_job() -> TestResult<()> { 3029 let relay = TestRelay::spawn().await?; 3030 let test_runtime = 3031 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3032 let MycTestRuntime { 3033 _temp: _tempdir, 3034 runtime, 3035 } = test_runtime; 3036 let config = runtime.config().clone(); 3037 let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?; 3038 let context = MycDiscoveryContext::from_runtime(&runtime)?; 3039 let app_public_key = context.app_identity().public_key(); 3040 let event = context.build_signed_handler_event()?; 3041 let event_id = event.id.to_hex(); 3042 let outbox_record = MycDeliveryOutboxRecord::new( 3043 MycDeliveryOutboxKind::DiscoveryHandlerPublish, 3044 event, 3045 vec![relay_url], 3046 )? 3047 .with_request_id(event_id.as_str()); 3048 runtime.delivery_outbox_store().enqueue(&outbox_record)?; 3049 runtime 3050 .delivery_outbox_store() 3051 .mark_published_pending_finalize(&outbox_record.job_id, 1)?; 3052 3053 runtime.run_until(async {}).await?; 3054 3055 sleep(Duration::from_millis(100)).await; 3056 assert!( 3057 relay 3058 .published_events_by_author(app_public_key) 3059 .await 3060 .is_empty() 3061 ); 3062 3063 let restarted_runtime = MycRuntime::bootstrap(config)?; 3064 let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?; 3065 assert_eq!(outbox_records.len(), 1); 3066 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized); 3067 assert_eq!( 3068 outbox_records[0].request_id.as_deref(), 3069 Some(event_id.as_str()) 3070 ); 3071 assert!(outbox_records[0].published_at_unix.is_some()); 3072 assert!(outbox_records[0].finalized_at_unix.is_some()); 3073 let audit_records = restarted_runtime.operation_audit_store().list_all()?; 3074 assert_eq!(audit_records.len(), 2); 3075 assert_eq!( 3076 audit_records[0].operation, 3077 MycOperationAuditKind::DiscoveryHandlerPublish 3078 ); 3079 assert_eq!( 3080 audit_records[0].outcome, 3081 MycOperationAuditOutcome::Succeeded 3082 ); 3083 assert_eq!( 3084 audit_records[0].request_id.as_deref(), 3085 Some(event_id.as_str()) 3086 ); 3087 assert!( 3088 audit_records[0] 3089 .relay_outcome_summary 3090 .contains("startup recovery finalized previously published delivery job") 3091 ); 3092 assert_eq!( 3093 audit_records[1].operation, 3094 MycOperationAuditKind::DeliveryRecovery 3095 ); 3096 assert_eq!( 3097 audit_records[1].outcome, 3098 MycOperationAuditOutcome::Succeeded 3099 ); 3100 3101 Ok(()) 3102 } 3103 3104 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3105 async fn explicit_nip89_publish_retries_cleanly_after_rejection() -> TestResult<()> { 3106 let relay = TestRelay::spawn().await?; 3107 let test_runtime = 3108 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3109 let runtime = test_runtime.runtime; 3110 let app_identity = myc::identity_files::load_encrypted_identity( 3111 runtime 3112 .config() 3113 .discovery 3114 .app_identity_path 3115 .as_ref() 3116 .expect("app identity path"), 3117 )?; 3118 3119 relay 3120 .queue_publish_outcomes(app_identity.public_key(), &[false, true]) 3121 .await; 3122 3123 let failed = publish_nip89_event(&runtime) 3124 .await 3125 .expect_err("first publish should fail"); 3126 assert!(failed.to_string().contains("Nostr publish failed")); 3127 assert!( 3128 relay 3129 .published_events_by_author(app_identity.public_key()) 3130 .await 3131 .is_empty() 3132 ); 3133 3134 let first_audit = wait_for_operation_audit_count(&runtime, 1).await?; 3135 assert_eq!( 3136 first_audit[0].operation, 3137 MycOperationAuditKind::DiscoveryHandlerPublish 3138 ); 3139 assert_eq!(first_audit[0].outcome, MycOperationAuditOutcome::Rejected); 3140 assert!(first_audit[0].connection_id.is_none()); 3141 assert!(first_audit[0].request_id.is_some()); 3142 assert_eq!(first_audit[0].relay_count, 1); 3143 assert_eq!(first_audit[0].acknowledged_relay_count, 0); 3144 assert!( 3145 first_audit[0] 3146 .relay_outcome_summary 3147 .contains("blocked by test relay") 3148 ); 3149 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 3150 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed 3151 }) 3152 .await?; 3153 assert_eq!( 3154 outbox_records[0].kind, 3155 MycDeliveryOutboxKind::DiscoveryHandlerPublish 3156 ); 3157 assert_eq!( 3158 outbox_records[0].request_id.as_deref(), 3159 first_audit[0].request_id.as_deref() 3160 ); 3161 assert!(outbox_records[0].attempt_id.is_none()); 3162 assert!(outbox_records[0].signer_publish_workflow_id.is_none()); 3163 3164 let published = publish_nip89_event(&runtime).await?; 3165 let published_events = relay 3166 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3167 .await?; 3168 assert_eq!(published_events.len(), 1); 3169 assert_eq!(published.relay_count, 1); 3170 assert_eq!(published.acknowledged_relay_count, 1); 3171 3172 let second_audit = wait_for_operation_audit_count(&runtime, 2).await?; 3173 assert_eq!( 3174 second_audit[1].operation, 3175 MycOperationAuditKind::DiscoveryHandlerPublish 3176 ); 3177 assert_eq!(second_audit[1].outcome, MycOperationAuditOutcome::Succeeded); 3178 assert_eq!( 3179 second_audit[1].request_id.as_deref(), 3180 Some(published.event.id.to_hex().as_str()) 3181 ); 3182 assert_eq!(second_audit[1].relay_count, 1); 3183 assert_eq!(second_audit[1].acknowledged_relay_count, 1); 3184 assert!( 3185 second_audit[1] 3186 .relay_outcome_summary 3187 .contains("1/1 relays acknowledged publish") 3188 ); 3189 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 3190 records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized 3191 }) 3192 .await?; 3193 assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed); 3194 assert_eq!( 3195 outbox_records[1].kind, 3196 MycDeliveryOutboxKind::DiscoveryHandlerPublish 3197 ); 3198 assert_eq!( 3199 outbox_records[1].request_id.as_deref(), 3200 Some(published.event.id.to_hex().as_str()) 3201 ); 3202 assert!(outbox_records[1].attempt_id.is_none()); 3203 assert!(outbox_records[1].signer_publish_workflow_id.is_none()); 3204 assert!(outbox_records[1].published_at_unix.is_some()); 3205 assert!(outbox_records[1].finalized_at_unix.is_some()); 3206 3207 Ok(()) 3208 } 3209 3210 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3211 async fn fetch_live_nip89_reports_missing_when_handler_is_unpublished() -> TestResult<()> { 3212 let relay = TestRelay::spawn().await?; 3213 let test_runtime = 3214 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3215 3216 let output = fetch_live_nip89(&test_runtime.runtime).await?; 3217 3218 assert_eq!(output.handler_identifier, "myc"); 3219 assert_eq!(output.publish_relays, vec![relay.url().to_owned()]); 3220 assert!(output.live_groups.is_empty()); 3221 assert_eq!(output.relay_states.len(), 1); 3222 assert_eq!( 3223 output.relay_states[0].fetch_status, 3224 MycDiscoveryRelayFetchStatus::Available 3225 ); 3226 3227 Ok(()) 3228 } 3229 3230 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3231 async fn fetch_live_nip89_fails_when_all_discovery_relays_are_unavailable() -> TestResult<()> { 3232 let unavailable_a = unavailable_relay_url()?; 3233 let unavailable_b = unavailable_relay_url()?; 3234 let test_runtime = MycTestRuntime::new_with_discovery_relays( 3235 &[unavailable_a.as_str(), unavailable_b.as_str()], 3236 MycConnectionApproval::ExplicitUser, 3237 ); 3238 3239 let error = fetch_live_nip89(&test_runtime.runtime) 3240 .await 3241 .expect_err("all-unavailable discovery fetch should fail"); 3242 assert!( 3243 error 3244 .to_string() 3245 .contains("failed to fetch discovery state from all configured relays") 3246 ); 3247 3248 let audit = wait_for_operation_audit_count(&test_runtime.runtime, 1).await?; 3249 assert_eq!( 3250 audit[0].operation, 3251 MycOperationAuditKind::DiscoveryHandlerFetch 3252 ); 3253 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Unavailable); 3254 assert_eq!(audit[0].relay_count, 2); 3255 assert_eq!(audit[0].acknowledged_relay_count, 0); 3256 assert!(audit[0].relay_outcome_summary.contains(&unavailable_a)); 3257 assert!(audit[0].relay_outcome_summary.contains(&unavailable_b)); 3258 3259 Ok(()) 3260 } 3261 3262 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3263 async fn fetch_live_nip89_parallelizes_relay_fetch_and_preserves_configured_order() -> TestResult<()> 3264 { 3265 let live_relay = TestRelay::spawn().await?; 3266 let slow_a = HangingRelay::spawn(Duration::from_secs(3)).await?; 3267 let slow_b = HangingRelay::spawn(Duration::from_secs(3)).await?; 3268 let slow_c = HangingRelay::spawn(Duration::from_secs(3)).await?; 3269 let slow_d = HangingRelay::spawn(Duration::from_secs(3)).await?; 3270 let relay_urls = [ 3271 slow_a.url(), 3272 live_relay.url(), 3273 slow_b.url(), 3274 slow_c.url(), 3275 slow_d.url(), 3276 ]; 3277 let mut expected_relay_states = vec![ 3278 ( 3279 slow_a.url().to_owned(), 3280 MycDiscoveryRelayFetchStatus::Unavailable, 3281 ), 3282 ( 3283 live_relay.url().to_owned(), 3284 MycDiscoveryRelayFetchStatus::Available, 3285 ), 3286 ( 3287 slow_b.url().to_owned(), 3288 MycDiscoveryRelayFetchStatus::Unavailable, 3289 ), 3290 ( 3291 slow_c.url().to_owned(), 3292 MycDiscoveryRelayFetchStatus::Unavailable, 3293 ), 3294 ( 3295 slow_d.url().to_owned(), 3296 MycDiscoveryRelayFetchStatus::Unavailable, 3297 ), 3298 ]; 3299 expected_relay_states.sort_by(|left, right| left.0.cmp(&right.0)); 3300 let expected_relay_urls = expected_relay_states 3301 .iter() 3302 .map(|(relay_url, _)| relay_url.clone()) 3303 .collect::<Vec<_>>(); 3304 let test_runtime = MycTestRuntime::new_with_discovery_relays_and_timeout( 3305 &relay_urls, 3306 MycConnectionApproval::ExplicitUser, 3307 1, 3308 ); 3309 3310 let started_at = Instant::now(); 3311 let output = fetch_live_nip89(&test_runtime.runtime).await?; 3312 let elapsed = started_at.elapsed(); 3313 3314 assert!( 3315 elapsed < Duration::from_millis(2500), 3316 "expected concurrent relay fetch to finish under 2.5s, got {:?}", 3317 elapsed 3318 ); 3319 assert_eq!( 3320 output 3321 .relay_states 3322 .iter() 3323 .map(|relay_state| relay_state.relay_url.clone()) 3324 .collect::<Vec<_>>(), 3325 expected_relay_urls 3326 ); 3327 assert_eq!( 3328 output 3329 .relay_states 3330 .iter() 3331 .map(|relay_state| relay_state.fetch_status) 3332 .collect::<Vec<_>>(), 3333 expected_relay_states 3334 .iter() 3335 .map(|(_, fetch_status)| *fetch_status) 3336 .collect::<Vec<_>>() 3337 ); 3338 for relay_state in &output.relay_states { 3339 if relay_state.fetch_status == MycDiscoveryRelayFetchStatus::Available { 3340 assert!(relay_state.fetch_error.is_none()); 3341 assert!(relay_state.live_groups.is_empty()); 3342 } else { 3343 assert!(relay_state.fetch_error.is_some()); 3344 } 3345 } 3346 3347 Ok(()) 3348 } 3349 3350 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3351 async fn diff_live_nip89_reports_matched_after_publish() -> TestResult<()> { 3352 let relay = TestRelay::spawn().await?; 3353 let test_runtime = 3354 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3355 let runtime = test_runtime.runtime; 3356 let app_identity = myc::identity_files::load_encrypted_identity( 3357 runtime 3358 .config() 3359 .discovery 3360 .app_identity_path 3361 .as_ref() 3362 .expect("app identity path"), 3363 )?; 3364 3365 relay 3366 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3367 .await; 3368 let published = publish_nip89_event(&runtime).await?; 3369 relay 3370 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3371 .await?; 3372 3373 let diff = diff_live_nip89(&runtime).await?; 3374 3375 assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched); 3376 assert!(diff.differing_fields.is_empty()); 3377 assert_eq!(diff.live_groups.len(), 1); 3378 let live_event = diff.live_groups[0] 3379 .events 3380 .last() 3381 .cloned() 3382 .expect("live event"); 3383 assert_eq!(live_event.event_id_hex, published.event.id.to_hex()); 3384 assert_eq!( 3385 live_event.handler.author_public_key_hex, 3386 app_identity.public_key_hex() 3387 ); 3388 assert_eq!(live_event.handler.kinds, vec![24_133]); 3389 3390 Ok(()) 3391 } 3392 3393 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3394 async fn refresh_nip89_publishes_when_live_handler_is_missing() -> TestResult<()> { 3395 let relay = TestRelay::spawn().await?; 3396 let test_runtime = 3397 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3398 let runtime = test_runtime.runtime; 3399 let app_identity = myc::identity_files::load_encrypted_identity( 3400 runtime 3401 .config() 3402 .discovery 3403 .app_identity_path 3404 .as_ref() 3405 .expect("app identity path"), 3406 )?; 3407 3408 relay 3409 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3410 .await; 3411 3412 let refreshed = refresh_nip89(&runtime, false).await?; 3413 3414 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing); 3415 assert_eq!(refreshed.differing_fields, vec!["live_groups".to_owned()]); 3416 assert!(refreshed.live_groups.is_empty()); 3417 assert!(refreshed.published.is_some()); 3418 assert_eq!(refreshed.repair_summary.repaired, 1); 3419 assert_eq!(refreshed.repair_summary.failed, 0); 3420 assert_eq!(refreshed.repair_summary.unchanged, 0); 3421 assert_eq!(refreshed.repair_summary.skipped, 0); 3422 assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new()); 3423 assert_eq!(refreshed.repair_results.len(), 1); 3424 assert_eq!( 3425 refreshed.repair_results[0].outcome, 3426 MycDiscoveryRepairOutcome::Repaired 3427 ); 3428 3429 let audit = wait_for_operation_audit_count(&runtime, 3).await?; 3430 assert_eq!( 3431 audit[0].operation, 3432 MycOperationAuditKind::DiscoveryHandlerCompare 3433 ); 3434 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Missing); 3435 assert_eq!( 3436 audit[1].operation, 3437 MycOperationAuditKind::DiscoveryHandlerPublish 3438 ); 3439 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded); 3440 assert_eq!( 3441 audit[2].operation, 3442 MycOperationAuditKind::DiscoveryHandlerRepair 3443 ); 3444 assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded); 3445 let published = refreshed 3446 .published 3447 .as_ref() 3448 .expect("published discovery output"); 3449 let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| { 3450 records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized 3451 }) 3452 .await?; 3453 assert_eq!( 3454 outbox_records[0].kind, 3455 MycDeliveryOutboxKind::DiscoveryHandlerPublish 3456 ); 3457 assert_eq!( 3458 outbox_records[0].request_id.as_deref(), 3459 Some(published.event.id.to_hex().as_str()) 3460 ); 3461 assert_eq!( 3462 outbox_records[0].attempt_id.as_deref(), 3463 Some(refreshed.attempt_id.as_str()) 3464 ); 3465 assert!(outbox_records[0].signer_publish_workflow_id.is_none()); 3466 assert!(outbox_records[0].published_at_unix.is_some()); 3467 assert!(outbox_records[0].finalized_at_unix.is_some()); 3468 3469 Ok(()) 3470 } 3471 3472 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3473 async fn refresh_nip89_repairs_missing_relays_without_republishing_matched_relays() -> TestResult<()> 3474 { 3475 let relay_a = TestRelay::spawn().await?; 3476 let relay_b = TestRelay::spawn().await?; 3477 let test_runtime = MycTestRuntime::new_with_discovery_relays( 3478 &[relay_a.url(), relay_b.url()], 3479 MycConnectionApproval::ExplicitUser, 3480 ); 3481 let runtime = test_runtime.runtime; 3482 let app_identity = myc::identity_files::load_encrypted_identity( 3483 runtime 3484 .config() 3485 .discovery 3486 .app_identity_path 3487 .as_ref() 3488 .expect("app identity path"), 3489 )?; 3490 3491 let matched_event = MycDiscoveryContext::from_runtime(&runtime)? 3492 .build_signed_handler_event() 3493 .expect("matched event"); 3494 publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?; 3495 relay_a 3496 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3497 .await?; 3498 3499 relay_b 3500 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3501 .await; 3502 let refreshed = refresh_nip89(&runtime, false).await?; 3503 let published = refreshed.published.expect("published output"); 3504 3505 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Matched); 3506 assert_eq!(published.publish_relays, vec![relay_b.url().to_owned()]); 3507 assert_eq!(published.relay_count, 1); 3508 assert_eq!(published.acknowledged_relay_count, 1); 3509 assert_eq!(refreshed.repair_summary.repaired, 1); 3510 assert_eq!(refreshed.repair_summary.failed, 0); 3511 assert_eq!(refreshed.repair_summary.unchanged, 1); 3512 assert_eq!(refreshed.repair_summary.skipped, 0); 3513 assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new()); 3514 assert_eq!(refreshed.repair_results.len(), 2); 3515 assert_eq!( 3516 refreshed 3517 .repair_results 3518 .iter() 3519 .find(|result| result.relay_url == relay_a.url()) 3520 .expect("matched relay repair result") 3521 .outcome, 3522 MycDiscoveryRepairOutcome::Unchanged 3523 ); 3524 assert_eq!( 3525 refreshed 3526 .repair_results 3527 .iter() 3528 .find(|result| result.relay_url == relay_b.url()) 3529 .expect("repaired relay result") 3530 .outcome, 3531 MycDiscoveryRepairOutcome::Repaired 3532 ); 3533 3534 relay_b 3535 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3536 .await?; 3537 assert_eq!( 3538 relay_a 3539 .published_events_by_author(app_identity.public_key()) 3540 .await 3541 .len(), 3542 1 3543 ); 3544 assert_eq!( 3545 relay_b 3546 .published_events_by_author(app_identity.public_key()) 3547 .await 3548 .len(), 3549 1 3550 ); 3551 3552 let diff = diff_live_nip89(&runtime).await?; 3553 assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched); 3554 assert_eq!(diff.relay_summary.matched_relays.len(), 2); 3555 assert!(diff.relay_summary.missing_relays.is_empty()); 3556 3557 Ok(()) 3558 } 3559 3560 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3561 async fn refresh_nip89_skips_when_live_handler_matches() -> TestResult<()> { 3562 let relay = TestRelay::spawn().await?; 3563 let test_runtime = 3564 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3565 let runtime = test_runtime.runtime; 3566 let app_identity = myc::identity_files::load_encrypted_identity( 3567 runtime 3568 .config() 3569 .discovery 3570 .app_identity_path 3571 .as_ref() 3572 .expect("app identity path"), 3573 )?; 3574 3575 relay 3576 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3577 .await; 3578 publish_nip89_event(&runtime).await?; 3579 relay 3580 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3581 .await?; 3582 3583 let refreshed = refresh_nip89(&runtime, false).await?; 3584 3585 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Matched); 3586 assert!(refreshed.differing_fields.is_empty()); 3587 assert_eq!(refreshed.live_groups.len(), 1); 3588 assert!(refreshed.published.is_none()); 3589 assert_eq!(refreshed.repair_summary.repaired, 0); 3590 assert_eq!(refreshed.repair_summary.failed, 0); 3591 assert_eq!(refreshed.repair_summary.unchanged, 1); 3592 assert_eq!(refreshed.repair_summary.skipped, 0); 3593 assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new()); 3594 assert_eq!(refreshed.repair_results.len(), 1); 3595 assert_eq!( 3596 refreshed.repair_results[0].outcome, 3597 MycDiscoveryRepairOutcome::Unchanged 3598 ); 3599 3600 let audit = wait_for_operation_audit_count(&runtime, 4).await?; 3601 assert_eq!( 3602 audit[1].operation, 3603 MycOperationAuditKind::DiscoveryHandlerCompare 3604 ); 3605 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Matched); 3606 assert_eq!( 3607 audit[2].operation, 3608 MycOperationAuditKind::DiscoveryHandlerRepair 3609 ); 3610 assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Matched); 3611 assert_eq!( 3612 audit[3].operation, 3613 MycOperationAuditKind::DiscoveryHandlerRefresh 3614 ); 3615 assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Skipped); 3616 3617 Ok(()) 3618 } 3619 3620 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3621 async fn refresh_nip89_republishes_when_live_handler_drifted() -> TestResult<()> { 3622 let relay = TestRelay::spawn().await?; 3623 let test_runtime = 3624 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3625 let runtime = test_runtime.runtime; 3626 let app_identity = myc::identity_files::load_encrypted_identity( 3627 runtime 3628 .config() 3629 .discovery 3630 .app_identity_path 3631 .as_ref() 3632 .expect("app identity path"), 3633 )?; 3634 3635 let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 3636 drifted_spec.identifier = Some("myc".to_owned()); 3637 drifted_spec.relays = vec!["wss://wrong.example.com".to_owned()]; 3638 drifted_spec.nostrconnect_url = 3639 Some("https://wrong.example.com/connect?uri=nostrconnect%3A%2F%2Fstale".to_owned()); 3640 let mut metadata = RadrootsNostrMetadata::default(); 3641 metadata.name = Some("stale".to_owned()); 3642 drifted_spec.metadata = Some(metadata); 3643 publish_handler_event(relay.url(), &app_identity, &drifted_spec).await?; 3644 relay 3645 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3646 .await?; 3647 3648 relay 3649 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3650 .await; 3651 let refreshed = refresh_nip89(&runtime, false).await?; 3652 3653 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Drifted); 3654 assert_eq!(refreshed.live_groups.len(), 1); 3655 assert!(refreshed.published.is_some()); 3656 assert_eq!(refreshed.repair_summary.repaired, 1); 3657 assert_eq!(refreshed.repair_summary.failed, 0); 3658 assert_eq!(refreshed.repair_summary.unchanged, 0); 3659 assert_eq!(refreshed.repair_summary.skipped, 0); 3660 assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new()); 3661 assert_eq!(refreshed.repair_results.len(), 1); 3662 assert_eq!( 3663 refreshed.repair_results[0].outcome, 3664 MycDiscoveryRepairOutcome::Repaired 3665 ); 3666 assert!( 3667 refreshed 3668 .differing_fields 3669 .iter() 3670 .any(|field| field == "relays" || field == "nostrconnect_url" || field == "metadata") 3671 ); 3672 3673 let audit = wait_for_operation_audit_count(&runtime, 3).await?; 3674 assert_eq!( 3675 audit[0].operation, 3676 MycOperationAuditKind::DiscoveryHandlerCompare 3677 ); 3678 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Drifted); 3679 assert_eq!( 3680 audit[1].operation, 3681 MycOperationAuditKind::DiscoveryHandlerPublish 3682 ); 3683 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded); 3684 assert_eq!( 3685 audit[2].operation, 3686 MycOperationAuditKind::DiscoveryHandlerRepair 3687 ); 3688 assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded); 3689 3690 Ok(()) 3691 } 3692 3693 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3694 async fn refresh_nip89_repairs_drifted_relays_without_force_when_other_relays_match() 3695 -> TestResult<()> { 3696 let relay_a = TestRelay::spawn().await?; 3697 let relay_b = TestRelay::spawn().await?; 3698 let test_runtime = MycTestRuntime::new_with_discovery_relays( 3699 &[relay_a.url(), relay_b.url()], 3700 MycConnectionApproval::ExplicitUser, 3701 ); 3702 let runtime = test_runtime.runtime; 3703 let app_identity = myc::identity_files::load_encrypted_identity( 3704 runtime 3705 .config() 3706 .discovery 3707 .app_identity_path 3708 .as_ref() 3709 .expect("app identity path"), 3710 )?; 3711 3712 let matched_event = MycDiscoveryContext::from_runtime(&runtime)? 3713 .build_signed_handler_event() 3714 .expect("matched event"); 3715 publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?; 3716 3717 let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 3718 drifted_spec.identifier = Some("myc".to_owned()); 3719 drifted_spec.relays = vec!["wss://stale.example.com".to_owned()]; 3720 publish_handler_event(relay_b.url(), &app_identity, &drifted_spec).await?; 3721 3722 relay_a 3723 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3724 .await?; 3725 relay_b 3726 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3727 .await?; 3728 3729 relay_b 3730 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3731 .await; 3732 let refreshed = refresh_nip89(&runtime, false).await?; 3733 let published = refreshed.published.expect("published output"); 3734 3735 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Conflicted); 3736 assert_eq!(published.publish_relays, vec![relay_b.url().to_owned()]); 3737 assert_eq!(published.relay_count, 1); 3738 assert_eq!(published.acknowledged_relay_count, 1); 3739 assert_eq!(refreshed.repair_summary.repaired, 1); 3740 assert_eq!(refreshed.repair_summary.failed, 0); 3741 assert_eq!(refreshed.repair_summary.unchanged, 1); 3742 assert_eq!(refreshed.repair_summary.skipped, 0); 3743 assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new()); 3744 assert_eq!(refreshed.repair_results.len(), 2); 3745 assert_eq!( 3746 refreshed 3747 .repair_results 3748 .iter() 3749 .find(|result| result.relay_url == relay_a.url()) 3750 .expect("matched relay result") 3751 .outcome, 3752 MycDiscoveryRepairOutcome::Unchanged 3753 ); 3754 assert_eq!( 3755 refreshed 3756 .repair_results 3757 .iter() 3758 .find(|result| result.relay_url == relay_b.url()) 3759 .expect("repaired relay result") 3760 .outcome, 3761 MycDiscoveryRepairOutcome::Repaired 3762 ); 3763 3764 relay_b 3765 .wait_for_published_events_by_author(app_identity.public_key(), 2) 3766 .await?; 3767 assert_eq!( 3768 relay_a 3769 .published_events_by_author(app_identity.public_key()) 3770 .await 3771 .len(), 3772 1 3773 ); 3774 assert_eq!( 3775 relay_b 3776 .published_events_by_author(app_identity.public_key()) 3777 .await 3778 .len(), 3779 2 3780 ); 3781 3782 Ok(()) 3783 } 3784 3785 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3786 async fn refresh_nip89_reports_remaining_relays_after_mixed_targeted_repair() -> TestResult<()> { 3787 let relay_a = TestRelay::spawn().await?; 3788 let relay_b = TestRelay::spawn().await?; 3789 let test_runtime = MycTestRuntime::new_with_discovery_relays( 3790 &[relay_a.url(), relay_b.url()], 3791 MycConnectionApproval::ExplicitUser, 3792 ); 3793 let runtime = test_runtime.runtime; 3794 let app_identity = myc::identity_files::load_encrypted_identity( 3795 runtime 3796 .config() 3797 .discovery 3798 .app_identity_path 3799 .as_ref() 3800 .expect("app identity path"), 3801 )?; 3802 3803 relay_a 3804 .queue_publish_outcomes(app_identity.public_key(), &[true]) 3805 .await; 3806 relay_b 3807 .queue_publish_outcomes(app_identity.public_key(), &[false]) 3808 .await; 3809 3810 let refreshed = refresh_nip89(&runtime, false).await?; 3811 let published = refreshed.published.expect("published output"); 3812 3813 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing); 3814 assert_eq!( 3815 published.publish_relays, 3816 vec![relay_a.url().to_owned(), relay_b.url().to_owned()] 3817 ); 3818 assert_eq!(published.relay_count, 2); 3819 assert_eq!(published.acknowledged_relay_count, 1); 3820 assert_eq!(published.relay_results.len(), 2); 3821 assert_eq!(refreshed.repair_summary.repaired, 1); 3822 assert_eq!(refreshed.repair_summary.failed, 1); 3823 assert_eq!(refreshed.repair_summary.unchanged, 0); 3824 assert_eq!(refreshed.repair_summary.skipped, 0); 3825 assert_eq!(refreshed.repair_results.len(), 2); 3826 assert_eq!( 3827 refreshed.remaining_repair_relays, 3828 vec![relay_b.url().to_owned()] 3829 ); 3830 3831 let repaired = refreshed 3832 .repair_results 3833 .iter() 3834 .find(|result| result.relay_url == relay_a.url()) 3835 .expect("repaired relay result"); 3836 assert_eq!(repaired.outcome, MycDiscoveryRepairOutcome::Repaired); 3837 3838 let failed = refreshed 3839 .repair_results 3840 .iter() 3841 .find(|result| result.relay_url == relay_b.url()) 3842 .expect("failed relay result"); 3843 assert_eq!(failed.outcome, MycDiscoveryRepairOutcome::Failed); 3844 assert!( 3845 failed 3846 .detail 3847 .as_deref() 3848 .unwrap_or_default() 3849 .contains("blocked by test relay") 3850 ); 3851 3852 relay_a 3853 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3854 .await?; 3855 assert_eq!( 3856 relay_b 3857 .published_events_by_author(app_identity.public_key()) 3858 .await 3859 .len(), 3860 0 3861 ); 3862 3863 let diff = diff_live_nip89(&runtime).await?; 3864 assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched); 3865 assert_eq!( 3866 diff.relay_summary.matched_relays, 3867 vec![relay_a.url().to_owned()] 3868 ); 3869 assert_eq!( 3870 diff.relay_summary.missing_relays, 3871 vec![relay_b.url().to_owned()] 3872 ); 3873 3874 let audit = wait_for_operation_audit_count(&runtime, 4).await?; 3875 assert_eq!( 3876 audit[0].operation, 3877 MycOperationAuditKind::DiscoveryHandlerCompare 3878 ); 3879 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Missing); 3880 assert_eq!( 3881 audit[1].operation, 3882 MycOperationAuditKind::DiscoveryHandlerPublish 3883 ); 3884 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded); 3885 assert_eq!( 3886 audit[2].operation, 3887 MycOperationAuditKind::DiscoveryHandlerRepair 3888 ); 3889 assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded); 3890 assert_eq!(audit[2].relay_url.as_deref(), Some(relay_a.url())); 3891 assert_eq!( 3892 audit[3].operation, 3893 MycOperationAuditKind::DiscoveryHandlerRepair 3894 ); 3895 assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Rejected); 3896 assert_eq!(audit[3].relay_url.as_deref(), Some(relay_b.url())); 3897 3898 Ok(()) 3899 } 3900 3901 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3902 async fn diff_live_nip89_reports_conflicted_when_live_groups_disagree() -> TestResult<()> { 3903 let relay = TestRelay::spawn().await?; 3904 let test_runtime = 3905 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 3906 let runtime = test_runtime.runtime; 3907 let app_identity = myc::identity_files::load_encrypted_identity( 3908 runtime 3909 .config() 3910 .discovery 3911 .app_identity_path 3912 .as_ref() 3913 .expect("app identity path"), 3914 )?; 3915 3916 let mut first_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 3917 first_spec.identifier = Some("myc".to_owned()); 3918 first_spec.relays = vec!["wss://relay-a.example.com".to_owned()]; 3919 publish_handler_event(relay.url(), &app_identity, &first_spec).await?; 3920 3921 let mut second_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 3922 second_spec.identifier = Some("myc".to_owned()); 3923 second_spec.relays = vec!["wss://relay-b.example.com".to_owned()]; 3924 publish_handler_event(relay.url(), &app_identity, &second_spec).await?; 3925 3926 relay 3927 .wait_for_published_events_by_author(app_identity.public_key(), 2) 3928 .await?; 3929 3930 let diff = diff_live_nip89(&runtime).await?; 3931 3932 assert_eq!(diff.status, MycDiscoveryLiveStatus::Conflicted); 3933 assert_eq!(diff.differing_fields, vec!["live_groups".to_owned()]); 3934 assert_eq!(diff.live_groups.len(), 2); 3935 3936 Ok(()) 3937 } 3938 3939 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 3940 async fn diff_live_nip89_surfaces_relay_divergence_with_provenance() -> TestResult<()> { 3941 let relay_a = TestRelay::spawn().await?; 3942 let relay_b = TestRelay::spawn().await?; 3943 let test_runtime = MycTestRuntime::new_with_discovery_relays( 3944 &[relay_a.url(), relay_b.url()], 3945 MycConnectionApproval::ExplicitUser, 3946 ); 3947 let runtime = test_runtime.runtime; 3948 let app_identity = myc::identity_files::load_encrypted_identity( 3949 runtime 3950 .config() 3951 .discovery 3952 .app_identity_path 3953 .as_ref() 3954 .expect("app identity path"), 3955 )?; 3956 3957 let matched_event = MycDiscoveryContext::from_runtime(&runtime)? 3958 .build_signed_handler_event() 3959 .expect("matched event"); 3960 publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?; 3961 3962 let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 3963 drifted_spec.identifier = Some("myc".to_owned()); 3964 drifted_spec.relays = vec!["wss://stale.example.com".to_owned()]; 3965 let mut drifted_metadata = RadrootsNostrMetadata::default(); 3966 drifted_metadata.name = Some("stale".to_owned()); 3967 drifted_spec.metadata = Some(drifted_metadata); 3968 publish_handler_event(relay_b.url(), &app_identity, &drifted_spec).await?; 3969 3970 relay_a 3971 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3972 .await?; 3973 relay_b 3974 .wait_for_published_events_by_author(app_identity.public_key(), 1) 3975 .await?; 3976 3977 let diff = diff_live_nip89(&runtime).await?; 3978 3979 assert_eq!(diff.status, MycDiscoveryLiveStatus::Conflicted); 3980 assert_eq!(diff.live_groups.len(), 2); 3981 assert_eq!(diff.relay_states.len(), 2); 3982 assert_eq!(diff.relay_summary.total_relays, 2); 3983 assert_eq!( 3984 diff.relay_summary.matched_relays, 3985 vec![relay_a.url().to_owned()] 3986 ); 3987 assert_eq!( 3988 diff.relay_summary.drifted_relays, 3989 vec![relay_b.url().to_owned()] 3990 ); 3991 assert!(diff.relay_summary.unavailable_relays.is_empty()); 3992 assert!(diff.relay_summary.missing_relays.is_empty()); 3993 assert!(diff.relay_summary.conflicted_relays.is_empty()); 3994 3995 let matched_relay = diff 3996 .relay_states 3997 .iter() 3998 .find(|relay_state| relay_state.relay_url == relay_a.url()) 3999 .expect("matched relay"); 4000 assert_eq!( 4001 matched_relay.fetch_status, 4002 MycDiscoveryRelayFetchStatus::Available 4003 ); 4004 assert_eq!( 4005 matched_relay.live_status, 4006 Some(MycDiscoveryLiveStatus::Matched) 4007 ); 4008 assert_eq!(matched_relay.live_groups.len(), 1); 4009 assert_eq!( 4010 matched_relay.live_groups[0].source_relays, 4011 vec![relay_a.url().to_owned()] 4012 ); 4013 4014 let drifted_relay = diff 4015 .relay_states 4016 .iter() 4017 .find(|relay_state| relay_state.relay_url == relay_b.url()) 4018 .expect("drifted relay"); 4019 assert_eq!( 4020 drifted_relay.fetch_status, 4021 MycDiscoveryRelayFetchStatus::Available 4022 ); 4023 assert_eq!( 4024 drifted_relay.live_status, 4025 Some(MycDiscoveryLiveStatus::Drifted) 4026 ); 4027 assert_eq!(drifted_relay.live_groups.len(), 1); 4028 assert_eq!( 4029 drifted_relay.live_groups[0].source_relays, 4030 vec![relay_b.url().to_owned()] 4031 ); 4032 4033 let live_group_relays = diff 4034 .live_groups 4035 .iter() 4036 .map(|group| group.source_relays.clone()) 4037 .collect::<Vec<_>>(); 4038 assert!(live_group_relays.contains(&vec![relay_a.url().to_owned()])); 4039 assert!(live_group_relays.contains(&vec![relay_b.url().to_owned()])); 4040 4041 Ok(()) 4042 } 4043 4044 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 4045 async fn refresh_nip89_requires_force_when_any_discovery_relay_is_unavailable() -> TestResult<()> { 4046 let relay = TestRelay::spawn().await?; 4047 let unavailable_relay = unavailable_relay_url()?; 4048 let test_runtime = MycTestRuntime::new_with_discovery_relays( 4049 &[relay.url(), unavailable_relay.as_str()], 4050 MycConnectionApproval::ExplicitUser, 4051 ); 4052 let runtime = test_runtime.runtime; 4053 let app_identity = myc::identity_files::load_encrypted_identity( 4054 runtime 4055 .config() 4056 .discovery 4057 .app_identity_path 4058 .as_ref() 4059 .expect("app identity path"), 4060 )?; 4061 4062 let diff = diff_live_nip89(&runtime).await?; 4063 assert_eq!(diff.status, MycDiscoveryLiveStatus::Missing); 4064 assert_eq!( 4065 diff.relay_summary.unavailable_relays, 4066 vec![unavailable_relay.clone()] 4067 ); 4068 assert_eq!( 4069 diff.relay_summary.missing_relays, 4070 vec![relay.url().to_owned()] 4071 ); 4072 4073 let unavailable_state = diff 4074 .relay_states 4075 .iter() 4076 .find(|relay_state| relay_state.relay_url == unavailable_relay) 4077 .expect("unavailable relay"); 4078 assert_eq!( 4079 unavailable_state.fetch_status, 4080 MycDiscoveryRelayFetchStatus::Unavailable 4081 ); 4082 assert_eq!(unavailable_state.live_status, None); 4083 assert!(unavailable_state.fetch_error.is_some()); 4084 4085 let error = refresh_nip89(&runtime, false) 4086 .await 4087 .expect_err("refresh without force should fail when a relay is unavailable"); 4088 assert!(error.to_string().contains("unavailable")); 4089 4090 let audit = wait_for_operation_audit_count(&runtime, 4).await?; 4091 assert_eq!( 4092 audit[0].operation, 4093 MycOperationAuditKind::DiscoveryHandlerFetch 4094 ); 4095 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Unavailable); 4096 assert_eq!( 4097 audit[1].operation, 4098 MycOperationAuditKind::DiscoveryHandlerFetch 4099 ); 4100 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Unavailable); 4101 assert_eq!( 4102 audit[2].operation, 4103 MycOperationAuditKind::DiscoveryHandlerCompare 4104 ); 4105 assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Missing); 4106 assert_eq!( 4107 audit[3].operation, 4108 MycOperationAuditKind::DiscoveryHandlerRefresh 4109 ); 4110 assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Unavailable); 4111 4112 relay 4113 .queue_publish_outcomes(app_identity.public_key(), &[true]) 4114 .await; 4115 let refreshed = refresh_nip89(&runtime, true).await?; 4116 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing); 4117 assert_eq!( 4118 refreshed.relay_summary.unavailable_relays, 4119 vec![unavailable_relay.clone()] 4120 ); 4121 assert!(refreshed.published.is_some()); 4122 4123 Ok(()) 4124 } 4125 4126 #[tokio::test(flavor = "multi_thread", worker_threads = 4)] 4127 async fn refresh_nip89_requires_force_when_live_handler_is_conflicted() -> TestResult<()> { 4128 let relay = TestRelay::spawn().await?; 4129 let test_runtime = 4130 MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser); 4131 let runtime = test_runtime.runtime; 4132 let app_identity = myc::identity_files::load_encrypted_identity( 4133 runtime 4134 .config() 4135 .discovery 4136 .app_identity_path 4137 .as_ref() 4138 .expect("app identity path"), 4139 )?; 4140 4141 let mut first_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 4142 first_spec.identifier = Some("myc".to_owned()); 4143 first_spec.relays = vec!["wss://relay-a.example.com".to_owned()]; 4144 publish_handler_event(relay.url(), &app_identity, &first_spec).await?; 4145 4146 let mut second_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]); 4147 second_spec.identifier = Some("myc".to_owned()); 4148 second_spec.relays = vec!["wss://relay-b.example.com".to_owned()]; 4149 publish_handler_event(relay.url(), &app_identity, &second_spec).await?; 4150 4151 relay 4152 .wait_for_published_events_by_author(app_identity.public_key(), 2) 4153 .await?; 4154 4155 let error = refresh_nip89(&runtime, false) 4156 .await 4157 .expect_err("conflicted refresh without force should fail"); 4158 assert!( 4159 error 4160 .to_string() 4161 .contains("live discovery handler state is conflicted") 4162 ); 4163 4164 let audit = wait_for_operation_audit_count(&runtime, 2).await?; 4165 assert_eq!( 4166 audit[0].operation, 4167 MycOperationAuditKind::DiscoveryHandlerCompare 4168 ); 4169 assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Conflicted); 4170 assert_eq!( 4171 audit[1].operation, 4172 MycOperationAuditKind::DiscoveryHandlerRefresh 4173 ); 4174 assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Conflicted); 4175 4176 relay 4177 .queue_publish_outcomes(app_identity.public_key(), &[true]) 4178 .await; 4179 let refreshed = refresh_nip89(&runtime, true).await?; 4180 assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Conflicted); 4181 assert_eq!(refreshed.live_groups.len(), 2); 4182 assert!(refreshed.published.is_some()); 4183 4184 Ok(()) 4185 }