myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

commit f6b0d948a6fbdb9ef3db58ea7f8af591ba8bc7c6
parent 190d69f7b8d3cb76d92df2c2fb84b04509ccb66b
Author: triesap <tyson@radroots.org>
Date:   Wed, 25 Mar 2026 21:06:28 +0000

admin: add local observability server

- add loopback-only observability config with env parsing and runtime snapshot fields
- serve read-only healthz readyz status and metrics endpoints from the shared operability model
- run the admin server alongside the signer service with coordinated shutdown and error propagation
- prove loopback validation and endpoint behavior with config and operability server tests

Diffstat:
MCargo.lock | 182+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
MCargo.toml | 1+
Msrc/app/runtime.rs | 82++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Msrc/config.rs | 79+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/error.rs | 13+++++++++++++
Msrc/lib.rs | 4++--
Msrc/operability/mod.rs | 2++
Asrc/operability/server.rs | 103+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atests/operability_server.rs | 279+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
9 files changed, 740 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -176,12 +176,66 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef49f5882e4b6afaac09ad239a4f8c70a24b8f2b0897edb1f706008efd109cf4" [[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + +[[package]] name = "autocfg" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] +name = "axum" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" +dependencies = [ + "axum-core", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", +] + +[[package]] name = "base64" version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -801,12 +855,77 @@ dependencies = [ ] [[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" +dependencies = [ + "bytes", + "futures-core", + "http", + "http-body", + "pin-project-lite", +] + +[[package]] name = "httparse" version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + +[[package]] +name = "hyper" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +dependencies = [ + "atomic-waker", + "bytes", + "futures-channel", + "futures-core", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "pin-utils", + "smallvec", + "tokio", +] + +[[package]] +name = "hyper-util" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" +dependencies = [ + "bytes", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] name = "iana-time-zone" version = "0.1.65" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1057,12 +1176,24 @@ dependencies = [ ] [[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + +[[package]] name = "memchr" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + +[[package]] name = "minimal-lexical" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1083,6 +1214,7 @@ dependencies = [ name = "myc" version = "0.1.0" dependencies = [ + "axum", "clap", "futures-util", "nostr", @@ -1331,6 +1463,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" [[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] name = "poly1305" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1788,6 +1926,17 @@ dependencies = [ ] [[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + +[[package]] name = "serde_spanned" version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1895,6 +2044,12 @@ dependencies = [ ] [[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" + +[[package]] name = "synstructure" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2139,6 +2294,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" [[package]] +name = "tower" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + +[[package]] +name = "tower-service" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" + +[[package]] name = "tracing" version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -14,6 +14,7 @@ resolver = "2" unexpected_cfgs = { level = "warn", check-cfg = ['cfg(coverage_nightly)'] } [dependencies] +axum = { version = "0.8", default-features = false, features = ["http1", "json", "tokio"] } clap = { version = "4.5", features = ["derive"] } nostr = { version = "0.44.2", features = ["nip04", "nip44"] } radroots-identity = { path = "../lib/crates/identity" } diff --git a/src/app/runtime.rs b/src/app/runtime.rs @@ -1,10 +1,12 @@ use std::fs; use std::future::Future; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use crate::audit::{MycOperationAuditRecord, MycOperationAuditStore}; use crate::config::{MycAuditConfig, MycConfig}; use crate::error::MycError; +use crate::operability::server::run_observability_server; use crate::policy::MycPolicyContext; use crate::transport::{MycNip46Service, MycNostrTransport, MycTransportSnapshot}; use radroots_identity::{RadrootsIdentity, RadrootsIdentityPublic}; @@ -28,6 +30,8 @@ pub struct MycRuntimePaths { pub struct MycStartupSnapshot { pub instance_name: String, pub log_filter: String, + pub observability_enabled: bool, + pub observability_bind_addr: SocketAddr, pub state_dir: PathBuf, pub audit_dir: PathBuf, pub signer_identity_path: PathBuf, @@ -130,6 +134,8 @@ impl MycRuntime { MycStartupSnapshot { instance_name: self.config.service.instance_name.clone(), log_filter: self.config.logging.filter.clone(), + observability_enabled: self.config.observability.enabled, + observability_bind_addr: self.config.observability.bind_addr, state_dir: self.paths.state_dir.clone(), audit_dir: self.paths.audit_dir.clone(), signer_identity_path: self.paths.signer_identity_path.clone(), @@ -167,18 +173,49 @@ impl MycRuntime { signer_public_key_hex = %snapshot.signer_public_key_hex, user_identity_id = %snapshot.user_identity_id, user_public_key_hex = %snapshot.user_public_key_hex, + observability_enabled = snapshot.observability_enabled, + observability_bind_addr = %snapshot.observability_bind_addr, transport_enabled = snapshot.transport.enabled, transport_relay_count = snapshot.transport.relay_count, transport_connect_timeout_secs = snapshot.transport.connect_timeout_secs, "myc runtime bootstrapped" ); + let mut tasks = tokio::task::JoinSet::new(); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); if let Some(transport) = self.transport.clone() { let service = MycNip46Service::new(self.signer_context(), transport); - return service.run_until(shutdown).await; + let shutdown = observe_shutdown_signal(shutdown_rx.clone()); + tasks.spawn(async move { service.run_until(shutdown).await }); } + if self.config.observability.enabled { + let runtime = self.clone(); + let shutdown = observe_shutdown_signal(shutdown_rx); + tasks.spawn(async move { run_observability_server(runtime, shutdown).await }); + } + tokio::pin!(shutdown); - shutdown.await; - Ok(()) + if tasks.is_empty() { + shutdown.await; + return Ok(()); + } + + tokio::select! { + _ = &mut shutdown => { + let _ = shutdown_tx.send(true); + drain_runtime_tasks(tasks).await + } + joined = tasks.join_next() => { + let _ = shutdown_tx.send(true); + let first_result = match joined { + Some(result) => result.map_err(|error| { + MycError::InvalidOperation(format!("myc runtime task failed: {error}")) + })?, + None => Ok(()), + }; + let remaining = drain_runtime_tasks(tasks).await; + first_result.and(remaining) + } + } } fn prepare_filesystem_for(paths: &MycRuntimePaths) -> Result<(), MycError> { @@ -194,6 +231,45 @@ impl MycRuntime { } } +async fn drain_runtime_tasks( + mut tasks: tokio::task::JoinSet<Result<(), MycError>>, +) -> Result<(), MycError> { + let mut first_error = None; + while let Some(joined) = tasks.join_next().await { + match joined { + Ok(Ok(())) => {} + Ok(Err(error)) => { + if first_error.is_none() { + first_error = Some(error); + } + } + Err(error) => { + if first_error.is_none() { + first_error = Some(MycError::InvalidOperation(format!( + "myc runtime task failed: {error}" + ))); + } + } + } + } + + match first_error { + Some(error) => Err(error), + None => Ok(()), + } +} + +async fn observe_shutdown_signal(mut shutdown_rx: tokio::sync::watch::Receiver<bool>) { + loop { + if *shutdown_rx.borrow() { + break; + } + if shutdown_rx.changed().await.is_err() { + break; + } + } +} + impl MycRuntimePaths { fn from_config(config: &MycConfig) -> Self { let state_dir = config.paths.state_dir.clone(); diff --git a/src/config.rs b/src/config.rs @@ -1,5 +1,6 @@ use std::collections::BTreeSet; use std::fs; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use nostr::PublicKey; @@ -21,6 +22,7 @@ pub struct MycConfig { pub logging: MycLoggingConfig, pub paths: MycPathsConfig, pub audit: MycAuditConfig, + pub observability: MycObservabilityConfig, pub discovery: MycDiscoveryConfig, pub policy: MycPolicyConfig, pub transport: MycTransportConfig, @@ -58,6 +60,13 @@ pub struct MycAuditConfig { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(default, deny_unknown_fields)] +pub struct MycObservabilityConfig { + pub enabled: bool, + pub bind_addr: SocketAddr, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default, deny_unknown_fields)] pub struct MycDiscoveryConfig { pub enabled: bool, pub domain: Option<String>, @@ -130,6 +139,7 @@ impl Default for MycConfig { logging: MycLoggingConfig::default(), paths: MycPathsConfig::default(), audit: MycAuditConfig::default(), + observability: MycObservabilityConfig::default(), discovery: MycDiscoveryConfig::default(), policy: MycPolicyConfig::default(), transport: MycTransportConfig::default(), @@ -190,6 +200,17 @@ impl Default for MycAuditConfig { } } +impl Default for MycObservabilityConfig { + fn default() -> Self { + Self { + enabled: false, + bind_addr: "127.0.0.1:9460" + .parse() + .expect("default observability bind addr"), + } + } +} + impl Default for MycDiscoveryConfig { fn default() -> Self { Self { @@ -329,6 +350,12 @@ impl MycConfig { )); } + if !self.observability.bind_addr.ip().is_loopback() { + return Err(MycError::InvalidConfig( + "observability.bind_addr must use a loopback address".to_owned(), + )); + } + self.discovery.validate(&self.transport)?; if self.transport.connect_timeout_secs == 0 { @@ -555,6 +582,12 @@ fn apply_env_entry( "MYC_AUDIT_MAX_ARCHIVED_FILES" => { config.audit.max_archived_files = parse_usize_env(key, value, path, line_number)?; } + "MYC_OBSERVABILITY_ENABLED" => { + config.observability.enabled = parse_bool_env(key, value, path, line_number)?; + } + "MYC_OBSERVABILITY_BIND_ADDR" => { + config.observability.bind_addr = parse_socket_addr_env(key, value, path, line_number)?; + } "MYC_DISCOVERY_ENABLED" => { config.discovery.enabled = parse_bool_env(key, value, path, line_number)?; } @@ -706,6 +739,21 @@ fn parse_u64_env(key: &str, value: &str, path: &Path, line_number: usize) -> Res }) } +fn parse_socket_addr_env( + key: &str, + value: &str, + path: &Path, + line_number: usize, +) -> Result<SocketAddr, MycError> { + value.parse::<SocketAddr>().map_err(|error| { + config_parse_error( + path, + line_number, + format!("{key} must be a socket address: {error}"), + ) + }) +} + fn parse_connection_approval_env( key: &str, value: &str, @@ -1029,6 +1077,13 @@ mod tests { assert_eq!(config.audit.default_read_limit, 200); assert_eq!(config.audit.max_active_file_bytes, 262_144); assert_eq!(config.audit.max_archived_files, 8); + assert!(!config.observability.enabled); + assert_eq!( + config.observability.bind_addr, + "127.0.0.1:9460" + .parse() + .expect("default observability bind addr") + ); assert!(!config.discovery.enabled); assert_eq!(config.discovery.handler_identifier, "myc"); assert!(config.discovery.domain.is_none()); @@ -1063,6 +1118,8 @@ MYC_PATHS_USER_IDENTITY_PATH=/tmp/myc-user.json MYC_AUDIT_DEFAULT_READ_LIMIT=50 MYC_AUDIT_MAX_ACTIVE_FILE_BYTES=4096 MYC_AUDIT_MAX_ARCHIVED_FILES=3 +MYC_OBSERVABILITY_ENABLED=true +MYC_OBSERVABILITY_BIND_ADDR=127.0.0.1:9550 MYC_DISCOVERY_ENABLED=true MYC_DISCOVERY_DOMAIN=myc.example.com MYC_DISCOVERY_HANDLER_IDENTIFIER=myc-main @@ -1116,6 +1173,11 @@ MYC_TRANSPORT_PUBLISH_MAX_BACKOFF_MILLIS=800 assert_eq!(config.audit.default_read_limit, 50); assert_eq!(config.audit.max_active_file_bytes, 4096); assert_eq!(config.audit.max_archived_files, 3); + assert!(config.observability.enabled); + assert_eq!( + config.observability.bind_addr, + "127.0.0.1:9550".parse().expect("observability bind addr") + ); assert!(config.discovery.enabled); assert_eq!(config.discovery.domain.as_deref(), Some("myc.example.com")); assert_eq!(config.discovery.handler_identifier, "myc-main"); @@ -1228,6 +1290,23 @@ MYC_UNKNOWN=nope } #[test] + fn validate_rejects_non_loopback_observability_bind_addr() { + let mut config = MycConfig::default(); + config.observability.enabled = true; + config.observability.bind_addr = "0.0.0.0:9460" + .parse() + .expect("non-loopback observability bind addr"); + + let err = config + .validate() + .expect_err("non-loopback observability bind addr should be rejected"); + assert!( + err.to_string() + .contains("observability.bind_addr must use a loopback address") + ); + } + + #[test] fn discovery_validation_requires_domain_and_relays_when_enabled() { let mut config = MycConfig::default(); config.discovery.enabled = true; diff --git a/src/error.rs b/src/error.rs @@ -1,3 +1,4 @@ +use std::net::SocketAddr; use std::path::PathBuf; use radroots_identity::IdentityError; @@ -40,6 +41,18 @@ pub enum MycError { #[source] source: std::io::Error, }, + #[error("failed to bind observability server at {bind_addr}: {source}")] + ObservabilityBind { + bind_addr: SocketAddr, + #[source] + source: std::io::Error, + }, + #[error("observability server failed at {bind_addr}: {source}")] + ObservabilityServe { + bind_addr: SocketAddr, + #[source] + source: std::io::Error, + }, #[error("audit io error at {path}: {source}")] AuditIo { path: PathBuf, diff --git a/src/lib.rs b/src/lib.rs @@ -19,8 +19,8 @@ pub use audit::{ }; pub use config::{ DEFAULT_ENV_PATH, MycAuditConfig, MycConfig, MycConnectionApproval, MycDiscoveryConfig, - MycDiscoveryMetadataConfig, MycLoggingConfig, MycPathsConfig, MycPolicyConfig, - MycServiceConfig, MycTransportConfig, MycTransportDeliveryPolicy, + MycDiscoveryMetadataConfig, MycLoggingConfig, MycObservabilityConfig, MycPathsConfig, + MycPolicyConfig, MycServiceConfig, MycTransportConfig, MycTransportDeliveryPolicy, }; pub use control::{MycAcceptedConnectionOutput, MycAuthorizedReplayOutput}; pub use discovery::{ diff --git a/src/operability/mod.rs b/src/operability/mod.rs @@ -1,3 +1,5 @@ +pub mod server; + use std::collections::BTreeMap; use std::time::Duration; diff --git a/src/operability/server.rs b/src/operability/server.rs @@ -0,0 +1,103 @@ +use axum::extract::State; +use axum::http::{HeaderValue, StatusCode, header}; +use axum::response::{IntoResponse, Response}; +use axum::routing::get; +use axum::{Json, Router}; +use tokio::net::TcpListener; + +use crate::app::MycRuntime; +use crate::error::MycError; + +use super::{MycRuntimeStatus, collect_metrics, collect_status_full, render_metrics_text}; + +#[derive(Clone)] +struct MycObservabilityState { + runtime: MycRuntime, +} + +pub async fn run_observability_server<F>(runtime: MycRuntime, shutdown: F) -> Result<(), MycError> +where + F: std::future::Future<Output = ()> + Send + 'static, +{ + let bind_addr = runtime.config().observability.bind_addr; + let listener = TcpListener::bind(bind_addr) + .await + .map_err(|source| MycError::ObservabilityBind { bind_addr, source })?; + let state = MycObservabilityState { runtime }; + let app = Router::new() + .route("/healthz", get(healthz)) + .route("/readyz", get(readyz)) + .route("/status", get(status)) + .route("/metrics", get(metrics)) + .with_state(state); + + tracing::info!(bind_addr = %bind_addr, "observability server listening"); + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await + .map_err(|source| MycError::ObservabilityServe { bind_addr, source }) +} + +async fn healthz(State(state): State<MycObservabilityState>) -> Response { + match collect_status_full(&state.runtime).await { + Ok(status) => { + let code = match status.status { + MycRuntimeStatus::Healthy | MycRuntimeStatus::Degraded => StatusCode::OK, + MycRuntimeStatus::Unready => StatusCode::SERVICE_UNAVAILABLE, + }; + (code, status.status.status_label()).into_response() + } + Err(error) => internal_error_response(error), + } +} + +async fn readyz(State(state): State<MycObservabilityState>) -> Response { + match collect_status_full(&state.runtime).await { + Ok(status) => { + let code = if status.ready { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + }; + let body = if status.ready { "ready" } else { "unready" }; + (code, body).into_response() + } + Err(error) => internal_error_response(error), + } +} + +async fn status(State(state): State<MycObservabilityState>) -> Response { + match collect_status_full(&state.runtime).await { + Ok(status) => Json(status).into_response(), + Err(error) => internal_error_response(error), + } +} + +async fn metrics(State(state): State<MycObservabilityState>) -> Response { + match collect_metrics(&state.runtime) { + Ok(metrics) => { + let body = render_metrics_text(&metrics); + let mut response = body.into_response(); + response.headers_mut().insert( + header::CONTENT_TYPE, + HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8"), + ); + response + } + Err(error) => internal_error_response(error), + } +} + +impl super::MycRuntimeStatus { + fn status_label(self) -> &'static str { + match self { + Self::Healthy => "healthy", + Self::Degraded => "degraded", + Self::Unready => "unready", + } + } +} + +fn internal_error_response(error: MycError) -> Response { + (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response() +} diff --git a/tests/operability_server.rs b/tests/operability_server.rs @@ -0,0 +1,279 @@ +use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use myc::{MycConfig, MycRuntime, MycTransportDeliveryPolicy}; +use radroots_identity::RadrootsIdentity; +use serde_json::Value; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::oneshot; +use tokio::time::{sleep, timeout}; + +type TestResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>; + +struct TestRelay { + url: String, + shutdown_tx: Option<oneshot::Sender<()>>, +} + +impl TestRelay { + async fn spawn() -> TestResult<Self> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let url = format!("ws://{addr}"); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + accept = listener.accept() => { + let Ok((stream, _)) = accept else { + break; + }; + tokio::spawn(async move { + let _ = tokio_tungstenite::accept_async(stream).await; + }); + } + } + } + }); + + Ok(Self { + url, + shutdown_tx: Some(shutdown_tx), + }) + } + + fn url(&self) -> &str { + self.url.as_str() + } +} + +impl Drop for TestRelay { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} + +struct HangingRelay { + url: String, + shutdown_tx: Option<oneshot::Sender<()>>, +} + +impl HangingRelay { + async fn spawn(hold_open_for: Duration) -> TestResult<Self> { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + let url = format!("ws://{addr}"); + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = &mut shutdown_rx => break, + accept = listener.accept() => { + let Ok((stream, _)) = accept else { + break; + }; + tokio::spawn(async move { + sleep(hold_open_for).await; + drop(stream); + }); + } + } + } + }); + + Ok(Self { + url, + shutdown_tx: Some(shutdown_tx), + }) + } + + fn url(&self) -> &str { + self.url.as_str() + } +} + +impl Drop for HangingRelay { + fn drop(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()); + } + } +} + +fn write_test_identity(path: &Path, secret_key: &str) { + RadrootsIdentity::from_secret_key_str(secret_key) + .expect("identity from secret") + .save_json(path) + .expect("write identity"); +} + +fn free_loopback_addr() -> SocketAddr { + let listener = StdTcpListener::bind("127.0.0.1:0").expect("bind free loopback addr"); + let addr = listener.local_addr().expect("local addr"); + drop(listener); + addr +} + +fn build_runtime<F>(configure: F) -> (MycRuntime, SocketAddr) +where + F: FnOnce(&mut MycConfig), +{ + let temp = tempfile::tempdir().expect("tempdir").keep(); + let bind_addr = free_loopback_addr(); + let mut config = MycConfig::default(); + config.paths.state_dir = PathBuf::from(&temp).join("state"); + config.paths.signer_identity_path = PathBuf::from(&temp).join("signer.json"); + config.paths.user_identity_path = PathBuf::from(&temp).join("user.json"); + config.transport.connect_timeout_secs = 1; + config.observability.enabled = true; + config.observability.bind_addr = bind_addr; + write_test_identity( + &config.paths.signer_identity_path, + "1111111111111111111111111111111111111111111111111111111111111111", + ); + write_test_identity( + &config.paths.user_identity_path, + "2222222222222222222222222222222222222222222222222222222222222222", + ); + configure(&mut config); + (MycRuntime::bootstrap(config).expect("runtime"), bind_addr) +} + +async fn spawn_runtime(runtime: MycRuntime) -> oneshot::Sender<()> { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + tokio::spawn(async move { + let _ = runtime + .run_until(async move { + let _ = shutdown_rx.await; + }) + .await; + }); + shutdown_tx +} + +async fn wait_for_http(addr: SocketAddr) -> TestResult<()> { + timeout(Duration::from_secs(5), async { + loop { + match TcpStream::connect(addr).await { + Ok(mut stream) => { + let _ = stream.shutdown().await; + return; + } + Err(_) => sleep(Duration::from_millis(50)).await, + } + } + }) + .await?; + Ok(()) +} + +struct SimpleHttpResponse { + status: u16, + content_type: Option<String>, + body: String, +} + +async fn http_get(addr: SocketAddr, path: &str) -> TestResult<SimpleHttpResponse> { + let mut stream = TcpStream::connect(addr).await?; + let request = format!("GET {path} HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n"); + stream.write_all(request.as_bytes()).await?; + let mut response = Vec::new(); + stream.read_to_end(&mut response).await?; + let response = String::from_utf8(response)?; + let (head, body) = response + .split_once("\r\n\r\n") + .ok_or("missing http body separator")?; + let mut lines = head.lines(); + let status_line = lines.next().ok_or("missing status line")?; + let status = status_line + .split_whitespace() + .nth(1) + .ok_or("missing status code")? + .parse::<u16>()?; + let content_type = lines.find_map(|line| { + let (key, value) = line.split_once(':')?; + if key.eq_ignore_ascii_case("content-type") { + Some(value.trim().to_owned()) + } else { + None + } + }); + Ok(SimpleHttpResponse { + status, + content_type, + body: body.to_owned(), + }) +} + +#[tokio::test] +async fn observability_server_reports_unready_when_transport_is_disabled() -> TestResult<()> { + let (runtime, bind_addr) = build_runtime(|_| {}); + let shutdown_tx = spawn_runtime(runtime).await; + wait_for_http(bind_addr).await?; + + let health = http_get(bind_addr, "/healthz").await?; + assert_eq!(health.status, 503); + assert_eq!(health.body, "unready"); + + let ready = http_get(bind_addr, "/readyz").await?; + assert_eq!(ready.status, 503); + assert_eq!(ready.body, "unready"); + + let status = http_get(bind_addr, "/status").await?; + assert_eq!(status.status, 200); + let body: Value = serde_json::from_str(status.body.as_str())?; + assert_eq!(body["status"], "unready"); + assert_eq!(body["ready"], false); + + let metrics = http_get(bind_addr, "/metrics").await?; + assert_eq!(metrics.status, 200); + assert!( + metrics + .content_type + .as_deref() + .unwrap_or_default() + .starts_with("text/plain") + ); + assert!(metrics.body.contains("myc_runtime_operation_total")); + + let _ = shutdown_tx.send(()); + Ok(()) +} + +#[tokio::test] +async fn observability_server_reports_degraded_but_ready_partial_outage() -> TestResult<()> { + let relay = TestRelay::spawn().await?; + let hanging = HangingRelay::spawn(Duration::from_secs(5)).await?; + let (runtime, bind_addr) = build_runtime(|config| { + config.transport.enabled = true; + config.transport.relays = vec![relay.url().to_owned(), hanging.url().to_owned()]; + config.transport.delivery_policy = MycTransportDeliveryPolicy::Any; + }); + let shutdown_tx = spawn_runtime(runtime).await; + wait_for_http(bind_addr).await?; + + let health = http_get(bind_addr, "/healthz").await?; + assert_eq!(health.status, 200); + assert_eq!(health.body, "degraded"); + + let ready = http_get(bind_addr, "/readyz").await?; + assert_eq!(ready.status, 200); + assert_eq!(ready.body, "ready"); + + let status = http_get(bind_addr, "/status").await?; + let body: Value = serde_json::from_str(status.body.as_str())?; + assert_eq!(body["status"], "degraded"); + assert_eq!(body["ready"], true); + assert_eq!(body["transport"]["available_relay_count"], 1); + assert_eq!(body["transport"]["unavailable_relay_count"], 1); + + let _ = shutdown_tx.send(()); + Ok(()) +}