tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 84e4f486bf2851ee25e9c0406454fc4d0462801d
parent 13c9b4ab6973f28e13fefcf4bd5e23f2a6fdf211
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 12:39:42 -0700

observability: add metrics endpoint

- expose Prometheus text metrics from the runtime router
- add SurrealDB-backed store snapshot counts
- cover metrics rendering with store and runtime tests
- assert live relay metrics and direct Surreal state in integration

Diffstat:
Mcrates/tangle/tests/run_integration.rs | 18++++++++++++++++++
Mcrates/tangle_runtime/src/lib.rs | 219++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/tangle_store_surreal/src/lib.rs | 191+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 420 insertions(+), 8 deletions(-)

diff --git a/crates/tangle/tests/run_integration.rs b/crates/tangle/tests/run_integration.rs @@ -410,6 +410,16 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { assert!(seller_detail.contains(profile.id().as_str())); assert!(seller_detail.contains("\"display_name\":\"Radroots Market\"")); assert!(seller_detail.contains("\"regions\":[\"cascadia\",\"pnw\"]")); + let metrics = http_get(port, "/metrics"); + assert!(metrics.contains("200 OK")); + assert!(metrics.contains("text/plain; version=0.0.4")); + assert!(metrics.contains("tangle_info{")); + assert!(metrics.contains("tangle_relay_ready 1")); + assert!(metrics.contains("tangle_store_events{state=\"stored\"} 8")); + assert!(metrics.contains("tangle_store_events{state=\"visible\"} 8")); + assert!(metrics.contains("tangle_store_listings{state=\"active\"} 1")); + assert!(metrics.contains("tangle_store_seller_profiles{state=\"visible\"} 1")); + assert!(metrics.contains("tangle_store_sellers{state=\"approved\"} 0")); let trace_output = stop_relay_with_stderr(relay); assert!(trace_output.contains("tracing initialized")); @@ -442,6 +452,14 @@ async fn tangle_run_serves_relay_clients_and_persists_surreal_state() { .expect("profile raw row") .is_some() ); + let metrics_snapshot = store.metrics_snapshot().await.expect("metrics snapshot"); + assert_eq!(metrics_snapshot.stored_events(), 8); + assert_eq!(metrics_snapshot.visible_events(), 8); + assert_eq!(metrics_snapshot.current_listings(), 1); + assert_eq!(metrics_snapshot.active_listings(), 1); + assert_eq!(metrics_snapshot.seller_profiles(), 1); + assert_eq!(metrics_snapshot.visible_seller_profiles(), 1); + assert_eq!(metrics_snapshot.approved_sellers(), 0); let profile_row = store .seller_profile_row(seller.as_str()) .await diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -39,8 +39,8 @@ use tangle_store_surreal::{ ForumThreadProjectionOutcome, ForumThreadProjectionQuery, LabelProjectionOutcome, LabelProjectionQuery, ListingProjectionQuery, LongFormProjectionOutcome, MigrationApplyOutcome, ReactionProjectionOutcome, ReportProjectionOutcome, ReportProjectionQuery, SearchDocumentQuery, - SellerProfileProjectionOutcome, SurrealConnectionConfig, SurrealConnectionMode, SurrealStore, - base_migration_plan, + SellerProfileProjectionOutcome, SurrealConnectionConfig, SurrealConnectionMode, + SurrealMetricsSnapshot, SurrealStore, base_migration_plan, }; use tokio::net::TcpListener; use tokio::sync::broadcast; @@ -48,6 +48,7 @@ use url::form_urlencoded; pub const TANGLE_SUPPORTED_NIPS: [u16; 8] = [1, 9, 11, 16, 33, 42, 50, 99]; pub const TANGLE_RELAY_SOFTWARE: &str = "https://github.com/radrootslabs/tangle"; +pub const TANGLE_RELAY_VERSION: &str = env!("CARGO_PKG_VERSION"); #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct RelayConnectionId(String); @@ -1128,6 +1129,7 @@ fn runtime_router( .route("/ws", get(runtime_websocket_upgrade)) .route("/healthz", get(runtime_healthz)) .route("/readyz", get(runtime_readyz)) + .route("/metrics", get(runtime_metrics)) .route("/api/listings", get(runtime_listings)) .route("/api/listings/{pubkey}/{d}", get(runtime_listing_detail)) .route( @@ -1203,6 +1205,10 @@ async fn runtime_readyz() -> (StatusCode, Json<ReadinessDocument>) { readyz(State(ReadinessState::ready())).await } +async fn runtime_metrics(State(state): State<RuntimeRelayState>) -> Result<Response, ApiError> { + metrics(State(MetricsHttpState::new(state.store))).await +} + async fn runtime_listings( State(state): State<RuntimeRelayState>, RawQuery(query): RawQuery, @@ -2785,7 +2791,7 @@ impl RelayInfoDocument { icon: None, supported_nips: TANGLE_SUPPORTED_NIPS.to_vec(), software: TANGLE_RELAY_SOFTWARE.to_owned(), - version: env!("CARGO_PKG_VERSION").to_owned(), + version: TANGLE_RELAY_VERSION.to_owned(), limitation: RelayInfoLimitationDocument { payment_required: false, restricted_writes: true, @@ -2849,6 +2855,17 @@ impl ListingsHttpState { } } +#[derive(Debug, Clone)] +pub struct MetricsHttpState { + store: SurrealStore, +} + +impl MetricsHttpState { + pub fn new(store: SurrealStore) -> Self { + Self { store } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ListingsDocument { pub items: Vec<ListingItemDocument>, @@ -3187,6 +3204,12 @@ pub fn health_router(readiness: ReadinessState) -> Router { .with_state(readiness) } +pub fn metrics_router(state: MetricsHttpState) -> Router { + Router::new() + .route("/metrics", get(metrics)) + .with_state(state) +} + pub fn relay_info_router(document: RelayInfoDocument) -> Router { Router::new() .route("/", get(relay_info)) @@ -3234,6 +3257,122 @@ async fn readyz(State(readiness): State<ReadinessState>) -> (StatusCode, Json<Re (status, Json(readiness.response())) } +async fn metrics(State(state): State<MetricsHttpState>) -> Result<Response, ApiError> { + let snapshot = state + .store + .metrics_snapshot() + .await + .map_err(|_| ApiError::internal())?; + Ok(( + StatusCode::OK, + [( + header::CONTENT_TYPE, + HeaderValue::from_static("text/plain; version=0.0.4"), + )], + metrics_text(snapshot), + ) + .into_response()) +} + +fn metrics_text(snapshot: SurrealMetricsSnapshot) -> String { + let mut output = String::new(); + output.push_str("# HELP tangle_info Tangle relay build information\n"); + output.push_str("# TYPE tangle_info gauge\n"); + output.push_str(&format!( + "tangle_info{{software=\"{}\",version=\"{}\"}} 1\n", + prometheus_label_value(TANGLE_RELAY_SOFTWARE), + prometheus_label_value(TANGLE_RELAY_VERSION) + )); + output.push_str("# HELP tangle_relay_ready Relay readiness gauge\n"); + output.push_str("# TYPE tangle_relay_ready gauge\n"); + output.push_str("tangle_relay_ready 1\n"); + output.push_str("# HELP tangle_store_events Stored Nostr event gauges\n"); + output.push_str("# TYPE tangle_store_events gauge\n"); + append_labeled_gauge( + &mut output, + "tangle_store_events", + "stored", + snapshot.stored_events(), + ); + append_labeled_gauge( + &mut output, + "tangle_store_events", + "visible", + snapshot.visible_events(), + ); + append_labeled_gauge( + &mut output, + "tangle_store_events", + "hidden", + snapshot.hidden_events(), + ); + append_labeled_gauge( + &mut output, + "tangle_store_events", + "deleted", + snapshot.deleted_events(), + ); + output.push_str("# HELP tangle_store_listings Current listing projection gauges\n"); + output.push_str("# TYPE tangle_store_listings gauge\n"); + append_labeled_gauge( + &mut output, + "tangle_store_listings", + "current", + snapshot.current_listings(), + ); + append_labeled_gauge( + &mut output, + "tangle_store_listings", + "active", + snapshot.active_listings(), + ); + output.push_str("# HELP tangle_store_seller_profiles Seller profile projection gauges\n"); + output.push_str("# TYPE tangle_store_seller_profiles gauge\n"); + append_labeled_gauge( + &mut output, + "tangle_store_seller_profiles", + "stored", + snapshot.seller_profiles(), + ); + append_labeled_gauge( + &mut output, + "tangle_store_seller_profiles", + "visible", + snapshot.visible_seller_profiles(), + ); + output.push_str("# HELP tangle_store_sellers Seller policy gauges\n"); + output.push_str("# TYPE tangle_store_sellers gauge\n"); + append_labeled_gauge( + &mut output, + "tangle_store_sellers", + "approved", + snapshot.approved_sellers(), + ); + output.push_str("# HELP tangle_store_pubkeys Relay pubkey policy gauges\n"); + output.push_str("# TYPE tangle_store_pubkeys gauge\n"); + append_labeled_gauge( + &mut output, + "tangle_store_pubkeys", + "blocked", + snapshot.blocked_pubkeys(), + ); + output +} + +fn append_labeled_gauge(output: &mut String, metric: &str, state: &str, value: u64) { + output.push_str(&format!( + "{metric}{{state=\"{}\"}} {value}\n", + prometheus_label_value(state) + )); +} + +fn prometheus_label_value(value: &str) -> String { + value + .replace('\\', r"\\") + .replace('"', r#"\""#) + .replace('\n', r"\n") +} + async fn relay_info(State(relay_info): State<RelayInfoDocument>, headers: HeaderMap) -> Response { if !accepts_nostr_json(headers.get(header::ACCEPT)) { return ApiError::not_found("relay information requires application/nostr+json") @@ -4369,11 +4508,12 @@ mod tests { ApiError, ApiErrorBody, ApiErrorCode, ApiErrorEnvelope, AuthMessageHandler, ClientFrame, ClientFrameOutcome, ClientMessageLoop, CloseMessageHandler, CloseMessageOutcome, EventMessageHandler, GracefulShutdownSignal, ListingsHttpState, LiveEventFanout, - ReadinessCheckStatus, ReadinessState, RelayConnection, RelayConnectionConfig, - RelayConnectionId, RelayInfoDocument, ReqMessageHandler, RuntimeCommandErrorKind, - RuntimeConfigErrorKind, RuntimeTracingFormat, TANGLE_RELAY_SOFTWARE, TANGLE_SUPPORTED_NIPS, - WebSocketHttpState, health_router, listing_item_document, listing_projection_query, - listings_router, load_runtime_config, migrate_runtime_database, parse_listing_query, + MetricsHttpState, ReadinessCheckStatus, ReadinessState, RelayConnection, + RelayConnectionConfig, RelayConnectionId, RelayInfoDocument, ReqMessageHandler, + RuntimeCommandErrorKind, RuntimeConfigErrorKind, RuntimeTracingFormat, + TANGLE_RELAY_SOFTWARE, TANGLE_RELAY_VERSION, TANGLE_SUPPORTED_NIPS, WebSocketHttpState, + health_router, listing_item_document, listing_projection_query, listings_router, + load_runtime_config, metrics_router, migrate_runtime_database, parse_listing_query, parse_marketplace_search_query, parse_runtime_config_json, relay_info_router, search_document_query, websocket_router, }; @@ -5707,6 +5847,69 @@ mod tests { ); } + #[tokio::test] + async fn metrics_endpoint_reports_store_snapshot() { + let store = runtime_memory_store().await; + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let profile = seller_profile(1_714_125_300, "radroots-market", Some("Radroots Market")); + let seller = listing.unsigned().pubkey().as_str().to_owned(); + store + .store_raw_event(&StoredEvent::new( + listing.clone(), + UnixTimestamp::new(1_714_125_301), + )) + .await + .expect("store listing"); + store + .store_raw_event(&StoredEvent::new( + profile.clone(), + UnixTimestamp::new(1_714_125_302), + )) + .await + .expect("store profile"); + store + .project_current_listing(&listing, UnixTimestamp::new(1_714_125_303)) + .await + .expect("project listing"); + store + .project_seller_profile(&profile, UnixTimestamp::new(1_714_125_304)) + .await + .expect("project profile"); + store + .set_seller_approved(seller.as_str(), true, UnixTimestamp::new(1_714_125_305)) + .await + .expect("approve seller"); + + let response = metrics_router(MetricsHttpState::new(store)) + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .expect("request"), + ) + .await + .expect("response"); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!( + response.headers().get(header::CONTENT_TYPE), + Some(&HeaderValue::from_static("text/plain; version=0.0.4")) + ); + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .expect("body"); + let body = String::from_utf8(body.to_vec()).expect("utf8"); + assert!(body.contains(&format!( + "tangle_info{{software=\"{}\",version=\"{}\"}} 1", + TANGLE_RELAY_SOFTWARE, TANGLE_RELAY_VERSION + ))); + assert!(body.contains("tangle_relay_ready 1")); + assert!(body.contains("tangle_store_events{state=\"stored\"} 2")); + assert!(body.contains("tangle_store_events{state=\"visible\"} 2")); + assert!(body.contains("tangle_store_listings{state=\"active\"} 1")); + assert!(body.contains("tangle_store_seller_profiles{state=\"visible\"} 1")); + assert!(body.contains("tangle_store_sellers{state=\"approved\"} 1")); + } + #[test] fn relay_info_default_matches_mvp_protocol_claims() { let relay_info = RelayInfoDocument::tangle_default(); diff --git a/crates/tangle_store_surreal/src/lib.rs b/crates/tangle_store_surreal/src/lib.rs @@ -121,6 +121,62 @@ impl fmt::Display for SurrealConfigError { impl std::error::Error for SurrealConfigError {} +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SurrealMetricsSnapshot { + stored_events: u64, + visible_events: u64, + hidden_events: u64, + deleted_events: u64, + current_listings: u64, + active_listings: u64, + seller_profiles: u64, + visible_seller_profiles: u64, + approved_sellers: u64, + blocked_pubkeys: u64, +} + +impl SurrealMetricsSnapshot { + pub fn stored_events(self) -> u64 { + self.stored_events + } + + pub fn visible_events(self) -> u64 { + self.visible_events + } + + pub fn hidden_events(self) -> u64 { + self.hidden_events + } + + pub fn deleted_events(self) -> u64 { + self.deleted_events + } + + pub fn current_listings(self) -> u64 { + self.current_listings + } + + pub fn active_listings(self) -> u64 { + self.active_listings + } + + pub fn seller_profiles(self) -> u64 { + self.seller_profiles + } + + pub fn visible_seller_profiles(self) -> u64 { + self.visible_seller_profiles + } + + pub fn approved_sellers(self) -> u64 { + self.approved_sellers + } + + pub fn blocked_pubkeys(self) -> u64 { + self.blocked_pubkeys + } +} + fn normalized_identifier(value: &str, field: &str) -> Result<String, SurrealConfigError> { let trimmed = value.trim(); if trimmed.is_empty() { @@ -1494,6 +1550,51 @@ impl SurrealStore { Ok(format!("{info:?}")) } + pub async fn metrics_snapshot(&self) -> Result<SurrealMetricsSnapshot, SurrealStoreError> { + Ok(SurrealMetricsSnapshot { + stored_events: self + .count_query("SELECT VALUE count() FROM nostr_event GROUP ALL;") + .await?, + visible_events: self + .count_query( + "SELECT VALUE count() FROM nostr_event WHERE deleted = false AND hidden = false GROUP ALL;", + ) + .await?, + hidden_events: self + .count_query("SELECT VALUE count() FROM nostr_event WHERE hidden = true GROUP ALL;") + .await?, + deleted_events: self + .count_query( + "SELECT VALUE count() FROM nostr_event WHERE deleted = true GROUP ALL;", + ) + .await?, + current_listings: self + .count_query("SELECT VALUE count() FROM listing_current GROUP ALL;") + .await?, + active_listings: self + .count_query( + "SELECT VALUE count() FROM listing_current WHERE effective_status = 'active' AND hidden = false AND deleted = false GROUP ALL;", + ) + .await?, + seller_profiles: self + .count_query("SELECT VALUE count() FROM seller_profile GROUP ALL;") + .await?, + visible_seller_profiles: self + .count_query( + "SELECT VALUE count() FROM seller_profile WHERE hidden = false AND deleted = false GROUP ALL;", + ) + .await?, + approved_sellers: self + .count_query( + "SELECT VALUE count() FROM relay_user WHERE seller_approved = true AND blocked = false GROUP ALL;", + ) + .await?, + blocked_pubkeys: self + .count_query("SELECT VALUE count() FROM relay_user WHERE blocked = true GROUP ALL;") + .await?, + }) + } + pub async fn store_raw_event( &self, stored: &StoredEvent, @@ -1560,6 +1661,18 @@ CREATE type::record('nostr_event', $event_id) CONTENT { response.take(0).map_err(SurrealStoreError::from) } + async fn count_query(&self, statement: &str) -> Result<u64, SurrealStoreError> { + let mut response = self + .db + .query(statement) + .await + .map_err(SurrealStoreError::from)? + .check() + .map_err(SurrealStoreError::from)?; + let rows: Vec<serde_json::Value> = response.take(0).map_err(SurrealStoreError::from)?; + rows.into_iter().next().map(count_value).unwrap_or(Ok(0)) + } + pub async fn query_raw_events( &self, filter: &Filter, @@ -4203,6 +4316,18 @@ fn event_tags_json(event: &Event) -> Vec<serde_json::Value> { .collect() } +fn count_value(value: serde_json::Value) -> Result<u64, SurrealStoreError> { + if let Some(count) = value.as_u64() { + return Ok(count); + } + if let Some(count) = value.get("count").and_then(serde_json::Value::as_u64) { + return Ok(count); + } + Err(SurrealStoreError::new( + "surreal count query returned a non-numeric count", + )) +} + fn required_policy_text(value: &str, field: &str) -> Result<String, SurrealStoreError> { let value = value.trim(); if value.is_empty() { @@ -8792,6 +8917,72 @@ mod tests { } #[tokio::test] + async fn metrics_snapshot_counts_projected_store_state() { + let store = memory_store().await; + store + .apply_plan(&base_migration_plan()) + .await + .expect("apply plan"); + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let profile = seller_profile( + 1_714_125_145, + "radroots-market", + Some("Radroots Market"), + &["PNW"], + &["Produce"], + &["CSA"], + ); + let blocked_pubkey = "b".repeat(PublicKeyHex::HEX_LENGTH); + + store + .store_raw_event(&StoredEvent::new( + listing.clone(), + UnixTimestamp::new(1_714_125_146), + )) + .await + .expect("store listing"); + store + .store_raw_event(&StoredEvent::new( + profile.clone(), + UnixTimestamp::new(1_714_125_147), + )) + .await + .expect("store profile"); + store + .project_current_listing(&listing, UnixTimestamp::new(1_714_125_148)) + .await + .expect("project listing"); + store + .project_seller_profile(&profile, UnixTimestamp::new(1_714_125_149)) + .await + .expect("project profile"); + store + .set_seller_approved( + FixtureKey::Seller.public_key().as_str(), + true, + UnixTimestamp::new(1_714_125_150), + ) + .await + .expect("approve seller"); + store + .set_pubkey_blocked(&blocked_pubkey, true, UnixTimestamp::new(1_714_125_151)) + .await + .expect("block pubkey"); + + let snapshot = store.metrics_snapshot().await.expect("snapshot"); + assert_eq!(snapshot.stored_events(), 2); + assert_eq!(snapshot.visible_events(), 2); + assert_eq!(snapshot.hidden_events(), 0); + assert_eq!(snapshot.deleted_events(), 0); + assert_eq!(snapshot.current_listings(), 1); + assert_eq!(snapshot.active_listings(), 1); + assert_eq!(snapshot.seller_profiles(), 1); + assert_eq!(snapshot.visible_seller_profiles(), 1); + assert_eq!(snapshot.approved_sellers(), 1); + assert_eq!(snapshot.blocked_pubkeys(), 1); + } + + #[tokio::test] async fn seller_profile_projection_tracks_replacement_moderation_and_deletion() { let store = memory_store().await; store