tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit edf8c090a3694e7a93cf26ec9654d3e0aac87e21
parent 043977ca477ce6ac598f35885ad9b9a2588fab62
Author: triesap <tyson@radroots.org>
Date:   Sun, 14 Jun 2026 08:05:28 -0700

logging: add redacted relay events

- Add tracing initialization from the existing runtime tracing config and emit structured runtime, server, session, subscription, rate-limit, and event-storage records.

- Redact configured relay secrets through log summaries and sanitizer helpers, and route configured tracing output to stderr for the CLI run path.

- Cover redaction with focused logging tests and update the CLI run test to assert stderr logs never expose relay secret bytes.

- Validated with cargo fmt --all -- --check, cargo check --workspace --all-targets, cargo test --workspace, and cargo clippy --workspace --all-targets -- -D warnings.

Diffstat:
MCargo.lock | 118+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle/src/lib.rs | 7++++++-
Mcrates/tangle/tests/version.rs | 5++++-
Mcrates/tangle_runtime/Cargo.toml | 1+
Mcrates/tangle_runtime/src/lib.rs | 1+
Acrates/tangle_runtime/src/logging.rs | 354+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/runtime.rs | 46+++++++++++++++++++++++++++++++---------------
Mcrates/tangle_runtime/src/server.rs | 4++++
Mcrates/tangle_runtime/src/session.rs | 16+++++++++++++++-
9 files changed, 534 insertions(+), 18 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -3,6 +3,15 @@ version = 4 [[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -680,6 +689,12 @@ dependencies = [ ] [[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] name = "libc" version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -709,6 +724,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953f07c43838f8e6f9758cab68bf5bed85465e7587ebe0b823f1bcd81978ad3a" [[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + +[[package]] name = "matchit" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -757,6 +781,15 @@ dependencies = [ ] [[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys", +] + +[[package]] name = "once_cell" version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -947,6 +980,23 @@ dependencies = [ ] [[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" + +[[package]] name = "rfc6979" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1100,6 +1150,15 @@ dependencies = [ ] [[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + +[[package]] name = "shlex" version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1273,6 +1332,7 @@ dependencies = [ "tokio-tungstenite", "tower", "tracing", + "tracing-subscriber", ] [[package]] @@ -1315,6 +1375,15 @@ dependencies = [ ] [[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + +[[package]] name = "tinystr" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1420,6 +1489,49 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", ] [[package]] @@ -1469,6 +1581,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" [[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + +[[package]] name = "version_check" version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -148,8 +148,13 @@ pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, Tangle pub async fn run_with_config( config_path: &str, ) -> Result<tangle_runtime::server::TangleServeReport, String> { - let runtime = tangle_runtime::open_tangle_runtime_from_config_path(config_path) + let config = tangle_runtime::load_base_relay_runtime_config(config_path) .map_err(|error| error.to_string())?; + tangle_runtime::logging::init_tangle_tracing(config.tracing()) + .map_err(|error| error.to_string())?; + tangle_runtime::logging::log_runtime_config_loaded(&config); + let runtime = + tangle_runtime::runtime::TangleRuntime::open(config).map_err(|error| error.to_string())?; tangle_runtime::server::serve_until_shutdown(runtime) .await .map_err(|error| error.to_string()) diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -203,7 +203,10 @@ fn tangle_run_starts_server_and_stays_alive_until_shutdown() { let output = child.stop().expect("stop child"); assert!(output.stdout.is_empty()); - assert!(output.stderr.is_empty()); + let stderr = String::from_utf8(output.stderr).expect("stderr utf8"); + assert!(stderr.contains(r#""event":"runtime_config_loaded""#)); + assert!(stderr.contains(r#""relay_secret":"<redacted>""#)); + assert!(!stderr.contains(TANGLE_V2_RELAY_SECRET_HEX)); std::fs::remove_dir_all(&root).expect("remove runtime root"); } diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -19,6 +19,7 @@ tangle_protocol = { path = "../tangle_protocol" } tangle_store_pocket = { path = "../tangle_store_pocket" } tokio = { version = "1", features = ["net", "sync"] } tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } [dev-dependencies] futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -5,6 +5,7 @@ pub mod config; pub mod errors; pub mod event_bus; pub mod groups; +pub mod logging; pub mod nip11; pub mod ops; pub(crate) mod pocket_conversion; diff --git a/crates/tangle_runtime/src/logging.rs b/crates/tangle_runtime/src/logging.rs @@ -0,0 +1,354 @@ +#![forbid(unsafe_code)] + +use crate::{ + config::{BaseRelayRuntimeConfig, BaseRelayTracingConfig, BaseRelayTracingFormat}, + errors::BaseRelayError, +}; +use std::{fmt, net::IpAddr, net::SocketAddr}; +use tangle_protocol::{EventId, SubscriptionId, UnixTimestamp}; +use tracing_subscriber::EnvFilter; + +pub const TANGLE_LOG_REDACTED: &str = "<redacted>"; +pub const TANGLE_LOG_SECRET_ABSENT: &str = "absent"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TangleTracingInit { + Disabled, + Installed, + AlreadyInstalled, +} + +#[derive(Clone, PartialEq, Eq)] +pub struct TangleLogRedactor { + secrets: Vec<String>, +} + +impl TangleLogRedactor { + pub fn new<I, S>(secrets: I) -> Self + where + I: IntoIterator<Item = S>, + S: Into<String>, + { + let mut secrets = secrets + .into_iter() + .map(Into::into) + .filter(|secret| !secret.is_empty()) + .collect::<Vec<_>>(); + secrets.sort(); + secrets.dedup(); + Self { secrets } + } + + pub fn from_runtime_config(config: &BaseRelayRuntimeConfig) -> Self { + Self::new( + config + .groups() + .relay_secret() + .map(|secret| secret.expose_for_signing().to_owned()), + ) + } + + pub fn redact(&self, value: impl AsRef<str>) -> String { + let mut redacted = value.as_ref().to_owned(); + for secret in &self.secrets { + redacted = redacted.replace(secret, TANGLE_LOG_REDACTED); + } + redacted + } + + pub fn contains_secret(&self, value: impl AsRef<str>) -> bool { + let value = value.as_ref(); + self.secrets.iter().any(|secret| value.contains(secret)) + } + + pub fn secret_count(&self) -> usize { + self.secrets.len() + } +} + +impl fmt::Debug for TangleLogRedactor { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter + .debug_struct("TangleLogRedactor") + .field("secret_count", &self.secret_count()) + .finish() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TangleRuntimeLogSummary { + listen_addr: SocketAddr, + relay_url: String, + groups_enabled: bool, + relay_secret: &'static str, +} + +impl TangleRuntimeLogSummary { + pub fn from_config(config: &BaseRelayRuntimeConfig) -> Self { + Self { + listen_addr: config.listen_addr(), + relay_url: config.relay_url().to_owned(), + groups_enabled: config.groups().enabled(), + relay_secret: relay_secret_log_value(config), + } + } + + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn relay_url(&self) -> &str { + &self.relay_url + } + + pub fn groups_enabled(&self) -> bool { + self.groups_enabled + } + + pub fn relay_secret(&self) -> &'static str { + self.relay_secret + } +} + +pub fn init_tangle_tracing( + config: &BaseRelayTracingConfig, +) -> Result<TangleTracingInit, BaseRelayError> { + if !config.enabled() { + return Ok(TangleTracingInit::Disabled); + } + let filter = EnvFilter::try_new(config.filter()).map_err(|error| { + BaseRelayError::invalid(format!("observability.tracing.filter is invalid: {error}")) + })?; + let result = match config.format() { + BaseRelayTracingFormat::Compact => tracing_subscriber::fmt() + .with_env_filter(filter) + .with_writer(std::io::stderr) + .compact() + .try_init(), + BaseRelayTracingFormat::Json => tracing_subscriber::fmt() + .with_env_filter(filter) + .with_writer(std::io::stderr) + .json() + .try_init(), + }; + match result { + Ok(()) => Ok(TangleTracingInit::Installed), + Err(_) => Ok(TangleTracingInit::AlreadyInstalled), + } +} + +pub fn log_runtime_config_loaded(config: &BaseRelayRuntimeConfig) { + let summary = TangleRuntimeLogSummary::from_config(config); + tracing::info!( + event = "runtime_config_loaded", + listen_addr = %summary.listen_addr(), + relay_url = summary.relay_url(), + groups_enabled = summary.groups_enabled(), + relay_secret = summary.relay_secret(), + "tangle runtime config loaded" + ); +} + +pub fn log_runtime_opened(config: &BaseRelayRuntimeConfig) { + let summary = TangleRuntimeLogSummary::from_config(config); + tracing::info!( + event = "runtime_opened", + listen_addr = %summary.listen_addr(), + relay_url = summary.relay_url(), + groups_enabled = summary.groups_enabled(), + relay_secret = summary.relay_secret(), + "tangle runtime opened" + ); +} + +pub fn log_server_listening(listen_addr: SocketAddr, relay_url: &str) { + tracing::info!( + event = "server_listening", + listen_addr = %listen_addr, + relay_url, + "tangle server listening" + ); +} + +pub fn log_server_shutdown(listen_addr: SocketAddr, closed_subscriptions: usize) { + tracing::info!( + event = "server_shutdown", + listen_addr = %listen_addr, + closed_subscriptions, + "tangle server shut down" + ); +} + +pub fn log_websocket_session_opened(connection_id: u64, peer_ip: Option<IpAddr>) { + tracing::info!( + event = "websocket_session_opened", + connection_id, + peer_ip = optional_ip(peer_ip), + "tangle websocket session opened" + ); +} + +pub fn log_websocket_session_closed( + connection_id: u64, + peer_ip: Option<IpAddr>, + closed_subscriptions: usize, +) { + tracing::info!( + event = "websocket_session_closed", + connection_id, + peer_ip = optional_ip(peer_ip), + closed_subscriptions, + "tangle websocket session closed" + ); +} + +pub fn log_subscription_opened(connection_id: u64, subscription_id: &SubscriptionId) { + tracing::info!( + event = "subscription_opened", + connection_id, + subscription_id = subscription_id.as_str(), + "tangle subscription opened" + ); +} + +pub fn log_rate_limit_rejected( + scope: &'static str, + dimension: &'static str, + reset_at: UnixTimestamp, +) { + tracing::warn!( + event = "rate_limit_rejected", + scope, + dimension, + reset_at = reset_at.as_u64(), + "tangle rate limit rejected client message" + ); +} + +pub fn log_event_stored(event_id: &EventId, stored_offsets: usize, total_stored_offsets: u64) { + tracing::info!( + event = "event_stored", + event_id = event_id.as_str(), + stored_offsets, + total_stored_offsets, + "tangle event stored" + ); +} + +pub fn sanitize_error_message(config: &BaseRelayRuntimeConfig, message: impl AsRef<str>) -> String { + TangleLogRedactor::from_runtime_config(config).redact(message) +} + +fn relay_secret_log_value(config: &BaseRelayRuntimeConfig) -> &'static str { + if config.groups().relay_secret().is_some() { + TANGLE_LOG_REDACTED + } else { + TANGLE_LOG_SECRET_ABSENT + } +} + +fn optional_ip(peer_ip: Option<IpAddr>) -> String { + peer_ip + .map(|address| address.to_string()) + .unwrap_or_else(|| "unknown".to_owned()) +} + +#[cfg(test)] +mod tests { + use super::{ + TANGLE_LOG_REDACTED, TangleLogRedactor, TangleRuntimeLogSummary, log_runtime_config_loaded, + sanitize_error_message, + }; + use crate::config::parse_base_relay_runtime_config_json; + use std::{ + io, + sync::{Arc, Mutex}, + }; + + #[test] + fn log_redactor_removes_configured_relay_secret() { + let secret = "7".repeat(64); + let redactor = TangleLogRedactor::new([secret.clone()]); + + assert_eq!( + redactor.redact(format!("relay secret {secret} loaded")), + "relay secret <redacted> loaded" + ); + assert!(redactor.contains_secret(format!("raw={secret}"))); + assert!(!format!("{redactor:?}").contains(&secret)); + } + + #[test] + fn runtime_log_summary_never_contains_relay_secret() { + let raw = include_str!("../../../ops/production/tangle-v2.example.json"); + let config = parse_base_relay_runtime_config_json(raw).expect("config"); + let secret = "7".repeat(64); + let summary = TangleRuntimeLogSummary::from_config(&config); + + assert_eq!(summary.relay_secret(), TANGLE_LOG_REDACTED); + assert!(!format!("{summary:?}").contains(&secret)); + assert_eq!( + sanitize_error_message(&config, format!("failed with relay secret {secret}")), + "failed with relay secret <redacted>" + ); + } + + #[test] + fn structured_runtime_config_log_redacts_relay_secret() { + let raw = include_str!("../../../ops/production/tangle-v2.example.json"); + let config = parse_base_relay_runtime_config_json(raw).expect("config"); + let secret = "7".repeat(64); + let writer = CapturedWriter::default(); + let subscriber = tracing_subscriber::fmt() + .json() + .with_writer(writer.clone()) + .with_max_level(tracing::Level::INFO) + .finish(); + + tracing::subscriber::with_default(subscriber, || { + log_runtime_config_loaded(&config); + }); + + let output = writer.output(); + assert!(output.contains(r#""event":"runtime_config_loaded""#)); + assert!(output.contains(r#""relay_secret":"<redacted>""#)); + assert!(!output.contains(&secret)); + } + + #[derive(Clone, Default)] + struct CapturedWriter { + inner: Arc<Mutex<Vec<u8>>>, + } + + impl CapturedWriter { + fn output(&self) -> String { + let bytes = self.inner.lock().expect("writer").clone(); + String::from_utf8(bytes).expect("utf8") + } + } + + impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for CapturedWriter { + type Writer = CapturedWriterGuard; + + fn make_writer(&'a self) -> Self::Writer { + CapturedWriterGuard { + inner: Arc::clone(&self.inner), + } + } + } + + struct CapturedWriterGuard { + inner: Arc<Mutex<Vec<u8>>>, + } + + impl io::Write for CapturedWriterGuard { + fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { + self.inner.lock().expect("writer").extend_from_slice(buffer); + Ok(buffer.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + } +} diff --git a/crates/tangle_runtime/src/runtime.rs b/crates/tangle_runtime/src/runtime.rs @@ -4,6 +4,7 @@ use crate::{ config::BaseRelayRuntimeConfig, errors::BaseRelayError, event_bus::{TangleEventBus, TangleEventReceiver}, + logging, ops::BaseRelayReadinessState, rate_limits::{ TangleQueryRateLimitConfig, TangleRateLimitDecision, TangleRateLimitKey, @@ -76,6 +77,7 @@ impl TangleRuntime { let relay = config.open_relay()?; let readiness = relay.readiness_state(); let rate_limiter = TangleRateLimiter::new(); + logging::log_runtime_opened(&config); Ok(Self { config, relay, @@ -357,13 +359,16 @@ impl TangleRuntime { ) -> Option<RelayMessage> { match self.rate_limiter.record(key, rule, now) { TangleRateLimitDecision::Allowed { .. } => None, - TangleRateLimitDecision::Rejected { reset_at } => Some(RelayMessage::Closed { - subscription_id: subscription_id.clone(), - message: BaseRelayError::rate_limited(format!( - "{label} {dimension} rate limit exceeded until {reset_at}" - )) - .prefixed_message(), - }), + TangleRateLimitDecision::Rejected { reset_at } => { + logging::log_rate_limit_rejected(label, dimension, reset_at); + Some(RelayMessage::Closed { + subscription_id: subscription_id.clone(), + message: BaseRelayError::rate_limited(format!( + "{label} {dimension} rate limit exceeded until {reset_at}" + )) + .prefixed_message(), + }) + } } } @@ -377,14 +382,17 @@ impl TangleRuntime { ) -> Option<RelayMessage> { match self.rate_limiter.record(key, rule, now) { TangleRateLimitDecision::Allowed { .. } => None, - TangleRateLimitDecision::Rejected { reset_at } => Some(RelayMessage::Ok { - event_id: event.id().clone(), - accepted: false, - message: BaseRelayError::rate_limited(format!( - "{label} rate limit exceeded until {reset_at}" - )) - .prefixed_message(), - }), + TangleRateLimitDecision::Rejected { reset_at } => { + logging::log_rate_limit_rejected(label, "event", reset_at); + Some(RelayMessage::Ok { + event_id: event.id().clone(), + accepted: false, + message: BaseRelayError::rate_limited(format!( + "{label} rate limit exceeded until {reset_at}" + )) + .prefixed_message(), + }) + } } } } @@ -430,6 +438,7 @@ impl TangleRuntimeHandle { let mut runtime = self.inner.lock().await; match message { ClientMessage::Event(event) => { + let event_id = event.id().clone(); if let Some(message) = runtime.rate_limit_event(&event, now) { return Ok(vec![message]); } @@ -443,6 +452,13 @@ impl TangleRuntimeHandle { runtime.metrics().record_stored_event_offset(); runtime.event_bus().publish(*offset); } + if !result.stored_offsets().is_empty() { + logging::log_event_stored( + &event_id, + result.stored_offsets().len(), + runtime.metrics().stored_event_offsets(), + ); + } Ok(vec![result.into_message()]) } ClientMessage::Req { diff --git a/crates/tangle_runtime/src/server.rs b/crates/tangle_runtime/src/server.rs @@ -2,6 +2,7 @@ use crate::{ errors::BaseRelayError, + logging, nip11::{BaseRelayInfoConfig, BaseRelayInfoDocument, base_relay_info_response}, ops::{BaseRelayReadinessState, base_relay_ops_router}, runtime::{TangleRuntime, TangleRuntimeHandle, TangleRuntimeLimits, TangleShutdownSignal}, @@ -59,6 +60,7 @@ pub async fn serve_listener_until_shutdown( let listen_addr = listener .local_addr() .map_err(|error| BaseRelayError::error(error.to_string()))?; + let relay_url = runtime.config().relay_url().to_owned(); let info = BaseRelayInfoConfig::new("tangle", runtime.config().groups().clone())?.build_document()?; let readiness = runtime.readiness_state().clone(); @@ -73,6 +75,7 @@ pub async fn serve_listener_until_shutdown( runtime.clone(), ); let mut shutdown = shutdown_signal.subscribe(); + logging::log_server_listening(listen_addr, &relay_url); axum::serve( listener, router.into_make_service_with_connect_info::<SocketAddr>(), @@ -90,6 +93,7 @@ pub async fn serve_listener_until_shutdown( .await .map_err(|error| BaseRelayError::error(error.to_string()))?; let shutdown = runtime.shutdown().await?; + logging::log_server_shutdown(listen_addr, shutdown.closed_subscriptions()); Ok(TangleServeReport::new( listen_addr, shutdown.closed_subscriptions(), diff --git a/crates/tangle_runtime/src/session.rs b/crates/tangle_runtime/src/session.rs @@ -3,6 +3,7 @@ use crate::{ errors::BaseRelayError, event_bus::{TangleEventReceiveError, TangleEventReceiver}, + logging, relay::{ auth::{BaseAuthState, generate_auth_challenge}, live::LiveSubscriptionSet, @@ -99,7 +100,14 @@ impl TangleWebSocketSession { } pub async fn run(mut self, mut socket: WebSocket) { + logging::log_websocket_session_opened(self.connection_id, self.peer_ip); if !self.issue_auth_challenge() { + let closed_subscriptions = self.subscriptions.close_all(); + logging::log_websocket_session_closed( + self.connection_id, + self.peer_ip, + closed_subscriptions, + ); return; } loop { @@ -144,7 +152,12 @@ impl TangleWebSocketSession { } } } - self.subscriptions.close_all(); + let closed_subscriptions = self.subscriptions.close_all(); + logging::log_websocket_session_closed( + self.connection_id, + self.peer_ip, + closed_subscriptions, + ); } async fn handle_event_receive_result( @@ -299,6 +312,7 @@ impl TangleWebSocketSession { filters.clone(), GroupAuthContext::new(self.auth.authenticated_pubkeys().iter().cloned()), )?; + logging::log_subscription_opened(self.connection_id, &subscription_id); match self .runtime .query_req_with_auth(subscription_id.clone(), filters, &self.auth)