tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit c7bf59b3aac5beb37223823aa5bdae6f33b0ccad
parent a4131641c7f27893b3825254fe58648fbf3aff53
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 12:57:20 -0700

ops: make readiness live

- introduce a shared readiness handle for ops routes
- expose event bus readiness in readyz checks
- update server startup to mark the live bind state
- cover live readiness changes and recovery readiness

Diffstat:
Mcrates/tangle_runtime/src/ops.rs | 119+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
Mcrates/tangle_runtime/src/runtime.rs | 18+++++++++++++-----
Mcrates/tangle_runtime/src/server.rs | 17+++++++++--------
Mcrates/tangle_runtime/tests/ops_truthfulness.rs | 1+
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 1+
5 files changed, 125 insertions(+), 31 deletions(-)

diff --git a/crates/tangle_runtime/src/ops.rs b/crates/tangle_runtime/src/ops.rs @@ -1,9 +1,11 @@ #![forbid(unsafe_code)] -use crate::runtime::{TangleRuntimeMetrics, TangleRuntimeMetricsSnapshot}; use axum::{Json, Router, extract::State, routing::get}; use http::StatusCode; use serde::{Deserialize, Serialize}; +use std::sync::{Arc, RwLock}; + +use crate::runtime::{TangleRuntimeMetrics, TangleRuntimeMetricsSnapshot}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum BaseRelayReadinessCheckStatus { @@ -32,6 +34,7 @@ pub struct BaseRelayReadinessState { pocket_storage: BaseRelayReadinessCheckStatus, group_projection: BaseRelayReadinessCheckStatus, group_outbox_replay: BaseRelayReadinessCheckStatus, + event_bus: BaseRelayReadinessCheckStatus, } impl BaseRelayReadinessState { @@ -42,6 +45,7 @@ impl BaseRelayReadinessState { pocket_storage: BaseRelayReadinessCheckStatus, group_projection: BaseRelayReadinessCheckStatus, group_outbox_replay: BaseRelayReadinessCheckStatus, + event_bus: BaseRelayReadinessCheckStatus, ) -> Self { Self { config, @@ -50,6 +54,7 @@ impl BaseRelayReadinessState { pocket_storage, group_projection, group_outbox_replay, + event_bus, } } @@ -61,6 +66,7 @@ impl BaseRelayReadinessState { BaseRelayReadinessCheckStatus::Ready, BaseRelayReadinessCheckStatus::Ready, BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, ) } @@ -72,6 +78,7 @@ impl BaseRelayReadinessState { BaseRelayReadinessCheckStatus::Ready, BaseRelayReadinessCheckStatus::Ready, BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, ) } @@ -88,6 +95,7 @@ impl BaseRelayReadinessState { self.pocket_storage, self.group_projection, self.group_outbox_replay, + self.event_bus, ] .into_iter() .all(BaseRelayReadinessCheckStatus::is_ready) @@ -107,11 +115,40 @@ impl BaseRelayReadinessState { pocket_storage: self.pocket_storage.as_str().to_owned(), group_projection: self.group_projection.as_str().to_owned(), group_outbox_replay: self.group_outbox_replay.as_str().to_owned(), + event_bus: self.event_bus.as_str().to_owned(), }, } } } +#[derive(Debug, Clone)] +pub struct BaseRelayReadinessHandle { + inner: Arc<RwLock<BaseRelayReadinessState>>, +} + +impl BaseRelayReadinessHandle { + pub fn new(state: BaseRelayReadinessState) -> Self { + Self { + inner: Arc::new(RwLock::new(state)), + } + } + + pub fn snapshot(&self) -> BaseRelayReadinessState { + match self.inner.read() { + Ok(state) => state.clone(), + Err(poisoned) => poisoned.into_inner().clone(), + } + } + + pub fn set_server_bind(&self, status: BaseRelayReadinessCheckStatus) { + let mut state = match self.inner.write() { + Ok(state) => state, + Err(poisoned) => poisoned.into_inner(), + }; + state.server_bind = status; + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BaseRelayHealthDocument { pub status: String, @@ -131,16 +168,17 @@ pub struct BaseRelayReadinessChecksDocument { pub pocket_storage: String, pub group_projection: String, pub group_outbox_replay: String, + pub event_bus: String, } #[derive(Debug, Clone)] struct BaseRelayOpsState { - readiness: BaseRelayReadinessState, + readiness: BaseRelayReadinessHandle, metrics: TangleRuntimeMetrics, } pub fn base_relay_ops_router( - readiness: BaseRelayReadinessState, + readiness: BaseRelayReadinessHandle, metrics: TangleRuntimeMetrics, ) -> Router { Router::new() @@ -159,12 +197,13 @@ async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> { async fn base_relay_readyz( State(state): State<BaseRelayOpsState>, ) -> (StatusCode, Json<BaseRelayReadinessDocument>) { - let status = if state.readiness.is_ready() { + let readiness = state.readiness.snapshot(); + let status = if readiness.is_ready() { StatusCode::OK } else { StatusCode::SERVICE_UNAVAILABLE }; - (status, Json(state.readiness.response())) + (status, Json(readiness.response())) } async fn base_relay_metricsz( @@ -175,7 +214,10 @@ async fn base_relay_metricsz( #[cfg(test)] mod tests { - use super::{BaseRelayReadinessCheckStatus, BaseRelayReadinessState, base_relay_ops_router}; + use super::{ + BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, BaseRelayReadinessState, + base_relay_ops_router, + }; use crate::runtime::TangleRuntimeMetrics; use axum::body::to_bytes; use http::{Request, StatusCode}; @@ -184,7 +226,8 @@ mod tests { #[tokio::test] async fn base_relay_ops_router_reports_health_and_readiness() { let metrics = TangleRuntimeMetrics::new(); - let health = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics.clone()) + let readiness = BaseRelayReadinessHandle::new(BaseRelayReadinessState::ready()); + let health = base_relay_ops_router(readiness.clone(), metrics.clone()) .oneshot( Request::builder() .uri("/healthz") @@ -204,7 +247,7 @@ mod tests { metrics.record_session_opened(); metrics.record_client_message(crate::runtime::TangleClientMessageMetricKind::Req); metrics.record_subscription_opened(); - let ready = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics.clone()) + let ready = base_relay_ops_router(readiness.clone(), metrics.clone()) .oneshot( Request::builder() .uri("/readyz") @@ -220,7 +263,8 @@ mod tests { assert_eq!(ready_value["status"], "ready"); assert_eq!(ready_value["checks"]["server_bind"], "ready"); assert_eq!(ready_value["checks"]["group_outbox_replay"], "ready"); - let metrics_response = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics) + assert_eq!(ready_value["checks"]["event_bus"], "ready"); + let metrics_response = base_relay_ops_router(readiness, metrics) .oneshot( Request::builder() .uri("/metricsz") @@ -249,16 +293,20 @@ mod tests { BaseRelayReadinessCheckStatus::Ready, BaseRelayReadinessCheckStatus::NotReady, BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, ); - let rejected = base_relay_ops_router(not_ready, TangleRuntimeMetrics::new()) - .oneshot( - Request::builder() - .uri("/readyz") - .body(axum::body::Body::empty()) - .expect("request"), - ) - .await - .expect("not ready"); + let rejected = base_relay_ops_router( + BaseRelayReadinessHandle::new(not_ready), + TangleRuntimeMetrics::new(), + ) + .oneshot( + Request::builder() + .uri("/readyz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("not ready"); assert_eq!(rejected.status(), StatusCode::SERVICE_UNAVAILABLE); let rejected_body = to_bytes(rejected.into_body(), usize::MAX) @@ -270,4 +318,39 @@ mod tests { assert_eq!(rejected_value["checks"]["server_bind"], "ready"); assert_eq!(rejected_value["checks"]["group_projection"], "not_ready"); } + + #[tokio::test] + async fn base_relay_ops_router_reports_live_readiness_state() { + let readiness = + BaseRelayReadinessHandle::new(BaseRelayReadinessState::runtime_ready_before_bind()); + let router = base_relay_ops_router(readiness.clone(), TangleRuntimeMetrics::new()); + let not_ready = router + .clone() + .oneshot( + Request::builder() + .uri("/readyz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("not ready"); + + assert_eq!(not_ready.status(), StatusCode::SERVICE_UNAVAILABLE); + readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); + let ready = router + .oneshot( + Request::builder() + .uri("/readyz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("ready"); + + assert_eq!(ready.status(), StatusCode::OK); + let body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); + let value = serde_json::from_slice::<serde_json::Value>(&body).expect("json"); + assert_eq!(value["status"], "ready"); + assert_eq!(value["checks"]["event_bus"], "ready"); + } } diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -5,7 +5,7 @@ use crate::{ errors::BaseRelayError, event_bus::{TangleEventBus, TangleEventReceiver}, logging, - ops::BaseRelayReadinessState, + ops::{BaseRelayReadinessHandle, BaseRelayReadinessState}, rate_limits::{ TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitRule, TangleRateLimitScope, TangleRateLimiter, @@ -38,7 +38,7 @@ use tokio::sync::{Mutex, watch}; pub struct TangleRuntime { config: BaseRelayRuntimeConfig, relay: BaseRelay, - readiness: BaseRelayReadinessState, + readiness: BaseRelayReadinessHandle, limits: TangleRuntimeLimits, event_bus: TangleEventBus, rate_limiter: TangleRateLimiter, @@ -76,7 +76,7 @@ impl TangleRuntime { pub fn open(config: BaseRelayRuntimeConfig) -> Result<Self, BaseRelayError> { let limits = TangleRuntimeLimits::from_config(&config)?; let relay = config.open_relay()?; - let readiness = relay.readiness_state(); + let readiness = BaseRelayReadinessHandle::new(relay.readiness_state()); let rate_limiter = TangleRateLimiter::new(); logging::log_runtime_opened(&config); Ok(Self { @@ -107,8 +107,12 @@ impl TangleRuntime { self.config.auth_state() } - pub fn readiness_state(&self) -> &BaseRelayReadinessState { - &self.readiness + pub fn readiness_state(&self) -> BaseRelayReadinessState { + self.readiness.snapshot() + } + + pub fn readiness_handle(&self) -> BaseRelayReadinessHandle { + self.readiness.clone() } pub fn limits(&self) -> TangleRuntimeLimits { @@ -1108,6 +1112,10 @@ mod tests { .group_outbox_replay, "ready" ); + assert_eq!( + runtime.readiness_state().response().checks.event_bus, + "ready" + ); assert!(!*shutdown.borrow()); assert_eq!(runtime.event_bus().publish(StoreOffset::new(42)), 1); diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -4,7 +4,7 @@ use crate::{ errors::BaseRelayError, logging, nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response}, - ops::{BaseRelayReadinessCheckStatus, BaseRelayReadinessState, base_relay_ops_router}, + ops::{BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, base_relay_ops_router}, runtime::{ TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleRuntimeMetrics, TangleShutdownSignal, @@ -65,10 +65,8 @@ pub async fn serve_listener_until_shutdown( .map_err(|error| BaseRelayError::error(error.to_string()))?; let relay_url = runtime.config().relay_url().to_owned(); let info = BaseRelayInfoConfig::new("tangle", runtime.config())?.build_document()?; - let readiness = runtime - .readiness_state() - .clone() - .with_server_bind(BaseRelayReadinessCheckStatus::Ready); + let readiness = runtime.readiness_handle(); + readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); let limits = runtime.limits(); let metrics = runtime.metrics().clone(); let shutdown_signal = runtime.shutdown_signal().clone(); @@ -108,7 +106,7 @@ pub async fn serve_listener_until_shutdown( } pub fn tangle_http_router( - readiness: BaseRelayReadinessState, + readiness: BaseRelayReadinessHandle, info: BaseRelayInfoDocument, limits: TangleRuntimeLimits, metrics: TangleRuntimeMetrics, @@ -175,7 +173,7 @@ mod tests { use crate::{ config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}, nip11::BaseRelayInfoConfig, - ops::BaseRelayReadinessState, + ops::BaseRelayReadinessCheckStatus, runtime::{TangleRuntime, TangleRuntimeHandle, TangleShutdownSignal}, }; use axum::{body::to_bytes, extract::ConnectInfo}; @@ -399,10 +397,12 @@ mod tests { .build_document() .expect("info"); let runtime = TangleRuntime::open(config).expect("runtime"); + let readiness = runtime.readiness_handle(); + readiness.set_server_bind(BaseRelayReadinessCheckStatus::Ready); let limits = runtime.limits(); let metrics = runtime.metrics().clone(); let router = tangle_http_router( - BaseRelayReadinessState::ready(), + readiness, info, limits, metrics, @@ -496,6 +496,7 @@ mod tests { let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json"); assert_eq!(ready_value["checks"]["server_bind"], "ready"); + assert_eq!(ready_value["checks"]["event_bus"], "ready"); assert_eq!(metrics.status(), http::StatusCode::OK); let metrics_body = to_bytes(metrics.into_body(), usize::MAX) .await diff --git a/crates/tangle_runtime/tests/ops_truthfulness.rs b/crates/tangle_runtime/tests/ops_truthfulness.rs @@ -77,6 +77,7 @@ fn operations_surfaces_match_enforced_runtime_contracts() { assert_eq!(pre_bind.checks.server_bind, "not_ready"); assert_eq!(pre_bind.checks.group_projection, "ready"); assert_eq!(pre_bind.checks.group_outbox_replay, "ready"); + assert_eq!(pre_bind.checks.event_bus, "ready"); let bound = runtime .readiness_state() .clone() diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1418,6 +1418,7 @@ fn projection_and_outbox_recover_from_canonical_pocket_events() { let readiness = recovered.readiness_state().response(); assert_eq!(readiness.checks.group_projection, "ready"); assert_eq!(readiness.checks.group_outbox_replay, "ready"); + assert_eq!(readiness.checks.event_bus, "ready"); assert!( recovered .relay()