tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

ops.rs (17528B)


      1 #![forbid(unsafe_code)]
      2 
      3 use axum::{Json, Router, extract::State, routing::get};
      4 use http::StatusCode;
      5 use serde::{Deserialize, Serialize};
      6 use std::sync::{Arc, RwLock};
      7 
      8 use crate::runtime::{TangleRuntimeMetrics, TangleRuntimeMetricsSnapshot};
      9 
     10 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     11 pub enum BaseRelayReadinessCheckStatus {
     12     Ready,
     13     NotReady,
     14 }
     15 
     16 impl BaseRelayReadinessCheckStatus {
     17     pub fn as_str(self) -> &'static str {
     18         match self {
     19             Self::Ready => "ready",
     20             Self::NotReady => "not_ready",
     21         }
     22     }
     23 
     24     pub fn is_ready(self) -> bool {
     25         self == Self::Ready
     26     }
     27 }
     28 
     29 #[derive(Debug, Clone, PartialEq, Eq)]
     30 pub struct BaseRelayReadinessState {
     31     config: BaseRelayReadinessCheckStatus,
     32     server_bind: BaseRelayReadinessCheckStatus,
     33     relay_identity: BaseRelayReadinessCheckStatus,
     34     pocket_storage: BaseRelayReadinessCheckStatus,
     35     group_projection: BaseRelayReadinessCheckStatus,
     36     group_outbox_replay: BaseRelayReadinessCheckStatus,
     37     event_bus: BaseRelayReadinessCheckStatus,
     38 }
     39 
     40 impl BaseRelayReadinessState {
     41     pub fn new(
     42         config: BaseRelayReadinessCheckStatus,
     43         server_bind: BaseRelayReadinessCheckStatus,
     44         relay_identity: BaseRelayReadinessCheckStatus,
     45         pocket_storage: BaseRelayReadinessCheckStatus,
     46         group_projection: BaseRelayReadinessCheckStatus,
     47         group_outbox_replay: BaseRelayReadinessCheckStatus,
     48         event_bus: BaseRelayReadinessCheckStatus,
     49     ) -> Self {
     50         Self {
     51             config,
     52             server_bind,
     53             relay_identity,
     54             pocket_storage,
     55             group_projection,
     56             group_outbox_replay,
     57             event_bus,
     58         }
     59     }
     60 
     61     pub fn ready() -> Self {
     62         Self::new(
     63             BaseRelayReadinessCheckStatus::Ready,
     64             BaseRelayReadinessCheckStatus::Ready,
     65             BaseRelayReadinessCheckStatus::Ready,
     66             BaseRelayReadinessCheckStatus::Ready,
     67             BaseRelayReadinessCheckStatus::Ready,
     68             BaseRelayReadinessCheckStatus::Ready,
     69             BaseRelayReadinessCheckStatus::Ready,
     70         )
     71     }
     72 
     73     pub fn runtime_ready_before_bind() -> Self {
     74         Self::new(
     75             BaseRelayReadinessCheckStatus::Ready,
     76             BaseRelayReadinessCheckStatus::NotReady,
     77             BaseRelayReadinessCheckStatus::Ready,
     78             BaseRelayReadinessCheckStatus::Ready,
     79             BaseRelayReadinessCheckStatus::Ready,
     80             BaseRelayReadinessCheckStatus::Ready,
     81             BaseRelayReadinessCheckStatus::Ready,
     82         )
     83     }
     84 
     85     pub fn with_server_bind(mut self, server_bind: BaseRelayReadinessCheckStatus) -> Self {
     86         self.server_bind = server_bind;
     87         self
     88     }
     89 
     90     pub fn is_ready(&self) -> bool {
     91         [
     92             self.config,
     93             self.server_bind,
     94             self.relay_identity,
     95             self.pocket_storage,
     96             self.group_projection,
     97             self.group_outbox_replay,
     98             self.event_bus,
     99         ]
    100         .into_iter()
    101         .all(BaseRelayReadinessCheckStatus::is_ready)
    102     }
    103 
    104     pub fn response(&self) -> BaseRelayReadinessDocument {
    105         BaseRelayReadinessDocument {
    106             status: if self.is_ready() {
    107                 "ready".to_owned()
    108             } else {
    109                 "not_ready".to_owned()
    110             },
    111             checks: BaseRelayReadinessChecksDocument {
    112                 config: self.config.as_str().to_owned(),
    113                 server_bind: self.server_bind.as_str().to_owned(),
    114                 relay_identity: self.relay_identity.as_str().to_owned(),
    115                 pocket_storage: self.pocket_storage.as_str().to_owned(),
    116                 group_projection: self.group_projection.as_str().to_owned(),
    117                 group_outbox_replay: self.group_outbox_replay.as_str().to_owned(),
    118                 event_bus: self.event_bus.as_str().to_owned(),
    119             },
    120         }
    121     }
    122 }
    123 
    124 #[derive(Debug, Clone)]
    125 pub struct BaseRelayReadinessHandle {
    126     inner: Arc<RwLock<BaseRelayReadinessState>>,
    127 }
    128 
    129 impl BaseRelayReadinessHandle {
    130     pub fn new(state: BaseRelayReadinessState) -> Self {
    131         Self {
    132             inner: Arc::new(RwLock::new(state)),
    133         }
    134     }
    135 
    136     pub fn snapshot(&self) -> BaseRelayReadinessState {
    137         match self.inner.read() {
    138             Ok(state) => state.clone(),
    139             Err(poisoned) => poisoned.into_inner().clone(),
    140         }
    141     }
    142 
    143     pub fn set_server_bind(&self, status: BaseRelayReadinessCheckStatus) {
    144         let mut state = match self.inner.write() {
    145             Ok(state) => state,
    146             Err(poisoned) => poisoned.into_inner(),
    147         };
    148         state.server_bind = status;
    149     }
    150 }
    151 
/// JSON body of the `/healthz` endpoint.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BaseRelayHealthDocument {
    /// Health label; the `/healthz` handler in this file always sets `"ok"`.
    pub status: String,
}
    156 
/// JSON body of the `/readyz` endpoint, as built by
/// `BaseRelayReadinessState::response`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BaseRelayReadinessDocument {
    /// Overall readiness: `"ready"` when every check is ready, otherwise
    /// `"not_ready"`.
    pub status: String,
    /// Label of each individual readiness check.
    pub checks: BaseRelayReadinessChecksDocument,
}
    162 
/// Per-check section of the readiness document.
///
/// `BaseRelayReadinessState::response` fills every field with the
/// `as_str` label of the corresponding check (`"ready"` or `"not_ready"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct BaseRelayReadinessChecksDocument {
    pub config: String,
    pub server_bind: String,
    pub relay_identity: String,
    pub pocket_storage: String,
    pub group_projection: String,
    pub group_outbox_replay: String,
    pub event_bus: String,
}
    173 
/// Router state shared by the ops handlers in this file.
#[derive(Debug, Clone)]
struct BaseRelayOpsState {
    // Read by `/readyz` and `/metricsz` to take a readiness snapshot.
    readiness: BaseRelayReadinessHandle,
    // Read by `/metricsz` to produce the metrics snapshot.
    metrics: TangleRuntimeMetrics,
}
    179 
    180 pub fn base_relay_ops_router(
    181     readiness: BaseRelayReadinessHandle,
    182     metrics: TangleRuntimeMetrics,
    183 ) -> Router {
    184     Router::new()
    185         .route("/healthz", get(base_relay_healthz))
    186         .route("/readyz", get(base_relay_readyz))
    187         .route("/metricsz", get(base_relay_metricsz))
    188         .with_state(BaseRelayOpsState { readiness, metrics })
    189 }
    190 
    191 async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> {
    192     Json(BaseRelayHealthDocument {
    193         status: "ok".to_owned(),
    194     })
    195 }
    196 
    197 async fn base_relay_readyz(
    198     State(state): State<BaseRelayOpsState>,
    199 ) -> (StatusCode, Json<BaseRelayReadinessDocument>) {
    200     let readiness = state.readiness.snapshot();
    201     let status = if readiness.is_ready() {
    202         StatusCode::OK
    203     } else {
    204         StatusCode::SERVICE_UNAVAILABLE
    205     };
    206     (status, Json(readiness.response()))
    207 }
    208 
    209 async fn base_relay_metricsz(
    210     State(state): State<BaseRelayOpsState>,
    211 ) -> Json<TangleRuntimeMetricsSnapshot> {
    212     let readiness = state.readiness.snapshot();
    213     Json(state.metrics.snapshot_with_readiness(readiness.is_ready()))
    214 }
    215 
    216 #[cfg(test)]
    217 mod tests {
    218     use super::{
    219         BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, BaseRelayReadinessState,
    220         base_relay_ops_router,
    221     };
    222     use crate::relay::core::BaseRelayQueryMetrics;
    223     use crate::runtime::TangleRuntimeMetrics;
    224     use axum::body::to_bytes;
    225     use http::{Request, StatusCode};
    226     use tower::ServiceExt;
    227 
    228     #[tokio::test]
    229     async fn base_relay_ops_router_reports_health_and_readiness() {
    230         let metrics = TangleRuntimeMetrics::new();
    231         let readiness = BaseRelayReadinessHandle::new(BaseRelayReadinessState::ready());
    232         let health = base_relay_ops_router(readiness.clone(), metrics.clone())
    233             .oneshot(
    234                 Request::builder()
    235                     .uri("/healthz")
    236                     .body(axum::body::Body::empty())
    237                     .expect("request"),
    238             )
    239             .await
    240             .expect("health");
    241 
    242         assert_eq!(health.status(), StatusCode::OK);
    243         let health_body = to_bytes(health.into_body(), usize::MAX)
    244             .await
    245             .expect("body");
    246         let health_value = serde_json::from_slice::<serde_json::Value>(&health_body).expect("json");
    247         assert_eq!(health_value["status"], "ok");
    248 
    249         metrics.record_session_opened();
    250         metrics.record_client_message(crate::runtime::TangleClientMessageMetricKind::Req);
    251         metrics.record_subscription_opened();
    252         metrics.record_auth_success();
    253         metrics.record_auth_failure();
    254         metrics.record_event_admission();
    255         metrics.record_event_rejection();
    256         metrics.record_group_read_denial();
    257         metrics.record_group_write_denial();
    258         metrics.record_event_bus_receivers(2);
    259         metrics.record_event_bus_publish(2);
    260         metrics.record_event_bus_lagged(3);
    261         metrics.record_outbound_queue_full_close();
    262         metrics.record_outbox_pending_events(5);
    263         metrics.record_outbox_replayed_event();
    264         metrics.record_disk_used_bytes(89);
    265         metrics.record_event_admission_latency(11);
    266         metrics.record_query_latency(17);
    267         metrics.record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2));
    268         metrics.record_count_refusal();
    269         metrics.record_broad_query_rejection();
    270         let ready = base_relay_ops_router(readiness.clone(), metrics.clone())
    271             .oneshot(
    272                 Request::builder()
    273                     .uri("/readyz")
    274                     .body(axum::body::Body::empty())
    275                     .expect("request"),
    276             )
    277             .await
    278             .expect("ready");
    279 
    280         assert_eq!(ready.status(), StatusCode::OK);
    281         let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body");
    282         let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json");
    283         assert_eq!(ready_value["status"], "ready");
    284         assert_eq!(ready_value["checks"]["server_bind"], "ready");
    285         assert_eq!(ready_value["checks"]["group_outbox_replay"], "ready");
    286         assert_eq!(ready_value["checks"]["event_bus"], "ready");
    287         let metrics_response = base_relay_ops_router(readiness, metrics)
    288             .oneshot(
    289                 Request::builder()
    290                     .uri("/metricsz")
    291                     .body(axum::body::Body::empty())
    292                     .expect("request"),
    293             )
    294             .await
    295             .expect("metrics");
    296 
    297         assert_eq!(metrics_response.status(), StatusCode::OK);
    298         let metrics_body = to_bytes(metrics_response.into_body(), usize::MAX)
    299             .await
    300             .expect("body");
    301         let metrics_value =
    302             serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
    303         let keys = metrics_value
    304             .as_object()
    305             .expect("metrics object")
    306             .keys()
    307             .cloned()
    308             .collect::<std::collections::BTreeSet<_>>();
    309         assert_eq!(
    310             keys,
    311             [
    312                 "tangle_auth_failure_total",
    313                 "tangle_auth_messages_total",
    314                 "tangle_auth_success_total",
    315                 "tangle_client_messages_total",
    316                 "tangle_close_messages_total",
    317                 "tangle_count_messages_total",
    318                 "tangle_count_refusals_total",
    319                 "tangle_disk_used_bytes",
    320                 "tangle_event_admission_latency_count",
    321                 "tangle_event_admission_latency_total_micros",
    322                 "tangle_event_admitted_total",
    323                 "tangle_event_bus_lagged_offsets_total",
    324                 "tangle_event_bus_lagged_receivers_total",
    325                 "tangle_event_bus_published_offsets_total",
    326                 "tangle_event_bus_receivers_current",
    327                 "tangle_event_messages_total",
    328                 "tangle_event_rejected_total",
    329                 "tangle_group_read_denied_total",
    330                 "tangle_group_write_denied_total",
    331                 "tangle_outbound_queue_full_closes_total",
    332                 "tangle_outbox_pending_events",
    333                 "tangle_outbox_replayed_events_total",
    334                 "tangle_broad_query_rejections_total",
    335                 "tangle_query_candidates_scanned_total",
    336                 "tangle_query_latency_count",
    337                 "tangle_query_latency_total_micros",
    338                 "tangle_query_redacted_events_total",
    339                 "tangle_query_returned_events_total",
    340                 "tangle_rate_limit_rejections_total",
    341                 "tangle_readiness_ready",
    342                 "tangle_req_messages_total",
    343                 "tangle_runtime_uptime_seconds",
    344                 "tangle_stored_event_offsets_total",
    345                 "tangle_subscriptions_closed_total",
    346                 "tangle_subscriptions_opened_total",
    347                 "tangle_ws_connections_current",
    348                 "tangle_ws_connections_total",
    349             ]
    350             .into_iter()
    351             .map(str::to_owned)
    352             .collect::<std::collections::BTreeSet<_>>()
    353         );
    354         assert_eq!(metrics_value["tangle_readiness_ready"], true);
    355         assert_eq!(metrics_value["tangle_ws_connections_current"], 1);
    356         assert_eq!(metrics_value["tangle_ws_connections_total"], 1);
    357         assert_eq!(metrics_value["tangle_client_messages_total"], 1);
    358         assert_eq!(metrics_value["tangle_req_messages_total"], 1);
    359         assert_eq!(metrics_value["tangle_subscriptions_opened_total"], 1);
    360         assert_eq!(metrics_value["tangle_auth_success_total"], 1);
    361         assert_eq!(metrics_value["tangle_auth_failure_total"], 1);
    362         assert_eq!(metrics_value["tangle_event_admitted_total"], 1);
    363         assert_eq!(metrics_value["tangle_event_rejected_total"], 1);
    364         assert_eq!(metrics_value["tangle_group_read_denied_total"], 1);
    365         assert_eq!(metrics_value["tangle_group_write_denied_total"], 1);
    366         assert_eq!(metrics_value["tangle_event_bus_receivers_current"], 2);
    367         assert_eq!(metrics_value["tangle_event_bus_published_offsets_total"], 1);
    368         assert_eq!(metrics_value["tangle_event_bus_lagged_receivers_total"], 1);
    369         assert_eq!(metrics_value["tangle_event_bus_lagged_offsets_total"], 3);
    370         assert_eq!(metrics_value["tangle_outbound_queue_full_closes_total"], 1);
    371         assert_eq!(metrics_value["tangle_outbox_pending_events"], 5);
    372         assert_eq!(metrics_value["tangle_outbox_replayed_events_total"], 1);
    373         assert_eq!(metrics_value["tangle_disk_used_bytes"], 89);
    374         assert_eq!(
    375             metrics_value["tangle_event_admission_latency_total_micros"],
    376             11
    377         );
    378         assert_eq!(metrics_value["tangle_event_admission_latency_count"], 1);
    379         assert_eq!(metrics_value["tangle_query_latency_total_micros"], 17);
    380         assert_eq!(metrics_value["tangle_query_latency_count"], 1);
    381         assert_eq!(metrics_value["tangle_query_candidates_scanned_total"], 5);
    382         assert_eq!(metrics_value["tangle_query_returned_events_total"], 3);
    383         assert_eq!(metrics_value["tangle_query_redacted_events_total"], 2);
    384         assert_eq!(metrics_value["tangle_count_refusals_total"], 1);
    385         assert_eq!(metrics_value["tangle_broad_query_rejections_total"], 1);
    386         let metrics_text = String::from_utf8(metrics_body.to_vec()).expect("utf8");
    387         assert!(!metrics_text.contains("relay_secret"));
    388         assert!(!metrics_text.contains("invite"));
    389 
    390         let not_ready = BaseRelayReadinessState::new(
    391             BaseRelayReadinessCheckStatus::Ready,
    392             BaseRelayReadinessCheckStatus::Ready,
    393             BaseRelayReadinessCheckStatus::Ready,
    394             BaseRelayReadinessCheckStatus::Ready,
    395             BaseRelayReadinessCheckStatus::NotReady,
    396             BaseRelayReadinessCheckStatus::Ready,
    397             BaseRelayReadinessCheckStatus::Ready,
    398         );
    399         let rejected = base_relay_ops_router(
    400             BaseRelayReadinessHandle::new(not_ready),
    401             TangleRuntimeMetrics::new(),
    402         )
    403         .oneshot(
    404             Request::builder()
    405                 .uri("/readyz")
    406                 .body(axum::body::Body::empty())
    407                 .expect("request"),
    408         )
    409         .await
    410         .expect("not ready");
    411 
    412         assert_eq!(rejected.status(), StatusCode::SERVICE_UNAVAILABLE);
    413         let rejected_body = to_bytes(rejected.into_body(), usize::MAX)
    414             .await
    415             .expect("body");
    416         let rejected_value =
    417             serde_json::from_slice::<serde_json::Value>(&rejected_body).expect("json");
    418         assert_eq!(rejected_value["status"], "not_ready");
    419         assert_eq!(rejected_value["checks"]["server_bind"], "ready");
    420         assert_eq!(rejected_value["checks"]["group_projection"], "not_ready");
    421     }
    422 
    423     #[tokio::test]
    424     async fn base_relay_ops_router_reports_live_readiness_state() {
    425         let readiness =
    426             BaseRelayReadinessHandle::new(BaseRelayReadinessState::runtime_ready_before_bind());
    427         let router = base_relay_ops_router(readiness.clone(), TangleRuntimeMetrics::new());
    428         let not_ready = router
    429             .clone()
    430             .oneshot(
    431                 Request::builder()
    432                     .uri("/readyz")
    433                     .body(axum::body::Body::empty())
    434                     .expect("request"),
    435             )
    436             .await
    437             .expect("not ready");
    438 
    439         assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE);
    440         readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready);
    441         let ready = router
    442             .oneshot(
    443                 Request::builder()
    444                     .uri("/readyz")
    445                     .body(axum::body::Body::empty())
    446                     .expect("request"),
    447             )
    448             .await
    449             .expect("ready");
    450 
    451         assert_eq!(ready.status(), StatusCode::OK);
    452         let body = to_bytes(ready.into_body(), usize::MAX).await.expect("body");
    453         let value = serde_json::from_slice::<serde_json::Value>(&body).expect("json");
    454         assert_eq!(value["status"], "ready");
    455         assert_eq!(value["checks"]["event_bus"], "ready");
    456     }
    457 }