commit 139e6478338454db5a4d6565c723cbf303014fc6
parent 38d9edd7350f48dace0ce2f4f3987c15f59b43fa
Author: triesap <tyson@radroots.org>
Date: Thu, 26 Mar 2026 22:23:36 +0000
operability: add outbox-aware status and metrics
Diffstat:
8 files changed, 672 insertions(+), 18 deletions(-)
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -291,7 +291,16 @@ impl MycRuntime {
.delivery_outbox_store
.list_by_status(MycDeliveryOutboxStatus::PublishedPendingFinalize)?;
if queued_records.is_empty() && published_records.is_empty() {
- self.ensure_no_orphaned_publish_workflows()?;
+ if let Err(error) = self.ensure_no_orphaned_publish_workflows() {
+ self.record_delivery_recovery_summary(
+ MycOperationAuditOutcome::Rejected,
+ 0,
+ 0,
+ 0,
+ error.to_string(),
+ );
+ return Err(error);
+ }
return Ok(());
}
@@ -307,12 +316,49 @@ impl MycRuntime {
"starting myc delivery recovery"
);
+ let unfinished_delivery_job_count = queued_records.len();
+ let mut finalized_job_count = 0usize;
+ let mut republished_job_count = 0usize;
let manager = self.signer_manager()?;
for record in queued_records {
- self.recover_delivery_outbox_record(&manager, record)
- .await?;
+ match self.recover_delivery_outbox_record(&manager, record).await {
+ Ok(republished) => {
+ finalized_job_count += 1;
+ if republished {
+ republished_job_count += 1;
+ }
+ }
+ Err(error) => {
+ self.record_delivery_recovery_summary(
+ MycOperationAuditOutcome::Rejected,
+ unfinished_delivery_job_count,
+ finalized_job_count,
+ republished_job_count,
+ error.to_string(),
+ );
+ return Err(error);
+ }
+ }
+ }
+ if let Err(error) = self.ensure_no_orphaned_publish_workflows() {
+ self.record_delivery_recovery_summary(
+ MycOperationAuditOutcome::Rejected,
+ unfinished_delivery_job_count,
+ finalized_job_count,
+ republished_job_count,
+ error.to_string(),
+ );
+ return Err(error);
}
- self.ensure_no_orphaned_publish_workflows()?;
+ self.record_delivery_recovery_summary(
+ MycOperationAuditOutcome::Succeeded,
+ unfinished_delivery_job_count,
+ finalized_job_count,
+ republished_job_count,
+ format!(
+ "recovered {finalized_job_count}/{unfinished_delivery_job_count} delivery outbox job(s); republished {republished_job_count}"
+ ),
+ );
tracing::info!("completed myc delivery recovery");
Ok(())
@@ -343,7 +389,7 @@ impl MycRuntime {
&self,
manager: &RadrootsNostrSignerManager,
record: MycDeliveryOutboxRecord,
- ) -> Result<(), MycError> {
+ ) -> Result<bool, MycError> {
self.validate_outbox_workflow_expectations(&record)?;
let workflow = self.lookup_publish_workflow_for_record(manager, &record)?;
tracing::info!(
@@ -379,12 +425,13 @@ impl MycRuntime {
let published = self
.delivery_outbox_store
.mark_published_pending_finalize(&record.job_id, publish_attempt_count)?;
- return self.finalize_recovered_delivery_job(
+ self.finalize_recovered_delivery_job(
manager,
published,
workflow.as_ref(),
None,
- );
+ )?;
+ return Ok(false);
}
let publish_outcome = self
@@ -428,12 +475,14 @@ impl MycRuntime {
published,
published_workflow.as_ref(),
Some(&publish_outcome),
- )
+ )?;
+ Ok(true)
}
MycDeliveryOutboxStatus::PublishedPendingFinalize => {
- self.finalize_recovered_delivery_job(manager, record, workflow.as_ref(), None)
+ self.finalize_recovered_delivery_job(manager, record, workflow.as_ref(), None)?;
+ Ok(false)
}
- MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => Ok(()),
+ MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => Ok(false),
}
}
@@ -737,6 +786,35 @@ impl MycRuntime {
self.record_operation_audit(&audit_record);
}
+ fn record_delivery_recovery_summary(
+ &self,
+ outcome: MycOperationAuditOutcome,
+ unfinished_job_count: usize,
+ finalized_job_count: usize,
+ republished_job_count: usize,
+ summary: impl Into<String>,
+ ) {
+ let summary = summary.into();
+ let record = MycOperationAuditRecord::new(
+ MycOperationAuditKind::DeliveryRecovery,
+ outcome,
+ None,
+ None,
+ unfinished_job_count,
+ finalized_job_count,
+ summary.clone(),
+ );
+ tracing::info!(
+ outcome = ?outcome,
+ unfinished_job_count,
+ finalized_job_count,
+ republished_job_count,
+ summary = %summary,
+ "recorded myc delivery recovery summary"
+ );
+ self.record_operation_audit(&record);
+ }
+
fn required_acknowledged_relay_count(&self, relay_count: usize) -> Result<usize, MycError> {
match self.config.transport.delivery_policy {
MycTransportDeliveryPolicy::Any => Ok(1),
@@ -1529,7 +1607,7 @@ mod tests {
assert_eq!(outbox_records[0].status, MycDeliveryOutboxStatus::Finalized);
assert!(outbox_records[0].finalized_at_unix.is_some());
let audit_records = runtime.operation_audit_store().list().expect("list audit");
- assert_eq!(audit_records.len(), 1);
+ assert_eq!(audit_records.len(), 2);
assert_eq!(
audit_records[0].operation,
MycOperationAuditKind::ListenerResponsePublish
@@ -1542,6 +1620,14 @@ mod tests {
audit_records[0].request_id.as_deref(),
Some("recovery-request")
);
+ assert_eq!(
+ audit_records[1].operation,
+ MycOperationAuditKind::DeliveryRecovery
+ );
+ assert_eq!(
+ audit_records[1].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
}
#[tokio::test]
diff --git a/src/audit.rs b/src/audit.rs
@@ -23,6 +23,7 @@ const MYC_OPERATION_AUDIT_LATEST_SUFFIX: &str = ".attempt";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MycOperationAuditKind {
+ DeliveryRecovery,
ListenerResponsePublish,
ConnectAcceptPublish,
AuthReplayPublish,
@@ -770,6 +771,7 @@ fn parse_archive_index(file_name: &str) -> Option<usize> {
fn operation_index_label(kind: MycOperationAuditKind) -> &'static str {
match kind {
+ MycOperationAuditKind::DeliveryRecovery => "delivery_recovery",
MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish",
MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish",
MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish",
diff --git a/src/audit_sqlite.rs b/src/audit_sqlite.rs
@@ -471,6 +471,7 @@ fn parse_optional_usize(value: i64) -> Result<usize, MycError> {
fn operation_kind_label(value: MycOperationAuditKind) -> &'static str {
match value {
+ MycOperationAuditKind::DeliveryRecovery => "delivery_recovery",
MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish",
MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish",
MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish",
@@ -485,6 +486,7 @@ fn operation_kind_label(value: MycOperationAuditKind) -> &'static str {
fn parse_operation_kind(value: &str) -> Result<MycOperationAuditKind, MycError> {
match value {
+ "delivery_recovery" => Ok(MycOperationAuditKind::DeliveryRecovery),
"listener_response_publish" => Ok(MycOperationAuditKind::ListenerResponsePublish),
"connect_accept_publish" => Ok(MycOperationAuditKind::ConnectAcceptPublish),
"auth_replay_publish" => Ok(MycOperationAuditKind::AuthReplayPublish),
diff --git a/src/lib.rs b/src/lib.rs
@@ -45,7 +45,8 @@ pub use discovery::{
};
pub use error::MycError;
pub use operability::{
- MycAuditDecisionCounts, MycCustodyStatusOutput, MycDiscoveryStatusOutput, MycMetricsSnapshot,
+ MycAuditDecisionCounts, MycCustodyStatusOutput, MycDeliveryOutboxStatusOutput,
+ MycDeliveryRecoveryStatusOutput, MycDiscoveryStatusOutput, MycMetricsSnapshot,
MycOperationOutcomeCounts, MycPersistenceStatusOutput, MycRelayProbe,
MycRelayProbeAvailability, MycRuntimeAuditPersistenceStatusOutput, MycRuntimeStatus,
MycSignerStatePersistenceStatusOutput, MycSqliteSchemaStatusOutput, MycStatusFullOutput,
diff --git a/src/operability/mod.rs b/src/operability/mod.rs
@@ -8,7 +8,10 @@ use radroots_identity::RadrootsIdentity;
use radroots_nostr::prelude::{
RadrootsNostrClient, RadrootsNostrRelayStatus, RadrootsNostrRelayUrl,
};
-use radroots_nostr_signer::prelude::RadrootsNostrSignerRequestDecision;
+use radroots_nostr_signer::prelude::{
+ RadrootsNostrSignerPublishWorkflowRecord, RadrootsNostrSignerPublishWorkflowState,
+ RadrootsNostrSignerRequestDecision,
+};
use radroots_sql_core::{SqlExecutor, SqliteExecutor};
use serde::{Deserialize, Serialize};
use tokio::task::JoinSet;
@@ -19,6 +22,7 @@ use crate::config::{MycRuntimeAuditBackend, MycSignerStateBackend, MycTransportD
use crate::custody::MycIdentityStatusOutput;
use crate::discovery::MycDiscoveryContext;
use crate::error::MycError;
+use crate::outbox::{MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, now_unix_secs};
use crate::transport::MycTransportSnapshot;
const MYC_RELAY_PROBE_CONCURRENCY_LIMIT: usize = 4;
@@ -101,6 +105,37 @@ pub struct MycPersistenceStatusOutput {
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+pub struct MycDeliveryRecoveryStatusOutput {
+ pub recorded_at_unix: u64,
+ pub outcome: MycOperationAuditOutcome,
+ pub summary: String,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
+pub struct MycDeliveryOutboxStatusOutput {
+ pub status: MycRuntimeStatus,
+ pub ready: bool,
+ pub path: PathBuf,
+ pub exists: bool,
+ pub total_job_count: usize,
+ pub queued_job_count: usize,
+ pub published_pending_finalize_job_count: usize,
+ pub finalized_job_count: usize,
+ pub failed_job_count: usize,
+ pub unfinished_job_count: usize,
+ pub critical_unfinished_job_count: usize,
+ pub blocked_job_count: usize,
+ pub critical_blocked_job_count: usize,
+ pub stuck_after_secs: u64,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub oldest_unfinished_age_secs: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub oldest_blocked_age_secs: Option<u64>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub last_recovery: Option<MycDeliveryRecoveryStatusOutput>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct MycSignerStatePersistenceStatusOutput {
pub backend: MycSignerStateBackend,
pub path: PathBuf,
@@ -141,6 +176,7 @@ pub struct MycStatusFullOutput {
pub startup: crate::app::MycStartupSnapshot,
pub custody: MycCustodyStatusOutput,
pub persistence: MycPersistenceStatusOutput,
+ pub delivery_outbox: MycDeliveryOutboxStatusOutput,
pub transport: MycTransportStatusOutput,
pub discovery: MycDiscoveryStatusOutput,
}
@@ -153,6 +189,7 @@ pub struct MycStatusSummaryOutput {
pub instance_name: String,
pub custody: MycCustodyStatusOutput,
pub persistence: MycPersistenceStatusOutput,
+ pub delivery_outbox: MycDeliveryOutboxStatusOutput,
pub transport: MycTransportStatusOutput,
pub discovery: MycDiscoveryStatusOutput,
}
@@ -189,6 +226,17 @@ pub struct MycMetricsSnapshot {
pub runtime_repair_rejection_count: usize,
pub runtime_unavailable_count: usize,
pub runtime_replay_restore_count: usize,
+ pub delivery_recovery_success_count: usize,
+ pub delivery_recovery_rejection_count: usize,
+ pub delivery_outbox_total: usize,
+ pub delivery_outbox_queued_count: usize,
+ pub delivery_outbox_published_pending_finalize_count: usize,
+ pub delivery_outbox_failed_count: usize,
+ pub delivery_outbox_finalized_count: usize,
+ pub delivery_outbox_unfinished_count: usize,
+ pub delivery_outbox_critical_unfinished_count: usize,
+ pub delivery_outbox_blocked_count: usize,
+ pub delivery_outbox_critical_blocked_count: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -209,6 +257,12 @@ struct MycPersistenceStatusEvaluation {
status: Option<MycRuntimeStatus>,
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct MycDeliveryOutboxStatusEvaluation {
+ output: MycDeliveryOutboxStatusOutput,
+ reasons: Vec<String>,
+}
+
#[derive(Debug, Deserialize)]
struct MycSqliteAppliedCountRow {
applied_count: u64,
@@ -233,6 +287,7 @@ pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOu
let snapshot = runtime.snapshot();
let custody = collect_custody_status(runtime)?;
let persistence = collect_persistence_status(runtime);
+ let delivery_outbox = collect_delivery_outbox_status(runtime)?;
let transport = collect_transport_status(runtime).await?;
let discovery = collect_discovery_status(runtime).await?;
let mut status = combine_runtime_status(
@@ -245,6 +300,8 @@ pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOu
);
let mut reasons = transport.reasons;
reasons.extend(discovery.reasons);
+ status = worse_runtime_status(status, delivery_outbox.output.status);
+ reasons.extend(delivery_outbox.reasons.clone());
if let Some(persistence_status) = persistence.status {
status = worse_runtime_status(status, persistence_status);
}
@@ -259,7 +316,7 @@ pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOu
status = MycRuntimeStatus::Degraded;
reasons.push("discovery app identity could not be resolved".to_owned());
}
- let ready = transport.output.ready;
+ let ready = transport.output.ready && delivery_outbox.output.ready;
Ok(MycStatusFullOutput {
status,
ready,
@@ -267,6 +324,7 @@ pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOu
startup: snapshot,
custody: custody.output,
persistence: persistence.output,
+ delivery_outbox: delivery_outbox.output,
transport: transport.output,
discovery: discovery.output,
})
@@ -283,6 +341,7 @@ pub async fn collect_status_summary(
instance_name: full.startup.instance_name,
custody: full.custody,
persistence: full.persistence,
+ delivery_outbox: full.delivery_outbox,
transport: MycTransportStatusOutput {
relay_probes: Vec::new(),
..full.transport
@@ -405,6 +464,7 @@ pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycEr
let manager = runtime.signer_manager()?;
let signer_request_audit = manager.list_audit_records()?;
let runtime_operation_audit = runtime.operation_audit_store().list_all()?;
+ let outbox_status = collect_delivery_outbox_status(runtime)?;
let mut signer_request_decisions = MycAuditDecisionCounts::default();
for record in &signer_request_audit {
@@ -424,6 +484,8 @@ pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycEr
let mut runtime_repair_rejection_count = 0;
let mut runtime_unavailable_count = 0;
let mut runtime_replay_restore_count = 0;
+ let mut delivery_recovery_success_count = 0;
+ let mut delivery_recovery_rejection_count = 0;
for record in &runtime_operation_audit {
increment_outcome_counts(&mut runtime_operation_outcomes, record.outcome);
increment_outcome_counts(
@@ -452,6 +514,13 @@ pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycEr
{
runtime_replay_restore_count += 1;
}
+ if record.operation == MycOperationAuditKind::DeliveryRecovery {
+ match record.outcome {
+ MycOperationAuditOutcome::Succeeded => delivery_recovery_success_count += 1,
+ MycOperationAuditOutcome::Rejected => delivery_recovery_rejection_count += 1,
+ _ => {}
+ }
+ }
}
Ok(MycMetricsSnapshot {
@@ -465,6 +534,21 @@ pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycEr
runtime_repair_rejection_count,
runtime_unavailable_count,
runtime_replay_restore_count,
+ delivery_recovery_success_count,
+ delivery_recovery_rejection_count,
+ delivery_outbox_total: outbox_status.output.total_job_count,
+ delivery_outbox_queued_count: outbox_status.output.queued_job_count,
+ delivery_outbox_published_pending_finalize_count: outbox_status
+ .output
+ .published_pending_finalize_job_count,
+ delivery_outbox_failed_count: outbox_status.output.failed_job_count,
+ delivery_outbox_finalized_count: outbox_status.output.finalized_job_count,
+ delivery_outbox_unfinished_count: outbox_status.output.unfinished_job_count,
+ delivery_outbox_critical_unfinished_count: outbox_status
+ .output
+ .critical_unfinished_job_count,
+ delivery_outbox_blocked_count: outbox_status.output.blocked_job_count,
+ delivery_outbox_critical_blocked_count: outbox_status.output.critical_blocked_job_count,
})
}
@@ -541,6 +625,61 @@ pub fn render_metrics_text(snapshot: &MycMetricsSnapshot) -> String {
"myc_runtime_replay_restore_total",
snapshot.runtime_replay_restore_count,
);
+ push_counter(
+ &mut lines,
+ "myc_delivery_recovery_success_total",
+ snapshot.delivery_recovery_success_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_recovery_rejection_total",
+ snapshot.delivery_recovery_rejection_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_total",
+ snapshot.delivery_outbox_total,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_queued_total",
+ snapshot.delivery_outbox_queued_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_published_pending_finalize_total",
+ snapshot.delivery_outbox_published_pending_finalize_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_failed_total",
+ snapshot.delivery_outbox_failed_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_finalized_total",
+ snapshot.delivery_outbox_finalized_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_unfinished_total",
+ snapshot.delivery_outbox_unfinished_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_critical_unfinished_total",
+ snapshot.delivery_outbox_critical_unfinished_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_blocked_total",
+ snapshot.delivery_outbox_blocked_count,
+ );
+ push_counter(
+ &mut lines,
+ "myc_delivery_outbox_critical_blocked_total",
+ snapshot.delivery_outbox_critical_blocked_count,
+ );
lines.join("\n")
}
@@ -564,6 +703,7 @@ pub fn increment_outcome_counts(
pub fn operation_kind_label(kind: MycOperationAuditKind) -> String {
match kind {
+ MycOperationAuditKind::DeliveryRecovery => "delivery_recovery".to_owned(),
MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish".to_owned(),
MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish".to_owned(),
MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish".to_owned(),
@@ -776,6 +916,229 @@ fn summarize_discovery_relay_group(
}
}
+fn collect_delivery_outbox_status(
+ runtime: &MycRuntime,
+) -> Result<MycDeliveryOutboxStatusEvaluation, MycError> {
+ let outbox_records = runtime.delivery_outbox_store().list_all()?;
+ let workflow_by_id = runtime
+ .signer_manager()?
+ .list_publish_workflows()?
+ .into_iter()
+ .map(|workflow| (workflow.workflow_id.to_string(), workflow))
+ .collect::<BTreeMap<_, _>>();
+ let now_unix = now_unix_secs();
+ let stuck_after_secs = delivery_outbox_stuck_after_secs(runtime);
+ let path = runtime.paths().delivery_outbox_path.clone();
+ let exists = path.exists();
+ let mut queued_job_count = 0usize;
+ let mut published_pending_finalize_job_count = 0usize;
+ let mut finalized_job_count = 0usize;
+ let mut failed_job_count = 0usize;
+ let mut unfinished_job_count = 0usize;
+ let mut critical_unfinished_job_count = 0usize;
+ let mut blocked_job_count = 0usize;
+ let mut critical_blocked_job_count = 0usize;
+ let mut oldest_unfinished_age_secs = None;
+ let mut oldest_blocked_age_secs = None;
+
+ for record in &outbox_records {
+ match record.status {
+ MycDeliveryOutboxStatus::Queued => queued_job_count += 1,
+ MycDeliveryOutboxStatus::PublishedPendingFinalize => {
+ published_pending_finalize_job_count += 1;
+ }
+ MycDeliveryOutboxStatus::Finalized => finalized_job_count += 1,
+ MycDeliveryOutboxStatus::Failed => failed_job_count += 1,
+ }
+
+ if !is_delivery_outbox_unfinished(record) {
+ continue;
+ }
+
+ unfinished_job_count += 1;
+ if is_critical_delivery_outbox_job(record) {
+ critical_unfinished_job_count += 1;
+ }
+ let age_secs = delivery_outbox_record_age_secs(record, now_unix);
+ oldest_unfinished_age_secs =
+ Some(oldest_unfinished_age_secs.map_or(age_secs, |current: u64| current.max(age_secs)));
+
+ if let Some(is_critical) = classify_blocked_delivery_outbox_record(
+ record,
+ &workflow_by_id,
+ age_secs,
+ stuck_after_secs,
+ ) {
+ blocked_job_count += 1;
+ if is_critical {
+ critical_blocked_job_count += 1;
+ }
+ oldest_blocked_age_secs = Some(
+ oldest_blocked_age_secs.map_or(age_secs, |current: u64| current.max(age_secs)),
+ );
+ }
+ }
+
+ let last_recovery = latest_delivery_recovery_status(runtime)?;
+ let mut reasons = Vec::new();
+ if !exists {
+ reasons.push(format!(
+ "delivery outbox persistence file at {} is missing",
+ path.display()
+ ));
+ }
+ if critical_blocked_job_count > 0 {
+ reasons.push(format!(
+ "{critical_blocked_job_count} critical delivery outbox job(s) are blocked"
+ ));
+ }
+ let noncritical_blocked_job_count =
+ blocked_job_count.saturating_sub(critical_blocked_job_count);
+ if noncritical_blocked_job_count > 0 {
+ reasons.push(format!(
+ "{noncritical_blocked_job_count} non-critical delivery outbox job(s) are blocked"
+ ));
+ }
+
+ let (status, ready) = if !exists || critical_blocked_job_count > 0 {
+ (MycRuntimeStatus::Unready, false)
+ } else if blocked_job_count > 0 {
+ (MycRuntimeStatus::Degraded, true)
+ } else {
+ (MycRuntimeStatus::Healthy, true)
+ };
+
+ Ok(MycDeliveryOutboxStatusEvaluation {
+ output: MycDeliveryOutboxStatusOutput {
+ status,
+ ready,
+ path,
+ exists,
+ total_job_count: outbox_records.len(),
+ queued_job_count,
+ published_pending_finalize_job_count,
+ finalized_job_count,
+ failed_job_count,
+ unfinished_job_count,
+ critical_unfinished_job_count,
+ blocked_job_count,
+ critical_blocked_job_count,
+ stuck_after_secs,
+ oldest_unfinished_age_secs,
+ oldest_blocked_age_secs,
+ last_recovery,
+ },
+ reasons,
+ })
+}
+
+fn latest_delivery_recovery_status(
+ runtime: &MycRuntime,
+) -> Result<Option<MycDeliveryRecoveryStatusOutput>, MycError> {
+ let latest = runtime
+ .operation_audit_store()
+ .list_all()?
+ .into_iter()
+ .filter(|record| record.operation == MycOperationAuditKind::DeliveryRecovery)
+ .max_by_key(|record| record.recorded_at_unix);
+ Ok(latest.map(|record| MycDeliveryRecoveryStatusOutput {
+ recorded_at_unix: record.recorded_at_unix,
+ outcome: record.outcome,
+ summary: record.relay_outcome_summary,
+ }))
+}
+
+fn delivery_outbox_stuck_after_secs(runtime: &MycRuntime) -> u64 {
+ let transport = &runtime.config().transport;
+ let mut total_millis = transport
+ .connect_timeout_secs
+ .saturating_mul(1000)
+ .saturating_mul(transport.publish_max_attempts as u64);
+ for completed_attempt in 1..transport.publish_max_attempts {
+ total_millis =
+ total_millis.saturating_add(delivery_outbox_backoff_millis(runtime, completed_attempt));
+ }
+ total_millis.saturating_add(999) / 1000
+}
+
+fn delivery_outbox_backoff_millis(runtime: &MycRuntime, completed_attempt_number: usize) -> u64 {
+ let transport = &runtime.config().transport;
+ let exponent = completed_attempt_number.saturating_sub(1) as u32;
+ let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX);
+ let scaled = transport
+ .publish_initial_backoff_millis
+ .saturating_mul(multiplier);
+ scaled.min(transport.publish_max_backoff_millis)
+}
+
+fn is_delivery_outbox_unfinished(record: &MycDeliveryOutboxRecord) -> bool {
+ matches!(
+ record.status,
+ MycDeliveryOutboxStatus::Queued | MycDeliveryOutboxStatus::PublishedPendingFinalize
+ )
+}
+
+fn is_critical_delivery_outbox_job(record: &MycDeliveryOutboxRecord) -> bool {
+ record.kind != crate::outbox::MycDeliveryOutboxKind::DiscoveryHandlerPublish
+}
+
+fn delivery_outbox_record_age_secs(record: &MycDeliveryOutboxRecord, now_unix: u64) -> u64 {
+ now_unix.saturating_sub(record.updated_at_unix)
+}
+
+fn classify_blocked_delivery_outbox_record(
+ record: &MycDeliveryOutboxRecord,
+ workflow_by_id: &BTreeMap<String, RadrootsNostrSignerPublishWorkflowRecord>,
+ age_secs: u64,
+ stuck_after_secs: u64,
+) -> Option<bool> {
+ if !is_delivery_outbox_unfinished(record) {
+ return None;
+ }
+
+ let is_critical = is_critical_delivery_outbox_job(record);
+ match record.kind {
+ crate::outbox::MycDeliveryOutboxKind::DiscoveryHandlerPublish => {
+ if record.signer_publish_workflow_id.is_some() {
+ return Some(false);
+ }
+ }
+ crate::outbox::MycDeliveryOutboxKind::ConnectAcceptPublish
+ | crate::outbox::MycDeliveryOutboxKind::AuthReplayPublish => {
+ if record.signer_publish_workflow_id.is_none() {
+ return Some(true);
+ }
+ }
+ crate::outbox::MycDeliveryOutboxKind::ListenerResponsePublish => {}
+ }
+
+ if let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() {
+ let Some(workflow) = workflow_by_id.get(workflow_id.as_str()) else {
+ return Some(is_critical);
+ };
+ let expected_state = match record.status {
+ MycDeliveryOutboxStatus::Queued => {
+ RadrootsNostrSignerPublishWorkflowState::PendingPublish
+ }
+ MycDeliveryOutboxStatus::PublishedPendingFinalize => {
+ RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize
+ }
+ MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => {
+ return None;
+ }
+ };
+ if workflow.state != expected_state {
+ return Some(is_critical);
+ }
+ }
+
+ if age_secs > stuck_after_secs {
+ return Some(is_critical);
+ }
+
+ None
+}
+
fn combine_runtime_status(
transport_status: MycRuntimeStatus,
discovery_status: Option<MycRuntimeStatus>,
@@ -1197,6 +1560,17 @@ mod tests {
runtime_repair_rejection_count: 0,
runtime_unavailable_count: 0,
runtime_replay_restore_count: 0,
+ delivery_recovery_success_count: 1,
+ delivery_recovery_rejection_count: 0,
+ delivery_outbox_total: 2,
+ delivery_outbox_queued_count: 1,
+ delivery_outbox_published_pending_finalize_count: 0,
+ delivery_outbox_failed_count: 1,
+ delivery_outbox_finalized_count: 0,
+ delivery_outbox_unfinished_count: 1,
+ delivery_outbox_critical_unfinished_count: 1,
+ delivery_outbox_blocked_count: 0,
+ delivery_outbox_critical_blocked_count: 0,
};
let rendered = render_metrics_text(&metrics);
@@ -1205,6 +1579,8 @@ mod tests {
assert!(rendered.contains(
r#"myc_runtime_operation_kind_total{kind="listener_response_publish",outcome="succeeded"} 1"#
));
+ assert!(rendered.contains("myc_delivery_recovery_success_total 1"));
+ assert!(rendered.contains("myc_delivery_outbox_total 2"));
}
#[test]
diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs
@@ -1705,7 +1705,7 @@ async fn startup_recovery_republishes_queued_listener_connect_secret_job() -> Te
assert!(outbox_records[0].published_at_unix.is_some());
assert!(outbox_records[0].finalized_at_unix.is_some());
let audit_records = restarted_runtime.operation_audit_store().list_all()?;
- assert_eq!(audit_records.len(), 1);
+ assert_eq!(audit_records.len(), 2);
assert_eq!(
audit_records[0].operation,
MycOperationAuditKind::ListenerResponsePublish
@@ -1718,6 +1718,14 @@ async fn startup_recovery_republishes_queued_listener_connect_secret_job() -> Te
audit_records[0].request_id.as_deref(),
Some("startup-recovery-connect")
);
+ assert_eq!(
+ audit_records[1].operation,
+ MycOperationAuditKind::DeliveryRecovery
+ );
+ assert_eq!(
+ audit_records[1].outcome,
+ MycOperationAuditOutcome::Succeeded
+ );
Ok(())
}
diff --git a/tests/operability_cli.rs b/tests/operability_cli.rs
@@ -1,8 +1,12 @@
use std::path::Path;
use std::process::Command;
-use myc::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime};
+use myc::{
+ MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycOperationAuditKind,
+ MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime,
+};
use radroots_identity::RadrootsIdentity;
+use radroots_nostr::prelude::{RadrootsNostrEventBuilder, RadrootsNostrKind};
use serde_json::Value;
fn write_test_identity(path: &Path, secret_key: &str) {
@@ -49,6 +53,12 @@ MYC_TRANSPORT_CONNECT_TIMEOUT_SECS=1\n",
env_path
}
+fn signed_event(identity: &RadrootsIdentity) -> nostr::Event {
+ RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), "operability")
+ .sign_with_keys(identity.keys())
+ .expect("sign event")
+}
+
#[test]
fn status_summary_command_emits_machine_readable_json() {
let temp = tempfile::tempdir().expect("tempdir");
@@ -74,6 +84,9 @@ fn status_summary_command_emits_machine_readable_json() {
value["persistence"]["runtime_audit"]["backend"],
"jsonl_file"
);
+ assert_eq!(value["delivery_outbox"]["status"], "healthy");
+ assert_eq!(value["delivery_outbox"]["ready"], true);
+ assert_eq!(value["delivery_outbox"]["total_job_count"], 0);
assert_eq!(value["transport"]["enabled"], false);
}
@@ -92,6 +105,25 @@ fn metrics_command_emits_json_and_prometheus_formats() {
0,
"restored pending request after failed replay publish",
));
+ runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::DeliveryRecovery,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ None,
+ 1,
+ 1,
+ "recovered 1/1 delivery outbox job(s); republished 1",
+ ));
+ let outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish,
+ signed_event(runtime.signer_identity()),
+ vec!["wss://relay.example.com".parse().expect("relay url")],
+ )
+ .expect("outbox record");
+ runtime
+ .delivery_outbox_store()
+ .enqueue(&outbox_record)
+ .expect("enqueue outbox record");
let json_output = Command::new(env!("CARGO_BIN_EXE_myc"))
.arg("--env-file")
@@ -104,6 +136,9 @@ fn metrics_command_emits_json_and_prometheus_formats() {
assert!(json_output.status.success());
let json_value: Value = serde_json::from_slice(&json_output.stdout).expect("metrics json");
assert_eq!(json_value["runtime_replay_restore_count"], 1);
+ assert_eq!(json_value["delivery_recovery_success_count"], 1);
+ assert_eq!(json_value["delivery_outbox_total"], 1);
+ assert_eq!(json_value["delivery_outbox_queued_count"], 1);
let prometheus_output = Command::new(env!("CARGO_BIN_EXE_myc"))
.arg("--env-file")
@@ -116,5 +151,7 @@ fn metrics_command_emits_json_and_prometheus_formats() {
assert!(prometheus_output.status.success());
let rendered = String::from_utf8(prometheus_output.stdout).expect("utf8 metrics");
assert!(rendered.contains("myc_runtime_replay_restore_total 1"));
+ assert!(rendered.contains("myc_delivery_recovery_success_total 1"));
+ assert!(rendered.contains("myc_delivery_outbox_total 1"));
assert!(rendered.contains("myc_signer_request_total 0"));
}
diff --git a/tests/operability_e2e.rs b/tests/operability_e2e.rs
@@ -1,11 +1,19 @@
use std::path::{Path, PathBuf};
use std::time::Duration;
+use std::time::{SystemTime, UNIX_EPOCH};
use myc::{
- MycConfig, MycRuntime, MycRuntimeAuditBackend, MycRuntimeStatus, MycSignerStateBackend,
- MycTransportDeliveryPolicy, collect_status_full,
+ MycConfig, MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycOperationAuditKind,
+ MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime, MycRuntimeAuditBackend,
+ MycRuntimeStatus, MycSignerStateBackend, MycTransportDeliveryPolicy, collect_status_full,
};
use radroots_identity::RadrootsIdentity;
+use radroots_nostr::prelude::{
+ RadrootsNostrEventBuilder, RadrootsNostrKind, RadrootsNostrRelayUrl,
+};
+use radroots_nostr_signer::prelude::{
+ RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionDraft,
+};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tokio::time::sleep;
@@ -114,6 +122,19 @@ fn write_test_identity(path: &Path, secret_key: &str) {
.expect("write identity");
}
+fn signed_delivery_event(identity: &RadrootsIdentity, content: &str) -> nostr::Event {
+ RadrootsNostrEventBuilder::new(RadrootsNostrKind::Custom(24133), content)
+ .sign_with_keys(identity.keys())
+ .expect("sign event")
+}
+
+fn now_unix_secs() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("system time")
+ .as_secs()
+}
+
fn build_runtime<F>(configure: F) -> MycRuntime
where
F: FnOnce(&mut MycConfig),
@@ -195,6 +216,127 @@ async fn status_is_unready_when_all_policy_cannot_be_satisfied() -> TestResult<(
}
#[tokio::test]
+async fn status_is_unready_when_critical_delivery_job_is_blocked() -> TestResult<()> {
+ let relay = TestRelay::spawn().await?;
+ let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
+ let runtime = build_runtime(|config| {
+ config.transport.enabled = true;
+ config.transport.relays = vec![relay.url().to_owned()];
+ });
+ let client_identity = RadrootsIdentity::from_secret_key_str(
+ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+ )?;
+ let manager = runtime.signer_manager()?;
+ let connection = manager.register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_identity.public_key(),
+ runtime.user_public_identity(),
+ )
+ .with_connect_secret("blocked-secret")
+ .with_relays(vec![relay_url.clone()])
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )?;
+ let workflow = manager.begin_connect_secret_publish_finalization(&connection.connection_id)?;
+ let outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::ListenerResponsePublish,
+ signed_delivery_event(runtime.signer_identity(), "blocked-listener"),
+ vec![relay_url],
+ )?
+ .with_connection_id(&connection.connection_id)
+ .with_request_id("blocked-request")
+ .with_signer_publish_workflow_id(&workflow.workflow_id);
+ runtime.delivery_outbox_store().enqueue(&outbox_record)?;
+ manager.cancel_publish_workflow(&workflow.workflow_id)?;
+
+ let status = collect_status_full(&runtime).await?;
+
+ assert_eq!(status.transport.status, MycRuntimeStatus::Healthy);
+ assert_eq!(status.status, MycRuntimeStatus::Unready);
+ assert!(!status.ready);
+ assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Unready);
+ assert!(!status.delivery_outbox.ready);
+ assert_eq!(status.delivery_outbox.unfinished_job_count, 1);
+ assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 1);
+ assert_eq!(status.delivery_outbox.blocked_job_count, 1);
+ assert_eq!(status.delivery_outbox.critical_blocked_job_count, 1);
+ assert!(
+ status
+ .reasons
+ .iter()
+ .any(|reason| reason == "1 critical delivery outbox job(s) are blocked")
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn status_is_degraded_but_ready_when_only_discovery_job_is_stuck() -> TestResult<()> {
+ let relay = TestRelay::spawn().await?;
+ let relay_url: RadrootsNostrRelayUrl = relay.url().parse()?;
+ let runtime = build_runtime(|config| {
+ config.transport.enabled = true;
+ config.transport.relays = vec![relay.url().to_owned()];
+ config.transport.connect_timeout_secs = 1;
+ });
+ let mut outbox_record = MycDeliveryOutboxRecord::new(
+ MycDeliveryOutboxKind::DiscoveryHandlerPublish,
+ signed_delivery_event(runtime.signer_identity(), "stuck-discovery"),
+ vec![relay_url],
+ )?
+ .with_attempt_id("discovery-attempt-1");
+ let old_timestamp = now_unix_secs().saturating_sub(30);
+ outbox_record.created_at_unix = old_timestamp;
+ outbox_record.updated_at_unix = old_timestamp;
+ runtime.delivery_outbox_store().enqueue(&outbox_record)?;
+
+ let status = collect_status_full(&runtime).await?;
+
+ assert_eq!(status.transport.status, MycRuntimeStatus::Healthy);
+ assert_eq!(status.status, MycRuntimeStatus::Degraded);
+ assert!(status.ready);
+ assert_eq!(status.delivery_outbox.status, MycRuntimeStatus::Degraded);
+ assert!(status.delivery_outbox.ready);
+ assert_eq!(status.delivery_outbox.unfinished_job_count, 1);
+ assert_eq!(status.delivery_outbox.critical_unfinished_job_count, 0);
+ assert_eq!(status.delivery_outbox.blocked_job_count, 1);
+ assert_eq!(status.delivery_outbox.critical_blocked_job_count, 0);
+ assert_eq!(status.delivery_outbox.oldest_blocked_age_secs, Some(30));
+ assert!(
+ status
+ .reasons
+ .iter()
+ .any(|reason| reason == "1 non-critical delivery outbox job(s) are blocked")
+ );
+ Ok(())
+}
+
+#[tokio::test]
+async fn status_surfaces_last_delivery_recovery_result() -> TestResult<()> {
+ let runtime = build_runtime(|_| {});
+ runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::DeliveryRecovery,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ None,
+ 2,
+ 2,
+ "recovered 2/2 delivery outbox job(s); republished 1",
+ ));
+
+ let status = collect_status_full(&runtime).await?;
+ let last_recovery = status
+ .delivery_outbox
+ .last_recovery
+ .expect("last delivery recovery");
+
+ assert_eq!(last_recovery.outcome, MycOperationAuditOutcome::Succeeded);
+ assert_eq!(
+ last_recovery.summary,
+ "recovered 2/2 delivery outbox job(s); republished 1"
+ );
+ Ok(())
+}
+
+#[tokio::test]
async fn status_reports_sqlite_persistence_schema_state() -> TestResult<()> {
let runtime = build_runtime(|config| {
config.persistence.signer_state_backend = MycSignerStateBackend::Sqlite;