mod.rs (67817B)
1 pub mod server; 2 3 use std::collections::BTreeMap; 4 use std::path::{Path, PathBuf}; 5 use std::sync::{Arc, Mutex}; 6 use std::time::Duration; 7 8 use radroots_nostr::prelude::{RadrootsNostrRelayStatus, RadrootsNostrRelayUrl}; 9 use radroots_nostr_signer::prelude::{ 10 RadrootsNostrLocalSignerCapability, RadrootsNostrRemoteSessionSignerCapability, 11 RadrootsNostrSignerBackend, RadrootsNostrSignerPublishWorkflowRecord, 12 RadrootsNostrSignerPublishWorkflowState, RadrootsNostrSignerRequestAuditRecord, 13 RadrootsNostrSignerRequestDecision, 14 }; 15 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 16 use serde::{Deserialize, Serialize}; 17 use tokio::task::JoinSet; 18 19 use crate::app::MycRuntime; 20 use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome}; 21 use crate::config::{ 22 MycRuntimeAuditBackend, MycRuntimeContractOutput, MycSignerStateBackend, 23 MycTransportDeliveryPolicy, 24 }; 25 use crate::custody::{MycActiveIdentity, MycIdentityStatusOutput}; 26 use crate::discovery::MycDiscoveryContext; 27 use crate::error::MycError; 28 use crate::outbox::{MycDeliveryOutboxRecord, MycDeliveryOutboxStatus, now_unix_secs}; 29 use crate::transport::MycTransportSnapshot; 30 31 const MYC_RELAY_PROBE_CONCURRENCY_LIMIT: usize = 4; 32 pub const MYC_SIGNER_STATUS_CONTRACT_VERSION: u32 = 1; 33 34 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] 35 #[serde(rename_all = "snake_case")] 36 pub enum MycRuntimeStatus { 37 Healthy, 38 Degraded, 39 Unready, 40 } 41 42 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] 43 #[serde(rename_all = "snake_case")] 44 pub enum MycRelayProbeAvailability { 45 Available, 46 Unavailable, 47 } 48 49 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 50 pub struct MycRelayProbe { 51 pub relay_url: String, 52 pub availability: MycRelayProbeAvailability, 53 #[serde(skip_serializing_if = "Option::is_none")] 54 pub relay_status: Option<String>, 55 pub connection_attempts: usize, 56 pub successful_connections: usize, 57 #[serde(skip_serializing_if = "Option::is_none")] 58 pub latency_ms: Option<u64>, 59 pub queue_depth: usize, 60 #[serde(skip_serializing_if = "Option::is_none")] 61 pub error: Option<String>, 62 } 63 64 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 65 pub struct MycTransportStatusOutput { 66 pub enabled: bool, 67 pub status: MycRuntimeStatus, 68 pub ready: bool, 69 pub configured_relay_count: usize, 70 pub required_available_relays: usize, 71 pub available_relay_count: usize, 72 pub unavailable_relay_count: usize, 73 pub delivery_policy: MycTransportDeliveryPolicy, 74 #[serde(skip_serializing_if = "Option::is_none")] 75 pub delivery_quorum: Option<usize>, 76 #[serde(skip_serializing_if = "Vec::is_empty", default)] 77 pub relay_probes: Vec<MycRelayProbe>, 78 } 79 80 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 81 pub struct MycDiscoveryRelayGroupStatusOutput { 82 pub configured_relay_count: usize, 83 pub available_relay_count: usize, 84 pub unavailable_relay_count: usize, 85 #[serde(skip_serializing_if = "Vec::is_empty", default)] 86 pub relay_probes: Vec<MycRelayProbe>, 87 } 88 89 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 90 pub struct MycDiscoveryStatusOutput { 91 pub enabled: bool, 92 pub status: MycRuntimeStatus, 93 pub public_relays: MycDiscoveryRelayGroupStatusOutput, 94 pub publish_relays: MycDiscoveryRelayGroupStatusOutput, 95 } 96 97 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 98 pub struct MycCustodyStatusOutput { 99 pub signer: MycIdentityStatusOutput, 100 pub user: MycIdentityStatusOutput, 101 #[serde(skip_serializing_if = "Option::is_none")] 102 pub discovery_app: Option<MycIdentityStatusOutput>, 103 } 104 105 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 106 pub struct MycPersistenceStatusOutput { 107 pub signer_state: MycSignerStatePersistenceStatusOutput, 108 pub runtime_audit: MycRuntimeAuditPersistenceStatusOutput, 109 } 110 111 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 112 pub struct MycSignerBackendStatusOutput { 113 #[serde(skip_serializing_if = "Option::is_none")] 114 pub local_signer: Option<RadrootsNostrLocalSignerCapability>, 115 pub remote_session_count: usize, 116 #[serde(skip_serializing_if = "Vec::is_empty", default)] 117 pub remote_sessions: Vec<RadrootsNostrRemoteSessionSignerCapability>, 118 pub publish_workflow_count: usize, 119 } 120 121 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 122 pub struct MycDeliveryRecoveryStatusOutput { 123 pub recorded_at_unix: u64, 124 pub outcome: MycOperationAuditOutcome, 125 pub summary: String, 126 } 127 128 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 129 pub struct MycDeliveryOutboxStatusOutput { 130 pub status: MycRuntimeStatus, 131 pub ready: bool, 132 pub path: PathBuf, 133 pub exists: bool, 134 pub total_job_count: usize, 135 pub queued_job_count: usize, 136 pub published_pending_finalize_job_count: usize, 137 pub finalized_job_count: usize, 138 pub failed_job_count: usize, 139 pub unfinished_job_count: usize, 140 pub critical_unfinished_job_count: usize, 141 pub blocked_job_count: usize, 142 pub critical_blocked_job_count: usize, 143 pub stuck_after_secs: u64, 144 #[serde(skip_serializing_if = "Option::is_none")] 145 pub oldest_unfinished_age_secs: Option<u64>, 146 #[serde(skip_serializing_if = "Option::is_none")] 147 pub oldest_blocked_age_secs: Option<u64>, 148 #[serde(skip_serializing_if = "Option::is_none")] 149 pub last_recovery: Option<MycDeliveryRecoveryStatusOutput>, 150 } 151 152 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 153 pub struct MycSignerStatePersistenceStatusOutput { 154 pub backend: MycSignerStateBackend, 155 pub path: PathBuf, 156 pub exists: bool, 157 #[serde(skip_serializing_if = "Option::is_none")] 158 pub sqlite_schema: Option<MycSqliteSchemaStatusOutput>, 159 } 160 161 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 162 pub struct MycRuntimeAuditPersistenceStatusOutput { 163 pub backend: MycRuntimeAuditBackend, 164 pub path: PathBuf, 165 pub exists: bool, 166 #[serde(skip_serializing_if = "Option::is_none")] 167 pub sqlite_schema: Option<MycSqliteSchemaStatusOutput>, 168 } 169 170 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 171 pub struct MycSqliteSchemaStatusOutput { 172 pub ready: bool, 173 #[serde(skip_serializing_if = "Option::is_none")] 174 pub applied_migration_count: Option<usize>, 175 #[serde(skip_serializing_if = "Option::is_none")] 176 pub latest_migration: Option<String>, 177 #[serde(skip_serializing_if = "Option::is_none")] 178 pub journal_mode: Option<String>, 179 #[serde(skip_serializing_if = "Option::is_none")] 180 pub store_version: Option<u32>, 181 #[serde(skip_serializing_if = "Option::is_none")] 182 pub error: Option<String>, 183 } 184 185 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 186 pub struct MycStatusFullOutput { 187 pub status: MycRuntimeStatus, 188 pub ready: bool, 189 pub reasons: Vec<String>, 190 pub runtime_contract: MycRuntimeContractOutput, 191 pub startup: crate::app::MycStartupSnapshot, 192 pub signer_backend: MycSignerBackendStatusOutput, 193 pub custody: MycCustodyStatusOutput, 194 pub persistence: MycPersistenceStatusOutput, 195 pub delivery_outbox: MycDeliveryOutboxStatusOutput, 196 pub transport: MycTransportStatusOutput, 197 pub discovery: MycDiscoveryStatusOutput, 198 } 199 200 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 201 pub struct MycStatusSignerOutput { 202 pub status_contract_version: u32, 203 pub status: MycRuntimeStatus, 204 pub ready: bool, 205 pub reasons: Vec<String>, 206 pub runtime_contract: MycRuntimeContractOutput, 207 pub signer_backend: MycSignerBackendStatusOutput, 208 pub custody: MycCustodyStatusOutput, 209 } 210 211 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 212 pub struct MycStatusSummaryOutput { 213 pub status: MycRuntimeStatus, 214 pub ready: bool, 215 pub reasons: Vec<String>, 216 pub instance_name: String, 217 pub runtime_contract: MycRuntimeContractOutput, 218 pub signer_backend: MycSignerBackendStatusOutput, 219 pub custody: MycCustodyStatusOutput, 220 pub persistence: MycPersistenceStatusOutput, 221 pub delivery_outbox: MycDeliveryOutboxStatusOutput, 222 pub transport: MycTransportStatusOutput, 223 pub discovery: MycDiscoveryStatusOutput, 224 } 225 226 #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] 227 pub struct MycAuditDecisionCounts { 228 pub allowed: usize, 229 pub denied: usize, 230 pub challenged: usize, 231 } 232 233 #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize)] 234 pub struct MycOperationOutcomeCounts { 235 pub succeeded: usize, 236 pub rejected: usize, 237 pub restored: usize, 238 pub unavailable: usize, 239 pub missing: usize, 240 pub matched: usize, 241 pub drifted: usize, 242 pub conflicted: usize, 243 pub skipped: usize, 244 } 245 246 #[derive(Debug, Clone, PartialEq, Eq, Serialize)] 247 pub struct MycMetricsSnapshot { 248 pub signer_request_total: usize, 249 pub signer_request_decisions: MycAuditDecisionCounts, 250 pub runtime_operation_total: usize, 251 pub runtime_operation_outcomes: MycOperationOutcomeCounts, 252 pub runtime_operation_by_kind: BTreeMap<String, MycOperationOutcomeCounts>, 253 pub runtime_aggregate_publish_rejection_count: usize, 254 pub runtime_repair_success_count: usize, 255 pub runtime_repair_rejection_count: usize, 256 pub runtime_unavailable_count: usize, 257 pub runtime_replay_restore_count: usize, 258 pub delivery_recovery_success_count: usize, 259 pub delivery_recovery_rejection_count: usize, 260 pub delivery_outbox_total: usize, 261 pub delivery_outbox_queued_count: usize, 262 pub delivery_outbox_published_pending_finalize_count: usize, 263 pub delivery_outbox_failed_count: usize, 264 pub delivery_outbox_finalized_count: usize, 265 pub delivery_outbox_unfinished_count: usize, 266 pub delivery_outbox_critical_unfinished_count: usize, 267 pub delivery_outbox_blocked_count: usize, 268 pub delivery_outbox_critical_blocked_count: usize, 269 } 270 271 #[derive(Debug, Clone, Default, PartialEq, Eq)] 272 pub(crate) struct MycLiveMetricsState { 273 signer_request_total: usize, 274 signer_request_decisions: MycAuditDecisionCounts, 275 runtime_operation_total: usize, 276 runtime_operation_outcomes: MycOperationOutcomeCounts, 277 runtime_operation_by_kind: BTreeMap<String, MycOperationOutcomeCounts>, 278 runtime_aggregate_publish_rejection_count: usize, 279 runtime_repair_success_count: usize, 280 runtime_repair_rejection_count: usize, 281 runtime_unavailable_count: usize, 282 runtime_replay_restore_count: usize, 283 delivery_recovery_success_count: usize, 284 delivery_recovery_rejection_count: usize, 285 } 286 287 pub(crate) type MycLiveMetricsHandle = Arc<Mutex<MycLiveMetricsState>>; 288 289 #[derive(Debug, Clone, PartialEq, Eq)] 290 struct MycTransportStatusEvaluation { 291 output: MycTransportStatusOutput, 292 reasons: Vec<String>, 293 } 294 295 #[derive(Debug, Clone, PartialEq, Eq)] 296 struct MycCustodyStatusEvaluation { 297 output: MycCustodyStatusOutput, 298 } 299 300 #[derive(Debug, Clone, PartialEq, Eq)] 301 struct MycPersistenceStatusEvaluation { 302 output: MycPersistenceStatusOutput, 303 reasons: Vec<String>, 304 status: Option<MycRuntimeStatus>, 305 } 306 307 #[derive(Debug, Clone, PartialEq, Eq)] 308 struct MycDeliveryOutboxStatusEvaluation { 309 output: MycDeliveryOutboxStatusOutput, 310 reasons: Vec<String>, 311 } 312 313 #[derive(Debug, Deserialize)] 314 struct MycSqliteAppliedCountRow { 315 applied_count: u64, 316 } 317 318 #[derive(Debug, Deserialize)] 319 struct MycSqliteNamedRow { 320 name: String, 321 } 322 323 #[derive(Debug, Deserialize)] 324 struct MycSqliteJournalModeRow { 325 journal_mode: String, 326 } 327 328 #[derive(Debug, Deserialize)] 329 struct MycSqliteStoreVersionRow { 330 store_version: u64, 331 } 332 333 pub async fn collect_status_full(runtime: &MycRuntime) -> Result<MycStatusFullOutput, MycError> { 334 let snapshot = runtime.snapshot(); 335 let signer_backend = collect_signer_backend_status(runtime)?; 336 let custody = collect_custody_status(runtime)?; 337 let persistence = collect_persistence_status(runtime); 338 let delivery_outbox = collect_delivery_outbox_status(runtime)?; 339 let transport = collect_transport_status(runtime).await?; 340 let discovery = collect_discovery_status(runtime).await?; 341 let mut status = combine_runtime_status( 342 transport.output.status, 343 if discovery.output.enabled { 344 Some(discovery.output.status) 345 } else { 346 None 347 }, 348 ); 349 let mut reasons = transport.reasons; 350 reasons.extend(discovery.reasons); 351 status = worse_runtime_status(status, delivery_outbox.output.status); 352 reasons.extend(delivery_outbox.reasons.clone()); 353 if let Some(persistence_status) = persistence.status { 354 status = worse_runtime_status(status, persistence_status); 355 } 356 reasons.extend(persistence.reasons.clone()); 357 if custody 358 .output 359 .discovery_app 360 .as_ref() 361 .is_some_and(|status_output| !status_output.resolved) 362 && status != MycRuntimeStatus::Unready 363 { 364 status = MycRuntimeStatus::Degraded; 365 reasons.push("discovery app identity could not be resolved".to_owned()); 366 } 367 let ready = transport.output.ready && delivery_outbox.output.ready; 368 Ok(MycStatusFullOutput { 369 status, 370 ready, 371 reasons, 372 runtime_contract: runtime.config().runtime_contract_output(), 373 startup: snapshot, 374 signer_backend, 375 custody: custody.output, 376 persistence: persistence.output, 377 delivery_outbox: delivery_outbox.output, 378 transport: transport.output, 379 discovery: discovery.output, 380 }) 381 } 382 383 pub fn collect_status_signer(runtime: &MycRuntime) -> Result<MycStatusSignerOutput, MycError> { 384 let signer_backend = collect_signer_backend_status(runtime)?; 385 let custody = collect_custody_status(runtime)?; 386 let mut reasons = Vec::new(); 387 388 if !custody.output.signer.resolved { 389 reasons.push("signer identity could not be resolved".to_owned()); 390 } 391 if !custody.output.user.resolved { 392 reasons.push("user identity could not be resolved".to_owned()); 393 } 394 match signer_backend.local_signer.as_ref() { 395 Some(local_signer) if local_signer.is_secret_backed() => {} 396 Some(_) => reasons.push("local signer capability is not secret-backed".to_owned()), 397 None => reasons.push("local signer capability is unavailable".to_owned()), 398 } 399 400 let ready = reasons.is_empty(); 401 Ok(MycStatusSignerOutput { 402 status_contract_version: MYC_SIGNER_STATUS_CONTRACT_VERSION, 403 status: if ready { 404 MycRuntimeStatus::Healthy 405 } else { 406 MycRuntimeStatus::Unready 407 }, 408 ready, 409 reasons, 410 runtime_contract: runtime.config().runtime_contract_output(), 411 signer_backend, 412 custody: custody.output, 413 }) 414 } 415 416 pub async fn collect_status_summary( 417 runtime: &MycRuntime, 418 ) -> Result<MycStatusSummaryOutput, MycError> { 419 let full = collect_status_full(runtime).await?; 420 Ok(MycStatusSummaryOutput { 421 status: full.status, 422 ready: full.ready, 423 reasons: full.reasons, 424 instance_name: full.startup.instance_name, 425 runtime_contract: full.runtime_contract, 426 signer_backend: MycSignerBackendStatusOutput { 427 local_signer: full.signer_backend.local_signer.clone(), 428 remote_session_count: full.signer_backend.remote_session_count, 429 remote_sessions: Vec::new(), 430 publish_workflow_count: full.signer_backend.publish_workflow_count, 431 }, 432 custody: full.custody, 433 persistence: full.persistence, 434 delivery_outbox: full.delivery_outbox, 435 transport: MycTransportStatusOutput { 436 relay_probes: Vec::new(), 437 ..full.transport 438 }, 439 discovery: MycDiscoveryStatusOutput { 440 enabled: full.discovery.enabled, 441 status: full.discovery.status, 442 public_relays: MycDiscoveryRelayGroupStatusOutput { 443 relay_probes: Vec::new(), 444 ..full.discovery.public_relays 445 }, 446 publish_relays: MycDiscoveryRelayGroupStatusOutput { 447 relay_probes: Vec::new(), 448 ..full.discovery.publish_relays 449 }, 450 }, 451 }) 452 } 453 454 fn collect_custody_status(runtime: &MycRuntime) -> Result<MycCustodyStatusEvaluation, MycError> { 455 let signer = runtime 456 .signer_context() 457 .signer_identity_provider() 458 .resolved_status(runtime.signer_identity()); 459 let user = runtime 460 .signer_context() 461 .user_identity_provider() 462 .resolved_status(runtime.user_identity()); 463 let discovery_app = if runtime.config().discovery.enabled { 464 match runtime.config().discovery.app_identity_source() { 465 Some(source) => Some( 466 crate::custody::MycIdentityProvider::from_source( 467 "discovery app", 468 source, 469 Duration::from_secs(runtime.config().custody.external_command_timeout_secs), 470 )? 471 .probe_status(), 472 ), 473 None => Some( 474 runtime 475 .signer_context() 476 .signer_identity_provider() 477 .resolved_status(runtime.signer_identity()) 478 .with_inherited_from("signer"), 479 ), 480 } 481 } else { 482 None 483 }; 484 485 Ok(MycCustodyStatusEvaluation { 486 output: MycCustodyStatusOutput { 487 signer, 488 user, 489 discovery_app, 490 }, 491 }) 492 } 493 494 fn collect_signer_backend_status( 495 runtime: &MycRuntime, 496 ) -> Result<MycSignerBackendStatusOutput, MycError> { 497 let backend = runtime.signer_backend(); 498 let capabilities = backend.capabilities()?; 499 let publish_workflow_count = backend.list_publish_workflows()?.len(); 500 Ok(MycSignerBackendStatusOutput { 501 local_signer: capabilities.local_signer, 502 remote_session_count: capabilities.remote_sessions.len(), 503 remote_sessions: capabilities.remote_sessions, 504 publish_workflow_count, 505 }) 506 } 507 508 fn collect_persistence_status(runtime: &MycRuntime) -> MycPersistenceStatusEvaluation { 509 let signer_state_backend = runtime.config().persistence.signer_state_backend; 510 let runtime_audit_backend = runtime.config().persistence.runtime_audit_backend; 511 let signer_state = MycSignerStatePersistenceStatusOutput { 512 backend: signer_state_backend, 513 path: runtime.paths().signer_state_path.clone(), 514 exists: runtime.paths().signer_state_path.exists(), 515 sqlite_schema: match signer_state_backend { 516 MycSignerStateBackend::JsonFile => None, 517 MycSignerStateBackend::Sqlite => Some(inspect_signer_state_sqlite_schema( 518 runtime.paths().signer_state_path.as_path(), 519 )), 520 }, 521 }; 522 let runtime_audit = MycRuntimeAuditPersistenceStatusOutput { 523 backend: runtime_audit_backend, 524 path: runtime.paths().runtime_audit_path.clone(), 525 exists: runtime.paths().runtime_audit_path.exists(), 526 sqlite_schema: match runtime_audit_backend { 527 MycRuntimeAuditBackend::JsonlFile => None, 528 MycRuntimeAuditBackend::Sqlite => Some(inspect_runtime_audit_sqlite_schema( 529 runtime.paths().runtime_audit_path.as_path(), 530 )), 531 }, 532 }; 533 534 let mut reasons = Vec::new(); 535 if signer_state 536 .sqlite_schema 537 .as_ref() 538 .is_some_and(|schema| !schema.ready) 539 { 540 reasons.push(format!( 541 "signer-state sqlite schema at {} is not ready", 542 signer_state.path.display() 543 )); 544 } 545 if runtime_audit 546 .sqlite_schema 547 .as_ref() 548 .is_some_and(|schema| !schema.ready) 549 { 550 reasons.push(format!( 551 "runtime-audit sqlite schema at {} is not ready", 552 runtime_audit.path.display() 553 )); 554 } 555 let status = if reasons.is_empty() { 556 None 557 } else { 558 Some(MycRuntimeStatus::Degraded) 559 }; 560 561 MycPersistenceStatusEvaluation { 562 output: MycPersistenceStatusOutput { 563 signer_state, 564 runtime_audit, 565 }, 566 reasons, 567 status, 568 } 569 } 570 571 pub fn collect_metrics(runtime: &MycRuntime) -> Result<MycMetricsSnapshot, MycError> { 572 let outbox_status = collect_delivery_outbox_status(runtime)?; 573 Ok(runtime.metrics_snapshot(&outbox_status.output)) 574 } 575 576 pub fn render_metrics_text(snapshot: &MycMetricsSnapshot) -> String { 577 let mut lines = Vec::new(); 578 push_counter( 579 &mut lines, 580 "myc_signer_request_total", 581 snapshot.signer_request_total, 582 ); 583 push_labeled_counter( 584 &mut lines, 585 "myc_signer_request_decision_total", 586 "decision", 587 "allowed", 588 snapshot.signer_request_decisions.allowed, 589 ); 590 push_labeled_counter( 591 &mut lines, 592 "myc_signer_request_decision_total", 593 "decision", 594 "denied", 595 snapshot.signer_request_decisions.denied, 596 ); 597 push_labeled_counter( 598 &mut lines, 599 "myc_signer_request_decision_total", 600 "decision", 601 "challenged", 602 snapshot.signer_request_decisions.challenged, 603 ); 604 605 push_counter( 606 &mut lines, 607 "myc_runtime_operation_total", 608 snapshot.runtime_operation_total, 609 ); 610 push_outcome_counters( 611 &mut lines, 612 "myc_runtime_operation_outcome_total", 613 &snapshot.runtime_operation_outcomes, 614 ); 615 for (kind, counts) in &snapshot.runtime_operation_by_kind { 616 push_outcome_counters_with_extra_label( 617 &mut lines, 618 "myc_runtime_operation_kind_total", 619 "kind", 620 kind, 621 counts, 622 ); 623 } 624 push_counter( 625 &mut lines, 626 "myc_runtime_aggregate_publish_rejection_total", 627 snapshot.runtime_aggregate_publish_rejection_count, 628 ); 629 push_counter( 630 &mut lines, 631 "myc_runtime_repair_success_total", 632 snapshot.runtime_repair_success_count, 633 ); 634 push_counter( 635 &mut lines, 636 "myc_runtime_repair_rejection_total", 637 snapshot.runtime_repair_rejection_count, 638 ); 639 push_counter( 640 &mut lines, 641 "myc_runtime_unavailable_total", 642 snapshot.runtime_unavailable_count, 643 ); 644 push_counter( 645 &mut lines, 646 "myc_runtime_replay_restore_total", 647 snapshot.runtime_replay_restore_count, 648 ); 649 push_counter( 650 &mut lines, 651 "myc_delivery_recovery_success_total", 652 snapshot.delivery_recovery_success_count, 653 ); 654 push_counter( 655 &mut lines, 656 "myc_delivery_recovery_rejection_total", 657 snapshot.delivery_recovery_rejection_count, 658 ); 659 push_counter( 660 &mut lines, 661 "myc_delivery_outbox_total", 662 snapshot.delivery_outbox_total, 663 ); 664 push_counter( 665 &mut lines, 666 "myc_delivery_outbox_queued_total", 667 snapshot.delivery_outbox_queued_count, 668 ); 669 push_counter( 670 &mut lines, 671 "myc_delivery_outbox_published_pending_finalize_total", 672 snapshot.delivery_outbox_published_pending_finalize_count, 673 ); 674 push_counter( 675 &mut lines, 676 "myc_delivery_outbox_failed_total", 677 snapshot.delivery_outbox_failed_count, 678 ); 679 push_counter( 680 &mut lines, 681 "myc_delivery_outbox_finalized_total", 682 snapshot.delivery_outbox_finalized_count, 683 ); 684 push_counter( 685 &mut lines, 686 "myc_delivery_outbox_unfinished_total", 687 snapshot.delivery_outbox_unfinished_count, 688 ); 689 push_counter( 690 &mut lines, 691 "myc_delivery_outbox_critical_unfinished_total", 692 snapshot.delivery_outbox_critical_unfinished_count, 693 ); 694 push_counter( 695 &mut lines, 696 "myc_delivery_outbox_blocked_total", 697 snapshot.delivery_outbox_blocked_count, 698 ); 699 push_counter( 700 &mut lines, 701 "myc_delivery_outbox_critical_blocked_total", 702 snapshot.delivery_outbox_critical_blocked_count, 703 ); 704 705 lines.join("\n") 706 } 707 708 impl MycLiveMetricsState { 709 pub(crate) fn from_records( 710 signer_request_audit: &[RadrootsNostrSignerRequestAuditRecord], 711 runtime_operation_audit: &[crate::audit::MycOperationAuditRecord], 712 ) -> Self { 713 let mut state = Self::default(); 714 for record in signer_request_audit { 715 state.record_signer_request_audit(record); 716 } 717 for record in runtime_operation_audit { 718 state.record_runtime_operation(record); 719 } 720 state 721 } 722 723 pub(crate) fn record_signer_request_audit( 724 &mut self, 725 record: &RadrootsNostrSignerRequestAuditRecord, 726 ) { 727 self.signer_request_total += 1; 728 match record.decision { 729 RadrootsNostrSignerRequestDecision::Allowed => { 730 self.signer_request_decisions.allowed += 1; 731 } 732 RadrootsNostrSignerRequestDecision::Denied => { 733 self.signer_request_decisions.denied += 1; 734 } 735 RadrootsNostrSignerRequestDecision::Challenged => { 736 self.signer_request_decisions.challenged += 1; 737 } 738 } 739 } 740 741 pub(crate) fn record_runtime_operation( 742 &mut self, 743 record: &crate::audit::MycOperationAuditRecord, 744 ) { 745 self.runtime_operation_total += 1; 746 increment_outcome_counts(&mut self.runtime_operation_outcomes, record.outcome); 747 increment_outcome_counts( 748 self.runtime_operation_by_kind 749 .entry(operation_kind_label(record.operation)) 750 .or_default(), 751 record.outcome, 752 ); 753 if is_aggregate_publish_operation(record.operation) 754 && record.outcome == MycOperationAuditOutcome::Rejected 755 { 756 self.runtime_aggregate_publish_rejection_count += 1; 757 } 758 if record.operation == MycOperationAuditKind::DiscoveryHandlerRepair { 759 match record.outcome { 760 MycOperationAuditOutcome::Succeeded => self.runtime_repair_success_count += 1, 761 MycOperationAuditOutcome::Rejected => self.runtime_repair_rejection_count += 1, 762 _ => {} 763 } 764 } 765 if record.outcome == MycOperationAuditOutcome::Unavailable { 766 self.runtime_unavailable_count += 1; 767 } 768 if record.operation == MycOperationAuditKind::AuthReplayRestore 769 && record.outcome == MycOperationAuditOutcome::Restored 770 { 771 self.runtime_replay_restore_count += 1; 772 } 773 if record.operation == MycOperationAuditKind::DeliveryRecovery { 774 match record.outcome { 775 MycOperationAuditOutcome::Succeeded => self.delivery_recovery_success_count += 1, 776 MycOperationAuditOutcome::Rejected => { 777 self.delivery_recovery_rejection_count += 1; 778 } 779 _ => {} 780 } 781 } 782 } 783 784 pub(crate) fn snapshot( 785 &self, 786 outbox_status: &MycDeliveryOutboxStatusOutput, 787 ) -> MycMetricsSnapshot { 788 MycMetricsSnapshot { 789 signer_request_total: self.signer_request_total, 790 signer_request_decisions: self.signer_request_decisions.clone(), 791 runtime_operation_total: self.runtime_operation_total, 792 runtime_operation_outcomes: self.runtime_operation_outcomes.clone(), 793 runtime_operation_by_kind: self.runtime_operation_by_kind.clone(), 794 runtime_aggregate_publish_rejection_count: self 795 .runtime_aggregate_publish_rejection_count, 796 runtime_repair_success_count: self.runtime_repair_success_count, 797 runtime_repair_rejection_count: self.runtime_repair_rejection_count, 798 runtime_unavailable_count: self.runtime_unavailable_count, 799 runtime_replay_restore_count: self.runtime_replay_restore_count, 800 delivery_recovery_success_count: self.delivery_recovery_success_count, 801 delivery_recovery_rejection_count: self.delivery_recovery_rejection_count, 802 delivery_outbox_total: outbox_status.total_job_count, 803 delivery_outbox_queued_count: outbox_status.queued_job_count, 804 delivery_outbox_published_pending_finalize_count: outbox_status 805 .published_pending_finalize_job_count, 806 delivery_outbox_failed_count: outbox_status.failed_job_count, 807 delivery_outbox_finalized_count: outbox_status.finalized_job_count, 808 delivery_outbox_unfinished_count: outbox_status.unfinished_job_count, 809 delivery_outbox_critical_unfinished_count: outbox_status.critical_unfinished_job_count, 810 delivery_outbox_blocked_count: outbox_status.blocked_job_count, 811 delivery_outbox_critical_blocked_count: outbox_status.critical_blocked_job_count, 812 } 813 } 814 } 815 816 pub fn increment_outcome_counts( 817 counts: &mut MycOperationOutcomeCounts, 818 outcome: MycOperationAuditOutcome, 819 ) { 820 match outcome { 821 MycOperationAuditOutcome::Succeeded => counts.succeeded += 1, 822 MycOperationAuditOutcome::Rejected => counts.rejected += 1, 823 MycOperationAuditOutcome::Restored => counts.restored += 1, 824 MycOperationAuditOutcome::Unavailable => counts.unavailable += 1, 825 MycOperationAuditOutcome::Missing => counts.missing += 1, 826 MycOperationAuditOutcome::Matched => counts.matched += 1, 827 MycOperationAuditOutcome::Drifted => counts.drifted += 1, 828 MycOperationAuditOutcome::Conflicted => counts.conflicted += 1, 829 MycOperationAuditOutcome::Skipped => counts.skipped += 1, 830 } 831 } 832 833 pub fn operation_kind_label(kind: MycOperationAuditKind) -> String { 834 match kind { 835 MycOperationAuditKind::DeliveryRecovery => "delivery_recovery".to_owned(), 836 MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish".to_owned(), 837 MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish".to_owned(), 838 MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish".to_owned(), 839 MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore".to_owned(), 840 MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch".to_owned(), 841 MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish".to_owned(), 842 MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare".to_owned(), 843 MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh".to_owned(), 844 MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair".to_owned(), 845 } 846 } 847 848 pub fn is_aggregate_publish_operation(kind: MycOperationAuditKind) -> bool { 849 matches!( 850 kind, 851 MycOperationAuditKind::ListenerResponsePublish 852 | MycOperationAuditKind::ConnectAcceptPublish 853 | MycOperationAuditKind::AuthReplayPublish 854 | MycOperationAuditKind::DiscoveryHandlerPublish 855 ) 856 } 857 858 async fn collect_transport_status( 859 runtime: &MycRuntime, 860 ) -> Result<MycTransportStatusEvaluation, MycError> { 861 let snapshot = runtime.snapshot().transport; 862 if !snapshot.enabled { 863 return Ok(MycTransportStatusEvaluation { 864 output: MycTransportStatusOutput { 865 enabled: false, 866 status: MycRuntimeStatus::Unready, 867 ready: false, 868 configured_relay_count: 0, 869 required_available_relays: 0, 870 available_relay_count: 0, 871 unavailable_relay_count: 0, 872 delivery_policy: snapshot.delivery_policy, 873 delivery_quorum: snapshot.delivery_quorum, 874 relay_probes: Vec::new(), 875 }, 876 reasons: vec!["transport is disabled".to_owned()], 877 }); 878 } 879 880 let Some(transport) = runtime.transport() else { 881 return Ok(MycTransportStatusEvaluation { 882 output: MycTransportStatusOutput { 883 enabled: true, 884 status: MycRuntimeStatus::Unready, 885 ready: false, 886 configured_relay_count: 0, 887 required_available_relays: 0, 888 available_relay_count: 0, 889 unavailable_relay_count: 0, 890 delivery_policy: snapshot.delivery_policy, 891 delivery_quorum: snapshot.delivery_quorum, 892 relay_probes: Vec::new(), 893 }, 894 reasons: vec!["transport is enabled but no transport client was prepared".to_owned()], 895 }); 896 }; 897 898 let relay_probes = probe_relays( 899 runtime.signer_identity(), 900 transport.relays(), 901 transport.connect_timeout_secs(), 902 ) 903 .await?; 904 let available_relay_count = relay_probes 905 .iter() 906 .filter(|probe| probe.availability == MycRelayProbeAvailability::Available) 907 .count(); 908 let configured_relay_count = relay_probes.len(); 909 let unavailable_relay_count = configured_relay_count.saturating_sub(available_relay_count); 910 let required_available_relays = 911 required_available_relays(&snapshot, configured_relay_count).unwrap_or(usize::MAX); 912 let ready = available_relay_count >= required_available_relays; 913 let status = if !ready { 914 MycRuntimeStatus::Unready 915 } else if unavailable_relay_count > 0 { 916 MycRuntimeStatus::Degraded 917 } else { 918 MycRuntimeStatus::Healthy 919 }; 920 let mut reasons = Vec::new(); 921 if !ready { 922 reasons.push(format!( 923 "transport availability {available_relay_count}/{} does not satisfy delivery policy {}", 924 configured_relay_count, 925 snapshot.delivery_policy.as_str() 926 )); 927 } else if unavailable_relay_count > 0 { 928 reasons.push(format!( 929 "{unavailable_relay_count} transport relay(s) are unavailable" 930 )); 931 } 932 933 Ok(MycTransportStatusEvaluation { 934 output: MycTransportStatusOutput { 935 enabled: true, 936 status, 937 ready, 938 configured_relay_count, 939 required_available_relays, 940 available_relay_count, 941 unavailable_relay_count, 942 delivery_policy: snapshot.delivery_policy, 943 delivery_quorum: snapshot.delivery_quorum, 944 relay_probes, 945 }, 946 reasons, 947 }) 948 } 949 950 struct MycDiscoveryStatusEvaluation { 951 output: MycDiscoveryStatusOutput, 952 reasons: Vec<String>, 953 } 954 955 async fn collect_discovery_status( 956 runtime: &MycRuntime, 957 ) -> Result<MycDiscoveryStatusEvaluation, MycError> { 958 if !runtime.config().discovery.enabled { 959 return Ok(MycDiscoveryStatusEvaluation { 960 output: MycDiscoveryStatusOutput { 961 enabled: false, 962 status: MycRuntimeStatus::Healthy, 963 public_relays: MycDiscoveryRelayGroupStatusOutput { 964 configured_relay_count: 0, 965 available_relay_count: 0, 966 unavailable_relay_count: 0, 967 relay_probes: Vec::new(), 968 }, 969 publish_relays: MycDiscoveryRelayGroupStatusOutput { 970 configured_relay_count: 0, 971 available_relay_count: 0, 972 unavailable_relay_count: 0, 973 relay_probes: Vec::new(), 974 }, 975 }, 976 reasons: Vec::new(), 977 }); 978 } 979 980 let context = MycDiscoveryContext::from_runtime(runtime)?; 981 let public_relays = runtime 982 .config() 983 .discovery 984 .resolved_public_relays(&runtime.config().transport)?; 985 let public_relays = probe_relays( 986 context.app_identity(), 987 public_relays.as_slice(), 988 context.connect_timeout_secs(), 989 ) 990 .await?; 991 let publish_relays = probe_relays( 992 context.app_identity(), 993 context.publish_relays(), 994 context.connect_timeout_secs(), 995 ) 996 .await?; 997 let public_group = summarize_discovery_relay_group(public_relays); 998 let publish_group = summarize_discovery_relay_group(publish_relays); 999 1000 let status = 1001 if public_group.unavailable_relay_count > 0 || publish_group.unavailable_relay_count > 0 { 1002 MycRuntimeStatus::Degraded 1003 } else { 1004 MycRuntimeStatus::Healthy 1005 }; 1006 let mut reasons = Vec::new(); 1007 if public_group.unavailable_relay_count > 0 { 1008 reasons.push(format!( 1009 "{} discovery public relay(s) are unavailable", 1010 public_group.unavailable_relay_count 1011 )); 1012 } 1013 if publish_group.unavailable_relay_count > 0 { 1014 reasons.push(format!( 1015 "{} discovery publish relay(s) are unavailable", 1016 publish_group.unavailable_relay_count 1017 )); 1018 } 1019 1020 Ok(MycDiscoveryStatusEvaluation { 1021 output: MycDiscoveryStatusOutput { 1022 enabled: true, 1023 status, 1024 public_relays: public_group, 1025 publish_relays: publish_group, 1026 }, 1027 reasons, 1028 }) 1029 } 1030 1031 fn summarize_discovery_relay_group( 1032 relay_probes: Vec<MycRelayProbe>, 1033 ) -> MycDiscoveryRelayGroupStatusOutput { 1034 let configured_relay_count = relay_probes.len(); 1035 let available_relay_count = relay_probes 1036 .iter() 1037 .filter(|probe| probe.availability == MycRelayProbeAvailability::Available) 1038 .count(); 1039 let unavailable_relay_count = configured_relay_count.saturating_sub(available_relay_count); 1040 MycDiscoveryRelayGroupStatusOutput { 1041 configured_relay_count, 1042 available_relay_count, 1043 unavailable_relay_count, 1044 relay_probes, 1045 } 1046 } 1047 1048 fn collect_delivery_outbox_status( 1049 runtime: &MycRuntime, 1050 ) -> Result<MycDeliveryOutboxStatusEvaluation, MycError> { 1051 let outbox_records = runtime.delivery_outbox_store().list_all()?; 1052 let workflow_by_id = runtime 1053 .signer_backend() 1054 .list_publish_workflows()? 1055 .into_iter() 1056 .map(|workflow| (workflow.workflow_id.to_string(), workflow)) 1057 .collect::<BTreeMap<_, _>>(); 1058 let now_unix = now_unix_secs(); 1059 let stuck_after_secs = delivery_outbox_stuck_after_secs(runtime); 1060 let path = runtime.paths().delivery_outbox_path.clone(); 1061 let exists = path.exists(); 1062 let mut queued_job_count = 0usize; 1063 let mut published_pending_finalize_job_count = 0usize; 1064 let mut finalized_job_count = 0usize; 1065 let mut failed_job_count = 0usize; 1066 let mut unfinished_job_count = 0usize; 1067 let mut critical_unfinished_job_count = 0usize; 1068 let mut blocked_job_count = 0usize; 1069 let mut critical_blocked_job_count = 0usize; 1070 let mut oldest_unfinished_age_secs = None; 1071 let mut oldest_blocked_age_secs = None; 1072 1073 for record in &outbox_records { 1074 match record.status { 1075 MycDeliveryOutboxStatus::Queued => queued_job_count += 1, 1076 MycDeliveryOutboxStatus::PublishedPendingFinalize => { 1077 published_pending_finalize_job_count += 1; 1078 } 1079 MycDeliveryOutboxStatus::Finalized => finalized_job_count += 1, 1080 MycDeliveryOutboxStatus::Failed => failed_job_count += 1, 1081 } 1082 1083 if !is_delivery_outbox_unfinished(record) { 1084 continue; 1085 } 1086 1087 unfinished_job_count += 1; 1088 if is_critical_delivery_outbox_job(record) { 1089 critical_unfinished_job_count += 1; 1090 } 1091 let age_secs = delivery_outbox_record_age_secs(record, now_unix); 1092 oldest_unfinished_age_secs = 1093 Some(oldest_unfinished_age_secs.map_or(age_secs, |current: u64| current.max(age_secs))); 1094 1095 if let Some(is_critical) = classify_blocked_delivery_outbox_record( 1096 record, 1097 &workflow_by_id, 1098 age_secs, 1099 stuck_after_secs, 1100 ) { 1101 blocked_job_count += 1; 1102 if is_critical { 1103 critical_blocked_job_count += 1; 1104 } 1105 oldest_blocked_age_secs = Some( 1106 oldest_blocked_age_secs.map_or(age_secs, |current: u64| current.max(age_secs)), 1107 ); 1108 } 1109 } 1110 1111 let last_recovery = latest_delivery_recovery_status(runtime)?; 1112 let mut reasons = Vec::new(); 1113 if !exists { 1114 reasons.push(format!( 1115 "delivery outbox persistence file at {} is missing", 1116 path.display() 1117 )); 1118 } 1119 if critical_blocked_job_count > 0 { 1120 reasons.push(format!( 1121 "{critical_blocked_job_count} critical delivery outbox job(s) are blocked" 1122 )); 1123 } 1124 let noncritical_blocked_job_count = 1125 blocked_job_count.saturating_sub(critical_blocked_job_count); 1126 if noncritical_blocked_job_count > 0 { 1127 reasons.push(format!( 1128 "{noncritical_blocked_job_count} non-critical delivery outbox job(s) are blocked" 1129 )); 1130 } 1131 1132 let (status, ready) = if !exists || critical_blocked_job_count > 0 { 1133 (MycRuntimeStatus::Unready, false) 1134 } else if blocked_job_count > 0 { 1135 (MycRuntimeStatus::Degraded, true) 1136 } else { 1137 (MycRuntimeStatus::Healthy, true) 1138 }; 1139 1140 Ok(MycDeliveryOutboxStatusEvaluation { 1141 output: MycDeliveryOutboxStatusOutput { 1142 status, 1143 ready, 1144 path, 1145 exists, 1146 total_job_count: outbox_records.len(), 1147 queued_job_count, 1148 published_pending_finalize_job_count, 1149 finalized_job_count, 1150 failed_job_count, 1151 unfinished_job_count, 1152 critical_unfinished_job_count, 1153 blocked_job_count, 1154 critical_blocked_job_count, 1155 stuck_after_secs, 1156 oldest_unfinished_age_secs, 1157 oldest_blocked_age_secs, 1158 last_recovery, 1159 }, 1160 reasons, 1161 }) 1162 } 1163 1164 fn latest_delivery_recovery_status( 1165 runtime: &MycRuntime, 1166 ) -> Result<Option<MycDeliveryRecoveryStatusOutput>, MycError> { 1167 let latest = runtime 1168 .operation_audit_store() 1169 .list_all()? 1170 .into_iter() 1171 .filter(|record| record.operation == MycOperationAuditKind::DeliveryRecovery) 1172 .max_by_key(|record| record.recorded_at_unix); 1173 Ok(latest.map(|record| MycDeliveryRecoveryStatusOutput { 1174 recorded_at_unix: record.recorded_at_unix, 1175 outcome: record.outcome, 1176 summary: record.relay_outcome_summary, 1177 })) 1178 } 1179 1180 fn delivery_outbox_stuck_after_secs(runtime: &MycRuntime) -> u64 { 1181 let transport = &runtime.config().transport; 1182 let mut total_millis = transport 1183 .connect_timeout_secs 1184 .saturating_mul(1000) 1185 .saturating_mul(transport.publish_max_attempts as u64); 1186 for completed_attempt in 1..transport.publish_max_attempts { 1187 total_millis = 1188 total_millis.saturating_add(delivery_outbox_backoff_millis(runtime, completed_attempt)); 1189 } 1190 total_millis.saturating_add(999) / 1000 1191 } 1192 1193 fn delivery_outbox_backoff_millis(runtime: &MycRuntime, completed_attempt_number: usize) -> u64 { 1194 let transport = &runtime.config().transport; 1195 let exponent = completed_attempt_number.saturating_sub(1) as u32; 1196 let multiplier = 1u64.checked_shl(exponent).unwrap_or(u64::MAX); 1197 let scaled = transport 1198 .publish_initial_backoff_millis 1199 .saturating_mul(multiplier); 1200 scaled.min(transport.publish_max_backoff_millis) 1201 } 1202 1203 fn is_delivery_outbox_unfinished(record: &MycDeliveryOutboxRecord) -> bool { 1204 matches!( 1205 record.status, 1206 MycDeliveryOutboxStatus::Queued | MycDeliveryOutboxStatus::PublishedPendingFinalize 1207 ) 1208 } 1209 1210 fn is_critical_delivery_outbox_job(record: &MycDeliveryOutboxRecord) -> bool { 1211 record.kind != crate::outbox::MycDeliveryOutboxKind::DiscoveryHandlerPublish 1212 } 1213 1214 fn delivery_outbox_record_age_secs(record: &MycDeliveryOutboxRecord, now_unix: u64) -> u64 { 1215 now_unix.saturating_sub(record.updated_at_unix) 1216 } 1217 1218 fn classify_blocked_delivery_outbox_record( 1219 record: &MycDeliveryOutboxRecord, 1220 workflow_by_id: &BTreeMap<String, RadrootsNostrSignerPublishWorkflowRecord>, 1221 age_secs: u64, 1222 stuck_after_secs: u64, 1223 ) -> Option<bool> { 1224 if !is_delivery_outbox_unfinished(record) { 1225 return None; 1226 } 1227 1228 let is_critical = is_critical_delivery_outbox_job(record); 1229 match record.kind { 1230 crate::outbox::MycDeliveryOutboxKind::DiscoveryHandlerPublish => { 1231 if record.signer_publish_workflow_id.is_some() { 1232 return Some(false); 1233 } 1234 } 1235 crate::outbox::MycDeliveryOutboxKind::ConnectAcceptPublish 1236 | crate::outbox::MycDeliveryOutboxKind::AuthReplayPublish => { 1237 if record.signer_publish_workflow_id.is_none() { 1238 return Some(true); 1239 } 1240 } 1241 crate::outbox::MycDeliveryOutboxKind::ListenerResponsePublish => {} 1242 } 1243 1244 if let Some(workflow_id) = record.signer_publish_workflow_id.as_ref() { 1245 let Some(workflow) = workflow_by_id.get(workflow_id.as_str()) else { 1246 return Some(is_critical); 1247 }; 1248 let expected_state = match record.status { 1249 MycDeliveryOutboxStatus::Queued => { 1250 RadrootsNostrSignerPublishWorkflowState::PendingPublish 1251 } 1252 MycDeliveryOutboxStatus::PublishedPendingFinalize => { 1253 RadrootsNostrSignerPublishWorkflowState::PublishedPendingFinalize 1254 } 1255 MycDeliveryOutboxStatus::Finalized | MycDeliveryOutboxStatus::Failed => { 1256 return None; 1257 } 1258 }; 1259 if workflow.state != expected_state { 1260 return Some(is_critical); 1261 } 1262 } 1263 1264 if age_secs > stuck_after_secs { 1265 return Some(is_critical); 1266 } 1267 1268 None 1269 } 1270 1271 fn combine_runtime_status( 1272 transport_status: MycRuntimeStatus, 1273 discovery_status: Option<MycRuntimeStatus>, 1274 ) -> MycRuntimeStatus { 1275 let mut status = transport_status; 1276 if let Some(discovery_status) = discovery_status { 1277 status = worse_runtime_status(status, discovery_status); 1278 } 1279 status 1280 } 1281 1282 fn worse_runtime_status(left: MycRuntimeStatus, right: MycRuntimeStatus) -> MycRuntimeStatus { 1283 use MycRuntimeStatus::{Degraded, Healthy, Unready}; 1284 match (left, right) { 1285 (Unready, _) | (_, Unready) => Unready, 1286 (Degraded, _) | (_, Degraded) => Degraded, 1287 _ => Healthy, 1288 } 1289 } 1290 1291 fn required_available_relays( 1292 snapshot: &MycTransportSnapshot, 1293 configured_relay_count: usize, 1294 ) -> Result<usize, MycError> { 1295 match snapshot.delivery_policy { 1296 MycTransportDeliveryPolicy::Any => Ok(1), 1297 MycTransportDeliveryPolicy::All => Ok(configured_relay_count), 1298 MycTransportDeliveryPolicy::Quorum => snapshot.delivery_quorum.ok_or_else(|| { 1299 MycError::InvalidConfig( 1300 "transport.delivery_quorum must be set when transport.delivery_policy is `quorum`" 1301 .to_owned(), 1302 ) 1303 }), 1304 } 1305 } 1306 1307 async fn probe_relays( 1308 identity: &MycActiveIdentity, 1309 relays: &[RadrootsNostrRelayUrl], 1310 connect_timeout_secs: u64, 1311 ) -> Result<Vec<MycRelayProbe>, MycError> { 1312 let relay_count = relays.len(); 1313 if relay_count == 0 { 1314 return Ok(Vec::new()); 1315 } 1316 1317 let mut pending = relays 1318 .iter() 1319 .cloned() 1320 .enumerate() 1321 .collect::<Vec<_>>() 1322 .into_iter(); 1323 let mut join_set = JoinSet::new(); 1324 let max_concurrency = relay_count.min(MYC_RELAY_PROBE_CONCURRENCY_LIMIT); 1325 1326 while join_set.len() < max_concurrency { 1327 let Some((relay_index, relay)) = pending.next() else { 1328 break; 1329 }; 1330 let identity = identity.clone(); 1331 join_set.spawn(async move { 1332 let probe = probe_relay(identity, relay.clone(), connect_timeout_secs).await; 1333 (relay_index, probe) 1334 }); 1335 } 1336 1337 let mut probes = std::iter::repeat_with(|| None) 1338 .take(relay_count) 1339 .collect::<Vec<Option<MycRelayProbe>>>(); 1340 1341 while let Some(joined) = join_set.join_next().await { 1342 let (relay_index, probe_result) = joined.map_err(|error| { 1343 MycError::InvalidOperation(format!("relay probe task failed: {error}")) 1344 })?; 1345 probes[relay_index] = Some(probe_result?); 1346 while join_set.len() < max_concurrency { 1347 let Some((relay_index, relay)) = pending.next() else { 1348 break; 1349 }; 1350 let identity = identity.clone(); 1351 join_set.spawn(async move { 1352 let probe = probe_relay(identity, relay.clone(), connect_timeout_secs).await; 1353 (relay_index, probe) 1354 }); 1355 } 1356 } 1357 1358 probes 1359 .into_iter() 1360 .map(|probe| { 1361 probe.ok_or_else(|| MycError::InvalidOperation("missing relay probe result".to_owned())) 1362 }) 1363 .collect() 1364 } 1365 1366 async fn probe_relay( 1367 identity: MycActiveIdentity, 1368 relay: RadrootsNostrRelayUrl, 1369 connect_timeout_secs: u64, 1370 ) -> Result<MycRelayProbe, MycError> { 1371 let relay_url = relay.to_string(); 1372 let client = identity.nostr_client_owned(); 1373 client 1374 .add_relay(relay.as_str()) 1375 .await 1376 .map_err(MycError::from)?; 1377 1378 match client 1379 .try_connect_relay(relay.as_str(), Duration::from_secs(connect_timeout_secs)) 1380 .await 1381 { 1382 Ok(_) => { 1383 let relays = client.relays().await; 1384 let relay_state = relays.get(&relay).ok_or_else(|| { 1385 MycError::InvalidOperation(format!( 1386 "connected relay `{relay_url}` did not appear in the relay map" 1387 )) 1388 })?; 1389 Ok(MycRelayProbe { 1390 relay_url, 1391 availability: MycRelayProbeAvailability::Available, 1392 relay_status: Some(relay_status_label(relay_state.status())), 1393 connection_attempts: relay_state.stats().attempts(), 1394 successful_connections: relay_state.stats().success(), 1395 latency_ms: relay_state 1396 .stats() 1397 .latency() 1398 .map(|duration| duration.as_millis() as u64), 1399 queue_depth: relay_state.queue(), 1400 error: None, 1401 }) 1402 } 1403 Err(error) => Ok(MycRelayProbe { 1404 relay_url, 1405 availability: MycRelayProbeAvailability::Unavailable, 1406 relay_status: None, 1407 connection_attempts: 0, 1408 successful_connections: 0, 1409 latency_ms: None, 1410 queue_depth: 0, 1411 error: Some(error.to_string()), 1412 }), 1413 } 1414 } 1415 1416 fn relay_status_label(status: RadrootsNostrRelayStatus) -> String { 1417 status.to_string().to_ascii_lowercase() 1418 } 1419 1420 fn inspect_signer_state_sqlite_schema(path: &Path) -> MycSqliteSchemaStatusOutput { 1421 inspect_sqlite_schema( 1422 path, 1423 Some("SELECT store_version FROM signer_store_metadata WHERE singleton_id = 1"), 1424 ) 1425 } 1426 1427 fn inspect_runtime_audit_sqlite_schema(path: &Path) -> MycSqliteSchemaStatusOutput { 1428 inspect_sqlite_schema(path, None) 1429 } 1430 1431 fn inspect_sqlite_schema( 1432 path: &Path, 1433 store_version_sql: Option<&str>, 1434 ) -> MycSqliteSchemaStatusOutput { 1435 let outcome = (|| -> Result<MycSqliteSchemaStatusOutput, String> { 1436 if !path.exists() { 1437 return Err("sqlite persistence file is missing".to_owned()); 1438 } 1439 let executor = SqliteExecutor::open(path).map_err(|error| error.to_string())?; 1440 let applied_count = query_sqlite_rows::<MycSqliteAppliedCountRow>( 1441 &executor, 1442 "SELECT COUNT(*) AS applied_count FROM __migrations", 1443 )? 1444 .into_iter() 1445 .next() 1446 .ok_or_else(|| "sqlite migrations query returned no rows".to_owned())? 1447 .applied_count; 1448 let latest_migration = query_sqlite_rows::<MycSqliteNamedRow>( 1449 &executor, 1450 "SELECT name FROM __migrations ORDER BY rowid DESC LIMIT 1", 1451 )? 1452 .into_iter() 1453 .next() 1454 .map(|row| row.name); 1455 let journal_mode = 1456 query_sqlite_rows::<MycSqliteJournalModeRow>(&executor, "PRAGMA journal_mode")? 1457 .into_iter() 1458 .next() 1459 .ok_or_else(|| "sqlite journal mode query returned no rows".to_owned())? 1460 .journal_mode; 1461 let store_version = if let Some(sql) = store_version_sql { 1462 query_sqlite_rows::<MycSqliteStoreVersionRow>(&executor, sql)? 1463 .into_iter() 1464 .next() 1465 .map(|row| { 1466 u32::try_from(row.store_version) 1467 .map_err(|_| "sqlite store_version is out of range".to_owned()) 1468 }) 1469 .transpose()? 1470 } else { 1471 None 1472 }; 1473 1474 Ok(MycSqliteSchemaStatusOutput { 1475 ready: true, 1476 applied_migration_count: Some(applied_count as usize), 1477 latest_migration, 1478 journal_mode: Some(journal_mode), 1479 store_version, 1480 error: None, 1481 }) 1482 })(); 1483 1484 match outcome { 1485 Ok(output) => output, 1486 Err(error) => MycSqliteSchemaStatusOutput { 1487 ready: false, 1488 applied_migration_count: None, 1489 latest_migration: None, 1490 journal_mode: None, 1491 store_version: None, 1492 error: Some(error), 1493 }, 1494 } 1495 } 1496 1497 fn query_sqlite_rows<T>(executor: &SqliteExecutor, sql: &str) -> Result<Vec<T>, String> 1498 where 1499 T: for<'de> Deserialize<'de>, 1500 { 1501 let raw = executor 1502 .query_raw(sql, "[]") 1503 .map_err(|error| error.to_string())?; 1504 serde_json::from_str(&raw).map_err(|error| error.to_string()) 1505 } 1506 1507 fn push_counter(lines: &mut Vec<String>, name: &str, value: usize) { 1508 lines.push(format!("{name} {value}")); 1509 } 1510 1511 fn push_labeled_counter( 1512 lines: &mut Vec<String>, 1513 name: &str, 1514 label_key: &str, 1515 label_value: &str, 1516 value: usize, 1517 ) { 1518 lines.push(format!(r#"{name}{{{label_key}="{label_value}"}} {value}"#)); 1519 } 1520 1521 fn push_outcome_counters(lines: &mut Vec<String>, name: &str, counts: &MycOperationOutcomeCounts) { 1522 push_labeled_counter(lines, name, "outcome", "succeeded", counts.succeeded); 1523 push_labeled_counter(lines, name, "outcome", "rejected", counts.rejected); 1524 push_labeled_counter(lines, name, "outcome", "restored", counts.restored); 1525 push_labeled_counter(lines, name, "outcome", "unavailable", counts.unavailable); 1526 push_labeled_counter(lines, name, "outcome", "missing", counts.missing); 1527 push_labeled_counter(lines, name, "outcome", "matched", counts.matched); 1528 push_labeled_counter(lines, name, "outcome", "drifted", counts.drifted); 1529 push_labeled_counter(lines, name, "outcome", "conflicted", counts.conflicted); 1530 push_labeled_counter(lines, name, "outcome", "skipped", counts.skipped); 1531 } 1532 1533 fn push_outcome_counters_with_extra_label( 1534 lines: &mut Vec<String>, 1535 name: &str, 1536 extra_label_key: &str, 1537 extra_label_value: &str, 1538 counts: &MycOperationOutcomeCounts, 1539 ) { 1540 push_labeled_counter_pair( 1541 lines, 1542 name, 1543 extra_label_key, 1544 extra_label_value, 1545 "outcome", 1546 "succeeded", 1547 counts.succeeded, 1548 ); 1549 push_labeled_counter_pair( 1550 lines, 1551 name, 1552 extra_label_key, 1553 extra_label_value, 1554 "outcome", 1555 "rejected", 1556 counts.rejected, 1557 ); 1558 push_labeled_counter_pair( 1559 lines, 1560 name, 1561 extra_label_key, 1562 extra_label_value, 1563 "outcome", 1564 "restored", 1565 counts.restored, 1566 ); 1567 push_labeled_counter_pair( 1568 lines, 1569 name, 1570 extra_label_key, 1571 extra_label_value, 1572 "outcome", 1573 "unavailable", 1574 counts.unavailable, 1575 ); 1576 push_labeled_counter_pair( 1577 lines, 1578 name, 1579 extra_label_key, 1580 extra_label_value, 1581 "outcome", 1582 "missing", 1583 counts.missing, 1584 ); 1585 push_labeled_counter_pair( 1586 lines, 1587 name, 1588 extra_label_key, 1589 extra_label_value, 1590 "outcome", 1591 "matched", 1592 counts.matched, 1593 ); 1594 push_labeled_counter_pair( 1595 lines, 1596 name, 1597 extra_label_key, 1598 extra_label_value, 1599 "outcome", 1600 "drifted", 1601 counts.drifted, 1602 ); 1603 push_labeled_counter_pair( 1604 lines, 1605 name, 1606 extra_label_key, 1607 extra_label_value, 1608 "outcome", 1609 "conflicted", 1610 counts.conflicted, 1611 ); 1612 push_labeled_counter_pair( 1613 lines, 1614 name, 1615 extra_label_key, 1616 extra_label_value, 1617 "outcome", 1618 "skipped", 1619 counts.skipped, 1620 ); 1621 } 1622 1623 fn push_labeled_counter_pair( 1624 lines: &mut Vec<String>, 1625 name: &str, 1626 first_key: &str, 1627 first_value: &str, 1628 second_key: &str, 1629 second_value: &str, 1630 value: usize, 1631 ) { 1632 lines.push(format!( 1633 r#"{name}{{{first_key}="{first_value}",{second_key}="{second_value}"}} {value}"# 1634 )); 1635 } 1636 1637 #[cfg(test)] 1638 mod tests { 1639 use std::collections::BTreeMap; 1640 use std::path::Path; 1641 use std::path::PathBuf; 1642 1643 use nostr::PublicKey; 1644 use radroots_identity::RadrootsIdentity; 1645 use radroots_nostr_signer::prelude::{ 1646 RadrootsNostrSignerApprovalRequirement, RadrootsNostrSignerConnectionDraft, 1647 RadrootsNostrSignerRequestDecision, 1648 }; 1649 1650 use super::{ 1651 MYC_SIGNER_STATUS_CONTRACT_VERSION, MycMetricsSnapshot, MycOperationOutcomeCounts, 1652 MycRuntimeStatus, collect_metrics, collect_status_full, collect_status_signer, 1653 inspect_runtime_audit_sqlite_schema, render_metrics_text, worse_runtime_status, 1654 }; 1655 use crate::app::{MycRuntime, MycRuntimePaths}; 1656 use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord}; 1657 use crate::config::{MycConfig, MycRuntimeAuditBackend}; 1658 1659 fn write_test_identity(path: &Path, secret_key: &str) { 1660 let identity = 1661 RadrootsIdentity::from_secret_key_str(secret_key).expect("identity from secret"); 1662 crate::identity_files::store_encrypted_identity(path, &identity).expect("write identity"); 1663 } 1664 1665 #[test] 1666 fn runtime_status_prefers_the_worst_state() { 1667 assert_eq!( 1668 worse_runtime_status(MycRuntimeStatus::Healthy, MycRuntimeStatus::Degraded), 1669 MycRuntimeStatus::Degraded 1670 ); 1671 assert_eq!( 1672 worse_runtime_status(MycRuntimeStatus::Healthy, MycRuntimeStatus::Unready), 1673 MycRuntimeStatus::Unready 1674 ); 1675 assert_eq!( 1676 worse_runtime_status(MycRuntimeStatus::Degraded, MycRuntimeStatus::Healthy), 1677 MycRuntimeStatus::Degraded 1678 ); 1679 } 1680 1681 #[test] 1682 fn metrics_text_renderer_is_deterministic() { 1683 let metrics = MycMetricsSnapshot { 1684 signer_request_total: 3, 1685 signer_request_decisions: super::MycAuditDecisionCounts { 1686 allowed: 1, 1687 denied: 1, 1688 challenged: 1, 1689 }, 1690 runtime_operation_total: 2, 1691 runtime_operation_outcomes: MycOperationOutcomeCounts { 1692 succeeded: 1, 1693 rejected: 1, 1694 ..MycOperationOutcomeCounts::default() 1695 }, 1696 runtime_operation_by_kind: BTreeMap::from([( 1697 "listener_response_publish".to_owned(), 1698 MycOperationOutcomeCounts { 1699 succeeded: 1, 1700 ..MycOperationOutcomeCounts::default() 1701 }, 1702 )]), 1703 runtime_aggregate_publish_rejection_count: 1, 1704 runtime_repair_success_count: 0, 1705 runtime_repair_rejection_count: 0, 1706 runtime_unavailable_count: 0, 1707 runtime_replay_restore_count: 0, 1708 delivery_recovery_success_count: 1, 1709 delivery_recovery_rejection_count: 0, 1710 delivery_outbox_total: 2, 1711 delivery_outbox_queued_count: 1, 1712 delivery_outbox_published_pending_finalize_count: 0, 1713 delivery_outbox_failed_count: 1, 1714 delivery_outbox_finalized_count: 0, 1715 delivery_outbox_unfinished_count: 1, 1716 delivery_outbox_critical_unfinished_count: 1, 1717 delivery_outbox_blocked_count: 0, 1718 delivery_outbox_critical_blocked_count: 0, 1719 }; 1720 1721 let rendered = render_metrics_text(&metrics); 1722 1723 assert!(rendered.contains("myc_signer_request_total 3")); 1724 assert!(rendered.contains( 1725 r#"myc_runtime_operation_kind_total{kind="listener_response_publish",outcome="succeeded"} 1"# 1726 )); 1727 assert!(rendered.contains("myc_delivery_recovery_success_total 1")); 1728 assert!(rendered.contains("myc_delivery_outbox_total 2")); 1729 } 1730 1731 #[test] 1732 fn runtime_audit_sqlite_schema_status_reports_missing_file() { 1733 let temp = tempfile::tempdir().expect("tempdir"); 1734 let status = inspect_runtime_audit_sqlite_schema( 1735 MycRuntimePaths::runtime_audit_path_for_backend( 1736 PathBuf::from(temp.path()).as_path(), 1737 MycRuntimeAuditBackend::Sqlite, 1738 ) 1739 .as_path(), 1740 ); 1741 1742 assert!(!status.ready); 1743 assert_eq!( 1744 status.error.as_deref(), 1745 Some("sqlite persistence file is missing") 1746 ); 1747 } 1748 1749 #[test] 1750 fn collect_metrics_uses_live_state_after_bootstrap() { 1751 let temp = tempfile::tempdir().expect("tempdir"); 1752 let mut config = MycConfig::default(); 1753 config.paths.state_dir = temp.path().join("state"); 1754 config.paths.signer_identity_path = temp.path().join("signer.json"); 1755 config.paths.user_identity_path = temp.path().join("user.json"); 1756 write_test_identity( 1757 &config.paths.signer_identity_path, 1758 "1111111111111111111111111111111111111111111111111111111111111111", 1759 ); 1760 write_test_identity( 1761 &config.paths.user_identity_path, 1762 "2222222222222222222222222222222222222222222222222222222222222222", 1763 ); 1764 1765 let runtime = MycRuntime::bootstrap(config.clone()).expect("runtime"); 1766 let manager = runtime.signer_manager().expect("manager"); 1767 let client_public_key = 1768 PublicKey::parse("7777777777777777777777777777777777777777777777777777777777777777") 1769 .expect("client public key"); 1770 let connection = manager 1771 .register_connection( 1772 RadrootsNostrSignerConnectionDraft::new( 1773 client_public_key, 1774 runtime.user_public_identity(), 1775 ) 1776 .with_approval_requirement(RadrootsNostrSignerApprovalRequirement::NotRequired), 1777 ) 1778 .expect("register connection"); 1779 manager 1780 .record_request( 1781 &connection.connection_id, 1782 "req-live-metrics", 1783 radroots_nostr_connect::prelude::RadrootsNostrConnectMethod::Ping, 1784 RadrootsNostrSignerRequestDecision::Allowed, 1785 None, 1786 ) 1787 .expect("record request"); 1788 runtime.record_operation_audit(&MycOperationAuditRecord::new( 1789 MycOperationAuditKind::DeliveryRecovery, 1790 MycOperationAuditOutcome::Succeeded, 1791 None, 1792 None, 1793 1, 1794 1, 1795 "startup recovery succeeded", 1796 )); 1797 drop(runtime); 1798 1799 let runtime = MycRuntime::bootstrap(config).expect("runtime restart"); 1800 std::fs::remove_file(&runtime.paths().signer_state_path).expect("remove signer state"); 1801 std::fs::remove_file(&runtime.paths().runtime_audit_path).expect("remove runtime audit"); 1802 1803 let metrics = collect_metrics(&runtime).expect("collect metrics"); 1804 1805 assert_eq!(metrics.signer_request_total, 1); 1806 assert_eq!(metrics.signer_request_decisions.allowed, 1); 1807 assert_eq!(metrics.runtime_operation_total, 1); 1808 assert_eq!(metrics.runtime_operation_outcomes.succeeded, 1); 1809 assert_eq!(metrics.delivery_recovery_success_count, 1); 1810 } 1811 1812 #[tokio::test(flavor = "current_thread")] 1813 async fn status_full_reports_signer_backend_capabilities() { 1814 use radroots_nostr_signer::prelude::{ 1815 RadrootsNostrSignerBackend, RadrootsNostrSignerConnectionDraft, 1816 }; 1817 1818 let temp = tempfile::tempdir().expect("tempdir"); 1819 let mut config = MycConfig::default(); 1820 config.paths.state_dir = temp.path().join("state"); 1821 config.paths.signer_identity_path = temp.path().join("signer.json"); 1822 config.paths.user_identity_path = temp.path().join("user.json"); 1823 write_test_identity( 1824 &config.paths.signer_identity_path, 1825 "1111111111111111111111111111111111111111111111111111111111111111", 1826 ); 1827 write_test_identity( 1828 &config.paths.user_identity_path, 1829 "2222222222222222222222222222222222222222222222222222222222222222", 1830 ); 1831 1832 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1833 let backend = runtime.signer_backend(); 1834 let connection = backend 1835 .register_connection(RadrootsNostrSignerConnectionDraft::new( 1836 nostr::Keys::generate().public_key(), 1837 runtime.user_public_identity(), 1838 )) 1839 .expect("register connection"); 1840 1841 let status = collect_status_full(&runtime).await.expect("status"); 1842 assert!( 1843 status 1844 .signer_backend 1845 .local_signer 1846 .expect("local signer") 1847 .is_secret_backed() 1848 ); 1849 assert_eq!(status.signer_backend.remote_session_count, 1); 1850 assert_eq!(status.signer_backend.remote_sessions.len(), 1); 1851 assert_eq!( 1852 status.signer_backend.remote_sessions[0].connection_id, 1853 connection.connection_id 1854 ); 1855 } 1856 1857 #[test] 1858 fn status_signer_reports_remote_sessions_without_transport_diagnostics() { 1859 use radroots_nostr_signer::prelude::RadrootsNostrSignerBackend; 1860 1861 let temp = tempfile::tempdir().expect("tempdir"); 1862 let mut config = MycConfig::default(); 1863 config.paths.state_dir = temp.path().join("state"); 1864 config.paths.signer_identity_path = temp.path().join("signer.json"); 1865 config.paths.user_identity_path = temp.path().join("user.json"); 1866 config.transport.enabled = true; 1867 config.transport.relays = vec!["ws://127.0.0.1:9".to_owned()]; 1868 config.transport.connect_timeout_secs = 99; 1869 write_test_identity( 1870 &config.paths.signer_identity_path, 1871 "1111111111111111111111111111111111111111111111111111111111111111", 1872 ); 1873 write_test_identity( 1874 &config.paths.user_identity_path, 1875 "2222222222222222222222222222222222222222222222222222222222222222", 1876 ); 1877 1878 let runtime = MycRuntime::bootstrap(config).expect("runtime"); 1879 let backend = runtime.signer_backend(); 1880 let connection = backend 1881 .register_connection(RadrootsNostrSignerConnectionDraft::new( 1882 nostr::Keys::generate().public_key(), 1883 runtime.user_public_identity(), 1884 )) 1885 .expect("register connection"); 1886 1887 let status = collect_status_signer(&runtime).expect("status"); 1888 1889 assert_eq!( 1890 status.status_contract_version, 1891 MYC_SIGNER_STATUS_CONTRACT_VERSION 1892 ); 1893 assert_eq!(status.status, MycRuntimeStatus::Healthy); 1894 assert!(status.ready); 1895 assert!(status.reasons.is_empty()); 1896 assert_eq!( 1897 status.custody.signer.public_key_hex.as_deref(), 1898 Some("4f355bdcb7cc0af728ef3cceb9615d90684bb5b2ca5f859ab0f0b704075871aa") 1899 ); 1900 assert_eq!( 1901 status.custody.user.public_key_hex.as_deref(), 1902 Some("466d7fcae563e5cb09a0d1870bb580344804617879a14949cf22285f1bae3f27") 1903 ); 1904 assert!( 1905 status 1906 .signer_backend 1907 .local_signer 1908 .as_ref() 1909 .expect("local signer") 1910 .is_secret_backed() 1911 ); 1912 assert_eq!(status.signer_backend.remote_session_count, 1); 1913 assert_eq!(status.signer_backend.remote_sessions.len(), 1); 1914 assert_eq!( 1915 status.signer_backend.remote_sessions[0].connection_id, 1916 connection.connection_id 1917 ); 1918 } 1919 }