logging.rs (22611B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 config::{BaseRelayRuntimeConfig, BaseRelayTracingConfig, BaseRelayTracingFormat}, 5 errors::BaseRelayError, 6 }; 7 use std::{fmt, net::IpAddr, net::SocketAddr}; 8 use tangle_groups::{ 9 GroupEventClass, GroupEventView, KIND_GROUP_ADMINS, KIND_GROUP_CREATE_GROUP, 10 KIND_GROUP_DELETE_EVENT, KIND_GROUP_DELETE_GROUP, KIND_GROUP_EDIT_METADATA, 11 KIND_GROUP_JOIN_REQUEST, KIND_GROUP_LEAVE_REQUEST, KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, 12 KIND_GROUP_PUT_USER, KIND_GROUP_REMOVE_USER, 13 }; 14 use tangle_protocol::{EventId, SubscriptionId, UnixTimestamp}; 15 use tracing_subscriber::EnvFilter; 16 17 pub const TANGLE_LOG_REDACTED: &str = "<redacted>"; 18 pub const TANGLE_LOG_SECRET_ABSENT: &str = "absent"; 19 20 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 21 pub enum TangleTracingInit { 22 Disabled, 23 Installed, 24 AlreadyInstalled, 25 } 26 27 #[derive(Clone, PartialEq, Eq)] 28 pub struct TangleLogRedactor { 29 secrets: Vec<String>, 30 } 31 32 impl TangleLogRedactor { 33 pub fn new<I, S>(secrets: I) -> Self 34 where 35 I: IntoIterator<Item = S>, 36 S: Into<String>, 37 { 38 let mut secrets = secrets 39 .into_iter() 40 .map(Into::into) 41 .filter(|secret| !secret.is_empty()) 42 .collect::<Vec<_>>(); 43 secrets.sort(); 44 secrets.dedup(); 45 Self { secrets } 46 } 47 48 pub fn from_runtime_config(config: &BaseRelayRuntimeConfig) -> Self { 49 Self::new( 50 config 51 .groups() 52 .relay_secret() 53 .map(|secret| secret.expose_for_signing().to_owned()), 54 ) 55 } 56 57 pub fn redact(&self, value: impl AsRef<str>) -> String { 58 let mut redacted = value.as_ref().to_owned(); 59 for secret in &self.secrets { 60 redacted = redacted.replace(secret, TANGLE_LOG_REDACTED); 61 } 62 redacted 63 } 64 65 pub fn contains_secret(&self, value: impl AsRef<str>) -> bool { 66 let value = value.as_ref(); 67 self.secrets.iter().any(|secret| value.contains(secret)) 68 } 69 70 pub fn secret_count(&self) -> usize { 71 self.secrets.len() 72 } 73 } 74 75 impl fmt::Debug for TangleLogRedactor { 76 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { 77 formatter 78 .debug_struct("TangleLogRedactor") 79 .field("secret_count", &self.secret_count()) 80 .finish() 81 } 82 } 83 84 #[derive(Debug, Clone, PartialEq, Eq)] 85 pub struct TangleRuntimeLogSummary { 86 listen_addr: SocketAddr, 87 relay_url: String, 88 groups_enabled: bool, 89 relay_secret: &'static str, 90 } 91 92 impl TangleRuntimeLogSummary { 93 pub fn from_config(config: &BaseRelayRuntimeConfig) -> Self { 94 Self { 95 listen_addr: config.listen_addr(), 96 relay_url: config.relay_url().to_owned(), 97 groups_enabled: config.groups().enabled(), 98 relay_secret: relay_secret_log_value(config), 99 } 100 } 101 102 pub fn listen_addr(&self) -> SocketAddr { 103 self.listen_addr 104 } 105 106 pub fn relay_url(&self) -> &str { 107 &self.relay_url 108 } 109 110 pub fn groups_enabled(&self) -> bool { 111 self.groups_enabled 112 } 113 114 pub fn relay_secret(&self) -> &'static str { 115 self.relay_secret 116 } 117 } 118 119 pub fn init_tangle_tracing( 120 config: &BaseRelayTracingConfig, 121 ) -> Result<TangleTracingInit, BaseRelayError> { 122 if !config.enabled() { 123 return Ok(TangleTracingInit::Disabled); 124 } 125 let filter = EnvFilter::try_new(config.filter()).map_err(|error| { 126 BaseRelayError::invalid(format!("observability.tracing.filter is invalid: {error}")) 127 })?; 128 let result = match config.format() { 129 BaseRelayTracingFormat::Compact => tracing_subscriber::fmt() 130 .with_env_filter(filter) 131 .with_writer(std::io::stderr) 132 .compact() 133 .try_init(), 134 BaseRelayTracingFormat::Json => tracing_subscriber::fmt() 135 .with_env_filter(filter) 136 .with_writer(std::io::stderr) 137 .json() 138 .try_init(), 139 }; 140 match result { 141 Ok(()) => Ok(TangleTracingInit::Installed), 142 Err(_) => Ok(TangleTracingInit::AlreadyInstalled), 143 } 144 } 145 146 pub fn log_runtime_config_loaded(config: &BaseRelayRuntimeConfig) { 147 let summary = TangleRuntimeLogSummary::from_config(config); 148 tracing::info!( 149 event = "runtime_config_loaded", 150 listen_addr = %summary.listen_addr(), 151 relay_url = summary.relay_url(), 152 groups_enabled = summary.groups_enabled(), 153 relay_secret = summary.relay_secret(), 154 "tangle runtime config loaded" 155 ); 156 } 157 158 pub fn log_runtime_opened(config: &BaseRelayRuntimeConfig) { 159 let summary = TangleRuntimeLogSummary::from_config(config); 160 tracing::info!( 161 event = "runtime_opened", 162 listen_addr = %summary.listen_addr(), 163 relay_url = summary.relay_url(), 164 groups_enabled = summary.groups_enabled(), 165 relay_secret = summary.relay_secret(), 166 "tangle runtime opened" 167 ); 168 } 169 170 pub fn log_server_listening(listen_addr: SocketAddr, relay_url: &str) { 171 tracing::info!( 172 event = "server_listening", 173 listen_addr = %listen_addr, 174 relay_url, 175 "tangle server listening" 176 ); 177 } 178 179 pub fn log_server_shutdown(listen_addr: SocketAddr, closed_subscriptions: usize) { 180 tracing::info!( 181 event = "server_shutdown", 182 listen_addr = %listen_addr, 183 closed_subscriptions, 184 "tangle server shut down" 185 ); 186 } 187 188 pub fn log_websocket_session_opened(connection_id: u64, peer_ip: Option<IpAddr>) { 189 tracing::info!( 190 event = "websocket_session_opened", 191 connection_id, 192 peer_ip = optional_ip(peer_ip), 193 "tangle websocket session opened" 194 ); 195 } 196 197 pub fn log_websocket_session_closed( 198 connection_id: u64, 199 peer_ip: Option<IpAddr>, 200 closed_subscriptions: usize, 201 ) { 202 tracing::info!( 203 event = "websocket_session_closed", 204 connection_id, 205 peer_ip = optional_ip(peer_ip), 206 closed_subscriptions, 207 "tangle websocket session closed" 208 ); 209 } 210 211 pub fn log_subscription_opened(connection_id: u64, subscription_id: &SubscriptionId) { 212 tracing::info!( 213 event = "subscription_opened", 214 connection_id, 215 subscription_id = subscription_id.as_str(), 216 "tangle subscription opened" 217 ); 218 } 219 220 pub fn log_rate_limit_rejected( 221 scope: &'static str, 222 dimension: &'static str, 223 reset_at: UnixTimestamp, 224 ) { 225 tracing::warn!( 226 event = "rate_limit_rejected", 227 scope, 228 dimension, 229 reset_at = reset_at.as_u64(), 230 "tangle rate limit rejected client message" 231 ); 232 } 233 234 pub fn log_event_stored(event_id: &EventId, stored_offsets: usize, total_stored_offsets: u64) { 235 tracing::info!( 236 event = "event_stored", 237 event_id = event_id.as_str(), 238 stored_offsets, 239 total_stored_offsets, 240 "tangle event stored" 241 ); 242 } 243 244 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 245 pub(crate) enum TangleModerationAuditResult { 246 Accepted, 247 Rejected, 248 } 249 250 impl TangleModerationAuditResult { 251 fn as_str(self) -> &'static str { 252 match self { 253 Self::Accepted => "accepted", 254 Self::Rejected => "rejected", 255 } 256 } 257 } 258 259 #[derive(Debug, Clone, PartialEq, Eq)] 260 pub(crate) struct TangleModerationAuditEntry { 261 action_family: &'static str, 262 result: &'static str, 263 event_id: String, 264 actor_pubkey: String, 265 event_kind: u32, 266 target_count: usize, 267 generated_state_rejection: bool, 268 } 269 270 impl TangleModerationAuditEntry { 271 pub(crate) fn new( 272 event: &(impl GroupEventView + ?Sized), 273 class: &GroupEventClass, 274 result: TangleModerationAuditResult, 275 ) -> Option<Self> { 276 let action_family = moderation_audit_action_family(event, class)?; 277 let event_id = event.id().ok()?; 278 let actor_pubkey = event.pubkey().ok()?; 279 let generated_state_rejection = matches!( 280 (class, result), 281 ( 282 GroupEventClass::RelayGeneratedSnapshot { .. }, 283 TangleModerationAuditResult::Rejected 284 ) 285 ); 286 Some(Self { 287 action_family, 288 result: result.as_str(), 289 event_id: event_id.as_str().to_owned(), 290 actor_pubkey: actor_pubkey.as_str().to_owned(), 291 event_kind: event.kind_u32(), 292 target_count: moderation_target_count(event, action_family), 293 generated_state_rejection, 294 }) 295 } 296 } 297 298 pub(crate) fn log_group_moderation_audit( 299 event: &(impl GroupEventView + ?Sized), 300 class: &GroupEventClass, 301 result: TangleModerationAuditResult, 302 ) { 303 let Some(entry) = TangleModerationAuditEntry::new(event, class, result) else { 304 return; 305 }; 306 tracing::info!( 307 event = "group_moderation_audit", 308 action_family = entry.action_family, 309 result = entry.result, 310 event_id = entry.event_id, 311 actor_pubkey = entry.actor_pubkey, 312 event_kind = entry.event_kind, 313 target_count = entry.target_count, 314 group_id = TANGLE_LOG_REDACTED, 315 group_id_redacted = true, 316 generated_state_rejection = entry.generated_state_rejection, 317 "tangle group moderation audit" 318 ); 319 } 320 321 pub fn sanitize_error_message(config: &BaseRelayRuntimeConfig, message: impl AsRef<str>) -> String { 322 TangleLogRedactor::from_runtime_config(config).redact(message) 323 } 324 325 fn moderation_audit_action_family( 326 event: &(impl GroupEventView + ?Sized), 327 class: &GroupEventClass, 328 ) -> Option<&'static str> { 329 match class { 330 GroupEventClass::Moderation { kind, .. } => match kind.as_u32() { 331 KIND_GROUP_CREATE_GROUP => Some("group_create"), 332 KIND_GROUP_EDIT_METADATA => Some("metadata"), 333 KIND_GROUP_DELETE_GROUP => Some("delete_group"), 334 KIND_GROUP_DELETE_EVENT => Some("delete_event"), 335 KIND_GROUP_PUT_USER => Some("put_user"), 336 KIND_GROUP_REMOVE_USER => Some("remove_user"), 337 _ => None, 338 }, 339 GroupEventClass::Normal { .. } => match event.kind_u32() { 340 KIND_GROUP_JOIN_REQUEST => Some("join"), 341 KIND_GROUP_LEAVE_REQUEST => Some("leave"), 342 _ => None, 343 }, 344 GroupEventClass::RelayGeneratedSnapshot { kind, .. } => match kind.as_u32() { 345 KIND_GROUP_METADATA => Some("metadata"), 346 KIND_GROUP_ADMINS => Some("admins"), 347 KIND_GROUP_MEMBERS => Some("members"), 348 _ => None, 349 }, 350 GroupEventClass::NonGroup => None, 351 } 352 } 353 354 fn moderation_target_count(event: &(impl GroupEventView + ?Sized), action_family: &str) -> usize { 355 let target_tag = match action_family { 356 "put_user" | "remove_user" | "members" => Some("p"), 357 "delete_event" => Some("e"), 358 _ => None, 359 }; 360 let Some(target_tag) = target_tag else { 361 return 0; 362 }; 363 let mut count = 0; 364 if event 365 .visit_tags(|tag| { 366 if tag 367 .indexed_pair() 368 .is_some_and(|(name, _)| name == target_tag) 369 { 370 count += 1; 371 } 372 Ok(()) 373 }) 374 .is_err() 375 { 376 return 0; 377 } 378 count 379 } 380 381 fn relay_secret_log_value(config: &BaseRelayRuntimeConfig) -> &'static str { 382 if config.groups().relay_secret().is_some() { 383 TANGLE_LOG_REDACTED 384 } else { 385 TANGLE_LOG_SECRET_ABSENT 386 } 387 } 388 389 fn optional_ip(peer_ip: Option<IpAddr>) -> String { 390 peer_ip 391 .map(|address| address.to_string()) 392 .unwrap_or_else(|| "unknown".to_owned()) 393 } 394 395 #[cfg(test)] 396 mod tests { 397 use super::{ 398 TANGLE_LOG_REDACTED, TangleLogRedactor, TangleModerationAuditEntry, 399 TangleModerationAuditResult, TangleRuntimeLogSummary, log_group_moderation_audit, 400 log_runtime_config_loaded, sanitize_error_message, 401 }; 402 use crate::config::parse_base_relay_runtime_config_json; 403 use crate::pocket_conversion::tangle_event_to_pocket; 404 use std::{ 405 io, 406 sync::{Arc, Mutex}, 407 }; 408 use tangle_groups::{ 409 GroupEventClass, GroupLimitsConfig, KIND_GROUP_ADMINS, KIND_GROUP_JOIN_REQUEST, 410 KIND_GROUP_MEMBERS, KIND_GROUP_METADATA, classify_group_event, 411 }; 412 use tangle_test_support::{ 413 FixtureKey, tangle_v2_address_group_tag, tangle_v2_delete_event_event, 414 tangle_v2_delete_group_event, tangle_v2_event, tangle_v2_group_create_event, 415 tangle_v2_group_event, tangle_v2_group_metadata_event, tangle_v2_group_tag, 416 tangle_v2_join_event, tangle_v2_leave_event, tangle_v2_put_user_event, 417 tangle_v2_remove_user_event, tangle_v2_tag, 418 }; 419 420 #[test] 421 fn log_redactor_removes_configured_relay_secret() { 422 let secret = "7".repeat(64); 423 let redactor = TangleLogRedactor::new([secret.clone()]); 424 425 assert_eq!( 426 redactor.redact(format!("relay secret {secret} loaded")), 427 "relay secret <redacted> loaded" 428 ); 429 assert!(redactor.contains_secret(format!("raw={secret}"))); 430 assert!(!format!("{redactor:?}").contains(&secret)); 431 } 432 433 #[test] 434 fn runtime_log_summary_never_contains_relay_secret() { 435 let raw = include_str!("../../../config/tangle.example.json"); 436 let config = parse_base_relay_runtime_config_json(raw).expect("config"); 437 let secret = "7".repeat(64); 438 let summary = TangleRuntimeLogSummary::from_config(&config); 439 440 assert_eq!(summary.relay_secret(), TANGLE_LOG_REDACTED); 441 assert!(!format!("{summary:?}").contains(&secret)); 442 assert_eq!( 443 sanitize_error_message(&config, format!("failed with relay secret {secret}")), 444 "failed with relay secret <redacted>" 445 ); 446 } 447 448 #[test] 449 fn structured_runtime_config_log_redacts_relay_secret() { 450 let raw = include_str!("../../../config/tangle.example.json"); 451 let config = parse_base_relay_runtime_config_json(raw).expect("config"); 452 let secret = "7".repeat(64); 453 let writer = CapturedWriter::default(); 454 let subscriber = tracing_subscriber::fmt() 455 .json() 456 .with_writer(writer.clone()) 457 .with_max_level(tracing::Level::INFO) 458 .finish(); 459 460 tracing::subscriber::with_default(subscriber, || { 461 log_runtime_config_loaded(&config); 462 }); 463 464 let output = writer.output(); 465 assert!(output.contains(r#""event":"runtime_config_loaded""#)); 466 assert!(output.contains(r#""relay_secret":"<redacted>""#)); 467 assert!(!output.contains(&secret)); 468 } 469 470 #[test] 471 fn group_moderation_audit_entries_cover_required_action_families_and_target_counts() { 472 let target = tangle_v2_group_event(FixtureKey::Member, "AuditFarm", 10, 1, "target") 473 .expect("target"); 474 let cases = [ 475 ( 476 tangle_v2_group_create_event(FixtureKey::Owner, "AuditFarm", 11, &[]) 477 .expect("create"), 478 "group_create", 479 0, 480 false, 481 ), 482 ( 483 tangle_v2_group_metadata_event( 484 FixtureKey::Owner, 485 "AuditFarm", 486 "Audit Farm", 487 12, 488 &[], 489 ) 490 .expect("metadata"), 491 "metadata", 492 0, 493 false, 494 ), 495 ( 496 tangle_v2_put_user_event(FixtureKey::Owner, "AuditFarm", FixtureKey::Member, 13) 497 .expect("put"), 498 "put_user", 499 1, 500 false, 501 ), 502 ( 503 tangle_v2_remove_user_event(FixtureKey::Owner, "AuditFarm", FixtureKey::Member, 14) 504 .expect("remove"), 505 "remove_user", 506 1, 507 false, 508 ), 509 ( 510 tangle_v2_delete_event_event(FixtureKey::Owner, "AuditFarm", &target, 15) 511 .expect("delete event"), 512 "delete_event", 513 1, 514 false, 515 ), 516 ( 517 tangle_v2_delete_group_event(FixtureKey::Owner, "AuditFarm", 16) 518 .expect("delete group"), 519 "delete_group", 520 0, 521 false, 522 ), 523 ( 524 tangle_v2_join_event(FixtureKey::Member, "AuditFarm", 17).expect("join"), 525 "join", 526 0, 527 false, 528 ), 529 ( 530 tangle_v2_leave_event(FixtureKey::Member, "AuditFarm", 18).expect("leave"), 531 "leave", 532 0, 533 false, 534 ), 535 ( 536 tangle_v2_event( 537 FixtureKey::Owner, 538 19, 539 KIND_GROUP_METADATA.into(), 540 vec![tangle_v2_address_group_tag("AuditFarm").expect("d")], 541 "", 542 ) 543 .expect("generated metadata"), 544 "metadata", 545 0, 546 true, 547 ), 548 ( 549 tangle_v2_event( 550 FixtureKey::Owner, 551 20, 552 KIND_GROUP_ADMINS.into(), 553 vec![tangle_v2_address_group_tag("AuditFarm").expect("d")], 554 "", 555 ) 556 .expect("generated admins"), 557 "admins", 558 0, 559 true, 560 ), 561 ( 562 tangle_v2_event( 563 FixtureKey::Owner, 564 21, 565 KIND_GROUP_MEMBERS.into(), 566 vec![ 567 tangle_v2_address_group_tag("AuditFarm").expect("d"), 568 tangle_v2_tag("p", &[FixtureKey::Member.public_key().as_str()]).expect("p"), 569 ], 570 "", 571 ) 572 .expect("generated members"), 573 "members", 574 1, 575 true, 576 ), 577 ]; 578 579 for (event, action_family, target_count, generated_state_rejection) in cases { 580 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 581 let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); 582 let result = if matches!(class, GroupEventClass::RelayGeneratedSnapshot { .. }) { 583 TangleModerationAuditResult::Rejected 584 } else { 585 TangleModerationAuditResult::Accepted 586 }; 587 let entry = 588 TangleModerationAuditEntry::new(&pocket, &class, result).expect("audit entry"); 589 590 assert_eq!(entry.action_family, action_family); 591 assert_eq!(entry.target_count, target_count); 592 assert_eq!(entry.generated_state_rejection, generated_state_rejection); 593 assert_eq!(entry.result, result.as_str()); 594 } 595 } 596 597 #[test] 598 fn group_moderation_audit_log_redacts_group_content_invite_and_secret_values() { 599 let secret = "7".repeat(64); 600 let event = tangle_v2_event( 601 FixtureKey::Member, 602 31, 603 KIND_GROUP_JOIN_REQUEST.into(), 604 vec![ 605 tangle_v2_group_tag("SecretFarm").expect("h"), 606 tangle_v2_tag("code", &["invite-super-secret"]).expect("code"), 607 ], 608 "secret-content", 609 ) 610 .expect("join"); 611 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 612 let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); 613 let writer = CapturedWriter::default(); 614 let subscriber = tracing_subscriber::fmt() 615 .json() 616 .with_writer(writer.clone()) 617 .with_max_level(tracing::Level::INFO) 618 .finish(); 619 620 tracing::subscriber::with_default(subscriber, || { 621 log_group_moderation_audit(&pocket, &class, TangleModerationAuditResult::Rejected); 622 }); 623 624 let output = writer.output(); 625 assert!(output.contains(r#""event":"group_moderation_audit""#)); 626 assert!(output.contains(r#""action_family":"join""#)); 627 assert!(output.contains(r#""result":"rejected""#)); 628 assert!(output.contains(r#""event_id":"#)); 629 assert!(output.contains(event.id().as_str())); 630 assert!(output.contains(r#""actor_pubkey":"#)); 631 assert!(output.contains(event.unsigned().pubkey().as_str())); 632 assert!(output.contains(r#""event_kind":9021"#)); 633 assert!(output.contains(r#""target_count":0"#)); 634 assert!(output.contains(r#""group_id":"<redacted>""#)); 635 assert!(output.contains(r#""group_id_redacted":true"#)); 636 assert!(output.contains(r#""generated_state_rejection":false"#)); 637 assert!(!output.contains("SecretFarm")); 638 assert!(!output.contains("secret-content")); 639 assert!(!output.contains("invite-super-secret")); 640 assert!(!output.contains(&secret)); 641 } 642 643 #[test] 644 fn group_moderation_audit_ignores_non_requested_group_event_kinds() { 645 let event = 646 tangle_v2_group_event(FixtureKey::Member, "AuditFarm", 41, 1, "normal").expect("event"); 647 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 648 let class = classify_group_event(&pocket, GroupLimitsConfig::default()).expect("class"); 649 650 assert!( 651 TangleModerationAuditEntry::new(&pocket, &class, TangleModerationAuditResult::Accepted) 652 .is_none() 653 ); 654 } 655 656 #[derive(Clone, Default)] 657 struct CapturedWriter { 658 inner: Arc<Mutex<Vec<u8>>>, 659 } 660 661 impl CapturedWriter { 662 fn output(&self) -> String { 663 let bytes = self.inner.lock().expect("writer").clone(); 664 String::from_utf8(bytes).expect("utf8") 665 } 666 } 667 668 impl<'a> tracing_subscriber::fmt::MakeWriter<'a> for CapturedWriter { 669 type Writer = CapturedWriterGuard; 670 671 fn make_writer(&'a self) -> Self::Writer { 672 CapturedWriterGuard { 673 inner: Arc::clone(&self.inner), 674 } 675 } 676 } 677 678 struct CapturedWriterGuard { 679 inner: Arc<Mutex<Vec<u8>>>, 680 } 681 682 impl io::Write for CapturedWriterGuard { 683 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { 684 self.inner.lock().expect("writer").extend_from_slice(buffer); 685 Ok(buffer.len()) 686 } 687 688 fn flush(&mut self) -> io::Result<()> { 689 Ok(()) 690 } 691 } 692 }