ops.rs (17528B)
1 #![forbid(unsafe_code)] 2 3 use axum::{Json, Router, extract::State, routing::get}; 4 use http::StatusCode; 5 use serde::{Deserialize, Serialize}; 6 use std::sync::{Arc, RwLock}; 7 8 use crate::runtime::{TangleRuntimeMetrics, TangleRuntimeMetricsSnapshot}; 9 10 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 11 pub enum BaseRelayReadinessCheckStatus { 12 Ready, 13 NotReady, 14 } 15 16 impl BaseRelayReadinessCheckStatus { 17 pub fn as_str(self) -> &'static str { 18 match self { 19 Self::Ready => "ready", 20 Self::NotReady => "not_ready", 21 } 22 } 23 24 pub fn is_ready(self) -> bool { 25 self == Self::Ready 26 } 27 } 28 29 #[derive(Debug, Clone, PartialEq, Eq)] 30 pub struct BaseRelayReadinessState { 31 config: BaseRelayReadinessCheckStatus, 32 server_bind: BaseRelayReadinessCheckStatus, 33 relay_identity: BaseRelayReadinessCheckStatus, 34 pocket_storage: BaseRelayReadinessCheckStatus, 35 group_projection: BaseRelayReadinessCheckStatus, 36 group_outbox_replay: BaseRelayReadinessCheckStatus, 37 event_bus: BaseRelayReadinessCheckStatus, 38 } 39 40 impl BaseRelayReadinessState { 41 pub fn new( 42 config: BaseRelayReadinessCheckStatus, 43 server_bind: BaseRelayReadinessCheckStatus, 44 relay_identity: BaseRelayReadinessCheckStatus, 45 pocket_storage: BaseRelayReadinessCheckStatus, 46 group_projection: BaseRelayReadinessCheckStatus, 47 group_outbox_replay: BaseRelayReadinessCheckStatus, 48 event_bus: BaseRelayReadinessCheckStatus, 49 ) -> Self { 50 Self { 51 config, 52 server_bind, 53 relay_identity, 54 pocket_storage, 55 group_projection, 56 group_outbox_replay, 57 event_bus, 58 } 59 } 60 61 pub fn ready() -> Self { 62 Self::new( 63 BaseRelayReadinessCheckStatus::Ready, 64 BaseRelayReadinessCheckStatus::Ready, 65 BaseRelayReadinessCheckStatus::Ready, 66 BaseRelayReadinessCheckStatus::Ready, 67 BaseRelayReadinessCheckStatus::Ready, 68 BaseRelayReadinessCheckStatus::Ready, 69 BaseRelayReadinessCheckStatus::Ready, 70 ) 71 } 72 73 pub fn runtime_ready_before_bind() -> Self { 74 Self::new( 75 BaseRelayReadinessCheckStatus::Ready, 76 BaseRelayReadinessCheckStatus::NotReady, 77 BaseRelayReadinessCheckStatus::Ready, 78 BaseRelayReadinessCheckStatus::Ready, 79 BaseRelayReadinessCheckStatus::Ready, 80 BaseRelayReadinessCheckStatus::Ready, 81 BaseRelayReadinessCheckStatus::Ready, 82 ) 83 } 84 85 pub fn with_server_bind(mut self, server_bind: BaseRelayReadinessCheckStatus) -> Self { 86 self.server_bind = server_bind; 87 self 88 } 89 90 pub fn is_ready(&self) -> bool { 91 [ 92 self.config, 93 self.server_bind, 94 self.relay_identity, 95 self.pocket_storage, 96 self.group_projection, 97 self.group_outbox_replay, 98 self.event_bus, 99 ] 100 .into_iter() 101 .all(BaseRelayReadinessCheckStatus::is_ready) 102 } 103 104 pub fn response(&self) -> BaseRelayReadinessDocument { 105 BaseRelayReadinessDocument { 106 status: if self.is_ready() { 107 "ready".to_owned() 108 } else { 109 "not_ready".to_owned() 110 }, 111 checks: BaseRelayReadinessChecksDocument { 112 config: self.config.as_str().to_owned(), 113 server_bind: self.server_bind.as_str().to_owned(), 114 relay_identity: self.relay_identity.as_str().to_owned(), 115 pocket_storage: self.pocket_storage.as_str().to_owned(), 116 group_projection: self.group_projection.as_str().to_owned(), 117 group_outbox_replay: self.group_outbox_replay.as_str().to_owned(), 118 event_bus: self.event_bus.as_str().to_owned(), 119 }, 120 } 121 } 122 } 123 124 #[derive(Debug, Clone)] 125 pub struct BaseRelayReadinessHandle { 126 inner: Arc<RwLock<BaseRelayReadinessState>>, 127 } 128 129 impl BaseRelayReadinessHandle { 130 pub fn new(state: BaseRelayReadinessState) -> Self { 131 Self { 132 inner: Arc::new(RwLock::new(state)), 133 } 134 } 135 136 pub fn snapshot(&self) -> BaseRelayReadinessState { 137 match self.inner.read() { 138 Ok(state) => state.clone(), 139 Err(poisoned) => poisoned.into_inner().clone(), 140 } 141 } 142 143 pub fn set_server_bind(&self, status: BaseRelayReadinessCheckStatus) { 144 let mut state = match self.inner.write() { 145 Ok(state) => state, 146 Err(poisoned) => poisoned.into_inner(), 147 }; 148 state.server_bind = status; 149 } 150 } 151 152 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 153 pub struct BaseRelayHealthDocument { 154 pub status: String, 155 } 156 157 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 158 pub struct BaseRelayReadinessDocument { 159 pub status: String, 160 pub checks: BaseRelayReadinessChecksDocument, 161 } 162 163 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 164 pub struct BaseRelayReadinessChecksDocument { 165 pub config: String, 166 pub server_bind: String, 167 pub relay_identity: String, 168 pub pocket_storage: String, 169 pub group_projection: String, 170 pub group_outbox_replay: String, 171 pub event_bus: String, 172 } 173 174 #[derive(Debug, Clone)] 175 struct BaseRelayOpsState { 176 readiness: BaseRelayReadinessHandle, 177 metrics: TangleRuntimeMetrics, 178 } 179 180 pub fn base_relay_ops_router( 181 readiness: BaseRelayReadinessHandle, 182 metrics: TangleRuntimeMetrics, 183 ) -> Router { 184 Router::new() 185 .route("/healthz", get(base_relay_healthz)) 186 .route("/readyz", get(base_relay_readyz)) 187 .route("/metricsz", get(base_relay_metricsz)) 188 .with_state(BaseRelayOpsState { readiness, metrics }) 189 } 190 191 async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> { 192 Json(BaseRelayHealthDocument { 193 status: "ok".to_owned(), 194 }) 195 } 196 197 async fn base_relay_readyz( 198 State(state): State<BaseRelayOpsState>, 199 ) -> (StatusCode, Json<BaseRelayReadinessDocument>) { 200 let readiness = state.readiness.snapshot(); 201 let status = if readiness.is_ready() { 202 StatusCode::OK 203 } else { 204 StatusCode::SERVICE_UNAVAILABLE 205 }; 206 (status, Json(readiness.response())) 207 } 208 209 async fn base_relay_metricsz( 210 State(state): State<BaseRelayOpsState>, 211 ) -> Json<TangleRuntimeMetricsSnapshot> { 212 let readiness = state.readiness.snapshot(); 213 Json(state.metrics.snapshot_with_readiness(readiness.is_ready())) 214 } 215 216 #[cfg(test)] 217 mod tests { 218 use super::{ 219 BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, BaseRelayReadinessState, 220 base_relay_ops_router, 221 }; 222 use crate::relay::core::BaseRelayQueryMetrics; 223 use crate::runtime::TangleRuntimeMetrics; 224 use axum::body::to_bytes; 225 use http::{Request, StatusCode}; 226 use tower::ServiceExt; 227 228 #[tokio::test] 229 async fn base_relay_ops_router_reports_health_and_readiness() { 230 let metrics = TangleRuntimeMetrics::new(); 231 let readiness = BaseRelayReadinessHandle::new(BaseRelayReadinessState::ready()); 232 let health = base_relay_ops_router(readiness.clone(), metrics.clone()) 233 .oneshot( 234 Request::builder() 235 .uri("/healthz") 236 .body(axum::body::Body::empty()) 237 .expect("request"), 238 ) 239 .await 240 .expect("health"); 241 242 assert_eq!(health.status(), StatusCode::OK); 243 let health_body = to_bytes(health.into_body(), usize::MAX) 244 .await 245 .expect("body"); 246 let health_value = serde_json::from_slice::<serde_json::Value>(&health_body).expect("json"); 247 assert_eq!(health_value["status"], "ok"); 248 249 metrics.record_session_opened(); 250 metrics.record_client_message(crate::runtime::TangleClientMessageMetricKind::Req); 251 metrics.record_subscription_opened(); 252 metrics.record_auth_success(); 253 metrics.record_auth_failure(); 254 metrics.record_event_admission(); 255 metrics.record_event_rejection(); 256 metrics.record_group_read_denial(); 257 metrics.record_group_write_denial(); 258 metrics.record_event_bus_receivers(2); 259 metrics.record_event_bus_publish(2); 260 metrics.record_event_bus_lagged(3); 261 metrics.record_outbound_queue_full_close(); 262 metrics.record_outbox_pending_events(5); 263 metrics.record_outbox_replayed_event(); 264 metrics.record_disk_used_bytes(89); 265 metrics.record_event_admission_latency(11); 266 metrics.record_query_latency(17); 267 metrics.record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2)); 268 metrics.record_count_refusal(); 269 metrics.record_broad_query_rejection(); 270 let ready = base_relay_ops_router(readiness.clone(), metrics.clone()) 271 .oneshot( 272 Request::builder() 273 .uri("/readyz") 274 .body(axum::body::Body::empty()) 275 .expect("request"), 276 ) 277 .await 278 .expect("ready"); 279 280 assert_eq!(ready.status(), StatusCode::OK); 281 let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); 282 let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json"); 283 assert_eq!(ready_value["status"], "ready"); 284 assert_eq!(ready_value["checks"]["server_bind"], "ready"); 285 assert_eq!(ready_value["checks"]["group_outbox_replay"], "ready"); 286 assert_eq!(ready_value["checks"]["event_bus"], "ready"); 287 let metrics_response = base_relay_ops_router(readiness, metrics) 288 .oneshot( 289 Request::builder() 290 .uri("/metricsz") 291 .body(axum::body::Body::empty()) 292 .expect("request"), 293 ) 294 .await 295 .expect("metrics"); 296 297 assert_eq!(metrics_response.status(), StatusCode::OK); 298 let metrics_body = to_bytes(metrics_response.into_body(), usize::MAX) 299 .await 300 .expect("body"); 301 let metrics_value = 302 serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json"); 303 let keys = metrics_value 304 .as_object() 305 .expect("metrics object") 306 .keys() 307 .cloned() 308 .collect::<std::collections::BTreeSet<_>>(); 309 assert_eq!( 310 keys, 311 [ 312 "tangle_auth_failure_total", 313 "tangle_auth_messages_total", 314 "tangle_auth_success_total", 315 "tangle_client_messages_total", 316 "tangle_close_messages_total", 317 "tangle_count_messages_total", 318 "tangle_count_refusals_total", 319 "tangle_disk_used_bytes", 320 "tangle_event_admission_latency_count", 321 "tangle_event_admission_latency_total_micros", 322 "tangle_event_admitted_total", 323 "tangle_event_bus_lagged_offsets_total", 324 "tangle_event_bus_lagged_receivers_total", 325 "tangle_event_bus_published_offsets_total", 326 "tangle_event_bus_receivers_current", 327 "tangle_event_messages_total", 328 "tangle_event_rejected_total", 329 "tangle_group_read_denied_total", 330 "tangle_group_write_denied_total", 331 "tangle_outbound_queue_full_closes_total", 332 "tangle_outbox_pending_events", 333 "tangle_outbox_replayed_events_total", 334 "tangle_broad_query_rejections_total", 335 "tangle_query_candidates_scanned_total", 336 "tangle_query_latency_count", 337 "tangle_query_latency_total_micros", 338 "tangle_query_redacted_events_total", 339 "tangle_query_returned_events_total", 340 "tangle_rate_limit_rejections_total", 341 "tangle_readiness_ready", 342 "tangle_req_messages_total", 343 "tangle_runtime_uptime_seconds", 344 "tangle_stored_event_offsets_total", 345 "tangle_subscriptions_closed_total", 346 "tangle_subscriptions_opened_total", 347 "tangle_ws_connections_current", 348 "tangle_ws_connections_total", 349 ] 350 .into_iter() 351 .map(str::to_owned) 352 .collect::<std::collections::BTreeSet<_>>() 353 ); 354 assert_eq!(metrics_value["tangle_readiness_ready"], true); 355 assert_eq!(metrics_value["tangle_ws_connections_current"], 1); 356 assert_eq!(metrics_value["tangle_ws_connections_total"], 1); 357 assert_eq!(metrics_value["tangle_client_messages_total"], 1); 358 assert_eq!(metrics_value["tangle_req_messages_total"], 1); 359 assert_eq!(metrics_value["tangle_subscriptions_opened_total"], 1); 360 assert_eq!(metrics_value["tangle_auth_success_total"], 1); 361 assert_eq!(metrics_value["tangle_auth_failure_total"], 1); 362 assert_eq!(metrics_value["tangle_event_admitted_total"], 1); 363 assert_eq!(metrics_value["tangle_event_rejected_total"], 1); 364 assert_eq!(metrics_value["tangle_group_read_denied_total"], 1); 365 assert_eq!(metrics_value["tangle_group_write_denied_total"], 1); 366 assert_eq!(metrics_value["tangle_event_bus_receivers_current"], 2); 367 assert_eq!(metrics_value["tangle_event_bus_published_offsets_total"], 1); 368 assert_eq!(metrics_value["tangle_event_bus_lagged_receivers_total"], 1); 369 assert_eq!(metrics_value["tangle_event_bus_lagged_offsets_total"], 3); 370 assert_eq!(metrics_value["tangle_outbound_queue_full_closes_total"], 1); 371 assert_eq!(metrics_value["tangle_outbox_pending_events"], 5); 372 assert_eq!(metrics_value["tangle_outbox_replayed_events_total"], 1); 373 assert_eq!(metrics_value["tangle_disk_used_bytes"], 89); 374 assert_eq!( 375 metrics_value["tangle_event_admission_latency_total_micros"], 376 11 377 ); 378 assert_eq!(metrics_value["tangle_event_admission_latency_count"], 1); 379 assert_eq!(metrics_value["tangle_query_latency_total_micros"], 17); 380 assert_eq!(metrics_value["tangle_query_latency_count"], 1); 381 assert_eq!(metrics_value["tangle_query_candidates_scanned_total"], 5); 382 assert_eq!(metrics_value["tangle_query_returned_events_total"], 3); 383 assert_eq!(metrics_value["tangle_query_redacted_events_total"], 2); 384 assert_eq!(metrics_value["tangle_count_refusals_total"], 1); 385 assert_eq!(metrics_value["tangle_broad_query_rejections_total"], 1); 386 let metrics_text = String::from_utf8(metrics_body.to_vec()).expect("utf8"); 387 assert!(!metrics_text.contains("relay_secret")); 388 assert!(!metrics_text.contains("invite")); 389 390 let not_ready = BaseRelayReadinessState::new( 391 BaseRelayReadinessCheckStatus::Ready, 392 BaseRelayReadinessCheckStatus::Ready, 393 BaseRelayReadinessCheckStatus::Ready, 394 BaseRelayReadinessCheckStatus::Ready, 395 BaseRelayReadinessCheckStatus::NotReady, 396 BaseRelayReadinessCheckStatus::Ready, 397 BaseRelayReadinessCheckStatus::Ready, 398 ); 399 let rejected = base_relay_ops_router( 400 BaseRelayReadinessHandle::new(not_ready), 401 TangleRuntimeMetrics::new(), 402 ) 403 .oneshot( 404 Request::builder() 405 .uri("/readyz") 406 .body(axum::body::Body::empty()) 407 .expect("request"), 408 ) 409 .await 410 .expect("not ready"); 411 412 assert_eq!(rejected.status(), StatusCode::SERVICE_UNAVAILABLE); 413 let rejected_body = to_bytes(rejected.into_body(), usize::MAX) 414 .await 415 .expect("body"); 416 let rejected_value = 417 serde_json::from_slice::<serde_json::Value>(&rejected_body).expect("json"); 418 assert_eq!(rejected_value["status"], "not_ready"); 419 assert_eq!(rejected_value["checks"]["server_bind"], "ready"); 420 assert_eq!(rejected_value["checks"]["group_projection"], "not_ready"); 421 } 422 423 #[tokio::test] 424 async fn base_relay_ops_router_reports_live_readiness_state() { 425 let readiness = 426 BaseRelayReadinessHandle::new(BaseRelayReadinessState::runtime_ready_before_bind()); 427 let router = base_relay_ops_router(readiness.clone(), TangleRuntimeMetrics::new()); 428 let not_ready = router 429 .clone() 430 .oneshot( 431 Request::builder() 432 .uri("/readyz") 433 .body(axum::body::Body::empty()) 434 .expect("request"), 435 ) 436 .await 437 .expect("not ready"); 438 439 assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE); 440 readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); 441 let ready = router 442 .oneshot( 443 Request::builder() 444 .uri("/readyz") 445 .body(axum::body::Body::empty()) 446 .expect("request"), 447 ) 448 .await 449 .expect("ready"); 450 451 assert_eq!(ready.status(), StatusCode::OK); 452 let body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); 453 let value = serde_json::from_slice::<serde_json::Value>(&body).expect("json"); 454 assert_eq!(value["status"], "ready"); 455 assert_eq!(value["checks"]["event_bus"], "ready"); 456 } 457 }