commit f3510f37e41d893cb41bde73a4e46f170cc6e497
parent edf8c090a3694e7a93cf26ec9654d3e0aac87e21
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 08:15:55 -0700
metrics: expose relay runtime counters
- Add a cloneable runtime metrics facade with session, message-class, subscription, stored-offset, and rate-limit rejection counters plus serializable snapshots.
- Mount /metricsz on the ops router and pass the metrics facade through the server router without holding the runtime mutex.
- Instrument runtime EVENT handling, session REQ/CLOSE paths, session lifecycle, subscriptions, stored offsets, and rate-limit rejections.
- Cover the metrics facade through runtime, session, ops-router, server-router, and phase2 listener tests.
- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.
Diffstat:
5 files changed, 383 insertions(+), 20 deletions(-)
diff --git a/crates/tangle_runtime/src/ops.rs b/crates/tangle_runtime/src/ops.rs
@@ -1,5 +1,6 @@
#![forbid(unsafe_code)]
+use crate::runtime::{TangleRuntimeMetrics, TangleRuntimeMetricsSnapshot};
use axum::{Json, Router, extract::State, routing::get};
use http::StatusCode;
use serde::{Deserialize, Serialize};
@@ -109,11 +110,21 @@ pub struct BaseRelayReadinessChecksDocument {
pub group_outbox_replay: String,
}
-pub fn base_relay_ops_router(readiness: BaseRelayReadinessState) -> Router {
+#[derive(Debug, Clone)]
+struct BaseRelayOpsState {
+ readiness: BaseRelayReadinessState,
+ metrics: TangleRuntimeMetrics,
+}
+
+pub fn base_relay_ops_router(
+ readiness: BaseRelayReadinessState,
+ metrics: TangleRuntimeMetrics,
+) -> Router {
Router::new()
.route("/healthz", get(base_relay_healthz))
.route("/readyz", get(base_relay_readyz))
- .with_state(readiness)
+ .route("/metricsz", get(base_relay_metricsz))
+ .with_state(BaseRelayOpsState { readiness, metrics })
}
async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> {
@@ -123,26 +134,34 @@ async fn base_relay_healthz() -> Json<BaseRelayHealthDocument> {
}
async fn base_relay_readyz(
- State(readiness): State<BaseRelayReadinessState>,
+ State(state): State<BaseRelayOpsState>,
) -> (StatusCode, Json<BaseRelayReadinessDocument>) {
- let status = if readiness.is_ready() {
+ let status = if state.readiness.is_ready() {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
- (status, Json(readiness.response()))
+ (status, Json(state.readiness.response()))
+}
+
+async fn base_relay_metricsz(
+ State(state): State<BaseRelayOpsState>,
+) -> Json<TangleRuntimeMetricsSnapshot> {
+ Json(state.metrics.snapshot())
}
#[cfg(test)]
mod tests {
use super::{BaseRelayReadinessCheckStatus, BaseRelayReadinessState, base_relay_ops_router};
+ use crate::runtime::TangleRuntimeMetrics;
use axum::body::to_bytes;
use http::{Request, StatusCode};
use tower::ServiceExt;
#[tokio::test]
async fn base_relay_ops_router_reports_health_and_readiness() {
- let health = base_relay_ops_router(BaseRelayReadinessState::ready())
+ let metrics = TangleRuntimeMetrics::new();
+ let health = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics.clone())
.oneshot(
Request::builder()
.uri("/healthz")
@@ -159,7 +178,10 @@ mod tests {
let health_value = serde_json::from_slice::<serde_json::Value>(&health_body).expect("json");
assert_eq!(health_value["status"], "ok");
- let ready = base_relay_ops_router(BaseRelayReadinessState::ready())
+ metrics.record_session_opened();
+ metrics.record_client_message(crate::runtime::TangleClientMessageMetricKind::Req);
+ metrics.record_subscription_opened();
+ let ready = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics.clone())
.oneshot(
Request::builder()
.uri("/readyz")
@@ -174,6 +196,27 @@ mod tests {
let ready_value = serde_json::from_slice::<serde_json::Value>(&ready_body).expect("json");
assert_eq!(ready_value["status"], "ready");
assert_eq!(ready_value["checks"]["group_outbox_replay"], "ready");
+ let metrics_response = base_relay_ops_router(BaseRelayReadinessState::ready(), metrics)
+ .oneshot(
+ Request::builder()
+ .uri("/metricsz")
+ .body(axum::body::Body::empty())
+ .expect("request"),
+ )
+ .await
+ .expect("metrics");
+
+ assert_eq!(metrics_response.status(), StatusCode::OK);
+ let metrics_body = to_bytes(metrics_response.into_body(), usize::MAX)
+ .await
+ .expect("body");
+ let metrics_value =
+ serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
+ assert_eq!(metrics_value["active_sessions"], 1);
+ assert_eq!(metrics_value["total_sessions"], 1);
+ assert_eq!(metrics_value["client_messages"], 1);
+ assert_eq!(metrics_value["req_messages"], 1);
+ assert_eq!(metrics_value["opened_subscriptions"], 1);
let not_ready = BaseRelayReadinessState::new(
BaseRelayReadinessCheckStatus::Ready,
@@ -182,7 +225,7 @@ mod tests {
BaseRelayReadinessCheckStatus::NotReady,
BaseRelayReadinessCheckStatus::Ready,
);
- let rejected = base_relay_ops_router(not_ready)
+ let rejected = base_relay_ops_router(not_ready, TangleRuntimeMetrics::new())
.oneshot(
Request::builder()
.uri("/readyz")
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -16,6 +16,7 @@ use crate::{
live::LiveSubscriptionSet,
},
};
+use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
fmt,
@@ -360,6 +361,7 @@ impl TangleRuntime {
match self.rate_limiter.record(key, rule, now) {
TangleRateLimitDecision::Allowed { .. } => None,
TangleRateLimitDecision::Rejected { reset_at } => {
+ self.metrics.record_rate_limit_rejection();
logging::log_rate_limit_rejected(label, dimension, reset_at);
Some(RelayMessage::Closed {
subscription_id: subscription_id.clone(),
@@ -383,6 +385,7 @@ impl TangleRuntime {
match self.rate_limiter.record(key, rule, now) {
TangleRateLimitDecision::Allowed { .. } => None,
TangleRateLimitDecision::Rejected { reset_at } => {
+ self.metrics.record_rate_limit_rejection();
logging::log_rate_limit_rejected(label, "event", reset_at);
Some(RelayMessage::Ok {
event_id: event.id().clone(),
@@ -400,15 +403,22 @@ impl TangleRuntime {
#[derive(Clone)]
pub struct TangleRuntimeHandle {
inner: Arc<Mutex<TangleRuntime>>,
+ metrics: TangleRuntimeMetrics,
}
impl TangleRuntimeHandle {
pub fn new(runtime: TangleRuntime) -> Self {
+ let metrics = runtime.metrics().clone();
Self {
inner: Arc::new(Mutex::new(runtime)),
+ metrics,
}
}
+ pub fn metrics(&self) -> TangleRuntimeMetrics {
+ self.metrics.clone()
+ }
+
pub async fn auth_state(&self) -> Result<BaseAuthState, BaseRelayError> {
self.inner.lock().await.auth_state()
}
@@ -435,6 +445,8 @@ impl TangleRuntimeHandle {
query_context: TangleQueryRateLimitContext,
now: UnixTimestamp,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.metrics
+ .record_client_message(client_message_metric_kind(&message));
let mut runtime = self.inner.lock().await;
match message {
ClientMessage::Event(event) => {
@@ -619,6 +631,16 @@ fn auth_response_failed(replies: &[RelayMessage]) -> bool {
})
}
+fn client_message_metric_kind(message: &ClientMessage) -> TangleClientMessageMetricKind {
+ match message {
+ ClientMessage::Event(_) => TangleClientMessageMetricKind::Event,
+ ClientMessage::Req { .. } => TangleClientMessageMetricKind::Req,
+ ClientMessage::Count { .. } => TangleClientMessageMetricKind::Count,
+ ClientMessage::Auth(_) => TangleClientMessageMetricKind::Auth,
+ ClientMessage::Close(_) => TangleClientMessageMetricKind::Close,
+ }
+}
+
fn filter_group_ids(filters: &[Filter]) -> Vec<GroupId> {
filters
.iter()
@@ -735,7 +757,93 @@ pub struct TangleRuntimeMetrics {
struct TangleRuntimeMetricsInner {
started_at: Instant,
active_sessions: AtomicUsize,
+ total_sessions: AtomicU64,
+ client_messages: AtomicU64,
+ event_messages: AtomicU64,
+ req_messages: AtomicU64,
+ count_messages: AtomicU64,
+ auth_messages: AtomicU64,
+ close_messages: AtomicU64,
+ opened_subscriptions: AtomicU64,
+ closed_subscriptions: AtomicU64,
stored_event_offsets: AtomicU64,
+ rate_limit_rejections: AtomicU64,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TangleClientMessageMetricKind {
+ Event,
+ Req,
+ Count,
+ Auth,
+ Close,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct TangleRuntimeMetricsSnapshot {
+ uptime_seconds: u64,
+ active_sessions: usize,
+ total_sessions: u64,
+ client_messages: u64,
+ event_messages: u64,
+ req_messages: u64,
+ count_messages: u64,
+ auth_messages: u64,
+ close_messages: u64,
+ opened_subscriptions: u64,
+ closed_subscriptions: u64,
+ stored_event_offsets: u64,
+ rate_limit_rejections: u64,
+}
+
+impl TangleRuntimeMetricsSnapshot {
+ pub fn active_sessions(&self) -> usize {
+ self.active_sessions
+ }
+
+ pub fn total_sessions(&self) -> u64 {
+ self.total_sessions
+ }
+
+ pub fn client_messages(&self) -> u64 {
+ self.client_messages
+ }
+
+ pub fn event_messages(&self) -> u64 {
+ self.event_messages
+ }
+
+ pub fn req_messages(&self) -> u64 {
+ self.req_messages
+ }
+
+ pub fn count_messages(&self) -> u64 {
+ self.count_messages
+ }
+
+ pub fn auth_messages(&self) -> u64 {
+ self.auth_messages
+ }
+
+ pub fn close_messages(&self) -> u64 {
+ self.close_messages
+ }
+
+ pub fn opened_subscriptions(&self) -> u64 {
+ self.opened_subscriptions
+ }
+
+ pub fn closed_subscriptions(&self) -> u64 {
+ self.closed_subscriptions
+ }
+
+ pub fn stored_event_offsets(&self) -> u64 {
+ self.stored_event_offsets
+ }
+
+ pub fn rate_limit_rejections(&self) -> u64 {
+ self.rate_limit_rejections
+ }
}
impl TangleRuntimeMetrics {
@@ -744,11 +852,39 @@ impl TangleRuntimeMetrics {
inner: Arc::new(TangleRuntimeMetricsInner {
started_at: Instant::now(),
active_sessions: AtomicUsize::new(0),
+ total_sessions: AtomicU64::new(0),
+ client_messages: AtomicU64::new(0),
+ event_messages: AtomicU64::new(0),
+ req_messages: AtomicU64::new(0),
+ count_messages: AtomicU64::new(0),
+ auth_messages: AtomicU64::new(0),
+ close_messages: AtomicU64::new(0),
+ opened_subscriptions: AtomicU64::new(0),
+ closed_subscriptions: AtomicU64::new(0),
stored_event_offsets: AtomicU64::new(0),
+ rate_limit_rejections: AtomicU64::new(0),
}),
}
}
+ pub fn snapshot(&self) -> TangleRuntimeMetricsSnapshot {
+ TangleRuntimeMetricsSnapshot {
+ uptime_seconds: self.started_at().elapsed().as_secs(),
+ active_sessions: self.active_sessions(),
+ total_sessions: self.total_sessions(),
+ client_messages: self.client_messages(),
+ event_messages: self.event_messages(),
+ req_messages: self.req_messages(),
+ count_messages: self.count_messages(),
+ auth_messages: self.auth_messages(),
+ close_messages: self.close_messages(),
+ opened_subscriptions: self.opened_subscriptions(),
+ closed_subscriptions: self.closed_subscriptions(),
+ stored_event_offsets: self.stored_event_offsets(),
+ rate_limit_rejections: self.rate_limit_rejections(),
+ }
+ }
+
pub fn started_at(&self) -> Instant {
self.inner.started_at
}
@@ -757,24 +893,122 @@ impl TangleRuntimeMetrics {
self.inner.active_sessions.load(Ordering::Relaxed)
}
- pub fn increment_active_sessions(&self) -> usize {
- self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1
+ pub fn total_sessions(&self) -> u64 {
+ self.inner.total_sessions.load(Ordering::Relaxed)
}
- pub fn decrement_active_sessions(&self) -> usize {
- self.inner.active_sessions.fetch_sub(1, Ordering::Relaxed) - 1
+ pub fn client_messages(&self) -> u64 {
+ self.inner.client_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn event_messages(&self) -> u64 {
+ self.inner.event_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn req_messages(&self) -> u64 {
+ self.inner.req_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn count_messages(&self) -> u64 {
+ self.inner.count_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn auth_messages(&self) -> u64 {
+ self.inner.auth_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn close_messages(&self) -> u64 {
+ self.inner.close_messages.load(Ordering::Relaxed)
+ }
+
+ pub fn opened_subscriptions(&self) -> u64 {
+ self.inner.opened_subscriptions.load(Ordering::Relaxed)
+ }
+
+ pub fn closed_subscriptions(&self) -> u64 {
+ self.inner.closed_subscriptions.load(Ordering::Relaxed)
}
pub fn stored_event_offsets(&self) -> u64 {
self.inner.stored_event_offsets.load(Ordering::Relaxed)
}
+ pub fn rate_limit_rejections(&self) -> u64 {
+ self.inner.rate_limit_rejections.load(Ordering::Relaxed)
+ }
+
+ pub fn record_session_opened(&self) -> usize {
+ self.inner.total_sessions.fetch_add(1, Ordering::Relaxed);
+ self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1
+ }
+
+ pub fn record_session_closed(&self) -> usize {
+ let mut current = self.inner.active_sessions.load(Ordering::Relaxed);
+ loop {
+ if current == 0 {
+ return 0;
+ }
+ match self.inner.active_sessions.compare_exchange(
+ current,
+ current - 1,
+ Ordering::Relaxed,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => return current - 1,
+ Err(actual) => current = actual,
+ }
+ }
+ }
+
+ pub fn record_client_message(&self, kind: TangleClientMessageMetricKind) -> u64 {
+ let total = self.inner.client_messages.fetch_add(1, Ordering::Relaxed) + 1;
+ match kind {
+ TangleClientMessageMetricKind::Event => {
+ self.inner.event_messages.fetch_add(1, Ordering::Relaxed);
+ }
+ TangleClientMessageMetricKind::Req => {
+ self.inner.req_messages.fetch_add(1, Ordering::Relaxed);
+ }
+ TangleClientMessageMetricKind::Count => {
+ self.inner.count_messages.fetch_add(1, Ordering::Relaxed);
+ }
+ TangleClientMessageMetricKind::Auth => {
+ self.inner.auth_messages.fetch_add(1, Ordering::Relaxed);
+ }
+ TangleClientMessageMetricKind::Close => {
+ self.inner.close_messages.fetch_add(1, Ordering::Relaxed);
+ }
+ };
+ total
+ }
+
+ pub fn record_subscription_opened(&self) -> u64 {
+ self.inner
+ .opened_subscriptions
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
+ pub fn record_subscriptions_closed(&self, count: usize) -> u64 {
+ self.inner.closed_subscriptions.fetch_add(
+ u64::try_from(count).expect("subscription count fits in u64"),
+ Ordering::Relaxed,
+ ) + u64::try_from(count).expect("subscription count fits in u64")
+ }
+
pub fn record_stored_event_offset(&self) -> u64 {
self.inner
.stored_event_offsets
.fetch_add(1, Ordering::Relaxed)
+ 1
}
+
+ pub fn record_rate_limit_rejection(&self) -> u64 {
+ self.inner
+ .rate_limit_rejections
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
}
impl Default for TangleRuntimeMetrics {
@@ -879,12 +1113,37 @@ mod tests {
Path::new(&root).join("pocket")
);
- assert_eq!(runtime.metrics().increment_active_sessions(), 1);
+ assert_eq!(runtime.metrics().record_session_opened(), 1);
assert_eq!(runtime.metrics().active_sessions(), 1);
- assert_eq!(runtime.metrics().decrement_active_sessions(), 0);
+ assert_eq!(runtime.metrics().total_sessions(), 1);
+ assert_eq!(runtime.metrics().record_session_closed(), 0);
assert_eq!(runtime.metrics().active_sessions(), 0);
+ assert_eq!(runtime.metrics().total_sessions(), 1);
+ assert_eq!(
+ runtime
+ .metrics()
+ .record_client_message(super::TangleClientMessageMetricKind::Req),
+ 1
+ );
+ assert_eq!(runtime.metrics().client_messages(), 1);
+ assert_eq!(runtime.metrics().req_messages(), 1);
+ assert_eq!(runtime.metrics().record_subscription_opened(), 1);
+ assert_eq!(runtime.metrics().opened_subscriptions(), 1);
+ assert_eq!(runtime.metrics().record_subscriptions_closed(1), 1);
+ assert_eq!(runtime.metrics().closed_subscriptions(), 1);
assert_eq!(runtime.metrics().record_stored_event_offset(), 1);
assert_eq!(runtime.metrics().stored_event_offsets(), 1);
+ assert_eq!(runtime.metrics().record_rate_limit_rejection(), 1);
+ assert_eq!(runtime.metrics().rate_limit_rejections(), 1);
+ let snapshot = runtime.metrics().snapshot();
+ assert_eq!(snapshot.active_sessions(), 0);
+ assert_eq!(snapshot.total_sessions(), 1);
+ assert_eq!(snapshot.client_messages(), 1);
+ assert_eq!(snapshot.req_messages(), 1);
+ assert_eq!(snapshot.opened_subscriptions(), 1);
+ assert_eq!(snapshot.closed_subscriptions(), 1);
+ assert_eq!(snapshot.stored_event_offsets(), 1);
+ assert_eq!(snapshot.rate_limit_rejections(), 1);
let report = runtime.shutdown().expect("shutdown");
@@ -971,6 +1230,10 @@ mod tests {
offsets.try_recv().expect_err("no duplicate offset"),
TangleEventReceiveError::Empty
);
+ let snapshot = handle.metrics().snapshot();
+ assert_eq!(snapshot.client_messages(), 2);
+ assert_eq!(snapshot.event_messages(), 2);
+ assert_eq!(snapshot.stored_event_offsets(), 1);
let _ = std::fs::remove_dir_all(root);
}
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -5,7 +5,10 @@ use crate::{
logging,
nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response},
ops::{BaseRelayReadinessState, base_relay_ops_router},
- runtime::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal},
+ runtime::{
+ TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleRuntimeMetrics,
+ TangleShutdownSignal,
+ },
session::TangleWebSocketSession,
};
use axum::{
@@ -65,12 +68,14 @@ pub async fn serve_listener_until_shutdown(
BaseRelayInfoConfig::new("tangle", runtime.config().groups().clone())?.build_document()?;
let readiness = runtime.readiness_state().clone();
let limits = runtime.limits();
+ let metrics = runtime.metrics().clone();
let shutdown_signal = runtime.shutdown_signal().clone();
let runtime = TangleRuntimeHandle::new(runtime);
let router = tangle_http_router(
readiness,
info,
limits,
+ metrics,
shutdown_signal.clone(),
runtime.clone(),
);
@@ -104,6 +109,7 @@ pub fn tangle_http_router(
readiness: BaseRelayReadinessState,
info: BaseRelayInfoDocument,
limits: TangleRuntimeLimits,
+ metrics: TangleRuntimeMetrics,
shutdown: TangleShutdownSignal,
runtime: TangleRuntimeHandle,
) -> Router {
@@ -115,7 +121,7 @@ pub fn tangle_http_router(
shutdown,
runtime,
})
- .merge(base_relay_ops_router(readiness))
+ .merge(base_relay_ops_router(readiness, metrics))
}
#[derive(Debug, Clone)]
@@ -392,10 +398,12 @@ mod tests {
.expect("info");
let runtime = TangleRuntime::open(config).expect("runtime");
let limits = runtime.limits();
+ let metrics = runtime.metrics().clone();
let router = tangle_http_router(
BaseRelayReadinessState::ready(),
info,
limits,
+ metrics,
TangleShutdownSignal::new(),
TangleRuntimeHandle::new(runtime),
);
@@ -433,6 +441,7 @@ mod tests {
.await
.expect("health");
let ready = router
+ .clone()
.oneshot(
Request::builder()
.uri("/readyz")
@@ -441,6 +450,15 @@ mod tests {
)
.await
.expect("ready");
+ let metrics = router
+ .oneshot(
+ Request::builder()
+ .uri("/metricsz")
+ .body(axum::body::Body::empty())
+ .expect("request"),
+ )
+ .await
+ .expect("metrics");
assert_eq!(nip11.status(), http::StatusCode::OK);
assert_eq!(
@@ -459,6 +477,14 @@ mod tests {
assert_eq!(root_without_accept.status(), http::StatusCode::NOT_FOUND);
assert_eq!(health.status(), http::StatusCode::OK);
assert_eq!(ready.status(), http::StatusCode::OK);
+ assert_eq!(metrics.status(), http::StatusCode::OK);
+ let metrics_body = to_bytes(metrics.into_body(), usize::MAX)
+ .await
+ .expect("body");
+ let metrics_value =
+ serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
+ assert_eq!(metrics_value["active_sessions"], 0);
+ assert_eq!(metrics_value["stored_event_offsets"], 0);
let root_body = to_bytes(root_without_accept.into_body(), usize::MAX)
.await
.expect("body");
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -6,9 +6,12 @@ use crate::{
logging,
relay::{
auth::{BaseAuthState, generate_auth_challenge},
- live::LiveSubscriptionSet,
+ live::{CloseResult, LiveSubscriptionSet},
+ },
+ runtime::{
+ TangleClientMessageMetricKind, TangleQueryRateLimitContext, TangleRuntimeHandle,
+ TangleRuntimeLimits,
},
- runtime::{TangleQueryRateLimitContext, TangleRuntimeHandle, TangleRuntimeLimits},
};
use axum::extract::ws::{CloseFrame, Message, Utf8Bytes, WebSocket};
use std::{
@@ -100,9 +103,13 @@ impl TangleWebSocketSession {
}
pub async fn run(mut self, mut socket: WebSocket) {
+ let metrics = self.runtime.metrics();
+ metrics.record_session_opened();
logging::log_websocket_session_opened(self.connection_id, self.peer_ip);
if !self.issue_auth_challenge() {
let closed_subscriptions = self.subscriptions.close_all();
+ metrics.record_subscriptions_closed(closed_subscriptions);
+ metrics.record_session_closed();
logging::log_websocket_session_closed(
self.connection_id,
self.peer_ip,
@@ -153,6 +160,8 @@ impl TangleWebSocketSession {
}
}
let closed_subscriptions = self.subscriptions.close_all();
+ metrics.record_subscriptions_closed(closed_subscriptions);
+ metrics.record_session_closed();
logging::log_websocket_session_closed(
self.connection_id,
self.peer_ip,
@@ -271,10 +280,14 @@ impl TangleWebSocketSession {
.await
}
ClientMessage::Close(subscription_id) => {
+ let metrics = self.runtime.metrics();
+ metrics.record_client_message(TangleClientMessageMetricKind::Close);
self.limits
.base_relay_limits()
.validate_subscription_id(&subscription_id)?;
- self.subscriptions.close(&subscription_id);
+ if self.subscriptions.close(&subscription_id) == CloseResult::Closed {
+ metrics.record_subscriptions_closed(1);
+ }
Ok(Vec::new())
}
message => {
@@ -290,6 +303,8 @@ impl TangleWebSocketSession {
subscription_id: SubscriptionId,
filters: Vec<Filter>,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ let metrics = self.runtime.metrics();
+ metrics.record_client_message(TangleClientMessageMetricKind::Req);
self.limits
.base_relay_limits()
.validate_subscription_id(&subscription_id)?;
@@ -312,6 +327,7 @@ impl TangleWebSocketSession {
filters.clone(),
GroupAuthContext::new(self.auth.authenticated_pubkeys().iter().cloned()),
)?;
+ metrics.record_subscription_opened();
logging::log_subscription_opened(self.connection_id, &subscription_id);
match self
.runtime
@@ -483,6 +499,7 @@ mod tests {
let _ = std::fs::remove_dir_all(&root);
let runtime =
TangleRuntimeHandle::new(TangleRuntime::open(runtime_config(&root)).expect("runtime"));
+ let metrics = runtime.metrics();
let auth_a = runtime.auth_state().await.expect("auth a");
let auth_b = runtime.auth_state().await.expect("auth b");
let events_a = runtime.subscribe_events().await;
@@ -498,7 +515,7 @@ mod tests {
let mut second = TangleWebSocketSession::new(
session_limits(8),
shutdown.subscribe(),
- runtime,
+ runtime.clone(),
auth_b,
events_b,
)
@@ -550,6 +567,12 @@ mod tests {
Vec::<RelayMessage>::new()
);
assert_eq!(second.active_subscription_count(), 0);
+ let snapshot = metrics.snapshot();
+ assert_eq!(snapshot.client_messages(), 5);
+ assert_eq!(snapshot.req_messages(), 3);
+ assert_eq!(snapshot.close_messages(), 2);
+ assert_eq!(snapshot.opened_subscriptions(), 3);
+ assert_eq!(snapshot.closed_subscriptions(), 2);
let _ = std::fs::remove_dir_all(root);
}
@@ -599,6 +622,11 @@ mod tests {
}]
);
assert_eq!(session.active_subscription_count(), 0);
+ let snapshot = runtime.metrics().snapshot();
+ assert_eq!(snapshot.client_messages(), 1);
+ assert_eq!(snapshot.req_messages(), 1);
+ assert_eq!(snapshot.opened_subscriptions(), 0);
+ assert_eq!(snapshot.rate_limit_rejections(), 1);
let _ = std::fs::remove_dir_all(root);
}
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -46,10 +46,13 @@ async fn tangle_run_serves_until_shutdown() {
let health = wait_for_http_ok(address, "/healthz", None).await;
let ready = wait_for_http_ok(address, "/readyz", None).await;
+ let metrics = wait_for_http_ok(address, "/metricsz", None).await;
let nip11 = wait_for_http_ok(address, "/", Some("application/nostr+json")).await;
assert!(health.contains(r#""status":"ok""#));
assert!(ready.contains(r#""status":"ready""#));
+ assert!(metrics.contains(r#""active_sessions":0"#));
+ assert!(metrics.contains(r#""stored_event_offsets":0"#));
assert!(nip11.contains(r#""name":"tangle""#));
assert!(
nip11