audit.rs (36565B)
1 use std::collections::VecDeque; 2 use std::fs::{self, OpenOptions}; 3 use std::io::{BufRead, BufReader, Write}; 4 use std::path::{Path, PathBuf}; 5 use std::time::{SystemTime, UNIX_EPOCH}; 6 7 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId; 8 use serde::{Deserialize, Serialize}; 9 10 use crate::config::MycAuditConfig; 11 use crate::config::MycTransportDeliveryPolicy; 12 use crate::error::MycError; 13 14 const MYC_OPERATION_AUDIT_FILE_NAME: &str = "operations.jsonl"; 15 const MYC_OPERATION_AUDIT_ARCHIVE_PREFIX: &str = "operations."; 16 const MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX: &str = ".jsonl"; 17 const MYC_OPERATION_AUDIT_INDEX_DIR_NAME: &str = "index"; 18 const MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME: &str = "index.tmp"; 19 const MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME: &str = "attempts"; 20 const MYC_OPERATION_AUDIT_LATEST_DIR_NAME: &str = "latest"; 21 const MYC_OPERATION_AUDIT_LATEST_SUFFIX: &str = ".attempt"; 22 23 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 24 #[serde(rename_all = "snake_case")] 25 pub enum MycOperationAuditKind { 26 DeliveryRecovery, 27 ListenerResponsePublish, 28 ConnectAcceptPublish, 29 AuthReplayPublish, 30 AuthReplayRestore, 31 DiscoveryHandlerFetch, 32 DiscoveryHandlerPublish, 33 DiscoveryHandlerCompare, 34 DiscoveryHandlerRefresh, 35 DiscoveryHandlerRepair, 36 } 37 38 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 39 #[serde(rename_all = "snake_case")] 40 pub enum MycOperationAuditOutcome { 41 Succeeded, 42 Rejected, 43 Restored, 44 Unavailable, 45 Missing, 46 Matched, 47 Drifted, 48 Conflicted, 49 Skipped, 50 } 51 52 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 53 pub struct MycOperationAuditRecord { 54 pub recorded_at_unix: u64, 55 pub operation: MycOperationAuditKind, 56 pub outcome: MycOperationAuditOutcome, 57 #[serde(default, skip_serializing_if = "Option::is_none")] 58 pub relay_url: Option<String>, 59 #[serde(default, skip_serializing_if = "Option::is_none")] 60 pub connection_id: Option<String>, 61 #[serde(default, skip_serializing_if = "Option::is_none")] 62 pub request_id: Option<String>, 63 #[serde(default, skip_serializing_if = "Option::is_none")] 64 pub attempt_id: Option<String>, 65 #[serde(default, skip_serializing_if = "Vec::is_empty")] 66 pub planned_repair_relays: Vec<String>, 67 #[serde(default, skip_serializing_if = "Vec::is_empty")] 68 pub blocked_relays: Vec<String>, 69 #[serde(default, skip_serializing_if = "Option::is_none")] 70 pub blocked_reason: Option<String>, 71 #[serde(default, skip_serializing_if = "Option::is_none")] 72 pub delivery_policy: Option<MycTransportDeliveryPolicy>, 73 #[serde(default, skip_serializing_if = "Option::is_none")] 74 pub required_acknowledged_relay_count: Option<usize>, 75 #[serde(default, skip_serializing_if = "Option::is_none")] 76 pub publish_attempt_count: Option<usize>, 77 pub relay_count: usize, 78 pub acknowledged_relay_count: usize, 79 pub relay_outcome_summary: String, 80 } 81 82 pub trait MycOperationAuditStore: Send + Sync { 83 fn config(&self) -> &MycAuditConfig; 84 fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError>; 85 fn list(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 86 self.list_with_limit(self.config().default_read_limit) 87 } 88 fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError>; 89 fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError>; 90 fn list_for_connection( 91 &self, 92 connection_id: &RadrootsNostrSignerConnectionId, 93 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 94 self.list_for_connection_with_limit(connection_id, self.config().default_read_limit) 95 } 96 fn list_for_connection_with_limit( 97 &self, 98 connection_id: &RadrootsNostrSignerConnectionId, 99 limit: usize, 100 ) -> Result<Vec<MycOperationAuditRecord>, MycError>; 101 fn list_for_attempt_id( 102 &self, 103 attempt_id: &str, 104 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 105 self.list_for_attempt_id_with_limit(attempt_id, usize::MAX) 106 } 107 fn list_for_attempt_id_with_limit( 108 &self, 109 attempt_id: &str, 110 limit: usize, 111 ) -> Result<Vec<MycOperationAuditRecord>, MycError>; 112 fn latest_attempt_id_for_operation( 113 &self, 114 operation: MycOperationAuditKind, 115 ) -> Result<Option<String>, MycError>; 116 } 117 118 #[derive(Debug, Clone)] 119 pub struct MycJsonlOperationAuditStore { 120 audit_dir: PathBuf, 121 config: MycAuditConfig, 122 } 123 124 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 125 struct MycAuditRotationResult { 126 pruned_retained_records: bool, 127 } 128 129 impl MycOperationAuditRecord { 130 pub fn new( 131 operation: MycOperationAuditKind, 132 outcome: MycOperationAuditOutcome, 133 connection_id: Option<&RadrootsNostrSignerConnectionId>, 134 request_id: Option<&str>, 135 relay_count: usize, 136 acknowledged_relay_count: usize, 137 relay_outcome_summary: impl Into<String>, 138 ) -> Self { 139 Self { 140 recorded_at_unix: now_unix_secs(), 141 operation, 142 outcome, 143 relay_url: None, 144 connection_id: connection_id.map(ToString::to_string), 145 request_id: request_id.map(ToOwned::to_owned), 146 attempt_id: None, 147 planned_repair_relays: Vec::new(), 148 blocked_relays: Vec::new(), 149 blocked_reason: None, 150 delivery_policy: None, 151 required_acknowledged_relay_count: None, 152 publish_attempt_count: None, 153 relay_count, 154 acknowledged_relay_count, 155 relay_outcome_summary: relay_outcome_summary.into(), 156 } 157 } 158 159 pub fn with_relay_url(mut self, relay_url: impl Into<String>) -> Self { 160 self.relay_url = Some(relay_url.into()); 161 self 162 } 163 164 pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self { 165 self.attempt_id = Some(attempt_id.into()); 166 self 167 } 168 169 pub fn with_planned_repair_relays(mut self, planned_repair_relays: Vec<String>) -> Self { 170 self.planned_repair_relays = planned_repair_relays; 171 self 172 } 173 174 pub fn with_blocked_relays( 175 mut self, 176 blocked_reason: impl Into<String>, 177 blocked_relays: Vec<String>, 178 ) -> Self { 179 self.blocked_reason = Some(blocked_reason.into()); 180 self.blocked_relays = blocked_relays; 181 self 182 } 183 184 pub fn with_delivery_details( 185 mut self, 186 delivery_policy: MycTransportDeliveryPolicy, 187 required_acknowledged_relay_count: usize, 188 publish_attempt_count: usize, 189 ) -> Self { 190 self.delivery_policy = Some(delivery_policy); 191 self.required_acknowledged_relay_count = Some(required_acknowledged_relay_count); 192 self.publish_attempt_count = Some(publish_attempt_count); 193 self 194 } 195 } 196 197 impl MycJsonlOperationAuditStore { 198 pub fn new(audit_dir: impl AsRef<Path>, config: MycAuditConfig) -> Self { 199 Self { 200 audit_dir: audit_dir.as_ref().to_path_buf(), 201 config, 202 } 203 } 204 205 pub fn path(&self) -> PathBuf { 206 self.active_path() 207 } 208 209 pub fn config(&self) -> &MycAuditConfig { 210 &self.config 211 } 212 213 pub fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> { 214 let active_path = self.active_path(); 215 let encoded = serde_json::to_vec(record).map_err(|source| MycError::AuditSerialize { 216 path: active_path.clone(), 217 source, 218 })?; 219 let rotation = self.rotate_if_needed(encoded.len() as u64 + 1)?; 220 self.append_encoded_record_line(&active_path, &encoded)?; 221 222 if rotation.pruned_retained_records { 223 self.rebuild_query_indexes_from_retained_logs()?; 224 } else { 225 self.append_record_to_indexes(record)?; 226 } 227 Ok(()) 228 } 229 230 pub fn list(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 231 self.list_with_limit(self.config.default_read_limit) 232 } 233 234 pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 235 self.list_matching(usize::MAX, |_| true) 236 } 237 238 pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> { 239 self.list_matching(limit, |_| true) 240 } 241 242 pub fn list_for_connection( 243 &self, 244 connection_id: &RadrootsNostrSignerConnectionId, 245 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 246 self.list_for_connection_with_limit(connection_id, self.config.default_read_limit) 247 } 248 249 pub fn list_for_connection_with_limit( 250 &self, 251 connection_id: &RadrootsNostrSignerConnectionId, 252 limit: usize, 253 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 254 self.list_matching(limit, |record| { 255 record.connection_id.as_deref() == Some(connection_id.as_str()) 256 }) 257 } 258 259 pub fn list_for_attempt_id( 260 &self, 261 attempt_id: &str, 262 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 263 self.list_for_attempt_id_with_limit(attempt_id, usize::MAX) 264 } 265 266 pub fn list_for_attempt_id_with_limit( 267 &self, 268 attempt_id: &str, 269 limit: usize, 270 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 271 if limit == 0 { 272 return Ok(Vec::new()); 273 } 274 275 let attempt_path = self.attempt_index_path(attempt_id); 276 if !attempt_path.exists() { 277 self.rebuild_query_indexes_from_retained_logs()?; 278 } 279 self.read_recent_records_from_path_with_limit(&attempt_path, limit) 280 } 281 282 pub fn latest_attempt_id_for_operation( 283 &self, 284 operation: MycOperationAuditKind, 285 ) -> Result<Option<String>, MycError> { 286 let latest_path = self.latest_attempt_path(operation); 287 if !latest_path.exists() { 288 self.rebuild_query_indexes_from_retained_logs()?; 289 } 290 self.read_latest_attempt_id_from_path(&latest_path) 291 } 292 293 fn list_matching<F>( 294 &self, 295 limit: usize, 296 predicate: F, 297 ) -> Result<Vec<MycOperationAuditRecord>, MycError> 298 where 299 F: Fn(&MycOperationAuditRecord) -> bool, 300 { 301 if limit == 0 { 302 return Ok(Vec::new()); 303 } 304 305 let mut newest_records = Vec::new(); 306 for path in self.read_paths_newest_first()? { 307 let remaining = limit.saturating_sub(newest_records.len()); 308 if remaining == 0 { 309 break; 310 } 311 312 let mut file_records = 313 self.read_recent_records_from_path_matching(&path, remaining, &predicate)?; 314 file_records.reverse(); 315 newest_records.extend(file_records); 316 } 317 318 newest_records.reverse(); 319 Ok(newest_records) 320 } 321 322 fn read_records_from_path( 323 &self, 324 path: &Path, 325 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 326 if !path.exists() { 327 return Ok(Vec::new()); 328 } 329 330 let file = fs::File::open(path).map_err(|source| MycError::AuditIo { 331 path: path.to_path_buf(), 332 source, 333 })?; 334 let reader = BufReader::new(file); 335 let mut records = Vec::new(); 336 337 for (line_number, line) in reader.lines().enumerate() { 338 let line = line.map_err(|source| MycError::AuditIo { 339 path: path.to_path_buf(), 340 source, 341 })?; 342 if line.trim().is_empty() { 343 continue; 344 } 345 346 let record = 347 serde_json::from_str::<MycOperationAuditRecord>(&line).map_err(|source| { 348 MycError::AuditParse { 349 path: path.to_path_buf(), 350 line_number: line_number + 1, 351 source, 352 } 353 })?; 354 records.push(record); 355 } 356 357 Ok(records) 358 } 359 360 fn rotate_if_needed(&self, additional_bytes: u64) -> Result<MycAuditRotationResult, MycError> { 361 let active_path = self.active_path(); 362 let current_len = match fs::metadata(&active_path) { 363 Ok(metadata) => metadata.len(), 364 Err(error) if error.kind() == std::io::ErrorKind::NotFound => 0, 365 Err(source) => { 366 return Err(MycError::AuditIo { 367 path: active_path, 368 source, 369 }); 370 } 371 }; 372 373 if current_len == 0 374 || current_len.saturating_add(additional_bytes) <= self.config.max_active_file_bytes 375 { 376 return Ok(MycAuditRotationResult { 377 pruned_retained_records: false, 378 }); 379 } 380 381 self.rotate_active_file() 382 } 383 384 fn rotate_active_file(&self) -> Result<MycAuditRotationResult, MycError> { 385 let mut pruned_retained_records = false; 386 for index in (1..=self.config.max_archived_files).rev() { 387 let archived_path = self.archive_path(index); 388 if !archived_path.exists() { 389 continue; 390 } 391 392 if index == self.config.max_archived_files { 393 fs::remove_file(&archived_path).map_err(|source| MycError::AuditIo { 394 path: archived_path, 395 source, 396 })?; 397 pruned_retained_records = true; 398 } else { 399 let next_path = self.archive_path(index + 1); 400 fs::rename(&archived_path, &next_path).map_err(|source| MycError::AuditIo { 401 path: archived_path, 402 source, 403 })?; 404 } 405 } 406 407 let active_path = self.active_path(); 408 if !active_path.exists() { 409 return Ok(MycAuditRotationResult { 410 pruned_retained_records, 411 }); 412 } 413 414 if self.config.max_archived_files == 0 { 415 fs::remove_file(&active_path).map_err(|source| MycError::AuditIo { 416 path: active_path, 417 source, 418 })?; 419 return Ok(MycAuditRotationResult { 420 pruned_retained_records: true, 421 }); 422 } 423 424 let first_archive = self.archive_path(1); 425 fs::rename(&active_path, &first_archive).map_err(|source| MycError::AuditIo { 426 path: active_path, 427 source, 428 })?; 429 Ok(MycAuditRotationResult { 430 pruned_retained_records, 431 }) 432 } 433 434 fn read_paths_newest_first(&self) -> Result<Vec<PathBuf>, MycError> { 435 let mut paths = Vec::new(); 436 let active_path = self.active_path(); 437 if active_path.exists() { 438 paths.push(active_path); 439 } 440 441 let mut archived = self.archived_paths()?; 442 archived.sort_by_key(|(_, index)| *index); 443 for (path, _) in archived { 444 paths.push(path); 445 } 446 447 Ok(paths) 448 } 449 450 fn archived_paths(&self) -> Result<Vec<(PathBuf, usize)>, MycError> { 451 let mut archived = Vec::new(); 452 if !self.audit_dir.exists() { 453 return Ok(archived); 454 } 455 456 for entry in fs::read_dir(&self.audit_dir).map_err(|source| MycError::AuditIo { 457 path: self.audit_dir.clone(), 458 source, 459 })? { 460 let entry = entry.map_err(|source| MycError::AuditIo { 461 path: self.audit_dir.clone(), 462 source, 463 })?; 464 let file_name = entry.file_name(); 465 let Some(file_name) = file_name.to_str() else { 466 continue; 467 }; 468 let Some(index) = parse_archive_index(file_name) else { 469 continue; 470 }; 471 archived.push((entry.path(), index)); 472 } 473 474 Ok(archived) 475 } 476 477 fn active_path(&self) -> PathBuf { 478 self.audit_dir.join(MYC_OPERATION_AUDIT_FILE_NAME) 479 } 480 481 fn archive_path(&self, index: usize) -> PathBuf { 482 self.audit_dir.join(format!( 483 "{MYC_OPERATION_AUDIT_ARCHIVE_PREFIX}{index}{MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX}" 484 )) 485 } 486 487 fn index_dir(&self) -> PathBuf { 488 self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_DIR_NAME) 489 } 490 491 fn attempt_index_dir(&self) -> PathBuf { 492 self.index_dir().join(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME) 493 } 494 495 fn latest_attempt_dir(&self) -> PathBuf { 496 self.index_dir().join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME) 497 } 498 499 fn attempt_index_path(&self, attempt_id: &str) -> PathBuf { 500 self.attempt_index_dir() 501 .join(format!("{}.jsonl", encode_index_component(attempt_id))) 502 } 503 504 fn latest_attempt_path(&self, operation: MycOperationAuditKind) -> PathBuf { 505 self.latest_attempt_dir().join(format!( 506 "{}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}", 507 operation_index_label(operation) 508 )) 509 } 510 511 fn append_encoded_record_line(&self, path: &Path, encoded: &[u8]) -> Result<(), MycError> { 512 let mut file = OpenOptions::new() 513 .create(true) 514 .append(true) 515 .open(path) 516 .map_err(|source| MycError::AuditIo { 517 path: path.to_path_buf(), 518 source, 519 })?; 520 file.write_all(encoded) 521 .map_err(|source| MycError::AuditIo { 522 path: path.to_path_buf(), 523 source, 524 })?; 525 file.write_all(b"\n").map_err(|source| MycError::AuditIo { 526 path: path.to_path_buf(), 527 source, 528 })?; 529 Ok(()) 530 } 531 532 fn append_record_to_indexes(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> { 533 let Some(attempt_id) = record.attempt_id.as_deref() else { 534 return Ok(()); 535 }; 536 537 self.ensure_index_dirs()?; 538 self.append_record_to_index_root(&self.index_dir(), record)?; 539 self.write_latest_attempt_pointer(record.operation, attempt_id) 540 } 541 542 fn ensure_index_dirs(&self) -> Result<(), MycError> { 543 fs::create_dir_all(self.attempt_index_dir()).map_err(|source| MycError::AuditIo { 544 path: self.attempt_index_dir(), 545 source, 546 })?; 547 fs::create_dir_all(self.latest_attempt_dir()).map_err(|source| MycError::AuditIo { 548 path: self.latest_attempt_dir(), 549 source, 550 })?; 551 Ok(()) 552 } 553 554 fn append_record_to_index_root( 555 &self, 556 index_root: &Path, 557 record: &MycOperationAuditRecord, 558 ) -> Result<(), MycError> { 559 let Some(attempt_id) = record.attempt_id.as_deref() else { 560 return Ok(()); 561 }; 562 563 let attempts_dir = index_root.join(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME); 564 fs::create_dir_all(&attempts_dir).map_err(|source| MycError::AuditIo { 565 path: attempts_dir.clone(), 566 source, 567 })?; 568 let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME); 569 fs::create_dir_all(&latest_dir).map_err(|source| MycError::AuditIo { 570 path: latest_dir.clone(), 571 source, 572 })?; 573 574 let encoded = serde_json::to_vec(record).map_err(|source| MycError::AuditSerialize { 575 path: attempts_dir.join(format!("{}.jsonl", encode_index_component(attempt_id))), 576 source, 577 })?; 578 self.append_encoded_record_line( 579 &attempts_dir.join(format!("{}.jsonl", encode_index_component(attempt_id))), 580 &encoded, 581 )?; 582 self.write_latest_attempt_pointer_to_root(index_root, record.operation, attempt_id) 583 } 584 585 fn write_latest_attempt_pointer( 586 &self, 587 operation: MycOperationAuditKind, 588 attempt_id: &str, 589 ) -> Result<(), MycError> { 590 self.write_latest_attempt_pointer_to_root(&self.index_dir(), operation, attempt_id) 591 } 592 593 fn write_latest_attempt_pointer_to_root( 594 &self, 595 index_root: &Path, 596 operation: MycOperationAuditKind, 597 attempt_id: &str, 598 ) -> Result<(), MycError> { 599 let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME); 600 fs::create_dir_all(&latest_dir).map_err(|source| MycError::AuditIo { 601 path: latest_dir.clone(), 602 source, 603 })?; 604 let path = latest_dir.join(format!( 605 "{}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}", 606 operation_index_label(operation) 607 )); 608 write_atomic_text(&path, attempt_id) 609 } 610 611 fn rebuild_query_indexes_from_retained_logs(&self) -> Result<(), MycError> { 612 let staging_root = self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME); 613 if staging_root.exists() { 614 fs::remove_dir_all(&staging_root).map_err(|source| MycError::AuditIo { 615 path: staging_root.clone(), 616 source, 617 })?; 618 } 619 fs::create_dir_all(&staging_root).map_err(|source| MycError::AuditIo { 620 path: staging_root.clone(), 621 source, 622 })?; 623 624 let mut retained_paths = self.read_paths_newest_first()?; 625 retained_paths.reverse(); 626 for path in retained_paths { 627 for record in self.read_records_from_path(&path)? { 628 self.append_record_to_index_root(&staging_root, &record)?; 629 } 630 } 631 632 let final_root = self.index_dir(); 633 if final_root.exists() { 634 fs::remove_dir_all(&final_root).map_err(|source| MycError::AuditIo { 635 path: final_root.clone(), 636 source, 637 })?; 638 } 639 fs::rename(&staging_root, &final_root).map_err(|source| MycError::AuditIo { 640 path: staging_root, 641 source, 642 })?; 643 Ok(()) 644 } 645 646 fn read_latest_attempt_id_from_path(&self, path: &Path) -> Result<Option<String>, MycError> { 647 match fs::read_to_string(path) { 648 Ok(contents) => { 649 let attempt_id = contents.trim(); 650 if attempt_id.is_empty() { 651 Ok(None) 652 } else { 653 Ok(Some(attempt_id.to_owned())) 654 } 655 } 656 Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), 657 Err(source) => Err(MycError::AuditIo { 658 path: path.to_path_buf(), 659 source, 660 }), 661 } 662 } 663 664 fn read_recent_records_from_path_with_limit( 665 &self, 666 path: &Path, 667 limit: usize, 668 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 669 self.read_recent_records_from_path_matching(path, limit, &|_| true) 670 } 671 672 fn read_recent_records_from_path_matching<F>( 673 &self, 674 path: &Path, 675 limit: usize, 676 predicate: &F, 677 ) -> Result<Vec<MycOperationAuditRecord>, MycError> 678 where 679 F: Fn(&MycOperationAuditRecord) -> bool, 680 { 681 if limit == 0 || !path.exists() { 682 return Ok(Vec::new()); 683 } 684 685 let file = fs::File::open(path).map_err(|source| MycError::AuditIo { 686 path: path.to_path_buf(), 687 source, 688 })?; 689 let reader = BufReader::new(file); 690 let mut recent_records = VecDeque::new(); 691 692 for (line_number, line) in reader.lines().enumerate() { 693 let line = line.map_err(|source| MycError::AuditIo { 694 path: path.to_path_buf(), 695 source, 696 })?; 697 if line.trim().is_empty() { 698 continue; 699 } 700 701 let record = 702 serde_json::from_str::<MycOperationAuditRecord>(&line).map_err(|source| { 703 MycError::AuditParse { 704 path: path.to_path_buf(), 705 line_number: line_number + 1, 706 source, 707 } 708 })?; 709 if !predicate(&record) { 710 continue; 711 } 712 713 if recent_records.len() == limit { 714 recent_records.pop_front(); 715 } 716 recent_records.push_back(record); 717 } 718 719 Ok(recent_records.into_iter().collect()) 720 } 721 } 722 723 impl MycOperationAuditStore for MycJsonlOperationAuditStore { 724 fn config(&self) -> &MycAuditConfig { 725 &self.config 726 } 727 728 fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> { 729 MycJsonlOperationAuditStore::append(self, record) 730 } 731 732 fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 733 MycJsonlOperationAuditStore::list_all(self) 734 } 735 736 fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> { 737 MycJsonlOperationAuditStore::list_with_limit(self, limit) 738 } 739 740 fn list_for_connection_with_limit( 741 &self, 742 connection_id: &RadrootsNostrSignerConnectionId, 743 limit: usize, 744 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 745 MycJsonlOperationAuditStore::list_for_connection_with_limit(self, connection_id, limit) 746 } 747 748 fn list_for_attempt_id_with_limit( 749 &self, 750 attempt_id: &str, 751 limit: usize, 752 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 753 MycJsonlOperationAuditStore::list_for_attempt_id_with_limit(self, attempt_id, limit) 754 } 755 756 fn latest_attempt_id_for_operation( 757 &self, 758 operation: MycOperationAuditKind, 759 ) -> Result<Option<String>, MycError> { 760 MycJsonlOperationAuditStore::latest_attempt_id_for_operation(self, operation) 761 } 762 } 763 764 fn parse_archive_index(file_name: &str) -> Option<usize> { 765 file_name 766 .strip_prefix(MYC_OPERATION_AUDIT_ARCHIVE_PREFIX)? 767 .strip_suffix(MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX)? 768 .parse() 769 .ok() 770 } 771 772 fn operation_index_label(kind: MycOperationAuditKind) -> &'static str { 773 match kind { 774 MycOperationAuditKind::DeliveryRecovery => "delivery_recovery", 775 MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish", 776 MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish", 777 MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish", 778 MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore", 779 MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch", 780 MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish", 781 MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare", 782 MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh", 783 MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair", 784 } 785 } 786 787 fn encode_index_component(value: &str) -> String { 788 let mut encoded = String::with_capacity(value.len() * 2); 789 for byte in value.bytes() { 790 encoded.push_str(&format!("{byte:02x}")); 791 } 792 encoded 793 } 794 795 fn write_atomic_text(path: &Path, contents: &str) -> Result<(), MycError> { 796 let tmp_path = path.with_extension("tmp"); 797 fs::write(&tmp_path, contents).map_err(|source| MycError::AuditIo { 798 path: tmp_path.clone(), 799 source, 800 })?; 801 fs::rename(&tmp_path, path).map_err(|source| MycError::AuditIo { 802 path: tmp_path, 803 source, 804 })?; 805 Ok(()) 806 } 807 808 fn now_unix_secs() -> u64 { 809 SystemTime::now() 810 .duration_since(UNIX_EPOCH) 811 .expect("system clock is before unix epoch") 812 .as_secs() 813 } 814 815 #[cfg(test)] 816 mod tests { 817 use std::fs; 818 819 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId; 820 821 use crate::config::MycAuditConfig; 822 823 use super::{ 824 MycJsonlOperationAuditStore, MycOperationAuditKind, MycOperationAuditOutcome, 825 MycOperationAuditRecord, 826 }; 827 828 fn config() -> MycAuditConfig { 829 MycAuditConfig { 830 default_read_limit: 10, 831 max_active_file_bytes: 512, 832 max_archived_files: 2, 833 } 834 } 835 836 #[test] 837 fn append_and_list_operation_audit_records() { 838 let temp = tempfile::tempdir().expect("tempdir"); 839 let store = MycJsonlOperationAuditStore::new(temp.path(), config()); 840 let connection_id = 841 RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id"); 842 843 store 844 .append( 845 &MycOperationAuditRecord::new( 846 MycOperationAuditKind::ConnectAcceptPublish, 847 MycOperationAuditOutcome::Rejected, 848 Some(&connection_id), 849 Some("request-1"), 850 2, 851 0, 852 "0/2 relays acknowledged publish; failures: relay-a: rejected", 853 ) 854 .with_attempt_id("attempt-1"), 855 ) 856 .expect("append rejected record"); 857 store 858 .append(&MycOperationAuditRecord::new( 859 MycOperationAuditKind::AuthReplayRestore, 860 MycOperationAuditOutcome::Restored, 861 Some(&connection_id), 862 Some("request-1"), 863 0, 864 0, 865 "restored pending auth challenge after replay publish rejection", 866 )) 867 .expect("append restored record"); 868 869 let records = store.list().expect("list records"); 870 assert_eq!(records.len(), 2); 871 assert_eq!( 872 records[0].operation, 873 MycOperationAuditKind::ConnectAcceptPublish 874 ); 875 assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected); 876 assert_eq!(records[0].connection_id.as_deref(), Some("connection-1")); 877 assert_eq!(records[0].request_id.as_deref(), Some("request-1")); 878 assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1")); 879 assert_eq!(records[0].relay_count, 2); 880 assert_eq!(records[0].acknowledged_relay_count, 0); 881 882 let connection_records = store 883 .list_for_connection(&connection_id) 884 .expect("list connection records"); 885 assert_eq!(connection_records, records); 886 } 887 888 #[test] 889 fn list_returns_empty_when_audit_file_is_missing() { 890 let temp = tempfile::tempdir().expect("tempdir"); 891 let store = MycJsonlOperationAuditStore::new(temp.path(), config()); 892 893 assert!(store.list().expect("list missing records").is_empty()); 894 } 895 896 #[test] 897 fn rotation_and_bounded_reads_keep_recent_records() { 898 let temp = tempfile::tempdir().expect("tempdir"); 899 let store = MycJsonlOperationAuditStore::new( 900 temp.path(), 901 MycAuditConfig { 902 default_read_limit: 3, 903 max_active_file_bytes: 180, 904 max_archived_files: 2, 905 }, 906 ); 907 908 for index in 0..6 { 909 store 910 .append( 911 &MycOperationAuditRecord::new( 912 MycOperationAuditKind::ListenerResponsePublish, 913 MycOperationAuditOutcome::Rejected, 914 None, 915 Some(&format!("request-{index}")), 916 1, 917 0, 918 format!("failure-{index}"), 919 ) 920 .with_attempt_id(format!("attempt-{index}")), 921 ) 922 .expect("append record"); 923 } 924 925 let records = store.list().expect("list bounded records"); 926 assert_eq!(records.len(), 3); 927 assert_eq!(records[0].request_id.as_deref(), Some("request-3")); 928 assert_eq!(records[2].request_id.as_deref(), Some("request-5")); 929 assert!(temp.path().join("operations.1.jsonl").exists()); 930 assert!(temp.path().join("operations.2.jsonl").exists()); 931 assert!(!temp.path().join("operations.3.jsonl").exists()); 932 } 933 934 #[test] 935 fn list_for_attempt_and_latest_attempt_id_work() { 936 let temp = tempfile::tempdir().expect("tempdir"); 937 let store = MycJsonlOperationAuditStore::new(temp.path(), config()); 938 939 store 940 .append( 941 &MycOperationAuditRecord::new( 942 MycOperationAuditKind::DiscoveryHandlerRefresh, 943 MycOperationAuditOutcome::Rejected, 944 None, 945 None, 946 2, 947 0, 948 "first attempt rejected", 949 ) 950 .with_attempt_id("attempt-1") 951 .with_planned_repair_relays(vec!["wss://relay-a.example.com".to_owned()]) 952 .with_blocked_relays( 953 "unavailable_relays", 954 vec!["wss://relay-b.example.com".to_owned()], 955 ), 956 ) 957 .expect("append first attempt"); 958 store 959 .append( 960 &MycOperationAuditRecord::new( 961 MycOperationAuditKind::DiscoveryHandlerRepair, 962 MycOperationAuditOutcome::Rejected, 963 None, 964 None, 965 1, 966 0, 967 "relay-a rejected", 968 ) 969 .with_attempt_id("attempt-1") 970 .with_relay_url("wss://relay-a.example.com"), 971 ) 972 .expect("append first repair"); 973 store 974 .append( 975 &MycOperationAuditRecord::new( 976 MycOperationAuditKind::DiscoveryHandlerRefresh, 977 MycOperationAuditOutcome::Succeeded, 978 None, 979 None, 980 1, 981 1, 982 "second attempt succeeded", 983 ) 984 .with_attempt_id("attempt-2"), 985 ) 986 .expect("append second attempt"); 987 988 let attempt_records = store 989 .list_for_attempt_id("attempt-1") 990 .expect("list attempt records"); 991 assert_eq!(attempt_records.len(), 2); 992 assert!( 993 attempt_records 994 .iter() 995 .all(|record| record.attempt_id.as_deref() == Some("attempt-1")) 996 ); 997 assert_eq!( 998 attempt_records[0].planned_repair_relays, 999 vec!["wss://relay-a.example.com".to_owned()] 1000 ); 1001 assert_eq!( 1002 attempt_records[0].blocked_relays, 1003 vec!["wss://relay-b.example.com".to_owned()] 1004 ); 1005 assert_eq!( 1006 attempt_records[0].blocked_reason.as_deref(), 1007 Some("unavailable_relays") 1008 ); 1009 assert_eq!( 1010 store 1011 .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh) 1012 .expect("latest attempt"), 1013 Some("attempt-2".to_owned()) 1014 ); 1015 } 1016 1017 #[test] 1018 fn attempt_lookup_rebuilds_indexes_from_retained_logs() { 1019 let temp = tempfile::tempdir().expect("tempdir"); 1020 let store = MycJsonlOperationAuditStore::new(temp.path(), config()); 1021 1022 store 1023 .append( 1024 &MycOperationAuditRecord::new( 1025 MycOperationAuditKind::DiscoveryHandlerRefresh, 1026 MycOperationAuditOutcome::Rejected, 1027 None, 1028 None, 1029 2, 1030 0, 1031 "first attempt rejected", 1032 ) 1033 .with_attempt_id("attempt-1"), 1034 ) 1035 .expect("append first attempt"); 1036 store 1037 .append( 1038 &MycOperationAuditRecord::new( 1039 MycOperationAuditKind::DiscoveryHandlerRefresh, 1040 MycOperationAuditOutcome::Succeeded, 1041 None, 1042 None, 1043 1, 1044 1, 1045 "second attempt succeeded", 1046 ) 1047 .with_attempt_id("attempt-2"), 1048 ) 1049 .expect("append second attempt"); 1050 1051 fs::remove_dir_all(store.index_dir()).expect("remove index dir"); 1052 1053 let rebuilt_attempt_records = store 1054 .list_for_attempt_id("attempt-1") 1055 .expect("rebuild attempt records"); 1056 assert_eq!(rebuilt_attempt_records.len(), 1); 1057 assert_eq!( 1058 rebuilt_attempt_records[0].attempt_id.as_deref(), 1059 Some("attempt-1") 1060 ); 1061 assert_eq!( 1062 store 1063 .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh) 1064 .expect("latest attempt after rebuild"), 1065 Some("attempt-2".to_owned()) 1066 ); 1067 assert!(store.attempt_index_path("attempt-1").exists()); 1068 assert!( 1069 store 1070 .latest_attempt_path(MycOperationAuditKind::DiscoveryHandlerRefresh) 1071 .exists() 1072 ); 1073 } 1074 }