commit 99cfeaf1fbb129d26a866e75eef4d5733c8dad6b
parent b5034dd8ade932849ecb0a0b182b9529aa7f07ab
Author: triesap <tyson@radroots.org>
Date: Thu, 26 Mar 2026 20:55:21 +0000
delivery: route discovery publishes through outbox
Diffstat:
| M | src/discovery.rs | | | 261 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- |
| M | tests/nip46_e2e.rs | | | 70 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
2 files changed, 276 insertions(+), 55 deletions(-)
diff --git a/src/discovery.rs b/src/discovery.rs
@@ -20,7 +20,8 @@ use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperation
use crate::config::MycDiscoveryMetadataConfig;
use crate::custody::MycIdentityProvider;
use crate::error::MycError;
-use crate::transport::{MycNostrTransport, MycRelayPublishResult};
+use crate::outbox::{MycDeliveryOutboxKind, MycDeliveryOutboxRecord};
+use crate::transport::{MycNostrTransport, MycPublishOutcome, MycRelayPublishResult};
const NIP46_RPC_KIND: u32 = 24_133;
const DISCOVERY_BUNDLE_VERSION: u32 = 1;
@@ -517,6 +518,18 @@ async fn publish_nip89_event_to_relays(
) -> Result<MycPublishedNip89Output, MycError> {
let event = context.build_signed_handler_event()?;
let event_id = event.id.to_hex();
+ let outbox_record =
+ build_discovery_outbox_record(event.clone(), relays, event_id.as_str(), attempt_id)?;
+ if let Err(error) = runtime.delivery_outbox_store().enqueue(&outbox_record) {
+ record_discovery_publish_local_failure(
+ runtime,
+ relays.len(),
+ event_id.as_str(),
+ attempt_id,
+ error.to_string(),
+ );
+ return Err(error);
+ }
let publish_outcome = match MycNostrTransport::publish_event_once(
context.app_identity(),
relays,
@@ -528,65 +541,45 @@ async fn publish_nip89_event_to_relays(
{
Ok(outcome) => outcome,
Err(error) => {
- let mut record = MycOperationAuditRecord::new(
- MycOperationAuditKind::DiscoveryHandlerPublish,
- MycOperationAuditOutcome::Rejected,
- None,
- Some(event_id.as_str()),
- error
- .publish_rejection_counts()
- .map(|(relay_count, _)| relay_count)
- .unwrap_or(relays.len()),
- error
- .publish_rejection_counts()
- .map(|(_, acknowledged)| acknowledged)
- .unwrap_or_default(),
- error
- .publish_rejection_details()
- .map(ToOwned::to_owned)
- .unwrap_or_else(|| error.to_string()),
+ let error = mark_discovery_outbox_publish_failed(runtime, &outbox_record, error);
+ record_discovery_publish_failure(
+ runtime,
+ relays.len(),
+ event_id.as_str(),
+ attempt_id,
+ &error,
);
- if let (
- Some(delivery_policy),
- Some(required_acknowledged_relay_count),
- Some(attempt_count),
- ) = (
- error.publish_delivery_policy(),
- error.publish_required_acknowledged_relay_count(),
- error.publish_attempt_count(),
- ) {
- record = record.with_delivery_details(
- delivery_policy,
- required_acknowledged_relay_count,
- attempt_count,
- );
- }
- if let Some(attempt_id) = attempt_id {
- record = record.with_attempt_id(attempt_id);
- }
- runtime.record_operation_audit(&record);
return Err(error);
}
};
-
- let mut record = MycOperationAuditRecord::new(
- MycOperationAuditKind::DiscoveryHandlerPublish,
- MycOperationAuditOutcome::Succeeded,
- None,
- Some(event_id.as_str()),
- publish_outcome.relay_count,
- publish_outcome.acknowledged_relay_count,
- publish_outcome.relay_outcome_summary.clone(),
- )
- .with_delivery_details(
- publish_outcome.delivery_policy,
- publish_outcome.required_acknowledged_relay_count,
- publish_outcome.attempt_count,
- );
- if let Some(attempt_id) = attempt_id {
- record = record.with_attempt_id(attempt_id);
+ if let Err(error) = runtime
+ .delivery_outbox_store()
+ .mark_published_pending_finalize(&outbox_record.job_id, publish_outcome.attempt_count)
+ {
+ record_discovery_post_publish_failure(
+ runtime,
+ event_id.as_str(),
+ attempt_id,
+ &publish_outcome,
+ format!("failed to persist discovery outbox published state: {error}"),
+ );
+ return Err(error);
}
- runtime.record_operation_audit(&record);
+ if let Err(error) = runtime
+ .delivery_outbox_store()
+ .mark_finalized(&outbox_record.job_id)
+ {
+ record_discovery_post_publish_failure(
+ runtime,
+ event_id.as_str(),
+ attempt_id,
+ &publish_outcome,
+ format!("failed to finalize discovery outbox job: {error}"),
+ );
+ return Err(error);
+ }
+
+ record_discovery_publish_success(runtime, event_id.as_str(), attempt_id, &publish_outcome);
Ok(MycPublishedNip89Output {
author_public_key_hex: context.app_identity().public_key_hex(),
@@ -893,6 +886,164 @@ pub async fn refresh_nip89(
}
}
+fn build_discovery_outbox_record(
+ event: RadrootsNostrEvent,
+ relays: &[RadrootsNostrRelayUrl],
+ event_id: &str,
+ attempt_id: Option<&str>,
+) -> Result<MycDeliveryOutboxRecord, MycError> {
+ let mut record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish,
+ event,
+ relays.to_vec(),
+ )?
+ .with_request_id(event_id.to_owned());
+ if let Some(attempt_id) = attempt_id {
+ record = record.with_attempt_id(attempt_id.to_owned());
+ }
+ Ok(record)
+}
+
+fn mark_discovery_outbox_publish_failed(
+ runtime: &MycRuntime,
+ outbox_record: &MycDeliveryOutboxRecord,
+ error: MycError,
+) -> MycError {
+ let publish_attempt_count = error.publish_attempt_count().unwrap_or_default();
+ let summary = publish_failure_summary(&error);
+ match runtime.delivery_outbox_store().mark_failed(
+ &outbox_record.job_id,
+ publish_attempt_count,
+ &summary,
+ ) {
+ Ok(_) => error,
+ Err(outbox_error) => MycError::InvalidOperation(format!(
+ "{error}; additionally failed to persist discovery publish failure to the outbox: {outbox_error}"
+ )),
+ }
+}
+
+fn record_discovery_publish_local_failure(
+ runtime: &MycRuntime,
+ relay_count: usize,
+ event_id: &str,
+ attempt_id: Option<&str>,
+ summary: impl Into<String>,
+) {
+ let mut record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerPublish,
+ MycOperationAuditOutcome::Rejected,
+ None,
+ Some(event_id),
+ relay_count,
+ 0,
+ summary.into(),
+ );
+ if let Some(attempt_id) = attempt_id {
+ record = record.with_attempt_id(attempt_id);
+ }
+ runtime.record_operation_audit(&record);
+}
+
+fn record_discovery_publish_failure(
+ runtime: &MycRuntime,
+ relay_count: usize,
+ event_id: &str,
+ attempt_id: Option<&str>,
+ error: &MycError,
+) {
+ let mut record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerPublish,
+ MycOperationAuditOutcome::Rejected,
+ None,
+ Some(event_id),
+ error
+ .publish_rejection_counts()
+ .map(|(publish_relay_count, _)| publish_relay_count)
+ .unwrap_or(relay_count),
+ error
+ .publish_rejection_counts()
+ .map(|(_, acknowledged)| acknowledged)
+ .unwrap_or_default(),
+ publish_failure_summary(error),
+ );
+ if let (Some(delivery_policy), Some(required_acknowledged_relay_count), Some(attempt_count)) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
+ }
+ if let Some(attempt_id) = attempt_id {
+ record = record.with_attempt_id(attempt_id);
+ }
+ runtime.record_operation_audit(&record);
+}
+
+fn record_discovery_post_publish_failure(
+ runtime: &MycRuntime,
+ event_id: &str,
+ attempt_id: Option<&str>,
+ publish_outcome: &MycPublishOutcome,
+ summary: impl Into<String>,
+) {
+ let mut record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerPublish,
+ MycOperationAuditOutcome::Rejected,
+ None,
+ Some(event_id),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ summary.into(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ );
+ if let Some(attempt_id) = attempt_id {
+ record = record.with_attempt_id(attempt_id);
+ }
+ runtime.record_operation_audit(&record);
+}
+
+fn record_discovery_publish_success(
+ runtime: &MycRuntime,
+ event_id: &str,
+ attempt_id: Option<&str>,
+ publish_outcome: &MycPublishOutcome,
+) {
+ let mut record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerPublish,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ Some(event_id),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ );
+ if let Some(attempt_id) = attempt_id {
+ record = record.with_attempt_id(attempt_id);
+ }
+ runtime.record_operation_audit(&record);
+}
+
+fn publish_failure_summary(error: &MycError) -> String {
+ error
+ .publish_rejection_details()
+ .map(ToOwned::to_owned)
+ .unwrap_or_else(|| error.to_string())
+}
+
fn build_refresh_plan(
context: &MycDiscoveryContext,
relay_states: &[MycDiscoveryRelayState],
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -2477,6 +2477,22 @@ async fn explicit_nip89_publish_uses_app_identity_and_records_audit() -> TestRes
.relay_outcome_summary
.contains("1/1 relays acknowledged publish")
);
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized
+ })
+ .await?;
+ assert_eq!(
+ outbox_records[0].kind,
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish
+ );
+ assert_eq!(
+ outbox_records[0].request_id.as_deref(),
+ Some(published_event_id.as_str())
+ );
+ assert!(outbox_records[0].attempt_id.is_none());
+ assert!(outbox_records[0].signer_publish_workflow_id.is_none());
+ assert!(outbox_records[0].published_at_unix.is_some());
+ assert!(outbox_records[0].finalized_at_unix.is_some());
Ok(())
}
@@ -2526,6 +2542,20 @@ async fn explicit_nip89_publish_retries_cleanly_after_rejection() -> TestResult<
.relay_outcome_summary
.contains("blocked by test relay")
);
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Failed
+ })
+ .await?;
+ assert_eq!(
+ outbox_records[0].kind,
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish
+ );
+ assert_eq!(
+ outbox_records[0].request_id.as_deref(),
+ first_audit[0].request_id.as_deref()
+ );
+ assert!(outbox_records[0].attempt_id.is_none());
+ assert!(outbox_records[0].signer_publish_workflow_id.is_none());
let published = publish_nip89_event(&runtime).await?;
let published_events = relay
@@ -2552,6 +2582,23 @@ async fn explicit_nip89_publish_retries_cleanly_after_rejection() -> TestResult<
.relay_outcome_summary
.contains("1/1 relays acknowledged publish")
);
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 2 && records[1].status == MycDeliveryOutboxStatus::Finalized
+ })
+ .await?;
+ assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Failed);
+ assert_eq!(
+ outbox_records[1].kind,
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish
+ );
+ assert_eq!(
+ outbox_records[1].request_id.as_deref(),
+ Some(published.event.id.to_hex().as_str())
+ );
+ assert!(outbox_records[1].attempt_id.is_none());
+ assert!(outbox_records[1].signer_publish_workflow_id.is_none());
+ assert!(outbox_records[1].published_at_unix.is_some());
+ assert!(outbox_records[1].finalized_at_unix.is_some());
Ok(())
}
@@ -2791,6 +2838,29 @@ async fn refresh_nip89_publishes_when_live_handler_is_missing() -> TestResult<()
MycOperationAuditKind::DiscoveryHandlerRepair
);
assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Succeeded);
+ let published = refreshed
+ .published
+ .as_ref()
+ .expect("published discovery output");
+ let outbox_records = wait_for_delivery_outbox_records(&runtime, |records| {
+ records.len() >= 1 && records[0].status == MycDeliveryOutboxStatus::Finalized
+ })
+ .await?;
+ assert_eq!(
+ outbox_records[0].kind,
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish
+ );
+ assert_eq!(
+ outbox_records[0].request_id.as_deref(),
+ Some(published.event.id.to_hex().as_str())
+ );
+ assert_eq!(
+ outbox_records[0].attempt_id.as_deref(),
+ Some(refreshed.attempt_id.as_str())
+ );
+ assert!(outbox_records[0].signer_publish_workflow_id.is_none());
+ assert!(outbox_records[0].published_at_unix.is_some());
+ assert!(outbox_records[0].finalized_at_unix.is_some());
Ok(())
}