myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

audit.rs (36565B)


      1 use std::collections::VecDeque;
      2 use std::fs::{self, OpenOptions};
      3 use std::io::{BufRead, BufReader, Write};
      4 use std::path::{Path, PathBuf};
      5 use std::time::{SystemTime, UNIX_EPOCH};
      6 
      7 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
      8 use serde::{Deserialize, Serialize};
      9 
     10 use crate::config::MycAuditConfig;
     11 use crate::config::MycTransportDeliveryPolicy;
     12 use crate::error::MycError;
     13 
// File name of the active audit log that new records are appended to.
const MYC_OPERATION_AUDIT_FILE_NAME: &str = "operations.jsonl";
// Rotated archives are named `operations.<index>.jsonl`; these two constants
// are the parts surrounding the numeric index.
const MYC_OPERATION_AUDIT_ARCHIVE_PREFIX: &str = "operations.";
const MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX: &str = ".jsonl";
// Directory (inside the audit directory) holding the query indexes.
const MYC_OPERATION_AUDIT_INDEX_DIR_NAME: &str = "index";
// Staging directory an index rebuild writes into before it replaces `index`.
const MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME: &str = "index.tmp";
// Index subdirectory with one JSONL file per attempt id.
const MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME: &str = "attempts";
// Index subdirectory with one "latest attempt" pointer file per operation kind.
const MYC_OPERATION_AUDIT_LATEST_DIR_NAME: &str = "latest";
// File-name suffix of the "latest attempt" pointer files.
const MYC_OPERATION_AUDIT_LATEST_SUFFIX: &str = ".attempt";
     22 
/// The kind of operation an audit record describes.
///
/// Serialized with serde in `snake_case` (for example `delivery_recovery`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MycOperationAuditKind {
    DeliveryRecovery,
    ListenerResponsePublish,
    ConnectAcceptPublish,
    AuthReplayPublish,
    AuthReplayRestore,
    DiscoveryHandlerFetch,
    DiscoveryHandlerPublish,
    DiscoveryHandlerCompare,
    DiscoveryHandlerRefresh,
    DiscoveryHandlerRepair,
}
     37 
/// The result recorded for an audited operation.
///
/// Serialized with serde in `snake_case` (for example `succeeded`).
// NOTE(review): which outcomes apply to which operation kinds is decided by
// the callers that build records; it is not visible in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MycOperationAuditOutcome {
    Succeeded,
    Rejected,
    Restored,
    Unavailable,
    Missing,
    Matched,
    Drifted,
    Conflicted,
    Skipped,
}
     51 
/// One audit log entry, stored as a single JSON object per line.
///
/// Optional fields are left out of the JSON when they are `None` (or empty,
/// for the relay lists) and default to `None`/empty when absent on read.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct MycOperationAuditRecord {
    /// Unix timestamp in seconds; `new` fills it from the system clock.
    pub recorded_at_unix: u64,
    /// Which operation this record describes.
    pub operation: MycOperationAuditKind,
    /// How the operation ended.
    pub outcome: MycOperationAuditOutcome,
    /// Set through `with_relay_url`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub relay_url: Option<String>,
    /// String form of the signer connection id, when one was supplied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub connection_id: Option<String>,
    /// Request id, when one was supplied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    /// Set through `with_attempt_id`; records carrying one are also written
    /// to the per-attempt query index.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub attempt_id: Option<String>,
    /// Set through `with_planned_repair_relays`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub planned_repair_relays: Vec<String>,
    /// Set together with `blocked_reason` through `with_blocked_relays`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub blocked_relays: Vec<String>,
    /// Set together with `blocked_relays` through `with_blocked_relays`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blocked_reason: Option<String>,
    /// The next three fields are set together through `with_delivery_details`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delivery_policy: Option<MycTransportDeliveryPolicy>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub required_acknowledged_relay_count: Option<usize>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub publish_attempt_count: Option<usize>,
    // NOTE(review): presumably the number of relays involved and the number
    // that acknowledged; confirm against the callers that build records.
    pub relay_count: usize,
    pub acknowledged_relay_count: usize,
    /// Caller-provided summary text.
    pub relay_outcome_summary: String,
}
     81 
/// Storage interface for operation audit records.
///
/// The `*_with_limit` methods are the required primitives; the shorter
/// variants are provided defaults that choose a limit for the caller.
pub trait MycOperationAuditStore: Send + Sync {
    /// Audit configuration used by this store.
    fn config(&self) -> &MycAuditConfig;
    /// Persists one record.
    fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError>;
    /// Lists records using the configured `default_read_limit`.
    fn list(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
        self.list_with_limit(self.config().default_read_limit)
    }
    /// Lists every retained record.
    fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError>;
    /// Lists at most `limit` records.
    fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError>;
    /// Lists records for one connection using the configured `default_read_limit`.
    fn list_for_connection(
        &self,
        connection_id: &RadrootsNostrSignerConnectionId,
    ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
        self.list_for_connection_with_limit(connection_id, self.config().default_read_limit)
    }
    /// Lists at most `limit` records for one connection.
    fn list_for_connection_with_limit(
        &self,
        connection_id: &RadrootsNostrSignerConnectionId,
        limit: usize,
    ) -> Result<Vec<MycOperationAuditRecord>, MycError>;
    /// Lists records for one attempt id without a limit (`usize::MAX`).
    fn list_for_attempt_id(
        &self,
        attempt_id: &str,
    ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
        self.list_for_attempt_id_with_limit(attempt_id, usize::MAX)
    }
    /// Lists at most `limit` records for one attempt id.
    fn list_for_attempt_id_with_limit(
        &self,
        attempt_id: &str,
        limit: usize,
    ) -> Result<Vec<MycOperationAuditRecord>, MycError>;
    /// Returns the attempt id most recently recorded for `operation`, if any.
    fn latest_attempt_id_for_operation(
        &self,
        operation: MycOperationAuditKind,
    ) -> Result<Option<String>, MycError>;
}
    117 
/// Audit store that keeps records as JSON lines in files under `audit_dir`.
///
/// The directory holds the active log, numbered archives produced by
/// rotation, and an `index` directory used for attempt-id queries.
#[derive(Debug, Clone)]
pub struct MycJsonlOperationAuditStore {
    // Directory containing the log files and the index directory.
    audit_dir: PathBuf,
    // Read-limit and rotation settings.
    config: MycAuditConfig,
}
    123 
/// Outcome of a rotation check performed before an append.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MycAuditRotationResult {
    // True when rotation deleted a log file (the oldest archive, or the active
    // file when no archives are kept); `append` then rebuilds the query
    // indexes from the logs that remain.
    pruned_retained_records: bool,
}
    128 
    129 impl MycOperationAuditRecord {
    130     pub fn new(
    131         operation: MycOperationAuditKind,
    132         outcome: MycOperationAuditOutcome,
    133         connection_id: Option<&RadrootsNostrSignerConnectionId>,
    134         request_id: Option<&str>,
    135         relay_count: usize,
    136         acknowledged_relay_count: usize,
    137         relay_outcome_summary: impl Into<String>,
    138     ) -> Self {
    139         Self {
    140             recorded_at_unix: now_unix_secs(),
    141             operation,
    142             outcome,
    143             relay_url: None,
    144             connection_id: connection_id.map(ToString::to_string),
    145             request_id: request_id.map(ToOwned::to_owned),
    146             attempt_id: None,
    147             planned_repair_relays: Vec::new(),
    148             blocked_relays: Vec::new(),
    149             blocked_reason: None,
    150             delivery_policy: None,
    151             required_acknowledged_relay_count: None,
    152             publish_attempt_count: None,
    153             relay_count,
    154             acknowledged_relay_count,
    155             relay_outcome_summary: relay_outcome_summary.into(),
    156         }
    157     }
    158 
    159     pub fn with_relay_url(mut self, relay_url: impl Into<String>) -> Self {
    160         self.relay_url = Some(relay_url.into());
    161         self
    162     }
    163 
    164     pub fn with_attempt_id(mut self, attempt_id: impl Into<String>) -> Self {
    165         self.attempt_id = Some(attempt_id.into());
    166         self
    167     }
    168 
    169     pub fn with_planned_repair_relays(mut self, planned_repair_relays: Vec<String>) -> Self {
    170         self.planned_repair_relays = planned_repair_relays;
    171         self
    172     }
    173 
    174     pub fn with_blocked_relays(
    175         mut self,
    176         blocked_reason: impl Into<String>,
    177         blocked_relays: Vec<String>,
    178     ) -> Self {
    179         self.blocked_reason = Some(blocked_reason.into());
    180         self.blocked_relays = blocked_relays;
    181         self
    182     }
    183 
    184     pub fn with_delivery_details(
    185         mut self,
    186         delivery_policy: MycTransportDeliveryPolicy,
    187         required_acknowledged_relay_count: usize,
    188         publish_attempt_count: usize,
    189     ) -> Self {
    190         self.delivery_policy = Some(delivery_policy);
    191         self.required_acknowledged_relay_count = Some(required_acknowledged_relay_count);
    192         self.publish_attempt_count = Some(publish_attempt_count);
    193         self
    194     }
    195 }
    196 
    197 impl MycJsonlOperationAuditStore {
    198     pub fn new(audit_dir: impl AsRef<Path>, config: MycAuditConfig) -> Self {
    199         Self {
    200             audit_dir: audit_dir.as_ref().to_path_buf(),
    201             config,
    202         }
    203     }
    204 
    205     pub fn path(&self) -> PathBuf {
    206         self.active_path()
    207     }
    208 
    209     pub fn config(&self) -> &MycAuditConfig {
    210         &self.config
    211     }
    212 
    213     pub fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
    214         let active_path = self.active_path();
    215         let encoded = serde_json::to_vec(record).map_err(|source| MycError::AuditSerialize {
    216             path: active_path.clone(),
    217             source,
    218         })?;
    219         let rotation = self.rotate_if_needed(encoded.len() as u64 + 1)?;
    220         self.append_encoded_record_line(&active_path, &encoded)?;
    221 
    222         if rotation.pruned_retained_records {
    223             self.rebuild_query_indexes_from_retained_logs()?;
    224         } else {
    225             self.append_record_to_indexes(record)?;
    226         }
    227         Ok(())
    228     }
    229 
    230     pub fn list(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    231         self.list_with_limit(self.config.default_read_limit)
    232     }
    233 
    234     pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    235         self.list_matching(usize::MAX, |_| true)
    236     }
    237 
    238     pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    239         self.list_matching(limit, |_| true)
    240     }
    241 
    242     pub fn list_for_connection(
    243         &self,
    244         connection_id: &RadrootsNostrSignerConnectionId,
    245     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    246         self.list_for_connection_with_limit(connection_id, self.config.default_read_limit)
    247     }
    248 
    249     pub fn list_for_connection_with_limit(
    250         &self,
    251         connection_id: &RadrootsNostrSignerConnectionId,
    252         limit: usize,
    253     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    254         self.list_matching(limit, |record| {
    255             record.connection_id.as_deref() == Some(connection_id.as_str())
    256         })
    257     }
    258 
    259     pub fn list_for_attempt_id(
    260         &self,
    261         attempt_id: &str,
    262     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    263         self.list_for_attempt_id_with_limit(attempt_id, usize::MAX)
    264     }
    265 
    266     pub fn list_for_attempt_id_with_limit(
    267         &self,
    268         attempt_id: &str,
    269         limit: usize,
    270     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    271         if limit == 0 {
    272             return Ok(Vec::new());
    273         }
    274 
    275         let attempt_path = self.attempt_index_path(attempt_id);
    276         if !attempt_path.exists() {
    277             self.rebuild_query_indexes_from_retained_logs()?;
    278         }
    279         self.read_recent_records_from_path_with_limit(&attempt_path, limit)
    280     }
    281 
    282     pub fn latest_attempt_id_for_operation(
    283         &self,
    284         operation: MycOperationAuditKind,
    285     ) -> Result<Option<String>, MycError> {
    286         let latest_path = self.latest_attempt_path(operation);
    287         if !latest_path.exists() {
    288             self.rebuild_query_indexes_from_retained_logs()?;
    289         }
    290         self.read_latest_attempt_id_from_path(&latest_path)
    291     }
    292 
    293     fn list_matching<F>(
    294         &self,
    295         limit: usize,
    296         predicate: F,
    297     ) -> Result<Vec<MycOperationAuditRecord>, MycError>
    298     where
    299         F: Fn(&MycOperationAuditRecord) -> bool,
    300     {
    301         if limit == 0 {
    302             return Ok(Vec::new());
    303         }
    304 
    305         let mut newest_records = Vec::new();
    306         for path in self.read_paths_newest_first()? {
    307             let remaining = limit.saturating_sub(newest_records.len());
    308             if remaining == 0 {
    309                 break;
    310             }
    311 
    312             let mut file_records =
    313                 self.read_recent_records_from_path_matching(&path, remaining, &predicate)?;
    314             file_records.reverse();
    315             newest_records.extend(file_records);
    316         }
    317 
    318         newest_records.reverse();
    319         Ok(newest_records)
    320     }
    321 
    322     fn read_records_from_path(
    323         &self,
    324         path: &Path,
    325     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    326         if !path.exists() {
    327             return Ok(Vec::new());
    328         }
    329 
    330         let file = fs::File::open(path).map_err(|source| MycError::AuditIo {
    331             path: path.to_path_buf(),
    332             source,
    333         })?;
    334         let reader = BufReader::new(file);
    335         let mut records = Vec::new();
    336 
    337         for (line_number, line) in reader.lines().enumerate() {
    338             let line = line.map_err(|source| MycError::AuditIo {
    339                 path: path.to_path_buf(),
    340                 source,
    341             })?;
    342             if line.trim().is_empty() {
    343                 continue;
    344             }
    345 
    346             let record =
    347                 serde_json::from_str::<MycOperationAuditRecord>(&line).map_err(|source| {
    348                     MycError::AuditParse {
    349                         path: path.to_path_buf(),
    350                         line_number: line_number + 1,
    351                         source,
    352                     }
    353                 })?;
    354             records.push(record);
    355         }
    356 
    357         Ok(records)
    358     }
    359 
    360     fn rotate_if_needed(&self, additional_bytes: u64) -> Result<MycAuditRotationResult, MycError> {
    361         let active_path = self.active_path();
    362         let current_len = match fs::metadata(&active_path) {
    363             Ok(metadata) => metadata.len(),
    364             Err(error) if error.kind() == std::io::ErrorKind::NotFound => 0,
    365             Err(source) => {
    366                 return Err(MycError::AuditIo {
    367                     path: active_path,
    368                     source,
    369                 });
    370             }
    371         };
    372 
    373         if current_len == 0
    374             || current_len.saturating_add(additional_bytes) <= self.config.max_active_file_bytes
    375         {
    376             return Ok(MycAuditRotationResult {
    377                 pruned_retained_records: false,
    378             });
    379         }
    380 
    381         self.rotate_active_file()
    382     }
    383 
    384     fn rotate_active_file(&self) -> Result<MycAuditRotationResult, MycError> {
    385         let mut pruned_retained_records = false;
    386         for index in (1..=self.config.max_archived_files).rev() {
    387             let archived_path = self.archive_path(index);
    388             if !archived_path.exists() {
    389                 continue;
    390             }
    391 
    392             if index == self.config.max_archived_files {
    393                 fs::remove_file(&archived_path).map_err(|source| MycError::AuditIo {
    394                     path: archived_path,
    395                     source,
    396                 })?;
    397                 pruned_retained_records = true;
    398             } else {
    399                 let next_path = self.archive_path(index + 1);
    400                 fs::rename(&archived_path, &next_path).map_err(|source| MycError::AuditIo {
    401                     path: archived_path,
    402                     source,
    403                 })?;
    404             }
    405         }
    406 
    407         let active_path = self.active_path();
    408         if !active_path.exists() {
    409             return Ok(MycAuditRotationResult {
    410                 pruned_retained_records,
    411             });
    412         }
    413 
    414         if self.config.max_archived_files == 0 {
    415             fs::remove_file(&active_path).map_err(|source| MycError::AuditIo {
    416                 path: active_path,
    417                 source,
    418             })?;
    419             return Ok(MycAuditRotationResult {
    420                 pruned_retained_records: true,
    421             });
    422         }
    423 
    424         let first_archive = self.archive_path(1);
    425         fs::rename(&active_path, &first_archive).map_err(|source| MycError::AuditIo {
    426             path: active_path,
    427             source,
    428         })?;
    429         Ok(MycAuditRotationResult {
    430             pruned_retained_records,
    431         })
    432     }
    433 
    434     fn read_paths_newest_first(&self) -> Result<Vec<PathBuf>, MycError> {
    435         let mut paths = Vec::new();
    436         let active_path = self.active_path();
    437         if active_path.exists() {
    438             paths.push(active_path);
    439         }
    440 
    441         let mut archived = self.archived_paths()?;
    442         archived.sort_by_key(|(_, index)| *index);
    443         for (path, _) in archived {
    444             paths.push(path);
    445         }
    446 
    447         Ok(paths)
    448     }
    449 
    450     fn archived_paths(&self) -> Result<Vec<(PathBuf, usize)>, MycError> {
    451         let mut archived = Vec::new();
    452         if !self.audit_dir.exists() {
    453             return Ok(archived);
    454         }
    455 
    456         for entry in fs::read_dir(&self.audit_dir).map_err(|source| MycError::AuditIo {
    457             path: self.audit_dir.clone(),
    458             source,
    459         })? {
    460             let entry = entry.map_err(|source| MycError::AuditIo {
    461                 path: self.audit_dir.clone(),
    462                 source,
    463             })?;
    464             let file_name = entry.file_name();
    465             let Some(file_name) = file_name.to_str() else {
    466                 continue;
    467             };
    468             let Some(index) = parse_archive_index(file_name) else {
    469                 continue;
    470             };
    471             archived.push((entry.path(), index));
    472         }
    473 
    474         Ok(archived)
    475     }
    476 
    477     fn active_path(&self) -> PathBuf {
    478         self.audit_dir.join(MYC_OPERATION_AUDIT_FILE_NAME)
    479     }
    480 
    481     fn archive_path(&self, index: usize) -> PathBuf {
    482         self.audit_dir.join(format!(
    483             "{MYC_OPERATION_AUDIT_ARCHIVE_PREFIX}{index}{MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX}"
    484         ))
    485     }
    486 
    487     fn index_dir(&self) -> PathBuf {
    488         self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_DIR_NAME)
    489     }
    490 
    491     fn attempt_index_dir(&self) -> PathBuf {
    492         self.index_dir().join(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME)
    493     }
    494 
    495     fn latest_attempt_dir(&self) -> PathBuf {
    496         self.index_dir().join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME)
    497     }
    498 
    499     fn attempt_index_path(&self, attempt_id: &str) -> PathBuf {
    500         self.attempt_index_dir()
    501             .join(format!("{}.jsonl", encode_index_component(attempt_id)))
    502     }
    503 
    504     fn latest_attempt_path(&self, operation: MycOperationAuditKind) -> PathBuf {
    505         self.latest_attempt_dir().join(format!(
    506             "{}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}",
    507             operation_index_label(operation)
    508         ))
    509     }
    510 
    511     fn append_encoded_record_line(&self, path: &Path, encoded: &[u8]) -> Result<(), MycError> {
    512         let mut file = OpenOptions::new()
    513             .create(true)
    514             .append(true)
    515             .open(path)
    516             .map_err(|source| MycError::AuditIo {
    517                 path: path.to_path_buf(),
    518                 source,
    519             })?;
    520         file.write_all(encoded)
    521             .map_err(|source| MycError::AuditIo {
    522                 path: path.to_path_buf(),
    523                 source,
    524             })?;
    525         file.write_all(b"\n").map_err(|source| MycError::AuditIo {
    526             path: path.to_path_buf(),
    527             source,
    528         })?;
    529         Ok(())
    530     }
    531 
    532     fn append_record_to_indexes(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
    533         let Some(attempt_id) = record.attempt_id.as_deref() else {
    534             return Ok(());
    535         };
    536 
    537         self.ensure_index_dirs()?;
    538         self.append_record_to_index_root(&self.index_dir(), record)?;
    539         self.write_latest_attempt_pointer(record.operation, attempt_id)
    540     }
    541 
    542     fn ensure_index_dirs(&self) -> Result<(), MycError> {
    543         fs::create_dir_all(self.attempt_index_dir()).map_err(|source| MycError::AuditIo {
    544             path: self.attempt_index_dir(),
    545             source,
    546         })?;
    547         fs::create_dir_all(self.latest_attempt_dir()).map_err(|source| MycError::AuditIo {
    548             path: self.latest_attempt_dir(),
    549             source,
    550         })?;
    551         Ok(())
    552     }
    553 
    554     fn append_record_to_index_root(
    555         &self,
    556         index_root: &Path,
    557         record: &MycOperationAuditRecord,
    558     ) -> Result<(), MycError> {
    559         let Some(attempt_id) = record.attempt_id.as_deref() else {
    560             return Ok(());
    561         };
    562 
    563         let attempts_dir = index_root.join(MYC_OPERATION_AUDIT_ATTEMPTS_DIR_NAME);
    564         fs::create_dir_all(&attempts_dir).map_err(|source| MycError::AuditIo {
    565             path: attempts_dir.clone(),
    566             source,
    567         })?;
    568         let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME);
    569         fs::create_dir_all(&latest_dir).map_err(|source| MycError::AuditIo {
    570             path: latest_dir.clone(),
    571             source,
    572         })?;
    573 
    574         let encoded = serde_json::to_vec(record).map_err(|source| MycError::AuditSerialize {
    575             path: attempts_dir.join(format!("{}.jsonl", encode_index_component(attempt_id))),
    576             source,
    577         })?;
    578         self.append_encoded_record_line(
    579             &attempts_dir.join(format!("{}.jsonl", encode_index_component(attempt_id))),
    580             &encoded,
    581         )?;
    582         self.write_latest_attempt_pointer_to_root(index_root, record.operation, attempt_id)
    583     }
    584 
    585     fn write_latest_attempt_pointer(
    586         &self,
    587         operation: MycOperationAuditKind,
    588         attempt_id: &str,
    589     ) -> Result<(), MycError> {
    590         self.write_latest_attempt_pointer_to_root(&self.index_dir(), operation, attempt_id)
    591     }
    592 
    593     fn write_latest_attempt_pointer_to_root(
    594         &self,
    595         index_root: &Path,
    596         operation: MycOperationAuditKind,
    597         attempt_id: &str,
    598     ) -> Result<(), MycError> {
    599         let latest_dir = index_root.join(MYC_OPERATION_AUDIT_LATEST_DIR_NAME);
    600         fs::create_dir_all(&latest_dir).map_err(|source| MycError::AuditIo {
    601             path: latest_dir.clone(),
    602             source,
    603         })?;
    604         let path = latest_dir.join(format!(
    605             "{}{MYC_OPERATION_AUDIT_LATEST_SUFFIX}",
    606             operation_index_label(operation)
    607         ));
    608         write_atomic_text(&path, attempt_id)
    609     }
    610 
    611     fn rebuild_query_indexes_from_retained_logs(&self) -> Result<(), MycError> {
    612         let staging_root = self.audit_dir.join(MYC_OPERATION_AUDIT_INDEX_TMP_DIR_NAME);
    613         if staging_root.exists() {
    614             fs::remove_dir_all(&staging_root).map_err(|source| MycError::AuditIo {
    615                 path: staging_root.clone(),
    616                 source,
    617             })?;
    618         }
    619         fs::create_dir_all(&staging_root).map_err(|source| MycError::AuditIo {
    620             path: staging_root.clone(),
    621             source,
    622         })?;
    623 
    624         let mut retained_paths = self.read_paths_newest_first()?;
    625         retained_paths.reverse();
    626         for path in retained_paths {
    627             for record in self.read_records_from_path(&path)? {
    628                 self.append_record_to_index_root(&staging_root, &record)?;
    629             }
    630         }
    631 
    632         let final_root = self.index_dir();
    633         if final_root.exists() {
    634             fs::remove_dir_all(&final_root).map_err(|source| MycError::AuditIo {
    635                 path: final_root.clone(),
    636                 source,
    637             })?;
    638         }
    639         fs::rename(&staging_root, &final_root).map_err(|source| MycError::AuditIo {
    640             path: staging_root,
    641             source,
    642         })?;
    643         Ok(())
    644     }
    645 
    646     fn read_latest_attempt_id_from_path(&self, path: &Path) -> Result<Option<String>, MycError> {
    647         match fs::read_to_string(path) {
    648             Ok(contents) => {
    649                 let attempt_id = contents.trim();
    650                 if attempt_id.is_empty() {
    651                     Ok(None)
    652                 } else {
    653                     Ok(Some(attempt_id.to_owned()))
    654                 }
    655             }
    656             Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
    657             Err(source) => Err(MycError::AuditIo {
    658                 path: path.to_path_buf(),
    659                 source,
    660             }),
    661         }
    662     }
    663 
    664     fn read_recent_records_from_path_with_limit(
    665         &self,
    666         path: &Path,
    667         limit: usize,
    668     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    669         self.read_recent_records_from_path_matching(path, limit, &|_| true)
    670     }
    671 
    672     fn read_recent_records_from_path_matching<F>(
    673         &self,
    674         path: &Path,
    675         limit: usize,
    676         predicate: &F,
    677     ) -> Result<Vec<MycOperationAuditRecord>, MycError>
    678     where
    679         F: Fn(&MycOperationAuditRecord) -> bool,
    680     {
    681         if limit == 0 || !path.exists() {
    682             return Ok(Vec::new());
    683         }
    684 
    685         let file = fs::File::open(path).map_err(|source| MycError::AuditIo {
    686             path: path.to_path_buf(),
    687             source,
    688         })?;
    689         let reader = BufReader::new(file);
    690         let mut recent_records = VecDeque::new();
    691 
    692         for (line_number, line) in reader.lines().enumerate() {
    693             let line = line.map_err(|source| MycError::AuditIo {
    694                 path: path.to_path_buf(),
    695                 source,
    696             })?;
    697             if line.trim().is_empty() {
    698                 continue;
    699             }
    700 
    701             let record =
    702                 serde_json::from_str::<MycOperationAuditRecord>(&line).map_err(|source| {
    703                     MycError::AuditParse {
    704                         path: path.to_path_buf(),
    705                         line_number: line_number + 1,
    706                         source,
    707                     }
    708                 })?;
    709             if !predicate(&record) {
    710                 continue;
    711             }
    712 
    713             if recent_records.len() == limit {
    714                 recent_records.pop_front();
    715             }
    716             recent_records.push_back(record);
    717         }
    718 
    719         Ok(recent_records.into_iter().collect())
    720     }
    721 }
    722 
    723 impl MycOperationAuditStore for MycJsonlOperationAuditStore {
    724     fn config(&self) -> &MycAuditConfig {
    725         &self.config
    726     }
    727 
    728     fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
    729         MycJsonlOperationAuditStore::append(self, record)
    730     }
    731 
    732     fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    733         MycJsonlOperationAuditStore::list_all(self)
    734     }
    735 
    736     fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    737         MycJsonlOperationAuditStore::list_with_limit(self, limit)
    738     }
    739 
    740     fn list_for_connection_with_limit(
    741         &self,
    742         connection_id: &RadrootsNostrSignerConnectionId,
    743         limit: usize,
    744     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    745         MycJsonlOperationAuditStore::list_for_connection_with_limit(self, connection_id, limit)
    746     }
    747 
    748     fn list_for_attempt_id_with_limit(
    749         &self,
    750         attempt_id: &str,
    751         limit: usize,
    752     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    753         MycJsonlOperationAuditStore::list_for_attempt_id_with_limit(self, attempt_id, limit)
    754     }
    755 
    756     fn latest_attempt_id_for_operation(
    757         &self,
    758         operation: MycOperationAuditKind,
    759     ) -> Result<Option<String>, MycError> {
    760         MycJsonlOperationAuditStore::latest_attempt_id_for_operation(self, operation)
    761     }
    762 }
    763 
    764 fn parse_archive_index(file_name: &str) -> Option<usize> {
    765     file_name
    766         .strip_prefix(MYC_OPERATION_AUDIT_ARCHIVE_PREFIX)?
    767         .strip_suffix(MYC_OPERATION_AUDIT_ARCHIVE_SUFFIX)?
    768         .parse()
    769         .ok()
    770 }
    771 
    772 fn operation_index_label(kind: MycOperationAuditKind) -> &'static str {
    773     match kind {
    774         MycOperationAuditKind::DeliveryRecovery => "delivery_recovery",
    775         MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish",
    776         MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish",
    777         MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish",
    778         MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore",
    779         MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch",
    780         MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish",
    781         MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare",
    782         MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh",
    783         MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair",
    784     }
    785 }
    786 
    787 fn encode_index_component(value: &str) -> String {
    788     let mut encoded = String::with_capacity(value.len() * 2);
    789     for byte in value.bytes() {
    790         encoded.push_str(&format!("{byte:02x}"));
    791     }
    792     encoded
    793 }
    794 
    795 fn write_atomic_text(path: &Path, contents: &str) -> Result<(), MycError> {
    796     let tmp_path = path.with_extension("tmp");
    797     fs::write(&tmp_path, contents).map_err(|source| MycError::AuditIo {
    798         path: tmp_path.clone(),
    799         source,
    800     })?;
    801     fs::rename(&tmp_path, path).map_err(|source| MycError::AuditIo {
    802         path: tmp_path,
    803         source,
    804     })?;
    805     Ok(())
    806 }
    807 
    808 fn now_unix_secs() -> u64 {
    809     SystemTime::now()
    810         .duration_since(UNIX_EPOCH)
    811         .expect("system clock is before unix epoch")
    812         .as_secs()
    813 }
    814 
    815 #[cfg(test)]
    816 mod tests {
    817     use std::fs;
    818 
    819     use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
    820 
    821     use crate::config::MycAuditConfig;
    822 
    823     use super::{
    824         MycJsonlOperationAuditStore, MycOperationAuditKind, MycOperationAuditOutcome,
    825         MycOperationAuditRecord,
    826     };
    827 
    828     fn config() -> MycAuditConfig {
    829         MycAuditConfig {
    830             default_read_limit: 10,
    831             max_active_file_bytes: 512,
    832             max_archived_files: 2,
    833         }
    834     }
    835 
    836     #[test]
    837     fn append_and_list_operation_audit_records() {
    838         let temp = tempfile::tempdir().expect("tempdir");
    839         let store = MycJsonlOperationAuditStore::new(temp.path(), config());
    840         let connection_id =
    841             RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id");
    842 
    843         store
    844             .append(
    845                 &MycOperationAuditRecord::new(
    846                     MycOperationAuditKind::ConnectAcceptPublish,
    847                     MycOperationAuditOutcome::Rejected,
    848                     Some(&connection_id),
    849                     Some("request-1"),
    850                     2,
    851                     0,
    852                     "0/2 relays acknowledged publish; failures: relay-a: rejected",
    853                 )
    854                 .with_attempt_id("attempt-1"),
    855             )
    856             .expect("append rejected record");
    857         store
    858             .append(&MycOperationAuditRecord::new(
    859                 MycOperationAuditKind::AuthReplayRestore,
    860                 MycOperationAuditOutcome::Restored,
    861                 Some(&connection_id),
    862                 Some("request-1"),
    863                 0,
    864                 0,
    865                 "restored pending auth challenge after replay publish rejection",
    866             ))
    867             .expect("append restored record");
    868 
    869         let records = store.list().expect("list records");
    870         assert_eq!(records.len(), 2);
    871         assert_eq!(
    872             records[0].operation,
    873             MycOperationAuditKind::ConnectAcceptPublish
    874         );
    875         assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected);
    876         assert_eq!(records[0].connection_id.as_deref(), Some("connection-1"));
    877         assert_eq!(records[0].request_id.as_deref(), Some("request-1"));
    878         assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1"));
    879         assert_eq!(records[0].relay_count, 2);
    880         assert_eq!(records[0].acknowledged_relay_count, 0);
    881 
    882         let connection_records = store
    883             .list_for_connection(&connection_id)
    884             .expect("list connection records");
    885         assert_eq!(connection_records, records);
    886     }
    887 
    888     #[test]
    889     fn list_returns_empty_when_audit_file_is_missing() {
    890         let temp = tempfile::tempdir().expect("tempdir");
    891         let store = MycJsonlOperationAuditStore::new(temp.path(), config());
    892 
    893         assert!(store.list().expect("list missing records").is_empty());
    894     }
    895 
    896     #[test]
    897     fn rotation_and_bounded_reads_keep_recent_records() {
    898         let temp = tempfile::tempdir().expect("tempdir");
    899         let store = MycJsonlOperationAuditStore::new(
    900             temp.path(),
    901             MycAuditConfig {
    902                 default_read_limit: 3,
    903                 max_active_file_bytes: 180,
    904                 max_archived_files: 2,
    905             },
    906         );
    907 
    908         for index in 0..6 {
    909             store
    910                 .append(
    911                     &MycOperationAuditRecord::new(
    912                         MycOperationAuditKind::ListenerResponsePublish,
    913                         MycOperationAuditOutcome::Rejected,
    914                         None,
    915                         Some(&format!("request-{index}")),
    916                         1,
    917                         0,
    918                         format!("failure-{index}"),
    919                     )
    920                     .with_attempt_id(format!("attempt-{index}")),
    921                 )
    922                 .expect("append record");
    923         }
    924 
    925         let records = store.list().expect("list bounded records");
    926         assert_eq!(records.len(), 3);
    927         assert_eq!(records[0].request_id.as_deref(), Some("request-3"));
    928         assert_eq!(records[2].request_id.as_deref(), Some("request-5"));
    929         assert!(temp.path().join("operations.1.jsonl").exists());
    930         assert!(temp.path().join("operations.2.jsonl").exists());
    931         assert!(!temp.path().join("operations.3.jsonl").exists());
    932     }
    933 
    934     #[test]
    935     fn list_for_attempt_and_latest_attempt_id_work() {
    936         let temp = tempfile::tempdir().expect("tempdir");
    937         let store = MycJsonlOperationAuditStore::new(temp.path(), config());
    938 
    939         store
    940             .append(
    941                 &MycOperationAuditRecord::new(
    942                     MycOperationAuditKind::DiscoveryHandlerRefresh,
    943                     MycOperationAuditOutcome::Rejected,
    944                     None,
    945                     None,
    946                     2,
    947                     0,
    948                     "first attempt rejected",
    949                 )
    950                 .with_attempt_id("attempt-1")
    951                 .with_planned_repair_relays(vec!["wss://relay-a.example.com".to_owned()])
    952                 .with_blocked_relays(
    953                     "unavailable_relays",
    954                     vec!["wss://relay-b.example.com".to_owned()],
    955                 ),
    956             )
    957             .expect("append first attempt");
    958         store
    959             .append(
    960                 &MycOperationAuditRecord::new(
    961                     MycOperationAuditKind::DiscoveryHandlerRepair,
    962                     MycOperationAuditOutcome::Rejected,
    963                     None,
    964                     None,
    965                     1,
    966                     0,
    967                     "relay-a rejected",
    968                 )
    969                 .with_attempt_id("attempt-1")
    970                 .with_relay_url("wss://relay-a.example.com"),
    971             )
    972             .expect("append first repair");
    973         store
    974             .append(
    975                 &MycOperationAuditRecord::new(
    976                     MycOperationAuditKind::DiscoveryHandlerRefresh,
    977                     MycOperationAuditOutcome::Succeeded,
    978                     None,
    979                     None,
    980                     1,
    981                     1,
    982                     "second attempt succeeded",
    983                 )
    984                 .with_attempt_id("attempt-2"),
    985             )
    986             .expect("append second attempt");
    987 
    988         let attempt_records = store
    989             .list_for_attempt_id("attempt-1")
    990             .expect("list attempt records");
    991         assert_eq!(attempt_records.len(), 2);
    992         assert!(
    993             attempt_records
    994                 .iter()
    995                 .all(|record| record.attempt_id.as_deref() == Some("attempt-1"))
    996         );
    997         assert_eq!(
    998             attempt_records[0].planned_repair_relays,
    999             vec!["wss://relay-a.example.com".to_owned()]
   1000         );
   1001         assert_eq!(
   1002             attempt_records[0].blocked_relays,
   1003             vec!["wss://relay-b.example.com".to_owned()]
   1004         );
   1005         assert_eq!(
   1006             attempt_records[0].blocked_reason.as_deref(),
   1007             Some("unavailable_relays")
   1008         );
   1009         assert_eq!(
   1010             store
   1011                 .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)
   1012                 .expect("latest attempt"),
   1013             Some("attempt-2".to_owned())
   1014         );
   1015     }
   1016 
   1017     #[test]
   1018     fn attempt_lookup_rebuilds_indexes_from_retained_logs() {
   1019         let temp = tempfile::tempdir().expect("tempdir");
   1020         let store = MycJsonlOperationAuditStore::new(temp.path(), config());
   1021 
   1022         store
   1023             .append(
   1024                 &MycOperationAuditRecord::new(
   1025                     MycOperationAuditKind::DiscoveryHandlerRefresh,
   1026                     MycOperationAuditOutcome::Rejected,
   1027                     None,
   1028                     None,
   1029                     2,
   1030                     0,
   1031                     "first attempt rejected",
   1032                 )
   1033                 .with_attempt_id("attempt-1"),
   1034             )
   1035             .expect("append first attempt");
   1036         store
   1037             .append(
   1038                 &MycOperationAuditRecord::new(
   1039                     MycOperationAuditKind::DiscoveryHandlerRefresh,
   1040                     MycOperationAuditOutcome::Succeeded,
   1041                     None,
   1042                     None,
   1043                     1,
   1044                     1,
   1045                     "second attempt succeeded",
   1046                 )
   1047                 .with_attempt_id("attempt-2"),
   1048             )
   1049             .expect("append second attempt");
   1050 
   1051         fs::remove_dir_all(store.index_dir()).expect("remove index dir");
   1052 
   1053         let rebuilt_attempt_records = store
   1054             .list_for_attempt_id("attempt-1")
   1055             .expect("rebuild attempt records");
   1056         assert_eq!(rebuilt_attempt_records.len(), 1);
   1057         assert_eq!(
   1058             rebuilt_attempt_records[0].attempt_id.as_deref(),
   1059             Some("attempt-1")
   1060         );
   1061         assert_eq!(
   1062             store
   1063                 .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)
   1064                 .expect("latest attempt after rebuild"),
   1065             Some("attempt-2".to_owned())
   1066         );
   1067         assert!(store.attempt_index_path("attempt-1").exists());
   1068         assert!(
   1069             store
   1070                 .latest_attempt_path(MycOperationAuditKind::DiscoveryHandlerRefresh)
   1071                 .exists()
   1072         );
   1073     }
   1074 }