commit dc19ff32ce0798a59024fa98eab911989ce9efe9
parent 985a49fe22184ef198c84579d05af1ff726c3eec
Author: triesap <tyson@radroots.org>
Date: Fri, 27 Mar 2026 19:08:07 +0000
transport: subscribe before publishing nip46 requests
Diffstat:
2 files changed, 79 insertions(+), 56 deletions(-)
diff --git a/src/transport/jsonrpc/methods/nip46/connect.rs b/src/transport/jsonrpc/methods/nip46/connect.rs
@@ -26,6 +26,7 @@ use radroots_nostr::prelude::{
RadrootsNostrPublicKey,
RadrootsNostrSecretKey,
RadrootsNostrRelayPoolNotification,
+ RadrootsNostrSubscriptionId,
RadrootsNostrTimestamp,
};
use nostr::nips::{nip44, nip46::NostrConnectMessage, nip46::NostrConnectRequest};
@@ -99,20 +100,37 @@ async fn connect_bunker(
.wait_for_connection(Duration::from_secs(DEFAULT_TIMEOUT_SECS))
.await;
- let request_id = send_connect_request(
- &client,
- &client_keys,
+ let request = NostrConnectRequest::Connect {
+ remote_signer_public_key: remote_signer_pubkey.clone(),
+ secret: info.secret.clone(),
+ };
+ let message = NostrConnectMessage::request(&request);
+ let request_id = message.id().to_string();
+ let filter = connect_response_filter(
&remote_signer_pubkey,
- info.secret.as_deref(),
- )
- .await?;
+ &client_pubkey,
+ RadrootsNostrTimestamp::now(),
+ )?;
+ let notifications = client.notifications();
+ let subscription = client
+ .subscribe(filter, None)
+ .await
+ .map_err(|e| RpcError::Other(format!("nip46 connect failed: {e}")))?;
+
+ if let Err(error) = send_connect_request(&client, &client_keys, &remote_signer_pubkey, message)
+ .await
+ {
+ client.unsubscribe(&subscription.val).await;
+ return Err(error);
+ }
let response = wait_for_connect_response(
&client,
&client_keys,
&remote_signer_pubkey,
- &client_pubkey,
&request_id,
+ notifications,
+ &subscription.val,
)
.await?;
@@ -262,14 +280,8 @@ async fn send_connect_request(
client: &RadrootsNostrClient,
client_keys: &RadrootsNostrKeys,
remote_signer_pubkey: &RadrootsNostrPublicKey,
- secret: Option<&str>,
-) -> Result<String, RpcError> {
- let req = NostrConnectRequest::Connect {
- remote_signer_public_key: remote_signer_pubkey.clone(),
- secret: secret.map(|value| value.to_string()),
- };
- let message = NostrConnectMessage::request(&req);
- let request_id = message.id().to_string();
+ message: NostrConnectMessage,
+) -> Result<(), RpcError> {
let event = RadrootsNostrEventBuilder::nostr_connect(
client_keys,
remote_signer_pubkey.clone(),
@@ -280,34 +292,37 @@ async fn send_connect_request(
.send_event_builder(event)
.await
.map_err(|e| RpcError::Other(format!("nip46 connect request failed: {e}")))?;
- Ok(request_id)
+ Ok(())
+}
+
+fn connect_response_filter(
+ remote_signer_pubkey: &RadrootsNostrPublicKey,
+ client_pubkey: &RadrootsNostrPublicKey,
+ since: RadrootsNostrTimestamp,
+) -> Result<RadrootsNostrFilter, RpcError> {
+ let filter = RadrootsNostrFilter::new()
+ .kind(RadrootsNostrKind::NostrConnect)
+ .author(remote_signer_pubkey.clone())
+ .since(since);
+ radroots_nostr_filter_tag(filter, "p", vec![client_pubkey.to_hex()])
+ .map_err(|e| RpcError::Other(format!("nip46 connect filter failed: {e}")))
}
async fn wait_for_connect_response(
client: &RadrootsNostrClient,
client_keys: &RadrootsNostrKeys,
remote_signer_pubkey: &RadrootsNostrPublicKey,
- client_pubkey: &RadrootsNostrPublicKey,
request_id: &str,
+ mut notifications: broadcast::Receiver<RadrootsNostrRelayPoolNotification>,
+ subscription_id: &RadrootsNostrSubscriptionId,
) -> Result<NostrConnectMessage, RpcError> {
- let filter = RadrootsNostrFilter::new()
- .kind(RadrootsNostrKind::NostrConnect)
- .author(remote_signer_pubkey.clone())
- .since(RadrootsNostrTimestamp::now());
- let filter = radroots_nostr_filter_tag(filter, "p", vec![client_pubkey.to_hex()])
- .map_err(|e| RpcError::Other(format!("nip46 connect filter failed: {e}")))?;
- let mut notifications = client.notifications();
- let subscription = client
- .subscribe(filter, None)
- .await
- .map_err(|e| RpcError::Other(format!("nip46 connect failed: {e}")))?;
let timeout = sleep(Duration::from_secs(DEFAULT_TIMEOUT_SECS));
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => {
- client.unsubscribe(&subscription.val).await;
+ client.unsubscribe(subscription_id).await;
return Err(RpcError::Other("nip46 connect response not found".to_string()));
}
msg = notifications.recv() => {
@@ -315,6 +330,7 @@ async fn wait_for_connect_response(
Ok(notification) => notification,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => {
+ client.unsubscribe(subscription_id).await;
return Err(RpcError::Other("nip46 connect notification closed".to_string()));
}
};
@@ -336,7 +352,7 @@ async fn wait_for_connect_response(
let message = NostrConnectMessage::from_json(&decrypted)
.map_err(|e| RpcError::Other(format!("nip46 connect response parse failed: {e}")))?;
if message.is_response() && message.id() == request_id {
- client.unsubscribe(&subscription.val).await;
+ client.unsubscribe(subscription_id).await;
return Ok(message);
}
}
diff --git a/src/transport/jsonrpc/nip46/client.rs b/src/transport/jsonrpc/nip46/client.rs
@@ -10,6 +10,7 @@ use radroots_nostr::prelude::{
RadrootsNostrFilter,
RadrootsNostrKind,
RadrootsNostrRelayPoolNotification,
+ RadrootsNostrSubscriptionId,
RadrootsNostrTimestamp,
};
use nostr::nips::{
@@ -62,15 +63,6 @@ pub async fn request(
request: NostrConnectRequest,
label: &str,
) -> Result<NostrConnectMessage, RpcError> {
- let request_id = send_request(session, request, label).await?;
- wait_for_response(session, &request_id, label).await
-}
-
-async fn send_request(
- session: &Nip46Session,
- request: NostrConnectRequest,
- label: &str,
-) -> Result<String, RpcError> {
session.client.connect().await;
session
.client
@@ -79,6 +71,13 @@ async fn send_request(
let message = NostrConnectMessage::request(&request);
let request_id = message.id().to_string();
+ let filter = response_filter(session, RadrootsNostrTimestamp::now(), label)?;
+ let notifications = session.client.notifications();
+ let subscription = session
+ .client
+ .subscribe(filter, None)
+ .await
+ .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
let event = RadrootsNostrEventBuilder::nostr_connect(
&session.client_keys,
session.remote_signer_pubkey.clone(),
@@ -86,39 +85,46 @@ async fn send_request(
)
.map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
- session
+ if let Err(error) = session
.client
.send_event_builder(event)
.await
- .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
+ .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))
+ {
+ session.client.unsubscribe(&subscription.val).await;
+ return Err(error);
+ }
- Ok(request_id)
+ wait_for_response(session, &request_id, label, notifications, &subscription.val).await
}
-async fn wait_for_response(
+fn response_filter(
session: &Nip46Session,
- request_id: &str,
+ since: RadrootsNostrTimestamp,
label: &str,
-) -> Result<NostrConnectMessage, RpcError> {
+) -> Result<RadrootsNostrFilter, RpcError> {
let filter = RadrootsNostrFilter::new()
.kind(RadrootsNostrKind::NostrConnect)
.author(session.remote_signer_pubkey.clone())
- .since(RadrootsNostrTimestamp::now());
- let filter = radroots_nostr_filter_tag(filter, "p", vec![session.client_pubkey.to_hex()])
- .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
- let mut notifications = session.client.notifications();
- let subscription = session
- .client
- .subscribe(filter, None)
- .await
- .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
+ .since(since);
+ radroots_nostr_filter_tag(filter, "p", vec![session.client_pubkey.to_hex()])
+ .map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))
+}
+
+async fn wait_for_response(
+ session: &Nip46Session,
+ request_id: &str,
+ label: &str,
+ mut notifications: broadcast::Receiver<RadrootsNostrRelayPoolNotification>,
+ subscription_id: &RadrootsNostrSubscriptionId,
+) -> Result<NostrConnectMessage, RpcError> {
let timeout = sleep(Duration::from_secs(DEFAULT_TIMEOUT_SECS));
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => {
- session.client.unsubscribe(&subscription.val).await;
+ session.client.unsubscribe(subscription_id).await;
return Err(RpcError::Other(format!("nip46 {label} response not found")));
}
msg = notifications.recv() => {
@@ -126,6 +132,7 @@ async fn wait_for_response(
Ok(notification) => notification,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => {
+ session.client.unsubscribe(subscription_id).await;
return Err(RpcError::Other(format!("nip46 {label} notification closed")));
}
};
@@ -147,7 +154,7 @@ async fn wait_for_response(
let message = NostrConnectMessage::from_json(&decrypted)
.map_err(|e| RpcError::Other(format!("nip46 {label} failed: {e}")))?;
if message.is_response() && message.id() == request_id {
- session.client.unsubscribe(&subscription.val).await;
+ session.client.unsubscribe(subscription_id).await;
return Ok(message);
}
}