commit 498a71bf0a9f699929efa8436ce5ae0762ae73a2
parent 933cc8d8874b242f10596e55366acedd8f9c0a2b
Author: triesap <triesap@radroots.dev>
Date: Tue, 6 Jan 2026 13:56:02 +0000
nip46: subscribe for ephemeral responses
- wait for relay connections before sending requests
- switch connect response handling to subscription loop
- switch nip46 client response handling to subscription loop
- add timeout and notification error handling for nip46 flow
Diffstat:
2 files changed, 100 insertions(+), 38 deletions(-)
diff --git a/src/api/jsonrpc/methods/nip46/connect.rs b/src/api/jsonrpc/methods/nip46/connect.rs
@@ -3,6 +3,8 @@ use std::time::Duration;
use anyhow::Result;
use jsonrpsee::server::RpcModule;
use serde::{Deserialize, Serialize};
+use tokio::sync::broadcast;
+use tokio::time::sleep;
use uuid::Uuid;
use crate::api::jsonrpc::params::DEFAULT_TIMEOUT_SECS;
@@ -18,6 +20,8 @@ use radroots_nostr::prelude::{
RadrootsNostrKind,
RadrootsNostrKeys,
RadrootsNostrPublicKey,
+ RadrootsNostrRelayPoolNotification,
+ RadrootsNostrTimestamp,
};
use nostr::nips::{nip44, nip46::NostrConnectMessage, nip46::NostrConnectRequest};
use nostr::JsonUtil;
@@ -78,6 +82,9 @@ async fn connect_bunker(
add_relays(&client, &info.relays).await?;
client.connect().await;
+ client
+ .wait_for_connection(Duration::from_secs(DEFAULT_TIMEOUT_SECS))
+ .await;
let request_id = send_connect_request(
&client,
@@ -166,31 +173,54 @@ async fn wait_for_connect_response(
) -> Result<NostrConnectMessage, RpcError> {
let filter = RadrootsNostrFilter::new()
.kind(RadrootsNostrKind::NostrConnect)
- .author(remote_signer_pubkey.clone());
+ .author(remote_signer_pubkey.clone())
+ .since(RadrootsNostrTimestamp::now());
let filter = radroots_nostr_filter_tag(filter, "p", vec![client_pubkey.to_hex()])
.map_err(|e| RpcError::Other(format!("nip46 connect filter failed: {e}")))?;
- let events = client
- .fetch_events(filter, Duration::from_secs(DEFAULT_TIMEOUT_SECS))
+ let mut notifications = client.notifications();
+ let subscription = client
+ .subscribe(filter, None)
.await
.map_err(|e| RpcError::Other(format!("nip46 connect failed: {e}")))?;
+ let timeout = sleep(Duration::from_secs(DEFAULT_TIMEOUT_SECS));
+ tokio::pin!(timeout);
- for event in events {
- let decrypted = nip44::decrypt(
- client_keys.secret_key(),
- remote_signer_pubkey,
- &event.content,
- )
- .map_err(|e| RpcError::Other(format!("nip46 connect decrypt failed: {e}")))?;
- let message = NostrConnectMessage::from_json(&decrypted)
- .map_err(|e| RpcError::Other(format!("nip46 connect response parse failed: {e}")))?;
- if message.is_response() && message.id() == request_id {
- return Ok(message);
+ loop {
+ tokio::select! {
+ _ = &mut timeout => {
+ client.unsubscribe(&subscription.val).await;
+ return Err(RpcError::Other("nip46 connect response not found".to_string()));
+ }
+ msg = notifications.recv() => {
+ let notification = match msg {
+ Ok(notification) => notification,
+ Err(broadcast::error::RecvError::Lagged(_)) => continue,
+ Err(broadcast::error::RecvError::Closed) => {
+ return Err(RpcError::Other("nip46 connect notification closed".to_string()));
+ }
+ };
+ let RadrootsNostrRelayPoolNotification::Event { event, .. } = notification else {
+ continue;
+ };
+ let event = (*event).clone();
+ if event.kind != RadrootsNostrKind::NostrConnect || &event.pubkey != remote_signer_pubkey {
+ continue;
+ }
+ let decrypted = nip44::decrypt(
+ client_keys.secret_key(),
+ remote_signer_pubkey,
+ &event.content,
+ )
+ .map_err(|e| RpcError::Other(format!("nip46 connect decrypt failed: {e}")))?;
+ let message = NostrConnectMessage::from_json(&decrypted)
+ .map_err(|e| RpcError::Other(format!("nip46 connect response parse failed: {e}")))?;
+ if message.is_response() && message.id() == request_id {
+ client.unsubscribe(&subscription.val).await;
+ return Ok(message);
+ }
+ }
}
}
-
- Err(RpcError::Other(
- "nip46 connect response not found".to_string(),
- ))
}
fn validate_connect_response(
diff --git a/src/nip46/client.rs b/src/nip46/client.rs
@@ -9,6 +9,8 @@ use radroots_nostr::prelude::{
RadrootsNostrEventBuilder,
RadrootsNostrFilter,
RadrootsNostrKind,
+ RadrootsNostrRelayPoolNotification,
+ RadrootsNostrTimestamp,
};
use nostr::nips::{
nip44,
@@ -16,6 +18,8 @@ use nostr::nips::{
};
use nostr::JsonUtil;
use nostr::UnsignedEvent;
+use tokio::sync::broadcast;
+use tokio::time::sleep;
pub async fn sign_event(
session: &Nip46Session,
@@ -68,6 +72,10 @@ async fn send_request(
label: &str,
) -> Result<String, RpcError> {
session.client.connect().await;
+ session
+ .client
+ .wait_for_connection(Duration::from_secs(DEFAULT_TIMEOUT_SECS))
+ .await;
let message = NostrConnectMessage::request(&request);
let request_id = message.id().to_string();
@@ -94,31 +102,55 @@ async fn wait_for_response(
) -> Result<NostrConnectMessage, RpcError> {
let filter = RadrootsNostrFilter::new()
.kind(RadrootsNostrKind::NostrConnect)
- .author(session.remote_signer_pubkey.clone());
+ .author(session.remote_signer_pubkey.clone())
+ .since(RadrootsNostrTimestamp::now());
let filter = radroots_nostr_filter_tag(filter, "p", vec![session.client_pubkey.to_hex()])
.map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
-
- let events = session
+ let mut notifications = session.client.notifications();
+ let subscription = session
.client
- .fetch_events(filter, Duration::from_secs(DEFAULT_TIMEOUT_SECS))
+ .subscribe(filter, None)
.await
.map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
-
- for event in events {
- let decrypted = nip44::decrypt(
- session.client_keys.secret_key(),
- &session.remote_signer_pubkey,
- &event.content,
- )
- .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
- let message = NostrConnectMessage::from_json(&decrypted)
- .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
- if message.is_response() && message.id() == request_id {
- return Ok(message);
+ let timeout = sleep(Duration::from_secs(DEFAULT_TIMEOUT_SECS));
+ tokio::pin!(timeout);
+
+ loop {
+ tokio::select! {
+ _ = &mut timeout => {
+ session.client.unsubscribe(&subscription.val).await;
+ return Err(RpcError::Other(format!("nip46 {label} response not found")));
+ }
+ msg = notifications.recv() => {
+ let notification = match msg {
+ Ok(notification) => notification,
+ Err(broadcast::error::RecvError::Lagged(_)) => continue,
+ Err(broadcast::error::RecvError::Closed) => {
+ return Err(RpcError::Other(format!("nip46 {label} notification closed")));
+ }
+ };
+ let RadrootsNostrRelayPoolNotification::Event { event, .. } = notification else {
+ continue;
+ };
+ let event = (*event).clone();
+ if event.kind != RadrootsNostrKind::NostrConnect
+ || event.pubkey != session.remote_signer_pubkey
+ {
+ continue;
+ }
+ let decrypted = nip44::decrypt(
+ session.client_keys.secret_key(),
+ &session.remote_signer_pubkey,
+ &event.content,
+ )
+ .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
+ let message = NostrConnectMessage::from_json(&decrypted)
+ .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
+ if message.is_response() && message.id() == request_id {
+ session.client.unsubscribe(&subscription.val).await;
+ return Ok(message);
+ }
+ }
}
}
-
- Err(RpcError::Other(format!(
- "nip46 {label} response not found"
- )))
}