commit 40ddd75b3cdaa15856bb9f473bf773c14a9c8e19
parent 3c9df8754e1901ee2d6e7c39b0df3126bbc5b3c7
Author: triesap <tyson@radroots.org>
Date: Thu, 26 Mar 2026 19:06:55 +0000
delivery: route listener responses through outbox
Diffstat:
4 files changed, 453 insertions(+), 77 deletions(-)
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -220,7 +220,11 @@ impl MycRuntime {
let mut tasks = tokio::task::JoinSet::new();
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
if let Some(transport) = self.transport.clone() {
- let service = MycNip46Service::new(self.signer_context(), transport);
+ let service = MycNip46Service::new(
+ self.signer_context(),
+ transport,
+ self.delivery_outbox_store(),
+ );
let shutdown = observe_shutdown_signal(shutdown_rx.clone());
tasks.spawn(async move { service.run_until(shutdown).await });
}
diff --git a/src/transport/nip46.rs b/src/transport/nip46.rs
@@ -16,14 +16,16 @@ use radroots_nostr_signer::prelude::{
RadrootsNostrSignerConnectEvaluation, RadrootsNostrSignerConnectionId,
RadrootsNostrSignerConnectionRecord, RadrootsNostrSignerRequestAction,
RadrootsNostrSignerRequestEvaluation, RadrootsNostrSignerRequestResponseHint,
- RadrootsNostrSignerSessionLookup,
+ RadrootsNostrSignerSessionLookup, RadrootsNostrSignerWorkflowId,
};
use tokio::sync::broadcast;
use crate::app::MycSignerContext;
use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
use crate::error::MycError;
+use crate::outbox::{MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStore};
use crate::transport::MycNostrTransport;
+use std::sync::Arc;
#[derive(Clone)]
pub struct MycNip46Handler {
@@ -34,6 +36,7 @@ pub struct MycNip46Handler {
pub struct MycNip46Service {
handler: MycNip46Handler,
transport: MycNostrTransport,
+ delivery_outbox_store: Arc<dyn MycDeliveryOutboxStore>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -511,9 +514,17 @@ impl MycNip46Handler {
}
impl MycNip46Service {
- pub fn new(signer: MycSignerContext, transport: MycNostrTransport) -> Self {
+ pub fn new(
+ signer: MycSignerContext,
+ transport: MycNostrTransport,
+ delivery_outbox_store: Arc<dyn MycDeliveryOutboxStore>,
+ ) -> Self {
let handler = MycNip46Handler::new(signer, transport.relays().to_vec());
- Self { handler, transport }
+ Self {
+ handler,
+ transport,
+ delivery_outbox_store,
+ }
}
pub async fn run(&self) -> Result<(), MycError> {
@@ -594,20 +605,71 @@ impl MycNip46Service {
match response_event.sign_with_keys(self.handler.signer.signer_identity().keys()) {
Ok(event) => event,
Err(error) => {
- self.handler
- .signer
- .record_operation_audit(&MycOperationAuditRecord::new(
- MycOperationAuditKind::ListenerResponsePublish,
- MycOperationAuditOutcome::Rejected,
- connection_id.as_ref(),
- Some(request_id.as_str()),
- self.transport.relays().len(),
- 0,
- format!("failed to sign NIP-46 response event: {error}"),
- ));
+ self.record_listener_publish_local_rejection(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ format!("failed to sign NIP-46 response event: {error}"),
+ );
continue;
}
};
+
+ let mut workflow_id = None;
+ if let Some(connect_connection_id) = consume_connect_secret_for.as_ref() {
+ let manager = match self.handler.signer.load_signer_manager() {
+ Ok(manager) => manager,
+ Err(error) => {
+ self.record_listener_publish_local_rejection(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ error.to_string(),
+ );
+ continue;
+ }
+ };
+ match manager.begin_connect_secret_publish_finalization(connect_connection_id) {
+ Ok(workflow) => workflow_id = Some(workflow.workflow_id),
+ Err(error) => {
+ self.record_listener_publish_local_rejection(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ format!(
+ "failed to begin connect-secret publish finalization workflow: {error}"
+ ),
+ );
+ continue;
+ }
+ }
+ }
+
+ let outbox_record = match self.build_listener_outbox_record(
+ response_event.clone(),
+ connection_id.as_ref(),
+ request_id.as_str(),
+ workflow_id.as_ref(),
+ ) {
+ Ok(record) => record,
+ Err(error) => {
+ let error = self
+ .cancel_listener_publish_workflow_if_needed(workflow_id.as_ref(), error);
+ self.record_listener_publish_local_rejection(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ error.to_string(),
+ );
+ continue;
+ }
+ };
+ if let Err(error) = self.delivery_outbox_store.enqueue(&outbox_record) {
+ let error =
+ self.cancel_listener_publish_workflow_if_needed(workflow_id.as_ref(), error);
+ self.record_listener_publish_local_rejection(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ error.to_string(),
+ );
+ continue;
+ }
let publish_outcome = match self
.transport
.publish_event("NIP-46 response publish", &response_event)
@@ -615,74 +677,274 @@ impl MycNip46Service {
{
Ok(publish_outcome) => publish_outcome,
Err(error) => {
- let mut record = MycOperationAuditRecord::new(
- MycOperationAuditKind::ListenerResponsePublish,
- MycOperationAuditOutcome::Rejected,
+ let mut error = self.record_listener_outbox_failure(&outbox_record, error);
+ error = self
+ .cancel_listener_publish_workflow_if_needed(workflow_id.as_ref(), error);
+ self.record_listener_publish_error(
connection_id.as_ref(),
- Some(request_id.as_str()),
- error
- .publish_rejection_counts()
- .map(|(relay_count, _)| relay_count)
- .unwrap_or(self.transport.relays().len()),
- error
- .publish_rejection_counts()
- .map(|(_, acknowledged)| acknowledged)
- .unwrap_or_default(),
- error
- .publish_rejection_details()
- .map(ToOwned::to_owned)
- .unwrap_or_else(|| error.to_string()),
+ request_id.as_str(),
+ &error,
);
- if let (
- Some(delivery_policy),
- Some(required_acknowledged_relay_count),
- Some(attempt_count),
- ) = (
- error.publish_delivery_policy(),
- error.publish_required_acknowledged_relay_count(),
- error.publish_attempt_count(),
- ) {
- record = record.with_delivery_details(
- delivery_policy,
- required_acknowledged_relay_count,
- attempt_count,
+ continue;
+ }
+ };
+ if let Some(workflow_id) = workflow_id.as_ref() {
+ let manager = match self.handler.signer.load_signer_manager() {
+ Ok(manager) => manager,
+ Err(error) => {
+ self.record_listener_publish_post_publish_failure(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ format!(
+ "failed to load signer manager for publish finalization: {error}"
+ ),
);
+ continue;
}
- self.handler.signer.record_operation_audit(&record);
+ };
+ if let Err(error) = manager.mark_publish_workflow_published(workflow_id) {
+ self.record_listener_publish_post_publish_failure(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ format!("failed to mark signer publish workflow as published: {error}"),
+ );
continue;
}
- };
- self.handler.signer.record_operation_audit(
- &MycOperationAuditRecord::new(
- MycOperationAuditKind::ListenerResponsePublish,
- MycOperationAuditOutcome::Succeeded,
+ }
+ if let Err(error) = self.delivery_outbox_store.mark_published_pending_finalize(
+ &outbox_record.job_id,
+ publish_outcome.attempt_count,
+ ) {
+ self.record_listener_publish_post_publish_failure(
connection_id.as_ref(),
- Some(request_id.as_str()),
- publish_outcome.relay_count,
- publish_outcome.acknowledged_relay_count,
- publish_outcome.relay_outcome_summary.clone(),
- )
- .with_delivery_details(
- publish_outcome.delivery_policy,
- publish_outcome.required_acknowledged_relay_count,
- publish_outcome.attempt_count,
- ),
- );
- if let Some(connection_id) = consume_connect_secret_for {
- if let Err(error) = self
- .handler
- .signer
- .load_signer_manager()?
- .mark_connect_secret_consumed(&connection_id)
- {
- tracing::warn!(
- error = %error,
- connection_id = %connection_id,
- "failed to persist consumed NIP-46 connect secret"
+ request_id.as_str(),
+ &publish_outcome,
+ format!("failed to persist delivery outbox published state: {error}"),
+ );
+ continue;
+ }
+ if let Some(workflow_id) = workflow_id.as_ref() {
+ let manager = match self.handler.signer.load_signer_manager() {
+ Ok(manager) => manager,
+ Err(error) => {
+ self.record_listener_publish_post_publish_failure(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ format!("failed to load signer manager for publish workflow finalization: {error}"),
+ );
+ continue;
+ }
+ };
+ if let Err(error) = manager.finalize_publish_workflow(workflow_id) {
+ self.record_listener_publish_post_publish_failure(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ format!("failed to finalize signer publish workflow: {error}"),
);
+ continue;
}
}
+ if let Err(error) = self
+ .delivery_outbox_store
+ .mark_finalized(&outbox_record.job_id)
+ {
+ self.record_listener_publish_post_publish_failure(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ format!("failed to finalize delivery outbox job: {error}"),
+ );
+ continue;
+ }
+ self.record_listener_publish_success(
+ connection_id.as_ref(),
+ request_id.as_str(),
+ &publish_outcome,
+ );
+ }
+ }
+
+ fn build_listener_outbox_record(
+ &self,
+ response_event: RadrootsNostrEvent,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: &str,
+ workflow_id: Option<&RadrootsNostrSignerWorkflowId>,
+ ) -> Result<MycDeliveryOutboxRecord, MycError> {
+ let mut record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::ListenerResponsePublish,
+ response_event,
+ self.transport.relays().to_vec(),
+ )?
+ .with_request_id(request_id.to_owned());
+ if let Some(connection_id) = connection_id {
+ record = record.with_connection_id(connection_id);
+ }
+ if let Some(workflow_id) = workflow_id {
+ record = record.with_signer_publish_workflow_id(workflow_id);
+ }
+ Ok(record)
+ }
+
+ fn cancel_listener_publish_workflow_if_needed(
+ &self,
+ workflow_id: Option<&RadrootsNostrSignerWorkflowId>,
+ error: MycError,
+ ) -> MycError {
+ let Some(workflow_id) = workflow_id else {
+ return error;
+ };
+ match self
+ .handler
+ .signer
+ .load_signer_manager()
+ .and_then(|manager| {
+ manager
+ .cancel_publish_workflow(workflow_id)
+ .map(|_| ())
+ .map_err(Into::into)
+ }) {
+ Ok(()) => error,
+ Err(cancel_error) => MycError::InvalidOperation(format!(
+ "{error}; additionally failed to cancel listener publish workflow: {cancel_error}"
+ )),
+ }
+ }
+
+ fn record_listener_outbox_failure(
+ &self,
+ outbox_record: &MycDeliveryOutboxRecord,
+ error: MycError,
+ ) -> MycError {
+ let publish_attempt_count = error.publish_attempt_count().unwrap_or_default();
+ let failure_summary = error
+ .publish_rejection_details()
+ .map(ToOwned::to_owned)
+ .unwrap_or_else(|| error.to_string());
+ match self.delivery_outbox_store.mark_failed(
+ &outbox_record.job_id,
+ publish_attempt_count,
+ &failure_summary,
+ ) {
+ Ok(_) => error,
+ Err(outbox_error) => MycError::InvalidOperation(format!(
+ "{error}; additionally failed to persist listener publish failure to the outbox: {outbox_error}"
+ )),
+ }
+ }
+
+ fn record_listener_publish_local_rejection(
+ &self,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: &str,
+ summary: impl Into<String>,
+ ) {
+ self.handler
+ .signer
+ .record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Rejected,
+ connection_id,
+ Some(request_id),
+ self.transport.relays().len(),
+ 0,
+ summary.into(),
+ ));
+ }
+
+ fn record_listener_publish_error(
+ &self,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: &str,
+ error: &MycError,
+ ) {
+ let mut record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Rejected,
+ connection_id,
+ Some(request_id),
+ error
+ .publish_rejection_counts()
+ .map(|(relay_count, _)| relay_count)
+ .unwrap_or(self.transport.relays().len()),
+ error
+ .publish_rejection_counts()
+ .map(|(_, acknowledged)| acknowledged)
+ .unwrap_or_default(),
+ error
+ .publish_rejection_details()
+ .map(ToOwned::to_owned)
+ .unwrap_or_else(|| error.to_string()),
+ );
+ if let (
+ Some(delivery_policy),
+ Some(required_acknowledged_relay_count),
+ Some(attempt_count),
+ ) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
}
+ self.handler.signer.record_operation_audit(&record);
+ }
+
+ fn record_listener_publish_post_publish_failure(
+ &self,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: &str,
+ publish_outcome: &crate::transport::MycPublishOutcome,
+ summary: impl Into<String>,
+ ) {
+ self.handler.signer.record_operation_audit(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Rejected,
+ connection_id,
+ Some(request_id),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ summary.into(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ ),
+ );
+ }
+
+ fn record_listener_publish_success(
+ &self,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: &str,
+ publish_outcome: &crate::transport::MycPublishOutcome,
+ ) {
+ self.handler.signer.record_operation_audit(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Succeeded,
+ connection_id,
+ Some(request_id),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ ),
+ );
}
}
diff --git a/tests/logging_run.rs b/tests/logging_run.rs
@@ -12,11 +12,22 @@ fn write_test_identity(path: &Path, secret_key: &str) {
.expect("write identity");
}
-fn wait_for_non_empty_log(path: &Path, timeout: Duration) -> Result<String, String> {
+fn wait_for_log_contents(
+ path: &Path,
+ timeout: Duration,
+ expected_substrings: &[&str],
+) -> Result<String, String> {
let deadline = Instant::now() + timeout;
while Instant::now() < deadline {
match std::fs::read_to_string(path) {
- Ok(contents) if !contents.trim().is_empty() => return Ok(contents),
+ Ok(contents)
+ if !contents.trim().is_empty()
+ && expected_substrings
+ .iter()
+ .all(|substring| contents.contains(substring)) =>
+ {
+ return Ok(contents);
+ }
Ok(_) => {}
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
Err(error) => return Err(format!("failed to read log file: {error}")),
@@ -92,8 +103,11 @@ MYC_TRANSPORT_CONNECT_TIMEOUT_SECS=10\n",
.spawn()
.expect("spawn myc");
- let contents = match wait_for_non_empty_log(expected_log_path.as_path(), Duration::from_secs(5))
- {
+ let contents = match wait_for_log_contents(
+ expected_log_path.as_path(),
+ Duration::from_secs(5),
+ &["logging initialized", "myc runtime bootstrapped"],
+ ) {
Ok(contents) => contents,
Err(error) => {
kill_child(&mut child);
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -6,7 +6,8 @@ use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use myc::control;
use myc::{
- MycConfig, MycConnectionApproval, MycDiscoveryContext, MycDiscoveryLiveStatus,
+ MycConfig, MycConnectionApproval, MycDeliveryOutboxKind, MycDeliveryOutboxRecord,
+ MycDeliveryOutboxStatus, MycDiscoveryContext, MycDiscoveryLiveStatus,
MycDiscoveryRelayFetchStatus, MycDiscoveryRepairOutcome, MycOperationAuditKind,
MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, MycRuntimeAuditBackend,
MycSignerStateBackend, MycTransportDeliveryPolicy, diff_live_nip89, fetch_live_nip89,
@@ -811,6 +812,29 @@ async fn wait_for_operation_audit_count(
.map_err(Into::into)
}
+async fn wait_for_delivery_outbox_records<F>(
+ runtime: &MycRuntime,
+ predicate: F,
+) -> TestResult<Vec<MycDeliveryOutboxRecord>>
+where
+ F: Fn(&[MycDeliveryOutboxRecord]) -> bool,
+{
+ timeout(Duration::from_secs(5), async {
+ loop {
+ let records = runtime
+ .delivery_outbox_store()
+ .list_all()
+ .expect("delivery outbox");
+ if predicate(&records) {
+ return records;
+ }
+ sleep(Duration::from_millis(25)).await;
+ }
+ })
+ .await
+ .map_err(Into::into)
+}
+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn live_listener_rejects_denied_clients_without_registering_connection() -> TestResult<()> {
let relay = TestRelay::spawn().await?;
@@ -1336,6 +1360,30 @@ async fn live_listener_consumes_connect_secret_only_after_successful_publish() -
.relay_outcome_summary
.contains("blocked by test relay")
);
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
+ })
+ .await?;
+ assert_eq!(
+ outbox_records[0].kind,
+ MycDeliveryOutboxKind::ListenerResponsePublish
+ );
+ assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
+ assert_eq!(
+ outbox_records[0]
+ .connection_id
+ .as_ref()
+ .map(|value| value.as_str()),
+ Some(initial_connection.connection_id.as_str())
+ );
+ assert_eq!(outbox_records[0].request_id.as_deref(), Some("connect-1"));
+ assert!(outbox_records[0].signer_publish_workflow_id.is_some());
+ assert!(
+ runtime
+ .signer_manager()?
+ .list_publish_workflows()?
+ .is_empty()
+ );
let request_two = build_request_event(
&client_identity,
@@ -1363,6 +1411,25 @@ async fn live_listener_consumes_connect_secret_only_after_successful_publish() -
.next()
.expect("stored connection");
assert!(consumed_connection.connect_secret_is_consumed());
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
+ })
+ .await?;
+ assert_eq!(
+ outbox_records[1].kind,
+ MycDeliveryOutboxKind::ListenerResponsePublish
+ );
+ assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized);
+ assert_eq!(outbox_records[1].request_id.as_deref(), Some("connect-2"));
+ assert!(outbox_records[1].published_at_unix.is_some());
+ assert!(outbox_records[1].finalized_at_unix.is_some());
+ assert!(outbox_records[1].signer_publish_workflow_id.is_some());
+ assert!(
+ runtime
+ .signer_manager()?
+ .list_publish_workflows()?
+ .is_empty()
+ );
let request_three = build_request_event(
&client_identity,
@@ -1507,6 +1574,12 @@ async fn live_listener_works_with_sqlite_signer_state_and_runtime_audit() -> Tes
operation_audit[1].outcome,
MycOperationAuditOutcome::Succeeded
);
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
+ })
+ .await?;
+ assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
+ assert_eq!(outbox_records[1].status, MycDeliveryOutboxStatus::Finalized);
let restarted_runtime = MycRuntime::bootstrap(runtime.config().clone())?;
assert_eq!(
@@ -1520,6 +1593,29 @@ async fn live_listener_works_with_sqlite_signer_state_and_runtime_audit() -> Tes
restarted_runtime.operation_audit_store().list_all()?.len(),
2
);
+ let restarted_outbox = restarted_runtime.delivery_outbox_store().list_all()?;
+ assert_eq!(restarted_outbox.len(), 2);
+ assert_eq!(restarted_outbox[0].status, MycDeliveryOutboxStatus::Failed);
+ assert_eq!(
+ restarted_outbox[1].status,
+ MycDeliveryOutboxStatus::Finalized
+ );
+ assert_eq!(
+ restarted_outbox[0].request_id.as_deref(),
+ Some("sqlite-connect-1")
+ );
+ assert_eq!(
+ restarted_outbox[1].request_id.as_deref(),
+ Some("sqlite-connect-2")
+ );
+ assert!(restarted_outbox[0].signer_publish_workflow_id.is_some());
+ assert!(restarted_outbox[1].signer_publish_workflow_id.is_some());
+ assert!(
+ restarted_runtime
+ .signer_manager()?
+ .list_publish_workflows()?
+ .is_empty()
+ );
assert!(
restarted_runtime
.signer_manager()?