commit 22b83c6dcb6431a6fff2d1518d2325c13d52ba7b
parent 78eba8ad069b8012d1b40286083548a3899a1bc3
Author: triesap <tyson@radroots.org>
Date: Sun, 22 Mar 2026 01:18:59 +0000
app: harden publish confirmation and replay control
- add runtime and service shutdown seams for deterministic listener execution in tests
- require at least one relay acknowledgement before treating a publish as successful
- extract reusable control flows for connect accept and auth replay out of the cli layer
- restore pending auth challenges when replay publishing fails instead of dropping the request
Diffstat:
8 files changed, 361 insertions(+), 205 deletions(-)
diff --git a/src/app/mod.rs b/src/app/mod.rs
@@ -28,6 +28,13 @@ impl MycApp {
pub async fn run(self) -> Result<(), MycError> {
self.runtime.run().await
}
+
+ pub async fn run_until<F>(self, shutdown: F) -> Result<(), MycError>
+ where
+ F: std::future::Future<Output = ()>,
+ {
+ self.runtime.run_until(shutdown).await
+ }
}
#[cfg(test)]
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -1,4 +1,5 @@
use std::fs;
+use std::future::Future;
use std::path::{Path, PathBuf};
use crate::config::MycConfig;
@@ -135,6 +136,13 @@ impl MycRuntime {
}
pub async fn run(self) -> Result<(), MycError> {
+ self.run_until(std::future::pending()).await
+ }
+
+ pub async fn run_until<F>(self, shutdown: F) -> Result<(), MycError>
+ where
+ F: Future<Output = ()>,
+ {
let snapshot = self.snapshot();
tracing::info!(
instance_name = %snapshot.instance_name,
@@ -154,8 +162,10 @@ impl MycRuntime {
);
if let Some(transport) = self.transport.clone() {
let service = MycNip46Service::new(self.signer_context(), transport);
- return service.run().await;
+ return service.run_until(shutdown).await;
}
+ tokio::pin!(shutdown);
+ shutdown.await;
Ok(())
}
diff --git a/src/cli.rs b/src/cli.rs
@@ -1,23 +1,17 @@
use std::path::{Path, PathBuf};
-use std::str::FromStr;
use clap::{Args, Parser, Subcommand};
-use radroots_nostr_connect::prelude::{
- RadrootsNostrConnectPermission, RadrootsNostrConnectPermissions, RadrootsNostrConnectRequest,
- RadrootsNostrConnectResponse, RadrootsNostrConnectUri,
-};
+use radroots_nostr_connect::prelude::RadrootsNostrConnectPermissions;
use radroots_nostr_signer::prelude::{
- RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerAuthorizationOutcome,
RadrootsNostrSignerConnectionId, RadrootsNostrSignerConnectionRecord,
- RadrootsNostrSignerRequestId,
};
use serde::Serialize;
use crate::app::MycRuntime;
use crate::config::{DEFAULT_CONFIG_PATH, MycConfig};
+use crate::control::{accept_client_uri, authorize_auth_challenge, parse_permission_values};
use crate::error::MycError;
use crate::logging;
-use crate::transport::{MycNip46Handler, MycNostrTransport};
#[derive(Debug, Parser)]
#[command(name = "myc")]
@@ -104,19 +98,6 @@ pub struct MycConnectionReasonArgs {
reason: Option<String>,
}
-#[derive(Debug, Serialize)]
-struct MycAuthorizedReplayOutput {
- connection: RadrootsNostrSignerConnectionRecord,
- replayed_request_id: Option<String>,
-}
-
-#[derive(Debug, Serialize)]
-struct MycAcceptedConnectionOutput {
- connection: RadrootsNostrSignerConnectionRecord,
- response_request_id: String,
- response_relays: Vec<String>,
-}
-
pub async fn run_from_env() -> Result<(), MycError> {
let cli = MycCli::parse();
let config = load_config(cli.config.as_deref())?;
@@ -184,14 +165,8 @@ pub async fn run_from_env() -> Result<(), MycError> {
}
MycAuthCommand::Authorize { connection_id } => {
let connection_id = parse_connection_id(&connection_id)?;
- let outcome = runtime
- .signer_manager()?
- .authorize_auth_challenge(&connection_id)?;
- let replayed_request_id = replay_authorized_request(&runtime, &outcome).await?;
- print_json(&MycAuthorizedReplayOutput {
- connection: outcome.connection,
- replayed_request_id,
- })
+ let replayed = authorize_auth_challenge(&runtime, &connection_id).await?;
+ print_json(&replayed)
}
}
}
@@ -218,22 +193,6 @@ fn parse_connection_id(value: &str) -> Result<RadrootsNostrSignerConnectionId, M
Ok(RadrootsNostrSignerConnectionId::parse(value)?)
}
-fn parse_permission_values(values: &[String]) -> Result<RadrootsNostrConnectPermissions, MycError> {
- let mut permissions = Vec::new();
- for value in values {
- for fragment in value.split(',') {
- let trimmed = fragment.trim();
- if trimmed.is_empty() {
- continue;
- }
- permissions.push(RadrootsNostrConnectPermission::from_str(trimmed)?);
- }
- }
- permissions.sort();
- permissions.dedup();
- Ok(permissions.into())
-}
-
fn granted_permissions_for_approval(
connections: &[RadrootsNostrSignerConnectionRecord],
connection_id: &RadrootsNostrSignerConnectionId,
@@ -252,156 +211,6 @@ fn granted_permissions_for_approval(
Ok(connection.requested_permissions.clone())
}
-async fn replay_authorized_request(
- runtime: &MycRuntime,
- outcome: &RadrootsNostrSignerAuthorizationOutcome,
-) -> Result<Option<String>, MycError> {
- let Some(pending_request) = &outcome.pending_request else {
- return Ok(None);
- };
- let transport = runtime.transport().ok_or_else(|| {
- MycError::InvalidOperation(
- "transport.enabled must be true to replay authorized requests".to_owned(),
- )
- })?;
- let handler = MycNip46Handler::new(runtime.signer_context(), transport.relays().to_vec());
- let handled_request = handler.handle_request(
- outcome.connection.client_public_key,
- pending_request.request_message.clone(),
- )?;
- let Some((response, consume_connect_secret_for)) = handled_request.into_publish_parts() else {
- return Ok(None);
- };
- let event = handler.build_response_event(
- outcome.connection.client_public_key,
- pending_request.request_message.id.clone(),
- response,
- )?;
- let publish_relays = if outcome.connection.relays.is_empty() {
- transport.relays().to_vec()
- } else {
- outcome.connection.relays.clone()
- };
- MycNostrTransport::publish_once(
- runtime.signer_identity(),
- &publish_relays,
- transport.connect_timeout_secs(),
- event,
- )
- .await?;
- if let Some(connection_id) = consume_connect_secret_for {
- runtime
- .signer_manager()?
- .mark_connect_secret_consumed(&connection_id)?;
- }
- Ok(Some(pending_request.request_message.id.clone()))
-}
-
-async fn accept_client_uri(
- runtime: &MycRuntime,
- uri: &str,
-) -> Result<MycAcceptedConnectionOutput, MycError> {
- let Some(transport) = runtime.transport() else {
- return Err(MycError::InvalidOperation(
- "transport.enabled must be true to accept client nostrconnect URIs".to_owned(),
- ));
- };
- let preferred_relays = transport.relays().to_vec();
- if preferred_relays.is_empty() {
- return Err(MycError::InvalidOperation(
- "transport.relays must not be empty to accept client nostrconnect URIs".to_owned(),
- ));
- }
-
- let client_uri = match RadrootsNostrConnectUri::parse(uri)? {
- RadrootsNostrConnectUri::Client(client_uri) => client_uri,
- RadrootsNostrConnectUri::Bunker(_) => {
- return Err(MycError::InvalidOperation(
- "connect accept requires a nostrconnect:// client URI".to_owned(),
- ));
- }
- };
-
- let request = RadrootsNostrConnectRequest::Connect {
- remote_signer_public_key: runtime.signer_identity().public_key(),
- secret: Some(client_uri.secret.clone()),
- requested_permissions: client_uri.metadata.requested_permissions.clone(),
- };
- let manager = runtime.signer_manager()?;
- let connection = match manager.evaluate_connect_request(client_uri.client_public_key, request)? {
- radroots_nostr_signer::prelude::RadrootsNostrSignerConnectEvaluation::ExistingConnection(
- connection,
- ) => {
- if connection.connect_secret_is_consumed() {
- return Err(MycError::InvalidOperation(
- "connect secret has already been consumed by a successful connection"
- .to_owned(),
- ));
- }
- connection
- }
- radroots_nostr_signer::prelude::RadrootsNostrSignerConnectEvaluation::RegistrationRequired(
- proposal,
- ) => {
- let draft = proposal
- .into_connection_draft(runtime.user_public_identity())
- .with_relays(preferred_relays.clone())
- .with_approval_requirement(runtime.signer_context().connection_approval_requirement());
- let connection = manager.register_connection(draft)?;
- if runtime.signer_context().connection_approval_requirement()
- == RadrootsNostrSignerApprovalRequirement::NotRequired
- {
- let _ = manager.set_granted_permissions(
- &connection.connection_id,
- connection.requested_permissions.clone(),
- )?;
- }
- connection
- }
- };
-
- let handler = MycNip46Handler::new(runtime.signer_context(), preferred_relays.clone());
- let response_request_id = RadrootsNostrSignerRequestId::new_v7().into_string();
- let event = handler.build_response_event(
- client_uri.client_public_key,
- response_request_id.clone(),
- RadrootsNostrConnectResponse::ConnectSecretEcho(client_uri.secret),
- )?;
- let response_relays = merge_relays(&client_uri.relays, &preferred_relays);
- MycNostrTransport::publish_once(
- runtime.signer_identity(),
- &response_relays,
- transport.connect_timeout_secs(),
- event,
- )
- .await?;
- let _ = manager.mark_connect_secret_consumed(&connection.connection_id)?;
-
- Ok(MycAcceptedConnectionOutput {
- connection: runtime
- .signer_manager()?
- .list_connections()?
- .into_iter()
- .find(|record| record.connection_id == connection.connection_id)
- .ok_or_else(|| {
- MycError::InvalidOperation("accepted connection was not persisted".to_owned())
- })?,
- response_request_id,
- response_relays: response_relays.iter().map(ToString::to_string).collect(),
- })
-}
-
-fn merge_relays(
- primary: &[nostr::RelayUrl],
- secondary: &[nostr::RelayUrl],
-) -> Vec<nostr::RelayUrl> {
- let mut relays = primary.to_vec();
- relays.extend_from_slice(secondary);
- relays.sort_by(|left, right| left.as_str().cmp(right.as_str()));
- relays.dedup_by(|left, right| left.as_str() == right.as_str());
- relays
-}
-
fn print_json<T>(value: &T) -> Result<(), MycError>
where
T: Serialize,
diff --git a/src/control.rs b/src/control.rs
@@ -0,0 +1,274 @@
+use std::str::FromStr;
+
+use radroots_nostr_connect::prelude::{
+ RadrootsNostrConnectPermission, RadrootsNostrConnectPermissions, RadrootsNostrConnectRequest,
+ RadrootsNostrConnectResponse, RadrootsNostrConnectUri,
+};
+use radroots_nostr_signer::prelude::{
+ RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerAuthorizationOutcome,
+ RadrootsNostrSignerConnectionId, RadrootsNostrSignerConnectionRecord,
+ RadrootsNostrSignerPendingRequest, RadrootsNostrSignerRequestId,
+};
+use serde::Serialize;
+
+use crate::app::MycRuntime;
+use crate::error::MycError;
+use crate::transport::{MycNip46Handler, MycNostrTransport};
+
+#[derive(Debug, Serialize)]
+pub struct MycAuthorizedReplayOutput {
+ pub connection: RadrootsNostrSignerConnectionRecord,
+ pub replayed_request_id: Option<String>,
+}
+
+#[derive(Debug, Serialize)]
+pub struct MycAcceptedConnectionOutput {
+ pub connection: RadrootsNostrSignerConnectionRecord,
+ pub response_request_id: String,
+ pub response_relays: Vec<String>,
+}
+
+pub async fn authorize_auth_challenge(
+ runtime: &MycRuntime,
+ connection_id: &RadrootsNostrSignerConnectionId,
+) -> Result<MycAuthorizedReplayOutput, MycError> {
+ let outcome = runtime
+ .signer_manager()?
+ .authorize_auth_challenge(connection_id)?;
+ let replayed_request_id = replay_authorized_request(runtime, &outcome).await?;
+ Ok(MycAuthorizedReplayOutput {
+ connection: outcome.connection,
+ replayed_request_id,
+ })
+}
+
+pub async fn accept_client_uri(
+ runtime: &MycRuntime,
+ uri: &str,
+) -> Result<MycAcceptedConnectionOutput, MycError> {
+ let Some(transport) = runtime.transport() else {
+ return Err(MycError::InvalidOperation(
+ "transport.enabled must be true to accept client nostrconnect URIs".to_owned(),
+ ));
+ };
+ let preferred_relays = transport.relays().to_vec();
+ if preferred_relays.is_empty() {
+ return Err(MycError::InvalidOperation(
+ "transport.relays must not be empty to accept client nostrconnect URIs".to_owned(),
+ ));
+ }
+
+ let client_uri = match RadrootsNostrConnectUri::parse(uri)? {
+ RadrootsNostrConnectUri::Client(client_uri) => client_uri,
+ RadrootsNostrConnectUri::Bunker(_) => {
+ return Err(MycError::InvalidOperation(
+ "connect accept requires a nostrconnect:// client URI".to_owned(),
+ ));
+ }
+ };
+
+ let request = RadrootsNostrConnectRequest::Connect {
+ remote_signer_public_key: runtime.signer_identity().public_key(),
+ secret: Some(client_uri.secret.clone()),
+ requested_permissions: client_uri.metadata.requested_permissions.clone(),
+ };
+ let manager = runtime.signer_manager()?;
+ let connection = match manager.evaluate_connect_request(client_uri.client_public_key, request)? {
+ radroots_nostr_signer::prelude::RadrootsNostrSignerConnectEvaluation::ExistingConnection(
+ connection,
+ ) => {
+ if connection.connect_secret_is_consumed() {
+ return Err(MycError::InvalidOperation(
+ "connect secret has already been consumed by a successful connection"
+ .to_owned(),
+ ));
+ }
+ connection
+ }
+ radroots_nostr_signer::prelude::RadrootsNostrSignerConnectEvaluation::RegistrationRequired(
+ proposal,
+ ) => {
+ let draft = proposal
+ .into_connection_draft(runtime.user_public_identity())
+ .with_relays(preferred_relays.clone())
+ .with_approval_requirement(runtime.signer_context().connection_approval_requirement());
+ let connection = manager.register_connection(draft)?;
+ if runtime.signer_context().connection_approval_requirement()
+ == RadrootsNostrSignerApprovalRequirement::NotRequired
+ {
+ let _ = manager.set_granted_permissions(
+ &connection.connection_id,
+ connection.requested_permissions.clone(),
+ )?;
+ }
+ connection
+ }
+ };
+
+ let handler = MycNip46Handler::new(runtime.signer_context(), preferred_relays.clone());
+ let response_request_id = RadrootsNostrSignerRequestId::new_v7().into_string();
+ let event = handler.build_response_event(
+ client_uri.client_public_key,
+ response_request_id.clone(),
+ RadrootsNostrConnectResponse::ConnectSecretEcho(client_uri.secret),
+ )?;
+ let response_relays = merge_relays(&client_uri.relays, &preferred_relays);
+ MycNostrTransport::publish_once(
+ runtime.signer_identity(),
+ &response_relays,
+ transport.connect_timeout_secs(),
+ event,
+ )
+ .await?;
+ let _ = manager.mark_connect_secret_consumed(&connection.connection_id)?;
+
+ Ok(MycAcceptedConnectionOutput {
+ connection: runtime
+ .signer_manager()?
+ .list_connections()?
+ .into_iter()
+ .find(|record| record.connection_id == connection.connection_id)
+ .ok_or_else(|| {
+ MycError::InvalidOperation("accepted connection was not persisted".to_owned())
+ })?,
+ response_request_id,
+ response_relays: response_relays.iter().map(ToString::to_string).collect(),
+ })
+}
+
+pub fn parse_permission_values(
+ values: &[String],
+) -> Result<RadrootsNostrConnectPermissions, MycError> {
+ let mut permissions = Vec::new();
+ for value in values {
+ for fragment in value.split(',') {
+ let trimmed = fragment.trim();
+ if trimmed.is_empty() {
+ continue;
+ }
+ permissions.push(RadrootsNostrConnectPermission::from_str(trimmed)?);
+ }
+ }
+ permissions.sort();
+ permissions.dedup();
+ Ok(permissions.into())
+}
+
+async fn replay_authorized_request(
+ runtime: &MycRuntime,
+ outcome: &RadrootsNostrSignerAuthorizationOutcome,
+) -> Result<Option<String>, MycError> {
+ let Some(pending_request) = outcome.pending_request.clone() else {
+ return Ok(None);
+ };
+ let transport = match runtime.transport() {
+ Some(transport) => transport,
+ None => {
+ let error = MycError::InvalidOperation(
+ "transport.enabled must be true to replay authorized requests".to_owned(),
+ );
+ return Err(restore_pending_auth_challenge_on_error(
+ runtime,
+ &outcome.connection.connection_id,
+ pending_request,
+ error,
+ ));
+ }
+ };
+ let handler = MycNip46Handler::new(runtime.signer_context(), transport.relays().to_vec());
+ let handled_request = match handler.handle_request(
+ outcome.connection.client_public_key,
+ pending_request.request_message.clone(),
+ ) {
+ Ok(handled_request) => handled_request,
+ Err(error) => {
+ return Err(restore_pending_auth_challenge_on_error(
+ runtime,
+ &outcome.connection.connection_id,
+ pending_request,
+ error,
+ ));
+ }
+ };
+ let Some((response, consume_connect_secret_for)) = handled_request.into_publish_parts() else {
+ let error = MycError::InvalidOperation(
+ "authorized auth replay did not produce a response".to_owned(),
+ );
+ return Err(restore_pending_auth_challenge_on_error(
+ runtime,
+ &outcome.connection.connection_id,
+ pending_request,
+ error,
+ ));
+ };
+ let event = match handler.build_response_event(
+ outcome.connection.client_public_key,
+ pending_request.request_message.id.clone(),
+ response,
+ ) {
+ Ok(event) => event,
+ Err(error) => {
+ return Err(restore_pending_auth_challenge_on_error(
+ runtime,
+ &outcome.connection.connection_id,
+ pending_request,
+ error,
+ ));
+ }
+ };
+ let publish_relays = if outcome.connection.relays.is_empty() {
+ transport.relays().to_vec()
+ } else {
+ outcome.connection.relays.clone()
+ };
+ if let Err(error) = MycNostrTransport::publish_once(
+ runtime.signer_identity(),
+ &publish_relays,
+ transport.connect_timeout_secs(),
+ event,
+ )
+ .await
+ {
+ return Err(restore_pending_auth_challenge_on_error(
+ runtime,
+ &outcome.connection.connection_id,
+ pending_request,
+ error,
+ ));
+ }
+ if let Some(connection_id) = consume_connect_secret_for {
+ runtime
+ .signer_manager()?
+ .mark_connect_secret_consumed(&connection_id)?;
+ }
+ Ok(Some(pending_request.request_message.id.clone()))
+}
+
+fn restore_pending_auth_challenge_on_error(
+ runtime: &MycRuntime,
+ connection_id: &RadrootsNostrSignerConnectionId,
+ pending_request: RadrootsNostrSignerPendingRequest,
+ error: MycError,
+) -> MycError {
+ match runtime.signer_manager().and_then(|manager| {
+ manager
+ .restore_pending_auth_challenge(connection_id, pending_request)
+ .map_err(Into::into)
+ }) {
+ Ok(_) => error,
+ Err(restore_error) => MycError::InvalidOperation(format!(
+ "{error}; additionally failed to restore pending auth challenge: {restore_error}"
+ )),
+ }
+}
+
+fn merge_relays(
+ primary: &[nostr::RelayUrl],
+ secondary: &[nostr::RelayUrl],
+) -> Vec<nostr::RelayUrl> {
+ let mut relays = primary.to_vec();
+ relays.extend_from_slice(secondary);
+ relays.sort_by(|left, right| left.as_str().cmp(right.as_str()));
+ relays.dedup_by(|left, right| left.as_str() == right.as_str());
+ relays
+}
diff --git a/src/error.rs b/src/error.rs
@@ -54,6 +54,8 @@ pub enum MycError {
Nip46Encrypt(String),
#[error("NIP-46 listener notifications closed")]
Nip46ListenerClosed,
+ #[error("Nostr publish failed for {operation}: {details}")]
+ PublishRejected { operation: String, details: String },
#[error(
"configured signer identity `{configured_identity_id}` at {identity_path} does not match persisted signer identity `{persisted_identity_id}` in {state_path}"
)]
diff --git a/src/lib.rs b/src/lib.rs
@@ -3,6 +3,7 @@
pub mod app;
pub mod cli;
pub mod config;
+pub mod control;
pub mod error;
pub mod logging;
pub mod transport;
@@ -12,6 +13,7 @@ pub use config::{
DEFAULT_CONFIG_PATH, MycConfig, MycConnectionApproval, MycLoggingConfig, MycPathsConfig,
MycPolicyConfig, MycServiceConfig, MycTransportConfig,
};
+pub use control::{MycAcceptedConnectionOutput, MycAuthorizedReplayOutput};
pub use error::MycError;
pub use transport::{MycNostrTransport, MycTransportSnapshot};
diff --git a/src/transport.rs b/src/transport.rs
@@ -4,7 +4,7 @@ use std::time::Duration;
use radroots_identity::RadrootsIdentity;
use radroots_nostr::prelude::{
- RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrRelayUrl,
+ RadrootsNostrClient, RadrootsNostrEventBuilder, RadrootsNostrOutput, RadrootsNostrRelayUrl,
};
use crate::config::MycTransportConfig;
@@ -85,7 +85,8 @@ impl MycNostrTransport {
client
.wait_for_connection(Duration::from_secs(connect_timeout_secs))
.await;
- let _ = client.send_event_builder(event).await?;
+ let output = client.send_event_builder(event).await?;
+ let _ = ensure_publish_confirmed(output, "one-shot Nostr publish")?;
Ok(())
}
@@ -98,6 +99,34 @@ impl MycNostrTransport {
}
}
+pub(crate) fn ensure_publish_confirmed<T>(
+ output: RadrootsNostrOutput<T>,
+ operation: &str,
+) -> Result<RadrootsNostrOutput<T>, MycError>
+where
+ T: std::fmt::Debug,
+{
+ if !output.success.is_empty() {
+ return Ok(output);
+ }
+
+ let details = if output.failed.is_empty() {
+ "no relay acknowledged the publish".to_owned()
+ } else {
+ output
+ .failed
+ .iter()
+ .map(|(relay, error)| format!("{relay}: {error}"))
+ .collect::<Vec<_>>()
+ .join("; ")
+ };
+
+ Err(MycError::PublishRejected {
+ operation: operation.to_owned(),
+ details,
+ })
+}
+
impl MycTransportSnapshot {
pub fn disabled() -> Self {
Self {
diff --git a/src/transport/nip46.rs b/src/transport/nip46.rs
@@ -1,3 +1,5 @@
+use std::future::Future;
+
use nostr::nips::nip04;
use nostr::nips::nip44;
use nostr::nips::nip44::Version;
@@ -19,7 +21,7 @@ use tokio::sync::broadcast;
use crate::app::MycSignerContext;
use crate::error::MycError;
-use crate::transport::MycNostrTransport;
+use crate::transport::{MycNostrTransport, ensure_publish_confirmed};
#[derive(Clone)]
pub struct MycNip46Handler {
@@ -393,6 +395,14 @@ impl MycNip46Service {
}
pub async fn run(&self) -> Result<(), MycError> {
+ self.run_until(std::future::pending()).await
+ }
+
+ pub async fn run_until<F>(&self, shutdown: F) -> Result<(), MycError>
+ where
+ F: Future<Output = ()>,
+ {
+ tokio::pin!(shutdown);
self.transport.connect().await?;
let filter = self.handler.filter()?;
@@ -405,11 +415,16 @@ impl MycNip46Service {
);
loop {
- let notification = match notifications.recv().await {
- Ok(notification) => notification,
- Err(broadcast::error::RecvError::Lagged(_)) => continue,
- Err(broadcast::error::RecvError::Closed) => {
- return Err(MycError::Nip46ListenerClosed);
+ let notification = tokio::select! {
+ _ = &mut shutdown => return Ok(()),
+ notification = notifications.recv() => {
+ match notification {
+ Ok(notification) => notification,
+ Err(broadcast::error::RecvError::Lagged(_)) => continue,
+ Err(broadcast::error::RecvError::Closed) => {
+ return Err(MycError::Nip46ListenerClosed);
+ }
+ }
}
};
let RadrootsNostrRelayPoolNotification::Event { event, .. } = notification else {
@@ -452,12 +467,20 @@ impl MycNip46Service {
let response_event =
self.handler
.build_response_event(event.pubkey, request_id, response)?;
- if let Err(error) = self
+ let publish_output = match self
.transport
.client()
.send_event_builder(response_event)
.await
{
+ Ok(output) => output,
+ Err(error) => {
+ tracing::warn!(error = %error, "failed to publish NIP-46 response");
+ continue;
+ }
+ };
+ if let Err(error) = ensure_publish_confirmed(publish_output, "NIP-46 response publish")
+ {
tracing::warn!(error = %error, "failed to publish NIP-46 response");
continue;
}