server.rs (3465B)
1 use axum::extract::State; 2 use axum::http::{HeaderValue, StatusCode, header}; 3 use axum::response::{IntoResponse, Response}; 4 use axum::routing::get; 5 use axum::{Json, Router}; 6 use tokio::net::TcpListener; 7 8 use crate::app::MycRuntime; 9 use crate::error::MycError; 10 11 use super::{MycRuntimeStatus, collect_metrics, collect_status_full, render_metrics_text}; 12 13 #[derive(Clone)] 14 struct MycObservabilityState { 15 runtime: MycRuntime, 16 } 17 18 pub async fn run_observability_server<F>(runtime: MycRuntime, shutdown: F) -> Result<(), MycError> 19 where 20 F: std::future::Future<Output = ()> + Send + 'static, 21 { 22 let bind_addr = runtime.config().observability.bind_addr; 23 let listener = TcpListener::bind(bind_addr) 24 .await 25 .map_err(|source| MycError::ObservabilityBind { bind_addr, source })?; 26 let state = MycObservabilityState { runtime }; 27 let app = Router::new() 28 .route("/healthz", get(healthz)) 29 .route("/readyz", get(readyz)) 30 .route("/status", get(status)) 31 .route("/metrics", get(metrics)) 32 .with_state(state); 33 34 tracing::info!(bind_addr = %bind_addr, "observability server listening"); 35 axum::serve(listener, app) 36 .with_graceful_shutdown(shutdown) 37 .await 38 .map_err(|source| MycError::ObservabilityServe { bind_addr, source }) 39 } 40 41 async fn healthz(State(state): State<MycObservabilityState>) -> Response { 42 match collect_status_full(&state.runtime).await { 43 Ok(status) => { 44 let code = match status.status { 45 MycRuntimeStatus::Healthy | MycRuntimeStatus::Degraded => StatusCode::OK, 46 MycRuntimeStatus::Unready => StatusCode::SERVICE_UNAVAILABLE, 47 }; 48 (code, status.status.status_label()).into_response() 49 } 50 Err(error) => internal_error_response(error), 51 } 52 } 53 54 async fn readyz(State(state): State<MycObservabilityState>) -> Response { 55 match collect_status_full(&state.runtime).await { 56 Ok(status) => { 57 let code = if status.ready { 58 StatusCode::OK 59 } else { 60 StatusCode::SERVICE_UNAVAILABLE 61 }; 62 let body = if status.ready { "ready" } else { "unready" }; 63 (code, body).into_response() 64 } 65 Err(error) => internal_error_response(error), 66 } 67 } 68 69 async fn status(State(state): State<MycObservabilityState>) -> Response { 70 match collect_status_full(&state.runtime).await { 71 Ok(status) => Json(status).into_response(), 72 Err(error) => internal_error_response(error), 73 } 74 } 75 76 async fn metrics(State(state): State<MycObservabilityState>) -> Response { 77 match collect_metrics(&state.runtime) { 78 Ok(metrics) => { 79 let body = render_metrics_text(&metrics); 80 let mut response = body.into_response(); 81 response.headers_mut().insert( 82 header::CONTENT_TYPE, 83 HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8"), 84 ); 85 response 86 } 87 Err(error) => internal_error_response(error), 88 } 89 } 90 91 impl super::MycRuntimeStatus { 92 fn status_label(self) -> &'static str { 93 match self { 94 Self::Healthy => "healthy", 95 Self::Degraded => "degraded", 96 Self::Unready => "unready", 97 } 98 } 99 } 100 101 fn internal_error_response(error: MycError) -> Response { 102 (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response() 103 }