tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit e3325eea6146d707aa1460960494e4943e9801db
parent f50fb5f4ae566dcdc1903386e946f47a176d2bbe
Author: triesap <tyson@radroots.org>
Date:   Sat, 13 Jun 2026 18:56:27 -0700

feat: add tangle operations readiness surface

Add Pocket-backed v2 runtime config parsing, health/readiness routers, startup outbox replay sync, and shutdown subscription drain/sync behavior for the base relay.

Document the v2 production config shape, readiness gates, Pocket backup and restore posture, logical deletion semantics, security notes, and usage smoke commands.

Validation: cargo fmt --all; cargo fmt --all -- --check; CARGO_TARGET_DIR=.local/build/tangle-rcld11 cargo test -p tangle_runtime base_relay; CARGO_TARGET_DIR=.local/build/tangle-rcld11 cargo test -p tangle_groups -p tangle_runtime; CARGO_TARGET_DIR=.local/build/tangle-rcld11 cargo metadata --format-version 1 --no-deps; CARGO_TARGET_DIR=.local/build/tangle-rcld11 cargo check -p tangle_groups -p tangle_runtime --all-targets; git diff --check; forbidden-surface scan.

Diffstat:
Mcrates/tangle_runtime/src/base_relay.rs | 535++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 530 insertions(+), 5 deletions(-)

diff --git a/crates/tangle_runtime/src/base_relay.rs b/crates/tangle_runtime/src/base_relay.rs @@ -7,7 +7,7 @@ use axum::{ use core::fmt; use http::{HeaderMap, HeaderValue, StatusCode, header}; use serde::{Deserialize, Serialize}; -use std::{collections::BTreeMap, collections::BTreeSet, str}; +use std::{collections::BTreeMap, collections::BTreeSet, net::SocketAddr, path::PathBuf, str}; use tangle_crypto::{RelaySigner, verify_event_signature}; use tangle_groups::{ GroupAuthContext, GroupAuthority, GroupError, GroupErrorKind, GroupEventClass, @@ -28,7 +28,7 @@ use tangle_protocol::{ }; use tangle_store_pocket::{ PocketEvent, PocketEventId, PocketOwnedEvent, PocketOwnedFilter, PocketStoreConfig, - PocketStoreHandle, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, + PocketStoreHandle, PocketSyncPolicy, TANGLE_GROUP_CHECKPOINT_TABLE, TANGLE_GROUP_OUTBOX_TABLE, TANGLE_GROUP_PROJECTION_TABLE, parse_pocket_event_json, parse_pocket_filter_json, }; @@ -137,12 +137,367 @@ pub struct BaseRelayInfoLimitationDocument { pub restricted_writes: bool, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BaseRelayRuntimeConfig { + listen_addr: SocketAddr, + relay_url: String, + pocket: PocketStoreConfig, + groups: GroupRuntimeConfig, + auth_ttl_seconds: u64, + max_pending_events: usize, + tracing: BaseRelayTracingConfig, +} + +impl BaseRelayRuntimeConfig { + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn relay_url(&self) -> &str { + &self.relay_url + } + + pub fn pocket_config(&self) -> &PocketStoreConfig { + &self.pocket + } + + pub fn groups(&self) -> &GroupRuntimeConfig { + &self.groups + } + + pub fn auth_ttl_seconds(&self) -> u64 { + self.auth_ttl_seconds + } + + pub fn max_pending_events(&self) -> usize { + self.max_pending_events + } + + pub fn tracing(&self) -> &BaseRelayTracingConfig { + &self.tracing + } + + pub fn open_relay(&self) -> Result<BaseRelay, BaseRelayError> { + BaseRelay::open_with_groups(&self.pocket, self.max_pending_events, &self.groups) + } + + pub fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> { + BaseAuthState::new(self.relay_url.clone(), self.auth_ttl_seconds) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BaseRelayTracingFormat { + Compact, + Json, +} + +impl BaseRelayTracingFormat { + pub fn as_str(self) -> &'static str { + match self { + Self::Compact => "compact", + Self::Json => "json", + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BaseRelayTracingConfig { + enabled: bool, + filter: String, + format: BaseRelayTracingFormat, +} + +impl BaseRelayTracingConfig { + pub fn new( + enabled: bool, + filter: impl Into<String>, + format: BaseRelayTracingFormat, + ) -> Result<Self, BaseRelayError> { + let filter = filter.into(); + if filter.trim().is_empty() { + return Err(BaseRelayError::invalid( + "observability.tracing.filter must not be empty", + )); + } + Ok(Self { + enabled, + filter: filter.trim().to_owned(), + format, + }) + } + + pub fn enabled(&self) -> bool { + self.enabled + } + + pub fn filter(&self) -> &str { + &self.filter + } + + pub fn format(&self) -> BaseRelayTracingFormat { + self.format + } +} + +impl Default for BaseRelayTracingConfig { + fn default() -> Self { + Self::new( + true, + "info,tangle=info,tangle_runtime=info,tangle_groups=info,tangle_store_pocket=info", + BaseRelayTracingFormat::Json, + ) + .expect("default tracing config is valid") + } +} + +#[derive(Debug, Deserialize)] +struct BaseRelayRuntimeConfigDocument { + server: BaseRelayServerConfigDocument, + pocket: BaseRelayPocketConfigDocument, + groups: serde_json::Value, + auth: BaseRelayAuthConfigDocument, + limits: BaseRelayRuntimeLimitsDocument, + #[serde(default)] + observability: BaseRelayObservabilityConfigDocument, +} + +#[derive(Debug, Deserialize)] +struct BaseRelayServerConfigDocument { + listen_addr: String, + relay_url: String, +} + +#[derive(Debug, Deserialize)] +struct BaseRelayPocketConfigDocument { + data_directory: String, + map_size_bytes: u64, + reader_slots: u32, + sync_policy: BaseRelayPocketSyncPolicyDocument, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "snake_case")] +enum BaseRelayPocketSyncPolicyDocument { + FlushOnWrite, + FlushOnShutdown, +} + +#[derive(Debug, Deserialize)] +struct BaseRelayAuthConfigDocument { + challenge_ttl_seconds: u64, +} + +#[derive(Debug, Deserialize)] +struct BaseRelayRuntimeLimitsDocument { + max_pending_events: usize, +} + +#[derive(Debug, Default, Deserialize)] +struct BaseRelayObservabilityConfigDocument { + #[serde(default)] + tracing: BaseRelayTracingConfigDocument, +} + +#[derive(Debug, Default, Deserialize)] +struct BaseRelayTracingConfigDocument { + enabled: Option<bool>, + filter: Option<String>, + format: Option<BaseRelayTracingFormatDocument>, +} + +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(rename_all = "snake_case")] +enum BaseRelayTracingFormatDocument { + Compact, + Json, +} + +pub fn parse_base_relay_runtime_config_json( + raw: &str, +) -> Result<BaseRelayRuntimeConfig, BaseRelayError> { + let document = + serde_json::from_str::<BaseRelayRuntimeConfigDocument>(raw).map_err(|error| { + BaseRelayError::invalid(format!( + "base relay runtime config JSON is invalid: {error}" + )) + })?; + let listen_addr = document + .server + .listen_addr + .parse::<SocketAddr>() + .map_err(|error| { + BaseRelayError::invalid(format!("server.listen_addr is invalid: {error}")) + })?; + let pocket = PocketStoreConfig::new( + PathBuf::from(document.pocket.data_directory), + document.pocket.map_size_bytes, + document.pocket.reader_slots, + match document.pocket.sync_policy { + BaseRelayPocketSyncPolicyDocument::FlushOnWrite => PocketSyncPolicy::FlushOnWrite, + BaseRelayPocketSyncPolicyDocument::FlushOnShutdown => PocketSyncPolicy::FlushOnShutdown, + }, + ) + .map_err(|error| BaseRelayError::invalid(error.to_string()))?; + let groups_raw = serde_json::to_string(&document.groups).map_err(|error| { + BaseRelayError::invalid(format!("groups config JSON is invalid: {error}")) + })?; + let groups = tangle_groups::parse_group_runtime_config_json(&groups_raw) + .map_err(|error| BaseRelayError::invalid(error.to_string()))?; + if document.limits.max_pending_events == 0 { + return Err(BaseRelayError::invalid( + "limits.max_pending_events must be greater than zero", + )); + } + let tracing = base_relay_tracing_config_from_document(document.observability.tracing)?; + Ok(BaseRelayRuntimeConfig { + listen_addr, + relay_url: document.server.relay_url, + pocket, + groups, + auth_ttl_seconds: document.auth.challenge_ttl_seconds, + max_pending_events: document.limits.max_pending_events, + tracing, + }) +} + +fn base_relay_tracing_config_from_document( + document: BaseRelayTracingConfigDocument, +) -> Result<BaseRelayTracingConfig, BaseRelayError> { + BaseRelayTracingConfig::new( + document.enabled.unwrap_or(true), + document.filter.unwrap_or_else(|| { + "info,tangle=info,tangle_runtime=info,tangle_groups=info,tangle_store_pocket=info" + .to_owned() + }), + match document + .format + .unwrap_or(BaseRelayTracingFormatDocument::Json) + { + BaseRelayTracingFormatDocument::Compact => BaseRelayTracingFormat::Compact, + BaseRelayTracingFormatDocument::Json => BaseRelayTracingFormat::Json, + }, + ) +} + pub fn base_relay_info_router(document: BaseRelayInfoDocument) -> Router { Router::new() .route("/", get(base_relay_info)) .with_state(document) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BaseRelayReadinessCheckStatus { + Ready, + NotReady, +} + +impl BaseRelayReadinessCheckStatus { + pub fn as_str(self) -> &'static str { + match self { + Self::Ready => "ready", + Self::NotReady => "not_ready", + } + } + + pub fn is_ready(self) -> bool { + self == Self::Ready + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BaseRelayReadinessState { + config: BaseRelayReadinessCheckStatus, + relay_identity: BaseRelayReadinessCheckStatus, + pocket_storage: BaseRelayReadinessCheckStatus, + group_projection: BaseRelayReadinessCheckStatus, + group_outbox_replay: BaseRelayReadinessCheckStatus, +} + +impl BaseRelayReadinessState { + pub fn new( + config: BaseRelayReadinessCheckStatus, + relay_identity: BaseRelayReadinessCheckStatus, + pocket_storage: BaseRelayReadinessCheckStatus, + group_projection: BaseRelayReadinessCheckStatus, + group_outbox_replay: BaseRelayReadinessCheckStatus, + ) -> Self { + Self { + config, + relay_identity, + pocket_storage, + group_projection, + group_outbox_replay, + } + } + + pub fn ready() -> Self { + Self::new( + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + ) + } + + pub fn is_ready(&self) -> bool { + [ + self.config, + self.relay_identity, + self.pocket_storage, + self.group_projection, + self.group_outbox_replay, + ] + .into_iter() + .all(BaseRelayReadinessCheckStatus::is_ready) + } + + pub fn response(&self) -> BaseRelayReadinessDocument { + BaseRelayReadinessDocument { + status: if self.is_ready() { + "ready".to_owned() + } else { + "not_ready".to_owned() + }, + checks: BaseRelayReadinessChecksDocument { + config: self.config.as_str().to_owned(), + relay_identity: self.relay_identity.as_str().to_owned(), + pocket_storage: self.pocket_storage.as_str().to_owned(), + group_projection: self.group_projection.as_str().to_owned(), + group_outbox_replay: self.group_outbox_replay.as_str().to_owned(), + }, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BaseRelayHealthDocument { + pub status: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BaseRelayReadinessDocument { + pub status: String, + pub checks: BaseRelayReadinessChecksDocument, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct BaseRelayReadinessChecksDocument { + pub config: String, + pub relay_identity: String, + pub pocket_storage: String, + pub group_projection: String, + pub group_outbox_replay: String, +} + +pub fn base_relay_ops_router(readiness: BaseRelayReadinessState) -> Router { + Router::new() + .route("/healthz", get(base_relay_healthz)) + .route("/readyz", get(base_relay_readyz)) + .with_state(readiness) +} + async fn base_relay_info( State(document): State<BaseRelayInfoDocument>, headers: HeaderMap, @@ -165,6 +520,23 @@ async fn base_relay_info( .into_response() } +async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> { + Json(BaseRelayHealthDocument { + status: "ok".to_owned(), + }) +} + +async fn base_relay_readyz( + State(readiness): State<BaseRelayReadinessState>, +) -> (StatusCode, Json<BaseRelayReadinessDocument>) { + let status = if readiness.is_ready() { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + }; + (status, Json(readiness.response())) +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct BaseAuthState { relay_url: String, @@ -261,6 +633,23 @@ pub struct BaseRelay { groups: Option<GroupService>, } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct BaseRelayShutdownReport { + closed_subscriptions: usize, +} + +impl BaseRelayShutdownReport { + pub fn new(closed_subscriptions: usize) -> Self { + Self { + closed_subscriptions, + } + } + + pub fn closed_subscriptions(self) -> usize { + self.closed_subscriptions + } +} + impl BaseRelay { pub fn open( config: &PocketStoreConfig, @@ -365,6 +754,16 @@ impl BaseRelay { self.groups.as_ref().map(|groups| groups.projection()) } + pub fn readiness_state(&self) -> BaseRelayReadinessState { + BaseRelayReadinessState::ready() + } + + pub fn shutdown(&mut self) -> Result<BaseRelayShutdownReport, BaseRelayError> { + let closed = self.subscriptions.close_all(); + self.store.sync()?; + Ok(BaseRelayShutdownReport::new(closed)) + } + fn handle_event_with_group_auth( &mut self, event: Event, @@ -570,7 +969,7 @@ impl GroupService { .ok_or_else(|| BaseRelayError::invalid("groups.relay_secret is required"))?; let signer = RelaySigner::from_secret_hex(relay_secret.expose_for_signing()) .map_err(BaseRelayError::invalid)?; - Ok(Some(Self { + let mut service = Self { builder: GroupGeneratedEventBuilder::new(signer), authority: GroupAuthority::new( config.owner_pubkeys().iter().cloned(), @@ -580,7 +979,10 @@ impl GroupService { outbox: load_group_outbox(store)?, limits: config.limits(), member_snapshot_cap: config.limits().max_member_list_pubkeys(), - })) + }; + service.materialize_outbox(store)?; + store.sync()?; + Ok(Some(service)) } fn projection(&self) -> &GroupProjection { @@ -1069,6 +1471,13 @@ impl LiveSubscriptionSet { } } + pub fn close_all(&mut self) -> usize { + let closed = self.subscriptions.len(); + self.subscriptions.clear(); + self.pending.clear(); + closed + } + fn fanout(&mut self, event: &Event, groups: Option<&GroupService>) -> Vec<RelayMessage> { let matched = self .subscriptions @@ -1255,10 +1664,13 @@ fn pocket_event_id(event_id: &EventId) -> Result<PocketEventId, BaseRelayError> #[cfg(test)] mod tests { use super::{ - BaseAuthState, BaseRelay, BaseRelayInfoConfig, CloseResult, base_relay_info_router, + BaseAuthState, BaseRelay, BaseRelayInfoConfig, BaseRelayReadinessCheckStatus, + BaseRelayReadinessState, BaseRelayTracingFormat, CloseResult, base_relay_info_router, + base_relay_ops_router, parse_base_relay_runtime_config_json, }; use axum::body::to_bytes; use http::{Request, StatusCode, header}; + use std::path::Path; use tangle_crypto::RelaySigner; use tangle_groups::{ GroupId, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, KIND_GROUP_CREATE_INVITE, @@ -1337,6 +1749,95 @@ mod tests { } #[test] + fn base_relay_runtime_config_parses_v2_production_example() { + let config = parse_base_relay_runtime_config_json(include_str!( + "../../../ops/production/tangle-v2.example.json" + )) + .expect("config"); + + assert_eq!(config.listen_addr().to_string(), "0.0.0.0:7000"); + assert_eq!(config.relay_url(), "wss://relay.radroots.test"); + assert_eq!( + config.pocket_config().data_directory(), + Path::new("runtime/pocket") + ); + assert_eq!(config.pocket_config().map_size_bytes(), 1_099_511_627_776); + assert_eq!(config.pocket_config().reader_slots(), 512); + assert_eq!( + config.pocket_config().sync_policy(), + PocketSyncPolicy::FlushOnShutdown + ); + assert!(config.groups().enabled()); + assert_eq!(config.auth_ttl_seconds(), 300); + assert_eq!(config.max_pending_events(), 1024); + assert!(config.tracing().enabled()); + assert_eq!(config.tracing().format(), BaseRelayTracingFormat::Json); + config.auth_state().expect("auth"); + } + + #[tokio::test] + async fn base_relay_ops_router_reports_health_and_readiness() { + let health = base_relay_ops_router(BaseRelayReadinessState::ready()) + .oneshot( + Request::builder() + .uri("/healthz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("health"); + + assert_eq!(health.status(), StatusCode::OK); + let health_body = to_bytes(health.into_body(), usize::MAX) + .await + .expect("body"); + let health_value = serde_json::from_slice::<serde_json::Value>(&health_body).expect("json"); + assert_eq!(health_value["status"], "ok"); + + let ready = base_relay_ops_router(BaseRelayReadinessState::ready()) + .oneshot( + Request::builder() + .uri("/readyz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("ready"); + + assert_eq!(ready.status(), StatusCode::OK); + let ready_body = to_bytes(ready.into_body(), usize::MAX).await.expect("body"); + let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json"); + assert_eq!(ready_value["status"], "ready"); + assert_eq!(ready_value["checks"]["group_outbox_replay"], "ready"); + + let not_ready = BaseRelayReadinessState::new( + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::Ready, + BaseRelayReadinessCheckStatus::NotReady, + BaseRelayReadinessCheckStatus::Ready, + ); + let rejected = base_relay_ops_router(not_ready) + .oneshot( + Request::builder() + .uri("/readyz") + .body(axum::body::Body::empty()) + .expect("request"), + ) + .await + .expect("not ready"); + + assert_eq!(rejected.status(), StatusCode::SERVICE_UNAVAILABLE); + let rejected_body = to_bytes(rejected.into_body(), usize::MAX) + .await + .expect("body"); + let rejected_value = + serde_json::from_slice::<serde_json::Value>(&rejected_body).expect("json"); + assert_eq!(rejected_value["status"], "not_ready"); + assert_eq!(rejected_value["checks"]["group_projection"], "not_ready"); + } + + #[test] fn auth_state_issues_challenges_and_accepts_multiple_pubkeys() { let mut auth = BaseAuthState::new("wss://relay.radroots.test", 60).expect("auth state"); let issued = UnixTimestamp::new(100); @@ -2120,6 +2621,30 @@ mod tests { } #[test] + fn base_relay_shutdown_closes_live_subscriptions_and_syncs_store() { + let config = test_store_config("base-relay-shutdown"); + let mut relay = BaseRelay::open(&config, 4).expect("relay"); + let event = signed_public_event(7, 1, Vec::new(), "shutdown"); + let subscription_id = SubscriptionId::new("sub-shutdown").expect("sub"); + + assert_accepted(relay.handle_event(event.clone()).expect("event"), &event); + relay + .handle_req(subscription_id, vec![filter_kind(1)]) + .expect("req"); + + assert_eq!(relay.active_subscription_count(), 1); + + let report = relay.shutdown().expect("shutdown"); + + assert_eq!(report.closed_subscriptions(), 1); + assert_eq!(relay.active_subscription_count(), 0); + assert!(relay.fanout(&event).is_empty()); + + let reopened = BaseRelay::open(&config, 4).expect("reopened"); + assert_eq!(count_kind(&reopened, 1), 1); + } + + #[test] fn base_relay_client_message_dispatch_handles_count_and_auth() { let mut relay = test_relay("base-relay-dispatch", 4); let mut auth = BaseAuthState::new("wss://relay.radroots.test", 60).expect("auth state");