tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit ca0dcf9352372ba781d7b4489694f0ae2a04219b
parent e429592e588e0e57721b66c66f7df0898c092023
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 18:29:40 -0700

runtime: expose safe query metrics

Diffstat:
Mcrates/tangle_runtime/src/ops.rs | 14++++++++++++++
Mcrates/tangle_runtime/src/relay/core.rs | 142+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Mcrates/tangle_runtime/src/runtime.rs | 116+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/tangle_runtime/tests/phase2_acceptance_targets.rs | 18++++++++++++++++++
4 files changed, 269 insertions(+), 21 deletions(-)

diff --git a/crates/tangle_runtime/src/ops.rs b/crates/tangle_runtime/src/ops.rs @@ -219,6 +219,7 @@ mod tests { BaseRelayReadinessCheckStatus, BaseRelayReadinessHandle, BaseRelayReadinessState, base_relay_ops_router, }; + use crate::relay::core::BaseRelayQueryMetrics; use crate::runtime::TangleRuntimeMetrics; use axum::body::to_bytes; use http::{Request, StatusCode}; @@ -263,6 +264,9 @@ mod tests { metrics.record_disk_used_bytes(89); metrics.record_event_admission_latency(11); metrics.record_query_latency(17); + metrics.record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2)); + metrics.record_count_refusal(); + metrics.record_broad_query_rejection(); let ready = base_relay_ops_router(readiness.clone(), metrics.clone()) .oneshot( Request::builder() @@ -311,6 +315,7 @@ mod tests { "tangle_client_messages_total", "tangle_close_messages_total", "tangle_count_messages_total", + "tangle_count_refusals_total", "tangle_disk_used_bytes", "tangle_event_admission_latency_count", "tangle_event_admission_latency_total_micros", @@ -326,8 +331,12 @@ mod tests { "tangle_outbound_queue_full_closes_total", "tangle_outbox_pending_events", "tangle_outbox_replayed_events_total", + "tangle_broad_query_rejections_total", + "tangle_query_candidates_scanned_total", "tangle_query_latency_count", "tangle_query_latency_total_micros", + "tangle_query_redacted_events_total", + "tangle_query_returned_events_total", "tangle_rate_limit_rejections_total", "tangle_readiness_ready", "tangle_req_messages_total", @@ -369,6 +378,11 @@ mod tests { assert_eq!(metrics_value["tangle_event_admission_latency_count"], 1); assert_eq!(metrics_value["tangle_query_latency_total_micros"], 17); assert_eq!(metrics_value["tangle_query_latency_count"], 1); + assert_eq!(metrics_value["tangle_query_candidates_scanned_total"], 5); + assert_eq!(metrics_value["tangle_query_returned_events_total"], 3); + assert_eq!(metrics_value["tangle_query_redacted_events_total"], 2); + assert_eq!(metrics_value["tangle_count_refusals_total"], 1); + assert_eq!(metrics_value["tangle_broad_query_rejections_total"], 1); let metrics_text = String::from_utf8(metrics_body.to_vec()).expect("utf8"); assert!(!metrics_text.contains("relay_secret")); assert!(!metrics_text.contains("invite")); diff --git a/crates/tangle_runtime/src/relay/core.rs b/crates/tangle_runtime/src/relay/core.rs @@ -9,7 +9,10 @@ use crate::relay::{ auth::BaseAuthState, live::{CloseResult, LiveSubscriptionSet}, }; -use std::{cell::RefCell, collections::BTreeSet}; +use std::{ + cell::{Cell, RefCell}, + collections::BTreeSet, +}; use tangle_crypto::verify_event_signature; use tangle_groups::{ GroupAuthContext, GroupEventClass, GroupEventView, GroupRuntimeConfig, StoreOffset, @@ -63,13 +66,19 @@ impl BaseRelayEventWrite { pub(crate) struct BaseRelayQueryReport { messages: Vec<RelayMessage>, group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, } impl BaseRelayQueryReport { - fn new(messages: Vec<RelayMessage>, group_read_denied: bool) -> Self { + fn new( + messages: Vec<RelayMessage>, + group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, + ) -> Self { Self { messages, group_read_denied, + query_metrics, } } @@ -77,6 +86,10 @@ impl BaseRelayQueryReport { self.group_read_denied } + pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics { + self.query_metrics + } + pub(crate) fn into_messages(self) -> Vec<RelayMessage> { self.messages } @@ -86,13 +99,19 @@ impl BaseRelayQueryReport { pub(crate) struct BaseRelayCountReport { message: RelayMessage, group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, } impl BaseRelayCountReport { - fn new(message: RelayMessage, group_read_denied: bool) -> Self { + fn new( + message: RelayMessage, + group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, + ) -> Self { Self { message, group_read_denied, + query_metrics, } } @@ -100,6 +119,10 @@ impl BaseRelayCountReport { self.group_read_denied } + pub(crate) fn query_metrics(&self) -> BaseRelayQueryMetrics { + self.query_metrics + } + pub(crate) fn into_message(self) -> RelayMessage { self.message } @@ -109,28 +132,82 @@ impl BaseRelayCountReport { struct BaseRelayEventQueryReport { events: Vec<Event>, group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, } impl BaseRelayEventQueryReport { - fn new(events: Vec<Event>, group_read_denied: bool) -> Self { + fn new( + events: Vec<Event>, + group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, + ) -> Self { Self { events, group_read_denied, + query_metrics, } } } +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub(crate) struct BaseRelayQueryMetrics { + candidates_scanned: u64, + returned_events: u64, + redacted_events: u64, +} + +impl BaseRelayQueryMetrics { + pub(crate) fn new(candidates_scanned: u64, returned_events: u64, redacted_events: u64) -> Self { + Self { + candidates_scanned, + returned_events, + redacted_events, + } + } + + fn add(self, other: Self) -> Self { + Self { + candidates_scanned: self + .candidates_scanned + .saturating_add(other.candidates_scanned), + returned_events: self.returned_events.saturating_add(other.returned_events), + redacted_events: self.redacted_events.saturating_add(other.redacted_events), + } + } + + fn with_returned_events(self, returned_events: usize) -> Self { + Self { + returned_events: u64::try_from(returned_events).expect("returned events fit in u64"), + ..self + } + } + + pub(crate) fn candidates_scanned(self) -> u64 { + self.candidates_scanned + } + + pub(crate) fn returned_events(self) -> u64 { + self.returned_events + } + + pub(crate) fn redacted_events(self) -> u64 { + self.redacted_events + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] struct BaseRelayCountEventsReport { count: u64, group_read_denied: bool, + query_metrics: BaseRelayQueryMetrics, } impl BaseRelayCountEventsReport { - fn new(count: u64, group_read_denied: bool) -> Self { + fn new(count: u64, group_read_denied: bool, query_metrics: BaseRelayQueryMetrics) -> Self { Self { count, group_read_denied, + query_metrics, } } } @@ -835,7 +912,11 @@ impl BaseRelay { self.limits.validate_subscription_id(&subscription_id)?; self.limits.validate_filters(&filters)?; if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { - return Ok(BaseRelayQueryReport::new(vec![message], false)); + return Ok(BaseRelayQueryReport::new( + vec![message], + false, + BaseRelayQueryMetrics::default(), + )); } self.subscriptions .subscribe(subscription_id.clone(), filters.clone(), auth.clone())?; @@ -871,11 +952,16 @@ impl BaseRelay { limits.validate_subscription_id(&subscription_id)?; limits.validate_filters(&filters)?; if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { - return Ok(BaseRelayQueryReport::new(vec![message], false)); + return Ok(BaseRelayQueryReport::new( + vec![message], + false, + BaseRelayQueryMetrics::default(), + )); } let report = Self::query_events_report_with_services(store, groups, limits, query, &filters, auth)?; let group_read_denied = report.group_read_denied; + let query_metrics = report.query_metrics; let mut messages = report .events .into_iter() @@ -885,7 +971,11 @@ impl BaseRelay { }) .collect::<Vec<_>>(); messages.push(RelayMessage::Eose(subscription_id)); - Ok(BaseRelayQueryReport::new(messages, group_read_denied)) + Ok(BaseRelayQueryReport::new( + messages, + group_read_denied, + query_metrics, + )) } pub fn handle_count( @@ -986,7 +1076,11 @@ impl BaseRelay { limits.validate_subscription_id(&subscription_id)?; limits.validate_filters(&filters)?; if let Some(message) = Self::unsupported_search_closed(&subscription_id, &filters) { - return Ok(BaseRelayCountReport::new(message, false)); + return Ok(BaseRelayCountReport::new( + message, + false, + BaseRelayQueryMetrics::default(), + )); } let report = Self::count_events_report_with_services(store, groups, limits, query, &filters, auth)?; @@ -996,6 +1090,7 @@ impl BaseRelay { count: report.count, }, report.group_read_denied, + report.query_metrics, )) } @@ -1048,18 +1143,23 @@ impl BaseRelay { ) -> Result<BaseRelayEventQueryReport, BaseRelayError> { let mut output = Vec::new(); let mut group_read_denied = false; + let mut query_metrics = BaseRelayQueryMetrics::default(); for filter in filters { let report = Self::query_filter_events_report_with_services( store, groups, limits, query, filter, auth, )?; group_read_denied |= report.group_read_denied; + query_metrics = query_metrics.add(report.query_metrics); let mut events = Self::sort_and_dedupe_query_events(report.events); events.truncate(limits.effective_filter_limit(filter)); output.extend(events); } + let events = Self::sort_and_dedupe_query_events(output); + query_metrics = query_metrics.with_returned_events(events.len()); Ok(BaseRelayEventQueryReport::new( - Self::sort_and_dedupe_query_events(output), + events, group_read_denied, + query_metrics, )) } @@ -1073,19 +1173,25 @@ impl BaseRelay { ) -> Result<BaseRelayCountEventsReport, BaseRelayError> { let mut seen = BTreeSet::new(); let mut group_read_denied = false; + let mut query_metrics = BaseRelayQueryMetrics::default(); for filter in filters { let filter = filter.without_limit(); let report = Self::query_filter_events_report_with_services( store, groups, limits, query, &filter, auth, )?; group_read_denied |= report.group_read_denied; + query_metrics = query_metrics.add(report.query_metrics); for event in report.events { seen.insert(event.id().clone()); } } let count = u64::try_from(seen.len()) .map_err(|_| BaseRelayError::error("visible event count overflow"))?; - Ok(BaseRelayCountEventsReport::new(count, group_read_denied)) + Ok(BaseRelayCountEventsReport::new( + count, + group_read_denied, + query_metrics, + )) } fn query_filter_events_report_with_services( @@ -1099,7 +1205,10 @@ impl BaseRelay { let effective_filter = Self::filter_with_limits(limits, filter); let pocket_filter = tangle_filter_to_pocket(&effective_filter)?; let screen_error = RefCell::new(None); + let candidates_scanned = Cell::new(0_u64); + let redacted_events = Cell::new(0_u64); let screened = store.find_events_with_screen(&pocket_filter, query, |pocket_event| { + candidates_scanned.set(candidates_scanned.get().saturating_add(1)); if screen_error.borrow().is_some() { return PocketScreenResult::Mismatch; } @@ -1108,7 +1217,10 @@ impl BaseRelay { Ok(true) => { match Self::group_read_gate_visible_to_auth(groups, pocket_event, auth) { Ok(true) => PocketScreenResult::Match, - Ok(false) => PocketScreenResult::Redacted, + Ok(false) => { + redacted_events.set(redacted_events.get().saturating_add(1)); + PocketScreenResult::Redacted + } Err(error) => { *screen_error.borrow_mut() = Some(error); PocketScreenResult::Mismatch @@ -1130,7 +1242,11 @@ impl BaseRelay { .into_iter() .map(|pocket_event| pocket_event_to_tangle(&pocket_event)) .collect::<Result<Vec<_>, _>>()?; - Ok(BaseRelayEventQueryReport::new(events, group_read_denied)) + Ok(BaseRelayEventQueryReport::new( + events, + group_read_denied, + BaseRelayQueryMetrics::new(candidates_scanned.get(), 0, redacted_events.get()), + )) } fn filter_with_limits(limits: BaseRelayLimits, filter: &Filter) -> Filter { diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -16,7 +16,7 @@ use crate::{ auth::BaseAuthState, core::{ BaseRelay, BaseRelayCountReport, BaseRelayEventWrite, BaseRelayLimits, - BaseRelayQueryReport, BaseRelayShutdownReport, + BaseRelayQueryMetrics, BaseRelayQueryReport, BaseRelayShutdownReport, }, live::LiveSubscriptionSet, }, @@ -626,14 +626,19 @@ impl TangleRuntimeShared { subscription_id: &SubscriptionId, filters: &[Filter], ) -> Option<RelayMessage> { - TangleQueryClassifier::new(self.limits.base_relay_limits()) + if TangleQueryClassifier::new(self.limits.base_relay_limits()) .classify_count(filters) .is_broad() - .then(|| RelayMessage::Closed { + { + self.metrics.record_count_refusal(); + self.metrics.record_broad_query_rejection(); + return Some(RelayMessage::Closed { subscription_id: subscription_id.clone(), message: BaseRelayError::restricted("count filters are too broad or expensive") .prefixed_message(), - }) + }); + } + None } fn rate_limit_query(&self, request: TangleQueryRateLimitRequest<'_>) -> Option<RelayMessage> { @@ -697,9 +702,9 @@ impl TangleRuntimeShared { return Some(message); } } - if TangleQueryClassifier::new(self.limits.base_relay_limits()) - .classify(request.scope, request.filters) - .is_broad() + let query_classification = TangleQueryClassifier::new(self.limits.base_relay_limits()) + .classify(request.scope, request.filters); + if query_classification.is_broad() && let Some(message) = self.rate_limit_closed( request.subscription_id, TangleRateLimitKey::query_class(request.scope, TangleRateLimitQueryClass::Broad), @@ -709,6 +714,7 @@ impl TangleRuntimeShared { request.now, ) { + self.metrics.record_broad_query_rejection(); return Some(message); } None @@ -894,6 +900,9 @@ impl TangleRuntimeHandle { let report = self.inner .query_req_with_auth_report(subscription_id, filters, auth)?; + self.inner + .metrics + .record_query_metrics(report.query_metrics()); if report.group_read_denied() { self.inner.metrics.record_group_read_denial(); } @@ -944,6 +953,9 @@ impl TangleRuntimeHandle { let report = self.inner .handle_count_with_auth_report(subscription_id, filters, auth)?; + self.inner + .metrics + .record_query_metrics(report.query_metrics()); if report.group_read_denied() { self.inner.metrics.record_group_read_denial(); } @@ -1281,6 +1293,11 @@ struct TangleRuntimeMetricsInner { event_admission_latency_count: AtomicU64, query_latency_total_micros: AtomicU64, query_latency_count: AtomicU64, + query_candidates_scanned: AtomicU64, + query_returned_events: AtomicU64, + query_redacted_events: AtomicU64, + count_refusals: AtomicU64, + broad_query_rejections: AtomicU64, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -1326,6 +1343,11 @@ pub struct TangleRuntimeMetricsSnapshot { tangle_event_admission_latency_count: u64, tangle_query_latency_total_micros: u64, tangle_query_latency_count: u64, + tangle_query_candidates_scanned_total: u64, + tangle_query_returned_events_total: u64, + tangle_query_redacted_events_total: u64, + tangle_count_refusals_total: u64, + tangle_broad_query_rejections_total: u64, } impl TangleRuntimeMetricsSnapshot { @@ -1413,6 +1435,11 @@ impl TangleRuntimeMetrics { event_admission_latency_count: AtomicU64::new(0), query_latency_total_micros: AtomicU64::new(0), query_latency_count: AtomicU64::new(0), + query_candidates_scanned: AtomicU64::new(0), + query_returned_events: AtomicU64::new(0), + query_redacted_events: AtomicU64::new(0), + count_refusals: AtomicU64::new(0), + broad_query_rejections: AtomicU64::new(0), }), } } @@ -1456,6 +1483,11 @@ impl TangleRuntimeMetrics { tangle_event_admission_latency_count: self.event_admission_latency_count(), tangle_query_latency_total_micros: self.query_latency_total_micros(), tangle_query_latency_count: self.query_latency_count(), + tangle_query_candidates_scanned_total: self.query_candidates_scanned(), + tangle_query_returned_events_total: self.query_returned_events(), + tangle_query_redacted_events_total: self.query_redacted_events(), + tangle_count_refusals_total: self.count_refusals(), + tangle_broad_query_rejections_total: self.broad_query_rejections(), } } @@ -1597,6 +1629,26 @@ impl TangleRuntimeMetrics { self.inner.query_latency_count.load(Ordering::Relaxed) } + pub fn query_candidates_scanned(&self) -> u64 { + self.inner.query_candidates_scanned.load(Ordering::Relaxed) + } + + pub fn query_returned_events(&self) -> u64 { + self.inner.query_returned_events.load(Ordering::Relaxed) + } + + pub fn query_redacted_events(&self) -> u64 { + self.inner.query_redacted_events.load(Ordering::Relaxed) + } + + pub fn count_refusals(&self) -> u64 { + self.inner.count_refusals.load(Ordering::Relaxed) + } + + pub fn broad_query_rejections(&self) -> u64 { + self.inner.broad_query_rejections.load(Ordering::Relaxed) + } + pub fn record_session_opened(&self) -> usize { self.inner.total_sessions.fetch_add(1, Ordering::Relaxed); self.inner.active_sessions.fetch_add(1, Ordering::Relaxed) + 1 @@ -1764,6 +1816,29 @@ impl TangleRuntimeMetrics { .query_latency_count .fetch_add(1, Ordering::Relaxed); } + + pub(crate) fn record_query_metrics(&self, metrics: BaseRelayQueryMetrics) { + self.inner + .query_candidates_scanned + .fetch_add(metrics.candidates_scanned(), Ordering::Relaxed); + self.inner + .query_returned_events + .fetch_add(metrics.returned_events(), Ordering::Relaxed); + self.inner + .query_redacted_events + .fetch_add(metrics.redacted_events(), Ordering::Relaxed); + } + + pub fn record_count_refusal(&self) -> u64 { + self.inner.count_refusals.fetch_add(1, Ordering::Relaxed) + 1 + } + + pub fn record_broad_query_rejection(&self) -> u64 { + self.inner + .broad_query_rejections + .fetch_add(1, Ordering::Relaxed) + + 1 + } } impl Default for TangleRuntimeMetrics { @@ -1812,7 +1887,7 @@ mod tests { use crate::config::{BaseRelayRuntimeConfig, parse_base_relay_runtime_config_json}; use crate::event_bus::{TangleEventBus, TangleEventReceiveError}; use crate::rate_limits::{TangleRateLimitKey, TangleRateLimitQueryClass, TangleRateLimitScope}; - use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits}; + use crate::relay::core::{BaseRelayLimitSettings, BaseRelayLimits, BaseRelayQueryMetrics}; use crate::relay::live::LiveSubscriptionSet; use serde_json::json; use std::{ @@ -1926,6 +2001,11 @@ mod tests { runtime.metrics().record_disk_used_bytes(5); runtime.metrics().record_event_admission_latency(13); runtime.metrics().record_query_latency(17); + runtime + .metrics() + .record_query_metrics(BaseRelayQueryMetrics::new(5, 3, 2)); + assert_eq!(runtime.metrics().record_count_refusal(), 1); + assert_eq!(runtime.metrics().record_broad_query_rejection(), 1); let snapshot = runtime.metrics().snapshot_with_readiness(true); assert_eq!(snapshot.active_sessions(), 0); assert_eq!(snapshot.total_sessions(), 1); @@ -1961,6 +2041,11 @@ mod tests { assert_eq!(snapshot_value["tangle_event_admission_latency_count"], 1); assert_eq!(snapshot_value["tangle_query_latency_total_micros"], 17); assert_eq!(snapshot_value["tangle_query_latency_count"], 1); + assert_eq!(snapshot_value["tangle_query_candidates_scanned_total"], 5); + assert_eq!(snapshot_value["tangle_query_returned_events_total"], 3); + assert_eq!(snapshot_value["tangle_query_redacted_events_total"], 2); + assert_eq!(snapshot_value["tangle_count_refusals_total"], 1); + assert_eq!(snapshot_value["tangle_broad_query_rejections_total"], 1); let report = runtime.shutdown().expect("shutdown"); @@ -1989,6 +2074,11 @@ mod tests { assert_eq!(value["tangle_event_bus_published_offsets_total"], 1); assert_eq!(value["tangle_disk_used_bytes"], 42); assert_eq!(value["tangle_outbound_queue_full_closes_total"], 0); + assert_eq!(value["tangle_query_candidates_scanned_total"], 0); + assert_eq!(value["tangle_query_returned_events_total"], 0); + assert_eq!(value["tangle_query_redacted_events_total"], 0); + assert_eq!(value["tangle_count_refusals_total"], 0); + assert_eq!(value["tangle_broad_query_rejections_total"], 0); assert!(value.get("active_sessions").is_none()); assert!(value.get("stored_event_offsets").is_none()); } @@ -3122,6 +3212,8 @@ mod tests { message: "restricted: count filters are too broad or expensive".to_owned() }] ); + assert_eq!(handle.metrics().count_refusals(), 1); + assert_eq!(handle.metrics().broad_query_rejections(), 1); let _ = std::fs::remove_dir_all(root); } @@ -3173,6 +3265,8 @@ mod tests { }] ); } + assert_eq!(handle.metrics().count_refusals(), 3); + assert_eq!(handle.metrics().broad_query_rejections(), 3); let _ = std::fs::remove_dir_all(root); } @@ -3694,6 +3788,12 @@ mod tests { }) .await .expect("query concurrency timeout"); + assert!(handle.metrics().query_candidates_scanned() > 0); + assert!( + handle.metrics().query_returned_events() + >= u64::try_from(group_write_count * 3).expect("returned event count") + ); + assert!(handle.metrics().query_redacted_events() > 0); handle.shutdown().await.expect("shutdown"); let _ = std::fs::remove_dir_all(root); diff --git a/crates/tangle_runtime/tests/phase2_acceptance_targets.rs b/crates/tangle_runtime/tests/phase2_acceptance_targets.rs @@ -1014,6 +1014,24 @@ async fn websocket_private_and_hidden_groups_do_not_leak_through_query_count_or_ ] { assert!(!metrics.contains(private_value)); } + let metrics_body = metrics.split_once("\r\n\r\n").expect("metrics body").1; + let metrics_value: Value = serde_json::from_str(metrics_body).expect("metrics json"); + assert_eq!(metrics_value["tangle_count_refusals_total"], 2); + assert_eq!(metrics_value["tangle_broad_query_rejections_total"], 2); + assert!( + metrics_value["tangle_query_candidates_scanned_total"] + .as_u64() + .expect("candidates") + >= metrics_value["tangle_query_redacted_events_total"] + .as_u64() + .expect("redacted") + ); + assert!( + metrics_value["tangle_query_redacted_events_total"] + .as_u64() + .expect("redacted") + >= 2 + ); shutdown.request_shutdown(); read_websocket_close(&mut owner_writer).await;