commit 75cc248f3daba62bb5fa44451e87fe841fb637fa
parent 086ad6d47bebd68335cb2e39381658644b930aae
Author: triesap <tyson@radroots.org>
Date: Wed, 25 Mar 2026 19:39:53 +0000
transport: add delivery policy and retry semantics
- add typed delivery policy quorum and bounded retry settings to runtime config and transport state
- apply policy-aware publish confirmation and delivery audit details to listener responses connect accept auth replay and discovery publication
- add relay-backed proof for any quorum and all-delivery behavior while preserving connect secret and auth replay safety
- document the env and operator delivery contract and validate with cargo metadata --format-version 1 --no-deps cargo check --locked cargo test --locked and cargo fmt --all --check
Diffstat:
12 files changed, 1191 insertions(+), 110 deletions(-)
diff --git a/.env.example b/.env.example
@@ -30,3 +30,9 @@ MYC_POLICY_CONNECTION_APPROVAL=explicit_user
MYC_TRANSPORT_ENABLED=true
MYC_TRANSPORT_CONNECT_TIMEOUT_SECS=10
MYC_TRANSPORT_RELAYS=wss://relay.radroots.org
+MYC_TRANSPORT_DELIVERY_POLICY=any
+# set MYC_TRANSPORT_DELIVERY_QUORUM when MYC_TRANSPORT_DELIVERY_POLICY=quorum
+# MYC_TRANSPORT_DELIVERY_QUORUM=2
+MYC_TRANSPORT_PUBLISH_MAX_ATTEMPTS=1
+MYC_TRANSPORT_PUBLISH_INITIAL_BACKOFF_MILLIS=250
+MYC_TRANSPORT_PUBLISH_MAX_BACKOFF_MILLIS=2000
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -242,6 +242,9 @@ impl MycSignerContext {
connection_id = record.connection_id.as_deref().unwrap_or(""),
request_id = record.request_id.as_deref().unwrap_or(""),
attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ delivery_policy = ?record.delivery_policy,
+ required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(),
+ publish_attempt_count = record.publish_attempt_count.unwrap_or_default(),
relay_count = record.relay_count,
acknowledged_relay_count = record.acknowledged_relay_count,
relay_outcome_summary = %record.relay_outcome_summary,
@@ -307,6 +310,9 @@ fn emit_operation_audit_trace(record: &MycOperationAuditRecord) {
connection_id = record.connection_id.as_deref().unwrap_or(""),
request_id = record.request_id.as_deref().unwrap_or(""),
attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ delivery_policy = ?record.delivery_policy,
+ required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(),
+ publish_attempt_count = record.publish_attempt_count.unwrap_or_default(),
relay_count = record.relay_count,
acknowledged_relay_count = record.acknowledged_relay_count,
relay_outcome_summary = %record.relay_outcome_summary,
@@ -323,6 +329,9 @@ fn emit_operation_audit_trace(record: &MycOperationAuditRecord) {
connection_id = record.connection_id.as_deref().unwrap_or(""),
request_id = record.request_id.as_deref().unwrap_or(""),
attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ delivery_policy = ?record.delivery_policy,
+ required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(),
+ publish_attempt_count = record.publish_attempt_count.unwrap_or_default(),
relay_count = record.relay_count,
acknowledged_relay_count = record.acknowledged_relay_count,
relay_outcome_summary = %record.relay_outcome_summary,
diff --git a/src/audit.rs b/src/audit.rs
@@ -8,6 +8,7 @@ use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
use serde::{Deserialize, Serialize};
use crate::config::MycAuditConfig;
+use crate::config::MycTransportDeliveryPolicy;
use crate::error::MycError;
const MYC_OPERATION_AUDIT_FILE_NAME: &str = "operations.jsonl";
@@ -66,6 +67,12 @@ pub struct MycOperationAuditRecord {
pub blocked_relays: Vec<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub blocked_reason: Option<String>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub delivery_policy: Option<MycTransportDeliveryPolicy>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub required_acknowledged_relay_count: Option<usize>,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub publish_attempt_count: Option<usize>,
pub relay_count: usize,
pub acknowledged_relay_count: usize,
pub relay_outcome_summary: String,
@@ -103,6 +110,9 @@ impl MycOperationAuditRecord {
planned_repair_relays: Vec::new(),
blocked_relays: Vec::new(),
blocked_reason: None,
+ delivery_policy: None,
+ required_acknowledged_relay_count: None,
+ publish_attempt_count: None,
relay_count,
acknowledged_relay_count,
relay_outcome_summary: relay_outcome_summary.into(),
@@ -133,6 +143,18 @@ impl MycOperationAuditRecord {
self.blocked_relays = blocked_relays;
self
}
+
+ pub fn with_delivery_details(
+ mut self,
+ delivery_policy: MycTransportDeliveryPolicy,
+ required_acknowledged_relay_count: usize,
+ publish_attempt_count: usize,
+ ) -> Self {
+ self.delivery_policy = Some(delivery_policy);
+ self.required_acknowledged_relay_count = Some(required_acknowledged_relay_count);
+ self.publish_attempt_count = Some(publish_attempt_count);
+ self
+ }
}
impl MycOperationAuditStore {
diff --git a/src/cli.rs b/src/cli.rs
@@ -11,7 +11,7 @@ use serde::Serialize;
use crate::app::MycRuntime;
use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
-use crate::config::{DEFAULT_ENV_PATH, MycConfig};
+use crate::config::{DEFAULT_ENV_PATH, MycConfig, MycTransportDeliveryPolicy};
use crate::control::{accept_client_uri, authorize_auth_challenge, parse_permission_values};
use crate::discovery::{
MycDiscoveryContext, MycDiscoveryRepairSummary, diff_live_nip89, fetch_live_nip89,
@@ -239,6 +239,12 @@ pub struct MycDiscoveryRepairAttemptSummaryOutput {
pub aggregate_publish_acknowledged_relay_count: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregate_publish_relay_outcome_summary: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub aggregate_publish_delivery_policy: Option<MycTransportDeliveryPolicy>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub aggregate_publish_required_acknowledged_relay_count: Option<usize>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub aggregate_publish_attempt_count: Option<usize>,
pub repair_summary: MycDiscoveryRepairSummary,
pub planned_repair_relays: Vec<String>,
pub blocked_relays: Vec<String>,
@@ -774,6 +780,12 @@ impl MycDiscoveryRepairAttemptSummaryOutput {
.map(|record| record.acknowledged_relay_count),
aggregate_publish_relay_outcome_summary: publish_record
.map(|record| record.relay_outcome_summary.clone()),
+ aggregate_publish_delivery_policy: publish_record
+ .and_then(|record| record.delivery_policy),
+ aggregate_publish_required_acknowledged_relay_count: publish_record
+ .and_then(|record| record.required_acknowledged_relay_count),
+ aggregate_publish_attempt_count: publish_record
+ .and_then(|record| record.publish_attempt_count),
repair_summary,
planned_repair_relays,
blocked_relays,
diff --git a/src/config.rs b/src/config.rs
@@ -84,6 +84,11 @@ pub struct MycTransportConfig {
pub enabled: bool,
pub connect_timeout_secs: u64,
pub relays: Vec<String>,
+ pub delivery_policy: MycTransportDeliveryPolicy,
+ pub delivery_quorum: Option<usize>,
+ pub publish_max_attempts: usize,
+ pub publish_initial_backoff_millis: u64,
+ pub publish_max_backoff_millis: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -93,6 +98,14 @@ pub enum MycConnectionApproval {
ExplicitUser,
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[serde(rename_all = "snake_case")]
+pub enum MycTransportDeliveryPolicy {
+ Any,
+ Quorum,
+ All,
+}
+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default, deny_unknown_fields)]
pub struct MycPolicyConfig {
@@ -147,6 +160,11 @@ impl Default for MycTransportConfig {
enabled: false,
connect_timeout_secs: 10,
relays: Vec::new(),
+ delivery_policy: MycTransportDeliveryPolicy::Any,
+ delivery_quorum: None,
+ publish_max_attempts: 1,
+ publish_initial_backoff_millis: 250,
+ publish_max_backoff_millis: 2_000,
}
}
}
@@ -206,6 +224,16 @@ impl MycConnectionApproval {
}
}
+impl MycTransportDeliveryPolicy {
+ pub fn as_str(self) -> &'static str {
+ match self {
+ Self::Any => "any",
+ Self::Quorum => "quorum",
+ Self::All => "all",
+ }
+ }
+}
+
impl MycConfig {
pub fn load_from_default_env_path() -> Result<Self, MycError> {
Self::load_from_env_path(DEFAULT_ENV_PATH)
@@ -290,6 +318,56 @@ impl MycConfig {
));
}
+ if self.transport.publish_max_attempts == 0 {
+ return Err(MycError::InvalidConfig(
+ "transport.publish_max_attempts must be greater than zero".to_owned(),
+ ));
+ }
+
+ if self.transport.publish_initial_backoff_millis == 0 {
+ return Err(MycError::InvalidConfig(
+ "transport.publish_initial_backoff_millis must be greater than zero".to_owned(),
+ ));
+ }
+
+ if self.transport.publish_max_backoff_millis == 0 {
+ return Err(MycError::InvalidConfig(
+ "transport.publish_max_backoff_millis must be greater than zero".to_owned(),
+ ));
+ }
+
+ if self.transport.publish_initial_backoff_millis > self.transport.publish_max_backoff_millis
+ {
+ return Err(MycError::InvalidConfig(
+ "transport.publish_max_backoff_millis must be greater than or equal to transport.publish_initial_backoff_millis"
+ .to_owned(),
+ ));
+ }
+
+ match self.transport.delivery_policy {
+ MycTransportDeliveryPolicy::Quorum => {
+ let Some(delivery_quorum) = self.transport.delivery_quorum else {
+ return Err(MycError::InvalidConfig(
+ "transport.delivery_quorum must be set when transport.delivery_policy is `quorum`"
+ .to_owned(),
+ ));
+ };
+ if delivery_quorum == 0 {
+ return Err(MycError::InvalidConfig(
+ "transport.delivery_quorum must be greater than zero".to_owned(),
+ ));
+ }
+ }
+ MycTransportDeliveryPolicy::Any | MycTransportDeliveryPolicy::All => {
+ if self.transport.delivery_quorum.is_some() {
+ return Err(MycError::InvalidConfig(
+ "transport.delivery_quorum is only valid when transport.delivery_policy is `quorum`"
+ .to_owned(),
+ ));
+ }
+ }
+ }
+
let parsed_relays = self.transport.parse_relays()?;
if self.transport.enabled && parsed_relays.is_empty() {
return Err(MycError::InvalidConfig(
@@ -462,6 +540,25 @@ fn apply_env_entry(
"MYC_TRANSPORT_RELAYS" => {
config.transport.relays = parse_string_list_env(value);
}
+ "MYC_TRANSPORT_DELIVERY_POLICY" => {
+ config.transport.delivery_policy =
+ parse_delivery_policy_env(key, value, path, line_number)?;
+ }
+ "MYC_TRANSPORT_DELIVERY_QUORUM" => {
+ config.transport.delivery_quorum =
+ Some(parse_usize_env(key, value, path, line_number)?);
+ }
+ "MYC_TRANSPORT_PUBLISH_MAX_ATTEMPTS" => {
+ config.transport.publish_max_attempts = parse_usize_env(key, value, path, line_number)?;
+ }
+ "MYC_TRANSPORT_PUBLISH_INITIAL_BACKOFF_MILLIS" => {
+ config.transport.publish_initial_backoff_millis =
+ parse_u64_env(key, value, path, line_number)?;
+ }
+ "MYC_TRANSPORT_PUBLISH_MAX_BACKOFF_MILLIS" => {
+ config.transport.publish_max_backoff_millis =
+ parse_u64_env(key, value, path, line_number)?;
+ }
_ => {
return Err(config_parse_error(
path,
@@ -531,6 +628,24 @@ fn parse_connection_approval_env(
}
}
+fn parse_delivery_policy_env(
+ key: &str,
+ value: &str,
+ path: &Path,
+ line_number: usize,
+) -> Result<MycTransportDeliveryPolicy, MycError> {
+ match value {
+ "any" => Ok(MycTransportDeliveryPolicy::Any),
+ "quorum" => Ok(MycTransportDeliveryPolicy::Quorum),
+ "all" => Ok(MycTransportDeliveryPolicy::All),
+ _ => Err(config_parse_error(
+ path,
+ line_number,
+ format!("{key} must be `any`, `quorum`, or `all`"),
+ )),
+ }
+}
+
fn parse_optional_string_env(value: &str) -> Option<String> {
let value = value.trim();
if value.is_empty() {
@@ -771,6 +886,14 @@ mod tests {
assert!(!config.transport.enabled);
assert_eq!(config.transport.connect_timeout_secs, 10);
assert!(config.transport.relays.is_empty());
+ assert_eq!(
+ config.transport.delivery_policy,
+ MycTransportDeliveryPolicy::Any
+ );
+ assert_eq!(config.transport.delivery_quorum, None);
+ assert_eq!(config.transport.publish_max_attempts, 1);
+ assert_eq!(config.transport.publish_initial_backoff_millis, 250);
+ assert_eq!(config.transport.publish_max_backoff_millis, 2_000);
}
#[test]
@@ -804,6 +927,11 @@ MYC_POLICY_CONNECTION_APPROVAL=not_required
MYC_TRANSPORT_ENABLED=true
MYC_TRANSPORT_CONNECT_TIMEOUT_SECS=15
MYC_TRANSPORT_RELAYS=wss://relay.example.com,wss://relay2.example.com
+MYC_TRANSPORT_DELIVERY_POLICY=quorum
+MYC_TRANSPORT_DELIVERY_QUORUM=2
+MYC_TRANSPORT_PUBLISH_MAX_ATTEMPTS=4
+MYC_TRANSPORT_PUBLISH_INITIAL_BACKOFF_MILLIS=100
+MYC_TRANSPORT_PUBLISH_MAX_BACKOFF_MILLIS=800
"#,
)
.expect("config");
@@ -868,6 +996,14 @@ MYC_TRANSPORT_RELAYS=wss://relay.example.com,wss://relay2.example.com
"wss://relay2.example.com".to_owned()
]
);
+ assert_eq!(
+ config.transport.delivery_policy,
+ MycTransportDeliveryPolicy::Quorum
+ );
+ assert_eq!(config.transport.delivery_quorum, Some(2));
+ assert_eq!(config.transport.publish_max_attempts, 4);
+ assert_eq!(config.transport.publish_initial_backoff_millis, 100);
+ assert_eq!(config.transport.publish_max_backoff_millis, 800);
}
#[test]
@@ -954,6 +1090,56 @@ MYC_UNKNOWN=nope
}
#[test]
+ fn validate_rejects_invalid_delivery_policy_settings() {
+ let mut config = MycConfig::default();
+ config.transport.enabled = true;
+ config.transport.relays = vec!["wss://relay.example.com".to_owned()];
+ config.transport.delivery_policy = MycTransportDeliveryPolicy::Quorum;
+
+ let err = config
+ .validate()
+ .expect_err("missing quorum should be rejected");
+ assert!(err.to_string().contains("transport.delivery_quorum"));
+
+ config.transport.delivery_quorum = Some(0);
+ let err = config
+ .validate()
+ .expect_err("zero quorum should be rejected");
+ assert!(err.to_string().contains("greater than zero"));
+
+ config.transport.delivery_policy = MycTransportDeliveryPolicy::Any;
+ config.transport.delivery_quorum = Some(1);
+ let err = config
+ .validate()
+ .expect_err("quorum on non-quorum policy should be rejected");
+ assert!(err.to_string().contains("only valid"));
+ }
+
+ #[test]
+ fn validate_rejects_invalid_publish_retry_settings() {
+ let mut config = MycConfig::default();
+ config.transport.publish_max_attempts = 0;
+ let err = config.validate().expect_err("zero attempts");
+ assert!(err.to_string().contains("publish_max_attempts"));
+
+ config.transport.publish_max_attempts = 1;
+ config.transport.publish_initial_backoff_millis = 0;
+ let err = config.validate().expect_err("zero initial backoff");
+ assert!(err.to_string().contains("publish_initial_backoff_millis"));
+
+ config.transport.publish_initial_backoff_millis = 10;
+ config.transport.publish_max_backoff_millis = 0;
+ let err = config.validate().expect_err("zero max backoff");
+ assert!(err.to_string().contains("publish_max_backoff_millis"));
+
+ config.transport.publish_max_backoff_millis = 5;
+ let err = config
+ .validate()
+ .expect_err("max backoff less than initial");
+ assert!(err.to_string().contains("greater than or equal"));
+ }
+
+ #[test]
fn example_env_parses_and_validates() {
let example =
fs::read_to_string(PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(".env.example"))
@@ -970,6 +1156,14 @@ MYC_UNKNOWN=nope
Some(PathBuf::from("/var/log/radroots/services/myc"))
);
assert_eq!(
+ config.transport.delivery_policy,
+ MycTransportDeliveryPolicy::Any
+ );
+ assert_eq!(config.transport.delivery_quorum, None);
+ assert_eq!(config.transport.publish_max_attempts, 1);
+ assert_eq!(config.transport.publish_initial_backoff_millis, 250);
+ assert_eq!(config.transport.publish_max_backoff_millis, 2_000);
+ assert_eq!(
config.discovery.nip05_output_path,
Some(PathBuf::from("/var/lib/myc/public/.well-known/nostr.json"))
);
diff --git a/src/control.rs b/src/control.rs
@@ -117,24 +117,20 @@ pub async fn accept_client_uri(
let publish_outcome = match MycNostrTransport::publish_once(
runtime.signer_identity(),
&response_relays,
- transport.connect_timeout_secs(),
+ &runtime.config().transport,
+ "connect accept response publish",
event,
)
.await
{
Ok(outcome) => outcome,
Err(error) => {
- runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ runtime.record_operation_audit(&record_publish_failure(
MycOperationAuditKind::ConnectAcceptPublish,
- MycOperationAuditOutcome::Rejected,
Some(&connection.connection_id),
Some(response_request_id.as_str()),
response_relays.len(),
- error
- .publish_rejection_counts()
- .map(|(_, acknowledged)| acknowledged)
- .unwrap_or_default(),
- publish_failure_summary(&error),
+ &error,
));
return Err(error);
}
@@ -252,24 +248,20 @@ async fn replay_authorized_request(
let publish_outcome = match MycNostrTransport::publish_once(
runtime.signer_identity(),
&publish_relays,
- transport.connect_timeout_secs(),
+ &runtime.config().transport,
+ "authorized auth replay publish",
event,
)
.await
{
Ok(publish_outcome) => publish_outcome,
Err(error) => {
- runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ runtime.record_operation_audit(&record_publish_failure(
MycOperationAuditKind::AuthReplayPublish,
- MycOperationAuditOutcome::Rejected,
Some(&outcome.connection.connection_id),
Some(pending_request.request_message.id.as_str()),
publish_relays.len(),
- error
- .publish_rejection_counts()
- .map(|(_, acknowledged)| acknowledged)
- .unwrap_or_default(),
- publish_failure_summary(&error),
+ &error,
));
return Err(restore_pending_auth_challenge_on_error(
runtime,
@@ -309,7 +301,7 @@ fn restore_pending_auth_challenge_on_error(
.map_err(Into::into)
}) {
Ok(_) => {
- runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ let mut record = MycOperationAuditRecord::new(
MycOperationAuditKind::AuthReplayRestore,
MycOperationAuditOutcome::Restored,
Some(connection_id),
@@ -323,7 +315,23 @@ fn restore_pending_auth_challenge_on_error(
.map(|(_, acknowledged)| acknowledged)
.unwrap_or_default(),
format!("restored pending auth challenge after replay failure: {summary}"),
- ));
+ );
+ if let (
+ Some(delivery_policy),
+ Some(required_acknowledged_relay_count),
+ Some(attempt_count),
+ ) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
+ }
+ runtime.record_operation_audit(&record);
error
}
Err(restore_error) => MycError::InvalidOperation(format!(
@@ -340,15 +348,22 @@ fn record_publish_audit(
request_id: Option<&str>,
publish_outcome: &MycPublishOutcome,
) {
- runtime.record_operation_audit(&MycOperationAuditRecord::new(
- operation,
- outcome,
- connection_id,
- request_id,
- publish_outcome.relay_count,
- publish_outcome.acknowledged_relay_count,
- publish_outcome.relay_outcome_summary.clone(),
- ));
+ runtime.record_operation_audit(
+ &MycOperationAuditRecord::new(
+ operation,
+ outcome,
+ connection_id,
+ request_id,
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ ),
+ );
}
fn publish_failure_summary(error: &MycError) -> String {
@@ -358,6 +373,39 @@ fn publish_failure_summary(error: &MycError) -> String {
.unwrap_or_else(|| error.to_string())
}
+fn record_publish_failure(
+ operation: MycOperationAuditKind,
+ connection_id: Option<&RadrootsNostrSignerConnectionId>,
+ request_id: Option<&str>,
+ relay_count: usize,
+ error: &MycError,
+) -> MycOperationAuditRecord {
+ let mut record = MycOperationAuditRecord::new(
+ operation,
+ MycOperationAuditOutcome::Rejected,
+ connection_id,
+ request_id,
+ relay_count,
+ error
+ .publish_rejection_counts()
+ .map(|(_, acknowledged)| acknowledged)
+ .unwrap_or_default(),
+ publish_failure_summary(error),
+ );
+ if let (Some(delivery_policy), Some(required_acknowledged_relay_count), Some(attempt_count)) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
+ }
+ record
+}
+
fn merge_relays(
primary: &[nostr::RelayUrl],
secondary: &[nostr::RelayUrl],
diff --git a/src/discovery.rs b/src/discovery.rs
@@ -518,7 +518,8 @@ async fn publish_nip89_event_to_relays(
let publish_outcome = match MycNostrTransport::publish_event_once(
context.app_identity(),
relays,
- context.connect_timeout_secs(),
+ &runtime.config().transport,
+ "discovery handler publish",
&event,
)
.await
@@ -543,6 +544,21 @@ async fn publish_nip89_event_to_relays(
.map(ToOwned::to_owned)
.unwrap_or_else(|| error.to_string()),
);
+ if let (
+ Some(delivery_policy),
+ Some(required_acknowledged_relay_count),
+ Some(attempt_count),
+ ) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
+ }
if let Some(attempt_id) = attempt_id {
record = record.with_attempt_id(attempt_id);
}
@@ -559,6 +575,11 @@ async fn publish_nip89_event_to_relays(
publish_outcome.relay_count,
publish_outcome.acknowledged_relay_count,
publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
);
if let Some(attempt_id) = attempt_id {
record = record.with_attempt_id(attempt_id);
diff --git a/src/error.rs b/src/error.rs
@@ -6,6 +6,8 @@ use radroots_nostr_connect::prelude::RadrootsNostrConnectError;
use radroots_nostr_signer::prelude::RadrootsNostrSignerError;
use thiserror::Error;
+use crate::config::MycTransportDeliveryPolicy;
+
#[derive(Debug, Error)]
pub enum MycError {
#[error("config io error at {path}: {source}")]
@@ -99,11 +101,17 @@ pub enum MycError {
Nip46Encrypt(String),
#[error("NIP-46 listener notifications closed")]
Nip46ListenerClosed,
- #[error("Nostr publish failed for {operation}: {details}")]
+ #[error(
+ "Nostr publish failed for {operation} after {attempt_count} attempt(s) with delivery policy {} requiring {required_acknowledged_relay_count} acknowledgements: {details}",
+ delivery_policy.as_str()
+ )]
PublishRejected {
operation: String,
relay_count: usize,
acknowledged_relay_count: usize,
+ required_acknowledged_relay_count: usize,
+ delivery_policy: MycTransportDeliveryPolicy,
+ attempt_count: usize,
details: String,
rejected_relays: Vec<String>,
},
@@ -165,10 +173,43 @@ impl MycError {
_ => None,
}
}
+
+ pub fn publish_delivery_policy(&self) -> Option<MycTransportDeliveryPolicy> {
+ match self {
+ Self::PublishRejected {
+ delivery_policy, ..
+ } => Some(*delivery_policy),
+ Self::DiscoveryRefreshFailed { source, .. } => source.publish_delivery_policy(),
+ _ => None,
+ }
+ }
+
+ pub fn publish_attempt_count(&self) -> Option<usize> {
+ match self {
+ Self::PublishRejected { attempt_count, .. } => Some(*attempt_count),
+ Self::DiscoveryRefreshFailed { source, .. } => source.publish_attempt_count(),
+ _ => None,
+ }
+ }
+
+ pub fn publish_required_acknowledged_relay_count(&self) -> Option<usize> {
+ match self {
+ Self::PublishRejected {
+ required_acknowledged_relay_count,
+ ..
+ } => Some(*required_acknowledged_relay_count),
+ Self::DiscoveryRefreshFailed { source, .. } => {
+ source.publish_required_acknowledged_relay_count()
+ }
+ _ => None,
+ }
+ }
}
#[cfg(test)]
mod tests {
+ use crate::config::MycTransportDeliveryPolicy;
+
use super::MycError;
#[test]
@@ -177,6 +218,9 @@ mod tests {
operation: "discovery refresh".to_owned(),
relay_count: 2,
acknowledged_relay_count: 0,
+ required_acknowledged_relay_count: 1,
+ delivery_policy: MycTransportDeliveryPolicy::Any,
+ attempt_count: 2,
details: "relay-a: blocked".to_owned(),
rejected_relays: vec!["wss://relay-a.example.com".to_owned()],
}
@@ -192,5 +236,11 @@ mod tests {
wrapped.publish_rejected_relays(),
Some(["wss://relay-a.example.com".to_owned()].as_slice())
);
+ assert_eq!(
+ wrapped.publish_delivery_policy(),
+ Some(MycTransportDeliveryPolicy::Any)
+ );
+ assert_eq!(wrapped.publish_required_acknowledged_relay_count(), Some(1));
+ assert_eq!(wrapped.publish_attempt_count(), Some(2));
}
}
diff --git a/src/lib.rs b/src/lib.rs
@@ -18,7 +18,7 @@ pub use audit::{
pub use config::{
DEFAULT_ENV_PATH, MycAuditConfig, MycConfig, MycConnectionApproval, MycDiscoveryConfig,
MycDiscoveryMetadataConfig, MycLoggingConfig, MycPathsConfig, MycPolicyConfig,
- MycServiceConfig, MycTransportConfig,
+ MycServiceConfig, MycTransportConfig, MycTransportDeliveryPolicy,
};
pub use control::{MycAcceptedConnectionOutput, MycAuthorizedReplayOutput};
pub use discovery::{
diff --git a/src/transport.rs b/src/transport.rs
@@ -9,8 +9,9 @@ use radroots_nostr::prelude::{
RadrootsNostrRelayUrl,
};
use serde::Serialize;
+use tokio::time::sleep;
-use crate::config::MycTransportConfig;
+use crate::config::{MycTransportConfig, MycTransportDeliveryPolicy};
use crate::error::MycError;
pub use nip46::{MycNip46Handler, MycNip46Service};
@@ -20,6 +21,11 @@ pub struct MycNostrTransport {
client: RadrootsNostrClient,
relays: Vec<RadrootsNostrRelayUrl>,
connect_timeout_secs: u64,
+ delivery_policy: MycTransportDeliveryPolicy,
+ delivery_quorum: Option<usize>,
+ publish_max_attempts: usize,
+ publish_initial_backoff_millis: u64,
+ publish_max_backoff_millis: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -27,14 +33,23 @@ pub struct MycTransportSnapshot {
pub enabled: bool,
pub relay_count: usize,
pub connect_timeout_secs: u64,
+ pub delivery_policy: MycTransportDeliveryPolicy,
+ pub delivery_quorum: Option<usize>,
+ pub publish_max_attempts: usize,
+ pub publish_initial_backoff_millis: u64,
+ pub publish_max_backoff_millis: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MycPublishOutcome {
pub relay_count: usize,
pub acknowledged_relay_count: usize,
+ pub required_acknowledged_relay_count: usize,
+ pub delivery_policy: MycTransportDeliveryPolicy,
+ pub attempt_count: usize,
pub relay_outcome_summary: String,
pub relay_results: Vec<MycRelayPublishResult>,
+ pub attempt_summaries: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@@ -45,6 +60,15 @@ pub struct MycRelayPublishResult {
pub detail: Option<String>,
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct MycPublishSettings {
+ delivery_policy: MycTransportDeliveryPolicy,
+ delivery_quorum: Option<usize>,
+ publish_max_attempts: usize,
+ publish_initial_backoff_millis: u64,
+ publish_max_backoff_millis: u64,
+}
+
impl MycNostrTransport {
pub fn bootstrap(
config: &MycTransportConfig,
@@ -58,6 +82,11 @@ impl MycNostrTransport {
client: RadrootsNostrClient::from_identity(signer_identity),
relays: config.parse_relays()?,
connect_timeout_secs: config.connect_timeout_secs,
+ delivery_policy: config.delivery_policy,
+ delivery_quorum: config.delivery_quorum,
+ publish_max_attempts: config.publish_max_attempts,
+ publish_initial_backoff_millis: config.publish_initial_backoff_millis,
+ publish_max_backoff_millis: config.publish_max_backoff_millis,
}))
}
@@ -73,6 +102,10 @@ impl MycNostrTransport {
self.connect_timeout_secs
}
+ pub fn delivery_policy(&self) -> MycTransportDeliveryPolicy {
+ self.delivery_policy
+ }
+
pub async fn connect(&self) -> Result<(), MycError> {
for relay in &self.relays {
let _ = self.client.add_relay(relay.as_str()).await?;
@@ -87,7 +120,8 @@ impl MycNostrTransport {
pub async fn publish_once(
signer_identity: &RadrootsIdentity,
relays: &[RadrootsNostrRelayUrl],
- connect_timeout_secs: u64,
+ config: &MycTransportConfig,
+ operation: &str,
event: RadrootsNostrEventBuilder,
) -> Result<MycPublishOutcome, MycError> {
if relays.is_empty() {
@@ -96,22 +130,19 @@ impl MycNostrTransport {
));
}
- let client = RadrootsNostrClient::from_identity(signer_identity);
- for relay in relays {
- let _ = client.add_relay(relay.as_str()).await?;
- }
- client.connect().await;
- client
- .wait_for_connection(Duration::from_secs(connect_timeout_secs))
- .await;
- let output = client.send_event_builder(event).await?;
- ensure_publish_confirmed(relays, output, "one-shot Nostr publish")
+ let event = event
+ .sign_with_keys(signer_identity.keys())
+ .map_err(|error| {
+ MycError::InvalidOperation(format!("failed to sign publish event: {error}"))
+ })?;
+ Self::publish_event_once(signer_identity, relays, config, operation, &event).await
}
pub async fn publish_event_once(
signer_identity: &RadrootsIdentity,
relays: &[RadrootsNostrRelayUrl],
- connect_timeout_secs: u64,
+ config: &MycTransportConfig,
+ operation: &str,
event: &RadrootsNostrEvent,
) -> Result<MycPublishOutcome, MycError> {
if relays.is_empty() {
@@ -120,16 +151,44 @@ impl MycNostrTransport {
));
}
- let client = RadrootsNostrClient::from_identity(signer_identity);
- for relay in relays {
- let _ = client.add_relay(relay.as_str()).await?;
- }
- client.connect().await;
- client
- .wait_for_connection(Duration::from_secs(connect_timeout_secs))
- .await;
- let output = client.send_event(event).await?;
- ensure_publish_confirmed(relays, output, "one-shot Nostr publish")
+ let settings = MycPublishSettings::from_config(config);
+ publish_with_policy(relays, &settings, operation, || async {
+ let client = RadrootsNostrClient::from_identity(signer_identity);
+ for relay in relays {
+ client
+ .add_relay(relay.as_str())
+ .await
+ .map_err(|error| error.to_string())?;
+ }
+ client.connect().await;
+ client
+ .wait_for_connection(Duration::from_secs(config.connect_timeout_secs))
+ .await;
+ client
+ .send_event(event)
+ .await
+ .map_err(|error| error.to_string())
+ })
+ .await
+ }
+
+ pub async fn publish_event(
+ &self,
+ operation: &str,
+ event: &RadrootsNostrEvent,
+ ) -> Result<MycPublishOutcome, MycError> {
+ publish_with_policy(
+ self.relays(),
+ &self.publish_settings(),
+ operation,
+ || async {
+ self.client
+ .send_event(event)
+ .await
+ .map_err(|error| error.to_string())
+ },
+ )
+ .await
}
pub fn snapshot(&self) -> MycTransportSnapshot {
@@ -137,41 +196,98 @@ impl MycNostrTransport {
enabled: true,
relay_count: self.relays.len(),
connect_timeout_secs: self.connect_timeout_secs,
+ delivery_policy: self.delivery_policy,
+ delivery_quorum: self.delivery_quorum,
+ publish_max_attempts: self.publish_max_attempts,
+ publish_initial_backoff_millis: self.publish_initial_backoff_millis,
+ publish_max_backoff_millis: self.publish_max_backoff_millis,
+ }
+ }
+
+ fn publish_settings(&self) -> MycPublishSettings {
+ MycPublishSettings {
+ delivery_policy: self.delivery_policy,
+ delivery_quorum: self.delivery_quorum,
+ publish_max_attempts: self.publish_max_attempts,
+ publish_initial_backoff_millis: self.publish_initial_backoff_millis,
+ publish_max_backoff_millis: self.publish_max_backoff_millis,
}
}
}
-pub(crate) fn ensure_publish_confirmed<T>(
+async fn publish_with_policy<T, F, Fut>(
relays: &[RadrootsNostrRelayUrl],
- output: RadrootsNostrOutput<T>,
+ settings: &MycPublishSettings,
operation: &str,
+ mut send_attempt: F,
) -> Result<MycPublishOutcome, MycError>
where
T: std::fmt::Debug,
+ F: FnMut() -> Fut,
+ Fut: std::future::Future<Output = Result<RadrootsNostrOutput<T>, String>>,
{
- let relay_results = build_publish_relay_results(relays, &output);
- let relay_count = relay_results.len();
- let acknowledged_relay_count = relay_results
- .iter()
- .filter(|result| result.acknowledged)
- .count();
- let relay_outcome_summary = summarize_publish_results(&relay_results);
-
- if !output.success.is_empty() {
- return Ok(MycPublishOutcome {
- relay_count,
- acknowledged_relay_count,
- relay_outcome_summary,
- relay_results,
- });
+ let relay_count = relays.len();
+ let required_acknowledged_relay_count =
+ settings.required_acknowledged_relay_count(relay_count)?;
+ let mut attempt_results = Vec::new();
+
+ for attempt_number in 1..=settings.publish_max_attempts {
+ let attempt = match send_attempt().await {
+ Ok(output) => build_publish_attempt_result(relays, attempt_number, &output),
+ Err(error) => build_failed_publish_attempt_result(relays, attempt_number, error),
+ };
+ let threshold_reached =
+ attempt.acknowledged_relay_count >= required_acknowledged_relay_count;
+ attempt_results.push(attempt);
+
+ if threshold_reached {
+ let final_attempt = attempt_results
+ .last()
+ .expect("publish attempt results contain the successful attempt");
+ return Ok(MycPublishOutcome {
+ relay_count,
+ acknowledged_relay_count: final_attempt.acknowledged_relay_count,
+ required_acknowledged_relay_count,
+ delivery_policy: settings.delivery_policy,
+ attempt_count: attempt_results.len(),
+ relay_outcome_summary: summarize_delivery_policy_result(
+ settings.delivery_policy,
+ required_acknowledged_relay_count,
+ &attempt_results,
+ ),
+ relay_results: final_attempt.relay_results.clone(),
+ attempt_summaries: attempt_results
+ .iter()
+ .map(|attempt| attempt.relay_outcome_summary.clone())
+ .collect(),
+ });
+ }
+
+ if attempt_number < settings.publish_max_attempts {
+ sleep(Duration::from_millis(
+ settings.backoff_for_attempt(attempt_number),
+ ))
+ .await;
+ }
}
+ let final_attempt = attempt_results
+ .last()
+ .expect("publish attempt results contain at least one attempt");
Err(MycError::PublishRejected {
operation: operation.to_owned(),
relay_count,
- acknowledged_relay_count,
- details: relay_outcome_summary,
- rejected_relays: relay_results
+ acknowledged_relay_count: final_attempt.acknowledged_relay_count,
+ required_acknowledged_relay_count,
+ delivery_policy: settings.delivery_policy,
+ attempt_count: attempt_results.len(),
+ details: summarize_delivery_policy_result(
+ settings.delivery_policy,
+ required_acknowledged_relay_count,
+ &attempt_results,
+ ),
+ rejected_relays: final_attempt
+ .relay_results
.iter()
.filter(|result| !result.acknowledged)
.map(|result| result.relay_url.clone())
@@ -223,6 +339,48 @@ where
.collect()
}
+fn build_publish_attempt_result<T>(
+ relays: &[RadrootsNostrRelayUrl],
+ attempt_number: usize,
+ output: &RadrootsNostrOutput<T>,
+) -> MycPublishAttemptResult
+where
+ T: std::fmt::Debug,
+{
+ let relay_results = build_publish_relay_results(relays, output);
+ let acknowledged_relay_count = relay_results
+ .iter()
+ .filter(|result| result.acknowledged)
+ .count();
+ MycPublishAttemptResult {
+ attempt_number,
+ acknowledged_relay_count,
+ relay_outcome_summary: summarize_publish_results(&relay_results),
+ relay_results,
+ }
+}
+
+fn build_failed_publish_attempt_result(
+ relays: &[RadrootsNostrRelayUrl],
+ attempt_number: usize,
+ error: String,
+) -> MycPublishAttemptResult {
+ let relay_results = relays
+ .iter()
+ .map(|relay| MycRelayPublishResult {
+ relay_url: relay.to_string(),
+ acknowledged: false,
+ detail: Some(error.clone()),
+ })
+ .collect::<Vec<_>>();
+ MycPublishAttemptResult {
+ attempt_number,
+ acknowledged_relay_count: 0,
+ relay_outcome_summary: summarize_publish_results(&relay_results),
+ relay_results,
+ }
+}
+
fn summarize_publish_results(relay_results: &[MycRelayPublishResult]) -> String {
let relay_count = relay_results.len();
let acknowledged_relay_count = relay_results
@@ -235,6 +393,15 @@ fn summarize_publish_results(relay_results: &[MycRelayPublishResult]) -> String
let mut summary =
format!("{acknowledged_relay_count}/{relay_count} relays acknowledged publish");
+ let acknowledged = relay_results
+ .iter()
+ .filter(|result| result.acknowledged)
+ .map(|result| result.relay_url.clone())
+ .collect::<Vec<_>>();
+ if !acknowledged.is_empty() {
+ summary.push_str("; acknowledged: ");
+ summary.push_str(&acknowledged.join(", "));
+ }
let failures = relay_results
.iter()
.filter(|result| !result.acknowledged)
@@ -250,23 +417,115 @@ fn summarize_publish_results(relay_results: &[MycRelayPublishResult]) -> String
summary
}
+fn summarize_delivery_policy_result(
+ delivery_policy: MycTransportDeliveryPolicy,
+ required_acknowledged_relay_count: usize,
+ attempt_results: &[MycPublishAttemptResult],
+) -> String {
+ let attempt_count = attempt_results.len();
+ let final_attempt = attempt_results
+ .last()
+ .expect("delivery policy summary requires at least one attempt");
+ let mut summary = format!(
+ "delivery policy {} required {required_acknowledged_relay_count} acknowledgements across {attempt_count} attempt(s); final attempt {}: {}",
+ delivery_policy.as_str(),
+ final_attempt.attempt_number,
+ final_attempt.relay_outcome_summary,
+ );
+ if attempt_results.len() > 1 {
+ let attempt_summaries = attempt_results
+ .iter()
+ .map(|attempt| {
+ format!(
+ "attempt {}: {}",
+ attempt.attempt_number, attempt.relay_outcome_summary
+ )
+ })
+ .collect::<Vec<_>>();
+ summary.push_str("; ");
+ summary.push_str(&attempt_summaries.join(" | "));
+ }
+ summary
+}
+
impl MycTransportSnapshot {
pub fn disabled() -> Self {
Self {
enabled: false,
relay_count: 0,
connect_timeout_secs: 0,
+ delivery_policy: MycTransportDeliveryPolicy::Any,
+ delivery_quorum: None,
+ publish_max_attempts: 1,
+ publish_initial_backoff_millis: 250,
+ publish_max_backoff_millis: 2_000,
}
}
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct MycPublishAttemptResult {
+ attempt_number: usize,
+ acknowledged_relay_count: usize,
+ relay_outcome_summary: String,
+ relay_results: Vec<MycRelayPublishResult>,
+}
+
+impl MycPublishSettings {
+ fn from_config(config: &MycTransportConfig) -> Self {
+ Self {
+ delivery_policy: config.delivery_policy,
+ delivery_quorum: config.delivery_quorum,
+ publish_max_attempts: config.publish_max_attempts,
+ publish_initial_backoff_millis: config.publish_initial_backoff_millis,
+ publish_max_backoff_millis: config.publish_max_backoff_millis,
+ }
+ }
+
+ fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, MycError> {
+ match self.delivery_policy {
+ MycTransportDeliveryPolicy::Any => Ok(1),
+ MycTransportDeliveryPolicy::All => Ok(relay_count),
+ MycTransportDeliveryPolicy::Quorum => {
+ let delivery_quorum = self.delivery_quorum.ok_or_else(|| {
+ MycError::InvalidConfig(
+ "transport.delivery_quorum must be set when transport.delivery_policy is `quorum`"
+ .to_owned(),
+ )
+ })?;
+ if delivery_quorum > relay_count {
+ return Err(MycError::InvalidOperation(format!(
+ "transport.delivery_quorum `{delivery_quorum}` cannot be satisfied by `{relay_count}` target relays"
+ )));
+ }
+ Ok(delivery_quorum)
+ }
+ }
+ }
+
+ fn backoff_for_attempt(&self, completed_attempt_number: usize) -> u64 {
+ let exponent = completed_attempt_number.saturating_sub(1) as u32;
+ let scaled = self
+ .publish_initial_backoff_millis
+ .saturating_mul(2_u64.saturating_pow(exponent));
+ scaled.min(self.publish_max_backoff_millis)
+ }
+}
+
#[cfg(test)]
mod tests {
+ use std::collections::{HashMap, HashSet};
+ use std::sync::{Arc, Mutex};
+
use radroots_identity::RadrootsIdentity;
+ use radroots_nostr::prelude::{
+ RadrootsNostrEventId, RadrootsNostrOutput, RadrootsNostrRelayUrl,
+ };
+ use tokio::time::Instant;
- use crate::config::MycTransportConfig;
+ use crate::config::{MycTransportConfig, MycTransportDeliveryPolicy};
- use super::{MycNostrTransport, MycTransportSnapshot};
+ use super::{MycNostrTransport, MycPublishSettings, MycTransportSnapshot, publish_with_policy};
fn signer_identity() -> RadrootsIdentity {
RadrootsIdentity::from_secret_key_str(
@@ -294,6 +553,11 @@ mod tests {
"wss://relay.example.com".to_owned(),
"wss://relay2.example.com".to_owned(),
];
+ config.delivery_policy = MycTransportDeliveryPolicy::Quorum;
+ config.delivery_quorum = Some(2);
+ config.publish_max_attempts = 3;
+ config.publish_initial_backoff_millis = 125;
+ config.publish_max_backoff_millis = 500;
let transport = MycNostrTransport::bootstrap(&config, &signer_identity())
.expect("transport")
@@ -307,7 +571,148 @@ mod tests {
enabled: true,
relay_count: 2,
connect_timeout_secs: 15,
+ delivery_policy: MycTransportDeliveryPolicy::Quorum,
+ delivery_quorum: Some(2),
+ publish_max_attempts: 3,
+ publish_initial_backoff_millis: 125,
+ publish_max_backoff_millis: 500,
+ }
+ );
+ }
+
+ #[tokio::test]
+ async fn publish_with_policy_retries_until_threshold_is_met() {
+ let relays = vec![
+ RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"),
+ RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"),
+ ];
+ let settings = MycPublishSettings {
+ delivery_policy: MycTransportDeliveryPolicy::All,
+ delivery_quorum: None,
+ publish_max_attempts: 2,
+ publish_initial_backoff_millis: 10,
+ publish_max_backoff_millis: 10,
+ };
+ let attempts = Arc::new(Mutex::new(vec![
+ publish_output(
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ &["wss://relay-a.example.com"],
+ &[("wss://relay-b.example.com", "blocked")],
+ ),
+ publish_output(
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ &["wss://relay-a.example.com", "wss://relay-b.example.com"],
+ &[],
+ ),
+ ]));
+
+ let start = Instant::now();
+ let outcome = publish_with_policy(&relays, &settings, "test publish", || {
+ let attempts = Arc::clone(&attempts);
+ async move {
+ let output = attempts.lock().expect("attempts lock").remove(0);
+ Ok(output)
}
+ })
+ .await
+ .expect("publish succeeds on retry");
+
+ assert_eq!(outcome.delivery_policy, MycTransportDeliveryPolicy::All);
+ assert_eq!(outcome.required_acknowledged_relay_count, 2);
+ assert_eq!(outcome.attempt_count, 2);
+ assert_eq!(outcome.acknowledged_relay_count, 2);
+ assert_eq!(outcome.relay_results.len(), 2);
+ assert_eq!(outcome.attempt_summaries.len(), 2);
+ assert!(
+ outcome
+ .relay_outcome_summary
+ .contains("delivery policy all")
+ );
+ assert!(outcome.relay_outcome_summary.contains("attempt 1"));
+ assert!(start.elapsed() >= std::time::Duration::from_millis(10));
+ }
+
+ #[tokio::test]
+ async fn publish_with_policy_reports_threshold_failure() {
+ let relays = vec![
+ RadrootsNostrRelayUrl::parse("wss://relay-a.example.com").expect("relay-a"),
+ RadrootsNostrRelayUrl::parse("wss://relay-b.example.com").expect("relay-b"),
+ ];
+ let settings = MycPublishSettings {
+ delivery_policy: MycTransportDeliveryPolicy::Quorum,
+ delivery_quorum: Some(2),
+ publish_max_attempts: 2,
+ publish_initial_backoff_millis: 1,
+ publish_max_backoff_millis: 1,
+ };
+
+ let error = publish_with_policy::<RadrootsNostrEventId, _, _>(
+ &relays,
+ &settings,
+ "test publish",
+ || async {
+ Ok(publish_output(
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ &["wss://relay-a.example.com"],
+ &[("wss://relay-b.example.com", "blocked")],
+ ))
+ },
+ )
+ .await
+ .expect_err("quorum should fail without both acknowledgements");
+
+ assert_eq!(
+ error.publish_delivery_policy(),
+ Some(MycTransportDeliveryPolicy::Quorum)
+ );
+ assert_eq!(error.publish_required_acknowledged_relay_count(), Some(2));
+ assert_eq!(error.publish_attempt_count(), Some(2));
+ assert!(error.to_string().contains("delivery policy quorum"));
+ }
+
+ #[test]
+ fn publish_settings_reject_impossible_quorum_for_target_relays() {
+ let settings = MycPublishSettings {
+ delivery_policy: MycTransportDeliveryPolicy::Quorum,
+ delivery_quorum: Some(3),
+ publish_max_attempts: 1,
+ publish_initial_backoff_millis: 10,
+ publish_max_backoff_millis: 100,
+ };
+
+ let error = settings
+ .required_acknowledged_relay_count(2)
+ .expect_err("impossible quorum");
+ assert!(
+ error
+ .to_string()
+ .contains("cannot be satisfied by `2` target relays")
);
}
+
+ fn publish_output(
+ event_id_hex: &str,
+ succeeded_relays: &[&str],
+ failed_relays: &[(&str, &str)],
+ ) -> RadrootsNostrOutput<RadrootsNostrEventId> {
+ let success = succeeded_relays
+ .iter()
+ .map(|relay| RadrootsNostrRelayUrl::parse(*relay).expect("success relay"))
+ .collect::<HashSet<_>>();
+ let failed = failed_relays
+ .iter()
+ .map(|(relay, error)| {
+ (
+ RadrootsNostrRelayUrl::parse(*relay).expect("failed relay"),
+ (*error).to_owned(),
+ )
+ })
+ .collect::<HashMap<_, _>>();
+
+ RadrootsNostrOutput {
+ val: RadrootsNostrEventId::parse(event_id_hex).expect("event id"),
+ success,
+ failed,
+ }
+ }
}
diff --git a/src/transport/nip46.rs b/src/transport/nip46.rs
@@ -22,7 +22,7 @@ use tokio::sync::broadcast;
use crate::app::MycSignerContext;
use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
use crate::error::MycError;
-use crate::transport::{MycNostrTransport, ensure_publish_confirmed};
+use crate::transport::MycNostrTransport;
#[derive(Clone)]
pub struct MycNip46Handler {
@@ -157,6 +157,17 @@ impl MycNip46Handler {
secret: Option<String>,
) -> Result<MycNip46HandledRequest, MycError> {
let manager = self.signer.load_signer_manager()?;
+ if let Some(connect_secret) = secret.as_deref() {
+ if let Some(connection) = manager.find_connection_by_connect_secret(connect_secret)? {
+ if connection.connect_secret_is_consumed() {
+ tracing::debug!(
+ connection_id = %connection.connection_id,
+ "ignoring reused consumed NIP-46 connect secret"
+ );
+ return Ok(MycNip46HandledRequest::Ignore);
+ }
+ }
+ }
let evaluation = manager.evaluate_connect_request(client_public_key, request)?;
match evaluation {
@@ -496,36 +507,32 @@ impl MycNip46Service {
let response_event =
self.handler
.build_response_event(event.pubkey, request_id.as_str(), response)?;
- let publish_output = match self
+ let response_event =
+ match response_event.sign_with_keys(self.handler.signer.signer_identity().keys()) {
+ Ok(event) => event,
+ Err(error) => {
+ self.handler
+ .signer
+ .record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Rejected,
+ connection_id.as_ref(),
+ Some(request_id.as_str()),
+ self.transport.relays().len(),
+ 0,
+ format!("failed to sign NIP-46 response event: {error}"),
+ ));
+ continue;
+ }
+ };
+ let publish_outcome = match self
.transport
- .client()
- .send_event_builder(response_event)
+ .publish_event("NIP-46 response publish", &response_event)
.await
{
- Ok(output) => output,
+ Ok(publish_outcome) => publish_outcome,
Err(error) => {
- self.handler
- .signer
- .record_operation_audit(&MycOperationAuditRecord::new(
- MycOperationAuditKind::ListenerResponsePublish,
- MycOperationAuditOutcome::Rejected,
- connection_id.as_ref(),
- Some(request_id.as_str()),
- self.transport.relays().len(),
- 0,
- error.to_string(),
- ));
- continue;
- }
- };
- if let Err(error) = ensure_publish_confirmed(
- self.transport.relays(),
- publish_output,
- "NIP-46 response publish",
- ) {
- self.handler
- .signer
- .record_operation_audit(&MycOperationAuditRecord::new(
+ let mut record = MycOperationAuditRecord::new(
MycOperationAuditKind::ListenerResponsePublish,
MycOperationAuditOutcome::Rejected,
connection_id.as_ref(),
@@ -542,9 +549,42 @@ impl MycNip46Service {
.publish_rejection_details()
.map(ToOwned::to_owned)
.unwrap_or_else(|| error.to_string()),
- ));
- continue;
- }
+ );
+ if let (
+ Some(delivery_policy),
+ Some(required_acknowledged_relay_count),
+ Some(attempt_count),
+ ) = (
+ error.publish_delivery_policy(),
+ error.publish_required_acknowledged_relay_count(),
+ error.publish_attempt_count(),
+ ) {
+ record = record.with_delivery_details(
+ delivery_policy,
+ required_acknowledged_relay_count,
+ attempt_count,
+ );
+ }
+ self.handler.signer.record_operation_audit(&record);
+ continue;
+ }
+ };
+ self.handler.signer.record_operation_audit(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Succeeded,
+ connection_id.as_ref(),
+ Some(request_id.as_str()),
+ publish_outcome.relay_count,
+ publish_outcome.acknowledged_relay_count,
+ publish_outcome.relay_outcome_summary.clone(),
+ )
+ .with_delivery_details(
+ publish_outcome.delivery_policy,
+ publish_outcome.required_acknowledged_relay_count,
+ publish_outcome.attempt_count,
+ ),
+ );
if let Some(connection_id) = consume_connect_secret_for {
if let Err(error) = self
.handler
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -8,8 +8,8 @@ use myc::control;
use myc::{
MycConfig, MycConnectionApproval, MycDiscoveryContext, MycDiscoveryLiveStatus,
MycDiscoveryRelayFetchStatus, MycDiscoveryRepairOutcome, MycOperationAuditKind,
- MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, diff_live_nip89,
- fetch_live_nip89, publish_nip89_event, refresh_nip89,
+ MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, MycTransportDeliveryPolicy,
+ diff_live_nip89, fetch_live_nip89, publish_nip89_event, refresh_nip89,
};
use nostr::filter::MatchEventOptions;
use nostr::nips::nip44;
@@ -386,6 +386,21 @@ struct MycTestRuntime {
impl MycTestRuntime {
fn new(relay_url: &str, approval: MycConnectionApproval) -> Self {
+ Self::new_with_transport_relays(&[relay_url], approval)
+ }
+
+ fn new_with_transport_relays(relay_urls: &[&str], approval: MycConnectionApproval) -> Self {
+ Self::new_with_transport_config(relay_urls, approval, |_| {})
+ }
+
+ fn new_with_transport_config<F>(
+ relay_urls: &[&str],
+ approval: MycConnectionApproval,
+ configure: F,
+ ) -> Self
+ where
+ F: FnOnce(&mut MycConfig),
+ {
let temp = tempfile::tempdir().expect("tempdir");
let mut config = MycConfig::default();
config.paths.state_dir = temp.path().join("state");
@@ -394,7 +409,8 @@ impl MycTestRuntime {
config.policy.connection_approval = approval;
config.transport.enabled = true;
config.transport.connect_timeout_secs = 1;
- config.transport.relays = vec![relay_url.to_owned()];
+ config.transport.relays = relay_urls.iter().map(|relay| (*relay).to_owned()).collect();
+ configure(&mut config);
write_identity(
&config.paths.signer_identity_path,
"1111111111111111111111111111111111111111111111111111111111111111",
@@ -778,7 +794,17 @@ async fn live_listener_consumes_connect_secret_only_after_successful_publish() -
.len(),
1
);
- assert_eq!(runtime.operation_audit_store().list()?.len(), 1);
+ let operation_audit = runtime.operation_audit_store().list()?;
+ assert_eq!(operation_audit.len(), 2);
+ assert_eq!(
+ operation_audit[1].operation,
+ MycOperationAuditKind::ListenerResponsePublish
+ );
+ assert_eq!(
+ operation_audit[1].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
+ assert_eq!(operation_audit[1].request_id.as_deref(), Some("connect-2"));
let _ = shutdown_tx.send(());
listener_task.await??;
@@ -899,6 +925,254 @@ async fn connect_accept_retries_without_consuming_secret_until_publish_succeeds(
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn connect_accept_succeeds_with_any_delivery_policy_when_one_relay_acknowledges()
+-> TestResult<()> {
+ let relay_a = TestRelay::spawn().await?;
+ let relay_b = TestRelay::spawn().await?;
+ let test_runtime = MycTestRuntime::new_with_transport_config(
+ &[relay_a.url(), relay_b.url()],
+ MycConnectionApproval::NotRequired,
+ |config| {
+ config.transport.delivery_policy = MycTransportDeliveryPolicy::Any;
+ config.transport.publish_max_attempts = 1;
+ },
+ );
+ let runtime = test_runtime.runtime;
+ let signer_public_key = runtime.signer_identity().public_key();
+ let client_identity =
+ identity("5555555555555555555555555555555555555555555555555555555555555555");
+
+ relay_a
+ .queue_publish_outcomes(signer_public_key, &[false])
+ .await;
+ relay_b
+ .queue_publish_outcomes(signer_public_key, &[true])
+ .await;
+
+ let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri {
+ client_public_key: client_identity.public_key(),
+ relays: vec![
+ nostr::RelayUrl::parse(relay_a.url())?,
+ nostr::RelayUrl::parse(relay_b.url())?,
+ ],
+ secret: "delivery-any-secret".to_owned(),
+ metadata: RadrootsNostrConnectClientMetadata::default(),
+ })
+ .to_string();
+
+ let accepted = control::accept_client_uri(&runtime, &client_uri).await?;
+ assert_eq!(accepted.response_relays.len(), 2);
+ let stored = runtime
+ .signer_manager()?
+ .list_connections()?
+ .into_iter()
+ .find(|connection| connection.connection_id == accepted.connection.connection_id)
+ .expect("stored connection");
+ assert!(stored.connect_secret_is_consumed());
+
+ let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
+ assert_eq!(
+ operation_audit[0].operation,
+ MycOperationAuditKind::ConnectAcceptPublish
+ );
+ assert_eq!(
+ operation_audit[0].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
+ assert_eq!(operation_audit[0].relay_count, 2);
+ assert_eq!(operation_audit[0].acknowledged_relay_count, 1);
+ assert_eq!(
+ operation_audit[0].delivery_policy,
+ Some(MycTransportDeliveryPolicy::Any)
+ );
+ assert_eq!(
+ operation_audit[0].required_acknowledged_relay_count,
+ Some(1)
+ );
+ assert_eq!(operation_audit[0].publish_attempt_count, Some(1));
+ assert!(
+ operation_audit[0]
+ .relay_outcome_summary
+ .contains("delivery policy any")
+ );
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn connect_accept_rejects_when_quorum_delivery_policy_is_not_met() -> TestResult<()> {
+ let relay_a = TestRelay::spawn().await?;
+ let relay_b = TestRelay::spawn().await?;
+ let test_runtime = MycTestRuntime::new_with_transport_config(
+ &[relay_a.url(), relay_b.url()],
+ MycConnectionApproval::NotRequired,
+ |config| {
+ config.transport.delivery_policy = MycTransportDeliveryPolicy::Quorum;
+ config.transport.delivery_quorum = Some(2);
+ config.transport.publish_max_attempts = 1;
+ },
+ );
+ let runtime = test_runtime.runtime;
+ let signer_public_key = runtime.signer_identity().public_key();
+ let client_identity =
+ identity("6666666666666666666666666666666666666666666666666666666666666665");
+
+ relay_a
+ .queue_publish_outcomes(signer_public_key, &[true])
+ .await;
+ relay_b
+ .queue_publish_outcomes(signer_public_key, &[false])
+ .await;
+
+ let client_uri = RadrootsNostrConnectUri::Client(RadrootsNostrConnectClientUri {
+ client_public_key: client_identity.public_key(),
+ relays: vec![
+ nostr::RelayUrl::parse(relay_a.url())?,
+ nostr::RelayUrl::parse(relay_b.url())?,
+ ],
+ secret: "delivery-quorum-secret".to_owned(),
+ metadata: RadrootsNostrConnectClientMetadata::default(),
+ })
+ .to_string();
+
+ let error = control::accept_client_uri(&runtime, &client_uri)
+ .await
+ .expect_err("quorum publish should fail");
+ assert!(
+ error
+ .to_string()
+ .contains("delivery policy quorum requiring 2 acknowledgements")
+ );
+ assert_eq!(
+ error.publish_delivery_policy(),
+ Some(MycTransportDeliveryPolicy::Quorum)
+ );
+ assert_eq!(error.publish_required_acknowledged_relay_count(), Some(2));
+ assert_eq!(error.publish_attempt_count(), Some(1));
+
+ let stored = runtime
+ .signer_manager()?
+ .list_connections()?
+ .into_iter()
+ .next()
+ .expect("stored connection");
+ assert!(!stored.connect_secret_is_consumed());
+
+ let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
+ assert_eq!(
+ operation_audit[0].operation,
+ MycOperationAuditKind::ConnectAcceptPublish
+ );
+ assert_eq!(
+ operation_audit[0].outcome,
+ MycOperationAuditOutcome::Rejected
+ );
+ assert_eq!(operation_audit[0].relay_count, 2);
+ assert_eq!(operation_audit[0].acknowledged_relay_count, 1);
+ assert_eq!(
+ operation_audit[0].delivery_policy,
+ Some(MycTransportDeliveryPolicy::Quorum)
+ );
+ assert_eq!(
+ operation_audit[0].required_acknowledged_relay_count,
+ Some(2)
+ );
+ assert_eq!(operation_audit[0].publish_attempt_count, Some(1));
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn live_listener_retries_until_all_delivery_policy_is_met() -> TestResult<()> {
+ let relay_a = TestRelay::spawn().await?;
+ let relay_b = TestRelay::spawn().await?;
+ let test_runtime = MycTestRuntime::new_with_transport_config(
+ &[relay_a.url(), relay_b.url()],
+ MycConnectionApproval::NotRequired,
+ |config| {
+ config.transport.delivery_policy = MycTransportDeliveryPolicy::All;
+ config.transport.publish_max_attempts = 2;
+ config.transport.publish_initial_backoff_millis = 10;
+ config.transport.publish_max_backoff_millis = 10;
+ },
+ );
+ let runtime = test_runtime.runtime.clone();
+ let signer_public_key = runtime.signer_identity().public_key();
+ let client_identity =
+ identity("7777777777777777777777777777777777777777777777777777777777777777");
+ let base_created_at = Timestamp::now().as_secs();
+
+ relay_a
+ .queue_publish_outcomes(signer_public_key, &[true, true])
+ .await;
+ relay_b
+ .queue_publish_outcomes(signer_public_key, &[false, true])
+ .await;
+
+ let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
+ let service_runtime = runtime.clone();
+ let listener_task = tokio::spawn(async move {
+ service_runtime
+ .run_until(async {
+ let _ = shutdown_rx.await;
+ })
+ .await
+ });
+
+ relay_a.wait_for_subscription_count(1).await?;
+ relay_b.wait_for_subscription_count(1).await?;
+
+ let request = build_request_event(
+ &client_identity,
+ signer_public_key,
+ connect_request_message("connect-all-1", signer_public_key, "shared-secret-all"),
+ base_created_at,
+ );
+ publish_event(relay_a.url(), &request).await?;
+
+ let response_events = relay_b
+ .wait_for_published_events_by_author(signer_public_key, 1)
+ .await?;
+ let response = decrypt_response(&client_identity, signer_public_key, &response_events[0]);
+ assert_eq!(response.id, "connect-all-1");
+ assert_eq!(
+ response.result,
+ Some(serde_json::Value::String("shared-secret-all".to_owned()))
+ );
+
+ wait_for_connect_secret_consumed(&runtime).await?;
+ let operation_audit = wait_for_operation_audit_count(&runtime, 1).await?;
+ assert_eq!(
+ operation_audit[0].operation,
+ MycOperationAuditKind::ListenerResponsePublish
+ );
+ assert_eq!(
+ operation_audit[0].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
+ assert_eq!(operation_audit[0].relay_count, 2);
+ assert_eq!(operation_audit[0].acknowledged_relay_count, 2);
+ assert_eq!(
+ operation_audit[0].delivery_policy,
+ Some(MycTransportDeliveryPolicy::All)
+ );
+ assert_eq!(
+ operation_audit[0].required_acknowledged_relay_count,
+ Some(2)
+ );
+ assert_eq!(operation_audit[0].publish_attempt_count, Some(2));
+ assert!(
+ operation_audit[0]
+ .relay_outcome_summary
+ .contains("attempt 1: 1/2 relays acknowledged publish")
+ );
+
+ let _ = shutdown_tx.send(());
+ listener_task.await??;
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn auth_replay_restores_pending_request_until_publish_succeeds() -> TestResult<()> {
let relay = TestRelay::spawn().await?;
let test_runtime = MycTestRuntime::new(relay.url(), MycConnectionApproval::NotRequired);