commit 32b02ea45824890518f482f77e6b87e4d39bd984
parent c7bf59b3aac5beb37223823aa5bdae6f33b0ccad
Author: triesap <tyson@radroots.org>
Date: Sun, 14 Jun 2026 13:15:14 -0700
metrics: expose ops contract keys
- serialize stable tangle-prefixed metrics from /metricsz
- record auth event query event-bus and outbox counters
- include live readiness disk usage and latency fields
- prove metric keys through runtime ops and server tests
Diffstat:
7 files changed, 768 insertions(+), 111 deletions(-)
diff --git a/crates/tangle_runtime/src/groups.rs b/crates/tangle_runtime/src/groups.rs
@@ -79,6 +79,10 @@ impl GroupService {
self.limits
}
+ pub(crate) fn outbox_pending_events(&self) -> usize {
+ self.outbox.replay_plan().records().len()
+ }
+
pub(crate) fn check_event(
&self,
store: &PocketStoreHandle,
diff --git a/crates/tangle_runtime/src/ops.rs b/crates/tangle_runtime/src/ops.rs
@@ -209,7 +209,8 @@ async fn base_relay_readyz(
async fn base_relay_metricsz(
State(state): State<BaseRelayOpsState>,
) -> Json<TangleRuntimeMetricsSnapshot> {
- Json(state.metrics.snapshot())
+ let readiness = state.readiness.snapshot();
+ Json(state.metrics.snapshot_with_readiness(readiness.is_ready()))
}
#[cfg(test)]
@@ -247,6 +248,20 @@ mod tests {
metrics.record_session_opened();
metrics.record_client_message(crate::runtime::TangleClientMessageMetricKind::Req);
metrics.record_subscription_opened();
+ metrics.record_auth_success();
+ metrics.record_auth_failure();
+ metrics.record_event_admission();
+ metrics.record_event_rejection();
+ metrics.record_group_read_denial();
+ metrics.record_group_write_denial();
+ metrics.record_event_bus_receivers(2);
+ metrics.record_event_bus_publish(2);
+ metrics.record_event_bus_lagged(3);
+ metrics.record_outbox_pending_events(5);
+ metrics.record_outbox_replayed_event();
+ metrics.record_disk_used_bytes(89);
+ metrics.record_event_admission_latency(11);
+ metrics.record_query_latency(17);
let ready = base_relay_ops_router(readiness.clone(), metrics.clone())
.oneshot(
Request::builder()
@@ -280,11 +295,80 @@ mod tests {
.expect("body");
let metrics_value =
serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
- assert_eq!(metrics_value["active_sessions"], 1);
- assert_eq!(metrics_value["total_sessions"], 1);
- assert_eq!(metrics_value["client_messages"], 1);
- assert_eq!(metrics_value["req_messages"], 1);
- assert_eq!(metrics_value["opened_subscriptions"], 1);
+ let keys = metrics_value
+ .as_object()
+ .expect("metrics object")
+ .keys()
+ .cloned()
+ .collect::<std::collections::BTreeSet<_>>();
+ assert_eq!(
+ keys,
+ [
+ "tangle_auth_failure_total",
+ "tangle_auth_messages_total",
+ "tangle_auth_success_total",
+ "tangle_client_messages_total",
+ "tangle_close_messages_total",
+ "tangle_count_messages_total",
+ "tangle_disk_used_bytes",
+ "tangle_event_admission_latency_count",
+ "tangle_event_admission_latency_total_micros",
+ "tangle_event_admitted_total",
+ "tangle_event_bus_lagged_offsets_total",
+ "tangle_event_bus_lagged_receivers_total",
+ "tangle_event_bus_published_offsets_total",
+ "tangle_event_bus_receivers_current",
+ "tangle_event_messages_total",
+ "tangle_event_rejected_total",
+ "tangle_group_read_denied_total",
+ "tangle_group_write_denied_total",
+ "tangle_outbox_pending_events",
+ "tangle_outbox_replayed_events_total",
+ "tangle_query_latency_count",
+ "tangle_query_latency_total_micros",
+ "tangle_rate_limit_rejections_total",
+ "tangle_readiness_ready",
+ "tangle_req_messages_total",
+ "tangle_runtime_uptime_seconds",
+ "tangle_stored_event_offsets_total",
+ "tangle_subscriptions_closed_total",
+ "tangle_subscriptions_opened_total",
+ "tangle_ws_connections_current",
+ "tangle_ws_connections_total",
+ ]
+ .into_iter()
+ .map(str::to_owned)
+ .collect::<std::collections::BTreeSet<_>>()
+ );
+ assert_eq!(metrics_value["tangle_readiness_ready"], true);
+ assert_eq!(metrics_value["tangle_ws_connections_current"], 1);
+ assert_eq!(metrics_value["tangle_ws_connections_total"], 1);
+ assert_eq!(metrics_value["tangle_client_messages_total"], 1);
+ assert_eq!(metrics_value["tangle_req_messages_total"], 1);
+ assert_eq!(metrics_value["tangle_subscriptions_opened_total"], 1);
+ assert_eq!(metrics_value["tangle_auth_success_total"], 1);
+ assert_eq!(metrics_value["tangle_auth_failure_total"], 1);
+ assert_eq!(metrics_value["tangle_event_admitted_total"], 1);
+ assert_eq!(metrics_value["tangle_event_rejected_total"], 1);
+ assert_eq!(metrics_value["tangle_group_read_denied_total"], 1);
+ assert_eq!(metrics_value["tangle_group_write_denied_total"], 1);
+ assert_eq!(metrics_value["tangle_event_bus_receivers_current"], 2);
+ assert_eq!(metrics_value["tangle_event_bus_published_offsets_total"], 1);
+ assert_eq!(metrics_value["tangle_event_bus_lagged_receivers_total"], 1);
+ assert_eq!(metrics_value["tangle_event_bus_lagged_offsets_total"], 3);
+ assert_eq!(metrics_value["tangle_outbox_pending_events"], 5);
+ assert_eq!(metrics_value["tangle_outbox_replayed_events_total"], 1);
+ assert_eq!(metrics_value["tangle_disk_used_bytes"], 89);
+ assert_eq!(
+ metrics_value["tangle_event_admission_latency_total_micros"],
+ 11
+ );
+ assert_eq!(metrics_value["tangle_event_admission_latency_count"], 1);
+ assert_eq!(metrics_value["tangle_query_latency_total_micros"], 17);
+ assert_eq!(metrics_value["tangle_query_latency_count"], 1);
+ let metrics_text = String::from_utf8(metrics_body.to_vec()).expect("utf8");
+ assert!(!metrics_text.contains("relay_secret"));
+ assert!(!metrics_text.contains("invite"));
let not_ready = BaseRelayReadinessState::new(
BaseRelayReadinessCheckStatus::Ready,
diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs
@@ -55,6 +55,82 @@ impl BaseRelayEventWrite {
}
}
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct BaseRelayQueryReport {
+ messages: Vec<RelayMessage>,
+ group_read_denied: bool,
+}
+
+impl BaseRelayQueryReport {
+ fn new(messages: Vec<RelayMessage>, group_read_denied: bool) -> Self {
+ Self {
+ messages,
+ group_read_denied,
+ }
+ }
+
+ pub(crate) fn group_read_denied(&self) -> bool {
+ self.group_read_denied
+ }
+
+ pub(crate) fn into_messages(self) -> Vec<RelayMessage> {
+ self.messages
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub(crate) struct BaseRelayCountReport {
+ message: RelayMessage,
+ group_read_denied: bool,
+}
+
+impl BaseRelayCountReport {
+ fn new(message: RelayMessage, group_read_denied: bool) -> Self {
+ Self {
+ message,
+ group_read_denied,
+ }
+ }
+
+ pub(crate) fn group_read_denied(&self) -> bool {
+ self.group_read_denied
+ }
+
+ pub(crate) fn into_message(self) -> RelayMessage {
+ self.message
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+struct BaseRelayEventQueryReport {
+ events: Vec<Event>,
+ group_read_denied: bool,
+}
+
+impl BaseRelayEventQueryReport {
+ fn new(events: Vec<Event>, group_read_denied: bool) -> Self {
+ Self {
+ events,
+ group_read_denied,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct BaseRelayCountEventsReport {
+ count: u64,
+ group_read_denied: bool,
+}
+
+impl BaseRelayCountEventsReport {
+ fn new(count: u64, group_read_denied: bool) -> Self {
+ Self {
+ count,
+ group_read_denied,
+ }
+ }
+}
+
fn is_nip70_protected_event(event: &Event) -> bool {
event
.unsigned()
@@ -399,13 +475,13 @@ impl BaseRelay {
}
}
- pub(crate) fn query_req_with_auth(
+ pub(crate) fn query_req_with_auth_report(
&self,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
auth: &BaseAuthState,
- ) -> Result<Vec<RelayMessage>, BaseRelayError> {
- self.query_req_with_group_auth(
+ ) -> Result<BaseRelayQueryReport, BaseRelayError> {
+ self.query_req_with_group_auth_report(
subscription_id,
filters,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -509,6 +585,13 @@ impl BaseRelay {
self.groups.as_ref().map(|groups| groups.projection())
}
+ pub(crate) fn group_outbox_pending_events(&self) -> usize {
+ self.groups
+ .as_ref()
+ .map(GroupService::outbox_pending_events)
+ .unwrap_or(0)
+ }
+
pub fn readiness_state(&self) -> BaseRelayReadinessState {
self.readiness.clone()
}
@@ -641,23 +724,35 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ self.handle_req_with_group_auth_report(subscription_id, filters, auth)
+ .map(BaseRelayQueryReport::into_messages)
+ }
+
+ fn handle_req_with_group_auth_report(
+ &mut self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayQueryReport, BaseRelayError> {
self.limits.validate_subscription_id(&subscription_id)?;
self.limits.validate_filters(&filters)?;
self.subscriptions
.subscribe(subscription_id.clone(), filters.clone(), auth.clone())?;
- self.query_req_with_group_auth(subscription_id, filters, auth)
+ self.query_req_with_group_auth_report(subscription_id, filters, auth)
}
- fn query_req_with_group_auth(
+ fn query_req_with_group_auth_report(
&self,
subscription_id: SubscriptionId,
filters: Vec<Filter>,
auth: &GroupAuthContext,
- ) -> Result<Vec<RelayMessage>, BaseRelayError> {
+ ) -> Result<BaseRelayQueryReport, BaseRelayError> {
self.limits.validate_subscription_id(&subscription_id)?;
self.limits.validate_filters(&filters)?;
- let mut messages = self
- .query_events(&filters, auth)?
+ let report = self.query_events_report(&filters, auth)?;
+ let group_read_denied = report.group_read_denied;
+ let mut messages = report
+ .events
.into_iter()
.map(|event| RelayMessage::Event {
subscription_id: subscription_id.clone(),
@@ -665,7 +760,7 @@ impl BaseRelay {
})
.collect::<Vec<_>>();
messages.push(RelayMessage::Eose(subscription_id));
- Ok(messages)
+ Ok(BaseRelayQueryReport::new(messages, group_read_denied))
}
pub fn handle_count(
@@ -686,7 +781,17 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &BaseAuthState,
) -> Result<RelayMessage, BaseRelayError> {
- self.handle_count_with_group_auth(
+ self.handle_count_with_auth_report(subscription_id, filters, auth)
+ .map(BaseRelayCountReport::into_message)
+ }
+
+ pub(crate) fn handle_count_with_auth_report(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &BaseAuthState,
+ ) -> Result<BaseRelayCountReport, BaseRelayError> {
+ self.handle_count_with_group_auth_report(
subscription_id,
filters,
&GroupAuthContext::new(auth.authenticated_pubkeys().iter().cloned()),
@@ -699,12 +804,26 @@ impl BaseRelay {
filters: Vec<Filter>,
auth: &GroupAuthContext,
) -> Result<RelayMessage, BaseRelayError> {
+ self.handle_count_with_group_auth_report(subscription_id, filters, auth)
+ .map(BaseRelayCountReport::into_message)
+ }
+
+ fn handle_count_with_group_auth_report(
+ &self,
+ subscription_id: SubscriptionId,
+ filters: Vec<Filter>,
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayCountReport, BaseRelayError> {
self.limits.validate_subscription_id(&subscription_id)?;
self.limits.validate_filters(&filters)?;
- Ok(RelayMessage::Count {
- subscription_id,
- count: self.count_events(&filters, auth)?,
- })
+ let report = self.count_events_report(&filters, auth)?;
+ Ok(BaseRelayCountReport::new(
+ RelayMessage::Count {
+ subscription_id,
+ count: report.count,
+ },
+ report.group_read_denied,
+ ))
}
pub fn handle_close(&mut self, subscription_id: &SubscriptionId) -> CloseResult {
@@ -731,36 +850,55 @@ impl BaseRelay {
filters: &[Filter],
auth: &GroupAuthContext,
) -> Result<Vec<Event>, BaseRelayError> {
+ self.query_events_report(filters, auth)
+ .map(|report| report.events)
+ }
+
+ fn query_events_report(
+ &self,
+ filters: &[Filter],
+ auth: &GroupAuthContext,
+ ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
let mut output = Vec::new();
+ let mut group_read_denied = false;
for filter in filters {
- let mut events =
- Self::sort_and_dedupe_query_events(self.query_filter_events(filter, auth)?);
+ let report = self.query_filter_events_report(filter, auth)?;
+ group_read_denied |= report.group_read_denied;
+ let mut events = Self::sort_and_dedupe_query_events(report.events);
events.truncate(self.limits.effective_filter_limit(filter));
output.extend(events);
}
- Ok(Self::sort_and_dedupe_query_events(output))
+ Ok(BaseRelayEventQueryReport::new(
+ Self::sort_and_dedupe_query_events(output),
+ group_read_denied,
+ ))
}
- fn count_events(
+ fn count_events_report(
&self,
filters: &[Filter],
auth: &GroupAuthContext,
- ) -> Result<u64, BaseRelayError> {
+ ) -> Result<BaseRelayCountEventsReport, BaseRelayError> {
let mut seen = BTreeSet::new();
+ let mut group_read_denied = false;
for filter in filters {
let filter = filter.without_limit();
- for event in self.query_filter_events(&filter, auth)? {
+ let report = self.query_filter_events_report(&filter, auth)?;
+ group_read_denied |= report.group_read_denied;
+ for event in report.events {
seen.insert(event.id().clone());
}
}
- u64::try_from(seen.len()).map_err(|_| BaseRelayError::error("visible event count overflow"))
+ let count = u64::try_from(seen.len())
+ .map_err(|_| BaseRelayError::error("visible event count overflow"))?;
+ Ok(BaseRelayCountEventsReport::new(count, group_read_denied))
}
- fn query_filter_events(
+ fn query_filter_events_report(
&self,
filter: &Filter,
auth: &GroupAuthContext,
- ) -> Result<Vec<Event>, BaseRelayError> {
+ ) -> Result<BaseRelayEventQueryReport, BaseRelayError> {
let pocket_filter = tangle_filter_to_pocket(filter)?;
let screen_error = RefCell::new(None);
let screened = self.store.find_events_with_screen(
@@ -796,11 +934,13 @@ impl BaseRelay {
if let Some(error) = screen_error.into_inner() {
return Err(error);
}
- screened
+ let group_read_denied = screened.redacted();
+ let events = screened
.into_events()
.into_iter()
.map(|pocket_event| pocket_event_to_tangle(&pocket_event))
- .collect()
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(BaseRelayEventQueryReport::new(events, group_read_denied))
}
fn sort_and_dedupe_query_events(mut events: Vec<Event>) -> Vec<Event> {
diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs
@@ -19,8 +19,9 @@ use crate::{
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
- fmt,
+ fmt, fs,
net::IpAddr,
+ path::Path,
sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
@@ -28,7 +29,8 @@ use std::{
time::Instant,
};
use tangle_groups::{
- GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset, validate_client_group_event_structure,
+ GroupEventClass, GroupId, KIND_GROUP_JOIN_REQUEST, StoreOffset,
+ validate_client_group_event_structure,
};
use tangle_protocol::{
ClientMessage, Event, Filter, Kind, RelayMessage, SubscriptionId, UnixTimestamp,
@@ -77,15 +79,22 @@ impl TangleRuntime {
let limits = TangleRuntimeLimits::from_config(&config)?;
let relay = config.open_relay()?;
let readiness = BaseRelayReadinessHandle::new(relay.readiness_state());
+ let event_bus = TangleEventBus::new(limits.event_bus_capacity())?;
let rate_limiter = TangleRateLimiter::new();
+ let metrics = TangleRuntimeMetrics::new();
+ metrics.record_disk_used_bytes(directory_size_bytes(
+ config.pocket_config().data_directory(),
+ ));
+ metrics.record_event_bus_receivers(event_bus.receiver_count());
+ metrics.record_outbox_pending_events(relay.group_outbox_pending_events());
logging::log_runtime_opened(&config);
Ok(Self {
config,
relay,
readiness,
- event_bus: TangleEventBus::new(limits.event_bus_capacity())?,
+ event_bus,
rate_limiter,
- metrics: TangleRuntimeMetrics::new(),
+ metrics,
limits,
shutdown: TangleShutdownSignal::new(),
})
@@ -237,6 +246,12 @@ impl TangleRuntime {
)
}
+ fn is_group_event(&self, event: &Event) -> bool {
+ self.config.groups().enabled()
+ && validate_client_group_event_structure(event, self.config.groups().limits())
+ .is_ok_and(|class| !matches!(class, GroupEventClass::NonGroup))
+ }
+
fn rate_limit_req(
&self,
subscription_id: &SubscriptionId,
@@ -454,19 +469,32 @@ impl TangleRuntimeHandle {
let mut runtime = self.inner.lock().await;
match message {
ClientMessage::Event(event) => {
+ let started_at = Instant::now();
let event_id = event.id().clone();
+ let is_group_event = runtime.is_group_event(&event);
if let Some(message) = runtime.rate_limit_event(&event, now) {
+ record_event_metrics(runtime.metrics(), &message, is_group_event, started_at);
return Ok(vec![message]);
}
if let Some(message) = runtime.rate_limit_group_write(&event, now) {
+ record_event_metrics(runtime.metrics(), &message, is_group_event, started_at);
return Ok(vec![message]);
}
let result = runtime
.relay_mut()
.handle_event_with_auth_report(event, auth)?;
+ if is_group_event {
+ for _ in 0..result.stored_offsets().len().saturating_sub(1) {
+ runtime.metrics().record_outbox_replayed_event();
+ }
+ runtime.metrics().record_outbox_pending_events(
+ runtime.relay().group_outbox_pending_events(),
+ );
+ }
for offset in result.stored_offsets() {
runtime.metrics().record_stored_event_offset();
- runtime.event_bus().publish(*offset);
+ let receivers = runtime.event_bus().publish(*offset);
+ runtime.metrics().record_event_bus_publish(receivers);
}
if !result.stored_offsets().is_empty() {
logging::log_event_stored(
@@ -475,12 +503,15 @@ impl TangleRuntimeHandle {
runtime.metrics().stored_event_offsets(),
);
}
- Ok(vec![result.into_message()])
+ let message = result.into_message();
+ record_event_metrics(runtime.metrics(), &message, is_group_event, started_at);
+ Ok(vec![message])
}
ClientMessage::Req {
subscription_id,
filters,
} => {
+ let started_at = Instant::now();
runtime
.limits()
.base_relay_limits()
@@ -492,21 +523,29 @@ impl TangleRuntimeHandle {
if let Some(message) =
runtime.rate_limit_req(&subscription_id, &filters, auth, query_context, now)
{
+ runtime
+ .metrics()
+ .record_query_latency(elapsed_micros(started_at));
return Ok(vec![message]);
}
- runtime.relay_mut().handle_client_message(
- ClientMessage::Req {
- subscription_id,
- filters,
- },
+ let report = runtime.relay_mut().query_req_with_auth_report(
+ subscription_id,
+ filters,
auth,
- now,
- )
+ )?;
+ if report.group_read_denied() {
+ runtime.metrics().record_group_read_denial();
+ }
+ runtime
+ .metrics()
+ .record_query_latency(elapsed_micros(started_at));
+ Ok(report.into_messages())
}
ClientMessage::Count {
subscription_id,
filters,
} => {
+ let started_at = Instant::now();
runtime
.limits()
.base_relay_limits()
@@ -518,19 +557,27 @@ impl TangleRuntimeHandle {
if let Some(message) =
runtime.rate_limit_count(&subscription_id, &filters, auth, query_context, now)
{
+ runtime
+ .metrics()
+ .record_query_latency(elapsed_micros(started_at));
return Ok(vec![message]);
}
- runtime.relay_mut().handle_client_message(
- ClientMessage::Count {
- subscription_id,
- filters,
- },
+ let report = runtime.relay_mut().handle_count_with_auth_report(
+ subscription_id,
+ filters,
auth,
- now,
- )
+ )?;
+ if report.group_read_denied() {
+ runtime.metrics().record_group_read_denial();
+ }
+ runtime
+ .metrics()
+ .record_query_latency(elapsed_micros(started_at));
+ Ok(vec![report.into_message()])
}
ClientMessage::Auth(event) => {
if let Err(error) = runtime.limits().base_relay_limits().validate_event(&event) {
+ runtime.metrics().record_auth_failure();
return Ok(vec![RelayMessage::Ok {
event_id: event.id().clone(),
accepted: false,
@@ -538,6 +585,7 @@ impl TangleRuntimeHandle {
}]);
}
if let Some(message) = runtime.rate_limit_auth_attempt(&event, now) {
+ runtime.metrics().record_auth_failure();
return Ok(vec![message]);
}
let event_for_failure = event.clone();
@@ -546,10 +594,14 @@ impl TangleRuntimeHandle {
auth,
now,
)?;
- if auth_response_failed(&replies)
- && let Some(message) = runtime.rate_limit_auth_failure(&event_for_failure, now)
- {
- return Ok(vec![message]);
+ if auth_response_failed(&replies) {
+ runtime.metrics().record_auth_failure();
+ if let Some(message) = runtime.rate_limit_auth_failure(&event_for_failure, now)
+ {
+ return Ok(vec![message]);
+ }
+ } else {
+ runtime.metrics().record_auth_success();
}
Ok(replies)
}
@@ -560,7 +612,12 @@ impl TangleRuntimeHandle {
}
pub async fn subscribe_events(&self) -> TangleEventReceiver {
- self.inner.lock().await.event_bus().subscribe()
+ let runtime = self.inner.lock().await;
+ let receiver = runtime.event_bus().subscribe();
+ runtime
+ .metrics()
+ .record_event_bus_receivers(runtime.event_bus().receiver_count());
+ receiver
}
pub async fn rate_limiter(&self) -> TangleRateLimiter {
@@ -587,11 +644,19 @@ impl TangleRuntimeHandle {
filters: Vec<Filter>,
auth: &BaseAuthState,
) -> Result<Vec<RelayMessage>, BaseRelayError> {
- self.inner
- .lock()
- .await
- .relay()
- .query_req_with_auth(subscription_id, filters, auth)
+ let started_at = Instant::now();
+ let mut runtime = self.inner.lock().await;
+ let report =
+ runtime
+ .relay_mut()
+ .query_req_with_auth_report(subscription_id, filters, auth)?;
+ if report.group_read_denied() {
+ runtime.metrics().record_group_read_denial();
+ }
+ runtime
+ .metrics()
+ .record_query_latency(elapsed_micros(started_at));
+ Ok(report.into_messages())
}
pub async fn event_by_offset_with_auth(
@@ -599,11 +664,12 @@ impl TangleRuntimeHandle {
offset: StoreOffset,
auth: &BaseAuthState,
) -> Result<Option<Event>, BaseRelayError> {
- self.inner
- .lock()
- .await
- .relay()
- .event_by_offset_with_auth(offset, auth)
+ let runtime = self.inner.lock().await;
+ let event = runtime.relay().event_by_offset_with_auth(offset, auth)?;
+ if event.is_none() {
+ runtime.metrics().record_group_read_denial();
+ }
+ Ok(event)
}
pub(crate) async fn fanout_event_offset(
@@ -635,6 +701,48 @@ fn auth_response_failed(replies: &[RelayMessage]) -> bool {
})
}
+fn record_event_metrics(
+ metrics: &TangleRuntimeMetrics,
+ message: &RelayMessage,
+ is_group_event: bool,
+ started_at: Instant,
+) {
+ metrics.record_event_admission_latency(elapsed_micros(started_at));
+ if let RelayMessage::Ok { accepted, .. } = message {
+ if *accepted {
+ metrics.record_event_admission();
+ } else {
+ metrics.record_event_rejection();
+ if is_group_event {
+ metrics.record_group_write_denial();
+ }
+ }
+ }
+}
+
+fn elapsed_micros(started_at: Instant) -> u64 {
+ u64::try_from(started_at.elapsed().as_micros()).unwrap_or(u64::MAX)
+}
+
+fn directory_size_bytes(path: &Path) -> u64 {
+ let Ok(metadata) = fs::metadata(path) else {
+ return 0;
+ };
+ if metadata.is_file() {
+ return metadata.len();
+ }
+ if !metadata.is_dir() {
+ return 0;
+ }
+ let Ok(entries) = fs::read_dir(path) else {
+ return 0;
+ };
+ entries
+ .filter_map(Result::ok)
+ .map(|entry| directory_size_bytes(&entry.path()))
+ .sum()
+}
+
fn client_message_metric_kind(message: &ClientMessage) -> TangleClientMessageMetricKind {
match message {
ClientMessage::Event(_) => TangleClientMessageMetricKind::Event,
@@ -772,6 +880,23 @@ struct TangleRuntimeMetricsInner {
closed_subscriptions: AtomicU64,
stored_event_offsets: AtomicU64,
rate_limit_rejections: AtomicU64,
+ auth_successes: AtomicU64,
+ auth_failures: AtomicU64,
+ event_admissions: AtomicU64,
+ event_rejections: AtomicU64,
+ group_read_denials: AtomicU64,
+ group_write_denials: AtomicU64,
+ event_bus_receivers_current: AtomicUsize,
+ event_bus_published_offsets: AtomicU64,
+ event_bus_lagged_receivers: AtomicU64,
+ event_bus_lagged_offsets: AtomicU64,
+ outbox_pending_events: AtomicUsize,
+ outbox_replayed_events: AtomicU64,
+ disk_used_bytes: AtomicU64,
+ event_admission_latency_total_micros: AtomicU64,
+ event_admission_latency_count: AtomicU64,
+ query_latency_total_micros: AtomicU64,
+ query_latency_count: AtomicU64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -785,68 +910,86 @@ pub enum TangleClientMessageMetricKind {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TangleRuntimeMetricsSnapshot {
- uptime_seconds: u64,
- active_sessions: usize,
- total_sessions: u64,
- client_messages: u64,
- event_messages: u64,
- req_messages: u64,
- count_messages: u64,
- auth_messages: u64,
- close_messages: u64,
- opened_subscriptions: u64,
- closed_subscriptions: u64,
- stored_event_offsets: u64,
- rate_limit_rejections: u64,
+ tangle_runtime_uptime_seconds: u64,
+ tangle_readiness_ready: bool,
+ tangle_ws_connections_current: usize,
+ tangle_ws_connections_total: u64,
+ tangle_client_messages_total: u64,
+ tangle_event_messages_total: u64,
+ tangle_req_messages_total: u64,
+ tangle_count_messages_total: u64,
+ tangle_auth_messages_total: u64,
+ tangle_close_messages_total: u64,
+ tangle_subscriptions_opened_total: u64,
+ tangle_subscriptions_closed_total: u64,
+ tangle_stored_event_offsets_total: u64,
+ tangle_rate_limit_rejections_total: u64,
+ tangle_auth_success_total: u64,
+ tangle_auth_failure_total: u64,
+ tangle_event_admitted_total: u64,
+ tangle_event_rejected_total: u64,
+ tangle_group_read_denied_total: u64,
+ tangle_group_write_denied_total: u64,
+ tangle_event_bus_receivers_current: usize,
+ tangle_event_bus_published_offsets_total: u64,
+ tangle_event_bus_lagged_receivers_total: u64,
+ tangle_event_bus_lagged_offsets_total: u64,
+ tangle_outbox_pending_events: usize,
+ tangle_outbox_replayed_events_total: u64,
+ tangle_disk_used_bytes: u64,
+ tangle_event_admission_latency_total_micros: u64,
+ tangle_event_admission_latency_count: u64,
+ tangle_query_latency_total_micros: u64,
+ tangle_query_latency_count: u64,
}
impl TangleRuntimeMetricsSnapshot {
pub fn active_sessions(&self) -> usize {
- self.active_sessions
+ self.tangle_ws_connections_current
}
pub fn total_sessions(&self) -> u64 {
- self.total_sessions
+ self.tangle_ws_connections_total
}
pub fn client_messages(&self) -> u64 {
- self.client_messages
+ self.tangle_client_messages_total
}
pub fn event_messages(&self) -> u64 {
- self.event_messages
+ self.tangle_event_messages_total
}
pub fn req_messages(&self) -> u64 {
- self.req_messages
+ self.tangle_req_messages_total
}
pub fn count_messages(&self) -> u64 {
- self.count_messages
+ self.tangle_count_messages_total
}
pub fn auth_messages(&self) -> u64 {
- self.auth_messages
+ self.tangle_auth_messages_total
}
pub fn close_messages(&self) -> u64 {
- self.close_messages
+ self.tangle_close_messages_total
}
pub fn opened_subscriptions(&self) -> u64 {
- self.opened_subscriptions
+ self.tangle_subscriptions_opened_total
}
pub fn closed_subscriptions(&self) -> u64 {
- self.closed_subscriptions
+ self.tangle_subscriptions_closed_total
}
pub fn stored_event_offsets(&self) -> u64 {
- self.stored_event_offsets
+ self.tangle_stored_event_offsets_total
}
pub fn rate_limit_rejections(&self) -> u64 {
- self.rate_limit_rejections
+ self.tangle_rate_limit_rejections_total
}
}
@@ -867,25 +1010,65 @@ impl TangleRuntimeMetrics {
closed_subscriptions: AtomicU64::new(0),
stored_event_offsets: AtomicU64::new(0),
rate_limit_rejections: AtomicU64::new(0),
+ auth_successes: AtomicU64::new(0),
+ auth_failures: AtomicU64::new(0),
+ event_admissions: AtomicU64::new(0),
+ event_rejections: AtomicU64::new(0),
+ group_read_denials: AtomicU64::new(0),
+ group_write_denials: AtomicU64::new(0),
+ event_bus_receivers_current: AtomicUsize::new(0),
+ event_bus_published_offsets: AtomicU64::new(0),
+ event_bus_lagged_receivers: AtomicU64::new(0),
+ event_bus_lagged_offsets: AtomicU64::new(0),
+ outbox_pending_events: AtomicUsize::new(0),
+ outbox_replayed_events: AtomicU64::new(0),
+ disk_used_bytes: AtomicU64::new(0),
+ event_admission_latency_total_micros: AtomicU64::new(0),
+ event_admission_latency_count: AtomicU64::new(0),
+ query_latency_total_micros: AtomicU64::new(0),
+ query_latency_count: AtomicU64::new(0),
}),
}
}
pub fn snapshot(&self) -> TangleRuntimeMetricsSnapshot {
+ self.snapshot_with_readiness(false)
+ }
+
+ pub fn snapshot_with_readiness(&self, readiness_ready: bool) -> TangleRuntimeMetricsSnapshot {
TangleRuntimeMetricsSnapshot {
- uptime_seconds: self.started_at().elapsed().as_secs(),
- active_sessions: self.active_sessions(),
- total_sessions: self.total_sessions(),
- client_messages: self.client_messages(),
- event_messages: self.event_messages(),
- req_messages: self.req_messages(),
- count_messages: self.count_messages(),
- auth_messages: self.auth_messages(),
- close_messages: self.close_messages(),
- opened_subscriptions: self.opened_subscriptions(),
- closed_subscriptions: self.closed_subscriptions(),
- stored_event_offsets: self.stored_event_offsets(),
- rate_limit_rejections: self.rate_limit_rejections(),
+ tangle_runtime_uptime_seconds: self.started_at().elapsed().as_secs(),
+ tangle_readiness_ready: readiness_ready,
+ tangle_ws_connections_current: self.active_sessions(),
+ tangle_ws_connections_total: self.total_sessions(),
+ tangle_client_messages_total: self.client_messages(),
+ tangle_event_messages_total: self.event_messages(),
+ tangle_req_messages_total: self.req_messages(),
+ tangle_count_messages_total: self.count_messages(),
+ tangle_auth_messages_total: self.auth_messages(),
+ tangle_close_messages_total: self.close_messages(),
+ tangle_subscriptions_opened_total: self.opened_subscriptions(),
+ tangle_subscriptions_closed_total: self.closed_subscriptions(),
+ tangle_stored_event_offsets_total: self.stored_event_offsets(),
+ tangle_rate_limit_rejections_total: self.rate_limit_rejections(),
+ tangle_auth_success_total: self.auth_successes(),
+ tangle_auth_failure_total: self.auth_failures(),
+ tangle_event_admitted_total: self.event_admissions(),
+ tangle_event_rejected_total: self.event_rejections(),
+ tangle_group_read_denied_total: self.group_read_denials(),
+ tangle_group_write_denied_total: self.group_write_denials(),
+ tangle_event_bus_receivers_current: self.event_bus_receivers_current(),
+ tangle_event_bus_published_offsets_total: self.event_bus_published_offsets(),
+ tangle_event_bus_lagged_receivers_total: self.event_bus_lagged_receivers(),
+ tangle_event_bus_lagged_offsets_total: self.event_bus_lagged_offsets(),
+ tangle_outbox_pending_events: self.outbox_pending_events(),
+ tangle_outbox_replayed_events_total: self.outbox_replayed_events(),
+ tangle_disk_used_bytes: self.disk_used_bytes(),
+ tangle_event_admission_latency_total_micros: self
+ .event_admission_latency_total_micros(),
+ tangle_event_admission_latency_count: self.event_admission_latency_count(),
+ tangle_query_latency_total_micros: self.query_latency_total_micros(),
+ tangle_query_latency_count: self.query_latency_count(),
}
}
@@ -941,6 +1124,86 @@ impl TangleRuntimeMetrics {
self.inner.rate_limit_rejections.load(Ordering::Relaxed)
}
+ pub fn auth_successes(&self) -> u64 {
+ self.inner.auth_successes.load(Ordering::Relaxed)
+ }
+
+ pub fn auth_failures(&self) -> u64 {
+ self.inner.auth_failures.load(Ordering::Relaxed)
+ }
+
+ pub fn event_admissions(&self) -> u64 {
+ self.inner.event_admissions.load(Ordering::Relaxed)
+ }
+
+ pub fn event_rejections(&self) -> u64 {
+ self.inner.event_rejections.load(Ordering::Relaxed)
+ }
+
+ pub fn group_read_denials(&self) -> u64 {
+ self.inner.group_read_denials.load(Ordering::Relaxed)
+ }
+
+ pub fn group_write_denials(&self) -> u64 {
+ self.inner.group_write_denials.load(Ordering::Relaxed)
+ }
+
+ pub fn event_bus_receivers_current(&self) -> usize {
+ self.inner
+ .event_bus_receivers_current
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn event_bus_published_offsets(&self) -> u64 {
+ self.inner
+ .event_bus_published_offsets
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn event_bus_lagged_receivers(&self) -> u64 {
+ self.inner
+ .event_bus_lagged_receivers
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn event_bus_lagged_offsets(&self) -> u64 {
+ self.inner.event_bus_lagged_offsets.load(Ordering::Relaxed)
+ }
+
+ pub fn outbox_pending_events(&self) -> usize {
+ self.inner.outbox_pending_events.load(Ordering::Relaxed)
+ }
+
+ pub fn outbox_replayed_events(&self) -> u64 {
+ self.inner.outbox_replayed_events.load(Ordering::Relaxed)
+ }
+
+ pub fn disk_used_bytes(&self) -> u64 {
+ self.inner.disk_used_bytes.load(Ordering::Relaxed)
+ }
+
+ pub fn event_admission_latency_total_micros(&self) -> u64 {
+ self.inner
+ .event_admission_latency_total_micros
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn event_admission_latency_count(&self) -> u64 {
+ self.inner
+ .event_admission_latency_count
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn query_latency_total_micros(&self) -> u64 {
+ self.inner
+ .query_latency_total_micros
+ .load(Ordering::Relaxed)
+ }
+
+ pub fn query_latency_count(&self) -> u64 {
+ self.inner.query_latency_count.load(Ordering::Relaxed)
+ }
+
pub fn record_session_opened(&self) -> usize {
self.inner.total_sessions.fetch_add(1, Ordering::Relaxed);
self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1
@@ -1013,6 +1276,94 @@ impl TangleRuntimeMetrics {
.fetch_add(1, Ordering::Relaxed)
+ 1
}
+
+ pub fn record_auth_success(&self) -> u64 {
+ self.inner.auth_successes.fetch_add(1, Ordering::Relaxed) + 1
+ }
+
+ pub fn record_auth_failure(&self) -> u64 {
+ self.inner.auth_failures.fetch_add(1, Ordering::Relaxed) + 1
+ }
+
+ pub fn record_event_admission(&self) -> u64 {
+ self.inner.event_admissions.fetch_add(1, Ordering::Relaxed) + 1
+ }
+
+ pub fn record_event_rejection(&self) -> u64 {
+ self.inner.event_rejections.fetch_add(1, Ordering::Relaxed) + 1
+ }
+
+ pub fn record_group_read_denial(&self) -> u64 {
+ self.inner
+ .group_read_denials
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
+ pub fn record_group_write_denial(&self) -> u64 {
+ self.inner
+ .group_write_denials
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
+ pub fn record_event_bus_receivers(&self, count: usize) {
+ self.inner
+ .event_bus_receivers_current
+ .store(count, Ordering::Relaxed);
+ }
+
+ pub fn record_event_bus_publish(&self, receivers: usize) -> u64 {
+ self.record_event_bus_receivers(receivers);
+ self.inner
+ .event_bus_published_offsets
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
+ pub fn record_event_bus_lagged(&self, skipped: u64) {
+ self.inner
+ .event_bus_lagged_receivers
+ .fetch_add(1, Ordering::Relaxed);
+ self.inner
+ .event_bus_lagged_offsets
+ .fetch_add(skipped, Ordering::Relaxed);
+ }
+
+ pub fn record_outbox_pending_events(&self, count: usize) {
+ self.inner
+ .outbox_pending_events
+ .store(count, Ordering::Relaxed);
+ }
+
+ pub fn record_outbox_replayed_event(&self) -> u64 {
+ self.inner
+ .outbox_replayed_events
+ .fetch_add(1, Ordering::Relaxed)
+ + 1
+ }
+
+ pub fn record_disk_used_bytes(&self, bytes: u64) {
+ self.inner.disk_used_bytes.store(bytes, Ordering::Relaxed);
+ }
+
+ pub fn record_event_admission_latency(&self, micros: u64) {
+ self.inner
+ .event_admission_latency_total_micros
+ .fetch_add(micros, Ordering::Relaxed);
+ self.inner
+ .event_admission_latency_count
+ .fetch_add(1, Ordering::Relaxed);
+ }
+
+ pub fn record_query_latency(&self, micros: u64) {
+ self.inner
+ .query_latency_total_micros
+ .fetch_add(micros, Ordering::Relaxed);
+ self.inner
+ .query_latency_count
+ .fetch_add(1, Ordering::Relaxed);
+ }
}
impl Default for TangleRuntimeMetrics {
@@ -1155,7 +1506,21 @@ mod tests {
assert_eq!(runtime.metrics().stored_event_offsets(), 1);
assert_eq!(runtime.metrics().record_rate_limit_rejection(), 1);
assert_eq!(runtime.metrics().rate_limit_rejections(), 1);
- let snapshot = runtime.metrics().snapshot();
+ assert_eq!(runtime.metrics().record_auth_success(), 1);
+ assert_eq!(runtime.metrics().record_auth_failure(), 1);
+ assert_eq!(runtime.metrics().record_event_admission(), 1);
+ assert_eq!(runtime.metrics().record_event_rejection(), 1);
+ assert_eq!(runtime.metrics().record_group_read_denial(), 1);
+ assert_eq!(runtime.metrics().record_group_write_denial(), 1);
+ runtime.metrics().record_event_bus_receivers(3);
+ assert_eq!(runtime.metrics().record_event_bus_publish(3), 1);
+ runtime.metrics().record_event_bus_lagged(4);
+ runtime.metrics().record_outbox_pending_events(2);
+ assert_eq!(runtime.metrics().record_outbox_replayed_event(), 1);
+ runtime.metrics().record_disk_used_bytes(5);
+ runtime.metrics().record_event_admission_latency(13);
+ runtime.metrics().record_query_latency(17);
+ let snapshot = runtime.metrics().snapshot_with_readiness(true);
assert_eq!(snapshot.active_sessions(), 0);
assert_eq!(snapshot.total_sessions(), 1);
assert_eq!(snapshot.client_messages(), 1);
@@ -1164,6 +1529,31 @@ mod tests {
assert_eq!(snapshot.closed_subscriptions(), 1);
assert_eq!(snapshot.stored_event_offsets(), 1);
assert_eq!(snapshot.rate_limit_rejections(), 1);
+ let snapshot_value = serde_json::to_value(snapshot).expect("snapshot json");
+ assert_eq!(snapshot_value["tangle_readiness_ready"], true);
+ assert_eq!(snapshot_value["tangle_auth_success_total"], 1);
+ assert_eq!(snapshot_value["tangle_auth_failure_total"], 1);
+ assert_eq!(snapshot_value["tangle_event_admitted_total"], 1);
+ assert_eq!(snapshot_value["tangle_event_rejected_total"], 1);
+ assert_eq!(snapshot_value["tangle_group_read_denied_total"], 1);
+ assert_eq!(snapshot_value["tangle_group_write_denied_total"], 1);
+ assert_eq!(snapshot_value["tangle_event_bus_receivers_current"], 3);
+ assert_eq!(
+ snapshot_value["tangle_event_bus_published_offsets_total"],
+ 1
+ );
+ assert_eq!(snapshot_value["tangle_event_bus_lagged_receivers_total"], 1);
+ assert_eq!(snapshot_value["tangle_event_bus_lagged_offsets_total"], 4);
+ assert_eq!(snapshot_value["tangle_outbox_pending_events"], 2);
+ assert_eq!(snapshot_value["tangle_outbox_replayed_events_total"], 1);
+ assert_eq!(snapshot_value["tangle_disk_used_bytes"], 5);
+ assert_eq!(
+ snapshot_value["tangle_event_admission_latency_total_micros"],
+ 13
+ );
+ assert_eq!(snapshot_value["tangle_event_admission_latency_count"], 1);
+ assert_eq!(snapshot_value["tangle_query_latency_total_micros"], 17);
+ assert_eq!(snapshot_value["tangle_query_latency_count"], 1);
let report = runtime.shutdown().expect("shutdown");
@@ -1175,6 +1565,27 @@ mod tests {
}
#[test]
+ fn runtime_metrics_snapshot_serializes_tangle_contract_keys() {
+ let metrics = super::TangleRuntimeMetrics::new();
+ metrics.record_session_opened();
+ metrics.record_auth_success();
+ metrics.record_event_admission();
+ metrics.record_event_bus_publish(1);
+ metrics.record_disk_used_bytes(42);
+ let snapshot = metrics.snapshot_with_readiness(true);
+ let value = serde_json::to_value(snapshot).expect("snapshot");
+
+ assert_eq!(value["tangle_readiness_ready"], true);
+ assert_eq!(value["tangle_ws_connections_current"], 1);
+ assert_eq!(value["tangle_auth_success_total"], 1);
+ assert_eq!(value["tangle_event_admitted_total"], 1);
+ assert_eq!(value["tangle_event_bus_published_offsets_total"], 1);
+ assert_eq!(value["tangle_disk_used_bytes"], 42);
+ assert!(value.get("active_sessions").is_none());
+ assert!(value.get("stored_event_offsets").is_none());
+ }
+
+ #[test]
fn runtime_limits_and_event_bus_reject_zero_capacity() {
assert!(TangleRuntimeLimits::new(0, runtime_relay_limits(1), 1, 1).is_err());
assert!(TangleRuntimeLimits::new(1, runtime_relay_limits(1), 0, 1).is_err());
@@ -1254,6 +1665,10 @@ mod tests {
assert_eq!(snapshot.client_messages(), 2);
assert_eq!(snapshot.event_messages(), 2);
assert_eq!(snapshot.stored_event_offsets(), 1);
+ assert_eq!(handle.metrics().event_admissions(), 2);
+ assert_eq!(handle.metrics().event_bus_receivers_current(), 1);
+ assert_eq!(handle.metrics().event_bus_published_offsets(), 1);
+ assert_eq!(handle.metrics().event_admission_latency_count(), 2);
let _ = std::fs::remove_dir_all(root);
}
@@ -1929,6 +2344,9 @@ mod tests {
.contains(&event.unsigned().kind().as_u32())
));
}
+ assert_eq!(handle.metrics().outbox_replayed_events(), 2);
+ assert_eq!(handle.metrics().outbox_pending_events(), 0);
+ assert_eq!(handle.metrics().event_bus_published_offsets(), 3);
assert_eq!(
offsets.try_recv().expect_err("only source plus generated"),
TangleEventReceiveError::Empty
diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs
@@ -503,8 +503,9 @@ mod tests {
.expect("body");
let metrics_value =
serde_json::from_slice::<serde_json::Value>(&metrics_body).expect("json");
- assert_eq!(metrics_value["active_sessions"], 0);
- assert_eq!(metrics_value["stored_event_offsets"], 0);
+ assert_eq!(metrics_value["tangle_readiness_ready"], true);
+ assert_eq!(metrics_value["tangle_ws_connections_current"], 0);
+ assert_eq!(metrics_value["tangle_stored_event_offsets_total"], 0);
let root_body = to_bytes(root_without_accept.into_body(), usize::MAX)
.await
.expect("body");
diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs
@@ -110,6 +110,9 @@ impl TangleWebSocketSession {
let closed_subscriptions = self.subscriptions.close_all();
metrics.record_subscriptions_closed(closed_subscriptions);
metrics.record_session_closed();
+ metrics.record_event_bus_receivers(
+ metrics.event_bus_receivers_current().saturating_sub(1),
+ );
logging::log_websocket_session_closed(
self.connection_id,
self.peer_ip,
@@ -162,6 +165,7 @@ impl TangleWebSocketSession {
let closed_subscriptions = self.subscriptions.close_all();
metrics.record_subscriptions_closed(closed_subscriptions);
metrics.record_session_closed();
+ metrics.record_event_bus_receivers(metrics.event_bus_receivers_current().saturating_sub(1));
logging::log_websocket_session_closed(
self.connection_id,
self.peer_ip,
@@ -181,7 +185,8 @@ impl TangleWebSocketSession {
TangleSessionControl::Stop
}
}
- Err(TangleEventReceiveError::Lagged(_)) => {
+ Err(TangleEventReceiveError::Lagged(skipped)) => {
+ self.runtime.metrics().record_event_bus_lagged(skipped);
TangleSessionControl::Close(event_stream_lag_close_message())
}
Err(TangleEventReceiveError::Closed) => TangleSessionControl::Stop,
@@ -642,10 +647,12 @@ mod tests {
let events = runtime.event_bus().subscribe();
assert_eq!(runtime.event_bus().publish(StoreOffset::new(1)), 1);
assert_eq!(runtime.event_bus().publish(StoreOffset::new(2)), 1);
+ let runtime = TangleRuntimeHandle::new(runtime);
+ let metrics = runtime.metrics();
let mut session = TangleWebSocketSession::new(
session_limits(1),
shutdown.subscribe(),
- TangleRuntimeHandle::new(runtime),
+ runtime,
auth,
events,
)
@@ -656,6 +663,8 @@ mod tests {
session.handle_event_receive_result(event).await,
TangleSessionControl::Close(event_stream_lag_close_message())
);
+ assert_eq!(metrics.event_bus_lagged_receivers(), 1);
+ assert_eq!(metrics.event_bus_lagged_offsets(), 1);
let _ = std::fs::remove_dir_all(root);
}
diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs
@@ -58,8 +58,9 @@ async fn tangle_run_serves_until_shutdown() {
assert!(health.contains(r#""status":"ok""#));
assert!(ready.contains(r#""status":"ready""#));
assert!(ready.contains(r#""server_bind":"ready""#));
- assert!(metrics.contains(r#""active_sessions":0"#));
- assert!(metrics.contains(r#""stored_event_offsets":0"#));
+ assert!(metrics.contains(r#""tangle_readiness_ready":true"#));
+ assert!(metrics.contains(r#""tangle_ws_connections_current":0"#));
+ assert!(metrics.contains(r#""tangle_stored_event_offsets_total":0"#));
assert!(nip11.contains(r#""name":"tangle""#));
assert!(
nip11