myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

nip46_e2e.rs (146062B)


      1 use std::collections::{HashMap, VecDeque};
      2 use std::net::TcpListener as StdTcpListener;
      3 use std::sync::Arc;
      4 use std::time::Duration;
      5 
      6 use futures_util::{SinkExt, StreamExt};
      7 use myc::control;
      8 use myc::{
      9     MycActiveIdentity, MycConfig, MycConnectionApproval, MycDeliveryOutboxKind,
     10     MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, MycDiscoveryContext, MycDiscoveryLiveStatus,
     11     MycDiscoveryRelayFetchStatus, MycDiscoveryRepairOutcome, MycOperationAuditKind,
     12     MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, MycRuntimeAuditBackend,
     13     MycSignerStateBackend, MycTransportDeliveryPolicy, diff_live_nip89, fetch_live_nip89,
     14     publish_nip89_event, refresh_nip89,
     15 };
     16 use nostr::filter::MatchEventOptions;
     17 use nostr::nips::nip44;
     18 use nostr::nips::nip44::Version;
     19 use nostr::nips::nip46::{
     20     NostrConnectMessage as ExternalNostrConnectMessage,
     21     NostrConnectMethod as ExternalNostrConnectMethod,
     22     NostrConnectRequest as ExternalNostrConnectRequest,
     23     NostrConnectResponse as ExternalNostrConnectResponse, ResponseResult as ExternalResponseResult,
     24 };
     25 use nostr::{
     26     ClientMessage, Event, EventBuilder, Filter, JsonUtil, Keys, Kind, PublicKey, RelayMessage,
     27     SecretKey, SubscriptionId, Tag, Timestamp, UnsignedEvent,
     28 };
     29 use radroots_identity::RadrootsIdentity;
     30 use radroots_nostr::prelude::{
     31     RadrootsNostrApplicationHandlerSpec, RadrootsNostrClient, RadrootsNostrEventBuilder,
     32     RadrootsNostrKind, RadrootsNostrMetadata, RadrootsNostrRelayUrl, RadrootsNostrTag,
     33     radroots_nostr_build_application_handler_event,
     34 };
     35 use radroots_nostr_connect::prelude::{
     36     RADROOTS_NOSTR_CONNECT_RPC_KIND, RadrootsNostrConnectClientMetadata,
     37     RadrootsNostrConnectClientUri, RadrootsNostrConnectRequest, RadrootsNostrConnectRequestMessage,
     38     RadrootsNostrConnectResponse, RadrootsNostrConnectResponseEnvelope, RadrootsNostrConnectUri,
     39 };
     40 use radroots_nostr_signer::prelude::{
     41     RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerAuthState,
     42     RadrootsNostrSignerConnectionDraft,
     43 };
     44 use tempfile::TempDir;
     45 use tokio::net::{TcpListener, TcpStream};
     46 use tokio::sync::{Mutex, Notify, mpsc, oneshot};
     47 use tokio::time::{Instant, sleep, timeout};
     48 use tokio_tungstenite::tungstenite::Message;
     49 
     /// Result alias used by the fallible helpers in this file; the error is a
     /// boxed, thread-safe trait object so `?` can propagate any error type.
     type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

     /// Upper bound for helpers that wait on state observed inside the test relay.
     const RELAY_EVENT_TIMEOUT: Duration = Duration::from_secs(15);
     /// Upper bound for waiting on a signer response to an external request.
     const EXTERNAL_RESPONSE_TIMEOUT: Duration = Duration::from_secs(15);
     /// Upper bound for helpers that poll the runtime's own state.
     const RUNTIME_STATE_TIMEOUT: Duration = Duration::from_secs(15);
     /// Sleep between two polls in the polling helpers.
     const POLL_INTERVAL: Duration = Duration::from_millis(25);
     56 
     /// One subscription registered on the test relay by a `REQ` message.
     #[derive(Clone)]
     struct RelaySubscription {
         // Id of the relay connection that issued the `REQ`.
         connection_id: usize,
         // Subscription id chosen by the client; echoed on matching events.
         subscription_id: SubscriptionId,
         // Filters of the `REQ`; an event is delivered when any of them matches.
         filters: Vec<Filter>,
     }
     63 
     /// Mutable state shared by all connections of one test relay.
     #[derive(Default)]
     struct RelayState {
         // Id handed to the next accepted connection; incremented on each accept.
         next_connection_id: usize,
         // Outbound message channel of every currently registered connection.
         senders: HashMap<usize, mpsc::UnboundedSender<Message>>,
         // Subscriptions that are currently open across all connections.
         subscriptions: Vec<RelaySubscription>,
         // Every event the relay accepted, in arrival order.
         published_events: Vec<Event>,
         // Scripted accept/reject outcomes keyed by the author's hex public key;
         // one entry is consumed per published event, and an event is accepted
         // when no entry is queued for its author.
         publish_outcomes_by_pubkey: HashMap<String, VecDeque<bool>>,
     }
     72 
     73 struct TestRelay {
     74     url: String,
     75     state: Arc<Mutex<RelayState>>,
     76     notify: Arc<Notify>,
     77     shutdown_tx: Option<oneshot::Sender<()>>,
     78 }
     79 
     80 impl TestRelay {
     81     async fn spawn() -> TestResult<Self> {
     82         let listener = TcpListener::bind("127.0.0.1:0").await?;
     83         let addr = listener.local_addr()?;
     84         let url = format!("ws://{addr}");
     85         let state = Arc::new(Mutex::new(RelayState::default()));
     86         let notify = Arc::new(Notify::new());
     87         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
     88         let relay_state = Arc::clone(&state);
     89         let relay_notify = Arc::clone(&notify);
     90 
     91         tokio::spawn(async move {
     92             loop {
     93                 tokio::select! {
     94                     _ = &mut shutdown_rx => break,
     95                     accept = listener.accept() => {
     96                         let Ok((stream, _)) = accept else {
     97                             break;
     98                         };
     99                         let state = Arc::clone(&relay_state);
    100                         let notify = Arc::clone(&relay_notify);
    101                         tokio::spawn(async move {
    102                             let _ = handle_relay_connection(stream, state, notify).await;
    103                         });
    104                     }
    105                 }
    106             }
    107         });
    108 
    109         Ok(Self {
    110             url,
    111             state,
    112             notify,
    113             shutdown_tx: Some(shutdown_tx),
    114         })
    115     }
    116 
    117     fn url(&self) -> &str {
    118         self.url.as_str()
    119     }
    120 
    121     async fn queue_publish_outcomes(&self, public_key: PublicKey, outcomes: &[bool]) {
    122         let mut state = self.state.lock().await;
    123         state
    124             .publish_outcomes_by_pubkey
    125             .insert(public_key.to_hex(), outcomes.iter().copied().collect());
    126     }
    127 
    128     async fn wait_for_subscription_count(&self, expected: usize) -> TestResult<()> {
    129         timeout(RELAY_EVENT_TIMEOUT, async {
    130             loop {
    131                 if self.state.lock().await.subscriptions.len() >= expected {
    132                     return;
    133                 }
    134                 self.notify.notified().await;
    135             }
    136         })
    137         .await?;
    138         Ok(())
    139     }
    140 
    141     async fn wait_for_published_events_by_author(
    142         &self,
    143         public_key: PublicKey,
    144         expected: usize,
    145     ) -> TestResult<Vec<Event>> {
    146         timeout(RELAY_EVENT_TIMEOUT, async {
    147             loop {
    148                 let events = self.published_events_by_author(public_key).await;
    149                 if events.len() >= expected {
    150                     return events;
    151                 }
    152                 self.notify.notified().await;
    153             }
    154         })
    155         .await
    156         .map_err(Into::into)
    157     }
    158 
    159     async fn published_events_by_author(&self, public_key: PublicKey) -> Vec<Event> {
    160         self.state
    161             .lock()
    162             .await
    163             .published_events
    164             .iter()
    165             .filter(|event| event.pubkey == public_key)
    166             .cloned()
    167             .collect()
    168     }
    169 }
    170 
    171 impl Drop for TestRelay {
    172     fn drop(&mut self) {
    173         if let Some(shutdown_tx) = self.shutdown_tx.take() {
    174             let _ = shutdown_tx.send(());
    175         }
    176     }
    177 }
    178 
    179 struct HangingRelay {
    180     url: String,
    181     shutdown_tx: Option<oneshot::Sender<()>>,
    182 }
    183 
    184 impl HangingRelay {
    185     async fn spawn(hold_open_for: Duration) -> TestResult<Self> {
    186         let listener = TcpListener::bind("127.0.0.1:0").await?;
    187         let addr = listener.local_addr()?;
    188         let url = format!("ws://{addr}");
    189         let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
    190 
    191         tokio::spawn(async move {
    192             loop {
    193                 tokio::select! {
    194                     _ = &mut shutdown_rx => break,
    195                     accept = listener.accept() => {
    196                         let Ok((stream, _)) = accept else {
    197                             break;
    198                         };
    199                         tokio::spawn(async move {
    200                             sleep(hold_open_for).await;
    201                             drop(stream);
    202                         });
    203                     }
    204                 }
    205             }
    206         });
    207 
    208         Ok(Self {
    209             url,
    210             shutdown_tx: Some(shutdown_tx),
    211         })
    212     }
    213 
    214     fn url(&self) -> &str {
    215         self.url.as_str()
    216     }
    217 }
    218 
    219 impl Drop for HangingRelay {
    220     fn drop(&mut self) {
    221         if let Some(shutdown_tx) = self.shutdown_tx.take() {
    222             let _ = shutdown_tx.send(());
    223         }
    224     }
    225 }
    226 
    227 async fn handle_relay_connection(
    228     stream: TcpStream,
    229     state: Arc<Mutex<RelayState>>,
    230     notify: Arc<Notify>,
    231 ) -> TestResult<()> {
    232     let websocket = tokio_tungstenite::accept_async(stream).await?;
    233     let (mut writer, mut reader) = websocket.split();
    234     let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
    235     let connection_id = {
    236         let mut state = state.lock().await;
    237         let connection_id = state.next_connection_id;
    238         state.next_connection_id += 1;
    239         state.senders.insert(connection_id, tx);
    240         notify.notify_waiters();
    241         connection_id
    242     };
    243 
    244     let writer_task = tokio::spawn(async move {
    245         while let Some(message) = rx.recv().await {
    246             if writer.send(message).await.is_err() {
    247                 break;
    248             }
    249         }
    250     });
    251 
    252     while let Some(message) = reader.next().await {
    253         let message = message?;
    254         let Message::Text(text) = message else {
    255             continue;
    256         };
    257         let client_message = ClientMessage::from_json(text.as_str())?;
    258         handle_client_message(connection_id, client_message, &state, &notify).await?;
    259     }
    260 
    261     writer_task.abort();
    262     let mut state = state.lock().await;
    263     state.senders.remove(&connection_id);
    264     state
    265         .subscriptions
    266         .retain(|subscription| subscription.connection_id != connection_id);
    267     notify.notify_waiters();
    268     Ok(())
    269 }
    270 
    271 async fn handle_client_message(
    272     connection_id: usize,
    273     client_message: ClientMessage<'_>,
    274     state: &Arc<Mutex<RelayState>>,
    275     notify: &Arc<Notify>,
    276 ) -> TestResult<()> {
    277     match client_message {
    278         ClientMessage::Req {
    279             subscription_id,
    280             filters,
    281         } => {
    282             let (sender, matching_events) = {
    283                 let mut state = state.lock().await;
    284                 let matching_events = state
    285                     .published_events
    286                     .iter()
    287                     .filter(|event| {
    288                         filters
    289                             .iter()
    290                             .any(|filter| filter.match_event(event, MatchEventOptions::new()))
    291                     })
    292                     .cloned()
    293                     .collect::<Vec<_>>();
    294                 state.subscriptions.push(RelaySubscription {
    295                     connection_id,
    296                     subscription_id: subscription_id.as_ref().clone(),
    297                     filters: filters
    298                         .into_iter()
    299                         .map(|filter| filter.into_owned())
    300                         .collect(),
    301                 });
    302                 notify.notify_waiters();
    303                 (state.senders.get(&connection_id).cloned(), matching_events)
    304             };
    305             if let Some(sender) = sender {
    306                 for event in matching_events {
    307                     let message =
    308                         RelayMessage::event(subscription_id.as_ref().clone(), event).as_json();
    309                     let _ = sender.send(Message::Text(message.into()));
    310                 }
    311                 let eose = RelayMessage::eose(subscription_id.as_ref().clone()).as_json();
    312                 let _ = sender.send(Message::Text(eose.into()));
    313             }
    314         }
    315         ClientMessage::Close(subscription_id) => {
    316             let mut state = state.lock().await;
    317             state.subscriptions.retain(|subscription| {
    318                 subscription.connection_id != connection_id
    319                     || subscription.subscription_id != *subscription_id
    320             });
    321             notify.notify_waiters();
    322         }
    323         ClientMessage::Event(event) => {
    324             let event = event.into_owned();
    325             let (ok_message, subscriber_messages) =
    326                 accept_published_event(connection_id, event, state, notify).await?;
    327             if let Some((sender, message)) = ok_message {
    328                 let _ = sender.send(message);
    329             }
    330             for (sender, message) in subscriber_messages {
    331                 let _ = sender.send(message);
    332             }
    333         }
    334         _ => {}
    335     }
    336 
    337     Ok(())
    338 }
    339 
    340 async fn accept_published_event(
    341     connection_id: usize,
    342     event: Event,
    343     state: &Arc<Mutex<RelayState>>,
    344     notify: &Arc<Notify>,
    345 ) -> TestResult<(
    346     Option<(mpsc::UnboundedSender<Message>, Message)>,
    347     Vec<(mpsc::UnboundedSender<Message>, Message)>,
    348 )> {
    349     let event_id = event.id;
    350     let event_pubkey_hex = event.pubkey.to_hex();
    351     let mut subscriber_messages = Vec::new();
    352     let mut ok_message = None;
    353 
    354     {
    355         let mut state = state.lock().await;
    356         let publish_status = state
    357             .publish_outcomes_by_pubkey
    358             .get_mut(&event_pubkey_hex)
    359             .and_then(|outcomes| outcomes.pop_front())
    360             .unwrap_or(true);
    361 
    362         if let Some(sender) = state.senders.get(&connection_id).cloned() {
    363             let message = if publish_status {
    364                 RelayMessage::ok(event_id, true, "").as_json()
    365             } else {
    366                 RelayMessage::ok(event_id, false, "blocked by test relay").as_json()
    367             };
    368             ok_message = Some((sender, Message::Text(message.into())));
    369         }
    370 
    371         if publish_status {
    372             state.published_events.push(event.clone());
    373             for subscription in &state.subscriptions {
    374                 if subscription
    375                     .filters
    376                     .iter()
    377                     .any(|filter| filter.match_event(&event, MatchEventOptions::new()))
    378                 {
    379                     if let Some(sender) = state.senders.get(&subscription.connection_id).cloned() {
    380                         let message = RelayMessage::event(
    381                             subscription.subscription_id.clone(),
    382                             event.clone(),
    383                         )
    384                         .as_json();
    385                         subscriber_messages.push((sender, Message::Text(message.into())));
    386                     }
    387                 }
    388             }
    389             notify.notify_waiters();
    390         }
    391     }
    392 
    393     Ok((ok_message, subscriber_messages))
    394 }
    395 
    396 struct MycTestRuntime {
    397     _temp: TempDir,
    398     runtime: MycRuntime,
    399 }
    400 
    401 impl MycTestRuntime {
    402     fn new(relay_url: &str, approval: MycConnectionApproval) -> Self {
    403         Self::new_with_transport_relays(&[relay_url], approval)
    404     }
    405 
    406     fn new_with_transport_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self {
    407         Self::new_with_transport_config(relay_urls, approval, |_| {})
    408     }
    409 
    410     fn new_with_transport_config<F>(
    411         relay_urls: &[&str],
    412         approval: MycConnectionApproval,
    413         configure: F,
    414     ) -> Self
    415     where
    416         F: FnOnce(&mut MycConfig),
    417     {
    418         let temp = tempfile::tempdir().expect("tempdir");
    419         let mut config = MycConfig::default();
    420         config.paths.state_dir = temp.path().join("state");
    421         config.paths.signer_identity_path = temp.path().join("signer.json");
    422         config.paths.user_identity_path = temp.path().join("user.json");
    423         config.policy.connection_approval = approval;
    424         config.transport.enabled = true;
    425         config.transport.connect_timeout_secs = 1;
    426         config.transport.relays = relay_urls.iter().map(|relay| (*relay).to_owned()).collect();
    427         configure(&mut config);
    428         write_identity(
    429             &config.paths.signer_identity_path,
    430             "1111111111111111111111111111111111111111111111111111111111111111",
    431         );
    432         write_identity(
    433             &config.paths.user_identity_path,
    434             "2222222222222222222222222222222222222222222222222222222222222222",
    435         );
    436 
    437         Self {
    438             runtime: MycRuntime::bootstrap(config).expect("runtime"),
    439             _temp: temp,
    440         }
    441     }
    442 
    443     fn new_with_discovery(relay_url: &str, approval: MycConnectionApproval) -> Self {
    444         Self::new_with_discovery_relays(&[relay_url], approval)
    445     }
    446 
    447     fn new_with_discovery_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self {
    448         Self::new_with_discovery_relays_and_timeout(relay_urls, approval, 1)
    449     }
    450 
    451     fn new_with_discovery_relays_and_timeout(
    452         relay_urls: &[&str],
    453         approval: MycConnectionApproval,
    454         connect_timeout_secs: u64,
    455     ) -> Self {
    456         let temp = tempfile::tempdir().expect("tempdir");
    457         let mut config = MycConfig::default();
    458         config.paths.state_dir = temp.path().join("state");
    459         config.paths.signer_identity_path = temp.path().join("signer.json");
    460         config.paths.user_identity_path = temp.path().join("user.json");
    461         config.policy.connection_approval = approval;
    462         config.transport.connect_timeout_secs = connect_timeout_secs;
    463         config.discovery.enabled = true;
    464         config.discovery.domain = Some("signer.example.com".to_owned());
    465         config.discovery.public_relays =
    466             relay_urls.iter().map(|relay| (*relay).to_owned()).collect();
    467         config.discovery.publish_relays =
    468             relay_urls.iter().map(|relay| (*relay).to_owned()).collect();
    469         config.discovery.nostrconnect_url_template =
    470             Some("https://signer.example.com/connect?uri=<nostrconnect>".to_owned());
    471         config.discovery.app_identity_path = Some(temp.path().join("app.json"));
    472         write_identity(
    473             &config.paths.signer_identity_path,
    474             "1111111111111111111111111111111111111111111111111111111111111111",
    475         );
    476         write_identity(
    477             &config.paths.user_identity_path,
    478             "2222222222222222222222222222222222222222222222222222222222222222",
    479         );
    480         write_identity(
    481             config
    482                 .discovery
    483                 .app_identity_path
    484                 .as_ref()
    485                 .expect("app identity path"),
    486             "6666666666666666666666666666666666666666666666666666666666666666",
    487         );
    488 
    489         Self {
    490             runtime: MycRuntime::bootstrap(config).expect("runtime"),
    491             _temp: temp,
    492         }
    493     }
    494 }
    495 
    496 fn write_identity(path: &std::path::Path, secret_key: &str) {
    497     let identity = RadrootsIdentity::from_secret_key_str(secret_key).expect("identity");
    498     myc::identity_files::store_encrypted_identity(path, &identity).expect("save identity");
    499 }
    500 
    501 fn identity(secret_key: &str) -> RadrootsIdentity {
    502     RadrootsIdentity::from_secret_key_str(secret_key).expect("identity")
    503 }
    504 
    505 fn unavailable_relay_url() -> TestResult<String> {
    506     let listener = StdTcpListener::bind("127.0.0.1:0")?;
    507     let addr = listener.local_addr()?;
    508     drop(listener);
    509     Ok(format!("ws://{addr}"))
    510 }
    511 
    512 async fn publish_handler_event(
    513     relay_url: &str,
    514     identity: &RadrootsIdentity,
    515     spec: &RadrootsNostrApplicationHandlerSpec,
    516 ) -> TestResult<Event> {
    517     let event = radroots_nostr_build_application_handler_event(spec)?
    518         .sign_with_keys(identity.keys())
    519         .map_err(|error| format!("failed to sign handler event: {error}"))?;
    520     let client = RadrootsNostrClient::from_identity(identity);
    521     let _ = client.add_relay(relay_url).await?;
    522     client.connect().await;
    523     client.wait_for_connection(Duration::from_secs(1)).await;
    524     let output = client.send_event(&event).await?;
    525     assert!(
    526         !output.success.is_empty(),
    527         "handler event publish did not succeed: {:?}",
    528         output.failed
    529     );
    530     Ok(event)
    531 }
    532 
    533 async fn publish_signed_event(
    534     relay_url: &str,
    535     identity: &RadrootsIdentity,
    536     event: &Event,
    537 ) -> TestResult<()> {
    538     let client = RadrootsNostrClient::from_identity(identity);
    539     let _ = client.add_relay(relay_url).await?;
    540     client.connect().await;
    541     client.wait_for_connection(Duration::from_secs(1)).await;
    542     let output = client.send_event(event).await?;
    543     assert!(
    544         !output.success.is_empty(),
    545         "signed event publish did not succeed: {:?}",
    546         output.failed
    547     );
    548     Ok(())
    549 }
    550 
    551 fn connect_request_message(
    552     request_id: &str,
    553     signer_public_key: PublicKey,
    554     secret: &str,
    555 ) -> RadrootsNostrConnectRequestMessage {
    556     RadrootsNostrConnectRequestMessage::new(
    557         request_id,
    558         RadrootsNostrConnectRequest::Connect {
    559             remote_signer_public_key: signer_public_key,
    560             secret: Some(secret.to_owned()),
    561             requested_permissions: Default::default(),
    562         },
    563     )
    564 }
    565 
    566 fn ping_request_message(request_id: &str) -> RadrootsNostrConnectRequestMessage {
    567     RadrootsNostrConnectRequestMessage::new(request_id, RadrootsNostrConnectRequest::Ping)
    568 }
    569 
    570 fn build_request_event(
    571     client_identity: &RadrootsIdentity,
    572     signer_public_key: PublicKey,
    573     request_message: RadrootsNostrConnectRequestMessage,
    574     created_at_unix: u64,
    575 ) -> Event {
    576     let payload = serde_json::to_string(&request_message).expect("request payload");
    577     let ciphertext = nip44::encrypt(
    578         client_identity.keys().secret_key(),
    579         &signer_public_key,
    580         payload,
    581         Version::V2,
    582     )
    583     .expect("encrypt request");
    584     EventBuilder::new(Kind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), ciphertext)
    585         .tags([Tag::public_key(signer_public_key)])
    586         .custom_created_at(Timestamp::from(created_at_unix))
    587         .sign_with_keys(client_identity.keys())
    588         .expect("sign request event")
    589 }
    590 
    591 fn build_external_request_message(
    592     request_id: &str,
    593     request: &ExternalNostrConnectRequest,
    594 ) -> ExternalNostrConnectMessage {
    595     ExternalNostrConnectMessage::Request {
    596         id: request_id.to_owned(),
    597         method: request.method(),
    598         params: request.params(),
    599     }
    600 }
    601 
    602 fn build_external_request_event(
    603     client_identity: &RadrootsIdentity,
    604     signer_public_key: PublicKey,
    605     request_message: &ExternalNostrConnectMessage,
    606     created_at_unix: u64,
    607 ) -> Event {
    608     let payload = request_message.as_json();
    609     let ciphertext = nip44::encrypt(
    610         client_identity.keys().secret_key(),
    611         &signer_public_key,
    612         payload,
    613         Version::V2,
    614     )
    615     .expect("encrypt external request");
    616     EventBuilder::new(Kind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND), ciphertext)
    617         .tags([Tag::public_key(signer_public_key)])
    618         .custom_created_at(Timestamp::from(created_at_unix))
    619         .sign_with_keys(client_identity.keys())
    620         .expect("sign external request event")
    621 }
    622 
    623 fn build_signer_noise_event(signer_identity: &MycActiveIdentity, created_at_unix: u64) -> Event {
    624     signer_identity
    625         .sign_event_builder(
    626             RadrootsNostrEventBuilder::new(
    627                 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
    628                 "non-nip44-signer-noise",
    629             )
    630             .custom_created_at(Timestamp::from(created_at_unix)),
    631             "signer noise event",
    632         )
    633         .expect("sign noise event")
    634 }
    635 
    636 fn decrypt_response(
    637     client_identity: &RadrootsIdentity,
    638     signer_public_key: PublicKey,
    639     response_event: &Event,
    640 ) -> RadrootsNostrConnectResponseEnvelope {
    641     let plaintext = nip44::decrypt(
    642         client_identity.keys().secret_key(),
    643         &signer_public_key,
    644         &response_event.content,
    645     )
    646     .expect("decrypt response");
    647     serde_json::from_str(&plaintext).expect("response envelope")
    648 }
    649 
    650 async fn wait_for_external_response(
    651     relay: &TestRelay,
    652     client_identity: &RadrootsIdentity,
    653     signer_public_key: PublicKey,
    654     request_id: &str,
    655     method: ExternalNostrConnectMethod,
    656 ) -> TestResult<(Event, ExternalNostrConnectResponse)> {
    657     timeout(EXTERNAL_RESPONSE_TIMEOUT, async {
    658         loop {
    659             let events = relay.published_events_by_author(signer_public_key).await;
    660             for event in events {
    661                 let Ok(plaintext) = nip44::decrypt(
    662                     client_identity.keys().secret_key(),
    663                     &signer_public_key,
    664                     &event.content,
    665                 ) else {
    666                     continue;
    667                 };
    668                 let Ok(message) = ExternalNostrConnectMessage::from_json(&plaintext) else {
    669                     continue;
    670                 };
    671                 if message.id() != request_id {
    672                     continue;
    673                 }
    674                 let response = message.to_response(method)?;
    675                 return Ok((event, response));
    676             }
    677             sleep(POLL_INTERVAL).await;
    678         }
    679     })
    680     .await?
    681 }
    682 
    683 async fn publish_external_request_and_wait_for_response(
    684     relay: &TestRelay,
    685     client_identity: &RadrootsIdentity,
    686     signer_public_key: PublicKey,
    687     request_id: &str,
    688     request: ExternalNostrConnectRequest,
    689     created_at_unix: u64,
    690 ) -> TestResult<(Event, ExternalNostrConnectResponse)> {
    691     let method = request.method();
    692     let request_message = build_external_request_message(request_id, &request);
    693     let event = build_external_request_event(
    694         client_identity,
    695         signer_public_key,
    696         &request_message,
    697         created_at_unix,
    698     );
    699     publish_event(relay.url(), &event).await?;
    700     wait_for_external_response(
    701         relay,
    702         client_identity,
    703         signer_public_key,
    704         request_id,
    705         method,
    706     )
    707     .await
    708 }
    709 
    710 fn register_external_client_session(
    711     runtime: &MycRuntime,
    712     client_public_key: PublicKey,
    713     relay_url: &str,
    714     permissions: &str,
    715 ) -> TestResult<()> {
    716     let manager = runtime.signer_manager()?;
    717     let requested_permissions: radroots_nostr_connect::prelude::RadrootsNostrConnectPermissions =
    718         if permissions.trim().is_empty() {
    719             Default::default()
    720         } else {
    721             permissions.parse()?
    722         };
    723     let connection = manager.register_connection(
    724         RadrootsNostrSignerConnectionDraft::new(client_public_key, runtime.user_public_identity())
    725             .with_requested_permissions(requested_permissions.clone())
    726             .with_relays(vec![relay_url.parse()?])
    727             .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
    728     )?;
    729     let _ = manager.set_granted_permissions(&connection.connection_id, requested_permissions)?;
    730     Ok(())
    731 }
    732 
    733 async fn publish_event(relay_url: &str, event: &Event) -> TestResult<()> {
    734     let (mut websocket, _) = tokio_tungstenite::connect_async(relay_url).await?;
    735     websocket
    736         .send(Message::Text(
    737             ClientMessage::event(event.clone()).as_json().into(),
    738         ))
    739         .await?;
    740 
    741     while let Some(message) = websocket.next().await {
    742         let message = message?;
    743         let Message::Text(text) = message else {
    744             continue;
    745         };
    746         let relay_message = RelayMessage::from_json(text.as_str())?;
    747         if let RelayMessage::Ok {
    748             event_id,
    749             status,
    750             message,
    751         } = relay_message
    752         {
    753             assert_eq!(event_id, event.id);
    754             assert!(status, "client publish rejected: {message}");
    755             return Ok(());
    756         }
    757     }
    758 
    759     Err("relay connection closed before OK".into())
    760 }
    761 
    762 async fn wait_for_connection_count(runtime: &MycRuntime, expected: usize) -> TestResult<()> {
    763     timeout(RUNTIME_STATE_TIMEOUT, async {
    764         loop {
    765             if runtime
    766                 .signer_manager()
    767                 .expect("manager")
    768                 .list_connections()
    769                 .expect("connections")
    770                 .len()
    771                 >= expected
    772             {
    773                 return;
    774             }
    775             sleep(POLL_INTERVAL).await;
    776         }
    777     })
    778     .await?;
    779     Ok(())
    780 }
    781 
    782 async fn wait_for_connect_secret_consumed(runtime: &MycRuntime) -> TestResult<()> {
    783     timeout(RUNTIME_STATE_TIMEOUT, async {
    784         loop {
    785             let consumed = runtime
    786                 .signer_manager()
    787                 .expect("manager")
    788                 .list_connections()
    789                 .expect("connections")
    790                 .into_iter()
    791                 .any(|connection| connection.connect_secret_is_consumed());
    792             if consumed {
    793                 return;
    794             }
    795             sleep(POLL_INTERVAL).await;
    796         }
    797     })
    798     .await?;
    799     Ok(())
    800 }
    801 
    802 async fn wait_for_operation_audit_count(
    803     runtime: &MycRuntime,
    804     expected: usize,
    805 ) -> TestResult<Vec<MycOperationAuditRecord>> {
    806     timeout(RUNTIME_STATE_TIMEOUT, async {
    807         loop {
    808             let records = runtime
    809                 .operation_audit_store()
    810                 .list()
    811                 .expect("operation audit");
    812             if records.len() >= expected {
    813                 return records;
    814             }
    815             sleep(POLL_INTERVAL).await;
    816         }
    817     })
    818     .await
    819     .map_err(Into::into)
    820 }
    821 
    822 async fn wait_for_delivery_outbox_records<F>(
    823     runtime: &MycRuntime,
    824     predicate: F,
    825 ) -> TestResult<Vec<MycDeliveryOutboxRecord>>
    826 where
    827     F: Fn(&[MycDeliveryOutboxRecord]) -> bool,
    828 {
    829     timeout(RUNTIME_STATE_TIMEOUT, async {
    830         loop {
    831             let records = runtime
    832                 .delivery_outbox_store()
    833                 .list_all()
    834                 .expect("delivery outbox");
    835             if predicate(&records) {
    836                 return records;
    837             }
    838             sleep(POLL_INTERVAL).await;
    839         }
    840     })
    841     .await
    842     .map_err(Into::into)
    843 }
    844 
    845 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    846 async fn live_listener_rejects_denied_clients_without_registering_connection() -> TestResult<()> {
    847     let relay = TestRelay::spawn().await?;
    848     let client_identity =
    849         identity("7777777777777777777777777777777777777777777777777777777777777777");
    850     let test_runtime = MycTestRuntime::new_with_transport_config(
    851         &[relay.url()],
    852         MycConnectionApproval::ExplicitUser,
    853         |config| {
    854             config.policy.denied_client_pubkeys = vec![client_identity.public_key().to_hex()];
    855         },
    856     );
    857     let runtime = test_runtime.runtime.clone();
    858     let signer_public_key = runtime.signer_identity().public_key();
    859 
    860     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    861     let service_runtime = runtime.clone();
    862     let listener_task = tokio::spawn(async move {
    863         service_runtime
    864             .run_until(async {
    865                 let _ = shutdown_rx.await;
    866             })
    867             .await
    868     });
    869 
    870     relay.wait_for_subscription_count(1).await?;
    871 
    872     let request_event = build_request_event(
    873         &client_identity,
    874         signer_public_key,
    875         connect_request_message("denied-connect", signer_public_key, "denied-secret"),
    876         Timestamp::now().as_secs(),
    877     );
    878     publish_event(relay.url(), &request_event).await?;
    879 
    880     let response_events = relay
    881         .wait_for_published_events_by_author(signer_public_key, 1)
    882         .await?;
    883     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
    884     assert_eq!(response.id, "denied-connect");
    885     let parsed = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope(
    886         &RadrootsNostrConnectRequest::Connect {
    887             remote_signer_public_key: signer_public_key,
    888             secret: Some("denied-secret".to_owned()),
    889             requested_permissions: Default::default(),
    890         }
    891         .method(),
    892         response,
    893     )?;
    894     assert_eq!(
    895         parsed,
    896         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::Error {
    897             result: None,
    898             error: "client public key denied by policy".to_owned(),
    899         }
    900     );
    901     assert!(runtime.signer_manager()?.list_connections()?.is_empty());
    902 
    903     let _ = shutdown_tx.send(());
    904     listener_task.await??;
    905     Ok(())
    906 }
    907 
    908 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    909 async fn external_nostr_client_compatibility_covers_connect_and_base_methods() -> TestResult<()> {
    910     let relay = TestRelay::spawn().await?;
    911     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
    912     let runtime = test_runtime.runtime.clone();
    913     let signer_public_key = runtime.signer_identity().public_key();
    914     let user_public_key = runtime.user_identity().public_key();
    915     let client_identity =
    916         identity("3333333333333333333333333333333333333333333333333333333333333333");
    917     let base_created_at = Timestamp::now().as_secs();
    918 
    919     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    920     let service_runtime = runtime.clone();
    921     let listener_task = tokio::spawn(async move {
    922         service_runtime
    923             .run_until(async {
    924                 let _ = shutdown_rx.await;
    925             })
    926             .await
    927     });
    928 
    929     relay.wait_for_subscription_count(1).await?;
    930 
    931     let (_, connect_response) = publish_external_request_and_wait_for_response(
    932         &relay,
    933         &client_identity,
    934         signer_public_key,
    935         "external-connect",
    936         ExternalNostrConnectRequest::Connect {
    937             remote_signer_public_key: signer_public_key,
    938             secret: None,
    939         },
    940         base_created_at,
    941     )
    942     .await?;
    943     assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack));
    944     assert_eq!(connect_response.error, None);
    945 
    946     wait_for_connection_count(&runtime, 1).await?;
    947 
    948     let (_, get_public_key_response) = publish_external_request_and_wait_for_response(
    949         &relay,
    950         &client_identity,
    951         signer_public_key,
    952         "external-get-public-key",
    953         ExternalNostrConnectRequest::GetPublicKey,
    954         base_created_at + 1,
    955     )
    956     .await?;
    957     assert_eq!(
    958         get_public_key_response.result,
    959         Some(ExternalResponseResult::GetPublicKey(user_public_key))
    960     );
    961     assert_eq!(get_public_key_response.error, None);
    962 
    963     let (_, ping_response) = publish_external_request_and_wait_for_response(
    964         &relay,
    965         &client_identity,
    966         signer_public_key,
    967         "external-ping",
    968         ExternalNostrConnectRequest::Ping,
    969         base_created_at + 2,
    970     )
    971     .await?;
    972     assert_eq!(ping_response.result, Some(ExternalResponseResult::Pong));
    973     assert_eq!(ping_response.error, None);
    974 
    975     let _ = shutdown_tx.send(());
    976     listener_task.await??;
    977     Ok(())
    978 }
    979 
    980 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    981 async fn external_nostr_client_compatibility_covers_signed_and_crypto_methods() -> TestResult<()> {
    982     let relay = TestRelay::spawn().await?;
    983     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
    984     let runtime = test_runtime.runtime.clone();
    985     let signer_public_key = runtime.signer_identity().public_key();
    986     let user_public_key = runtime.user_identity().public_key();
    987     let client_identity =
    988         identity("3333333333333333333333333333333333333333333333333333333333333333");
    989     let peer_identity =
    990         identity("4444444444444444444444444444444444444444444444444444444444444444");
    991     let base_created_at = Timestamp::now().as_secs();
    992 
    993     register_external_client_session(
    994         &runtime,
    995         client_identity.public_key(),
    996         relay.url(),
    997         "sign_event:1,nip04_encrypt,nip04_decrypt,nip44_encrypt,nip44_decrypt",
    998     )?;
    999 
   1000     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1001     let service_runtime = runtime.clone();
   1002     let listener_task = tokio::spawn(async move {
   1003         service_runtime
   1004             .run_until(async {
   1005                 let _ = shutdown_rx.await;
   1006             })
   1007             .await
   1008     });
   1009 
   1010     relay.wait_for_subscription_count(1).await?;
   1011 
   1012     let unsigned_event: UnsignedEvent = serde_json::from_value(serde_json::json!({
   1013         "pubkey": user_public_key.to_hex(),
   1014         "created_at": base_created_at,
   1015         "kind": 1,
   1016         "tags": [],
   1017         "content": "hello from an external nostr client"
   1018     }))?;
   1019     let (_, sign_event_response) = publish_external_request_and_wait_for_response(
   1020         &relay,
   1021         &client_identity,
   1022         signer_public_key,
   1023         "external-sign-event",
   1024         ExternalNostrConnectRequest::SignEvent(unsigned_event.clone()),
   1025         base_created_at,
   1026     )
   1027     .await?;
   1028     let signed_event = sign_event_response
   1029         .result
   1030         .expect("sign_event result")
   1031         .to_sign_event()?;
   1032     assert_eq!(signed_event.pubkey, user_public_key);
   1033     assert_eq!(signed_event.kind, unsigned_event.kind);
   1034     assert_eq!(signed_event.content, unsigned_event.content);
   1035     signed_event.verify()?;
   1036 
   1037     let (_, nip04_encrypt_response) = publish_external_request_and_wait_for_response(
   1038         &relay,
   1039         &client_identity,
   1040         signer_public_key,
   1041         "external-nip04-encrypt",
   1042         ExternalNostrConnectRequest::Nip04Encrypt {
   1043             public_key: peer_identity.public_key(),
   1044             text: "hello via nip04".to_owned(),
   1045         },
   1046         base_created_at + 1,
   1047     )
   1048     .await?;
   1049     let nip04_ciphertext = nip04_encrypt_response
   1050         .result
   1051         .expect("nip04 encrypt result")
   1052         .to_nip04_encrypt()?;
   1053     let nip04_plaintext = nostr::nips::nip04::decrypt(
   1054         peer_identity.keys().secret_key(),
   1055         &user_public_key,
   1056         nip04_ciphertext.clone(),
   1057     )?;
   1058     assert_eq!(nip04_plaintext, "hello via nip04");
   1059 
   1060     let nip04_reply_ciphertext = nostr::nips::nip04::encrypt(
   1061         peer_identity.keys().secret_key(),
   1062         &user_public_key,
   1063         "reply via nip04".to_owned(),
   1064     )?;
   1065     let (_, nip04_decrypt_response) = publish_external_request_and_wait_for_response(
   1066         &relay,
   1067         &client_identity,
   1068         signer_public_key,
   1069         "external-nip04-decrypt",
   1070         ExternalNostrConnectRequest::Nip04Decrypt {
   1071             public_key: peer_identity.public_key(),
   1072             ciphertext: nip04_reply_ciphertext,
   1073         },
   1074         base_created_at + 2,
   1075     )
   1076     .await?;
   1077     assert_eq!(
   1078         nip04_decrypt_response
   1079             .result
   1080             .expect("nip04 decrypt result")
   1081             .to_nip04_decrypt()?,
   1082         "reply via nip04"
   1083     );
   1084 
   1085     let (_, nip44_encrypt_response) = publish_external_request_and_wait_for_response(
   1086         &relay,
   1087         &client_identity,
   1088         signer_public_key,
   1089         "external-nip44-encrypt",
   1090         ExternalNostrConnectRequest::Nip44Encrypt {
   1091             public_key: peer_identity.public_key(),
   1092             text: "hello via nip44".to_owned(),
   1093         },
   1094         base_created_at + 3,
   1095     )
   1096     .await?;
   1097     let nip44_ciphertext = nip44_encrypt_response
   1098         .result
   1099         .expect("nip44 encrypt result")
   1100         .to_nip44_encrypt()?;
   1101     let nip44_plaintext = nip44::decrypt(
   1102         peer_identity.keys().secret_key(),
   1103         &user_public_key,
   1104         &nip44_ciphertext,
   1105     )?;
   1106     assert_eq!(nip44_plaintext, "hello via nip44");
   1107 
   1108     let nip44_reply_ciphertext = nip44::encrypt(
   1109         peer_identity.keys().secret_key(),
   1110         &user_public_key,
   1111         "reply via nip44".to_owned(),
   1112         Version::V2,
   1113     )?;
   1114     let (_, nip44_decrypt_response) = publish_external_request_and_wait_for_response(
   1115         &relay,
   1116         &client_identity,
   1117         signer_public_key,
   1118         "external-nip44-decrypt",
   1119         ExternalNostrConnectRequest::Nip44Decrypt {
   1120             public_key: peer_identity.public_key(),
   1121             ciphertext: nip44_reply_ciphertext,
   1122         },
   1123         base_created_at + 4,
   1124     )
   1125     .await?;
   1126     assert_eq!(
   1127         nip44_decrypt_response
   1128             .result
   1129             .expect("nip44 decrypt result")
   1130             .to_nip44_decrypt()?,
   1131         "reply via nip44"
   1132     );
   1133 
   1134     let _ = shutdown_tx.send(());
   1135     listener_task.await??;
   1136     Ok(())
   1137 }
   1138 
   1139 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1140 async fn external_nostr_client_surfaces_pending_approval_state() -> TestResult<()> {
   1141     let relay = TestRelay::spawn().await?;
   1142     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::ExplicitUser);
   1143     let runtime = test_runtime.runtime.clone();
   1144     let signer_public_key = runtime.signer_identity().public_key();
   1145     let client_identity =
   1146         identity("8888888888888888888888888888888888888888888888888888888888888888");
   1147     let base_created_at = Timestamp::now().as_secs();
   1148 
   1149     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1150     let service_runtime = runtime.clone();
   1151     let listener_task = tokio::spawn(async move {
   1152         service_runtime
   1153             .run_until(async {
   1154                 let _ = shutdown_rx.await;
   1155             })
   1156             .await
   1157     });
   1158 
   1159     relay.wait_for_subscription_count(1).await?;
   1160 
   1161     let (_, connect_response) = publish_external_request_and_wait_for_response(
   1162         &relay,
   1163         &client_identity,
   1164         signer_public_key,
   1165         "external-explicit-connect",
   1166         ExternalNostrConnectRequest::Connect {
   1167             remote_signer_public_key: signer_public_key,
   1168             secret: None,
   1169         },
   1170         base_created_at,
   1171     )
   1172     .await?;
   1173     assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack));
   1174 
   1175     wait_for_connection_count(&runtime, 1).await?;
   1176 
   1177     let (_, pending_response) = publish_external_request_and_wait_for_response(
   1178         &relay,
   1179         &client_identity,
   1180         signer_public_key,
   1181         "external-pending-get-public-key",
   1182         ExternalNostrConnectRequest::GetPublicKey,
   1183         base_created_at + 1,
   1184     )
   1185     .await?;
   1186     assert_eq!(pending_response.result, None);
   1187     assert_eq!(
   1188         pending_response.error.as_deref(),
   1189         Some("connection is pending")
   1190     );
   1191 
   1192     let _ = shutdown_tx.send(());
   1193     listener_task.await??;
   1194     Ok(())
   1195 }
   1196 
   1197 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1198 async fn external_nostr_client_surfaces_auth_challenge_state() -> TestResult<()> {
   1199     let relay = TestRelay::spawn().await?;
   1200     let client_identity =
   1201         identity("8989898989898989898989898989898989898989898989898989898989898989");
   1202     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   1203     let runtime = test_runtime.runtime.clone();
   1204     let signer_public_key = runtime.signer_identity().public_key();
   1205     let base_created_at = Timestamp::now().as_secs();
   1206 
   1207     register_external_client_session(&runtime, client_identity.public_key(), relay.url(), "")?;
   1208     let connection_id = runtime
   1209         .signer_manager()?
   1210         .list_connections()?
   1211         .into_iter()
   1212         .find(|connection| connection.client_public_key == client_identity.public_key())
   1213         .expect("active connection")
   1214         .connection_id;
   1215     let _ = runtime
   1216         .signer_manager()?
   1217         .require_auth_challenge(&connection_id, "https://auth.example/challenge")?;
   1218 
   1219     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1220     let service_runtime = runtime.clone();
   1221     let listener_task = tokio::spawn(async move {
   1222         service_runtime
   1223             .run_until(async {
   1224                 let _ = shutdown_rx.await;
   1225             })
   1226             .await
   1227     });
   1228 
   1229     relay.wait_for_subscription_count(1).await?;
   1230 
   1231     let (_, connect_response) = publish_external_request_and_wait_for_response(
   1232         &relay,
   1233         &client_identity,
   1234         signer_public_key,
   1235         "external-auth-ping",
   1236         ExternalNostrConnectRequest::Ping,
   1237         base_created_at,
   1238     )
   1239     .await?;
   1240     assert_eq!(
   1241         connect_response.result,
   1242         Some(ExternalResponseResult::AuthUrl)
   1243     );
   1244     assert_eq!(
   1245         connect_response.error.as_deref(),
   1246         Some("https://auth.example/challenge")
   1247     );
   1248 
   1249     let _ = shutdown_tx.send(());
   1250     listener_task.await??;
   1251     Ok(())
   1252 }
   1253 
   1254 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1255 async fn external_nostr_client_ignores_unrelated_signer_events_before_response() -> TestResult<()> {
   1256     let relay = TestRelay::spawn().await?;
   1257     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   1258     let runtime = test_runtime.runtime.clone();
   1259     let signer_identity = runtime.signer_identity();
   1260     let signer_public_key = signer_identity.public_key();
   1261     let client_identity =
   1262         identity("5656565656565656565656565656565656565656565656565656565656565656");
   1263     let base_created_at = Timestamp::now().as_secs();
   1264 
   1265     register_external_client_session(&runtime, client_identity.public_key(), relay.url(), "")?;
   1266 
   1267     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1268     let service_runtime = runtime.clone();
   1269     let listener_task = tokio::spawn(async move {
   1270         service_runtime
   1271             .run_until(async {
   1272                 let _ = shutdown_rx.await;
   1273             })
   1274             .await
   1275     });
   1276 
   1277     relay.wait_for_subscription_count(1).await?;
   1278 
   1279     let noise_event = build_signer_noise_event(&signer_identity, base_created_at);
   1280     publish_event(relay.url(), &noise_event).await?;
   1281 
   1282     let (_, ping_response) = publish_external_request_and_wait_for_response(
   1283         &relay,
   1284         &client_identity,
   1285         signer_public_key,
   1286         "external-noise-ping",
   1287         ExternalNostrConnectRequest::Ping,
   1288         base_created_at + 1,
   1289     )
   1290     .await?;
   1291     assert_eq!(ping_response.result, Some(ExternalResponseResult::Pong));
   1292     assert_eq!(ping_response.error, None);
   1293 
   1294     let _ = shutdown_tx.send(());
   1295     listener_task.await??;
   1296     Ok(())
   1297 }
   1298 
   1299 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1300 async fn live_listener_consumes_connect_secret_only_after_successful_publish() -> TestResult<()> {
   1301     let relay = TestRelay::spawn().await?;
   1302     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   1303     let runtime = test_runtime.runtime.clone();
   1304     let signer_public_key = runtime.signer_identity().public_key();
   1305     let client_identity =
   1306         identity("3333333333333333333333333333333333333333333333333333333333333333");
   1307     let base_created_at = Timestamp::now().as_secs();
   1308 
   1309     relay
   1310         .queue_publish_outcomes(signer_public_key, &[false, true])
   1311         .await;
   1312 
   1313     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1314     let service_runtime = runtime.clone();
   1315     let listener_task = tokio::spawn(async move {
   1316         service_runtime
   1317             .run_until(async {
   1318                 let _ = shutdown_rx.await;
   1319             })
   1320             .await
   1321     });
   1322 
   1323     relay.wait_for_subscription_count(1).await?;
   1324 
   1325     let request_one = build_request_event(
   1326         &client_identity,
   1327         signer_public_key,
   1328         connect_request_message("connect-1", signer_public_key, "shared-secret"),
   1329         base_created_at,
   1330     );
   1331     publish_event(relay.url(), &request_one).await?;
   1332     wait_for_connection_count(&runtime, 1).await?;
   1333     sleep(Duration::from_millis(100)).await;
   1334 
   1335     assert!(
   1336         relay
   1337             .published_events_by_author(signer_public_key)
   1338             .await
   1339             .is_empty()
   1340     );
   1341     let initial_connection = runtime
   1342         .signer_manager()?
   1343         .list_connections()?
   1344         .into_iter()
   1345         .next()
   1346         .expect("stored connection");
   1347     assert!(!initial_connection.connect_secret_is_consumed());
   1348     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   1349     assert_eq!(operation_audit.len(), 1);
   1350     assert_eq!(
   1351         operation_audit[0].operation,
   1352         MycOperationAuditKind::ListenerResponsePublish
   1353     );
   1354     assert_eq!(
   1355         operation_audit[0].outcome,
   1356         MycOperationAuditOutcome::Rejected
   1357     );
   1358     assert_eq!(
   1359         operation_audit[0].connection_id.as_deref(),
   1360         Some(initial_connection.connection_id.as_str())
   1361     );
   1362     assert_eq!(operation_audit[0].request_id.as_deref(), Some("connect-1"));
   1363     assert_eq!(operation_audit[0].relay_count, 1);
   1364     assert_eq!(operation_audit[0].acknowledged_relay_count, 0);
   1365     assert!(
   1366         operation_audit[0]
   1367             .relay_outcome_summary
   1368             .contains("blocked by test relay")
   1369     );
   1370     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   1371         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
   1372     })
   1373     .await?;
   1374     assert_eq!(
   1375         outbox_records[0].kind,
   1376         MycDeliveryOutboxKind::ListenerResponsePublish
   1377     );
   1378     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
   1379     assert_eq!(
   1380         outbox_records[0]
   1381             .connection_id
   1382             .as_ref()
   1383             .map(|value| value.as_str()),
   1384         Some(initial_connection.connection_id.as_str())
   1385     );
   1386     assert_eq!(outbox_records[0].request_id.as_deref(), Some("connect-1"));
   1387     assert!(outbox_records[0].signer_publish_workflow_id.is_some());
   1388     assert!(
   1389         runtime
   1390             .signer_manager()?
   1391             .list_publish_workflows()?
   1392             .is_empty()
   1393     );
   1394 
   1395     let request_two = build_request_event(
   1396         &client_identity,
   1397         signer_public_key,
   1398         connect_request_message("connect-2", signer_public_key, "shared-secret"),
   1399         base_created_at + 1,
   1400     );
   1401     publish_event(relay.url(), &request_two).await?;
   1402 
   1403     let response_events = relay
   1404         .wait_for_published_events_by_author(signer_public_key, 1)
   1405         .await?;
   1406     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   1407     assert_eq!(response.id, "connect-2");
   1408     assert_eq!(
   1409         response.result,
   1410         Some(serde_json::Value::String("shared-secret".to_owned()))
   1411     );
   1412 
   1413     wait_for_connect_secret_consumed(&runtime).await?;
   1414     let consumed_connection = runtime
   1415         .signer_manager()?
   1416         .list_connections()?
   1417         .into_iter()
   1418         .next()
   1419         .expect("stored connection");
   1420     assert!(consumed_connection.connect_secret_is_consumed());
   1421     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   1422         records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
   1423     })
   1424     .await?;
   1425     assert_eq!(
   1426         outbox_records[1].kind,
   1427         MycDeliveryOutboxKind::ListenerResponsePublish
   1428     );
   1429     assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized);
   1430     assert_eq!(outbox_records[1].request_id.as_deref(), Some("connect-2"));
   1431     assert!(outbox_records[1].published_at_unix.is_some());
   1432     assert!(outbox_records[1].finalized_at_unix.is_some());
   1433     assert!(outbox_records[1].signer_publish_workflow_id.is_some());
   1434     assert!(
   1435         runtime
   1436             .signer_manager()?
   1437             .list_publish_workflows()?
   1438             .is_empty()
   1439     );
   1440 
   1441     let request_three = build_request_event(
   1442         &client_identity,
   1443         signer_public_key,
   1444         connect_request_message("connect-3", signer_public_key, "shared-secret"),
   1445         base_created_at + 2,
   1446     );
   1447     publish_event(relay.url(), &request_three).await?;
   1448     sleep(Duration::from_millis(300)).await;
   1449 
   1450     assert_eq!(
   1451         relay
   1452             .published_events_by_author(signer_public_key)
   1453             .await
   1454             .len(),
   1455         1
   1456     );
   1457     let operation_audit = runtime.operation_audit_store().list()?;
   1458     assert_eq!(operation_audit.len(), 2);
   1459     assert_eq!(
   1460         operation_audit[1].operation,
   1461         MycOperationAuditKind::ListenerResponsePublish
   1462     );
   1463     assert_eq!(
   1464         operation_audit[1].outcome,
   1465         MycOperationAuditOutcome::Succeeded
   1466     );
   1467     assert_eq!(operation_audit[1].request_id.as_deref(), Some("connect-2"));
   1468 
   1469     let _ = shutdown_tx.send(());
   1470     listener_task.await??;
   1471     Ok(())
   1472 }
   1473 
   1474 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1475 async fn live_listener_works_with_sqlite_signer_state_and_runtime_audit() -> TestResult<()> {
   1476     let relay = TestRelay::spawn().await?;
   1477     let test_runtime = MycTestRuntime::new_with_transport_config(
   1478         &[relay.url()],
   1479         MycConnectionApproval::NotRequired,
   1480         |config| {
   1481             config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite;
   1482             config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite;
   1483         },
   1484     );
   1485     let runtime = test_runtime.runtime.clone();
   1486     let signer_public_key = runtime.signer_identity().public_key();
   1487     let client_identity =
   1488         identity("5353535353535353535353535353535353535353535353535353535353535353");
   1489     let base_created_at = Timestamp::now().as_secs();
   1490 
   1491     assert_eq!(
   1492         runtime
   1493             .paths()
   1494             .signer_state_path
   1495             .file_name()
   1496             .and_then(|name| name.to_str()),
   1497         Some("signer-state.sqlite")
   1498     );
   1499     assert_eq!(
   1500         runtime
   1501             .paths()
   1502             .runtime_audit_path
   1503             .file_name()
   1504             .and_then(|name| name.to_str()),
   1505         Some("operations.sqlite")
   1506     );
   1507 
   1508     relay
   1509         .queue_publish_outcomes(signer_public_key, &[false, true])
   1510         .await;
   1511 
   1512     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1513     let service_runtime = runtime.clone();
   1514     let listener_task = tokio::spawn(async move {
   1515         service_runtime
   1516             .run_until(async {
   1517                 let _ = shutdown_rx.await;
   1518             })
   1519             .await
   1520     });
   1521 
   1522     relay.wait_for_subscription_count(1).await?;
   1523 
   1524     let request_one = build_request_event(
   1525         &client_identity,
   1526         signer_public_key,
   1527         connect_request_message("sqlite-connect-1", signer_public_key, "sqlite-secret"),
   1528         base_created_at,
   1529     );
   1530     publish_event(relay.url(), &request_one).await?;
   1531     wait_for_connection_count(&runtime, 1).await?;
   1532     sleep(Duration::from_millis(100)).await;
   1533 
   1534     assert!(
   1535         relay
   1536             .published_events_by_author(signer_public_key)
   1537             .await
   1538             .is_empty()
   1539     );
   1540     let initial_connection = runtime
   1541         .signer_manager()?
   1542         .list_connections()?
   1543         .into_iter()
   1544         .next()
   1545         .expect("stored connection");
   1546     assert!(!initial_connection.connect_secret_is_consumed());
   1547 
   1548     let request_two = build_request_event(
   1549         &client_identity,
   1550         signer_public_key,
   1551         connect_request_message("sqlite-connect-2", signer_public_key, "sqlite-secret"),
   1552         base_created_at + 1,
   1553     );
   1554     publish_event(relay.url(), &request_two).await?;
   1555 
   1556     let response_events = relay
   1557         .wait_for_published_events_by_author(signer_public_key, 1)
   1558         .await?;
   1559     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   1560     assert_eq!(response.id, "sqlite-connect-2");
   1561     assert_eq!(
   1562         response.result,
   1563         Some(serde_json::Value::String("sqlite-secret".to_owned()))
   1564     );
   1565 
   1566     wait_for_connect_secret_consumed(&runtime).await?;
   1567     let consumed_connection = runtime
   1568         .signer_manager()?
   1569         .list_connections()?
   1570         .into_iter()
   1571         .next()
   1572         .expect("stored connection");
   1573     assert!(consumed_connection.connect_secret_is_consumed());
   1574     let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?;
   1575     assert_eq!(
   1576         operation_audit
   1577             .iter()
   1578             .filter(|record| record.outcome == MycOperationAuditOutcome::Rejected)
   1579             .count(),
   1580         1
   1581     );
   1582     assert_eq!(
   1583         operation_audit
   1584             .iter()
   1585             .filter(|record| record.outcome == MycOperationAuditOutcome::Succeeded)
   1586             .count(),
   1587         1
   1588     );
   1589     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   1590         records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
   1591     })
   1592     .await?;
   1593     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
   1594     assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized);
   1595 
   1596     let restarted_runtime = MycRuntime::bootstrap(runtime.config().clone())?;
   1597     assert_eq!(
   1598         restarted_runtime
   1599             .signer_manager()?
   1600             .list_connections()?
   1601             .len(),
   1602         1
   1603     );
   1604     assert_eq!(
   1605         restarted_runtime.operation_audit_store().list_all()?.len(),
   1606         2
   1607     );
   1608     let restarted_outbox = restarted_runtime.delivery_outbox_store().list_all()?;
   1609     assert_eq!(restarted_outbox.len(), 2);
   1610     assert_eq!(restarted_outbox[0].status, MycDeliveryOutboxStatus::Failed);
   1611     assert_eq!(
   1612         restarted_outbox[1].status,
   1613         MycDeliveryOutboxStatus::Finalized
   1614     );
   1615     assert_eq!(
   1616         restarted_outbox[0].request_id.as_deref(),
   1617         Some("sqlite-connect-1")
   1618     );
   1619     assert_eq!(
   1620         restarted_outbox[1].request_id.as_deref(),
   1621         Some("sqlite-connect-2")
   1622     );
   1623     assert!(restarted_outbox[0].signer_publish_workflow_id.is_some());
   1624     assert!(restarted_outbox[1].signer_publish_workflow_id.is_some());
   1625     assert!(
   1626         restarted_runtime
   1627             .signer_manager()?
   1628             .list_publish_workflows()?
   1629             .is_empty()
   1630     );
   1631     assert!(
   1632         restarted_runtime
   1633             .signer_manager()?
   1634             .list_connections()?
   1635             .into_iter()
   1636             .next()
   1637             .expect("persisted connection")
   1638             .connect_secret_is_consumed()
   1639     );
   1640 
   1641     let _ = shutdown_tx.send(());
   1642     listener_task.await??;
   1643     Ok(())
   1644 }
   1645 
   1646 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1647 async fn external_nostr_client_recovers_connect_response_after_restart() -> TestResult<()> {
   1648     let relay = TestRelay::spawn().await?;
   1649     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   1650     let MycTestRuntime {
   1651         _temp: _tempdir,
   1652         runtime,
   1653     } = test_runtime;
   1654     let config = runtime.config().clone();
   1655     let signer_public_key = runtime.signer_identity().public_key();
   1656     let user_public_key = runtime.user_identity().public_key();
   1657     let client_identity =
   1658         identity("5757575757575757575757575757575757575757575757575757575757575757");
   1659     let base_created_at = Timestamp::now().as_secs();
   1660     let connect_request_id = "external-recovery-connect";
   1661     let connect_request = ExternalNostrConnectRequest::Connect {
   1662         remote_signer_public_key: signer_public_key,
   1663         secret: None,
   1664     };
   1665     let request_message = build_external_request_message(connect_request_id, &connect_request);
   1666     let request_event = build_external_request_event(
   1667         &client_identity,
   1668         signer_public_key,
   1669         &request_message,
   1670         base_created_at,
   1671     );
   1672     publish_event(relay.url(), &request_event).await?;
   1673 
   1674     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   1675     let manager = runtime.signer_manager()?;
   1676     let connection = manager.register_connection(
   1677         RadrootsNostrSignerConnectionDraft::new(
   1678             client_identity.public_key(),
   1679             runtime.user_public_identity(),
   1680         )
   1681         .with_relays(vec![relay_url.clone()])
   1682         .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
   1683     )?;
   1684     let response_envelope =
   1685         RadrootsNostrConnectResponse::ConnectAcknowledged.into_envelope(connect_request_id)?;
   1686     let response_payload = serde_json::to_string(&response_envelope)?;
   1687     let signer_identity =
   1688         identity("1111111111111111111111111111111111111111111111111111111111111111");
   1689     let response_ciphertext = nip44::encrypt(
   1690         signer_identity.keys().secret_key(),
   1691         &client_identity.public_key(),
   1692         response_payload,
   1693         Version::V2,
   1694     )?;
   1695     let response_event = runtime.signer_identity().sign_event_builder(
   1696         RadrootsNostrEventBuilder::new(
   1697             RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
   1698             response_ciphertext,
   1699         )
   1700         .tags(vec![RadrootsNostrTag::public_key(
   1701             client_identity.public_key(),
   1702         )]),
   1703         "external recovery queued connect response",
   1704     )?;
   1705     let queued_record = MycDeliveryOutboxRecord::new(
   1706         MycDeliveryOutboxKind::ListenerResponsePublish,
   1707         response_event,
   1708         vec![relay_url],
   1709     )?
   1710     .with_connection_id(&connection.connection_id)
   1711     .with_request_id(connect_request_id);
   1712     runtime.delivery_outbox_store().enqueue(&queued_record)?;
   1713     assert_eq!(
   1714         queued_record.kind,
   1715         MycDeliveryOutboxKind::ListenerResponsePublish
   1716     );
   1717 
   1718     let restarted_runtime = MycRuntime::bootstrap(config.clone())?;
   1719     let persisted_queued_record = restarted_runtime
   1720         .delivery_outbox_store()
   1721         .list_all()?
   1722         .into_iter()
   1723         .find(|record| record.request_id.as_deref() == Some(connect_request_id))
   1724         .expect("persisted queued external connect record");
   1725     assert_eq!(
   1726         persisted_queued_record.status,
   1727         MycDeliveryOutboxStatus::Queued
   1728     );
   1729 
   1730     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   1731     let service_runtime = restarted_runtime.clone();
   1732     let restarted_listener_task = tokio::spawn(async move {
   1733         service_runtime
   1734             .run_until(async {
   1735                 let _ = shutdown_rx.await;
   1736             })
   1737             .await
   1738     });
   1739 
   1740     let (_, connect_response) = wait_for_external_response(
   1741         &relay,
   1742         &client_identity,
   1743         signer_public_key,
   1744         connect_request_id,
   1745         ExternalNostrConnectMethod::Connect,
   1746     )
   1747     .await?;
   1748     assert_eq!(connect_response.result, Some(ExternalResponseResult::Ack));
   1749     assert_eq!(connect_response.error, None);
   1750 
   1751     let (_, get_public_key_response) = publish_external_request_and_wait_for_response(
   1752         &relay,
   1753         &client_identity,
   1754         signer_public_key,
   1755         "external-recovery-get-public-key",
   1756         ExternalNostrConnectRequest::GetPublicKey,
   1757         base_created_at + 1,
   1758     )
   1759     .await?;
   1760     assert_eq!(
   1761         get_public_key_response.result,
   1762         Some(ExternalResponseResult::GetPublicKey(user_public_key))
   1763     );
   1764     assert_eq!(get_public_key_response.error, None);
   1765 
   1766     let _ = shutdown_tx.send(());
   1767     restarted_listener_task.await??;
   1768 
   1769     let finalized_runtime = MycRuntime::bootstrap(config)?;
   1770     let finalized_record = finalized_runtime
   1771         .delivery_outbox_store()
   1772         .list_all()?
   1773         .into_iter()
   1774         .find(|record| record.request_id.as_deref() == Some(connect_request_id))
   1775         .expect("finalized external connect recovery record");
   1776     assert_eq!(finalized_record.status, MycDeliveryOutboxStatus::Finalized);
   1777     assert!(finalized_record.published_at_unix.is_some());
   1778     assert!(finalized_record.finalized_at_unix.is_some());
   1779 
   1780     Ok(())
   1781 }
   1782 
   1783 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1784 async fn startup_recovery_republishes_queued_listener_connect_secret_job() -> TestResult<()> {
   1785     let relay = TestRelay::spawn().await?;
   1786     let test_runtime = MycTestRuntime::new_with_transport_relays(
   1787         &[relay.url()],
   1788         MycConnectionApproval::NotRequired,
   1789     );
   1790     let MycTestRuntime {
   1791         _temp: _tempdir,
   1792         runtime,
   1793     } = test_runtime;
   1794     let signer_public_key = runtime.signer_identity().public_key();
   1795     let config = runtime.config().clone();
   1796     let client_identity =
   1797         identity("5454545454545454545454545454545454545454545454545454545454545454");
   1798     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   1799 
   1800     let manager = runtime.signer_manager()?;
   1801     let connection = manager.register_connection(
   1802         RadrootsNostrSignerConnectionDraft::new(
   1803             client_identity.public_key(),
   1804             runtime.user_public_identity(),
   1805         )
   1806         .with_connect_secret("startup-recovery-secret")
   1807         .with_relays(vec![relay_url.clone()])
   1808         .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
   1809     )?;
   1810     let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?;
   1811     let event = runtime
   1812         .signer_identity()
   1813         .sign_event_builder(
   1814             RadrootsNostrEventBuilder::new(
   1815                 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
   1816                 "startup-recovery",
   1817             ),
   1818             "startup recovery",
   1819         )
   1820         .map_err(|error| format!("failed to sign startup recovery event: {error}"))?;
   1821     let outbox_record = MycDeliveryOutboxRecord::new(
   1822         MycDeliveryOutboxKind::ListenerResponsePublish,
   1823         event,
   1824         vec![relay_url],
   1825     )?
   1826     .with_connection_id(&connection.connection_id)
   1827     .with_request_id("startup-recovery-connect")
   1828     .with_signer_publish_workflow_id(&workflow.workflow_id);
   1829     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
   1830 
   1831     runtime.run_until(async {}).await?;
   1832 
   1833     let published = relay
   1834         .wait_for_published_events_by_author(signer_public_key, 1)
   1835         .await?;
   1836     assert_eq!(published.len(), 1);
   1837 
   1838     let restarted_runtime = MycRuntime::bootstrap(config)?;
   1839     let recovered_connection = restarted_runtime
   1840         .signer_manager()?
   1841         .get_connection(&connection.connection_id)?
   1842         .expect("persisted connection");
   1843     assert!(recovered_connection.connect_secret_is_consumed());
   1844     assert!(
   1845         restarted_runtime
   1846             .signer_manager()?
   1847             .list_publish_workflows()?
   1848             .is_empty()
   1849     );
   1850     let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
   1851     assert_eq!(outbox_records.len(), 1);
   1852     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
   1853     assert_eq!(
   1854         outbox_records[0].request_id.as_deref(),
   1855         Some("startup-recovery-connect")
   1856     );
   1857     assert!(outbox_records[0].published_at_unix.is_some());
   1858     assert!(outbox_records[0].finalized_at_unix.is_some());
   1859     let audit_records = restarted_runtime.operation_audit_store().list_all()?;
   1860     assert_eq!(audit_records.len(), 2);
   1861     assert_eq!(
   1862         audit_records[0].operation,
   1863         MycOperationAuditKind::ListenerResponsePublish
   1864     );
   1865     assert_eq!(
   1866         audit_records[0].outcome,
   1867         MycOperationAuditOutcome::Succeeded
   1868     );
   1869     assert_eq!(
   1870         audit_records[0].request_id.as_deref(),
   1871         Some("startup-recovery-connect")
   1872     );
   1873     assert_eq!(
   1874         audit_records[1].operation,
   1875         MycOperationAuditKind::DeliveryRecovery
   1876     );
   1877     assert_eq!(
   1878         audit_records[1].outcome,
   1879         MycOperationAuditOutcome::Succeeded
   1880     );
   1881 
   1882     Ok(())
   1883 }
   1884 
   1885 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1886 async fn startup_recovery_republishes_queued_connect_accept_job() -> TestResult<()> {
   1887     let relay = TestRelay::spawn().await?;
   1888     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   1889     let MycTestRuntime {
   1890         _temp: _tempdir,
   1891         runtime,
   1892     } = test_runtime;
   1893     let signer_public_key = runtime.signer_identity().public_key();
   1894     let config = runtime.config().clone();
   1895     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   1896     let client_identity =
   1897         identity("4343434343434343434343434343434343434343434343434343434343434343");
   1898 
   1899     let manager = runtime.signer_manager()?;
   1900     let connection = manager.register_connection(
   1901         RadrootsNostrSignerConnectionDraft::new(
   1902             client_identity.public_key(),
   1903             runtime.user_public_identity(),
   1904         )
   1905         .with_connect_secret("startup-connect-accept-secret")
   1906         .with_relays(vec![relay_url.clone()])
   1907         .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
   1908     )?;
   1909     let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?;
   1910     let event = runtime
   1911         .signer_identity()
   1912         .sign_event_builder(
   1913             RadrootsNostrEventBuilder::new(
   1914                 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
   1915                 "startup-recovery-connect-accept",
   1916             ),
   1917             "startup recovery connect accept",
   1918         )
   1919         .map_err(|error| {
   1920             format!("failed to sign startup recovery connect-accept event: {error}")
   1921         })?;
   1922     let outbox_record = MycDeliveryOutboxRecord::new(
   1923         MycDeliveryOutboxKind::ConnectAcceptPublish,
   1924         event,
   1925         vec![relay_url],
   1926     )?
   1927     .with_connection_id(&connection.connection_id)
   1928     .with_request_id("startup-recovery-connect-accept")
   1929     .with_signer_publish_workflow_id(&workflow.workflow_id);
   1930     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
   1931 
   1932     runtime.run_until(async {}).await?;
   1933 
   1934     let published = relay
   1935         .wait_for_published_events_by_author(signer_public_key, 1)
   1936         .await?;
   1937     assert_eq!(published.len(), 1);
   1938 
   1939     let restarted_runtime = MycRuntime::bootstrap(config)?;
   1940     let recovered_connection = restarted_runtime
   1941         .signer_manager()?
   1942         .get_connection(&connection.connection_id)?
   1943         .expect("persisted connection");
   1944     assert!(recovered_connection.connect_secret_is_consumed());
   1945     assert!(
   1946         restarted_runtime
   1947             .signer_manager()?
   1948             .list_publish_workflows()?
   1949             .is_empty()
   1950     );
   1951     let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
   1952     assert_eq!(outbox_records.len(), 1);
   1953     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
   1954     assert_eq!(
   1955         outbox_records[0].request_id.as_deref(),
   1956         Some("startup-recovery-connect-accept")
   1957     );
   1958     assert!(outbox_records[0].published_at_unix.is_some());
   1959     assert!(outbox_records[0].finalized_at_unix.is_some());
   1960     let audit_records = restarted_runtime.operation_audit_store().list_all()?;
   1961     assert_eq!(audit_records.len(), 2);
   1962     assert_eq!(
   1963         audit_records[0].operation,
   1964         MycOperationAuditKind::ConnectAcceptPublish
   1965     );
   1966     assert_eq!(
   1967         audit_records[0].outcome,
   1968         MycOperationAuditOutcome::Succeeded
   1969     );
   1970     assert_eq!(
   1971         audit_records[0].request_id.as_deref(),
   1972         Some("startup-recovery-connect-accept")
   1973     );
   1974     assert_eq!(
   1975         audit_records[1].operation,
   1976         MycOperationAuditKind::DeliveryRecovery
   1977     );
   1978     assert_eq!(
   1979         audit_records[1].outcome,
   1980         MycOperationAuditOutcome::Succeeded
   1981     );
   1982 
   1983     Ok(())
   1984 }
   1985 
   1986 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   1987 async fn startup_recovery_republishes_queued_auth_replay_job() -> TestResult<()> {
   1988     let relay = TestRelay::spawn().await?;
   1989     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::ExplicitUser);
   1990     let MycTestRuntime {
   1991         _temp: _tempdir,
   1992         runtime,
   1993     } = test_runtime;
   1994     let signer_public_key = runtime.signer_identity().public_key();
   1995     let config = runtime.config().clone();
   1996     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   1997     let client_identity =
   1998         identity("5353535353535353535353535353535353535353535353535353535353535353");
   1999 
   2000     let manager = runtime.signer_manager()?;
   2001     let connection = manager.register_connection(
   2002         RadrootsNostrSignerConnectionDraft::new(
   2003             client_identity.public_key(),
   2004             runtime.user_public_identity(),
   2005         )
   2006         .with_relays(vec![relay_url.clone()])
   2007         .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::ExplicitUser),
   2008     )?;
   2009     let _ = manager.require_auth_challenge(&connection.connection_id, "https://auth.example")?;
   2010     let _ = manager.set_pending_request(
   2011         &connection.connection_id,
   2012         ping_request_message("startup-recovery-auth"),
   2013     )?;
   2014     let workflow = manager.begin_auth_replay_publish_finalization(&connection.connection_id)?;
   2015     let event = runtime
   2016         .signer_identity()
   2017         .sign_event_builder(
   2018             RadrootsNostrEventBuilder::new(
   2019                 RadrootsNostrKind::Custom(RADROOTS_NOSTR_CONNECT_RPC_KIND),
   2020                 "startup-recovery-auth-replay",
   2021             ),
   2022             "startup recovery auth replay",
   2023         )
   2024         .map_err(|error| format!("failed to sign startup recovery auth-replay event: {error}"))?;
   2025     let outbox_record = MycDeliveryOutboxRecord::new(
   2026         MycDeliveryOutboxKind::AuthReplayPublish,
   2027         event,
   2028         vec![relay_url],
   2029     )?
   2030     .with_connection_id(&connection.connection_id)
   2031     .with_request_id("startup-recovery-auth")
   2032     .with_signer_publish_workflow_id(&workflow.workflow_id);
   2033     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
   2034 
   2035     runtime.run_until(async {}).await?;
   2036 
   2037     let published = relay
   2038         .wait_for_published_events_by_author(signer_public_key, 1)
   2039         .await?;
   2040     assert_eq!(published.len(), 1);
   2041 
   2042     let restarted_runtime = MycRuntime::bootstrap(config)?;
   2043     let recovered_connection = restarted_runtime
   2044         .signer_manager()?
   2045         .get_connection(&connection.connection_id)?
   2046         .expect("persisted connection");
   2047     assert_eq!(
   2048         recovered_connection.auth_state,
   2049         RadrootsNostrSignerAuthState::Authorized
   2050     );
   2051     assert!(recovered_connection.pending_request.is_none());
   2052     assert!(recovered_connection.last_authenticated_at_unix.is_some());
   2053     assert!(
   2054         restarted_runtime
   2055             .signer_manager()?
   2056             .list_publish_workflows()?
   2057             .is_empty()
   2058     );
   2059     let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
   2060     assert_eq!(outbox_records.len(), 1);
   2061     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
   2062     assert_eq!(
   2063         outbox_records[0].request_id.as_deref(),
   2064         Some("startup-recovery-auth")
   2065     );
   2066     assert!(outbox_records[0].published_at_unix.is_some());
   2067     assert!(outbox_records[0].finalized_at_unix.is_some());
   2068     let audit_records = restarted_runtime.operation_audit_store().list_all()?;
   2069     assert_eq!(audit_records.len(), 2);
   2070     assert_eq!(
   2071         audit_records[0].operation,
   2072         MycOperationAuditKind::AuthReplayPublish
   2073     );
   2074     assert_eq!(
   2075         audit_records[0].outcome,
   2076         MycOperationAuditOutcome::Succeeded
   2077     );
   2078     assert_eq!(
   2079         audit_records[0].request_id.as_deref(),
   2080         Some("startup-recovery-auth")
   2081     );
   2082     assert_eq!(
   2083         audit_records[1].operation,
   2084         MycOperationAuditKind::DeliveryRecovery
   2085     );
   2086     assert_eq!(
   2087         audit_records[1].outcome,
   2088         MycOperationAuditOutcome::Succeeded
   2089     );
   2090 
   2091     Ok(())
   2092 }
   2093 
   2094 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   2095 async fn trusted_client_reauths_after_authorized_ttl() -> TestResult<()> {
   2096     let relay = TestRelay::spawn().await?;
   2097     let client_identity =
   2098         identity("7878787878787878787878787878787878787878787878787878787878787878");
   2099     let test_runtime = MycTestRuntime::new_with_transport_config(
   2100         &[relay.url()],
   2101         MycConnectionApproval::ExplicitUser,
   2102         |config| {
   2103             config.policy.trusted_client_pubkeys = vec![client_identity.public_key().to_hex()];
   2104             config.policy.permission_ceiling = "sign_event:1".parse().expect("permission ceiling");
   2105             config.policy.allowed_sign_event_kinds = vec![1];
   2106             config.policy.auth_url = Some("https://auth.example/challenge".to_owned());
   2107             config.policy.auth_authorized_ttl_secs = Some(1);
   2108         },
   2109     );
   2110     let runtime = test_runtime.runtime.clone();
   2111     let signer_public_key = runtime.signer_identity().public_key();
   2112 
   2113     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   2114     let service_runtime = runtime.clone();
   2115     let listener_task = tokio::spawn(async move {
   2116         service_runtime
   2117             .run_until(async {
   2118                 let _ = shutdown_rx.await;
   2119             })
   2120             .await
   2121     });
   2122 
   2123     relay.wait_for_subscription_count(1).await?;
   2124 
   2125     let connect_request = build_request_event(
   2126         &client_identity,
   2127         signer_public_key,
   2128         RadrootsNostrConnectRequestMessage::new(
   2129             "trusted-connect",
   2130             RadrootsNostrConnectRequest::Connect {
   2131                 remote_signer_public_key: signer_public_key,
   2132                 secret: None,
   2133                 requested_permissions: "sign_event:1".parse().expect("requested permissions"),
   2134             },
   2135         ),
   2136         Timestamp::now().as_secs(),
   2137     );
   2138     publish_event(relay.url(), &connect_request).await?;
   2139     let response_events = relay
   2140         .wait_for_published_events_by_author(signer_public_key, 1)
   2141         .await?;
   2142     let connect_response =
   2143         decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   2144     let connect_parsed =
   2145         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope(
   2146             &RadrootsNostrConnectRequest::Connect {
   2147                 remote_signer_public_key: signer_public_key,
   2148                 secret: None,
   2149                 requested_permissions: "sign_event:1".parse().expect("requested permissions"),
   2150             }
   2151             .method(),
   2152             connect_response,
   2153         )?;
   2154     assert_eq!(
   2155         connect_parsed,
   2156         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::ConnectAcknowledged
   2157     );
   2158 
   2159     let sign_request = |request_id: &str, created_at_unix| {
   2160         build_request_event(
   2161             &client_identity,
   2162             signer_public_key,
   2163             RadrootsNostrConnectRequestMessage::new(
   2164                 request_id,
   2165                 RadrootsNostrConnectRequest::SignEvent(
   2166                     serde_json::from_value(serde_json::json!({
   2167                         "pubkey": runtime.user_identity().public_key().to_hex(),
   2168                         "created_at": created_at_unix,
   2169                         "kind": 1,
   2170                         "tags": [],
   2171                         "content": request_id
   2172                     }))
   2173                     .expect("unsigned event"),
   2174                 ),
   2175             ),
   2176             created_at_unix,
   2177         )
   2178     };
   2179 
   2180     publish_event(
   2181         relay.url(),
   2182         &sign_request("trusted-sign-1", Timestamp::now().as_secs()),
   2183     )
   2184     .await?;
   2185     let response_events = relay
   2186         .wait_for_published_events_by_author(signer_public_key, 2)
   2187         .await?;
   2188     let first_auth = decrypt_response(&client_identity, signer_public_key, &response_events[1]);
   2189     let first_auth = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope(
   2190         &RadrootsNostrConnectRequest::SignEvent(
   2191             serde_json::from_value(serde_json::json!({
   2192                 "pubkey": runtime.user_identity().public_key().to_hex(),
   2193                 "created_at": Timestamp::from(1).as_secs(),
   2194                 "kind": 1,
   2195                 "tags": [],
   2196                 "content": "trusted-sign-1"
   2197             }))
   2198             .expect("unsigned event"),
   2199         )
   2200         .method(),
   2201         first_auth,
   2202     )?;
   2203     assert_eq!(
   2204         first_auth,
   2205         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::AuthUrl(
   2206             "https://auth.example/challenge".to_owned()
   2207         )
   2208     );
   2209 
   2210     let connection = runtime
   2211         .signer_manager()?
   2212         .list_connections()?
   2213         .into_iter()
   2214         .next()
   2215         .expect("connection");
   2216     let replayed = control::authorize_auth_challenge(&runtime, &connection.connection_id).await?;
   2217     assert_eq!(
   2218         replayed.replayed_request_id.as_deref(),
   2219         Some("trusted-sign-1")
   2220     );
   2221 
   2222     let response_events = relay
   2223         .wait_for_published_events_by_author(signer_public_key, 3)
   2224         .await?;
   2225     let replay_response =
   2226         decrypt_response(&client_identity, signer_public_key, &response_events[2]);
   2227     let replay_parsed =
   2228         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope(
   2229             &RadrootsNostrConnectRequest::SignEvent(
   2230                 serde_json::from_value(serde_json::json!({
   2231                     "pubkey": runtime.user_identity().public_key().to_hex(),
   2232                     "created_at": Timestamp::from(1).as_secs(),
   2233                     "kind": 1,
   2234                     "tags": [],
   2235                     "content": "trusted-sign-1"
   2236                 }))
   2237                 .expect("unsigned event"),
   2238             )
   2239             .method(),
   2240             replay_response,
   2241         )?;
   2242     assert!(matches!(
   2243         replay_parsed,
   2244         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::SignedEvent(_)
   2245     ));
   2246 
   2247     sleep(Duration::from_secs(2)).await;
   2248 
   2249     publish_event(
   2250         relay.url(),
   2251         &sign_request("trusted-sign-2", Timestamp::now().as_secs()),
   2252     )
   2253     .await?;
   2254     let response_events = relay
   2255         .wait_for_published_events_by_author(signer_public_key, 4)
   2256         .await?;
   2257     let second_auth = decrypt_response(&client_identity, signer_public_key, &response_events[3]);
   2258     let second_auth = radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::from_envelope(
   2259         &RadrootsNostrConnectRequest::SignEvent(
   2260             serde_json::from_value(serde_json::json!({
   2261                 "pubkey": runtime.user_identity().public_key().to_hex(),
   2262                 "created_at": Timestamp::from(1).as_secs(),
   2263                 "kind": 1,
   2264                 "tags": [],
   2265                 "content": "trusted-sign-2"
   2266             }))
   2267             .expect("unsigned event"),
   2268         )
   2269         .method(),
   2270         second_auth,
   2271     )?;
   2272     assert_eq!(
   2273         second_auth,
   2274         radroots_nostr_connect::prelude::RadrootsNostrConnectResponse::AuthUrl(
   2275             "https://auth.example/challenge".to_owned()
   2276         )
   2277     );
   2278 
   2279     let _ = shutdown_tx.send(());
   2280     listener_task.await??;
   2281     Ok(())
   2282 }
   2283 
   2284 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // Accepting a client URI must leave the connect secret unconsumed when the
        // response publish fails, consume it once a retry with the same URI publishes
        // successfully, and reject any further attempt against the consumed secret.
        // Each step is cross-checked against the operation audit log and the delivery
        // outbox.
   2285 async fn connect_accept_retries_without_consuming_secret_until_publish_succeeds() -> TestResult<()>
   2286 {
   2287     let relay = TestRelay::spawn().await?;
   2288     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   2289     let runtime = test_runtime.runtime;
   2290     let signer_public_key = runtime.signer_identity().public_key();
   2291     let client_identity =
   2292         identity("4444444444444444444444444444444444444444444444444444444444444444");
   2293 
            // Script the relay's answers to the signer's publishes: per the assertions
            // below, the first publish is expected to fail and the second to succeed.
   2294     relay
   2295         .queue_publish_outcomes(signer_public_key, &[false, true])
   2296         .await;
   2297 
   2298     let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri {
   2299         client_public_key: client_identity.public_key(),
   2300         relays: vec![nostr::RelayUrl::parse(relay.url())?],
   2301         secret: "client-secret".to_owned(),
   2302         metadata: RadrootsNostrConnectClientMetadata::default(),
   2303     })
   2304     .to_string();
   2305 
            // Attempt 1: the publish is refused, so accept_client_uri must return an error.
   2306     let failed = control::accept_client_uri(&runtime, &client_uri)
   2307         .await
   2308         .expect_err("first publish should fail");
   2309     assert!(failed.to_string().contains("Nostr publish failed"));
   2310 
            // The connection record exists despite the failure, but its secret is still
            // unconsumed so the same URI can be retried.
   2311     let stored_after_failure = runtime
   2312         .signer_manager()?
   2313         .list_connections()?
   2314         .into_iter()
   2315         .next()
   2316         .expect("stored connection");
   2317     assert!(!stored_after_failure.connect_secret_is_consumed());
            // The failed publish is audited as Rejected with no relay acknowledgements.
   2318     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   2319     assert_eq!(
   2320         operation_audit[0].operation,
   2321         MycOperationAuditKind::ConnectAcceptPublish
   2322     );
   2323     assert_eq!(
   2324         operation_audit[0].outcome,
   2325         MycOperationAuditOutcome::Rejected
   2326     );
   2327     assert_eq!(
   2328         operation_audit[0].connection_id.as_deref(),
   2329         Some(stored_after_failure.connection_id.as_str())
   2330     );
   2331     assert!(operation_audit[0].request_id.is_some());
   2332     assert_eq!(operation_audit[0].relay_count, 1);
   2333     assert_eq!(operation_audit[0].acknowledged_relay_count, 0);
   2334     assert!(
   2335         operation_audit[0]
   2336             .relay_outcome_summary
   2337             .contains("blocked by test relay")
   2338     );
            // The outbox keeps a Failed record carrying the same request id as the audit
            // entry, while the signer manager holds no leftover publish workflow.
   2339     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2340         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
   2341     })
   2342     .await?;
   2343     assert_eq!(
   2344         outbox_records[0].kind,
   2345         MycDeliveryOutboxKind::ConnectAcceptPublish
   2346     );
   2347     assert_eq!(
   2348         outbox_records[0].request_id.as_deref(),
   2349         operation_audit[0].request_id.as_deref()
   2350     );
   2351     assert!(outbox_records[0].signer_publish_workflow_id.is_some());
   2352     assert!(
   2353         runtime
   2354             .signer_manager()?
   2355             .list_publish_workflows()?
   2356             .is_empty()
   2357     );
   2358 
            // Attempt 2 with the identical URI: the relay now accepts the publish.
            // NOTE(review): the 36-character length presumably reflects a hyphenated
            // UUID request id -- confirm against the id generator.
   2359     let accepted = control::accept_client_uri(&runtime, &client_uri).await?;
   2360     assert_eq!(accepted.response_request_id.len(), 36);
   2361 
            // The first signer event the relay reports decrypts to a response whose
            // result echoes the connect secret back to the client.
   2362     let response_events = relay
   2363         .wait_for_published_events_by_author(signer_public_key, 1)
   2364         .await?;
   2365     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   2366     assert_eq!(response.id, accepted.response_request_id);
   2367     assert_eq!(
   2368         response.result,
   2369         Some(serde_json::Value::String("client-secret".to_owned()))
   2370     );
   2371 
            // Only now is the secret marked consumed; a second audit record reports the
            // acknowledged publish.
   2372     let stored_after_success = runtime
   2373         .signer_manager()?
   2374         .list_connections()?
   2375         .into_iter()
   2376         .next()
   2377         .expect("stored connection");
   2378     assert!(stored_after_success.connect_secret_is_consumed());
   2379     let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?;
   2380     assert_eq!(
   2381         operation_audit[1].operation,
   2382         MycOperationAuditKind::ConnectAcceptPublish
   2383     );
   2384     assert_eq!(
   2385         operation_audit[1].outcome,
   2386         MycOperationAuditOutcome::Succeeded
   2387     );
   2388     assert_eq!(
   2389         operation_audit[1].connection_id.as_deref(),
   2390         Some(stored_after_success.connection_id.as_str())
   2391     );
   2392     assert_eq!(
   2393         operation_audit[1].request_id.as_deref(),
   2394         Some(accepted.response_request_id.as_str())
   2395     );
   2396     assert_eq!(operation_audit[1].relay_count, 1);
   2397     assert_eq!(operation_audit[1].acknowledged_relay_count, 1);
   2398     assert!(
   2399         operation_audit[1]
   2400             .relay_outcome_summary
   2401             .contains("1/1 relays acknowledged publish")
   2402     );
            // The retry adds a second outbox record, which must reach Finalized with both
            // the published and finalized timestamps set.
   2403     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2404         records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
   2405     })
   2406     .await?;
   2407     assert_eq!(
   2408         outbox_records[1].kind,
   2409         MycDeliveryOutboxKind::ConnectAcceptPublish
   2410     );
   2411     assert_eq!(
   2412         outbox_records[1].request_id.as_deref(),
   2413         Some(accepted.response_request_id.as_str())
   2414     );
   2415     assert!(outbox_records[1].published_at_unix.is_some());
   2416     assert!(outbox_records[1].finalized_at_unix.is_some());
   2417     assert!(outbox_records[1].signer_publish_workflow_id.is_some());
   2418     assert!(
   2419         runtime
   2420             .signer_manager()?
   2421             .list_publish_workflows()?
   2422             .is_empty()
   2423     );
   2424 
            // Attempt 3: reusing the now-consumed secret must be refused.
   2425     let consumed = control::accept_client_uri(&runtime, &client_uri)
   2426         .await
   2427         .expect_err("consumed secret should be rejected");
   2428     assert!(
   2429         consumed
   2430             .to_string()
   2431             .contains("connect secret has already been consumed")
   2432     );
   2433 
   2434     Ok(())
   2435 }
   2436 
   2437 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // With the `Any` delivery policy and two relays, a connect-accept publish that
        // only one relay acknowledges must still succeed, consume the connect secret,
        // and be audited with the policy details.
   2438 async fn connect_accept_succeeds_with_any_delivery_policy_when_one_relay_acknowledges()
   2439 -> TestResult<()> {
   2440     let relay_a = TestRelay::spawn().await?;
   2441     let relay_b = TestRelay::spawn().await?;
            // A single publish attempt, so the outcome depends on the policy alone
            // rather than on retries.
   2442     let test_runtime = MycTestRuntime::new_with_transport_config(
   2443         &[relay_a.url(), relay_b.url()],
   2444         MycConnectionApproval::NotRequired,
   2445         |config| {
   2446             config.transport.delivery_policy = MycTransportDeliveryPolicy::Any;
   2447             config.transport.publish_max_attempts = 1;
   2448         },
   2449     );
   2450     let runtime = test_runtime.runtime;
   2451     let signer_public_key = runtime.signer_identity().public_key();
   2452     let client_identity =
   2453         identity("5555555555555555555555555555555555555555555555555555555555555555");
   2454 
            // Relay A is scripted with a `false` outcome and relay B with `true`; the
            // audit assertions below expect exactly one acknowledgement out of two.
   2455     relay_a
   2456         .queue_publish_outcomes(signer_public_key, &[false])
   2457         .await;
   2458     relay_b
   2459         .queue_publish_outcomes(signer_public_key, &[true])
   2460         .await;
   2461 
   2462     let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri {
   2463         client_public_key: client_identity.public_key(),
   2464         relays: vec![
   2465             nostr::RelayUrl::parse(relay_a.url())?,
   2466             nostr::RelayUrl::parse(relay_b.url())?,
   2467         ],
   2468         secret: "delivery-any-secret".to_owned(),
   2469         metadata: RadrootsNostrConnectClientMetadata::default(),
   2470     })
   2471     .to_string();
   2472 
            // The accept call succeeds even though only one relay acknowledges.
   2473     let accepted = control::accept_client_uri(&runtime, &client_uri).await?;
   2474     assert_eq!(accepted.response_relays.len(), 2);
            // Look the connection up by id (not position) and confirm its secret is consumed.
   2475     let stored = runtime
   2476         .signer_manager()?
   2477         .list_connections()?
   2478         .into_iter()
   2479         .find(|connection| connection.connection_id == accepted.connection.connection_id)
   2480         .expect("stored connection");
   2481     assert!(stored.connect_secret_is_consumed());
   2482 
            // The audit record captures the policy, the required acknowledgement count
            // (1), and that a single attempt was made.
   2483     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   2484     assert_eq!(
   2485         operation_audit[0].operation,
   2486         MycOperationAuditKind::ConnectAcceptPublish
   2487     );
   2488     assert_eq!(
   2489         operation_audit[0].outcome,
   2490         MycOperationAuditOutcome::Succeeded
   2491     );
   2492     assert_eq!(operation_audit[0].relay_count, 2);
   2493     assert_eq!(operation_audit[0].acknowledged_relay_count, 1);
   2494     assert_eq!(
   2495         operation_audit[0].delivery_policy,
   2496         Some(MycTransportDeliveryPolicy::Any)
   2497     );
   2498     assert_eq!(
   2499         operation_audit[0].required_acknowledged_relay_count,
   2500         Some(1)
   2501     );
   2502     assert_eq!(operation_audit[0].publish_attempt_count, Some(1));
   2503     assert!(
   2504         operation_audit[0]
   2505             .relay_outcome_summary
   2506             .contains("delivery policy any")
   2507     );
   2508 
   2509     Ok(())
   2510 }
   2511 
   2512 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // With a `Quorum` delivery policy requiring two acknowledgements, a publish
        // acknowledged by only one of two relays must fail, leave the connect secret
        // unconsumed, and be recorded as Rejected in the audit log and Failed in the
        // delivery outbox.
   2513 async fn connect_accept_rejects_when_quorum_delivery_policy_is_not_met() -> TestResult<()> {
   2514     let relay_a = TestRelay::spawn().await?;
   2515     let relay_b = TestRelay::spawn().await?;
            // Quorum of 2 across 2 relays with a single attempt: one refusal is enough
            // to miss the quorum.
   2516     let test_runtime = MycTestRuntime::new_with_transport_config(
   2517         &[relay_a.url(), relay_b.url()],
   2518         MycConnectionApproval::NotRequired,
   2519         |config| {
   2520             config.transport.delivery_policy = MycTransportDeliveryPolicy::Quorum;
   2521             config.transport.delivery_quorum = Some(2);
   2522             config.transport.publish_max_attempts = 1;
   2523         },
   2524     );
   2525     let runtime = test_runtime.runtime;
   2526     let signer_public_key = runtime.signer_identity().public_key();
   2527     let client_identity =
   2528         identity("6666666666666666666666666666666666666666666666666666666666666665");
   2529 
            // Relay A is scripted with `true` and relay B with `false`; the audit
            // assertions below expect one acknowledgement out of two.
   2530     relay_a
   2531         .queue_publish_outcomes(signer_public_key, &[true])
   2532         .await;
   2533     relay_b
   2534         .queue_publish_outcomes(signer_public_key, &[false])
   2535         .await;
   2536 
   2537     let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri {
   2538         client_public_key: client_identity.public_key(),
   2539         relays: vec![
   2540             nostr::RelayUrl::parse(relay_a.url())?,
   2541             nostr::RelayUrl::parse(relay_b.url())?,
   2542         ],
   2543         secret: "delivery-quorum-secret".to_owned(),
   2544         metadata: RadrootsNostrConnectClientMetadata::default(),
   2545     })
   2546     .to_string();
   2547 
            // The error must describe the unmet quorum in its message and expose the
            // same details through its structured accessors.
   2548     let error = control::accept_client_uri(&runtime, &client_uri)
   2549         .await
   2550         .expect_err("quorum publish should fail");
   2551     assert!(
   2552         error
   2553             .to_string()
   2554             .contains("delivery policy quorum requiring 2 acknowledgements")
   2555     );
   2556     assert_eq!(
   2557         error.publish_delivery_policy(),
   2558         Some(MycTransportDeliveryPolicy::Quorum)
   2559     );
   2560     assert_eq!(error.publish_required_acknowledged_relay_count(), Some(2));
   2561     assert_eq!(error.publish_attempt_count(), Some(1));
   2562 
            // A connection was stored, but its secret stays unconsumed after the
            // failed delivery.
   2563     let stored = runtime
   2564         .signer_manager()?
   2565         .list_connections()?
   2566         .into_iter()
   2567         .next()
   2568         .expect("stored connection");
   2569     assert!(!stored.connect_secret_is_consumed());
   2570 
            // The audit record is Rejected with 1 of 2 relays acknowledged against a
            // required count of 2.
   2571     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   2572     assert_eq!(
   2573         operation_audit[0].operation,
   2574         MycOperationAuditKind::ConnectAcceptPublish
   2575     );
   2576     assert_eq!(
   2577         operation_audit[0].outcome,
   2578         MycOperationAuditOutcome::Rejected
   2579     );
   2580     assert_eq!(operation_audit[0].relay_count, 2);
   2581     assert_eq!(operation_audit[0].acknowledged_relay_count, 1);
   2582     assert_eq!(
   2583         operation_audit[0].delivery_policy,
   2584         Some(MycTransportDeliveryPolicy::Quorum)
   2585     );
   2586     assert_eq!(
   2587         operation_audit[0].required_acknowledged_relay_count,
   2588         Some(2)
   2589     );
   2590     assert_eq!(operation_audit[0].publish_attempt_count, Some(1));
            // The outbox record ends up Failed and still references a workflow id,
            // while the signer manager lists no remaining publish workflows.
   2591     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2592         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
   2593     })
   2594     .await?;
   2595     assert_eq!(
   2596         outbox_records[0].kind,
   2597         MycDeliveryOutboxKind::ConnectAcceptPublish
   2598     );
   2599     assert!(outbox_records[0].signer_publish_workflow_id.is_some());
   2600     assert!(
   2601         runtime
   2602             .signer_manager()?
   2603             .list_publish_workflows()?
   2604             .is_empty()
   2605     );
   2606 
   2607     Ok(())
   2608 }
   2609 
   2610 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // With the `All` delivery policy, the live listener must retry a response
        // publish until both relays acknowledge it. A connect request is sent through
        // relay A; the test then checks the response reached relay B, the connect
        // secret was consumed, and the audit record reports two attempts.
   2611 async fn live_listener_retries_until_all_delivery_policy_is_met() -> TestResult<()> {
   2612     let relay_a = TestRelay::spawn().await?;
   2613     let relay_b = TestRelay::spawn().await?;
            // Two attempts with a 10 ms initial and maximum backoff keep the retry fast.
   2614     let test_runtime = MycTestRuntime::new_with_transport_config(
   2615         &[relay_a.url(), relay_b.url()],
   2616         MycConnectionApproval::NotRequired,
   2617         |config| {
   2618             config.transport.delivery_policy = MycTransportDeliveryPolicy::All;
   2619             config.transport.publish_max_attempts = 2;
   2620             config.transport.publish_initial_backoff_millis = 10;
   2621             config.transport.publish_max_backoff_millis = 10;
   2622         },
   2623     );
   2624     let runtime = test_runtime.runtime.clone();
   2625     let signer_public_key = runtime.signer_identity().public_key();
   2626     let client_identity =
   2627         identity("7777777777777777777777777777777777777777777777777777777777777777");
   2628     let base_created_at = Timestamp::now().as_secs();
   2629 
            // Relay A is scripted `true` for both attempts; relay B is `false` then
            // `true`, so the assertions below expect 1/2 acknowledgements on attempt 1
            // and success on attempt 2.
   2630     relay_a
   2631         .queue_publish_outcomes(signer_public_key, &[true, true])
   2632         .await;
   2633     relay_b
   2634         .queue_publish_outcomes(signer_public_key, &[false, true])
   2635         .await;
   2636 
            // Run the listener on a background task until the oneshot shutdown signal fires.
   2637     let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
   2638     let service_runtime = runtime.clone();
   2639     let listener_task = tokio::spawn(async move {
   2640         service_runtime
   2641             .run_until(async {
   2642                 let _ = shutdown_rx.await;
   2643             })
   2644             .await
   2645     });
   2646 
            // Do not send the request until each relay reports one subscription.
   2647     relay_a.wait_for_subscription_count(1).await?;
   2648     relay_b.wait_for_subscription_count(1).await?;
   2649 
   2650     let request = build_request_event(
   2651         &client_identity,
   2652         signer_public_key,
   2653         connect_request_message("connect-all-1", signer_public_key, "shared-secret-all"),
   2654         base_created_at,
   2655     );
   2656     publish_event(relay_a.url(), &request).await?;
   2657 
            // Observe the response on relay B, the relay scripted to refuse the first attempt.
   2658     let response_events = relay_b
   2659         .wait_for_published_events_by_author(signer_public_key, 1)
   2660         .await?;
   2661     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   2662     assert_eq!(response.id, "connect-all-1");
   2663     assert_eq!(
   2664         response.result,
   2665         Some(serde_json::Value::String("shared-secret-all".to_owned()))
   2666     );
   2667 
            // One audit record covers the whole retried publish: 2/2 acknowledged after
            // two attempts, with attempt 1 summarised as a partial acknowledgement.
   2668     wait_for_connect_secret_consumed(&runtime).await?;
   2669     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   2670     assert_eq!(
   2671         operation_audit[0].operation,
   2672         MycOperationAuditKind::ListenerResponsePublish
   2673     );
   2674     assert_eq!(
   2675         operation_audit[0].outcome,
   2676         MycOperationAuditOutcome::Succeeded
   2677     );
   2678     assert_eq!(operation_audit[0].relay_count, 2);
   2679     assert_eq!(operation_audit[0].acknowledged_relay_count, 2);
   2680     assert_eq!(
   2681         operation_audit[0].delivery_policy,
   2682         Some(MycTransportDeliveryPolicy::All)
   2683     );
   2684     assert_eq!(
   2685         operation_audit[0].required_acknowledged_relay_count,
   2686         Some(2)
   2687     );
   2688     assert_eq!(operation_audit[0].publish_attempt_count, Some(2));
   2689     assert!(
   2690         operation_audit[0]
   2691             .relay_outcome_summary
   2692             .contains("attempt 1: 1/2 relays acknowledged publish")
   2693     );
   2694 
            // Stop the listener; `??` surfaces both a task join error and the
            // listener's own result.
   2695     let _ = shutdown_tx.send(());
   2696     listener_task.await??;
   2697     Ok(())
   2698 }
   2699 
   2700 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // Authorizing an auth challenge replays the connection's pending request. If
        // publishing the replayed response fails, the connection must return to its
        // pending-auth state (pending request kept, challenge not authorized) so the
        // authorization can be retried; the retry must then publish the response and
        // mark the connection Authorized.
   2701 async fn auth_replay_restores_pending_request_until_publish_succeeds() -> TestResult<()> {
   2702     let relay = TestRelay::spawn().await?;
   2703     let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);
   2704     let runtime = test_runtime.runtime;
   2705     let signer_public_key = runtime.signer_identity().public_key();
   2706     let client_public_key = Keys::new(SecretKey::from_hex(
   2707         "5555555555555555555555555555555555555555555555555555555555555555",
   2708     )?)
   2709     .public_key();
   2710 
            // Per the assertions below, the first replay publish is expected to fail
            // and the second to succeed.
   2711     relay
   2712         .queue_publish_outcomes(signer_public_key, &[false, true])
   2713         .await;
   2714 
            // Build the state directly through the signer manager: a registered
            // connection with an outstanding auth challenge and a pending ping request.
   2715     let manager = runtime.signer_manager()?;
   2716     let connection = manager.register_connection(
   2717         RadrootsNostrSignerConnectionDraft::new(client_public_key, runtime.user_public_identity())
   2718             .with_relays(vec![nostr::RelayUrl::parse(relay.url())?])
   2719             .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
   2720     )?;
   2721     manager.require_auth_challenge(&connection.connection_id, "https://auth.example/flow")?;
   2722     manager.set_pending_request(&connection.connection_id, ping_request_message("auth-ping"))?;
   2723 
            // Attempt 1: authorization fails because the replayed response cannot be published.
   2724     let first_attempt = control::authorize_auth_challenge(&runtime, &connection.connection_id)
   2725         .await
   2726         .expect_err("first replay publish should fail");
   2727     assert!(first_attempt.to_string().contains("Nostr publish failed"));
   2728 
            // The connection is back in the Pending auth state with the same pending
            // request and an unauthorized challenge.
   2729     let restored = runtime
   2730         .signer_manager()?
   2731         .get_connection(&connection.connection_id)?
   2732         .expect("restored connection");
   2733     assert_eq!(restored.auth_state, RadrootsNostrSignerAuthState::Pending);
   2734     assert_eq!(
   2735         restored
   2736             .pending_request
   2737             .as_ref()
   2738             .expect("pending request")
   2739             .request_id()
   2740             .as_str(),
   2741         "auth-ping"
   2742     );
   2743     assert_eq!(
   2744         restored
   2745             .auth_challenge
   2746             .as_ref()
   2747             .expect("auth challenge")
   2748             .authorized_at_unix,
   2749         None
   2750     );
            // The failure yields two audit records: the rejected publish, then the
            // restore of the pending auth challenge.
   2751     let operation_audit = wait_for_operation_audit_count(&runtime, 2).await?;
   2752     assert_eq!(
   2753         operation_audit[0].operation,
   2754         MycOperationAuditKind::AuthReplayPublish
   2755     );
   2756     assert_eq!(
   2757         operation_audit[0].outcome,
   2758         MycOperationAuditOutcome::Rejected
   2759     );
   2760     assert_eq!(
   2761         operation_audit[0].connection_id.as_deref(),
   2762         Some(connection.connection_id.as_str())
   2763     );
   2764     assert_eq!(operation_audit[0].request_id.as_deref(), Some("auth-ping"));
   2765     assert_eq!(operation_audit[0].relay_count, 1);
   2766     assert_eq!(operation_audit[0].acknowledged_relay_count, 0);
   2767     assert!(
   2768         operation_audit[0]
   2769             .relay_outcome_summary
   2770             .contains("blocked by test relay")
   2771     );
   2772     assert_eq!(
   2773         operation_audit[1].operation,
   2774         MycOperationAuditKind::AuthReplayRestore
   2775     );
   2776     assert_eq!(
   2777         operation_audit[1].outcome,
   2778         MycOperationAuditOutcome::Restored
   2779     );
   2780     assert_eq!(
   2781         operation_audit[1].connection_id.as_deref(),
   2782         Some(connection.connection_id.as_str())
   2783     );
   2784     assert_eq!(operation_audit[1].request_id.as_deref(), Some("auth-ping"));
   2785     assert_eq!(operation_audit[1].relay_count, 1);
   2786     assert_eq!(operation_audit[1].acknowledged_relay_count, 0);
   2787     assert!(
   2788         operation_audit[1]
   2789             .relay_outcome_summary
   2790             .contains("preserved pending auth challenge")
   2791     );
            // The failed delivery is kept in the outbox; no publish workflow remains
            // in the signer manager.
   2792     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2793         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
   2794     })
   2795     .await?;
   2796     assert_eq!(
   2797         outbox_records[0].kind,
   2798         MycDeliveryOutboxKind::AuthReplayPublish
   2799     );
   2800     assert_eq!(outbox_records[0].request_id.as_deref(), Some("auth-ping"));
   2801     assert!(outbox_records[0].signer_publish_workflow_id.is_some());
   2802     assert!(
   2803         runtime
   2804             .signer_manager()?
   2805             .list_publish_workflows()?
   2806             .is_empty()
   2807     );
   2808 
            // Attempt 2: the same authorization now succeeds and reports the replayed
            // request id.
   2809     let replayed = control::authorize_auth_challenge(&runtime, &connection.connection_id).await?;
   2810     assert_eq!(replayed.replayed_request_id.as_deref(), Some("auth-ping"));
   2811 
            // Same secret-key hex as `client_public_key` above, so this identity can
            // decrypt the response addressed to that client.
   2812     let client_identity =
   2813         identity("5555555555555555555555555555555555555555555555555555555555555555");
   2814     let response_events = relay
   2815         .wait_for_published_events_by_author(signer_public_key, 1)
   2816         .await?;
   2817     let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
   2818     assert_eq!(response.id, "auth-ping");
   2819     assert_eq!(
   2820         response.result,
   2821         Some(serde_json::Value::String("pong".to_owned()))
   2822     );
   2823 
            // The connection is now Authorized, with the pending request cleared and
            // an authentication timestamp recorded.
   2824     let authorized = runtime
   2825         .signer_manager()?
   2826         .get_connection(&connection.connection_id)?
   2827         .expect("authorized connection");
   2828     assert_eq!(
   2829         authorized.auth_state,
   2830         RadrootsNostrSignerAuthState::Authorized
   2831     );
   2832     assert!(authorized.pending_request.is_none());
   2833     assert!(authorized.last_authenticated_at_unix.is_some());
            // A third audit record reports the successful replay publish.
   2834     let operation_audit = wait_for_operation_audit_count(&runtime, 3).await?;
   2835     assert_eq!(
   2836         operation_audit[2].operation,
   2837         MycOperationAuditKind::AuthReplayPublish
   2838     );
   2839     assert_eq!(
   2840         operation_audit[2].outcome,
   2841         MycOperationAuditOutcome::Succeeded
   2842     );
   2843     assert_eq!(
   2844         operation_audit[2].connection_id.as_deref(),
   2845         Some(connection.connection_id.as_str())
   2846     );
   2847     assert_eq!(operation_audit[2].request_id.as_deref(), Some("auth-ping"));
   2848     assert_eq!(operation_audit[2].relay_count, 1);
   2849     assert_eq!(operation_audit[2].acknowledged_relay_count, 1);
   2850     assert!(
   2851         operation_audit[2]
   2852             .relay_outcome_summary
   2853             .contains("1/1 relays acknowledged publish")
   2854     );
            // The retry adds a second outbox record, which must reach Finalized with
            // both timestamps set.
   2855     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2856         records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
   2857     })
   2858     .await?;
   2859     assert_eq!(
   2860         outbox_records[1].kind,
   2861         MycDeliveryOutboxKind::AuthReplayPublish
   2862     );
   2863     assert_eq!(outbox_records[1].request_id.as_deref(), Some("auth-ping"));
   2864     assert!(outbox_records[1].published_at_unix.is_some());
   2865     assert!(outbox_records[1].finalized_at_unix.is_some());
   2866     assert!(outbox_records[1].signer_publish_workflow_id.is_some());
   2867     assert!(
   2868         runtime
   2869             .signer_manager()?
   2870             .list_publish_workflows()?
   2871             .is_empty()
   2872     );
   2873 
   2874     Ok(())
   2875 }
   2876 
   2877 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // An explicit NIP-89 publish must be authored by the configured discovery app
        // identity (not the signer identity), report the signer's public key
        // alongside it, and leave a Succeeded audit record plus a Finalized outbox
        // record keyed by the published event id.
   2878 async fn explicit_nip89_publish_uses_app_identity_and_records_audit() -> TestResult<()> {
   2879     let relay = TestRelay::spawn().await?;
   2880     let test_runtime =
   2881         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   2882     let runtime = test_runtime.runtime;
            // Load the app identity from the path in the runtime's discovery config so
            // the test can compare against the key the runtime should publish with.
   2883     let app_identity = myc::identity_files::load_encrypted_identity(
   2884         runtime
   2885             .config()
   2886             .discovery
   2887             .app_identity_path
   2888             .as_ref()
   2889             .expect("app identity path"),
   2890     )?;
   2891 
            // The scripted outcome is keyed by the app identity's public key, the
            // expected author of the discovery event.
   2892     relay
   2893         .queue_publish_outcomes(app_identity.public_key(), &[true])
   2894         .await;
   2895 
   2896     let published = publish_nip89_event(&runtime).await?;
   2897     let published_event_id = published.event.id.to_hex();
   2898     let published_events = relay
   2899         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   2900         .await?;
   2901     let event = &published_events[0];
   2902     let event_json = event.as_json();
   2903 
   2904     assert_eq!(
   2905         published.author_public_key_hex,
   2906         app_identity.public_key_hex()
   2907     );
   2908     assert_eq!(
   2909         published.signer_public_key_hex,
   2910         runtime.signer_identity().public_key_hex()
   2911     );
            // The event is kind 31990 and its JSON carries a "24133" value plus "relay"
            // and "nostrconnect_url" entries.
            // NOTE(review): presumably the NIP-89 handler kind advertising the NIP-46
            // request kind -- confirm against the event builder.
   2912     assert_eq!(event.kind.as_u16(), 31_990);
   2913     assert!(event_json.contains("\"24133\""));
   2914     assert!(event_json.contains("\"relay\""));
   2915     assert!(event_json.contains("\"nostrconnect_url\""));
   2916     assert_eq!(published.relay_count, 1);
   2917     assert_eq!(published.acknowledged_relay_count, 1);
   2918 
            // The audit record has no connection id, and its request id is the
            // published event id.
   2919     let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   2920     assert_eq!(
   2921         operation_audit[0].operation,
   2922         MycOperationAuditKind::DiscoveryHandlerPublish
   2923     );
   2924     assert_eq!(
   2925         operation_audit[0].outcome,
   2926         MycOperationAuditOutcome::Succeeded
   2927     );
   2928     assert!(operation_audit[0].connection_id.is_none());
   2929     assert_eq!(
   2930         operation_audit[0].request_id.as_deref(),
   2931         Some(published_event_id.as_str())
   2932     );
   2933     assert_eq!(operation_audit[0].relay_count, 1);
   2934     assert_eq!(operation_audit[0].acknowledged_relay_count, 1);
   2935     assert!(
   2936         operation_audit[0]
   2937             .relay_outcome_summary
   2938             .contains("1/1 relays acknowledged publish")
   2939     );
            // Unlike the connect and auth-replay tests in this file, the discovery
            // outbox record carries neither an attempt id nor a signer publish
            // workflow id.
   2940     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   2941         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized
   2942     })
   2943     .await?;
   2944     assert_eq!(
   2945         outbox_records[0].kind,
   2946         MycDeliveryOutboxKind::DiscoveryHandlerPublish
   2947     );
   2948     assert_eq!(
   2949         outbox_records[0].request_id.as_deref(),
   2950         Some(published_event_id.as_str())
   2951     );
   2952     assert!(outbox_records[0].attempt_id.is_none());
   2953     assert!(outbox_records[0].signer_publish_workflow_id.is_none());
   2954     assert!(outbox_records[0].published_at_unix.is_some());
   2955     assert!(outbox_records[0].finalized_at_unix.is_some());
   2956 
   2957     Ok(())
   2958 }
   2959 
   2960 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
        // A discovery publish job that is already queued in the delivery outbox when
        // the runtime starts must be published during startup. A second runtime
        // bootstrapped from the same config must then see the record as Finalized,
        // together with audit records for the publish and for the recovery.
   2961 async fn startup_recovery_republishes_queued_discovery_publish_job() -> TestResult<()> {
   2962     let relay = TestRelay::spawn().await?;
   2963     let test_runtime =
   2964         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
            // Destructure so `_temp` stays bound for the whole test.
            // NOTE(review): presumably a temp-directory guard backing the runtime's
            // storage -- confirm in MycTestRuntime.
   2965     let MycTestRuntime {
   2966         _temp: _tempdir,
   2967         runtime,
   2968     } = test_runtime;
   2969     let config = runtime.config().clone();
   2970     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   2971     let context = MycDiscoveryContext::from_runtime(&runtime)?;
   2972     let app_public_key = context.app_identity().public_key();
            // Sign a handler event and enqueue it by hand, so a job is waiting in the
            // outbox before the runtime is run.
   2973     let event = context.build_signed_handler_event()?;
   2974     let event_id = event.id.to_hex();
   2975     let outbox_record = MycDeliveryOutboxRecord::new(
   2976         MycDeliveryOutboxKind::DiscoveryHandlerPublish,
   2977         event,
   2978         vec![relay_url],
   2979     )?
   2980     .with_request_id(event_id.as_str());
   2981     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
   2982 
            // The shutdown future (`async {}`) completes immediately.
            // NOTE(review): the assertions below imply run_until performs startup
            // recovery before returning -- confirm in MycRuntime::run_until.
   2983     runtime.run_until(async {}).await?;
   2984 
            // The relay received exactly the event that was queued.
   2985     let published = relay
   2986         .wait_for_published_events_by_author(app_public_key, 1)
   2987         .await?;
   2988     assert_eq!(published.len(), 1);
   2989     assert_eq!(published[0].id.to_hex(), event_id);
   2990 
            // Bootstrap a fresh runtime from the same config to check that the
            // outcome was persisted rather than only held in memory.
   2991     let restarted_runtime = MycRuntime::bootstrap(config)?;
   2992     let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
   2993     assert_eq!(outbox_records.len(), 1);
   2994     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
   2995     assert_eq!(
   2996         outbox_records[0].request_id.as_deref(),
   2997         Some(event_id.as_str())
   2998     );
   2999     assert!(outbox_records[0].published_at_unix.is_some());
   3000     assert!(outbox_records[0].finalized_at_unix.is_some());
            // Two audit records, in order: the handler publish, then the recovery run.
   3001     let audit_records = restarted_runtime.operation_audit_store().list_all()?;
   3002     assert_eq!(audit_records.len(), 2);
   3003     assert_eq!(
   3004         audit_records[0].operation,
   3005         MycOperationAuditKind::DiscoveryHandlerPublish
   3006     );
   3007     assert_eq!(
   3008         audit_records[0].outcome,
   3009         MycOperationAuditOutcome::Succeeded
   3010     );
   3011     assert_eq!(
   3012         audit_records[0].request_id.as_deref(),
   3013         Some(event_id.as_str())
   3014     );
   3015     assert_eq!(
   3016         audit_records[1].operation,
   3017         MycOperationAuditKind::DeliveryRecovery
   3018     );
   3019     assert_eq!(
   3020         audit_records[1].outcome,
   3021         MycOperationAuditOutcome::Succeeded
   3022     );
   3023 
   3024     Ok(())
   3025 }
   3026 
   3027 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3028 async fn startup_recovery_finalizes_published_discovery_publish_job() -> TestResult<()> {
   3029     let relay = TestRelay::spawn().await?;
   3030     let test_runtime =
   3031         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3032     let MycTestRuntime {
   3033         _temp: _tempdir,
   3034         runtime,
   3035     } = test_runtime;
   3036     let config = runtime.config().clone();
   3037     let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
   3038     let context = MycDiscoveryContext::from_runtime(&runtime)?;
   3039     let app_public_key = context.app_identity().public_key();
   3040     let event = context.build_signed_handler_event()?;
   3041     let event_id = event.id.to_hex();
   3042     let outbox_record = MycDeliveryOutboxRecord::new(
   3043         MycDeliveryOutboxKind::DiscoveryHandlerPublish,
   3044         event,
   3045         vec![relay_url],
   3046     )?
   3047     .with_request_id(event_id.as_str());
   3048     runtime.delivery_outbox_store().enqueue(&outbox_record)?;
   3049     runtime
   3050         .delivery_outbox_store()
   3051         .mark_published_pending_finalize(&outbox_record.job_id, 1)?;
   3052 
   3053     runtime.run_until(async {}).await?;
   3054 
   3055     sleep(Duration::from_millis(100)).await;
   3056     assert!(
   3057         relay
   3058             .published_events_by_author(app_public_key)
   3059             .await
   3060             .is_empty()
   3061     );
   3062 
   3063     let restarted_runtime = MycRuntime::bootstrap(config)?;
   3064     let outbox_records = restarted_runtime.delivery_outbox_store().list_all()?;
   3065     assert_eq!(outbox_records.len(), 1);
   3066     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
   3067     assert_eq!(
   3068         outbox_records[0].request_id.as_deref(),
   3069         Some(event_id.as_str())
   3070     );
   3071     assert!(outbox_records[0].published_at_unix.is_some());
   3072     assert!(outbox_records[0].finalized_at_unix.is_some());
   3073     let audit_records = restarted_runtime.operation_audit_store().list_all()?;
   3074     assert_eq!(audit_records.len(), 2);
   3075     assert_eq!(
   3076         audit_records[0].operation,
   3077         MycOperationAuditKind::DiscoveryHandlerPublish
   3078     );
   3079     assert_eq!(
   3080         audit_records[0].outcome,
   3081         MycOperationAuditOutcome::Succeeded
   3082     );
   3083     assert_eq!(
   3084         audit_records[0].request_id.as_deref(),
   3085         Some(event_id.as_str())
   3086     );
   3087     assert!(
   3088         audit_records[0]
   3089             .relay_outcome_summary
   3090             .contains("startup recovery finalized previously published delivery job")
   3091     );
   3092     assert_eq!(
   3093         audit_records[1].operation,
   3094         MycOperationAuditKind::DeliveryRecovery
   3095     );
   3096     assert_eq!(
   3097         audit_records[1].outcome,
   3098         MycOperationAuditOutcome::Succeeded
   3099     );
   3100 
   3101     Ok(())
   3102 }
   3103 
   3104 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3105 async fn explicit_nip89_publish_retries_cleanly_after_rejection() -> TestResult<()> {
   3106     let relay = TestRelay::spawn().await?;
   3107     let test_runtime =
   3108         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3109     let runtime = test_runtime.runtime;
   3110     let app_identity = myc::identity_files::load_encrypted_identity(
   3111         runtime
   3112             .config()
   3113             .discovery
   3114             .app_identity_path
   3115             .as_ref()
   3116             .expect("app identity path"),
   3117     )?;
   3118 
   3119     relay
   3120         .queue_publish_outcomes(app_identity.public_key(), &[false, true])
   3121         .await;
   3122 
   3123     let failed = publish_nip89_event(&runtime)
   3124         .await
   3125         .expect_err("first publish should fail");
   3126     assert!(failed.to_string().contains("Nostr publish failed"));
   3127     assert!(
   3128         relay
   3129             .published_events_by_author(app_identity.public_key())
   3130             .await
   3131             .is_empty()
   3132     );
   3133 
   3134     let first_audit = wait_for_operation_audit_count(&runtime, 1).await?;
   3135     assert_eq!(
   3136         first_audit[0].operation,
   3137         MycOperationAuditKind::DiscoveryHandlerPublish
   3138     );
   3139     assert_eq!(first_audit[0].outcome, MycOperationAuditOutcome::Rejected);
   3140     assert!(first_audit[0].connection_id.is_none());
   3141     assert!(first_audit[0].request_id.is_some());
   3142     assert_eq!(first_audit[0].relay_count, 1);
   3143     assert_eq!(first_audit[0].acknowledged_relay_count, 0);
   3144     assert!(
   3145         first_audit[0]
   3146             .relay_outcome_summary
   3147             .contains("blocked by test relay")
   3148     );
   3149     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   3150         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
   3151     })
   3152     .await?;
   3153     assert_eq!(
   3154         outbox_records[0].kind,
   3155         MycDeliveryOutboxKind::DiscoveryHandlerPublish
   3156     );
   3157     assert_eq!(
   3158         outbox_records[0].request_id.as_deref(),
   3159         first_audit[0].request_id.as_deref()
   3160     );
   3161     assert!(outbox_records[0].attempt_id.is_none());
   3162     assert!(outbox_records[0].signer_publish_workflow_id.is_none());
   3163 
   3164     let published = publish_nip89_event(&runtime).await?;
   3165     let published_events = relay
   3166         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3167         .await?;
   3168     assert_eq!(published_events.len(), 1);
   3169     assert_eq!(published.relay_count, 1);
   3170     assert_eq!(published.acknowledged_relay_count, 1);
   3171 
   3172     let second_audit = wait_for_operation_audit_count(&runtime, 2).await?;
   3173     assert_eq!(
   3174         second_audit[1].operation,
   3175         MycOperationAuditKind::DiscoveryHandlerPublish
   3176     );
   3177     assert_eq!(second_audit[1].outcome, MycOperationAuditOutcome::Succeeded);
   3178     assert_eq!(
   3179         second_audit[1].request_id.as_deref(),
   3180         Some(published.event.id.to_hex().as_str())
   3181     );
   3182     assert_eq!(second_audit[1].relay_count, 1);
   3183     assert_eq!(second_audit[1].acknowledged_relay_count, 1);
   3184     assert!(
   3185         second_audit[1]
   3186             .relay_outcome_summary
   3187             .contains("1/1 relays acknowledged publish")
   3188     );
   3189     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   3190         records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
   3191     })
   3192     .await?;
   3193     assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
   3194     assert_eq!(
   3195         outbox_records[1].kind,
   3196         MycDeliveryOutboxKind::DiscoveryHandlerPublish
   3197     );
   3198     assert_eq!(
   3199         outbox_records[1].request_id.as_deref(),
   3200         Some(published.event.id.to_hex().as_str())
   3201     );
   3202     assert!(outbox_records[1].attempt_id.is_none());
   3203     assert!(outbox_records[1].signer_publish_workflow_id.is_none());
   3204     assert!(outbox_records[1].published_at_unix.is_some());
   3205     assert!(outbox_records[1].finalized_at_unix.is_some());
   3206 
   3207     Ok(())
   3208 }
   3209 
   3210 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3211 async fn fetch_live_nip89_reports_missing_when_handler_is_unpublished() -> TestResult<()> {
   3212     let relay = TestRelay::spawn().await?;
   3213     let test_runtime =
   3214         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3215 
   3216     let output = fetch_live_nip89(&test_runtime.runtime).await?;
   3217 
   3218     assert_eq!(output.handler_identifier, "myc");
   3219     assert_eq!(output.publish_relays, vec![relay.url().to_owned()]);
   3220     assert!(output.live_groups.is_empty());
   3221     assert_eq!(output.relay_states.len(), 1);
   3222     assert_eq!(
   3223         output.relay_states[0].fetch_status,
   3224         MycDiscoveryRelayFetchStatus::Available
   3225     );
   3226 
   3227     Ok(())
   3228 }
   3229 
   3230 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3231 async fn fetch_live_nip89_fails_when_all_discovery_relays_are_unavailable() -> TestResult<()> {
   3232     let unavailable_a = unavailable_relay_url()?;
   3233     let unavailable_b = unavailable_relay_url()?;
   3234     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   3235         &[unavailable_a.as_str(), unavailable_b.as_str()],
   3236         MycConnectionApproval::ExplicitUser,
   3237     );
   3238 
   3239     let error = fetch_live_nip89(&test_runtime.runtime)
   3240         .await
   3241         .expect_err("all-unavailable discovery fetch should fail");
   3242     assert!(
   3243         error
   3244             .to_string()
   3245             .contains("failed to fetch discovery state from all configured relays")
   3246     );
   3247 
   3248     let audit = wait_for_operation_audit_count(&test_runtime.runtime, 1).await?;
   3249     assert_eq!(
   3250         audit[0].operation,
   3251         MycOperationAuditKind::DiscoveryHandlerFetch
   3252     );
   3253     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Unavailable);
   3254     assert_eq!(audit[0].relay_count, 2);
   3255     assert_eq!(audit[0].acknowledged_relay_count, 0);
   3256     assert!(audit[0].relay_outcome_summary.contains(&unavailable_a));
   3257     assert!(audit[0].relay_outcome_summary.contains(&unavailable_b));
   3258 
   3259     Ok(())
   3260 }
   3261 
   3262 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3263 async fn fetch_live_nip89_parallelizes_relay_fetch_and_preserves_configured_order() -> TestResult<()>
   3264 {
   3265     let live_relay = TestRelay::spawn().await?;
   3266     let slow_a = HangingRelay::spawn(Duration::from_secs(3)).await?;
   3267     let slow_b = HangingRelay::spawn(Duration::from_secs(3)).await?;
   3268     let slow_c = HangingRelay::spawn(Duration::from_secs(3)).await?;
   3269     let slow_d = HangingRelay::spawn(Duration::from_secs(3)).await?;
   3270     let relay_urls = [
   3271         slow_a.url(),
   3272         live_relay.url(),
   3273         slow_b.url(),
   3274         slow_c.url(),
   3275         slow_d.url(),
   3276     ];
   3277     let mut expected_relay_states = vec![
   3278         (
   3279             slow_a.url().to_owned(),
   3280             MycDiscoveryRelayFetchStatus::Unavailable,
   3281         ),
   3282         (
   3283             live_relay.url().to_owned(),
   3284             MycDiscoveryRelayFetchStatus::Available,
   3285         ),
   3286         (
   3287             slow_b.url().to_owned(),
   3288             MycDiscoveryRelayFetchStatus::Unavailable,
   3289         ),
   3290         (
   3291             slow_c.url().to_owned(),
   3292             MycDiscoveryRelayFetchStatus::Unavailable,
   3293         ),
   3294         (
   3295             slow_d.url().to_owned(),
   3296             MycDiscoveryRelayFetchStatus::Unavailable,
   3297         ),
   3298     ];
   3299     expected_relay_states.sort_by(|left, right| left.0.cmp(&right.0));
   3300     let expected_relay_urls = expected_relay_states
   3301         .iter()
   3302         .map(|(relay_url, _)| relay_url.clone())
   3303         .collect::<Vec<_>>();
   3304     let test_runtime = MycTestRuntime::new_with_discovery_relays_and_timeout(
   3305         &relay_urls,
   3306         MycConnectionApproval::ExplicitUser,
   3307         1,
   3308     );
   3309 
   3310     let started_at = Instant::now();
   3311     let output = fetch_live_nip89(&test_runtime.runtime).await?;
   3312     let elapsed = started_at.elapsed();
   3313 
   3314     assert!(
   3315         elapsed < Duration::from_millis(2500),
   3316         "expected concurrent relay fetch to finish under 2.5s, got {:?}",
   3317         elapsed
   3318     );
   3319     assert_eq!(
   3320         output
   3321             .relay_states
   3322             .iter()
   3323             .map(|relay_state| relay_state.relay_url.clone())
   3324             .collect::<Vec<_>>(),
   3325         expected_relay_urls
   3326     );
   3327     assert_eq!(
   3328         output
   3329             .relay_states
   3330             .iter()
   3331             .map(|relay_state| relay_state.fetch_status)
   3332             .collect::<Vec<_>>(),
   3333         expected_relay_states
   3334             .iter()
   3335             .map(|(_, fetch_status)| *fetch_status)
   3336             .collect::<Vec<_>>()
   3337     );
   3338     for relay_state in &output.relay_states {
   3339         if relay_state.fetch_status == MycDiscoveryRelayFetchStatus::Available {
   3340             assert!(relay_state.fetch_error.is_none());
   3341             assert!(relay_state.live_groups.is_empty());
   3342         } else {
   3343             assert!(relay_state.fetch_error.is_some());
   3344         }
   3345     }
   3346 
   3347     Ok(())
   3348 }
   3349 
   3350 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3351 async fn diff_live_nip89_reports_matched_after_publish() -> TestResult<()> {
   3352     let relay = TestRelay::spawn().await?;
   3353     let test_runtime =
   3354         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3355     let runtime = test_runtime.runtime;
   3356     let app_identity = myc::identity_files::load_encrypted_identity(
   3357         runtime
   3358             .config()
   3359             .discovery
   3360             .app_identity_path
   3361             .as_ref()
   3362             .expect("app identity path"),
   3363     )?;
   3364 
   3365     relay
   3366         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3367         .await;
   3368     let published = publish_nip89_event(&runtime).await?;
   3369     relay
   3370         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3371         .await?;
   3372 
   3373     let diff = diff_live_nip89(&runtime).await?;
   3374 
   3375     assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched);
   3376     assert!(diff.differing_fields.is_empty());
   3377     assert_eq!(diff.live_groups.len(), 1);
   3378     let live_event = diff.live_groups[0]
   3379         .events
   3380         .last()
   3381         .cloned()
   3382         .expect("live event");
   3383     assert_eq!(live_event.event_id_hex, published.event.id.to_hex());
   3384     assert_eq!(
   3385         live_event.handler.author_public_key_hex,
   3386         app_identity.public_key_hex()
   3387     );
   3388     assert_eq!(live_event.handler.kinds, vec![24_133]);
   3389 
   3390     Ok(())
   3391 }
   3392 
   3393 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3394 async fn refresh_nip89_publishes_when_live_handler_is_missing() -> TestResult<()> {
   3395     let relay = TestRelay::spawn().await?;
   3396     let test_runtime =
   3397         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3398     let runtime = test_runtime.runtime;
   3399     let app_identity = myc::identity_files::load_encrypted_identity(
   3400         runtime
   3401             .config()
   3402             .discovery
   3403             .app_identity_path
   3404             .as_ref()
   3405             .expect("app identity path"),
   3406     )?;
   3407 
   3408     relay
   3409         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3410         .await;
   3411 
   3412     let refreshed = refresh_nip89(&runtime, false).await?;
   3413 
   3414     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing);
   3415     assert_eq!(refreshed.differing_fields, vec!["live_groups".to_owned()]);
   3416     assert!(refreshed.live_groups.is_empty());
   3417     assert!(refreshed.published.is_some());
   3418     assert_eq!(refreshed.repair_summary.repaired, 1);
   3419     assert_eq!(refreshed.repair_summary.failed, 0);
   3420     assert_eq!(refreshed.repair_summary.unchanged, 0);
   3421     assert_eq!(refreshed.repair_summary.skipped, 0);
   3422     assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new());
   3423     assert_eq!(refreshed.repair_results.len(), 1);
   3424     assert_eq!(
   3425         refreshed.repair_results[0].outcome,
   3426         MycDiscoveryRepairOutcome::Repaired
   3427     );
   3428 
   3429     let audit = wait_for_operation_audit_count(&runtime, 3).await?;
   3430     assert_eq!(
   3431         audit[0].operation,
   3432         MycOperationAuditKind::DiscoveryHandlerCompare
   3433     );
   3434     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Missing);
   3435     assert_eq!(
   3436         audit[1].operation,
   3437         MycOperationAuditKind::DiscoveryHandlerPublish
   3438     );
   3439     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded);
   3440     assert_eq!(
   3441         audit[2].operation,
   3442         MycOperationAuditKind::DiscoveryHandlerRepair
   3443     );
   3444     assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded);
   3445     let published = refreshed
   3446         .published
   3447         .as_ref()
   3448         .expect("published discovery output");
   3449     let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
   3450         records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized
   3451     })
   3452     .await?;
   3453     assert_eq!(
   3454         outbox_records[0].kind,
   3455         MycDeliveryOutboxKind::DiscoveryHandlerPublish
   3456     );
   3457     assert_eq!(
   3458         outbox_records[0].request_id.as_deref(),
   3459         Some(published.event.id.to_hex().as_str())
   3460     );
   3461     assert_eq!(
   3462         outbox_records[0].attempt_id.as_deref(),
   3463         Some(refreshed.attempt_id.as_str())
   3464     );
   3465     assert!(outbox_records[0].signer_publish_workflow_id.is_none());
   3466     assert!(outbox_records[0].published_at_unix.is_some());
   3467     assert!(outbox_records[0].finalized_at_unix.is_some());
   3468 
   3469     Ok(())
   3470 }
   3471 
   3472 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3473 async fn refresh_nip89_repairs_missing_relays_without_republishing_matched_relays() -> TestResult<()>
   3474 {
   3475     let relay_a = TestRelay::spawn().await?;
   3476     let relay_b = TestRelay::spawn().await?;
   3477     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   3478         &[relay_a.url(), relay_b.url()],
   3479         MycConnectionApproval::ExplicitUser,
   3480     );
   3481     let runtime = test_runtime.runtime;
   3482     let app_identity = myc::identity_files::load_encrypted_identity(
   3483         runtime
   3484             .config()
   3485             .discovery
   3486             .app_identity_path
   3487             .as_ref()
   3488             .expect("app identity path"),
   3489     )?;
   3490 
   3491     let matched_event = MycDiscoveryContext::from_runtime(&runtime)?
   3492         .build_signed_handler_event()
   3493         .expect("matched event");
   3494     publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?;
   3495     relay_a
   3496         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3497         .await?;
   3498 
   3499     relay_b
   3500         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3501         .await;
   3502     let refreshed = refresh_nip89(&runtime, false).await?;
   3503     let published = refreshed.published.expect("published output");
   3504 
   3505     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Matched);
   3506     assert_eq!(published.publish_relays, vec![relay_b.url().to_owned()]);
   3507     assert_eq!(published.relay_count, 1);
   3508     assert_eq!(published.acknowledged_relay_count, 1);
   3509     assert_eq!(refreshed.repair_summary.repaired, 1);
   3510     assert_eq!(refreshed.repair_summary.failed, 0);
   3511     assert_eq!(refreshed.repair_summary.unchanged, 1);
   3512     assert_eq!(refreshed.repair_summary.skipped, 0);
   3513     assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new());
   3514     assert_eq!(refreshed.repair_results.len(), 2);
   3515     assert_eq!(
   3516         refreshed
   3517             .repair_results
   3518             .iter()
   3519             .find(|result| result.relay_url == relay_a.url())
   3520             .expect("matched relay repair result")
   3521             .outcome,
   3522         MycDiscoveryRepairOutcome::Unchanged
   3523     );
   3524     assert_eq!(
   3525         refreshed
   3526             .repair_results
   3527             .iter()
   3528             .find(|result| result.relay_url == relay_b.url())
   3529             .expect("repaired relay result")
   3530             .outcome,
   3531         MycDiscoveryRepairOutcome::Repaired
   3532     );
   3533 
   3534     relay_b
   3535         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3536         .await?;
   3537     assert_eq!(
   3538         relay_a
   3539             .published_events_by_author(app_identity.public_key())
   3540             .await
   3541             .len(),
   3542         1
   3543     );
   3544     assert_eq!(
   3545         relay_b
   3546             .published_events_by_author(app_identity.public_key())
   3547             .await
   3548             .len(),
   3549         1
   3550     );
   3551 
   3552     let diff = diff_live_nip89(&runtime).await?;
   3553     assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched);
   3554     assert_eq!(diff.relay_summary.matched_relays.len(), 2);
   3555     assert!(diff.relay_summary.missing_relays.is_empty());
   3556 
   3557     Ok(())
   3558 }
   3559 
   3560 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3561 async fn refresh_nip89_skips_when_live_handler_matches() -> TestResult<()> {
   3562     let relay = TestRelay::spawn().await?;
   3563     let test_runtime =
   3564         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3565     let runtime = test_runtime.runtime;
   3566     let app_identity = myc::identity_files::load_encrypted_identity(
   3567         runtime
   3568             .config()
   3569             .discovery
   3570             .app_identity_path
   3571             .as_ref()
   3572             .expect("app identity path"),
   3573     )?;
   3574 
   3575     relay
   3576         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3577         .await;
   3578     publish_nip89_event(&runtime).await?;
   3579     relay
   3580         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3581         .await?;
   3582 
   3583     let refreshed = refresh_nip89(&runtime, false).await?;
   3584 
   3585     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Matched);
   3586     assert!(refreshed.differing_fields.is_empty());
   3587     assert_eq!(refreshed.live_groups.len(), 1);
   3588     assert!(refreshed.published.is_none());
   3589     assert_eq!(refreshed.repair_summary.repaired, 0);
   3590     assert_eq!(refreshed.repair_summary.failed, 0);
   3591     assert_eq!(refreshed.repair_summary.unchanged, 1);
   3592     assert_eq!(refreshed.repair_summary.skipped, 0);
   3593     assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new());
   3594     assert_eq!(refreshed.repair_results.len(), 1);
   3595     assert_eq!(
   3596         refreshed.repair_results[0].outcome,
   3597         MycDiscoveryRepairOutcome::Unchanged
   3598     );
   3599 
   3600     let audit = wait_for_operation_audit_count(&runtime, 4).await?;
   3601     assert_eq!(
   3602         audit[1].operation,
   3603         MycOperationAuditKind::DiscoveryHandlerCompare
   3604     );
   3605     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Matched);
   3606     assert_eq!(
   3607         audit[2].operation,
   3608         MycOperationAuditKind::DiscoveryHandlerRepair
   3609     );
   3610     assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Matched);
   3611     assert_eq!(
   3612         audit[3].operation,
   3613         MycOperationAuditKind::DiscoveryHandlerRefresh
   3614     );
   3615     assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Skipped);
   3616 
   3617     Ok(())
   3618 }
   3619 
   3620 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3621 async fn refresh_nip89_republishes_when_live_handler_drifted() -> TestResult<()> {
   3622     let relay = TestRelay::spawn().await?;
   3623     let test_runtime =
   3624         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3625     let runtime = test_runtime.runtime;
   3626     let app_identity = myc::identity_files::load_encrypted_identity(
   3627         runtime
   3628             .config()
   3629             .discovery
   3630             .app_identity_path
   3631             .as_ref()
   3632             .expect("app identity path"),
   3633     )?;
   3634 
   3635     let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   3636     drifted_spec.identifier = Some("myc".to_owned());
   3637     drifted_spec.relays = vec!["wss://wrong.example.com".to_owned()];
   3638     drifted_spec.nostrconnect_url =
   3639         Some("https://wrong.example.com/connect?uri=nostrconnect%3A%2F%2Fstale".to_owned());
   3640     let mut metadata = RadrootsNostrMetadata::default();
   3641     metadata.name = Some("stale".to_owned());
   3642     drifted_spec.metadata = Some(metadata);
   3643     publish_handler_event(relay.url(), &app_identity, &drifted_spec).await?;
   3644     relay
   3645         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3646         .await?;
   3647 
   3648     relay
   3649         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3650         .await;
   3651     let refreshed = refresh_nip89(&runtime, false).await?;
   3652 
   3653     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Drifted);
   3654     assert_eq!(refreshed.live_groups.len(), 1);
   3655     assert!(refreshed.published.is_some());
   3656     assert_eq!(refreshed.repair_summary.repaired, 1);
   3657     assert_eq!(refreshed.repair_summary.failed, 0);
   3658     assert_eq!(refreshed.repair_summary.unchanged, 0);
   3659     assert_eq!(refreshed.repair_summary.skipped, 0);
   3660     assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new());
   3661     assert_eq!(refreshed.repair_results.len(), 1);
   3662     assert_eq!(
   3663         refreshed.repair_results[0].outcome,
   3664         MycDiscoveryRepairOutcome::Repaired
   3665     );
   3666     assert!(
   3667         refreshed
   3668             .differing_fields
   3669             .iter()
   3670             .any(|field| field == "relays" || field == "nostrconnect_url" || field == "metadata")
   3671     );
   3672 
   3673     let audit = wait_for_operation_audit_count(&runtime, 3).await?;
   3674     assert_eq!(
   3675         audit[0].operation,
   3676         MycOperationAuditKind::DiscoveryHandlerCompare
   3677     );
   3678     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Drifted);
   3679     assert_eq!(
   3680         audit[1].operation,
   3681         MycOperationAuditKind::DiscoveryHandlerPublish
   3682     );
   3683     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded);
   3684     assert_eq!(
   3685         audit[2].operation,
   3686         MycOperationAuditKind::DiscoveryHandlerRepair
   3687     );
   3688     assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded);
   3689 
   3690     Ok(())
   3691 }
   3692 
   3693 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3694 async fn refresh_nip89_repairs_drifted_relays_without_force_when_other_relays_match()
   3695 -> TestResult<()> {
            // Scenario: two discovery relays, one seeded with the handler event built
            // from the runtime itself and one seeded with a stale variant. A refresh
            // called with `false` (presumably the "force" flag, per the test name) is
            // expected to republish to the stale relay only.
   3696     let relay_a = TestRelay::spawn().await?;
   3697     let relay_b = TestRelay::spawn().await?;
   3698     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   3699         &[relay_a.url(), relay_b.url()],
   3700         MycConnectionApproval::ExplicitUser,
   3701     );
   3702     let runtime = test_runtime.runtime;
            // The app identity is read from the path configured under `discovery`; it
            // is used below to publish events and to look them up by author.
   3703     let app_identity = myc::identity_files::load_encrypted_identity(
   3704         runtime
   3705             .config()
   3706             .discovery
   3707             .app_identity_path
   3708             .as_ref()
   3709             .expect("app identity path"),
   3710     )?;
   3711 
            // relay_a receives the event the runtime's own discovery context builds.
   3712     let matched_event = MycDiscoveryContext::from_runtime(&runtime)?
   3713         .build_signed_handler_event()
   3714         .expect("matched event");
   3715     publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?;
   3716 
            // relay_b receives a handler event with the same "myc" identifier but a
            // different relay list. NOTE(review): 24_133 is presumably the NIP-46
            // event kind the handler advertises - confirm against the spec type.
   3717     let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   3718     drifted_spec.identifier = Some("myc".to_owned());
   3719     drifted_spec.relays = vec!["wss://stale.example.com".to_owned()];
   3720     publish_handler_event(relay_b.url(), &app_identity, &drifted_spec).await?;
   3721 
            // Wait until each relay reports one event by the app author before
            // refreshing, so the seeded state is in place.
   3722     relay_a
   3723         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3724         .await?;
   3725     relay_b
   3726         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3727         .await?;
   3728 
            // Queue a single `true` publish outcome on relay_b only. Presumably this
            // scripts the test relay to accept the next publish from this author -
            // TODO confirm against `TestRelay::queue_publish_outcomes`.
   3729     relay_b
   3730         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3731         .await;
   3732     let refreshed = refresh_nip89(&runtime, false).await?;
   3733     let published = refreshed.published.expect("published output");
   3734 
            // NOTE(review): the status is `Conflicted` although the repair succeeded;
            // presumably it describes the live state observed before repair - confirm.
   3735     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Conflicted);
            // Only relay_b is listed as a publish target.
   3736     assert_eq!(published.publish_relays, vec![relay_b.url().to_owned()]);
   3737     assert_eq!(published.relay_count, 1);
   3738     assert_eq!(published.acknowledged_relay_count, 1);
            // One relay repaired, one left unchanged, nothing failed or skipped.
   3739     assert_eq!(refreshed.repair_summary.repaired, 1);
   3740     assert_eq!(refreshed.repair_summary.failed, 0);
   3741     assert_eq!(refreshed.repair_summary.unchanged, 1);
   3742     assert_eq!(refreshed.repair_summary.skipped, 0);
   3743     assert_eq!(refreshed.remaining_repair_relays, Vec::<String>::new());
   3744     assert_eq!(refreshed.repair_results.len(), 2);
            // Per-relay results: relay_a is `Unchanged`, relay_b is `Repaired`.
   3745     assert_eq!(
   3746         refreshed
   3747             .repair_results
   3748             .iter()
   3749             .find(|result| result.relay_url == relay_a.url())
   3750             .expect("matched relay result")
   3751             .outcome,
   3752         MycDiscoveryRepairOutcome::Unchanged
   3753     );
   3754     assert_eq!(
   3755         refreshed
   3756             .repair_results
   3757             .iter()
   3758             .find(|result| result.relay_url == relay_b.url())
   3759             .expect("repaired relay result")
   3760             .outcome,
   3761         MycDiscoveryRepairOutcome::Repaired
   3762     );
   3763 
            // Relay-side check: relay_b now holds a second event by the author while
            // relay_a still holds exactly the one seeded above.
   3764     relay_b
   3765         .wait_for_published_events_by_author(app_identity.public_key(), 2)
   3766         .await?;
   3767     assert_eq!(
   3768         relay_a
   3769             .published_events_by_author(app_identity.public_key())
   3770             .await
   3771             .len(),
   3772         1
   3773     );
   3774     assert_eq!(
   3775         relay_b
   3776             .published_events_by_author(app_identity.public_key())
   3777             .await
   3778             .len(),
   3779         2
   3780     );
   3781 
   3782     Ok(())
   3783 }
   3784 
   3785 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3786 async fn refresh_nip89_reports_remaining_relays_after_mixed_targeted_repair() -> TestResult<()> {
            // Scenario: two discovery relays with no seeded handler event. One is
            // queued a `true` publish outcome and the other `false`, and the refresh
            // output must report one repaired relay and one still needing repair.
   3787     let relay_a = TestRelay::spawn().await?;
   3788     let relay_b = TestRelay::spawn().await?;
   3789     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   3790         &[relay_a.url(), relay_b.url()],
   3791         MycConnectionApproval::ExplicitUser,
   3792     );
   3793     let runtime = test_runtime.runtime;
   3794     let app_identity = myc::identity_files::load_encrypted_identity(
   3795         runtime
   3796             .config()
   3797             .discovery
   3798             .app_identity_path
   3799             .as_ref()
   3800             .expect("app identity path"),
   3801     )?;
   3802 
            // Presumably `true` makes the test relay accept the next publish and
            // `false` makes it reject it - TODO confirm against
            // `TestRelay::queue_publish_outcomes`. The assertions below match that
            // reading (relay_a repaired, relay_b failed).
   3803     relay_a
   3804         .queue_publish_outcomes(app_identity.public_key(), &[true])
   3805         .await;
   3806     relay_b
   3807         .queue_publish_outcomes(app_identity.public_key(), &[false])
   3808         .await;
   3809 
   3810     let refreshed = refresh_nip89(&runtime, false).await?;
   3811     let published = refreshed.published.expect("published output");
   3812 
            // Nothing was seeded, so the reported status is `Missing` and both relays
            // are listed as publish targets; only one acknowledgement is counted.
   3813     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing);
   3814     assert_eq!(
   3815         published.publish_relays,
   3816         vec![relay_a.url().to_owned(), relay_b.url().to_owned()]
   3817     );
   3818     assert_eq!(published.relay_count, 2);
   3819     assert_eq!(published.acknowledged_relay_count, 1);
   3820     assert_eq!(published.relay_results.len(), 2);
   3821     assert_eq!(refreshed.repair_summary.repaired, 1);
   3822     assert_eq!(refreshed.repair_summary.failed, 1);
   3823     assert_eq!(refreshed.repair_summary.unchanged, 0);
   3824     assert_eq!(refreshed.repair_summary.skipped, 0);
   3825     assert_eq!(refreshed.repair_results.len(), 2);
            // The relay whose publish failed is the only one left to repair.
   3826     assert_eq!(
   3827         refreshed.remaining_repair_relays,
   3828         vec![relay_b.url().to_owned()]
   3829     );
   3830 
   3831     let repaired = refreshed
   3832         .repair_results
   3833         .iter()
   3834         .find(|result| result.relay_url == relay_a.url())
   3835         .expect("repaired relay result");
   3836     assert_eq!(repaired.outcome, MycDiscoveryRepairOutcome::Repaired);
   3837 
   3838     let failed = refreshed
   3839         .repair_results
   3840         .iter()
   3841         .find(|result| result.relay_url == relay_b.url())
   3842         .expect("failed relay result");
   3843     assert_eq!(failed.outcome, MycDiscoveryRepairOutcome::Failed);
            // The failure detail must carry the "blocked by test relay" text; a
            // missing detail becomes "" and fails the check.
   3844     assert!(
   3845         failed
   3846             .detail
   3847             .as_deref()
   3848             .unwrap_or_default()
   3849             .contains("blocked by test relay")
   3850     );
   3851 
            // Relay-side check: relay_a holds one event by the author, relay_b none.
   3852     relay_a
   3853         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3854         .await?;
   3855     assert_eq!(
   3856         relay_b
   3857             .published_events_by_author(app_identity.public_key())
   3858             .await
   3859             .len(),
   3860         0
   3861     );
   3862 
            // A follow-up diff reports overall `Matched`, with relay_a under matched
            // relays and relay_b under missing relays.
   3863     let diff = diff_live_nip89(&runtime).await?;
   3864     assert_eq!(diff.status, MycDiscoveryLiveStatus::Matched);
   3865     assert_eq!(
   3866         diff.relay_summary.matched_relays,
   3867         vec![relay_a.url().to_owned()]
   3868     );
   3869     assert_eq!(
   3870         diff.relay_summary.missing_relays,
   3871         vec![relay_b.url().to_owned()]
   3872     );
   3873 
            // Audit entries at indices 0-3: compare (Missing), publish (Succeeded),
            // then one repair record per relay (relay_a Succeeded, relay_b Rejected).
            // NOTE(review): the ordering and any effect of the diff call above depend
            // on `wait_for_operation_audit_count`, which is not visible here.
   3874     let audit = wait_for_operation_audit_count(&runtime, 4).await?;
   3875     assert_eq!(
   3876         audit[0].operation,
   3877         MycOperationAuditKind::DiscoveryHandlerCompare
   3878     );
   3879     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Missing);
   3880     assert_eq!(
   3881         audit[1].operation,
   3882         MycOperationAuditKind::DiscoveryHandlerPublish
   3883     );
   3884     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Succeeded);
   3885     assert_eq!(
   3886         audit[2].operation,
   3887         MycOperationAuditKind::DiscoveryHandlerRepair
   3888     );
   3889     assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded);
   3890     assert_eq!(audit[2].relay_url.as_deref(), Some(relay_a.url()));
   3891     assert_eq!(
   3892         audit[3].operation,
   3893         MycOperationAuditKind::DiscoveryHandlerRepair
   3894     );
   3895     assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Rejected);
   3896     assert_eq!(audit[3].relay_url.as_deref(), Some(relay_b.url()));
   3897 
   3898     Ok(())
   3899 }
   3900 
   3901 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3902 async fn diff_live_nip89_reports_conflicted_when_live_groups_disagree() -> TestResult<()> {
            // Scenario: a single discovery relay holds two handler events from the app
            // identity that share the "myc" identifier but list different relays. The
            // diff is expected to classify the live state as conflicted.
   3903     let relay = TestRelay::spawn().await?;
   3904     let test_runtime =
   3905         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   3906     let runtime = test_runtime.runtime;
   3907     let app_identity = myc::identity_files::load_encrypted_identity(
   3908         runtime
   3909             .config()
   3910             .discovery
   3911             .app_identity_path
   3912             .as_ref()
   3913             .expect("app identity path"),
   3914     )?;
   3915 
            // First variant: relay list points at relay-a.example.com.
   3916     let mut first_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   3917     first_spec.identifier = Some("myc".to_owned());
   3918     first_spec.relays = vec!["wss://relay-a.example.com".to_owned()];
   3919     publish_handler_event(relay.url(), &app_identity, &first_spec).await?;
   3920 
            // Second variant: same identifier, relay list points at relay-b.example.com.
   3921     let mut second_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   3922     second_spec.identifier = Some("myc".to_owned());
   3923     second_spec.relays = vec!["wss://relay-b.example.com".to_owned()];
   3924     publish_handler_event(relay.url(), &app_identity, &second_spec).await?;
   3925 
            // Wait until the relay reports both events by the author before diffing.
   3926     relay
   3927         .wait_for_published_events_by_author(app_identity.public_key(), 2)
   3928         .await?;
   3929 
   3930     let diff = diff_live_nip89(&runtime).await?;
   3931 
            // The conflict is reported through a single differing field named
            // "live_groups", and both variants appear as separate live groups.
   3932     assert_eq!(diff.status, MycDiscoveryLiveStatus::Conflicted);
   3933     assert_eq!(diff.differing_fields, vec!["live_groups".to_owned()]);
   3934     assert_eq!(diff.live_groups.len(), 2);
   3935 
   3936     Ok(())
   3937 }
   3938 
   3939 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   3940 async fn diff_live_nip89_surfaces_relay_divergence_with_provenance() -> TestResult<()> {
            // Scenario: relay_a holds the handler event built from the runtime, while
            // relay_b holds a variant with a different relay list and metadata name.
            // The diff must report per-relay status and attribute each live group to
            // the relay it was seen on.
   3941     let relay_a = TestRelay::spawn().await?;
   3942     let relay_b = TestRelay::spawn().await?;
   3943     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   3944         &[relay_a.url(), relay_b.url()],
   3945         MycConnectionApproval::ExplicitUser,
   3946     );
   3947     let runtime = test_runtime.runtime;
   3948     let app_identity = myc::identity_files::load_encrypted_identity(
   3949         runtime
   3950             .config()
   3951             .discovery
   3952             .app_identity_path
   3953             .as_ref()
   3954             .expect("app identity path"),
   3955     )?;
   3956 
            // Seed relay_a with the event the runtime's discovery context produces.
   3957     let matched_event = MycDiscoveryContext::from_runtime(&runtime)?
   3958         .build_signed_handler_event()
   3959         .expect("matched event");
   3960     publish_signed_event(relay_a.url(), &app_identity, &matched_event).await?;
   3961 
            // Seed relay_b with a variant that differs in both relay list and metadata.
   3962     let mut drifted_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   3963     drifted_spec.identifier = Some("myc".to_owned());
   3964     drifted_spec.relays = vec!["wss://stale.example.com".to_owned()];
   3965     let mut drifted_metadata = RadrootsNostrMetadata::default();
   3966     drifted_metadata.name = Some("stale".to_owned());
   3967     drifted_spec.metadata = Some(drifted_metadata);
   3968     publish_handler_event(relay_b.url(), &app_identity, &drifted_spec).await?;
   3969 
            // Wait until each relay reports its seeded event before diffing.
   3970     relay_a
   3971         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3972         .await?;
   3973     relay_b
   3974         .wait_for_published_events_by_author(app_identity.public_key(), 1)
   3975         .await?;
   3976 
   3977     let diff = diff_live_nip89(&runtime).await?;
   3978 
            // Aggregate view: overall `Conflicted`, two live groups, two relay states;
            // the summary lists relay_a as matched and relay_b as drifted, with the
            // unavailable/missing/conflicted buckets empty.
   3979     assert_eq!(diff.status, MycDiscoveryLiveStatus::Conflicted);
   3980     assert_eq!(diff.live_groups.len(), 2);
   3981     assert_eq!(diff.relay_states.len(), 2);
   3982     assert_eq!(diff.relay_summary.total_relays, 2);
   3983     assert_eq!(
   3984         diff.relay_summary.matched_relays,
   3985         vec![relay_a.url().to_owned()]
   3986     );
   3987     assert_eq!(
   3988         diff.relay_summary.drifted_relays,
   3989         vec![relay_b.url().to_owned()]
   3990     );
   3991     assert!(diff.relay_summary.unavailable_relays.is_empty());
   3992     assert!(diff.relay_summary.missing_relays.is_empty());
   3993     assert!(diff.relay_summary.conflicted_relays.is_empty());
   3994 
            // Per-relay view for relay_a: fetch succeeded, status `Matched`, and its
            // single live group names relay_a as the only source.
   3995     let matched_relay = diff
   3996         .relay_states
   3997         .iter()
   3998         .find(|relay_state| relay_state.relay_url == relay_a.url())
   3999         .expect("matched relay");
   4000     assert_eq!(
   4001         matched_relay.fetch_status,
   4002         MycDiscoveryRelayFetchStatus::Available
   4003     );
   4004     assert_eq!(
   4005         matched_relay.live_status,
   4006         Some(MycDiscoveryLiveStatus::Matched)
   4007     );
   4008     assert_eq!(matched_relay.live_groups.len(), 1);
   4009     assert_eq!(
   4010         matched_relay.live_groups[0].source_relays,
   4011         vec![relay_a.url().to_owned()]
   4012     );
   4013 
            // Per-relay view for relay_b: fetch succeeded, status `Drifted`, and its
            // single live group names relay_b as the only source.
   4014     let drifted_relay = diff
   4015         .relay_states
   4016         .iter()
   4017         .find(|relay_state| relay_state.relay_url == relay_b.url())
   4018         .expect("drifted relay");
   4019     assert_eq!(
   4020         drifted_relay.fetch_status,
   4021         MycDiscoveryRelayFetchStatus::Available
   4022     );
   4023     assert_eq!(
   4024         drifted_relay.live_status,
   4025         Some(MycDiscoveryLiveStatus::Drifted)
   4026     );
   4027     assert_eq!(drifted_relay.live_groups.len(), 1);
   4028     assert_eq!(
   4029         drifted_relay.live_groups[0].source_relays,
   4030         vec![relay_b.url().to_owned()]
   4031     );
   4032 
            // The top-level live groups carry the same provenance. Membership is
            // checked with `contains`, so the order of the groups is not asserted.
   4033     let live_group_relays = diff
   4034         .live_groups
   4035         .iter()
   4036         .map(|group| group.source_relays.clone())
   4037         .collect::<Vec<_>>();
   4038     assert!(live_group_relays.contains(&vec![relay_a.url().to_owned()]));
   4039     assert!(live_group_relays.contains(&vec![relay_b.url().to_owned()]));
   4040 
   4041     Ok(())
   4042 }
   4043 
   4044 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   4045 async fn refresh_nip89_requires_force_when_any_discovery_relay_is_unavailable() -> TestResult<()> {
            // Scenario: one spawned relay plus one relay URL from
            // `unavailable_relay_url()` (presumably an address nothing listens on -
            // confirm against the helper). A refresh called with `false` must fail,
            // and the same refresh called with `true` must go through.
   4046     let relay = TestRelay::spawn().await?;
   4047     let unavailable_relay = unavailable_relay_url()?;
   4048     let test_runtime = MycTestRuntime::new_with_discovery_relays(
   4049         &[relay.url(), unavailable_relay.as_str()],
   4050         MycConnectionApproval::ExplicitUser,
   4051     );
   4052     let runtime = test_runtime.runtime;
   4053     let app_identity = myc::identity_files::load_encrypted_identity(
   4054         runtime
   4055             .config()
   4056             .discovery
   4057             .app_identity_path
   4058             .as_ref()
   4059             .expect("app identity path"),
   4060     )?;
   4061 
            // With nothing published, the diff reports overall `Missing`; the summary
            // puts the dead URL under unavailable relays and the spawned relay under
            // missing relays.
   4062     let diff = diff_live_nip89(&runtime).await?;
   4063     assert_eq!(diff.status, MycDiscoveryLiveStatus::Missing);
   4064     assert_eq!(
   4065         diff.relay_summary.unavailable_relays,
   4066         vec![unavailable_relay.clone()]
   4067     );
   4068     assert_eq!(
   4069         diff.relay_summary.missing_relays,
   4070         vec![relay.url().to_owned()]
   4071     );
   4072 
            // The unavailable relay's state has no live status and carries a fetch error.
   4073     let unavailable_state = diff
   4074         .relay_states
   4075         .iter()
   4076         .find(|relay_state| relay_state.relay_url == unavailable_relay)
   4077         .expect("unavailable relay");
   4078     assert_eq!(
   4079         unavailable_state.fetch_status,
   4080         MycDiscoveryRelayFetchStatus::Unavailable
   4081     );
   4082     assert_eq!(unavailable_state.live_status, None);
   4083     assert!(unavailable_state.fetch_error.is_some());
   4084 
            // The refresh called with `false` must return an error whose message
            // mentions "unavailable".
   4085     let error = refresh_nip89(&runtime, false)
   4086         .await
   4087         .expect_err("refresh without force should fail when a relay is unavailable");
   4088     assert!(error.to_string().contains("unavailable"));
   4089 
            // Audit entries at indices 0-3: two fetch records (Unavailable), one
            // compare record (Missing), one refresh record (Unavailable).
            // NOTE(review): both the diff call and the failed refresh ran before this
            // check; which call emits which record is not visible from this test.
   4090     let audit = wait_for_operation_audit_count(&runtime, 4).await?;
   4091     assert_eq!(
   4092         audit[0].operation,
   4093         MycOperationAuditKind::DiscoveryHandlerFetch
   4094     );
   4095     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Unavailable);
   4096     assert_eq!(
   4097         audit[1].operation,
   4098         MycOperationAuditKind::DiscoveryHandlerFetch
   4099     );
   4100     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Unavailable);
   4101     assert_eq!(
   4102         audit[2].operation,
   4103         MycOperationAuditKind::DiscoveryHandlerCompare
   4104     );
   4105     assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Missing);
   4106     assert_eq!(
   4107         audit[3].operation,
   4108         MycOperationAuditKind::DiscoveryHandlerRefresh
   4109     );
   4110     assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Unavailable);
   4111 
            // Queue one `true` publish outcome on the spawned relay, then refresh with
            // `true`. The call succeeds, still lists the dead URL as unavailable, and
            // returns publish output.
   4112     relay
   4113         .queue_publish_outcomes(app_identity.public_key(), &[true])
   4114         .await;
   4115     let refreshed = refresh_nip89(&runtime, true).await?;
   4116     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Missing);
   4117     assert_eq!(
   4118         refreshed.relay_summary.unavailable_relays,
   4119         vec![unavailable_relay.clone()]
   4120     );
   4121     assert!(refreshed.published.is_some());
   4122 
   4123     Ok(())
   4124 }
   4125 
   4126 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
   4127 async fn refresh_nip89_requires_force_when_live_handler_is_conflicted() -> TestResult<()> {
            // Scenario: a single discovery relay holds two handler events that share
            // the "myc" identifier but list different relays. A refresh called with
            // `false` must fail with a conflict error; called with `true` it succeeds.
   4128     let relay = TestRelay::spawn().await?;
   4129     let test_runtime =
   4130         MycTestRuntime::new_with_discovery(relay.url(), MycConnectionApproval::ExplicitUser);
   4131     let runtime = test_runtime.runtime;
   4132     let app_identity = myc::identity_files::load_encrypted_identity(
   4133         runtime
   4134             .config()
   4135             .discovery
   4136             .app_identity_path
   4137             .as_ref()
   4138             .expect("app identity path"),
   4139     )?;
   4140 
            // Publish the two disagreeing variants to the same relay.
   4141     let mut first_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   4142     first_spec.identifier = Some("myc".to_owned());
   4143     first_spec.relays = vec!["wss://relay-a.example.com".to_owned()];
   4144     publish_handler_event(relay.url(), &app_identity, &first_spec).await?;
   4145 
   4146     let mut second_spec = RadrootsNostrApplicationHandlerSpec::new(vec![24_133]);
   4147     second_spec.identifier = Some("myc".to_owned());
   4148     second_spec.relays = vec!["wss://relay-b.example.com".to_owned()];
   4149     publish_handler_event(relay.url(), &app_identity, &second_spec).await?;
   4150 
            // Wait until the relay reports both events by the author before refreshing.
   4151     relay
   4152         .wait_for_published_events_by_author(app_identity.public_key(), 2)
   4153         .await?;
   4154 
            // The refresh called with `false` must return an error containing the
            // conflict message checked below.
   4155     let error = refresh_nip89(&runtime, false)
   4156         .await
   4157         .expect_err("conflicted refresh without force should fail");
   4158     assert!(
   4159         error
   4160             .to_string()
   4161             .contains("live discovery handler state is conflicted")
   4162     );
   4163 
            // Audit entries at indices 0-1: a compare record and a refresh record,
            // both with the `Conflicted` outcome.
   4164     let audit = wait_for_operation_audit_count(&runtime, 2).await?;
   4165     assert_eq!(
   4166         audit[0].operation,
   4167         MycOperationAuditKind::DiscoveryHandlerCompare
   4168     );
   4169     assert_eq!(audit[0].outcome, MycOperationAuditOutcome::Conflicted);
   4170     assert_eq!(
   4171         audit[1].operation,
   4172         MycOperationAuditKind::DiscoveryHandlerRefresh
   4173     );
   4174     assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Conflicted);
   4175 
            // Queue one `true` publish outcome, then refresh with `true`. The call
            // succeeds, still reports `Conflicted` with both live groups, and returns
            // publish output. NOTE(review): presumably the status reflects the state
            // observed before publishing - confirm against `refresh_nip89`.
   4176     relay
   4177         .queue_publish_outcomes(app_identity.public_key(), &[true])
   4178         .await;
   4179     let refreshed = refresh_nip89(&runtime, true).await?;
   4180     assert_eq!(refreshed.status, MycDiscoveryLiveStatus::Conflicted);
   4181     assert_eq!(refreshed.live_groups.len(), 2);
   4182     assert!(refreshed.published.is_some());
   4183 
   4184     Ok(())
   4185 }