myc

Self-custodial remote signer for Radroots apps
git clone https://radroots.dev/git/myc.git
Log | Files | Refs | README | LICENSE

audit_sqlite.rs (29284B)


      1 use std::path::{Path, PathBuf};
      2 
      3 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
      4 use radroots_sql_core::migrations::{Migration, migrations_run_all_up};
      5 use radroots_sql_core::{SqlExecutor, SqliteExecutor};
      6 use serde::Deserialize;
      7 use serde::de::DeserializeOwned;
      8 use serde_json::{Value, json};
      9 
     10 use crate::audit::{
     11     MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord,
     12     MycOperationAuditStore,
     13 };
     14 use crate::config::{MycAuditConfig, MycTransportDeliveryPolicy};
     15 use crate::error::MycError;
     16 
/// File name of the audit database; `MycSqliteOperationAuditStore::open` joins it
/// onto the audit directory it is given.
const MYC_OPERATION_AUDIT_SQLITE_FILE_NAME: &str = "operations.sqlite";
/// Path label recorded for the in-memory database opened by `open_memory` (tests only).
#[cfg(test)]
const MYC_OPERATION_AUDIT_MEMORY_PATH: &str = ":memory:";

/// Migrations applied to the audit database by `migrate_up`.
///
/// The up/down SQL is embedded at compile time from the crate's `migrations` directory.
static MYC_OPERATION_AUDIT_MIGRATIONS: &[Migration] = &[Migration {
    name: "0000_runtime_audit_init",
    up_sql: include_str!("../migrations/0000_runtime_audit_init.up.sql"),
    down_sql: include_str!("../migrations/0000_runtime_audit_init.down.sql"),
}];
     26 
/// Myc keeps its operational audit store local to the service boundary.
///
/// Pairs the SQLite database handle with the audit configuration it was opened with.
pub struct MycSqliteOperationAuditStore {
    // Database handle used by every read and write on this store.
    db: MycOperationAuditSqliteDb,
    // Configuration supplied at open time; exposed unchanged through `config()`.
    config: MycAuditConfig,
}
     32 
/// SQLite handle for the audit database plus the metadata needed to configure it
/// and to label errors.
struct MycOperationAuditSqliteDb {
    // Database location; attached to every `MycError` raised by this module.
    path: PathBuf,
    // Executor that all SQL statements are run through.
    executor: SqliteExecutor,
    // `true` when opened from a file, `false` for the in-memory test database;
    // selects the pragma set and journal mode in `configure`.
    file_backed: bool,
}
     38 
/// One `myc_operation_audit` row as deserialized from the executor's JSON output.
///
/// Field names match the column names selected by the record queries in this file;
/// `into_record` converts a row into a `MycOperationAuditRecord`.
#[derive(Debug, Deserialize)]
struct MycOperationAuditRow {
    // Used as the ordering tie-breaker in SQL; not carried into the record.
    audit_record_id: i64,
    recorded_at_unix: u64,
    // Persisted label, decoded by `parse_operation_kind`.
    operation: String,
    // Persisted label, decoded by `parse_operation_outcome`.
    outcome: String,
    relay_url: Option<String>,
    connection_id: Option<String>,
    request_id: Option<String>,
    attempt_id: Option<String>,
    // JSON text, decoded by `parse_json_field`.
    planned_repair_relays_json: String,
    // JSON text, decoded by `parse_json_field`.
    blocked_relays_json: String,
    blocked_reason: Option<String>,
    // Persisted label, decoded by `parse_delivery_policy` when present.
    delivery_policy: Option<String>,
    // Integer columns are read as `i64` and range-checked into `usize` by `into_record`.
    required_acknowledged_relay_count: Option<i64>,
    publish_attempt_count: Option<i64>,
    relay_count: i64,
    acknowledged_relay_count: i64,
    relay_outcome_summary: String,
}
     59 
/// Single-column row returned by the query in `latest_attempt_id_for_operation`.
#[derive(Debug, Deserialize)]
struct MycLatestAttemptRow {
    // Never null here: the query filters on `attempt_id IS NOT NULL`.
    attempt_id: String,
}
     64 
     65 impl MycSqliteOperationAuditStore {
     66     pub fn open(audit_dir: impl AsRef<Path>, config: MycAuditConfig) -> Result<Self, MycError> {
     67         let db = MycOperationAuditSqliteDb::open(
     68             audit_dir
     69                 .as_ref()
     70                 .join(MYC_OPERATION_AUDIT_SQLITE_FILE_NAME),
     71         )?;
     72         Ok(Self { db, config })
     73     }
     74 
     75     #[cfg(test)]
     76     pub fn open_memory(config: MycAuditConfig) -> Result<Self, MycError> {
     77         let db = MycOperationAuditSqliteDb::open_memory()?;
     78         Ok(Self { db, config })
     79     }
     80 
     81     pub fn path(&self) -> &Path {
     82         self.db.path()
     83     }
     84 
     85     pub fn config(&self) -> &MycAuditConfig {
     86         &self.config
     87     }
     88 
     89     pub fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
     90         let planned_repair_relays_json =
     91             serialize_json_field(self.db.path(), &record.planned_repair_relays)?;
     92         let blocked_relays_json = serialize_json_field(self.db.path(), &record.blocked_relays)?;
     93         exec_json(
     94             self.db.path(),
     95             self.db.executor(),
     96             "INSERT INTO myc_operation_audit(recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
     97             json!([
     98                 record.recorded_at_unix,
     99                 operation_kind_label(record.operation),
    100                 operation_outcome_label(record.outcome),
    101                 record.relay_url.clone(),
    102                 record.connection_id.clone(),
    103                 record.request_id.clone(),
    104                 record.attempt_id.clone(),
    105                 planned_repair_relays_json,
    106                 blocked_relays_json,
    107                 record.blocked_reason.clone(),
    108                 record
    109                     .delivery_policy
    110                     .map(MycTransportDeliveryPolicy::as_str),
    111                 record.required_acknowledged_relay_count,
    112                 record.publish_attempt_count,
    113                 record.relay_count,
    114                 record.acknowledged_relay_count,
    115                 record.relay_outcome_summary.clone(),
    116             ]),
    117         )
    118     }
    119 
    120     pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    121         self.query_records(
    122             "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix ASC, audit_record_id ASC",
    123             json!([]),
    124         )
    125     }
    126 
    127     pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    128         if limit == 0 {
    129             return Ok(Vec::new());
    130         }
    131 
    132         let mut records = self.query_records_with_limit(
    133             "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix DESC, audit_record_id DESC",
    134             json!([]),
    135             limit,
    136         )?;
    137         records.reverse();
    138         Ok(records)
    139     }
    140 
    141     pub fn list_for_connection_with_limit(
    142         &self,
    143         connection_id: &RadrootsNostrSignerConnectionId,
    144         limit: usize,
    145     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    146         if limit == 0 {
    147             return Ok(Vec::new());
    148         }
    149 
    150         let mut records = self.query_records_with_limit(
    151             "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE connection_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC",
    152             json!([connection_id.as_str()]),
    153             limit,
    154         )?;
    155         records.reverse();
    156         Ok(records)
    157     }
    158 
    159     pub fn list_for_attempt_id_with_limit(
    160         &self,
    161         attempt_id: &str,
    162         limit: usize,
    163     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    164         if limit == 0 {
    165             return Ok(Vec::new());
    166         }
    167 
    168         let mut records = self.query_records_with_limit(
    169             "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE attempt_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC",
    170             json!([attempt_id]),
    171             limit,
    172         )?;
    173         records.reverse();
    174         Ok(records)
    175     }
    176 
    177     pub fn latest_attempt_id_for_operation(
    178         &self,
    179         operation: MycOperationAuditKind,
    180     ) -> Result<Option<String>, MycError> {
    181         let rows: Vec<MycLatestAttemptRow> = query_rows(
    182             self.db.path(),
    183             self.db.executor(),
    184             "SELECT attempt_id FROM myc_operation_audit WHERE operation = ? AND attempt_id IS NOT NULL ORDER BY recorded_at_unix DESC, audit_record_id DESC LIMIT 1",
    185             json!([operation_kind_label(operation)]),
    186         )?;
    187         Ok(rows.into_iter().next().map(|row| row.attempt_id))
    188     }
    189 
    190     fn query_records(
    191         &self,
    192         sql: &str,
    193         params: Value,
    194     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    195         let rows: Vec<MycOperationAuditRow> =
    196             query_rows(self.db.path(), self.db.executor(), sql, params)?;
    197         rows.into_iter()
    198             .map(|row| row.into_record(self.db.path()))
    199             .collect()
    200     }
    201 
    202     fn query_records_with_limit(
    203         &self,
    204         base_sql: &str,
    205         params: Value,
    206         limit: usize,
    207     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    208         if limit == usize::MAX {
    209             return self.query_records(base_sql, params);
    210         }
    211 
    212         let limit = i64::try_from(limit).map_err(|_| {
    213             MycError::InvalidOperation("audit read limit exceeds sqlite range".to_owned())
    214         })?;
    215         let mut params = params.as_array().cloned().unwrap_or_default();
    216         params.push(Value::from(limit));
    217         let sql = format!("{base_sql} LIMIT ?");
    218         self.query_records(sql.as_str(), Value::Array(params))
    219     }
    220 }
    221 
    222 impl MycOperationAuditStore for MycSqliteOperationAuditStore {
    223     fn config(&self) -> &MycAuditConfig {
    224         &self.config
    225     }
    226 
    227     fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
    228         MycSqliteOperationAuditStore::append(self, record)
    229     }
    230 
    231     fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    232         MycSqliteOperationAuditStore::list_all(self)
    233     }
    234 
    235     fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    236         MycSqliteOperationAuditStore::list_with_limit(self, limit)
    237     }
    238 
    239     fn list_for_connection_with_limit(
    240         &self,
    241         connection_id: &RadrootsNostrSignerConnectionId,
    242         limit: usize,
    243     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    244         MycSqliteOperationAuditStore::list_for_connection_with_limit(self, connection_id, limit)
    245     }
    246 
    247     fn list_for_attempt_id_with_limit(
    248         &self,
    249         attempt_id: &str,
    250         limit: usize,
    251     ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
    252         MycSqliteOperationAuditStore::list_for_attempt_id_with_limit(self, attempt_id, limit)
    253     }
    254 
    255     fn latest_attempt_id_for_operation(
    256         &self,
    257         operation: MycOperationAuditKind,
    258     ) -> Result<Option<String>, MycError> {
    259         MycSqliteOperationAuditStore::latest_attempt_id_for_operation(self, operation)
    260     }
    261 }
    262 
    263 impl MycOperationAuditSqliteDb {
    264     fn open(path: impl AsRef<Path>) -> Result<Self, MycError> {
    265         let path = path.as_ref().to_path_buf();
    266         if let Some(parent) = path.parent()
    267             && !parent.as_os_str().is_empty()
    268         {
    269             std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir {
    270                 path: parent.to_path_buf(),
    271                 source,
    272             })?;
    273         }
    274         let executor = SqliteExecutor::open(&path).map_err(|source| MycError::AuditSql {
    275             path: path.clone(),
    276             source,
    277         })?;
    278         let db = Self {
    279             path,
    280             executor,
    281             file_backed: true,
    282         };
    283         db.configure()?;
    284         db.migrate_up()?;
    285         Ok(db)
    286     }
    287 
    288     #[cfg(test)]
    289     fn open_memory() -> Result<Self, MycError> {
    290         let path = PathBuf::from(MYC_OPERATION_AUDIT_MEMORY_PATH);
    291         let executor = SqliteExecutor::open_memory().map_err(|source| MycError::AuditSql {
    292             path: path.clone(),
    293             source,
    294         })?;
    295         let db = Self {
    296             path,
    297             executor,
    298             file_backed: false,
    299         };
    300         db.configure()?;
    301         db.migrate_up()?;
    302         Ok(db)
    303     }
    304 
    305     fn path(&self) -> &Path {
    306         &self.path
    307     }
    308 
    309     fn executor(&self) -> &SqliteExecutor {
    310         &self.executor
    311     }
    312 
    313     fn migrate_up(&self) -> Result<(), MycError> {
    314         migrations_run_all_up(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| {
    315             MycError::AuditSql {
    316                 path: self.path.clone(),
    317                 source,
    318             }
    319         })
    320     }
    321 
    322     #[cfg(test)]
    323     fn migrate_down(&self) -> Result<(), MycError> {
    324         use radroots_sql_core::migrations::migrations_run_all_down;
    325 
    326         migrations_run_all_down(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| {
    327             MycError::AuditSql {
    328                 path: self.path.clone(),
    329                 source,
    330             }
    331         })
    332     }
    333 
    334     fn configure(&self) -> Result<(), MycError> {
    335         let pragma_batch = if self.file_backed {
    336             "PRAGMA foreign_keys = ON;
    337              PRAGMA synchronous = FULL;
    338              PRAGMA wal_autocheckpoint = 1000;
    339              PRAGMA busy_timeout = 5000;
    340              PRAGMA temp_store = MEMORY;"
    341         } else {
    342             "PRAGMA foreign_keys = ON;
    343              PRAGMA synchronous = NORMAL;
    344              PRAGMA busy_timeout = 5000;
    345              PRAGMA temp_store = MEMORY;"
    346         };
    347         let _ = self
    348             .executor
    349             .exec(pragma_batch, "[]")
    350             .map_err(|source| MycError::AuditSql {
    351                 path: self.path.clone(),
    352                 source,
    353             })?;
    354         let journal_mode_sql = if self.file_backed {
    355             "PRAGMA journal_mode = WAL"
    356         } else {
    357             "PRAGMA journal_mode = MEMORY"
    358         };
    359         let _ = self
    360             .executor
    361             .query_raw(journal_mode_sql, "[]")
    362             .map_err(|source| MycError::AuditSql {
    363                 path: self.path.clone(),
    364                 source,
    365             })?;
    366         Ok(())
    367     }
    368 }
    369 
    370 impl MycOperationAuditRow {
    371     fn into_record(self, path: &Path) -> Result<MycOperationAuditRecord, MycError> {
    372         let _audit_record_id = self.audit_record_id;
    373         Ok(MycOperationAuditRecord {
    374             recorded_at_unix: self.recorded_at_unix,
    375             operation: parse_operation_kind(self.operation.as_str())?,
    376             outcome: parse_operation_outcome(self.outcome.as_str())?,
    377             relay_url: self.relay_url,
    378             connection_id: self.connection_id,
    379             request_id: self.request_id,
    380             attempt_id: self.attempt_id,
    381             planned_repair_relays: parse_json_field(
    382                 path,
    383                 self.planned_repair_relays_json.as_str(),
    384             )?,
    385             blocked_relays: parse_json_field(path, self.blocked_relays_json.as_str())?,
    386             blocked_reason: self.blocked_reason,
    387             delivery_policy: self
    388                 .delivery_policy
    389                 .as_deref()
    390                 .map(parse_delivery_policy)
    391                 .transpose()?,
    392             required_acknowledged_relay_count: self
    393                 .required_acknowledged_relay_count
    394                 .map(parse_optional_usize)
    395                 .transpose()?,
    396             publish_attempt_count: self
    397                 .publish_attempt_count
    398                 .map(parse_optional_usize)
    399                 .transpose()?,
    400             relay_count: parse_required_usize(self.relay_count, "relay_count")?,
    401             acknowledged_relay_count: parse_required_usize(
    402                 self.acknowledged_relay_count,
    403                 "acknowledged_relay_count",
    404             )?,
    405             relay_outcome_summary: self.relay_outcome_summary,
    406         })
    407     }
    408 }
    409 
    410 fn query_rows<T: DeserializeOwned>(
    411     path: &Path,
    412     executor: &impl SqlExecutor,
    413     sql: &str,
    414     params: Value,
    415 ) -> Result<Vec<T>, MycError> {
    416     let raw = executor
    417         .query_raw(sql, params.to_string().as_str())
    418         .map_err(|source| MycError::AuditSql {
    419             path: path.to_path_buf(),
    420             source,
    421         })?;
    422     serde_json::from_str(&raw).map_err(|source| MycError::AuditSqlDecode {
    423         path: path.to_path_buf(),
    424         source,
    425     })
    426 }
    427 
    428 fn exec_json(
    429     path: &Path,
    430     executor: &impl SqlExecutor,
    431     sql: &str,
    432     params: Value,
    433 ) -> Result<(), MycError> {
    434     let _ = executor
    435         .exec(sql, params.to_string().as_str())
    436         .map_err(|source| MycError::AuditSql {
    437             path: path.to_path_buf(),
    438             source,
    439         })?;
    440     Ok(())
    441 }
    442 
    443 fn parse_json_field<T: DeserializeOwned>(path: &Path, value: &str) -> Result<T, MycError> {
    444     serde_json::from_str(value).map_err(|source| MycError::AuditSqlDecode {
    445         path: path.to_path_buf(),
    446         source,
    447     })
    448 }
    449 
    450 fn serialize_json_field<T: serde::Serialize>(path: &Path, value: &T) -> Result<String, MycError> {
    451     serde_json::to_string(value).map_err(|source| MycError::AuditSerialize {
    452         path: path.to_path_buf(),
    453         source,
    454     })
    455 }
    456 
    457 fn parse_required_usize(value: i64, field: &str) -> Result<usize, MycError> {
    458     usize::try_from(value).map_err(|_| {
    459         MycError::InvalidOperation(format!(
    460             "sqlite runtime audit field `{field}` is out of range for usize"
    461         ))
    462     })
    463 }
    464 
    465 fn parse_optional_usize(value: i64) -> Result<usize, MycError> {
    466     usize::try_from(value).map_err(|_| {
    467         MycError::InvalidOperation(
    468             "sqlite runtime audit optional integer field is out of range for usize".to_owned(),
    469         )
    470     })
    471 }
    472 
    473 fn operation_kind_label(value: MycOperationAuditKind) -> &'static str {
    474     match value {
    475         MycOperationAuditKind::DeliveryRecovery => "delivery_recovery",
    476         MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish",
    477         MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish",
    478         MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish",
    479         MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore",
    480         MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch",
    481         MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish",
    482         MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare",
    483         MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh",
    484         MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair",
    485     }
    486 }
    487 
    488 fn parse_operation_kind(value: &str) -> Result<MycOperationAuditKind, MycError> {
    489     match value {
    490         "delivery_recovery" => Ok(MycOperationAuditKind::DeliveryRecovery),
    491         "listener_response_publish" => Ok(MycOperationAuditKind::ListenerResponsePublish),
    492         "connect_accept_publish" => Ok(MycOperationAuditKind::ConnectAcceptPublish),
    493         "auth_replay_publish" => Ok(MycOperationAuditKind::AuthReplayPublish),
    494         "auth_replay_restore" => Ok(MycOperationAuditKind::AuthReplayRestore),
    495         "discovery_handler_fetch" => Ok(MycOperationAuditKind::DiscoveryHandlerFetch),
    496         "discovery_handler_publish" => Ok(MycOperationAuditKind::DiscoveryHandlerPublish),
    497         "discovery_handler_compare" => Ok(MycOperationAuditKind::DiscoveryHandlerCompare),
    498         "discovery_handler_refresh" => Ok(MycOperationAuditKind::DiscoveryHandlerRefresh),
    499         "discovery_handler_repair" => Ok(MycOperationAuditKind::DiscoveryHandlerRepair),
    500         other => Err(MycError::InvalidOperation(format!(
    501             "unknown sqlite runtime audit operation `{other}`"
    502         ))),
    503     }
    504 }
    505 
    506 fn operation_outcome_label(value: MycOperationAuditOutcome) -> &'static str {
    507     match value {
    508         MycOperationAuditOutcome::Succeeded => "succeeded",
    509         MycOperationAuditOutcome::Rejected => "rejected",
    510         MycOperationAuditOutcome::Restored => "restored",
    511         MycOperationAuditOutcome::Unavailable => "unavailable",
    512         MycOperationAuditOutcome::Missing => "missing",
    513         MycOperationAuditOutcome::Matched => "matched",
    514         MycOperationAuditOutcome::Drifted => "drifted",
    515         MycOperationAuditOutcome::Conflicted => "conflicted",
    516         MycOperationAuditOutcome::Skipped => "skipped",
    517     }
    518 }
    519 
    520 fn parse_operation_outcome(value: &str) -> Result<MycOperationAuditOutcome, MycError> {
    521     match value {
    522         "succeeded" => Ok(MycOperationAuditOutcome::Succeeded),
    523         "rejected" => Ok(MycOperationAuditOutcome::Rejected),
    524         "restored" => Ok(MycOperationAuditOutcome::Restored),
    525         "unavailable" => Ok(MycOperationAuditOutcome::Unavailable),
    526         "missing" => Ok(MycOperationAuditOutcome::Missing),
    527         "matched" => Ok(MycOperationAuditOutcome::Matched),
    528         "drifted" => Ok(MycOperationAuditOutcome::Drifted),
    529         "conflicted" => Ok(MycOperationAuditOutcome::Conflicted),
    530         "skipped" => Ok(MycOperationAuditOutcome::Skipped),
    531         other => Err(MycError::InvalidOperation(format!(
    532             "unknown sqlite runtime audit outcome `{other}`"
    533         ))),
    534     }
    535 }
    536 
    537 fn parse_delivery_policy(value: &str) -> Result<MycTransportDeliveryPolicy, MycError> {
    538     match value {
    539         "any" => Ok(MycTransportDeliveryPolicy::Any),
    540         "quorum" => Ok(MycTransportDeliveryPolicy::Quorum),
    541         "all" => Ok(MycTransportDeliveryPolicy::All),
    542         other => Err(MycError::InvalidOperation(format!(
    543             "unknown sqlite runtime audit delivery policy `{other}`"
    544         ))),
    545     }
    546 }
    547 
/// Tests for the SQLite audit store: schema bootstrap, record round-trips,
/// reopening a file-backed store, journal mode, and migration down/up.
#[cfg(test)]
mod tests {
    use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
    use radroots_sql_core::SqlExecutor;
    use serde_json::Value;

    use crate::audit::{
        MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord,
        MycOperationAuditStore,
    };
    use crate::config::MycAuditConfig;

    use super::{MycOperationAuditSqliteDb, MycSqliteOperationAuditStore};

    /// Audit configuration shared by every test in this module.
    fn config() -> MycAuditConfig {
        MycAuditConfig {
            default_read_limit: 10,
            max_active_file_bytes: 512,
            max_archived_files: 2,
        }
    }

    /// Runs a parameterless query against the store's database and returns each row
    /// as a JSON object keyed by column name.
    fn query_values(
        store: &MycSqliteOperationAuditStore,
        sql: &str,
    ) -> Vec<serde_json::Map<String, Value>> {
        let raw = store.db.executor().query_raw(sql, "[]").expect("query");
        serde_json::from_str(&raw).expect("rows")
    }

    /// Opening an in-memory database creates the migration bookkeeping table and the
    /// audit table, and running the migrations a second time still succeeds.
    #[test]
    fn open_memory_bootstraps_runtime_audit_schema() {
        let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db");
        db.migrate_up().expect("rerun migrations");

        let raw = db
            .executor()
            .query_raw(
                "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
                "[]",
            )
            .expect("query");
        let tables: Vec<serde_json::Map<String, Value>> =
            serde_json::from_str(&raw).expect("table rows");
        let table_names = tables
            .into_iter()
            .filter_map(|row| {
                row.get("name")
                    .and_then(Value::as_str)
                    .map(ToOwned::to_owned)
            })
            .collect::<Vec<_>>();
        assert!(table_names.iter().any(|name| name == "__migrations"));
        assert!(table_names.iter().any(|name| name == "myc_operation_audit"));
    }

    /// Appended records come back in insertion order with their fields intact, and
    /// the per-connection listing returns the same records.
    #[test]
    fn append_and_list_records_roundtrip_through_sqlite() {
        let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store");
        let connection_id =
            RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id");

        store
            .append(
                &MycOperationAuditRecord::new(
                    MycOperationAuditKind::ConnectAcceptPublish,
                    MycOperationAuditOutcome::Rejected,
                    Some(&connection_id),
                    Some("request-1"),
                    2,
                    0,
                    "0/2 relays acknowledged publish; failures: relay-a: rejected",
                )
                .with_attempt_id("attempt-1"),
            )
            .expect("append rejected record");
        store
            .append(&MycOperationAuditRecord::new(
                MycOperationAuditKind::AuthReplayRestore,
                MycOperationAuditOutcome::Restored,
                Some(&connection_id),
                Some("request-1"),
                0,
                0,
                "restored pending auth challenge after replay publish rejection",
            ))
            .expect("append restored record");

        // NOTE(review): `list` and `list_for_connection` are not defined in this file;
        // presumably they are provided by the `MycOperationAuditStore` trait imported
        // above. Confirm against the trait definition.
        let records = store.list().expect("list records");
        assert_eq!(records.len(), 2);
        assert_eq!(
            records[0].operation,
            MycOperationAuditKind::ConnectAcceptPublish
        );
        assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected);
        assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1"));

        let connection_records = store
            .list_for_connection(&connection_id)
            .expect("list connection records");
        assert_eq!(connection_records, records);
    }

    /// Filtering by attempt id returns only that attempt's records, and the latest
    /// attempt id for an operation is the one appended last.
    #[test]
    fn list_for_attempt_and_latest_attempt_work_with_sqlite() {
        let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store");

        store
            .append(
                &MycOperationAuditRecord::new(
                    MycOperationAuditKind::DiscoveryHandlerRefresh,
                    MycOperationAuditOutcome::Rejected,
                    None,
                    None,
                    2,
                    0,
                    "first attempt rejected",
                )
                .with_attempt_id("attempt-1"),
            )
            .expect("append first attempt");
        store
            .append(
                &MycOperationAuditRecord::new(
                    MycOperationAuditKind::DiscoveryHandlerRefresh,
                    MycOperationAuditOutcome::Succeeded,
                    None,
                    None,
                    1,
                    1,
                    "second attempt succeeded",
                )
                .with_attempt_id("attempt-2"),
            )
            .expect("append second attempt");

        // NOTE(review): `list_for_attempt_id` is not defined in this file; presumably
        // a `MycOperationAuditStore` trait method. Confirm against the trait definition.
        let attempt_records = store
            .list_for_attempt_id("attempt-1")
            .expect("list attempt records");
        assert_eq!(attempt_records.len(), 1);
        assert_eq!(
            store
                .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)
                .expect("latest attempt"),
            Some("attempt-2".to_owned())
        );
    }

    /// Records written through one store instance are visible after the store is
    /// dropped and reopened on the same directory.
    #[test]
    fn file_backed_store_reopens_existing_audit_records() {
        let temp = tempfile::tempdir().expect("tempdir");
        let path = temp.path().join("audit");
        {
            // Inner scope drops the first store before the database is reopened.
            let store = MycSqliteOperationAuditStore::open(&path, config()).expect("open store");
            store
                .append(
                    &MycOperationAuditRecord::new(
                        MycOperationAuditKind::ListenerResponsePublish,
                        MycOperationAuditOutcome::Succeeded,
                        None,
                        Some("request-1"),
                        1,
                        1,
                        "relay acknowledged publish",
                    )
                    .with_attempt_id("attempt-1"),
                )
                .expect("append");
        }

        let reopened = MycSqliteOperationAuditStore::open(&path, config()).expect("reopen store");
        assert_eq!(reopened.list().expect("reopened list").len(), 1);
        assert!(reopened.path().ends_with("operations.sqlite"));
        assert_eq!(
            reopened
                .latest_attempt_id_for_operation(MycOperationAuditKind::ListenerResponsePublish)
                .expect("latest attempt"),
            Some("attempt-1".to_owned())
        );
    }

    /// A file-backed store reports `wal` as its journal mode.
    #[test]
    fn file_database_uses_wal_mode() {
        let temp = tempfile::tempdir().expect("tempdir");
        let store =
            MycSqliteOperationAuditStore::open(temp.path().join("audit"), config()).expect("open");

        let rows = query_values(&store, "PRAGMA journal_mode");
        assert_eq!(
            rows.into_iter()
                .next()
                .and_then(|row| row.get("journal_mode").cloned())
                .and_then(|value| value.as_str().map(ToOwned::to_owned))
                .expect("journal mode"),
            "wal"
        );
    }

    /// Migrating down leaves only the migration bookkeeping table; migrating up again
    /// records exactly one applied migration.
    #[test]
    fn migrate_down_and_up_roundtrip_restores_schema() {
        let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db");
        db.migrate_down().expect("migrate down");

        let raw = db
            .executor()
            .query_raw(
                "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
                "[]",
            )
            .expect("query");
        let tables: Vec<serde_json::Map<String, Value>> =
            serde_json::from_str(&raw).expect("table rows");
        let table_names = tables
            .into_iter()
            .filter_map(|row| {
                row.get("name")
                    .and_then(Value::as_str)
                    .map(ToOwned::to_owned)
            })
            .collect::<Vec<_>>();
        assert_eq!(table_names, vec!["__migrations".to_owned()]);

        db.migrate_up().expect("migrate up");
        let raw = db
            .executor()
            .query_raw("SELECT COUNT(*) AS row_count FROM __migrations", "[]")
            .expect("migration count");
        let rows: Vec<serde_json::Map<String, Value>> =
            serde_json::from_str(&raw).expect("migration rows");
        assert_eq!(
            rows.into_iter()
                .next()
                .and_then(|row| row.get("row_count").cloned())
                .and_then(|value| value.as_i64())
                .expect("migration row count"),
            1
        );
    }
}