myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

commit a6fd87ed9d8302987add90a3f5d8d016756b163e
parent 09c2231df3606699d171cd9237834b4866660b6f
Author: triesap <tyson@radroots.org>
Date:   Sun, 22 Mar 2026 16:29:49 +0000

audit: correlate discovery repair attempts

Diffstat:
Msrc/app/runtime.rs | 3+++
Msrc/audit.rs | 154+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------
Msrc/cli.rs | 218++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Msrc/discovery.rs | 241+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------
Mtests/discovery_cli.rs | 180+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtests/nip46_e2e.rs | 9+++++++--
6 files changed, 719 insertions(+), 86 deletions(-)

diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -241,6 +241,7 @@ impl MycSignerContext { relay_url = record.relay_url.as_deref().unwrap_or(""), connection_id = record.connection_id.as_deref().unwrap_or(""), request_id = record.request_id.as_deref().unwrap_or(""), + attempt_id = record.attempt_id.as_deref().unwrap_or(""), relay_count = record.relay_count, acknowledged_relay_count = record.acknowledged_relay_count, relay_outcome_summary = %record.relay_outcome_summary, @@ -305,6 +306,7 @@ fn emit_operation_audit_trace(record: &MycOperationAuditRecord) { relay_url = record.relay_url.as_deref().unwrap_or(""), connection_id = record.connection_id.as_deref().unwrap_or(""), request_id = record.request_id.as_deref().unwrap_or(""), + attempt_id = record.attempt_id.as_deref().unwrap_or(""), relay_count = record.relay_count, acknowledged_relay_count = record.acknowledged_relay_count, relay_outcome_summary = %record.relay_outcome_summary, @@ -320,6 +322,7 @@ fn emit_operation_audit_trace(record: &MycOperationAuditRecord) { relay_url = record.relay_url.as_deref().unwrap_or(""), connection_id = record.connection_id.as_deref().unwrap_or(""), request_id = record.request_id.as_deref().unwrap_or(""), + attempt_id = record.attempt_id.as_deref().unwrap_or(""), relay_count = record.relay_count, acknowledged_relay_count = record.acknowledged_relay_count, relay_outcome_summary = %record.relay_outcome_summary, diff --git a/src/audit.rs b/src/audit.rs @@ -52,6 +52,8 @@ pub struct MycOperationAuditRecord { pub connection_id: Option<String>, #[serde(default, skip_serializing_if = "Option::is_none")] pub request_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub attempt_id: Option<String>, pub relay_count: usize, pub acknowledged_relay_count: usize, pub relay_outcome_summary: String, @@ -80,6 +82,7 @@ impl MycOperationAuditRecord { relay_url: None, connection_id: connection_id.map(ToString::to_string), request_id: request_id.map(ToOwned::to_owned), + attempt_id: None, relay_count, acknowledged_relay_count, relay_outcome_summary: relay_outcome_summary.into(), @@ -90,6 +93,11 @@ impl MycOperationAuditRecord { self.relay_url = Some(relay_url.into()); self } + + pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self { + self.attempt_id = Some(attempt_id.into()); + self + } } impl MycOperationAuditStore { @@ -161,6 +169,43 @@ impl MycOperationAuditStore { }) } + pub fn list_for_attempt_id( + &self, + attempt_id: &str, + ) -> Result<Vec<MycOperationAuditRecord>, MycError> { + self.list_for_attempt_id_with_limit(attempt_id, usize::MAX) + } + + pub fn list_for_attempt_id_with_limit( + &self, + attempt_id: &str, + limit: usize, + ) -> Result<Vec<MycOperationAuditRecord>, MycError> { + self.list_matching(limit, |record| { + record.attempt_id.as_deref() == Some(attempt_id) + }) + } + + pub fn latest_attempt_id_for_operation( + &self, + operation: MycOperationAuditKind, + ) -> Result<Option<String>, MycError> { + for path in self.read_paths_newest_first()? { + let mut file_records = self.read_records_from_path(&path)?; + file_records.reverse(); + + for record in file_records { + if record.operation == operation { + if let Some(attempt_id) = record.attempt_id { + return Ok(Some(attempt_id)); + } + } + } + } + + Ok(None) + } + fn list_matching<F>( &self, limit: usize, @@ -391,15 +436,18 @@ mod tests { RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id"); store - .append(&MycOperationAuditRecord::new( - MycOperationAuditKind::ConnectAcceptPublish, - MycOperationAuditOutcome::Rejected, - Some(&connection_id), - Some("request-1"), - 2, - 0, - "0/2 relays acknowledged publish; failures: relay-a: rejected", - )) + .append( + &MycOperationAuditRecord::new( + MycOperationAuditKind::ConnectAcceptPublish, + MycOperationAuditOutcome::Rejected, + Some(&connection_id), + Some("request-1"), + 2, + 0, + "0/2 relays acknowledged publish; failures: relay-a: rejected", + ) + .with_attempt_id("attempt-1"), + ) .expect("append rejected record"); store .append(&MycOperationAuditRecord::new( @@ -422,6 +470,7 @@ mod tests { assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected); assert_eq!(records[0].connection_id.as_deref(), Some("connection-1")); assert_eq!(records[0].request_id.as_deref(), Some("request-1")); + assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1")); assert_eq!(records[0].relay_count, 2); assert_eq!(records[0].acknowledged_relay_count, 0); @@ -453,15 +502,18 @@ mod tests { for index in 0..6 { store - .append(&MycOperationAuditRecord::new( - MycOperationAuditKind::ListenerResponsePublish, - MycOperationAuditOutcome::Rejected, - None, - Some(&format!("request-{index}")), - 1, - 0, - format!("failure-{index}"), - )) + .append( + &MycOperationAuditRecord::new( + MycOperationAuditKind::ListenerResponsePublish, + MycOperationAuditOutcome::Rejected, + None, + Some(&format!("request-{index}")), + 1, + 0, + format!("failure-{index}"), + ) + .with_attempt_id(format!("attempt-{index}")), + ) .expect("append record"); } @@ -473,4 +525,70 @@ mod tests { assert!(temp.path().join("operations.2.jsonl").exists()); assert!(!temp.path().join("operations.3.jsonl").exists()); } + + #[test] + fn list_for_attempt_and_latest_attempt_id_work() { + let temp = tempfile::tempdir().expect("tempdir"); + let store = MycOperationAuditStore::new(temp.path(), config()); + + store + .append( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Rejected, + None, + None, + 2, + 0, + "first attempt rejected", + ) + .with_attempt_id("attempt-1"), + ) + .expect("append first attempt"); + store + .append( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRepair, + MycOperationAuditOutcome::Rejected, + None, + None, + 1, + 0, + "relay-a rejected", + ) + .with_attempt_id("attempt-1") + .with_relay_url("wss://relay-a.example.com"), + ) + .expect("append first repair"); + store + .append( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Succeeded, + None, + None, + 1, + 1, + "second attempt succeeded", + ) + .with_attempt_id("attempt-2"), + ) + .expect("append second attempt"); + + let attempt_records = store + .list_for_attempt_id("attempt-1") + .expect("list attempt records"); + assert_eq!(attempt_records.len(), 2); + assert!( + attempt_records + .iter() + .all(|record| record.attempt_id.as_deref() == Some("attempt-1")) + ); + assert_eq!( + store + .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh) + .expect("latest attempt"), + Some("attempt-2".to_owned()) + ); + } } diff --git a/src/cli.rs b/src/cli.rs @@ -14,8 +14,8 @@ use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperation use crate::config::{DEFAULT_CONFIG_PATH, MycConfig}; use crate::control::{accept_client_uri, authorize_auth_challenge, parse_permission_values}; use crate::discovery::{ - MycDiscoveryContext, diff_live_nip89, fetch_live_nip89, publish_nip89_event, refresh_nip89, - verify_bundle, + MycDiscoveryContext, MycDiscoveryRepairSummary, diff_live_nip89, fetch_live_nip89, + publish_nip89_event, refresh_nip89, verify_bundle, }; use crate::error::MycError; use crate::logging; @@ -68,6 +68,8 @@ pub enum MycAuditCommand { List { #[arg(long)] connection_id: Option<String>, + #[arg(long)] + attempt_id: Option<String>, #[arg(long, value_enum, default_value_t = MycAuditScope::All)] scope: MycAuditScope, #[arg(long)] @@ -76,11 +78,23 @@ pub enum MycAuditCommand { Summary { #[arg(long)] connection_id: Option<String>, + #[arg(long)] + attempt_id: Option<String>, #[arg(long, value_enum, default_value_t = MycAuditScope::All)] scope: MycAuditScope, #[arg(long)] limit: Option<usize>, }, + LatestDiscoveryRepair { + #[arg(long, value_enum, default_value_t = MycDiscoveryRepairAttemptView::Summary)] + view: MycDiscoveryRepairAttemptView, + }, + DiscoveryRepairAttempt { + #[arg(long)] + attempt_id: String, + #[arg(long, value_enum, default_value_t = MycDiscoveryRepairAttemptView::Summary)] + view: MycDiscoveryRepairAttemptView, + }, } #[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] @@ -90,6 +104,12 @@ pub enum MycAuditScope { Operation, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)] +pub enum MycDiscoveryRepairAttemptView { + Summary, + Records, +} + #[derive(Debug, Subcommand)] pub enum MycAuthCommand { Require { @@ -195,6 +215,42 @@ pub struct MycAuditSummaryOutput { pub runtime_replay_restore_count: usize, } +#[derive(Debug, Serialize, PartialEq, Eq)] +pub struct MycDiscoveryRepairAttemptRecordsOutput { + pub attempt_id: String, + pub runtime_operation_audit: Vec<MycOperationAuditRecord>, +} + +#[derive(Debug, Serialize, PartialEq, Eq)] +pub struct MycDiscoveryRepairAttemptSummaryOutput { + pub attempt_id: String, + pub record_count: usize, + pub started_at_unix: u64, + pub finished_at_unix: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub compare_outcome: Option<MycOperationAuditOutcome>, + #[serde(skip_serializing_if = "Option::is_none")] + pub refresh_outcome: Option<MycOperationAuditOutcome>, + #[serde(skip_serializing_if = "Option::is_none")] + pub aggregate_publish_outcome: Option<MycOperationAuditOutcome>, + #[serde(skip_serializing_if = "Option::is_none")] + pub aggregate_publish_relay_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub aggregate_publish_acknowledged_relay_count: Option<usize>, + #[serde(skip_serializing_if = "Option::is_none")] + pub aggregate_publish_relay_outcome_summary: Option<String>, + pub repair_summary: MycDiscoveryRepairSummary, + pub failed_relays: Vec<String>, + pub remaining_repair_relays: Vec<String>, +} + +#[derive(Debug, Serialize, PartialEq, Eq)] +#[serde(untagged)] +pub enum MycDiscoveryRepairAttemptOutput { + Summary(MycDiscoveryRepairAttemptSummaryOutput), + Records(MycDiscoveryRepairAttemptRecordsOutput), +} + pub async fn run_from_env() -> Result<(), MycError> { let cli = MycCli::parse(); let config = load_config(cli.config.as_deref())?; @@ -243,6 +299,7 @@ pub async fn run_from_env() -> Result<(), MycError> { match command { MycAuditCommand::List { connection_id, + attempt_id, scope, limit, } => { @@ -250,6 +307,7 @@ pub async fn run_from_env() -> Result<(), MycError> { &runtime, &manager, connection_id.as_deref(), + attempt_id.as_deref(), scope, limit, )?; @@ -257,6 +315,7 @@ pub async fn run_from_env() -> Result<(), MycError> { } MycAuditCommand::Summary { connection_id, + attempt_id, scope, limit, } => { @@ -264,11 +323,21 @@ pub async fn run_from_env() -> Result<(), MycError> { &runtime, &manager, connection_id.as_deref(), + attempt_id.as_deref(), scope, limit, )?; print_json(&output) } + MycAuditCommand::LatestDiscoveryRepair { view } => { + let output = load_latest_discovery_repair_attempt_output(&runtime, view)?; + print_json(&output) + } + MycAuditCommand::DiscoveryRepairAttempt { attempt_id, view } => { + let output = + load_discovery_repair_attempt_output(&runtime, attempt_id.as_str(), view)?; + print_json(&output) + } } } MycCommand::Auth { command } => { @@ -391,9 +460,21 @@ fn load_audit_output( runtime: &MycRuntime, manager: &radroots_nostr_signer::prelude::RadrootsNostrSignerManager, connection_id: Option<&str>, + attempt_id: Option<&str>, scope: MycAuditScope, limit: Option<usize>, ) -> Result<MycAuditListOutput, MycError> { + if connection_id.is_some() && attempt_id.is_some() { + return Err(MycError::InvalidOperation( + "audit commands cannot filter by both connection_id and attempt_id".to_owned(), + )); + } + if attempt_id.is_some() && scope == MycAuditScope::Request { + return Err(MycError::InvalidOperation( + "audit attempt lookup only supports operation or all scope".to_owned(), + )); + } + let limit = audit_read_limit(runtime, limit); let connection_id = connection_id.map(parse_connection_id).transpose()?; let signer_request_audit = match (scope, connection_id.as_ref()) { @@ -417,12 +498,15 @@ fn load_audit_output( .rev() .collect(), }; - let runtime_operation_audit = match (scope, connection_id.as_ref()) { - (MycAuditScope::Request, _) => Vec::new(), - (_, Some(connection_id)) => runtime + let runtime_operation_audit = match (scope, connection_id.as_ref(), attempt_id) { + (MycAuditScope::Request, _, _) => Vec::new(), + (_, Some(connection_id), _) => runtime .operation_audit_store() .list_for_connection_with_limit(connection_id, limit)?, - (_, None) => runtime.operation_audit_store().list_with_limit(limit)?, + (_, None, Some(attempt_id)) => runtime + .operation_audit_store() + .list_for_attempt_id_with_limit(attempt_id, limit)?, + (_, None, None) => runtime.operation_audit_store().list_with_limit(limit)?, }; Ok(MycAuditListOutput { @@ -435,11 +519,19 @@ fn summarize_audit_output( runtime: &MycRuntime, manager: &radroots_nostr_signer::prelude::RadrootsNostrSignerManager, connection_id: Option<&str>, + attempt_id: Option<&str>, scope: MycAuditScope, limit: Option<usize>, ) -> Result<MycAuditSummaryOutput, MycError> { let record_limit = audit_read_limit(runtime, limit); - let audit = load_audit_output(runtime, manager, connection_id, scope, Some(record_limit))?; + let audit = load_audit_output( + runtime, + manager, + connection_id, + attempt_id, + scope, + Some(record_limit), + )?; let mut signer_request_decisions = MycAuditDecisionCounts::default(); for record in &audit.signer_request_audit { match record.decision { @@ -506,6 +598,46 @@ fn summarize_audit_output( }) } +fn load_latest_discovery_repair_attempt_output( + runtime: &MycRuntime, + view: MycDiscoveryRepairAttemptView, +) -> Result<MycDiscoveryRepairAttemptOutput, MycError> { + let attempt_id = runtime + .operation_audit_store() + .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)? + .ok_or_else(|| { + MycError::InvalidOperation("no discovery repair attempts have been recorded".to_owned()) + })?; + load_discovery_repair_attempt_output(runtime, attempt_id.as_str(), view) +} + +fn load_discovery_repair_attempt_output( + runtime: &MycRuntime, + attempt_id: &str, + view: MycDiscoveryRepairAttemptView, +) -> Result<MycDiscoveryRepairAttemptOutput, MycError> { + let records = runtime + .operation_audit_store() + .list_for_attempt_id(attempt_id)?; + if records.is_empty() { + return Err(MycError::InvalidOperation(format!( + "discovery repair attempt `{attempt_id}` was not found" + ))); + } + + match view { + MycDiscoveryRepairAttemptView::Summary => Ok(MycDiscoveryRepairAttemptOutput::Summary( + MycDiscoveryRepairAttemptSummaryOutput::from_records(attempt_id, &records)?, + )), + MycDiscoveryRepairAttemptView::Records => Ok(MycDiscoveryRepairAttemptOutput::Records( + MycDiscoveryRepairAttemptRecordsOutput { + attempt_id: attempt_id.to_owned(), + runtime_operation_audit: records, + }, + )), + } +} + fn audit_read_limit(runtime: &MycRuntime, limit: Option<usize>) -> usize { limit.unwrap_or(runtime.operation_audit_store().config().default_read_limit) } @@ -551,6 +683,75 @@ fn is_aggregate_publish_operation(kind: MycOperationAuditKind) -> bool { ) } +impl MycDiscoveryRepairAttemptSummaryOutput { + fn from_records( + attempt_id: &str, + records: &[MycOperationAuditRecord], + ) -> Result<Self, MycError> { + let Some(first_record) = records.first() else { + return Err(MycError::InvalidOperation(format!( + "discovery repair attempt `{attempt_id}` had no records" + ))); + }; + let finished_at_unix = records + .last() + .map(|record| record.recorded_at_unix) + .unwrap_or(first_record.recorded_at_unix); + let compare_outcome = records.iter().find_map(|record| { + (record.operation == MycOperationAuditKind::DiscoveryHandlerCompare) + .then_some(record.outcome) + }); + let refresh_outcome = records.iter().rev().find_map(|record| { + (record.operation == MycOperationAuditKind::DiscoveryHandlerRefresh) + .then_some(record.outcome) + }); + let publish_record = records + .iter() + .rev() + .find(|record| record.operation == MycOperationAuditKind::DiscoveryHandlerPublish); + + let mut repair_summary = MycDiscoveryRepairSummary::default(); + let mut failed_relays = Vec::new(); + for record in records + .iter() + .filter(|record| record.operation == MycOperationAuditKind::DiscoveryHandlerRepair) + { + match record.outcome { + MycOperationAuditOutcome::Succeeded => repair_summary.repaired += 1, + MycOperationAuditOutcome::Rejected => { + repair_summary.failed += 1; + if let Some(relay_url) = record.relay_url.clone() { + failed_relays.push(relay_url); + } + } + MycOperationAuditOutcome::Matched => repair_summary.unchanged += 1, + MycOperationAuditOutcome::Skipped => repair_summary.skipped += 1, + _ => {} + } + } + failed_relays.sort(); + failed_relays.dedup(); + + Ok(Self { + attempt_id: attempt_id.to_owned(), + record_count: records.len(), + started_at_unix: first_record.recorded_at_unix, + finished_at_unix, + compare_outcome, + refresh_outcome, + aggregate_publish_outcome: publish_record.map(|record| record.outcome), + aggregate_publish_relay_count: publish_record.map(|record| record.relay_count), + aggregate_publish_acknowledged_relay_count: publish_record + .map(|record| record.acknowledged_relay_count), + aggregate_publish_relay_outcome_summary: publish_record + .map(|record| record.relay_outcome_summary.clone()), + repair_summary, + failed_relays: failed_relays.clone(), + remaining_repair_relays: failed_relays, + }) + } +} + fn print_json<T>(value: &T) -> Result<(), MycError> where T: Serialize, @@ -633,6 +834,7 @@ mod tests { &runtime, &manager, Some(connection.connection_id.as_str()), + None, MycAuditScope::All, None, ) @@ -720,6 +922,7 @@ mod tests { &runtime, &manager, Some(connection.connection_id.as_str()), + None, MycAuditScope::All, None, ) @@ -815,6 +1018,7 @@ mod tests { &runtime, &manager, Some(connection.connection_id.as_str()), + None, MycAuditScope::Operation, Some(10), ) diff --git a/src/discovery.rs b/src/discovery.rs @@ -11,6 +11,7 @@ use radroots_nostr::prelude::{ radroots_nostr_filter_tag, radroots_nostr_metadata_has_fields, radroots_nostr_tag_first_value, }; use radroots_nostr_connect::prelude::{RadrootsNostrConnectBunkerUri, RadrootsNostrConnectUri}; +use radroots_nostr_signer::prelude::RadrootsNostrSignerRequestId; use serde::{Deserialize, Serialize}; use tokio::task::JoinSet; @@ -202,6 +203,7 @@ pub struct MycDiscoveryDiffOutput { #[derive(Debug, Clone, PartialEq, Eq, Serialize)] pub struct MycRefreshedNip89Output { + pub attempt_id: String, pub status: MycDiscoveryLiveStatus, pub force: bool, pub differing_fields: Vec<String>, @@ -496,13 +498,14 @@ pub async fn publish_nip89_event( runtime: &MycRuntime, ) -> Result<MycPublishedNip89Output, MycError> { let context = MycDiscoveryContext::from_runtime(runtime)?; - publish_nip89_event_to_relays(runtime, &context, context.publish_relays()).await + publish_nip89_event_to_relays(runtime, &context, context.publish_relays(), None).await } async fn publish_nip89_event_to_relays( runtime: &MycRuntime, context: &MycDiscoveryContext, relays: &[RadrootsNostrRelayUrl], + attempt_id: Option<&str>, ) -> Result<MycPublishedNip89Output, MycError> { let event = context.build_signed_handler_event()?; let event_id = event.id.to_hex(); @@ -516,7 +519,7 @@ async fn publish_nip89_event_to_relays( { Ok(outcome) => outcome, Err(error) => { - runtime.record_operation_audit(&MycOperationAuditRecord::new( + let mut record = MycOperationAuditRecord::new( MycOperationAuditKind::DiscoveryHandlerPublish, MycOperationAuditOutcome::Rejected, None, @@ -533,12 +536,16 @@ async fn publish_nip89_event_to_relays( .publish_rejection_details() .map(ToOwned::to_owned) .unwrap_or_else(|| error.to_string()), - )); + ); + if let Some(attempt_id) = attempt_id { + record = record.with_attempt_id(attempt_id); + } + runtime.record_operation_audit(&record); return Err(error); } }; - runtime.record_operation_audit(&MycOperationAuditRecord::new( + let mut record = MycOperationAuditRecord::new( MycOperationAuditKind::DiscoveryHandlerPublish, MycOperationAuditOutcome::Succeeded, None, @@ -546,7 +553,11 @@ async fn publish_nip89_event_to_relays( publish_outcome.relay_count, publish_outcome.acknowledged_relay_count, publish_outcome.relay_outcome_summary.clone(), - )); + ); + if let Some(attempt_id) = attempt_id { + record = record.with_attempt_id(attempt_id); + } + runtime.record_operation_audit(&record); Ok(MycPublishedNip89Output { author_public_key_hex: context.app_identity().public_key_hex(), @@ -562,7 +573,7 @@ async fn publish_nip89_event_to_relays( pub async fn fetch_live_nip89(runtime: &MycRuntime) -> Result<MycFetchedLiveNip89Output, MycError> { let context = MycDiscoveryContext::from_runtime(runtime)?; - let fetched = fetch_live_nip89_state_for_runtime(runtime, &context).await?; + let fetched = fetch_live_nip89_state_for_runtime(runtime, &context, None).await?; Ok(MycFetchedLiveNip89Output { author_public_key_hex: context.app_identity().public_key_hex(), publish_relays: context @@ -579,7 +590,7 @@ pub async fn fetch_live_nip89(runtime: &MycRuntime) -> Result<MycFetchedLiveNip8 pub async fn diff_live_nip89(runtime: &MycRuntime) -> Result<MycDiscoveryDiffOutput, MycError> { let context = MycDiscoveryContext::from_runtime(runtime)?; let local_handler = context.render_normalized_nip89_handler(); - let fetched = fetch_live_nip89_state_for_runtime(runtime, &context).await?; + let fetched = fetch_live_nip89_state_for_runtime(runtime, &context, None).await?; let relay_states = build_relay_diffs(&local_handler, &fetched.relay_states); let relay_summary = summarize_relay_diffs(&relay_states); let live_groups = fetched.live_groups; @@ -599,8 +610,39 @@ pub async fn refresh_nip89( force: bool, ) -> Result<MycRefreshedNip89Output, MycError> { let context = MycDiscoveryContext::from_runtime(runtime)?; + let attempt_id = RadrootsNostrSignerRequestId::new_v7().into_string(); let local_handler = context.render_normalized_nip89_handler(); - let fetched = fetch_live_nip89_state_for_runtime(runtime, &context).await?; + let fetched = match fetch_live_nip89_state_for_runtime( + runtime, + &context, + Some(attempt_id.as_str()), + ) + .await + { + Ok(fetched) => fetched, + Err(MycError::DiscoveryFetchUnavailable { + relay_count, + details, + }) => { + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Unavailable, + None, + None, + relay_count, + 0, + details.clone(), + ) + .with_attempt_id(attempt_id.clone()), + ); + return Err(MycError::DiscoveryFetchUnavailable { + relay_count, + details, + }); + } + Err(error) => return Err(error), + }; let relay_states = build_relay_diffs(&local_handler, &fetched.relay_states); let relay_summary = summarize_relay_diffs(&relay_states); let live_groups = fetched.live_groups; @@ -610,29 +652,35 @@ pub async fn refresh_nip89( let compare_summary = describe_compare_status(status, &differing_fields, &live_groups, &relay_summary); - runtime.record_operation_audit(&MycOperationAuditRecord::new( - MycOperationAuditKind::DiscoveryHandlerCompare, - compare_status_to_audit_outcome(status), - None, - compare_request_id, - relay_count, - relay_count.saturating_sub(relay_summary.unavailable_relays.len()), - compare_summary, - )); - - if !relay_summary.unavailable_relays.is_empty() && !force { - runtime.record_operation_audit(&MycOperationAuditRecord::new( - MycOperationAuditKind::DiscoveryHandlerRefresh, - MycOperationAuditOutcome::Unavailable, + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerCompare, + compare_status_to_audit_outcome(status), None, compare_request_id, relay_count, relay_count.saturating_sub(relay_summary.unavailable_relays.len()), - format!( - "discovery relays were unavailable; rerun refresh with --force to override: {}", - relay_summary.unavailable_relays.join(", ") - ), - )); + compare_summary, + ) + .with_attempt_id(attempt_id.clone()), + ); + + if !relay_summary.unavailable_relays.is_empty() && !force { + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Unavailable, + None, + compare_request_id, + relay_count, + relay_count.saturating_sub(relay_summary.unavailable_relays.len()), + format!( + "discovery relays were unavailable; rerun refresh with --force to override: {}", + relay_summary.unavailable_relays.join(", ") + ), + ) + .with_attempt_id(attempt_id.clone()), + ); return Err(MycError::InvalidOperation(format!( "one or more discovery relays were unavailable; rerun `discovery refresh-nip89 --force` to override: {}", relay_summary.unavailable_relays.join(", ") @@ -640,16 +688,19 @@ pub async fn refresh_nip89( } if !relay_summary.conflicted_relays.is_empty() && !force { - runtime.record_operation_audit(&MycOperationAuditRecord::new( - MycOperationAuditKind::DiscoveryHandlerRefresh, - MycOperationAuditOutcome::Conflicted, - None, - compare_request_id, - relay_count, - relay_count.saturating_sub(relay_summary.unavailable_relays.len()), - "live discovery handler state is conflicted; rerun refresh with --force to override" - .to_owned(), - )); + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Conflicted, + None, + compare_request_id, + relay_count, + relay_count.saturating_sub(relay_summary.unavailable_relays.len()), + "live discovery handler state is conflicted; rerun refresh with --force to override" + .to_owned(), + ) + .with_attempt_id(attempt_id.clone()), + ); return Err(MycError::InvalidOperation( "live discovery handler state is conflicted; rerun `discovery refresh-nip89 --force` to override" .to_owned(), @@ -661,16 +712,26 @@ pub async fn refresh_nip89( if refresh_relays.is_empty() { let repair_results = build_repair_results(&context, &relay_states, &[], None, None); let repair_summary = summarize_repair_results(&repair_results); - runtime.record_operation_audit(&MycOperationAuditRecord::new( - MycOperationAuditKind::DiscoveryHandlerRefresh, - MycOperationAuditOutcome::Skipped, - None, - compare_request_id, - relay_count, - relay_count.saturating_sub(relay_summary.unavailable_relays.len()), - "local discovery handler already matches live state".to_owned(), - )); + record_refresh_repair_audit( + runtime, + compare_request_id.map(ToOwned::to_owned), + attempt_id.as_str(), + &repair_results, + ); + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Skipped, + None, + compare_request_id, + relay_count, + relay_count.saturating_sub(relay_summary.unavailable_relays.len()), + "local discovery handler already matches live state".to_owned(), + ) + .with_attempt_id(attempt_id.clone()), + ); return Ok(MycRefreshedNip89Output { + attempt_id, status, force, differing_fields, @@ -684,8 +745,16 @@ pub async fn refresh_nip89( }); } - match publish_nip89_event_to_relays(runtime, &context, &refresh_relays).await { + match publish_nip89_event_to_relays( + runtime, + &context, + &refresh_relays, + Some(attempt_id.as_str()), + ) + .await + { Ok(published) => { + let published_event_id = published.event.id.to_hex(); let repair_results = build_repair_results( &context, &relay_states, @@ -695,12 +764,32 @@ pub async fn refresh_nip89( ); record_refresh_repair_audit( runtime, - Some(published.event.id.to_hex()), + Some(published_event_id.clone()), + attempt_id.as_str(), &repair_results, ); let repair_summary = summarize_repair_results(&repair_results); let remaining_repair_relays = remaining_repair_relays(&repair_results); + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Succeeded, + None, + Some(published_event_id.as_str()), + published.relay_count, + published.acknowledged_relay_count, + format!( + "refresh completed with {} repaired, {} failed, {} unchanged, {} skipped", + repair_summary.repaired, + repair_summary.failed, + repair_summary.unchanged, + repair_summary.skipped + ), + ) + .with_attempt_id(attempt_id.clone()), + ); return Ok(MycRefreshedNip89Output { + attempt_id, status, force, differing_fields, @@ -716,7 +805,31 @@ pub async fn refresh_nip89( Err(error) => { let repair_results = build_repair_results(&context, &relay_states, &refresh_relays, None, Some(&error)); - record_refresh_repair_audit(runtime, None, &repair_results); + let repair_summary = summarize_repair_results(&repair_results); + record_refresh_repair_audit(runtime, None, attempt_id.as_str(), &repair_results); + runtime.record_operation_audit( + &MycOperationAuditRecord::new( + MycOperationAuditKind::DiscoveryHandlerRefresh, + MycOperationAuditOutcome::Rejected, + None, + compare_request_id, + relay_count, + relay_states + .iter() + .filter(|relay_state| { + relay_state.fetch_status == MycDiscoveryRelayFetchStatus::Available + }) + .count(), + format!( + "refresh failed with {} repaired, {} failed, {} unchanged, {} skipped", + repair_summary.repaired, + repair_summary.failed, + repair_summary.unchanged, + repair_summary.skipped + ), + ) + .with_attempt_id(attempt_id), + ); return Err(error); } } @@ -876,15 +989,15 @@ fn summarize_repair_results( fn record_refresh_repair_audit( runtime: &MycRuntime, request_id: Option<String>, + attempt_id: &str, repair_results: &[MycDiscoveryRelayRepairResult], ) { for result in repair_results { - let Some((outcome, acknowledged_relay_count)) = (match result.outcome { - MycDiscoveryRepairOutcome::Repaired => Some((MycOperationAuditOutcome::Succeeded, 1)), - MycDiscoveryRepairOutcome::Failed => Some((MycOperationAuditOutcome::Rejected, 0)), - MycDiscoveryRepairOutcome::Unchanged | MycDiscoveryRepairOutcome::Skipped => None, - }) else { - continue; + let (outcome, acknowledged_relay_count) = match result.outcome { + MycDiscoveryRepairOutcome::Repaired => (MycOperationAuditOutcome::Succeeded, 1), + MycDiscoveryRepairOutcome::Failed => (MycOperationAuditOutcome::Rejected, 0), + MycDiscoveryRepairOutcome::Unchanged => (MycOperationAuditOutcome::Matched, 0), + MycDiscoveryRepairOutcome::Skipped => (MycOperationAuditOutcome::Skipped, 0), }; runtime.record_operation_audit( @@ -900,6 +1013,7 @@ fn record_refresh_repair_audit( .clone() .unwrap_or_else(|| result.relay_url.clone()), ) + .with_attempt_id(attempt_id) .with_relay_url(result.relay_url.clone()), ); } @@ -908,6 +1022,7 @@ fn record_refresh_repair_audit( async fn fetch_live_nip89_state_for_runtime( runtime: &MycRuntime, context: &MycDiscoveryContext, + attempt_id: Option<&str>, ) -> Result<MycFetchedLiveNip89State, MycError> { match fetch_live_nip89_state(context).await { Ok(fetched) => { @@ -919,7 +1034,7 @@ async fn fetch_live_nip89_state_for_runtime( }) .collect::<Vec<_>>(); if !unavailable_relays.is_empty() { - runtime.record_operation_audit(&MycOperationAuditRecord::new( + let mut record = MycOperationAuditRecord::new( MycOperationAuditKind::DiscoveryHandlerFetch, MycOperationAuditOutcome::Unavailable, None, @@ -927,7 +1042,11 @@ async fn fetch_live_nip89_state_for_runtime( fetched.relay_states.len(), fetched.relay_states.len() - unavailable_relays.len(), summarize_unavailable_relays(&fetched.relay_states), - )); + ); + if let Some(attempt_id) = attempt_id { + record = record.with_attempt_id(attempt_id); + } + runtime.record_operation_audit(&record); } Ok(fetched) } @@ -935,7 +1054,7 @@ async fn fetch_live_nip89_state_for_runtime( relay_count, details, }) => { - runtime.record_operation_audit(&MycOperationAuditRecord::new( + let mut record = MycOperationAuditRecord::new( MycOperationAuditKind::DiscoveryHandlerFetch, MycOperationAuditOutcome::Unavailable, None, @@ -943,7 +1062,11 @@ async fn fetch_live_nip89_state_for_runtime( relay_count, 0, details.clone(), - )); + ); + if let Some(attempt_id) = attempt_id { + record = record.with_attempt_id(attempt_id); + } + runtime.record_operation_audit(&record); Err(MycError::DiscoveryFetchUnavailable { relay_count, details, diff --git a/tests/discovery_cli.rs b/tests/discovery_cli.rs @@ -803,6 +803,186 @@ async fn refresh_reports_partial_repair_and_audit_summary_through_the_cli() -> T } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn discovery_repair_attempt_commands_correlate_multiple_refresh_runs() -> TestResult<()> { + let relay_a = TestRelay::spawn().await?; + let relay_b = TestRelay::spawn().await?; + let temp = tempfile::tempdir()?; + let config_path = temp.path().join("config.toml"); + let state_dir = temp.path().join("state"); + let signer_identity_path = temp.path().join("signer.json"); + let user_identity_path = temp.path().join("user.json"); + let app_identity_path = temp.path().join("app.json"); + let app_identity = RadrootsIdentity::from_secret_key_str( + "3333333333333333333333333333333333333333333333333333333333333333", + )?; + + write_identity( + &signer_identity_path, + "1111111111111111111111111111111111111111111111111111111111111111", + ); + write_identity( + &user_identity_path, + "2222222222222222222222222222222222222222222222222222222222222222", + ); + app_identity.save_json(&app_identity_path)?; + write_config( + &config_path, + &state_dir, + &signer_identity_path, + &user_identity_path, + &app_identity_path, + &[relay_a.url(), relay_b.url()], + ); + + relay_a + .queue_publish_outcomes(app_identity.public_key(), &[true]) + .await; + relay_b + .queue_publish_outcomes(app_identity.public_key(), &[false, true]) + .await; + + let first_refresh = run_myc(&config_path, &["discovery", "refresh-nip89"])?; + assert!( + first_refresh.status.success(), + "first refresh-nip89 failed: {}", + String::from_utf8_lossy(&first_refresh.stderr) + ); + let first_refresh_output: Value = serde_json::from_slice(&first_refresh.stdout)?; + let first_attempt_id = first_refresh_output["attempt_id"] + .as_str() + .expect("first attempt id") + .to_owned(); + assert_eq!(first_refresh_output["repair_summary"]["repaired"], 1); + assert_eq!(first_refresh_output["repair_summary"]["failed"], 1); + assert_eq!( + first_refresh_output["remaining_repair_relays"], + Value::Array(vec![Value::String(relay_b.url().to_owned())]) + ); + + relay_a + .wait_for_published_events_by_author(app_identity.public_key(), 1) + .await?; + + let second_refresh = run_myc(&config_path, &["discovery", "refresh-nip89"])?; + assert!( + second_refresh.status.success(), + "second refresh-nip89 failed: {}", + String::from_utf8_lossy(&second_refresh.stderr) + ); + let second_refresh_output: Value = serde_json::from_slice(&second_refresh.stdout)?; + let second_attempt_id = second_refresh_output["attempt_id"] + .as_str() + .expect("second attempt id") + .to_owned(); + assert_ne!(first_attempt_id, second_attempt_id); + assert_eq!(second_refresh_output["repair_summary"]["repaired"], 1); + assert_eq!(second_refresh_output["repair_summary"]["failed"], 0); + assert_eq!(second_refresh_output["repair_summary"]["unchanged"], 1); + assert_eq!( + second_refresh_output["remaining_repair_relays"], + Value::Array(vec![]) + ); + + let latest_attempt = run_myc(&config_path, &["audit", "latest-discovery-repair"])?; + assert!( + latest_attempt.status.success(), + "latest-discovery-repair failed: {}", + String::from_utf8_lossy(&latest_attempt.stderr) + ); + let latest_attempt_output: Value = serde_json::from_slice(&latest_attempt.stdout)?; + assert_eq!( + latest_attempt_output["attempt_id"], + Value::String(second_attempt_id.clone()) + ); + assert_eq!( + latest_attempt_output["compare_outcome"], + Value::String("matched".to_owned()) + ); + assert_eq!( + latest_attempt_output["refresh_outcome"], + Value::String("succeeded".to_owned()) + ); + assert_eq!(latest_attempt_output["repair_summary"]["repaired"], 1); + assert_eq!(latest_attempt_output["repair_summary"]["failed"], 0); + assert_eq!(latest_attempt_output["repair_summary"]["unchanged"], 1); + assert_eq!( + latest_attempt_output["remaining_repair_relays"], + Value::Array(vec![]) + ); + + let first_attempt_summary = run_myc( + &config_path, + &[ + "audit", + "discovery-repair-attempt", + "--attempt-id", + first_attempt_id.as_str(), + ], + )?; + assert!( + first_attempt_summary.status.success(), + "discovery-repair-attempt summary failed: {}", + String::from_utf8_lossy(&first_attempt_summary.stderr) + ); + let first_attempt_summary_output: Value = + serde_json::from_slice(&first_attempt_summary.stdout)?; + assert_eq!( + first_attempt_summary_output["attempt_id"], + Value::String(first_attempt_id.clone()) + ); + assert_eq!( + first_attempt_summary_output["refresh_outcome"], + Value::String("succeeded".to_owned()) + ); + assert_eq!( + first_attempt_summary_output["repair_summary"]["repaired"], + 1 + ); + assert_eq!(first_attempt_summary_output["repair_summary"]["failed"], 1); + assert_eq!( + first_attempt_summary_output["failed_relays"], + Value::Array(vec![Value::String(relay_b.url().to_owned())]) + ); + assert_eq!( + first_attempt_summary_output["remaining_repair_relays"], + Value::Array(vec![Value::String(relay_b.url().to_owned())]) + ); + + let first_attempt_records = run_myc( + &config_path, + &[ + "audit", + "discovery-repair-attempt", + "--attempt-id", + first_attempt_id.as_str(), + "--view", + "records", + ], + )?; + assert!( + first_attempt_records.status.success(), + "discovery-repair-attempt records failed: {}", + String::from_utf8_lossy(&first_attempt_records.stderr) + ); + let first_attempt_records_output: Value = + serde_json::from_slice(&first_attempt_records.stdout)?; + let record_attempt_ids = first_attempt_records_output["runtime_operation_audit"] + .as_array() + .expect("attempt records") + .iter() + .map(|record| record["attempt_id"].as_str().expect("record attempt id")) + .collect::<Vec<_>>(); + assert!(!record_attempt_ids.is_empty()); + assert!( + record_attempt_ids + .iter() + .all(|attempt_id| *attempt_id == first_attempt_id) + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn discovery_diff_surfaces_relay_provenance_through_the_cli() -> TestResult<()> { let relay_a = TestRelay::spawn().await?; let relay_b = TestRelay::spawn().await?; diff --git a/tests/nip46_e2e.rs b/tests/nip46_e2e.rs @@ -1550,7 +1550,7 @@ async fn refresh_nip89_skips_when_live_handler_matches() -> TestResult<()> { MycDiscoveryRepairOutcome::Unchanged ); - let audit = wait_for_operation_audit_count(&runtime, 3).await?; + let audit = wait_for_operation_audit_count(&runtime, 4).await?; assert_eq!( audit[1].operation, MycOperationAuditKind::DiscoveryHandlerCompare @@ -1558,9 +1558,14 @@ async fn refresh_nip89_skips_when_live_handler_matches() -> TestResult<()> { assert_eq!(audit[1].outcome, MycOperationAuditOutcome::Matched); assert_eq!( audit[2].operation, + MycOperationAuditKind::DiscoveryHandlerRepair + ); + assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Matched); + assert_eq!( + audit[3].operation, MycOperationAuditKind::DiscoveryHandlerRefresh ); - assert_eq!(audit[2].outcome, MycOperationAuditOutcome::Skipped); + assert_eq!(audit[3].outcome, MycOperationAuditOutcome::Skipped); Ok(()) }