commit d410cf28ae2c3e098c54505ebe0ed79769367c71
parent 95bdf1753b46d172f983ee41d1773e283826ab55
Author: triesap <tyson@radroots.org>
Date: Sun, 22 Mar 2026 18:37:04 +0000
audit: index discovery repair queries
- add sidecar attempt indexes and latest-attempt pointers under the runtime audit dir
- stream bounded audit reads instead of materializing retained log files before filtering
- rebuild query indexes from retained logs when indexes are missing or retention pruning occurs
- validate with cargo metadata --format-version 1 --no-deps, cargo fmt --all --check, and the repo-root cargo test --locked reconciliation run
Diffstat:
| M | src/audit.rs | | | 434 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------- |
1 file changed, 383 insertions(+), 51 deletions(-)
diff --git a/src/audit.rs b/src/audit.rs
@@ -1,3 +1,4 @@
+use std::collections::VecDeque;
use std::fs::{self, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
@@ -12,6 +13,11 @@ use crate::error::MycError;
const MYC_OPERATION_AUDIT_FILE_NAME: &str = "operations.jsonl";
const MYC_OPERATION_AUDIT_ARCHIVE_PREFIX: &str = "operations.";
const MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX: &str = ".jsonl";
+const MYC_OPERATION_AUDIT_INDEX_DIR_NAME: &str = "index";
+const MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME: &str = "index.tmp";
+const MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME: &str = "attempts";
+const MYC_OPERATION_AUDIT_LATEST_DIR_NAME: &str = "latest";
+const MYC_OPERATION_AUDIT_LATEST_SUFFIX: &str = ".attempt";
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -71,6 +77,11 @@ pub struct MycOperationAuditStore {
config: MycAuditConfig,
}
+/// Outcome reported by `rotate_if_needed` / `rotate_active_file`.
+///
+/// `append` inspects this to decide whether the sidecar query indexes can be
+/// extended incrementally or must be rebuilt from the retained logs.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+struct MycAuditRotationResult {
+ /// `true` when rotation discarded records that had been retained so far;
+ /// `append` rebuilds the query indexes from the retained logs in that case.
+ pruned_retained_records: bool,
+}
+
impl MycOperationAuditRecord {
pub fn new(
operation: MycOperationAuditKind,
@@ -146,25 +157,14 @@ impl MycOperationAuditStore {
path: active_path.clone(),
source,
})?;
- self.rotate_if_needed(encoded.len() as u64 + 1)?;
+ let rotation = self.rotate_if_needed(encoded.len() as u64 + 1)?;
+ self.append_encoded_record_line(&active_path, &encoded)?;
- let mut file = OpenOptions::new()
- .create(true)
- .append(true)
- .open(&active_path)
- .map_err(|source| MycError::AuditIo {
- path: active_path.clone(),
- source,
- })?;
- file.write_all(&encoded)
- .map_err(|source| MycError::AuditIo {
- path: active_path.clone(),
- source,
- })?;
- file.write_all(b"\n").map_err(|source| MycError::AuditIo {
- path: active_path,
- source,
- })?;
+ if rotation.pruned_retained_records {
+ self.rebuild_query_indexes_from_retained_logs()?;
+ } else {
+ self.append_record_to_indexes(record)?;
+ }
Ok(())
}
@@ -205,29 +205,26 @@ impl MycOperationAuditStore {
attempt_id: &str,
limit: usize,
) -> Result<Vec<MycOperationAuditRecord>, MycError> {
- self.list_matching(limit, |record| {
- record.attempt_id.as_deref() == Some(attempt_id)
- })
+ if limit == 0 {
+ return Ok(Vec::new());
+ }
+
+ let attempt_path = self.attempt_index_path(attempt_id);
+ if !attempt_path.exists() {
+ self.rebuild_query_indexes_from_retained_logs()?;
+ }
+ self.read_recent_records_from_path_with_limit(&attempt_path, limit)
}
+ /// Returns the attempt id stored in the latest-attempt pointer file for `operation`.
+ ///
+ /// If the pointer file does not exist, the query indexes are first rebuilt from the
+ /// retained logs. Yields `Ok(None)` when the pointer is still missing afterwards or
+ /// contains only whitespace.
pub fn latest_attempt_id_for_operation(
&self,
operation: MycOperationAuditKind,
) -> Result<Option<String>, MycError> {
- for path in self.read_paths_newest_first()? {
- let mut file_records = self.read_records_from_path(&path)?;
- file_records.reverse();
-
- for record in file_records {
- if record.operation == operation {
- if let Some(attempt_id) = record.attempt_id {
- return Ok(Some(attempt_id));
- }
- }
- }
+ let latest_path = self.latest_attempt_path(operation);
+ if !latest_path.exists() {
+ self.rebuild_query_indexes_from_retained_logs()?;
}
-
- Ok(None)
+ self.read_latest_attempt_id_from_path(&latest_path)
}
fn list_matching<F>(
@@ -244,18 +241,15 @@ impl MycOperationAuditStore {
let mut newest_records = Vec::new();
for path in self.read_paths_newest_first()? {
- let mut file_records = self.read_records_from_path(&path)?;
- file_records.reverse();
-
- for record in file_records {
- if predicate(&record) {
- newest_records.push(record);
- if newest_records.len() == limit {
- newest_records.reverse();
- return Ok(newest_records);
- }
- }
+ let remaining = limit.saturating_sub(newest_records.len());
+ if remaining == 0 {
+ break;
}
+
+ let mut file_records =
+ self.read_recent_records_from_path_matching(&path, remaining, &predicate)?;
+ file_records.reverse();
+ newest_records.extend(file_records);
}
newest_records.reverse();
@@ -300,7 +294,7 @@ impl MycOperationAuditStore {
Ok(records)
}
- fn rotate_if_needed(&self, additional_bytes: u64) -> Result<(), MycError> {
+ fn rotate_if_needed(&self, additional_bytes: u64) -> Result<MycAuditRotationResult, MycError> {
let active_path = self.active_path();
let current_len = match fs::metadata(&active_path) {
Ok(metadata) => metadata.len(),
@@ -316,13 +310,16 @@ impl MycOperationAuditStore {
if current_len == 0
|| current_len.saturating_add(additional_bytes) <= self.config.max_active_file_bytes
{
- return Ok(());
+ return Ok(MycAuditRotationResult {
+ pruned_retained_records: false,
+ });
}
self.rotate_active_file()
}
- fn rotate_active_file(&self) -> Result<(), MycError> {
+ fn rotate_active_file(&self) -> Result<MycAuditRotationResult, MycError> {
+ let mut pruned_retained_records = false;
for index in (1..=self.config.max_archived_files).rev() {
let archived_path = self.archive_path(index);
if !archived_path.exists() {
@@ -334,6 +331,7 @@ impl MycOperationAuditStore {
path: archived_path,
source,
})?;
+ pruned_retained_records = true;
} else {
let next_path = self.archive_path(index + 1);
fs::rename(&archived_path, &next_path).map_err(|source| MycError::AuditIo {
@@ -345,7 +343,9 @@ impl MycOperationAuditStore {
let active_path = self.active_path();
if !active_path.exists() {
- return Ok(());
+ return Ok(MycAuditRotationResult {
+ pruned_retained_records,
+ });
}
if self.config.max_archived_files == 0 {
@@ -353,7 +353,9 @@ impl MycOperationAuditStore {
path: active_path,
source,
})?;
- return Ok(());
+ return Ok(MycAuditRotationResult {
+ pruned_retained_records: true,
+ });
}
let first_archive = self.archive_path(1);
@@ -361,7 +363,9 @@ impl MycOperationAuditStore {
path: active_path,
source,
})?;
- Ok(())
+ Ok(MycAuditRotationResult {
+ pruned_retained_records,
+ })
}
fn read_paths_newest_first(&self) -> Result<Vec<PathBuf>, MycError> {
@@ -416,6 +420,241 @@ impl MycOperationAuditStore {
"{MYC_OPERATION_AUDIT_ARCHIVE_PREFIX}{index}{MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX}"
))
}
+
+ /// Root directory of the sidecar query indexes: `<audit_dir>/index`.
+ fn index_dir(&self) -> PathBuf {
+ self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_DIR_NAME)
+ }
+
+ /// Directory under the index root that holds one JSONL file per attempt id.
+ fn attempt_index_dir(&self) -> PathBuf {
+     let mut dir = self.index_dir();
+     dir.push(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME);
+     dir
+ }
+
+ /// Directory under the index root that holds one latest-attempt pointer per operation.
+ fn latest_attempt_dir(&self) -> PathBuf {
+     let mut dir = self.index_dir();
+     dir.push(MYC_OPERATION_AUDIT_LATEST_DIR_NAME);
+     dir
+ }
+
+ /// Path of the per-attempt index file for `attempt_id`.
+ ///
+ /// The attempt id is hex-encoded via `encode_index_component` before it is used
+ /// as the file stem, and the file carries a `.jsonl` extension.
+ fn attempt_index_path(&self, attempt_id: &str) -> PathBuf {
+     let file_name = format!("{}.jsonl", encode_index_component(attempt_id));
+     self.attempt_index_dir().join(file_name)
+ }
+
+ /// Path of the latest-attempt pointer file for `operation`.
+ ///
+ /// The file name is the operation's index label followed by the `.attempt` suffix.
+ fn latest_attempt_path(&self, operation: MycOperationAuditKind) -> PathBuf {
+     let label = operation_index_label(operation);
+     let file_name = format!("{label}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}");
+     self.latest_attempt_dir().join(file_name)
+ }
+
+ /// Appends `encoded` followed by a single `\n` to the file at `path`, creating the
+ /// file if it does not exist.
+ ///
+ /// The payload and its terminator are handed to the file in one `write_all` call so
+ /// that a record is not separated from its newline by another appender or by a
+ /// failure between two writes.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` carrying `path` when the file cannot be opened or
+ /// written.
+ fn append_encoded_record_line(&self, path: &Path, encoded: &[u8]) -> Result<(), MycError> {
+     let io_error = |source| MycError::AuditIo {
+         path: path.to_path_buf(),
+         source,
+     };
+
+     let mut file = OpenOptions::new()
+         .create(true)
+         .append(true)
+         .open(path)
+         .map_err(io_error)?;
+
+     // Assemble the full line first: data that belongs together should reach an
+     // append-mode file in a single write.
+     let mut line = Vec::with_capacity(encoded.len() + 1);
+     line.extend_from_slice(encoded);
+     line.push(b'\n');
+     file.write_all(&line).map_err(io_error)
+ }
+
+ /// Adds an already-logged `record` to the live query indexes.
+ ///
+ /// Records without an attempt id are not indexed. When the index root does not
+ /// exist yet, the indexes are rebuilt from the retained logs instead of being seeded
+ /// with this single record: a freshly created per-attempt file would otherwise look
+ /// complete to later lookups and hide older records of the same attempt.
+ /// NOTE(review): this relies on the caller having appended `record` to the active
+ /// log before calling, as `append` does — confirm for any other caller.
+ ///
+ /// # Errors
+ ///
+ /// Propagates `MycError::AuditIo` / `MycError::AuditSerialize` from the index writes
+ /// and any error from the rebuild.
+ fn append_record_to_indexes(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
+     let Some(attempt_id) = record.attempt_id.as_deref() else {
+         return Ok(());
+     };
+
+     if !self.index_dir().exists() {
+         return self.rebuild_query_indexes_from_retained_logs();
+     }
+
+     self.ensure_index_dirs()?;
+     let attempt_path = self.attempt_index_path(attempt_id);
+     let encoded = serde_json::to_vec(record).map_err(|source| MycError::AuditSerialize {
+         path: attempt_path.clone(),
+         source,
+     })?;
+     self.append_encoded_record_line(&attempt_path, &encoded)?;
+     // The pointer is written exactly once per record.
+     self.write_latest_attempt_pointer(record.operation, attempt_id)
+ }
+
+ /// Creates the attempts and latest-pointer directories (and any missing parents).
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` naming the directory that could not be created.
+ fn ensure_index_dirs(&self) -> Result<(), MycError> {
+     for dir in [self.attempt_index_dir(), self.latest_attempt_dir()] {
+         if let Err(source) = fs::create_dir_all(&dir) {
+             return Err(MycError::AuditIo { path: dir, source });
+         }
+     }
+     Ok(())
+ }
+
+ /// Indexes `record` under an arbitrary `index_root` (the live index or a staging
+ /// directory).
+ ///
+ /// Appends the JSON-encoded record to the attempt's JSONL file and then rewrites the
+ /// operation's latest-attempt pointer. Records without an attempt id are ignored.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` for directory or file failures and
+ /// `MycError::AuditSerialize` when the record cannot be encoded.
+ fn append_record_to_index_root(
+     &self,
+     index_root: &Path,
+     record: &MycOperationAuditRecord,
+ ) -> Result<(), MycError> {
+     let attempt_id = match record.attempt_id.as_deref() {
+         Some(attempt_id) => attempt_id,
+         None => return Ok(()),
+     };
+
+     let attempts_dir = index_root.join(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME);
+     if let Err(source) = fs::create_dir_all(&attempts_dir) {
+         return Err(MycError::AuditIo {
+             path: attempts_dir,
+             source,
+         });
+     }
+     let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME);
+     if let Err(source) = fs::create_dir_all(&latest_dir) {
+         return Err(MycError::AuditIo {
+             path: latest_dir,
+             source,
+         });
+     }
+
+     let attempt_file =
+         attempts_dir.join(format!("{}.jsonl", encode_index_component(attempt_id)));
+     let encoded = match serde_json::to_vec(record) {
+         Ok(encoded) => encoded,
+         Err(source) => {
+             return Err(MycError::AuditSerialize {
+                 path: attempt_file,
+                 source,
+             });
+         }
+     };
+     self.append_encoded_record_line(&attempt_file, &encoded)?;
+     self.write_latest_attempt_pointer_to_root(index_root, record.operation, attempt_id)
+ }
+
+ /// Rewrites the latest-attempt pointer for `operation` inside the live index root.
+ fn write_latest_attempt_pointer(
+     &self,
+     operation: MycOperationAuditKind,
+     attempt_id: &str,
+ ) -> Result<(), MycError> {
+     let live_root = self.index_dir();
+     self.write_latest_attempt_pointer_to_root(&live_root, operation, attempt_id)
+ }
+
+ /// Stores `attempt_id` as the latest attempt for `operation` under `index_root`.
+ ///
+ /// Ensures the latest-pointer directory exists, then replaces the pointer file via
+ /// `write_atomic_text`.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` if the directory cannot be created or the pointer
+ /// cannot be written.
+ fn write_latest_attempt_pointer_to_root(
+     &self,
+     index_root: &Path,
+     operation: MycOperationAuditKind,
+     attempt_id: &str,
+ ) -> Result<(), MycError> {
+     let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME);
+     if let Err(source) = fs::create_dir_all(&latest_dir) {
+         return Err(MycError::AuditIo {
+             path: latest_dir,
+             source,
+         });
+     }
+
+     let label = operation_index_label(operation);
+     let pointer_path = latest_dir.join(format!("{label}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}"));
+     write_atomic_text(&pointer_path, attempt_id)
+ }
+
+ /// Regenerates the query indexes from every retained audit log.
+ ///
+ /// The new index is assembled in the `index.tmp` staging directory and only then
+ /// swapped into place, so a failure while replaying leaves the existing index
+ /// untouched. Logs are replayed oldest-first so that the last pointer written for
+ /// each operation belongs to its newest record.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` for filesystem failures and propagates any error
+ /// from reading the logs or writing index entries.
+ fn rebuild_query_indexes_from_retained_logs(&self) -> Result<(), MycError> {
+     let staging_root = self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME);
+
+     // Start from an empty staging directory, discarding leftovers of earlier runs.
+     if staging_root.exists() {
+         if let Err(source) = fs::remove_dir_all(&staging_root) {
+             return Err(MycError::AuditIo {
+                 path: staging_root,
+                 source,
+             });
+         }
+     }
+     if let Err(source) = fs::create_dir_all(&staging_root) {
+         return Err(MycError::AuditIo {
+             path: staging_root,
+             source,
+         });
+     }
+
+     for log_path in self.read_paths_newest_first()?.into_iter().rev() {
+         let records = self.read_records_from_path(&log_path)?;
+         for record in records.iter() {
+             self.append_record_to_index_root(&staging_root, record)?;
+         }
+     }
+
+     // Swap the staged index in: drop the old root, then move staging onto it.
+     let final_root = self.index_dir();
+     if final_root.exists() {
+         if let Err(source) = fs::remove_dir_all(&final_root) {
+             return Err(MycError::AuditIo {
+                 path: final_root,
+                 source,
+             });
+         }
+     }
+     match fs::rename(&staging_root, &final_root) {
+         Ok(()) => Ok(()),
+         Err(source) => Err(MycError::AuditIo {
+             path: staging_root,
+             source,
+         }),
+     }
+ }
+
+ /// Reads a latest-attempt pointer file and returns the attempt id it contains.
+ ///
+ /// Surrounding whitespace is trimmed. A missing file or an empty pointer yields
+ /// `Ok(None)`.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` for any read failure other than "not found".
+ fn read_latest_attempt_id_from_path(&self, path: &Path) -> Result<Option<String>, MycError> {
+     let contents = match fs::read_to_string(path) {
+         Ok(contents) => contents,
+         Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+         Err(source) => {
+             return Err(MycError::AuditIo {
+                 path: path.to_path_buf(),
+                 source,
+             });
+         }
+     };
+
+     let attempt_id = contents.trim();
+     Ok((!attempt_id.is_empty()).then(|| attempt_id.to_owned()))
+ }
+
+ /// Returns up to `limit` of the last records stored in `path`, in file order,
+ /// without filtering.
+ fn read_recent_records_from_path_with_limit(
+     &self,
+     path: &Path,
+     limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+     let accept_all = |_: &MycOperationAuditRecord| true;
+     self.read_recent_records_from_path_matching(path, limit, &accept_all)
+ }
+
+ /// Streams the JSONL file at `path` and returns the last `limit` records accepted by
+ /// `predicate`, in file order.
+ ///
+ /// Only a window of at most `limit` records is kept in memory while reading. Blank
+ /// lines are skipped. A `limit` of zero or a missing file yields an empty vector;
+ /// the missing-file case is detected from the `open` call itself, so a file removed
+ /// between a separate existence check and the open (for example while the index
+ /// directory is being swapped) is not reported as an error.
+ ///
+ /// # Errors
+ ///
+ /// Returns `MycError::AuditIo` when the file cannot be opened (other than "not
+ /// found") or read, and `MycError::AuditParse` with the 1-based line number when a
+ /// non-blank line is not a valid record.
+ fn read_recent_records_from_path_matching<F>(
+     &self,
+     path: &Path,
+     limit: usize,
+     predicate: &F,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError>
+ where
+     F: Fn(&MycOperationAuditRecord) -> bool,
+ {
+     if limit == 0 {
+         return Ok(Vec::new());
+     }
+
+     let file = match fs::File::open(path) {
+         Ok(file) => file,
+         Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
+             return Ok(Vec::new());
+         }
+         Err(source) => {
+             return Err(MycError::AuditIo {
+                 path: path.to_path_buf(),
+                 source,
+             });
+         }
+     };
+     let reader = BufReader::new(file);
+     let mut recent_records = VecDeque::new();
+
+     for (line_number, line) in reader.lines().enumerate() {
+         let line = line.map_err(|source| MycError::AuditIo {
+             path: path.to_path_buf(),
+             source,
+         })?;
+         if line.trim().is_empty() {
+             continue;
+         }
+
+         let record =
+             serde_json::from_str::<MycOperationAuditRecord>(&line).map_err(|source| {
+                 MycError::AuditParse {
+                     path: path.to_path_buf(),
+                     line_number: line_number + 1,
+                     source,
+                 }
+             })?;
+         if !predicate(&record) {
+             continue;
+         }
+
+         // Sliding window: evict the oldest match once `limit` are held.
+         if recent_records.len() == limit {
+             recent_records.pop_front();
+         }
+         recent_records.push_back(record);
+     }
+
+     Ok(recent_records.into_iter().collect())
+ }
}
fn parse_archive_index(file_name: &str) -> Option<usize> {
@@ -426,6 +665,41 @@ fn parse_archive_index(file_name: &str) -> Option<usize> {
.ok()
}
+/// Maps an operation kind to the fixed snake_case label used as the stem of its
+/// latest-attempt pointer file.
+///
+/// The match is exhaustive on purpose: adding a variant must fail to compile until a
+/// label is chosen for it.
+fn operation_index_label(kind: MycOperationAuditKind) -> &'static str {
+    use MycOperationAuditKind as Kind;
+
+    match kind {
+        Kind::ListenerResponsePublish => "listener_response_publish",
+        Kind::ConnectAcceptPublish => "connect_accept_publish",
+        Kind::AuthReplayPublish => "auth_replay_publish",
+        Kind::AuthReplayRestore => "auth_replay_restore",
+        Kind::DiscoveryHandlerFetch => "discovery_handler_fetch",
+        Kind::DiscoveryHandlerPublish => "discovery_handler_publish",
+        Kind::DiscoveryHandlerCompare => "discovery_handler_compare",
+        Kind::DiscoveryHandlerRefresh => "discovery_handler_refresh",
+        Kind::DiscoveryHandlerRepair => "discovery_handler_repair",
+    }
+}
+
+/// Encodes `value` as lowercase hexadecimal, two digits per UTF-8 byte.
+///
+/// The result contains only `[0-9a-f]`, so it is safe to use as a single file-name
+/// component regardless of what the input contains (path separators, dots, ...).
+/// The output is twice as long as the input in bytes.
+fn encode_index_component(value: &str) -> String {
+    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
+
+    let mut encoded = String::with_capacity(value.len() * 2);
+    for byte in value.bytes() {
+        // Table lookup instead of `format!`, which would allocate a `String` per byte.
+        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
+        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
+    }
+    encoded
+}
+
+/// Replaces the file at `path` with `contents` by writing a sibling temporary file
+/// and renaming it over the target.
+///
+/// The temporary name is the full target path with `.tmp` appended, so targets that
+/// differ only in their extension do not share a temporary file. If the rename
+/// fails, the temporary file is removed on a best-effort basis.
+///
+/// # Errors
+///
+/// Returns `MycError::AuditIo` naming the temporary path when the write or the
+/// rename fails.
+fn write_atomic_text(path: &Path, contents: &str) -> Result<(), MycError> {
+    let mut tmp_name = path.as_os_str().to_os_string();
+    tmp_name.push(".tmp");
+    let tmp_path = PathBuf::from(tmp_name);
+
+    fs::write(&tmp_path, contents).map_err(|source| MycError::AuditIo {
+        path: tmp_path.clone(),
+        source,
+    })?;
+
+    if let Err(source) = fs::rename(&tmp_path, path) {
+        // Best effort only: the rename error is the one worth reporting.
+        let _ = fs::remove_file(&tmp_path);
+        return Err(MycError::AuditIo {
+            path: tmp_path,
+            source,
+        });
+    }
+    Ok(())
+}
+
fn now_unix_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
@@ -632,4 +906,62 @@ mod tests {
Some("attempt-2".to_owned())
);
}
+
+ // Checks that attempt and latest-attempt lookups still answer correctly after the
+ // index directory has been deleted, and that the index files exist again afterwards.
+ #[test]
+ fn attempt_lookup_rebuilds_indexes_from_retained_logs() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let store = MycOperationAuditStore::new(temp.path(), config());
+
+ // Two attempts of the same operation: the first rejected, the second succeeded.
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerRefresh,
+ MycOperationAuditOutcome::Rejected,
+ None,
+ None,
+ 2,
+ 0,
+ "first attempt rejected",
+ )
+ .with_attempt_id("attempt-1"),
+ )
+ .expect("append first attempt");
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerRefresh,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ None,
+ 1,
+ 1,
+ "second attempt succeeded",
+ )
+ .with_attempt_id("attempt-2"),
+ )
+ .expect("append second attempt");
+
+ // Remove the sidecar indexes; the lookups below are expected to recreate them.
+ fs::remove_dir_all(store.index_dir()).expect("remove index dir");
+
+ let rebuilt_attempt_records = store
+ .list_for_attempt_id("attempt-1")
+ .expect("rebuild attempt records");
+ assert_eq!(rebuilt_attempt_records.len(), 1);
+ assert_eq!(
+ rebuilt_attempt_records[0].attempt_id.as_deref(),
+ Some("attempt-1")
+ );
+ // The latest pointer must name the most recently appended attempt.
+ assert_eq!(
+ store
+ .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)
+ .expect("latest attempt after rebuild"),
+ Some("attempt-2".to_owned())
+ );
+ // Both index artifacts must be back on disk.
+ assert!(store.attempt_index_path("attempt-1").exists());
+ assert!(
+ store
+ .latest_attempt_path(MycOperationAuditKind::DiscoveryHandlerRefresh)
+ .exists()
+ );
+ }
}