commit e1c1e3d527c49304d234c2c374c03b1169ca6769
parent 0a5e1cf3197960cddabfd1fe0a9831ee63778698
Author: triesap <tyson@radroots.org>
Date: Fri, 27 Mar 2026 01:51:31 +0000
operability: keep metrics live in memory
Diffstat:
5 files changed, 293 insertions(+), 122 deletions(-)
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -16,7 +16,10 @@ use crate::config::{
use crate::custody::{MycActiveIdentity, MycIdentityProvider};
use crate::discovery::MycDiscoveryContext;
use crate::error::MycError;
-use crate::operability::server::run_observability_server;
+use crate::operability::{
+ MycDeliveryOutboxStatusOutput, MycLiveMetricsHandle, MycLiveMetricsState, MycMetricsSnapshot,
+ server::run_observability_server,
+};
use crate::outbox::{
MycDeliveryOutboxKind, MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, MycDeliveryOutboxStore,
};
@@ -30,8 +33,8 @@ use radroots_nostr_signer::prelude::{
RadrootsNostrFileSignerStore, RadrootsNostrSignerApprovalRequirement,
RadrootsNostrSignerAuthState, RadrootsNostrSignerConnectionRecord, RadrootsNostrSignerManager,
RadrootsNostrSignerPublishWorkflowKind, RadrootsNostrSignerPublishWorkflowRecord,
- RadrootsNostrSignerPublishWorkflowState, RadrootsNostrSignerStore,
- RadrootsNostrSqliteSignerStore,
+ RadrootsNostrSignerPublishWorkflowState, RadrootsNostrSignerRequestAuditRecord,
+ RadrootsNostrSignerStore, RadrootsNostrSqliteSignerStore,
};
use serde::Serialize;
@@ -77,6 +80,7 @@ pub struct MycSignerContext {
user_identity: MycActiveIdentity,
signer_store: Arc<dyn RadrootsNostrSignerStore>,
operation_audit_store: Arc<dyn MycOperationAuditStore>,
+ live_metrics: MycLiveMetricsHandle,
policy: MycPolicyContext,
connection_approval_requirement: RadrootsNostrSignerApprovalRequirement,
}
@@ -152,6 +156,13 @@ impl MycRuntime {
self.signer.operation_audit_store()
}
+ pub(crate) fn metrics_snapshot(
+ &self,
+ outbox_status: &MycDeliveryOutboxStatusOutput,
+ ) -> MycMetricsSnapshot {
+ self.signer.metrics_snapshot(outbox_status)
+ }
+
pub fn delivery_outbox_store(&self) -> Arc<dyn MycDeliveryOutboxStore> {
self.delivery_outbox_store.clone()
}
@@ -1006,28 +1017,56 @@ impl MycSignerContext {
self.operation_audit_store.clone()
}
+ pub fn record_signer_request_audit(&self, record: &RadrootsNostrSignerRequestAuditRecord) {
+ let mut metrics = self
+ .live_metrics
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ metrics.record_signer_request_audit(record);
+ }
+
pub fn record_operation_audit(&self, record: &MycOperationAuditRecord) {
emit_operation_audit_trace(record);
- if let Err(error) = self.operation_audit_store.append(record) {
- tracing::error!(
- operation = ?record.operation,
- outcome = ?record.outcome,
- relay_url = record.relay_url.as_deref().unwrap_or(""),
- connection_id = record.connection_id.as_deref().unwrap_or(""),
- request_id = record.request_id.as_deref().unwrap_or(""),
- attempt_id = record.attempt_id.as_deref().unwrap_or(""),
- delivery_policy = ?record.delivery_policy,
- required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(),
- publish_attempt_count = record.publish_attempt_count.unwrap_or_default(),
- relay_count = record.relay_count,
- acknowledged_relay_count = record.acknowledged_relay_count,
- relay_outcome_summary = %record.relay_outcome_summary,
- error = %error,
- "failed to persist myc operation audit record"
- );
+ match self.operation_audit_store.append(record) {
+ Ok(()) => {
+ let mut metrics = self
+ .live_metrics
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ metrics.record_runtime_operation(record);
+ }
+ Err(error) => {
+ tracing::error!(
+ operation = ?record.operation,
+ outcome = ?record.outcome,
+ relay_url = record.relay_url.as_deref().unwrap_or(""),
+ connection_id = record.connection_id.as_deref().unwrap_or(""),
+ request_id = record.request_id.as_deref().unwrap_or(""),
+ attempt_id = record.attempt_id.as_deref().unwrap_or(""),
+ delivery_policy = ?record.delivery_policy,
+ required_acknowledged_relay_count = record.required_acknowledged_relay_count.unwrap_or_default(),
+ publish_attempt_count = record.publish_attempt_count.unwrap_or_default(),
+ relay_count = record.relay_count,
+ acknowledged_relay_count = record.acknowledged_relay_count,
+ relay_outcome_summary = %record.relay_outcome_summary,
+ error = %error,
+ "failed to persist myc operation audit record"
+ );
+ }
}
}
+ pub fn metrics_snapshot(
+ &self,
+ outbox_status: &MycDeliveryOutboxStatusOutput,
+ ) -> MycMetricsSnapshot {
+ let metrics = self
+ .live_metrics
+ .lock()
+ .unwrap_or_else(|poisoned| poisoned.into_inner());
+ metrics.snapshot(outbox_status)
+ }
+
pub fn connection_approval_requirement(&self) -> RadrootsNostrSignerApprovalRequirement {
self.connection_approval_requirement
}
@@ -1054,6 +1093,10 @@ impl MycSignerContext {
let operation_audit_store =
Self::build_operation_audit_store(persistence, &paths.audit_dir, audit_config)?;
let manager = Self::load_signer_manager_from_store(signer_store.clone())?;
+ let live_metrics = Arc::new(std::sync::Mutex::new(MycLiveMetricsState::from_records(
+ &manager.list_audit_records()?,
+ &operation_audit_store.list_all()?,
+ )));
let configured_public = signer_identity.to_public();
match manager.signer_identity()? {
@@ -1083,6 +1126,7 @@ impl MycSignerContext {
user_identity,
signer_store,
operation_audit_store,
+ live_metrics,
connection_approval_requirement: policy.default_approval_requirement(),
policy,
})
diff --git a/src/control.rs b/src/control.rs
@@ -354,6 +354,9 @@ async fn replay_authorized_request(
));
}
};
+ runtime
+ .signer_context()
+ .record_signer_request_audit(&evaluation.audit);
let handled_request = match handler
.handle_authorized_request_evaluation(pending_request.request_message.clone(), evaluation)
{
diff --git a/src/operability/mod.rs b/src/operability/mod.rs
@@ -2,12 +2,13 @@ pub mod server;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
+use std::sync::{Arc, Mutex};
use std::time::Duration;
use radroots_nostr::prelude::{RadrootsNostrRelayStatus, RadrootsNostrRelayUrl};
use radroots_nostr_signer::prelude::{
RadrootsNostrSignerPublishWorkflowRecord, RadrootsNostrSignerPublishWorkflowState,
- RadrootsNostrSignerRequestDecision,
+ RadrootsNostrSignerRequestAuditRecord, RadrootsNostrSignerRequestDecision,
};
use radroots_sql_core::{SqlExecutor, SqliteExecutor};
use serde::{Deserialize, Serialize};
@@ -236,6 +237,24 @@ pub struct MycMetricsSnapshot {
pub delivery_outbox_critical_blocked_count: usize,
}
+#[derive(Debug, Clone, Default, PartialEq, Eq)]
+pub(crate) struct MycLiveMetricsState {
+ signer_request_total: usize,
+ signer_request_decisions: MycAuditDecisionCounts,
+ runtime_operation_total: usize,
+ runtime_operation_outcomes: MycOperationOutcomeCounts,
+ runtime_operation_by_kind: BTreeMap<String, MycOperationOutcomeCounts>,
+ runtime_aggregate_publish_rejection_count: usize,
+ runtime_repair_success_count: usize,
+ runtime_repair_rejection_count: usize,
+ runtime_unavailable_count: usize,
+ runtime_replay_restore_count: usize,
+ delivery_recovery_success_count: usize,
+ delivery_recovery_rejection_count: usize,
+}
+
+pub(crate) type MycLiveMetricsHandle = Arc<Mutex<MycLiveMetricsState>>;
+
#[derive(Debug, Clone, PartialEq, Eq)]
struct MycTransportStatusEvaluation {
output: MycTransportStatusOutput,
@@ -458,95 +477,8 @@ fn collect_persistence_status(runtime: &MycRuntime) -> MycPersistenceStatusEvalu
}
pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycError> {
- let manager = runtime.signer_manager()?;
- let signer_request_audit = manager.list_audit_records()?;
- let runtime_operation_audit = runtime.operation_audit_store().list_all()?;
let outbox_status = collect_delivery_outbox_status(runtime)?;
-
- let mut signer_request_decisions = MycAuditDecisionCounts::default();
- for record in &signer_request_audit {
- match record.decision {
- RadrootsNostrSignerRequestDecision::Allowed => signer_request_decisions.allowed += 1,
- RadrootsNostrSignerRequestDecision::Denied => signer_request_decisions.denied += 1,
- RadrootsNostrSignerRequestDecision::Challenged => {
- signer_request_decisions.challenged += 1;
- }
- }
- }
-
- let mut runtime_operation_outcomes = MycOperationOutcomeCounts::default();
- let mut runtime_operation_by_kind = BTreeMap::new();
- let mut runtime_aggregate_publish_rejection_count = 0;
- let mut runtime_repair_success_count = 0;
- let mut runtime_repair_rejection_count = 0;
- let mut runtime_unavailable_count = 0;
- let mut runtime_replay_restore_count = 0;
- let mut delivery_recovery_success_count = 0;
- let mut delivery_recovery_rejection_count = 0;
- for record in &runtime_operation_audit {
- increment_outcome_counts(&mut runtime_operation_outcomes, record.outcome);
- increment_outcome_counts(
- runtime_operation_by_kind
- .entry(operation_kind_label(record.operation))
- .or_default(),
- record.outcome,
- );
- if is_aggregate_publish_operation(record.operation)
- && record.outcome == MycOperationAuditOutcome::Rejected
- {
- runtime_aggregate_publish_rejection_count += 1;
- }
- if record.operation == MycOperationAuditKind::DiscoveryHandlerRepair {
- match record.outcome {
- MycOperationAuditOutcome::Succeeded => runtime_repair_success_count += 1,
- MycOperationAuditOutcome::Rejected => runtime_repair_rejection_count += 1,
- _ => {}
- }
- }
- if record.outcome == MycOperationAuditOutcome::Unavailable {
- runtime_unavailable_count += 1;
- }
- if record.operation == MycOperationAuditKind::AuthReplayRestore
- && record.outcome == MycOperationAuditOutcome::Restored
- {
- runtime_replay_restore_count += 1;
- }
- if record.operation == MycOperationAuditKind::DeliveryRecovery {
- match record.outcome {
- MycOperationAuditOutcome::Succeeded => delivery_recovery_success_count += 1,
- MycOperationAuditOutcome::Rejected => delivery_recovery_rejection_count += 1,
- _ => {}
- }
- }
- }
-
- Ok(MycMetricsSnapshot {
- signer_request_total: signer_request_audit.len(),
- signer_request_decisions,
- runtime_operation_total: runtime_operation_audit.len(),
- runtime_operation_outcomes,
- runtime_operation_by_kind,
- runtime_aggregate_publish_rejection_count,
- runtime_repair_success_count,
- runtime_repair_rejection_count,
- runtime_unavailable_count,
- runtime_replay_restore_count,
- delivery_recovery_success_count,
- delivery_recovery_rejection_count,
- delivery_outbox_total: outbox_status.output.total_job_count,
- delivery_outbox_queued_count: outbox_status.output.queued_job_count,
- delivery_outbox_published_pending_finalize_count: outbox_status
- .output
- .published_pending_finalize_job_count,
- delivery_outbox_failed_count: outbox_status.output.failed_job_count,
- delivery_outbox_finalized_count: outbox_status.output.finalized_job_count,
- delivery_outbox_unfinished_count: outbox_status.output.unfinished_job_count,
- delivery_outbox_critical_unfinished_count: outbox_status
- .output
- .critical_unfinished_job_count,
- delivery_outbox_blocked_count: outbox_status.output.blocked_job_count,
- delivery_outbox_critical_blocked_count: outbox_status.output.critical_blocked_job_count,
- })
+ Ok(runtime.metrics_snapshot(&outbox_status.output))
}
pub fn render_metrics_text(snapshot: &MycMetricsSnapshot) -> String {
@@ -681,6 +613,114 @@ pub fn render_metrics_text(snapshot: &MycMetricsSnapshot) -> String {
lines.join("\n")
}
+impl MycLiveMetricsState {
+ pub(crate) fn from_records(
+ signer_request_audit: &[RadrootsNostrSignerRequestAuditRecord],
+ runtime_operation_audit: &[crate::audit::MycOperationAuditRecord],
+ ) -> Self {
+ let mut state = Self::default();
+ for record in signer_request_audit {
+ state.record_signer_request_audit(record);
+ }
+ for record in runtime_operation_audit {
+ state.record_runtime_operation(record);
+ }
+ state
+ }
+
+ pub(crate) fn record_signer_request_audit(
+ &mut self,
+ record: &RadrootsNostrSignerRequestAuditRecord,
+ ) {
+ self.signer_request_total += 1;
+ match record.decision {
+ RadrootsNostrSignerRequestDecision::Allowed => {
+ self.signer_request_decisions.allowed += 1;
+ }
+ RadrootsNostrSignerRequestDecision::Denied => {
+ self.signer_request_decisions.denied += 1;
+ }
+ RadrootsNostrSignerRequestDecision::Challenged => {
+ self.signer_request_decisions.challenged += 1;
+ }
+ }
+ }
+
+ pub(crate) fn record_runtime_operation(
+ &mut self,
+ record: &crate::audit::MycOperationAuditRecord,
+ ) {
+ self.runtime_operation_total += 1;
+ increment_outcome_counts(&mut self.runtime_operation_outcomes, record.outcome);
+ increment_outcome_counts(
+ self.runtime_operation_by_kind
+ .entry(operation_kind_label(record.operation))
+ .or_default(),
+ record.outcome,
+ );
+ if is_aggregate_publish_operation(record.operation)
+ && record.outcome == MycOperationAuditOutcome::Rejected
+ {
+ self.runtime_aggregate_publish_rejection_count += 1;
+ }
+ if record.operation == MycOperationAuditKind::DiscoveryHandlerRepair {
+ match record.outcome {
+ MycOperationAuditOutcome::Succeeded => self.runtime_repair_success_count += 1,
+ MycOperationAuditOutcome::Rejected => self.runtime_repair_rejection_count += 1,
+ _ => {}
+ }
+ }
+ if record.outcome == MycOperationAuditOutcome::Unavailable {
+ self.runtime_unavailable_count += 1;
+ }
+ if record.operation == MycOperationAuditKind::AuthReplayRestore
+ && record.outcome == MycOperationAuditOutcome::Restored
+ {
+ self.runtime_replay_restore_count += 1;
+ }
+ if record.operation == MycOperationAuditKind::DeliveryRecovery {
+ match record.outcome {
+ MycOperationAuditOutcome::Succeeded => self.delivery_recovery_success_count += 1,
+ MycOperationAuditOutcome::Rejected => {
+ self.delivery_recovery_rejection_count += 1;
+ }
+ _ => {}
+ }
+ }
+ }
+
+ pub(crate) fn snapshot(
+ &self,
+ outbox_status: &MycDeliveryOutboxStatusOutput,
+ ) -> MycMetricsSnapshot {
+ MycMetricsSnapshot {
+ signer_request_total: self.signer_request_total,
+ signer_request_decisions: self.signer_request_decisions.clone(),
+ runtime_operation_total: self.runtime_operation_total,
+ runtime_operation_outcomes: self.runtime_operation_outcomes.clone(),
+ runtime_operation_by_kind: self.runtime_operation_by_kind.clone(),
+ runtime_aggregate_publish_rejection_count: self
+ .runtime_aggregate_publish_rejection_count,
+ runtime_repair_success_count: self.runtime_repair_success_count,
+ runtime_repair_rejection_count: self.runtime_repair_rejection_count,
+ runtime_unavailable_count: self.runtime_unavailable_count,
+ runtime_replay_restore_count: self.runtime_replay_restore_count,
+ delivery_recovery_success_count: self.delivery_recovery_success_count,
+ delivery_recovery_rejection_count: self.delivery_recovery_rejection_count,
+ delivery_outbox_total: outbox_status.total_job_count,
+ delivery_outbox_queued_count: outbox_status.queued_job_count,
+ delivery_outbox_published_pending_finalize_count: outbox_status
+ .published_pending_finalize_job_count,
+ delivery_outbox_failed_count: outbox_status.failed_job_count,
+ delivery_outbox_finalized_count: outbox_status.finalized_job_count,
+ delivery_outbox_unfinished_count: outbox_status.unfinished_job_count,
+ delivery_outbox_critical_unfinished_count: outbox_status.critical_unfinished_job_count,
+ delivery_outbox_blocked_count: outbox_status.blocked_job_count,
+ delivery_outbox_critical_blocked_count: outbox_status.critical_blocked_job_count,
+ }
+ }
+}
+
pub fn increment_outcome_counts(
counts: &mut MycOperationOutcomeCounts,
outcome: MycOperationAuditOutcome,
@@ -1505,14 +1545,30 @@ fn push_labeled_counter_pair(
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
+ use std::path::Path;
use std::path::PathBuf;
+ use nostr::PublicKey;
+ use radroots_identity::RadrootsIdentity;
+ use radroots_nostr_signer::prelude::{
+ RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionDraft,
+ RadrootsNostrSignerRequestDecision,
+ };
+
use super::{
- MycMetricsSnapshot, MycOperationOutcomeCounts, MycRuntimeStatus,
+ MycMetricsSnapshot, MycOperationOutcomeCounts, MycRuntimeStatus, collect_metrics,
inspect_runtime_audit_sqlite_schema, render_metrics_text, worse_runtime_status,
};
- use crate::app::MycRuntimePaths;
- use crate::config::MycRuntimeAuditBackend;
+ use crate::app::{MycRuntime, MycRuntimePaths};
+ use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
+ use crate::config::{MycConfig, MycRuntimeAuditBackend};
+
+ fn write_test_identity(path: &Path, secret_key: &str) {
+ RadrootsIdentity::from_secret_key_str(secret_key)
+ .expect("identity from secret")
+ .save_json(path)
+ .expect("write identity");
+ }
#[test]
fn runtime_status_prefers_the_worst_state() {
@@ -1597,4 +1653,67 @@ mod tests {
Some("sqlite persistence file is missing")
);
}
+
+ #[test]
+ fn collect_metrics_uses_live_state_after_bootstrap() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let mut config = MycConfig::default();
+ config.paths.state_dir = temp.path().join("state");
+ config.paths.signer_identity_path = temp.path().join("signer.json");
+ config.paths.user_identity_path = temp.path().join("user.json");
+ write_test_identity(
+ &config.paths.signer_identity_path,
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ );
+ write_test_identity(
+ &config.paths.user_identity_path,
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ );
+
+ let runtime = MycRuntime::bootstrap(config.clone()).expect("runtime");
+ let manager = runtime.signer_manager().expect("manager");
+ let client_public_key =
+ PublicKey::parse("7777777777777777777777777777777777777777777777777777777777777777")
+ .expect("client public key");
+ let connection = manager
+ .register_connection(
+ RadrootsNostrSignerConnectionDraft::new(
+ client_public_key,
+ runtime.user_public_identity(),
+ )
+ .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired),
+ )
+ .expect("register connection");
+ manager
+ .record_request(
+ &connection.connection_id,
+ "req-live-metrics",
+ radroots_nostr_connect::prelude::RadrootsNostrConnectMethod::Ping,
+ RadrootsNostrSignerRequestDecision::Allowed,
+ None,
+ )
+ .expect("record request");
+ runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::DeliveryRecovery,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ None,
+ 1,
+ 1,
+ "startup recovery succeeded",
+ ));
+ drop(runtime);
+
+ let runtime = MycRuntime::bootstrap(config).expect("runtime restart");
+ std::fs::remove_file(&runtime.paths().signer_state_path).expect("remove signer state");
+ std::fs::remove_file(&runtime.paths().runtime_audit_path).expect("remove runtime audit");
+
+ let metrics = collect_metrics(&runtime).expect("collect metrics");
+
+ assert_eq!(metrics.signer_request_total, 1);
+ assert_eq!(metrics.signer_request_decisions.allowed, 1);
+ assert_eq!(metrics.runtime_operation_total, 1);
+ assert_eq!(metrics.runtime_operation_outcomes.succeeded, 1);
+ assert_eq!(metrics.delivery_recovery_success_count, 1);
+ }
}
diff --git a/src/policy.rs b/src/policy.rs
@@ -9,7 +9,8 @@ use radroots_nostr_connect::prelude::{
};
use radroots_nostr_signer::prelude::{
RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionRecord,
- RadrootsNostrSignerManager, RadrootsNostrSignerRequestDecision,
+ RadrootsNostrSignerManager, RadrootsNostrSignerRequestAuditRecord,
+ RadrootsNostrSignerRequestDecision,
};
use crate::config::{MycConnectionApproval, MycPolicyConfig};
@@ -251,16 +252,15 @@ impl MycPolicyContext {
connection: &RadrootsNostrSignerConnectionRecord,
request_message: &RadrootsNostrConnectRequestMessage,
reason: impl Into<String>,
- ) -> Result<String, MycError> {
+ ) -> Result<RadrootsNostrSignerRequestAuditRecord, MycError> {
let reason = reason.into();
- manager.record_request(
+ Ok(manager.record_request(
&connection.connection_id,
&request_message.id,
request_message.request.method(),
RadrootsNostrSignerRequestDecision::Denied,
Some(reason.clone()),
- )?;
- Ok(reason)
+ )?)
}
fn client_is_denied(&self, client_public_key: &PublicKey) -> bool {
diff --git a/src/transport/nip46.rs b/src/transport/nip46.rs
@@ -493,18 +493,23 @@ impl MycNip46Handler {
.policy()
.prepare_request(&manager, connection, &request_message)?
{
- let reason = self.signer.policy().record_policy_denied_request(
+ let audit = self.signer.policy().record_policy_denied_request(
&manager,
connection,
&request_message,
reason,
)?;
- return Ok(MycPreparedRequestEvaluation::Denied(reason));
+ self.signer.record_signer_request_audit(&audit);
+ return Ok(MycPreparedRequestEvaluation::Denied(
+ audit
+ .message
+ .unwrap_or_else(|| "request denied by policy".to_owned()),
+ ));
}
- Ok(MycPreparedRequestEvaluation::Evaluation(
- manager.evaluate_request(&connection.connection_id, request_message)?,
- ))
+ let evaluation = manager.evaluate_request(&connection.connection_id, request_message)?;
+ self.signer.record_signer_request_audit(&evaluation.audit);
+ Ok(MycPreparedRequestEvaluation::Evaluation(evaluation))
}
fn lookup_connection(