myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

commit 190d69f7b8d3cb76d92df2c2fb84b04509ccb66b
parent 9d5b15da7c3c3ad7d0bd0f8dd84a2631936197b3
Author: triesap <tyson@radroots.org>
Date:   Wed, 25 Mar 2026 20:45:47 +0000

status: add operability status and metrics

- add a shared operability model for health, readiness, relay probes, and metrics snapshots
- add machine-readable status and metrics commands backed by runtime and audit state
- expose startup serialization and audit store full-scan support for the new outputs
- prove no-transport, partial-outage, and metrics rendering behavior with new e2e and cli tests

Diffstat:
M src/app/runtime.rs | 3 ++-
M src/audit.rs | 4 ++++
M src/cli.rs | 118 ++++++++++++++++++++++++++++++++++++++-----------------------------------------
M src/lib.rs | 7 +++++++
A src/operability/mod.rs | 912 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M src/transport.rs | 2 +-
A tests/operability_cli.rs | 113 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A tests/operability_e2e.rs | 194 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 1290 insertions(+), 63 deletions(-)

diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -12,6 +12,7 @@ use radroots_nostr_signer::prelude::{ RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerManager, }; +use serde::Serialize; use std::sync::Arc; #[derive(Debug, Clone, PartialEq, Eq)] @@ -23,7 +24,7 @@ pub struct MycRuntimePaths { pub signer_state_path: PathBuf, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct MycStartupSnapshot { pub instance_name: String, pub log_filter: String, diff --git a/src/audit.rs b/src/audit.rs @@ -194,6 +194,10 @@ impl MycOperationAuditStore { self.list_with_limit(self.config.default_read_limit) } + pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { + self.list_matching(usize::MAX, |_| true) + } + pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> { self.list_matching(limit, |_| true) } diff --git a/src/cli.rs b/src/cli.rs @@ -19,6 +19,11 @@ use crate::discovery::{ }; use crate::error::MycError; use crate::logging; +use crate::operability::{ + MycAuditDecisionCounts, MycOperationOutcomeCounts, MycStatusFullOutput, MycStatusSummaryOutput, + collect_metrics, collect_status_full, collect_status_summary, increment_outcome_counts, + is_aggregate_publish_operation, operation_kind_label, render_metrics_text, +}; #[derive(Debug, Parser)] #[command(name = "myc")] @@ -33,6 +38,14 @@ pub struct MycCli { #[derive(Debug, Subcommand)] pub enum MycCommand { Run, + Status { + #[arg(long, value_enum, default_value_t = MycStatusView::Summary)] + view: MycStatusView, + }, + Metrics { + #[arg(long, value_enum, default_value_t = MycMetricsFormat::Prometheus)] + format: MycMetricsFormat, + }, Connections { #[command(subcommand)] command: MycConnectionsCommand, @@ -110,6 +123,18 @@ pub enum MycDiscoveryRepairAttemptView { Records, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum MycStatusView { + Summary, + Full, 
+} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum MycMetricsFormat { + Json, + Prometheus, +} + #[derive(Debug, Subcommand)] pub enum MycAuthCommand { Require { @@ -180,26 +205,6 @@ pub struct MycAuditListOutput { pub runtime_operation_audit: Vec<MycOperationAuditRecord>, } -#[derive(Debug, Default, Serialize, PartialEq, Eq)] -pub struct MycAuditDecisionCounts { - pub allowed: usize, - pub denied: usize, - pub challenged: usize, -} - -#[derive(Debug, Default, Serialize, PartialEq, Eq)] -pub struct MycOperationOutcomeCounts { - pub succeeded: usize, - pub rejected: usize, - pub restored: usize, - pub unavailable: usize, - pub missing: usize, - pub matched: usize, - pub drifted: usize, - pub conflicted: usize, - pub skipped: usize, -} - #[derive(Debug, Serialize, PartialEq, Eq)] pub struct MycAuditSummaryOutput { pub record_limit: usize, @@ -261,6 +266,13 @@ pub enum MycDiscoveryRepairAttemptOutput { Records(MycDiscoveryRepairAttemptRecordsOutput), } +#[derive(Debug, Serialize, PartialEq, Eq)] +#[serde(untagged)] +pub enum MycStatusOutput { + Summary(MycStatusSummaryOutput), + Full(MycStatusFullOutput), +} + pub async fn run_from_env() -> Result<(), MycError> { let cli = MycCli::parse(); let config = load_config(cli.env_file.as_deref())?; @@ -270,6 +282,27 @@ pub async fn run_from_env() -> Result<(), MycError> { logging::init_logging(&config.logging)?; MycRuntime::bootstrap(config)?.run().await } + MycCommand::Status { view } => { + let runtime = MycRuntime::bootstrap(config)?; + let output = match view { + MycStatusView::Summary => { + MycStatusOutput::Summary(collect_status_summary(&runtime).await?) 
+ } + MycStatusView::Full => MycStatusOutput::Full(collect_status_full(&runtime).await?), + }; + print_json(&output) + } + MycCommand::Metrics { format } => { + let runtime = MycRuntime::bootstrap(config)?; + let output = collect_metrics(&runtime)?; + match format { + MycMetricsFormat::Json => print_json(&output), + MycMetricsFormat::Prometheus => { + print_text(&render_metrics_text(&output)); + Ok(()) + } + } + } MycCommand::Connections { command } => { let runtime = MycRuntime::bootstrap(config)?; match command { @@ -654,47 +687,6 @@ fn audit_read_limit(runtime: &MycRuntime, limit: Option<usize>) -> usize { limit.unwrap_or(runtime.operation_audit_store().config().default_read_limit) } -fn increment_outcome_counts( - counts: &mut MycOperationOutcomeCounts, - outcome: MycOperationAuditOutcome, -) { - match outcome { - MycOperationAuditOutcome::Succeeded => counts.succeeded += 1, - MycOperationAuditOutcome::Rejected => counts.rejected += 1, - MycOperationAuditOutcome::Restored => counts.restored += 1, - MycOperationAuditOutcome::Unavailable => counts.unavailable += 1, - MycOperationAuditOutcome::Missing => counts.missing += 1, - MycOperationAuditOutcome::Matched => counts.matched += 1, - MycOperationAuditOutcome::Drifted => counts.drifted += 1, - MycOperationAuditOutcome::Conflicted => counts.conflicted += 1, - MycOperationAuditOutcome::Skipped => counts.skipped += 1, - } -} - -fn operation_kind_label(kind: MycOperationAuditKind) -> String { - match kind { - MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish".to_owned(), - MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish".to_owned(), - MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish".to_owned(), - MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore".to_owned(), - MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch".to_owned(), - MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish".to_owned(), - 
MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare".to_owned(), - MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh".to_owned(), - MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair".to_owned(), - } -} - -fn is_aggregate_publish_operation(kind: MycOperationAuditKind) -> bool { - matches!( - kind, - MycOperationAuditKind::ListenerResponsePublish - | MycOperationAuditKind::ConnectAcceptPublish - | MycOperationAuditKind::AuthReplayPublish - | MycOperationAuditKind::DiscoveryHandlerPublish - ) -} - impl MycDiscoveryRepairAttemptSummaryOutput { fn from_records( attempt_id: &str, @@ -806,6 +798,10 @@ where Ok(()) } +fn print_text(value: &str) { + println!("{value}"); +} + #[cfg(test)] mod tests { use std::path::PathBuf; diff --git a/src/lib.rs b/src/lib.rs @@ -8,6 +8,7 @@ pub mod control; pub mod discovery; pub mod error; pub mod logging; +pub mod operability; pub mod policy; pub mod transport; @@ -34,6 +35,12 @@ pub use discovery::{ render_nip05_output, verify_bundle, }; pub use error::MycError; +pub use operability::{ + MycAuditDecisionCounts, MycDiscoveryStatusOutput, MycMetricsSnapshot, + MycOperationOutcomeCounts, MycRelayProbe, MycRelayProbeAvailability, MycRuntimeStatus, + MycStatusFullOutput, MycStatusSummaryOutput, MycTransportStatusOutput, collect_metrics, + collect_status_full, collect_status_summary, render_metrics_text, +}; pub use policy::{MycConnectDecision, MycPolicyContext}; pub use transport::{MycNostrTransport, MycRelayPublishResult, MycTransportSnapshot}; diff --git a/src/operability/mod.rs b/src/operability/mod.rs @@ -0,0 +1,912 @@ +use std::collections::BTreeMap; +use std::time::Duration; + +use radroots_identity::RadrootsIdentity; +use radroots_nostr::prelude::{ + RadrootsNostrClient, RadrootsNostrRelayStatus, RadrootsNostrRelayUrl, +}; +use radroots_nostr_signer::prelude::RadrootsNostrSignerRequestDecision; +use serde::Serialize; +use tokio::task::JoinSet; + +use 
crate::app::MycRuntime; +use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome}; +use crate::config::MycTransportDeliveryPolicy; +use crate::discovery::MycDiscoveryContext; +use crate::error::MycError; +use crate::transport::MycTransportSnapshot; + +const MYC_RELAY_PROBE_CONCURRENCY_LIMIT: usize = 4; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum MycRuntimeStatus { + Healthy, + Degraded, + Unready, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum MycRelayProbeAvailability { + Available, + Unavailable, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycRelayProbe { + pub relay_url: String, + pub availability: MycRelayProbeAvailability, + #[serde(skip_serializing_if = "Option::is_none")] + pub relay_status: Option<String>, + pub connection_attempts: usize, + pub successful_connections: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub latency_ms: Option<u64>, + pub queue_depth: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycTransportStatusOutput { + pub enabled: bool, + pub status: MycRuntimeStatus, + pub ready: bool, + pub configured_relay_count: usize, + pub required_available_relays: usize, + pub available_relay_count: usize, + pub unavailable_relay_count: usize, + pub delivery_policy: MycTransportDeliveryPolicy, + #[serde(skip_serializing_if = "Option::is_none")] + pub delivery_quorum: Option<usize>, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub relay_probes: Vec<MycRelayProbe>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycDiscoveryRelayGroupStatusOutput { + pub configured_relay_count: usize, + pub available_relay_count: usize, + pub unavailable_relay_count: usize, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub relay_probes: 
Vec<MycRelayProbe>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycDiscoveryStatusOutput { + pub enabled: bool, + pub status: MycRuntimeStatus, + pub public_relays: MycDiscoveryRelayGroupStatusOutput, + pub publish_relays: MycDiscoveryRelayGroupStatusOutput, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycStatusFullOutput { + pub status: MycRuntimeStatus, + pub ready: bool, + pub reasons: Vec<String>, + pub startup: crate::app::MycStartupSnapshot, + pub transport: MycTransportStatusOutput, + pub discovery: MycDiscoveryStatusOutput, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycStatusSummaryOutput { + pub status: MycRuntimeStatus, + pub ready: bool, + pub reasons: Vec<String>, + pub instance_name: String, + pub transport: MycTransportStatusOutput, + pub discovery: MycDiscoveryStatusOutput, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct MycAuditDecisionCounts { + pub allowed: usize, + pub denied: usize, + pub challenged: usize, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] +pub struct MycOperationOutcomeCounts { + pub succeeded: usize, + pub rejected: usize, + pub restored: usize, + pub unavailable: usize, + pub missing: usize, + pub matched: usize, + pub drifted: usize, + pub conflicted: usize, + pub skipped: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct MycMetricsSnapshot { + pub signer_request_total: usize, + pub signer_request_decisions: MycAuditDecisionCounts, + pub runtime_operation_total: usize, + pub runtime_operation_outcomes: MycOperationOutcomeCounts, + pub runtime_operation_by_kind: BTreeMap<String, MycOperationOutcomeCounts>, + pub runtime_aggregate_publish_rejection_count: usize, + pub runtime_repair_success_count: usize, + pub runtime_repair_rejection_count: usize, + pub runtime_unavailable_count: usize, + pub runtime_replay_restore_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct 
MycTransportStatusEvaluation { + output: MycTransportStatusOutput, + reasons: Vec<String>, +} + +pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOutput, MycError> { + let snapshot = runtime.snapshot(); + let transport = collect_transport_status(runtime).await?; + let discovery = collect_discovery_status(runtime).await?; + let mut reasons = transport.reasons; + reasons.extend(discovery.reasons); + let status = combine_runtime_status( + transport.output.status, + if discovery.output.enabled { + Some(discovery.output.status) + } else { + None + }, + ); + let ready = transport.output.ready; + Ok(MycStatusFullOutput { + status, + ready, + reasons, + startup: snapshot, + transport: transport.output, + discovery: discovery.output, + }) +} + +pub async fn collect_status_summary( + runtime: &MycRuntime, +) -> Result<MycStatusSummaryOutput, MycError> { + let full = collect_status_full(runtime).await?; + Ok(MycStatusSummaryOutput { + status: full.status, + ready: full.ready, + reasons: full.reasons, + instance_name: full.startup.instance_name, + transport: MycTransportStatusOutput { + relay_probes: Vec::new(), + ..full.transport + }, + discovery: MycDiscoveryStatusOutput { + enabled: full.discovery.enabled, + status: full.discovery.status, + public_relays: MycDiscoveryRelayGroupStatusOutput { + relay_probes: Vec::new(), + ..full.discovery.public_relays + }, + publish_relays: MycDiscoveryRelayGroupStatusOutput { + relay_probes: Vec::new(), + ..full.discovery.publish_relays + }, + }, + }) +} + +pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycError> { + let manager = runtime.signer_manager()?; + let signer_request_audit = manager.list_audit_records()?; + let runtime_operation_audit = runtime.operation_audit_store().list_all()?; + + let mut signer_request_decisions = MycAuditDecisionCounts::default(); + for record in &signer_request_audit { + match record.decision { + RadrootsNostrSignerRequestDecision::Allowed => 
signer_request_decisions.allowed += 1, + RadrootsNostrSignerRequestDecision::Denied => signer_request_decisions.denied += 1, + RadrootsNostrSignerRequestDecision::Challenged => { + signer_request_decisions.challenged += 1; + } + } + } + + let mut runtime_operation_outcomes = MycOperationOutcomeCounts::default(); + let mut runtime_operation_by_kind = BTreeMap::new(); + let mut runtime_aggregate_publish_rejection_count = 0; + let mut runtime_repair_success_count = 0; + let mut runtime_repair_rejection_count = 0; + let mut runtime_unavailable_count = 0; + let mut runtime_replay_restore_count = 0; + for record in &runtime_operation_audit { + increment_outcome_counts(&mut runtime_operation_outcomes, record.outcome); + increment_outcome_counts( + runtime_operation_by_kind + .entry(operation_kind_label(record.operation)) + .or_default(), + record.outcome, + ); + if is_aggregate_publish_operation(record.operation) + && record.outcome == MycOperationAuditOutcome::Rejected + { + runtime_aggregate_publish_rejection_count += 1; + } + if record.operation == MycOperationAuditKind::DiscoveryHandlerRepair { + match record.outcome { + MycOperationAuditOutcome::Succeeded => runtime_repair_success_count += 1, + MycOperationAuditOutcome::Rejected => runtime_repair_rejection_count += 1, + _ => {} + } + } + if record.outcome == MycOperationAuditOutcome::Unavailable { + runtime_unavailable_count += 1; + } + if record.operation == MycOperationAuditKind::AuthReplayRestore + && record.outcome == MycOperationAuditOutcome::Restored + { + runtime_replay_restore_count += 1; + } + } + + Ok(MycMetricsSnapshot { + signer_request_total: signer_request_audit.len(), + signer_request_decisions, + runtime_operation_total: runtime_operation_audit.len(), + runtime_operation_outcomes, + runtime_operation_by_kind, + runtime_aggregate_publish_rejection_count, + runtime_repair_success_count, + runtime_repair_rejection_count, + runtime_unavailable_count, + runtime_replay_restore_count, + }) +} + +pub fn 
render_metrics_text(snapshot: &MycMetricsSnapshot) -> String { + let mut lines = Vec::new(); + push_counter( + &mut lines, + "myc_signer_request_total", + snapshot.signer_request_total, + ); + push_labeled_counter( + &mut lines, + "myc_signer_request_decision_total", + "decision", + "allowed", + snapshot.signer_request_decisions.allowed, + ); + push_labeled_counter( + &mut lines, + "myc_signer_request_decision_total", + "decision", + "denied", + snapshot.signer_request_decisions.denied, + ); + push_labeled_counter( + &mut lines, + "myc_signer_request_decision_total", + "decision", + "challenged", + snapshot.signer_request_decisions.challenged, + ); + + push_counter( + &mut lines, + "myc_runtime_operation_total", + snapshot.runtime_operation_total, + ); + push_outcome_counters( + &mut lines, + "myc_runtime_operation_outcome_total", + &snapshot.runtime_operation_outcomes, + ); + for (kind, counts) in &snapshot.runtime_operation_by_kind { + push_outcome_counters_with_extra_label( + &mut lines, + "myc_runtime_operation_kind_total", + "kind", + kind, + counts, + ); + } + push_counter( + &mut lines, + "myc_runtime_aggregate_publish_rejection_total", + snapshot.runtime_aggregate_publish_rejection_count, + ); + push_counter( + &mut lines, + "myc_runtime_repair_success_total", + snapshot.runtime_repair_success_count, + ); + push_counter( + &mut lines, + "myc_runtime_repair_rejection_total", + snapshot.runtime_repair_rejection_count, + ); + push_counter( + &mut lines, + "myc_runtime_unavailable_total", + snapshot.runtime_unavailable_count, + ); + push_counter( + &mut lines, + "myc_runtime_replay_restore_total", + snapshot.runtime_replay_restore_count, + ); + + lines.join("\n") +} + +pub fn increment_outcome_counts( + counts: &mut MycOperationOutcomeCounts, + outcome: MycOperationAuditOutcome, +) { + match outcome { + MycOperationAuditOutcome::Succeeded => counts.succeeded += 1, + MycOperationAuditOutcome::Rejected => counts.rejected += 1, + MycOperationAuditOutcome::Restored 
=> counts.restored += 1, + MycOperationAuditOutcome::Unavailable => counts.unavailable += 1, + MycOperationAuditOutcome::Missing => counts.missing += 1, + MycOperationAuditOutcome::Matched => counts.matched += 1, + MycOperationAuditOutcome::Drifted => counts.drifted += 1, + MycOperationAuditOutcome::Conflicted => counts.conflicted += 1, + MycOperationAuditOutcome::Skipped => counts.skipped += 1, + } +} + +pub fn operation_kind_label(kind: MycOperationAuditKind) -> String { + match kind { + MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish".to_owned(), + MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish".to_owned(), + MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish".to_owned(), + MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore".to_owned(), + MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch".to_owned(), + MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish".to_owned(), + MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare".to_owned(), + MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh".to_owned(), + MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair".to_owned(), + } +} + +pub fn is_aggregate_publish_operation(kind: MycOperationAuditKind) -> bool { + matches!( + kind, + MycOperationAuditKind::ListenerResponsePublish + | MycOperationAuditKind::ConnectAcceptPublish + | MycOperationAuditKind::AuthReplayPublish + | MycOperationAuditKind::DiscoveryHandlerPublish + ) +} + +async fn collect_transport_status( + runtime: &MycRuntime, +) -> Result<MycTransportStatusEvaluation, MycError> { + let snapshot = runtime.snapshot().transport; + if !snapshot.enabled { + return Ok(MycTransportStatusEvaluation { + output: MycTransportStatusOutput { + enabled: false, + status: MycRuntimeStatus::Unready, + ready: false, + configured_relay_count: 0, + required_available_relays: 0, + 
available_relay_count: 0, + unavailable_relay_count: 0, + delivery_policy: snapshot.delivery_policy, + delivery_quorum: snapshot.delivery_quorum, + relay_probes: Vec::new(), + }, + reasons: vec!["transport is disabled".to_owned()], + }); + } + + let Some(transport) = runtime.transport() else { + return Ok(MycTransportStatusEvaluation { + output: MycTransportStatusOutput { + enabled: true, + status: MycRuntimeStatus::Unready, + ready: false, + configured_relay_count: 0, + required_available_relays: 0, + available_relay_count: 0, + unavailable_relay_count: 0, + delivery_policy: snapshot.delivery_policy, + delivery_quorum: snapshot.delivery_quorum, + relay_probes: Vec::new(), + }, + reasons: vec!["transport is enabled but no transport client was prepared".to_owned()], + }); + }; + + let relay_probes = probe_relays( + runtime.signer_identity(), + transport.relays(), + transport.connect_timeout_secs(), + ) + .await?; + let available_relay_count = relay_probes + .iter() + .filter(|probe| probe.availability == MycRelayProbeAvailability::Available) + .count(); + let configured_relay_count = relay_probes.len(); + let unavailable_relay_count = configured_relay_count.saturating_sub(available_relay_count); + let required_available_relays = + required_available_relays(&snapshot, configured_relay_count).unwrap_or(usize::MAX); + let ready = available_relay_count >= required_available_relays; + let status = if !ready { + MycRuntimeStatus::Unready + } else if unavailable_relay_count > 0 { + MycRuntimeStatus::Degraded + } else { + MycRuntimeStatus::Healthy + }; + let mut reasons = Vec::new(); + if !ready { + reasons.push(format!( + "transport availability {available_relay_count}/{} does not satisfy delivery policy {}", + configured_relay_count, + snapshot.delivery_policy.as_str() + )); + } else if unavailable_relay_count > 0 { + reasons.push(format!( + "{unavailable_relay_count} transport relay(s) are unavailable" + )); + } + + Ok(MycTransportStatusEvaluation { + output: 
MycTransportStatusOutput { + enabled: true, + status, + ready, + configured_relay_count, + required_available_relays, + available_relay_count, + unavailable_relay_count, + delivery_policy: snapshot.delivery_policy, + delivery_quorum: snapshot.delivery_quorum, + relay_probes, + }, + reasons, + }) +} + +struct MycDiscoveryStatusEvaluation { + output: MycDiscoveryStatusOutput, + reasons: Vec<String>, +} + +async fn collect_discovery_status( + runtime: &MycRuntime, +) -> Result<MycDiscoveryStatusEvaluation, MycError> { + if !runtime.config().discovery.enabled { + return Ok(MycDiscoveryStatusEvaluation { + output: MycDiscoveryStatusOutput { + enabled: false, + status: MycRuntimeStatus::Healthy, + public_relays: MycDiscoveryRelayGroupStatusOutput { + configured_relay_count: 0, + available_relay_count: 0, + unavailable_relay_count: 0, + relay_probes: Vec::new(), + }, + publish_relays: MycDiscoveryRelayGroupStatusOutput { + configured_relay_count: 0, + available_relay_count: 0, + unavailable_relay_count: 0, + relay_probes: Vec::new(), + }, + }, + reasons: Vec::new(), + }); + } + + let context = MycDiscoveryContext::from_runtime(runtime)?; + let public_relays = runtime + .config() + .discovery + .resolved_public_relays(&runtime.config().transport)?; + let public_relays = probe_relays( + context.app_identity(), + public_relays.as_slice(), + context.connect_timeout_secs(), + ) + .await?; + let publish_relays = probe_relays( + context.app_identity(), + context.publish_relays(), + context.connect_timeout_secs(), + ) + .await?; + let public_group = summarize_discovery_relay_group(public_relays); + let publish_group = summarize_discovery_relay_group(publish_relays); + + let status = + if public_group.unavailable_relay_count > 0 || publish_group.unavailable_relay_count > 0 { + MycRuntimeStatus::Degraded + } else { + MycRuntimeStatus::Healthy + }; + let mut reasons = Vec::new(); + if public_group.unavailable_relay_count > 0 { + reasons.push(format!( + "{} discovery public relay(s) 
are unavailable", + public_group.unavailable_relay_count + )); + } + if publish_group.unavailable_relay_count > 0 { + reasons.push(format!( + "{} discovery publish relay(s) are unavailable", + publish_group.unavailable_relay_count + )); + } + + Ok(MycDiscoveryStatusEvaluation { + output: MycDiscoveryStatusOutput { + enabled: true, + status, + public_relays: public_group, + publish_relays: publish_group, + }, + reasons, + }) +} + +fn summarize_discovery_relay_group( + relay_probes: Vec<MycRelayProbe>, +) -> MycDiscoveryRelayGroupStatusOutput { + let configured_relay_count = relay_probes.len(); + let available_relay_count = relay_probes + .iter() + .filter(|probe| probe.availability == MycRelayProbeAvailability::Available) + .count(); + let unavailable_relay_count = configured_relay_count.saturating_sub(available_relay_count); + MycDiscoveryRelayGroupStatusOutput { + configured_relay_count, + available_relay_count, + unavailable_relay_count, + relay_probes, + } +} + +fn combine_runtime_status( + transport_status: MycRuntimeStatus, + discovery_status: Option<MycRuntimeStatus>, +) -> MycRuntimeStatus { + let mut status = transport_status; + if let Some(discovery_status) = discovery_status { + status = worse_runtime_status(status, discovery_status); + } + status +} + +fn worse_runtime_status(left: MycRuntimeStatus, right: MycRuntimeStatus) -> MycRuntimeStatus { + use MycRuntimeStatus::{Degraded, Healthy, Unready}; + match (left, right) { + (Unready, _) | (_, Unready) => Unready, + (Degraded, _) | (_, Degraded) => Degraded, + _ => Healthy, + } +} + +fn required_available_relays( + snapshot: &MycTransportSnapshot, + configured_relay_count: usize, +) -> Result<usize, MycError> { + match snapshot.delivery_policy { + MycTransportDeliveryPolicy::Any => Ok(1), + MycTransportDeliveryPolicy::All => Ok(configured_relay_count), + MycTransportDeliveryPolicy::Quorum => snapshot.delivery_quorum.ok_or_else(|| { + MycError::InvalidConfig( + "transport.delivery_quorum must be set when 
transport.delivery_policy is `quorum`" + .to_owned(), + ) + }), + } +} + +async fn probe_relays( + identity: &RadrootsIdentity, + relays: &[RadrootsNostrRelayUrl], + connect_timeout_secs: u64, +) -> Result<Vec<MycRelayProbe>, MycError> { + let relay_count = relays.len(); + if relay_count == 0 { + return Ok(Vec::new()); + } + + let mut pending = relays + .iter() + .cloned() + .enumerate() + .collect::<Vec<_>>() + .into_iter(); + let mut join_set = JoinSet::new(); + let max_concurrency = relay_count.min(MYC_RELAY_PROBE_CONCURRENCY_LIMIT); + + while join_set.len() < max_concurrency { + let Some((relay_index, relay)) = pending.next() else { + break; + }; + let identity = (*identity).clone(); + join_set.spawn(async move { + let probe = probe_relay(identity, relay.clone(), connect_timeout_secs).await; + (relay_index, probe) + }); + } + + let mut probes = std::iter::repeat_with(|| None) + .take(relay_count) + .collect::<Vec<Option<MycRelayProbe>>>(); + + while let Some(joined) = join_set.join_next().await { + let (relay_index, probe_result) = joined.map_err(|error| { + MycError::InvalidOperation(format!("relay probe task failed: {error}")) + })?; + probes[relay_index] = Some(probe_result?); + while join_set.len() < max_concurrency { + let Some((relay_index, relay)) = pending.next() else { + break; + }; + let identity = (*identity).clone(); + join_set.spawn(async move { + let probe = probe_relay(identity, relay.clone(), connect_timeout_secs).await; + (relay_index, probe) + }); + } + } + + probes + .into_iter() + .map(|probe| { + probe.ok_or_else(|| MycError::InvalidOperation("missing relay probe result".to_owned())) + }) + .collect() +} + +async fn probe_relay( + identity: RadrootsIdentity, + relay: RadrootsNostrRelayUrl, + connect_timeout_secs: u64, +) -> Result<MycRelayProbe, MycError> { + let relay_url = relay.to_string(); + let client = RadrootsNostrClient::from_identity_owned(identity); + client + .add_relay(relay.as_str()) + .await + .map_err(MycError::from)?; + + 
match client + .try_connect_relay(relay.as_str(), Duration::from_secs(connect_timeout_secs)) + .await + { + Ok(_) => { + let relays = client.relays().await; + let relay_state = relays.get(&relay).ok_or_else(|| { + MycError::InvalidOperation(format!( + "connected relay `{relay_url}` did not appear in the relay map" + )) + })?; + Ok(MycRelayProbe { + relay_url, + availability: MycRelayProbeAvailability::Available, + relay_status: Some(relay_status_label(relay_state.status())), + connection_attempts: relay_state.stats().attempts(), + successful_connections: relay_state.stats().success(), + latency_ms: relay_state + .stats() + .latency() + .map(|duration| duration.as_millis() as u64), + queue_depth: relay_state.queue(), + error: None, + }) + } + Err(error) => Ok(MycRelayProbe { + relay_url, + availability: MycRelayProbeAvailability::Unavailable, + relay_status: None, + connection_attempts: 0, + successful_connections: 0, + latency_ms: None, + queue_depth: 0, + error: Some(error.to_string()), + }), + } +} + +fn relay_status_label(status: RadrootsNostrRelayStatus) -> String { + status.to_string().to_ascii_lowercase() +} + +fn push_counter(lines: &mut Vec<String>, name: &str, value: usize) { + lines.push(format!("{name} {value}")); +} + +fn push_labeled_counter( + lines: &mut Vec<String>, + name: &str, + label_key: &str, + label_value: &str, + value: usize, +) { + lines.push(format!(r#"{name}{{{label_key}="{label_value}"}} {value}"#)); +} + +fn push_outcome_counters(lines: &mut Vec<String>, name: &str, counts: &MycOperationOutcomeCounts) { + push_labeled_counter(lines, name, "outcome", "succeeded", counts.succeeded); + push_labeled_counter(lines, name, "outcome", "rejected", counts.rejected); + push_labeled_counter(lines, name, "outcome", "restored", counts.restored); + push_labeled_counter(lines, name, "outcome", "unavailable", counts.unavailable); + push_labeled_counter(lines, name, "outcome", "missing", counts.missing); + push_labeled_counter(lines, name, "outcome", 
"matched", counts.matched); + push_labeled_counter(lines, name, "outcome", "drifted", counts.drifted); + push_labeled_counter(lines, name, "outcome", "conflicted", counts.conflicted); + push_labeled_counter(lines, name, "outcome", "skipped", counts.skipped); +} + +fn push_outcome_counters_with_extra_label( + lines: &mut Vec<String>, + name: &str, + extra_label_key: &str, + extra_label_value: &str, + counts: &MycOperationOutcomeCounts, +) { + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "succeeded", + counts.succeeded, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "rejected", + counts.rejected, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "restored", + counts.restored, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "unavailable", + counts.unavailable, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "missing", + counts.missing, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "matched", + counts.matched, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "drifted", + counts.drifted, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "conflicted", + counts.conflicted, + ); + push_labeled_counter_pair( + lines, + name, + extra_label_key, + extra_label_value, + "outcome", + "skipped", + counts.skipped, + ); +} + +fn push_labeled_counter_pair( + lines: &mut Vec<String>, + name: &str, + first_key: &str, + first_value: &str, + second_key: &str, + second_value: &str, + value: usize, +) { + lines.push(format!( + r#"{name}{{{first_key}="{first_value}",{second_key}="{second_value}"}} {value}"# + )); +} + +#[cfg(test)] +mod 
tests { + use std::collections::BTreeMap; + + use super::{ + MycMetricsSnapshot, MycOperationOutcomeCounts, MycRuntimeStatus, render_metrics_text, + worse_runtime_status, + }; + + #[test] + fn runtime_status_prefers_the_worst_state() { + assert_eq!( + worse_runtime_status(MycRuntimeStatus::Healthy, MycRuntimeStatus::Degraded), + MycRuntimeStatus::Degraded + ); + assert_eq!( + worse_runtime_status(MycRuntimeStatus::Healthy, MycRuntimeStatus::Unready), + MycRuntimeStatus::Unready + ); + assert_eq!( + worse_runtime_status(MycRuntimeStatus::Degraded, MycRuntimeStatus::Healthy), + MycRuntimeStatus::Degraded + ); + } + + #[test] + fn metrics_text_renderer_is_deterministic() { + let metrics = MycMetricsSnapshot { + signer_request_total: 3, + signer_request_decisions: super::MycAuditDecisionCounts { + allowed: 1, + denied: 1, + challenged: 1, + }, + runtime_operation_total: 2, + runtime_operation_outcomes: MycOperationOutcomeCounts { + succeeded: 1, + rejected: 1, + ..MycOperationOutcomeCounts::default() + }, + runtime_operation_by_kind: BTreeMap::from([( + "listener_response_publish".to_owned(), + MycOperationOutcomeCounts { + succeeded: 1, + ..MycOperationOutcomeCounts::default() + }, + )]), + runtime_aggregate_publish_rejection_count: 1, + runtime_repair_success_count: 0, + runtime_repair_rejection_count: 0, + runtime_unavailable_count: 0, + runtime_replay_restore_count: 0, + }; + + let rendered = render_metrics_text(&metrics); + + assert!(rendered.contains("myc_signer_request_total 3")); + assert!(rendered.contains( + r#"myc_runtime_operation_kind_total{kind="listener_response_publish",outcome="succeeded"} 1"# + )); + } +} diff --git a/src/transport.rs b/src/transport.rs @@ -28,7 +28,7 @@ pub struct MycNostrTransport { publish_max_backoff_millis: u64, } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct MycTransportSnapshot { pub enabled: bool, pub relay_count: usize, diff --git a/tests/operability_cli.rs 
b/tests/operability_cli.rs @@ -0,0 +1,113 @@ +use std::path::Path; +use std::process::Command; + +use myc::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, MycRuntime}; +use radroots_identity::RadrootsIdentity; +use serde_json::Value; + +fn write_test_identity(path: &Path, secret_key: &str) { + RadrootsIdentity::from_secret_key_str(secret_key) + .expect("identity from secret") + .save_json(path) + .expect("write identity"); +} + +fn write_env_file(temp: &tempfile::TempDir) -> std::path::PathBuf { + let state_dir = temp.path().join("state"); + let signer_path = temp.path().join("signer.json"); + let user_path = temp.path().join("user.json"); + let env_path = temp.path().join("myc.env"); + + write_test_identity( + signer_path.as_path(), + "1111111111111111111111111111111111111111111111111111111111111111", + ); + write_test_identity( + user_path.as_path(), + "2222222222222222222222222222222222222222222222222222222222222222", + ); + + std::fs::write( + &env_path, + format!( + "MYC_SERVICE_INSTANCE_NAME=myc-test\n\ +MYC_LOGGING_FILTER=info,myc=info\n\ +MYC_LOGGING_STDOUT=false\n\ +MYC_PATHS_STATE_DIR={}\n\ +MYC_PATHS_SIGNER_IDENTITY_PATH={}\n\ +MYC_PATHS_USER_IDENTITY_PATH={}\n\ +MYC_DISCOVERY_ENABLED=false\n\ +MYC_TRANSPORT_ENABLED=false\n\ +MYC_TRANSPORT_CONNECT_TIMEOUT_SECS=1\n", + state_dir.display(), + signer_path.display(), + user_path.display(), + ), + ) + .expect("write env"); + + env_path +} + +#[test] +fn status_summary_command_emits_machine_readable_json() { + let temp = tempfile::tempdir().expect("tempdir"); + let env_path = write_env_file(&temp); + + let output = Command::new(env!("CARGO_BIN_EXE_myc")) + .arg("--env-file") + .arg(&env_path) + .arg("status") + .arg("--view") + .arg("summary") + .output() + .expect("run myc status"); + + assert!(output.status.success()); + let value: Value = serde_json::from_slice(&output.stdout).expect("status json"); + assert_eq!(value["status"], "unready"); + assert_eq!(value["ready"], false); + 
assert_eq!(value["transport"]["enabled"], false); +} + +#[test] +fn metrics_command_emits_json_and_prometheus_formats() { + let temp = tempfile::tempdir().expect("tempdir"); + let env_path = write_env_file(&temp); + let config = myc::MycConfig::load_from_env_path(&env_path).expect("load config"); + let runtime = MycRuntime::bootstrap(config).expect("runtime"); + runtime.record_operation_audit(&MycOperationAuditRecord::new( + MycOperationAuditKind::AuthReplayRestore, + MycOperationAuditOutcome::Restored, + None, + None, + 1, + 0, + "restored pending request after failed replay publish", + )); + + let json_output = Command::new(env!("CARGO_BIN_EXE_myc")) + .arg("--env-file") + .arg(&env_path) + .arg("metrics") + .arg("--format") + .arg("json") + .output() + .expect("run myc metrics json"); + assert!(json_output.status.success()); + let json_value: Value = serde_json::from_slice(&json_output.stdout).expect("metrics json"); + assert_eq!(json_value["runtime_replay_restore_count"], 1); + + let prometheus_output = Command::new(env!("CARGO_BIN_EXE_myc")) + .arg("--env-file") + .arg(&env_path) + .arg("metrics") + .arg("--format") + .arg("prometheus") + .output() + .expect("run myc metrics prometheus"); + assert!(prometheus_output.status.success()); + let rendered = String::from_utf8(prometheus_output.stdout).expect("utf8 metrics"); + assert!(rendered.contains("myc_runtime_replay_restore_total 1")); + assert!(rendered.contains("myc_signer_request_total 0")); +} diff --git a/tests/operability_e2e.rs b/tests/operability_e2e.rs @@ -0,0 +1,194 @@ +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use myc::{ + MycConfig, MycRuntime, MycRuntimeStatus, MycTransportDeliveryPolicy, collect_status_full, +}; +use radroots_identity::RadrootsIdentity; +use tokio::net::TcpListener; +use tokio::sync::oneshot; +use tokio::time::sleep; + +type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>; + +struct TestRelay { + url: String, + shutdown_tx: 
Option<oneshot::Sender<()>>, +} + +impl TestRelay { + async fn spawn() -> TestResult<Self> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let url = format!("ws://{addr}"); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + accept = listener.accept() => { + let Ok((stream, _)) = accept else { + break; + }; + tokio::spawn(async move { + let _ = tokio_tungstenite::accept_async(stream).await; + }); + } + } + } + }); + + Ok(Self { + url, + shutdown_tx: Some(shutdown_tx), + }) + } + + fn url(&self) -> &str { + self.url.as_str() + } +} + +impl Drop for TestRelay { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} + +struct HangingRelay { + url: String, + shutdown_tx: Option<oneshot::Sender<()>>, +} + +impl HangingRelay { + async fn spawn(hold_open_for: Duration) -> TestResult<Self> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let url = format!("ws://{addr}"); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! 
{ + _ = &mut shutdown_rx => break, + accept = listener.accept() => { + let Ok((stream, _)) = accept else { + break; + }; + tokio::spawn(async move { + sleep(hold_open_for).await; + drop(stream); + }); + } + } + } + }); + + Ok(Self { + url, + shutdown_tx: Some(shutdown_tx), + }) + } + + fn url(&self) -> &str { + self.url.as_str() + } +} + +impl Drop for HangingRelay { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} + +fn write_test_identity(path: &Path, secret_key: &str) { + RadrootsIdentity::from_secret_key_str(secret_key) + .expect("identity from secret") + .save_json(path) + .expect("write identity"); +} + +fn build_runtime<F>(configure: F) -> MycRuntime +where + F: FnOnce(&mut MycConfig), +{ + let temp = tempfile::tempdir().expect("tempdir").keep(); + let mut config = MycConfig::default(); + config.paths.state_dir = PathBuf::from(&temp).join("state"); + config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json"); + config.paths.user_identity_path = PathBuf::from(&temp).join("user.json"); + config.transport.connect_timeout_secs = 1; + write_test_identity( + &config.paths.signer_identity_path, + "1111111111111111111111111111111111111111111111111111111111111111", + ); + write_test_identity( + &config.paths.user_identity_path, + "2222222222222222222222222222222222222222222222222222222222222222", + ); + configure(&mut config); + MycRuntime::bootstrap(config).expect("runtime") +} + +#[tokio::test] +async fn status_is_unready_when_transport_is_disabled() -> TestResult<()> { + let runtime = build_runtime(|_| {}); + + let status = collect_status_full(&runtime).await?; + + assert_eq!(status.status, MycRuntimeStatus::Unready); + assert!(!status.ready); + assert_eq!(status.transport.status, MycRuntimeStatus::Unready); + assert!( + status + .reasons + .iter() + .any(|reason| reason == "transport is disabled") + ); + Ok(()) +} + +#[tokio::test] +async fn 
status_is_degraded_but_ready_when_any_policy_has_one_live_relay() -> TestResult<()> { + let relay = TestRelay::spawn().await?; + let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; + let runtime = build_runtime(|config| { + config.transport.enabled = true; + config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; + config.transport.delivery_policy = MycTransportDeliveryPolicy::Any; + }); + + let status = collect_status_full(&runtime).await?; + + assert_eq!(status.status, MycRuntimeStatus::Degraded); + assert!(status.ready); + assert_eq!(status.transport.status, MycRuntimeStatus::Degraded); + assert_eq!(status.transport.available_relay_count, 1); + assert_eq!(status.transport.unavailable_relay_count, 1); + Ok(()) +} + +#[tokio::test] +async fn status_is_unready_when_all_policy_cannot_be_satisfied() -> TestResult<()> { + let relay = TestRelay::spawn().await?; + let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; + let runtime = build_runtime(|config| { + config.transport.enabled = true; + config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; + config.transport.delivery_policy = MycTransportDeliveryPolicy::All; + }); + + let status = collect_status_full(&runtime).await?; + + assert_eq!(status.status, MycRuntimeStatus::Unready); + assert!(!status.ready); + assert_eq!(status.transport.status, MycRuntimeStatus::Unready); + assert_eq!(status.transport.available_relay_count, 1); + assert_eq!(status.transport.required_available_relays, 2); + Ok(()) +}