audit_sqlite.rs (29284B)
1 use std::path::{Path, PathBuf}; 2 3 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId; 4 use radroots_sql_core::migrations::{Migration, migrations_run_all_up}; 5 use radroots_sql_core::{SqlExecutor, SqliteExecutor}; 6 use serde::Deserialize; 7 use serde::de::DeserializeOwned; 8 use serde_json::{Value, json}; 9 10 use crate::audit::{ 11 MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, 12 MycOperationAuditStore, 13 }; 14 use crate::config::{MycAuditConfig, MycTransportDeliveryPolicy}; 15 use crate::error::MycError; 16 17 const MYC_OPERATION_AUDIT_SQLITE_FILE_NAME: &str = "operations.sqlite"; 18 #[cfg(test)] 19 const MYC_OPERATION_AUDIT_MEMORY_PATH: &str = ":memory:"; 20 21 static MYC_OPERATION_AUDIT_MIGRATIONS: &[Migration] = &[Migration { 22 name: "0000_runtime_audit_init", 23 up_sql: include_str!("../migrations/0000_runtime_audit_init.up.sql"), 24 down_sql: include_str!("../migrations/0000_runtime_audit_init.down.sql"), 25 }]; 26 27 /// Myc keeps its operational audit store local to the service boundary. 28 pub struct MycSqliteOperationAuditStore { 29 db: MycOperationAuditSqliteDb, 30 config: MycAuditConfig, 31 } 32 33 struct MycOperationAuditSqliteDb { 34 path: PathBuf, 35 executor: SqliteExecutor, 36 file_backed: bool, 37 } 38 39 #[derive(Debug, Deserialize)] 40 struct MycOperationAuditRow { 41 audit_record_id: i64, 42 recorded_at_unix: u64, 43 operation: String, 44 outcome: String, 45 relay_url: Option<String>, 46 connection_id: Option<String>, 47 request_id: Option<String>, 48 attempt_id: Option<String>, 49 planned_repair_relays_json: String, 50 blocked_relays_json: String, 51 blocked_reason: Option<String>, 52 delivery_policy: Option<String>, 53 required_acknowledged_relay_count: Option<i64>, 54 publish_attempt_count: Option<i64>, 55 relay_count: i64, 56 acknowledged_relay_count: i64, 57 relay_outcome_summary: String, 58 } 59 60 #[derive(Debug, Deserialize)] 61 struct MycLatestAttemptRow { 62 attempt_id: String, 63 } 64 65 impl MycSqliteOperationAuditStore { 66 pub fn open(audit_dir: impl AsRef<Path>, config: MycAuditConfig) -> Result<Self, MycError> { 67 let db = MycOperationAuditSqliteDb::open( 68 audit_dir 69 .as_ref() 70 .join(MYC_OPERATION_AUDIT_SQLITE_FILE_NAME), 71 )?; 72 Ok(Self { db, config }) 73 } 74 75 #[cfg(test)] 76 pub fn open_memory(config: MycAuditConfig) -> Result<Self, MycError> { 77 let db = MycOperationAuditSqliteDb::open_memory()?; 78 Ok(Self { db, config }) 79 } 80 81 pub fn path(&self) -> &Path { 82 self.db.path() 83 } 84 85 pub fn config(&self) -> &MycAuditConfig { 86 &self.config 87 } 88 89 pub fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> { 90 let planned_repair_relays_json = 91 serialize_json_field(self.db.path(), &record.planned_repair_relays)?; 92 let blocked_relays_json = serialize_json_field(self.db.path(), &record.blocked_relays)?; 93 exec_json( 94 self.db.path(), 95 self.db.executor(), 96 "INSERT INTO myc_operation_audit(recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", 97 json!([ 98 record.recorded_at_unix, 99 operation_kind_label(record.operation), 100 operation_outcome_label(record.outcome), 101 record.relay_url.clone(), 102 record.connection_id.clone(), 103 record.request_id.clone(), 104 record.attempt_id.clone(), 105 planned_repair_relays_json, 106 blocked_relays_json, 107 record.blocked_reason.clone(), 108 record 109 .delivery_policy 110 .map(MycTransportDeliveryPolicy::as_str), 111 record.required_acknowledged_relay_count, 112 record.publish_attempt_count, 113 record.relay_count, 114 record.acknowledged_relay_count, 115 record.relay_outcome_summary.clone(), 116 ]), 117 ) 118 } 119 120 pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 121 self.query_records( 122 "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix ASC, audit_record_id ASC", 123 json!([]), 124 ) 125 } 126 127 pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> { 128 if limit == 0 { 129 return Ok(Vec::new()); 130 } 131 132 let mut records = self.query_records_with_limit( 133 "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix DESC, audit_record_id DESC", 134 json!([]), 135 limit, 136 )?; 137 records.reverse(); 138 Ok(records) 139 } 140 141 pub fn list_for_connection_with_limit( 142 &self, 143 connection_id: &RadrootsNostrSignerConnectionId, 144 limit: usize, 145 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 146 if limit == 0 { 147 return Ok(Vec::new()); 148 } 149 150 let mut records = self.query_records_with_limit( 151 "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE connection_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC", 152 json!([connection_id.as_str()]), 153 limit, 154 )?; 155 records.reverse(); 156 Ok(records) 157 } 158 159 pub fn list_for_attempt_id_with_limit( 160 &self, 161 attempt_id: &str, 162 limit: usize, 163 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 164 if limit == 0 { 165 return Ok(Vec::new()); 166 } 167 168 let mut records = self.query_records_with_limit( 169 "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE attempt_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC", 170 json!([attempt_id]), 171 limit, 172 )?; 173 records.reverse(); 174 Ok(records) 175 } 176 177 pub fn latest_attempt_id_for_operation( 178 &self, 179 operation: MycOperationAuditKind, 180 ) -> Result<Option<String>, MycError> { 181 let rows: Vec<MycLatestAttemptRow> = query_rows( 182 self.db.path(), 183 self.db.executor(), 184 "SELECT attempt_id FROM myc_operation_audit WHERE operation = ? AND attempt_id IS NOT NULL ORDER BY recorded_at_unix DESC, audit_record_id DESC LIMIT 1", 185 json!([operation_kind_label(operation)]), 186 )?; 187 Ok(rows.into_iter().next().map(|row| row.attempt_id)) 188 } 189 190 fn query_records( 191 &self, 192 sql: &str, 193 params: Value, 194 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 195 let rows: Vec<MycOperationAuditRow> = 196 query_rows(self.db.path(), self.db.executor(), sql, params)?; 197 rows.into_iter() 198 .map(|row| row.into_record(self.db.path())) 199 .collect() 200 } 201 202 fn query_records_with_limit( 203 &self, 204 base_sql: &str, 205 params: Value, 206 limit: usize, 207 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 208 if limit == usize::MAX { 209 return self.query_records(base_sql, params); 210 } 211 212 let limit = i64::try_from(limit).map_err(|_| { 213 MycError::InvalidOperation("audit read limit exceeds sqlite range".to_owned()) 214 })?; 215 let mut params = params.as_array().cloned().unwrap_or_default(); 216 params.push(Value::from(limit)); 217 let sql = format!("{base_sql} LIMIT ?"); 218 self.query_records(sql.as_str(), Value::Array(params)) 219 } 220 } 221 222 impl MycOperationAuditStore for MycSqliteOperationAuditStore { 223 fn config(&self) -> &MycAuditConfig { 224 &self.config 225 } 226 227 fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> { 228 MycSqliteOperationAuditStore::append(self, record) 229 } 230 231 fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> { 232 MycSqliteOperationAuditStore::list_all(self) 233 } 234 235 fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> { 236 MycSqliteOperationAuditStore::list_with_limit(self, limit) 237 } 238 239 fn list_for_connection_with_limit( 240 &self, 241 connection_id: &RadrootsNostrSignerConnectionId, 242 limit: usize, 243 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 244 MycSqliteOperationAuditStore::list_for_connection_with_limit(self, connection_id, limit) 245 } 246 247 fn list_for_attempt_id_with_limit( 248 &self, 249 attempt_id: &str, 250 limit: usize, 251 ) -> Result<Vec<MycOperationAuditRecord>, MycError> { 252 MycSqliteOperationAuditStore::list_for_attempt_id_with_limit(self, attempt_id, limit) 253 } 254 255 fn latest_attempt_id_for_operation( 256 &self, 257 operation: MycOperationAuditKind, 258 ) -> Result<Option<String>, MycError> { 259 MycSqliteOperationAuditStore::latest_attempt_id_for_operation(self, operation) 260 } 261 } 262 263 impl MycOperationAuditSqliteDb { 264 fn open(path: impl AsRef<Path>) -> Result<Self, MycError> { 265 let path = path.as_ref().to_path_buf(); 266 if let Some(parent) = path.parent() 267 && !parent.as_os_str().is_empty() 268 { 269 std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir { 270 path: parent.to_path_buf(), 271 source, 272 })?; 273 } 274 let executor = SqliteExecutor::open(&path).map_err(|source| MycError::AuditSql { 275 path: path.clone(), 276 source, 277 })?; 278 let db = Self { 279 path, 280 executor, 281 file_backed: true, 282 }; 283 db.configure()?; 284 db.migrate_up()?; 285 Ok(db) 286 } 287 288 #[cfg(test)] 289 fn open_memory() -> Result<Self, MycError> { 290 let path = PathBuf::from(MYC_OPERATION_AUDIT_MEMORY_PATH); 291 let executor = SqliteExecutor::open_memory().map_err(|source| MycError::AuditSql { 292 path: path.clone(), 293 source, 294 })?; 295 let db = Self { 296 path, 297 executor, 298 file_backed: false, 299 }; 300 db.configure()?; 301 db.migrate_up()?; 302 Ok(db) 303 } 304 305 fn path(&self) -> &Path { 306 &self.path 307 } 308 309 fn executor(&self) -> &SqliteExecutor { 310 &self.executor 311 } 312 313 fn migrate_up(&self) -> Result<(), MycError> { 314 migrations_run_all_up(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| { 315 MycError::AuditSql { 316 path: self.path.clone(), 317 source, 318 } 319 }) 320 } 321 322 #[cfg(test)] 323 fn migrate_down(&self) -> Result<(), MycError> { 324 use radroots_sql_core::migrations::migrations_run_all_down; 325 326 migrations_run_all_down(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| { 327 MycError::AuditSql { 328 path: self.path.clone(), 329 source, 330 } 331 }) 332 } 333 334 fn configure(&self) -> Result<(), MycError> { 335 let pragma_batch = if self.file_backed { 336 "PRAGMA foreign_keys = ON; 337 PRAGMA synchronous = FULL; 338 PRAGMA wal_autocheckpoint = 1000; 339 PRAGMA busy_timeout = 5000; 340 PRAGMA temp_store = MEMORY;" 341 } else { 342 "PRAGMA foreign_keys = ON; 343 PRAGMA synchronous = NORMAL; 344 PRAGMA busy_timeout = 5000; 345 PRAGMA temp_store = MEMORY;" 346 }; 347 let _ = self 348 .executor 349 .exec(pragma_batch, "[]") 350 .map_err(|source| MycError::AuditSql { 351 path: self.path.clone(), 352 source, 353 })?; 354 let journal_mode_sql = if self.file_backed { 355 "PRAGMA journal_mode = WAL" 356 } else { 357 "PRAGMA journal_mode = MEMORY" 358 }; 359 let _ = self 360 .executor 361 .query_raw(journal_mode_sql, "[]") 362 .map_err(|source| MycError::AuditSql { 363 path: self.path.clone(), 364 source, 365 })?; 366 Ok(()) 367 } 368 } 369 370 impl MycOperationAuditRow { 371 fn into_record(self, path: &Path) -> Result<MycOperationAuditRecord, MycError> { 372 let _audit_record_id = self.audit_record_id; 373 Ok(MycOperationAuditRecord { 374 recorded_at_unix: self.recorded_at_unix, 375 operation: parse_operation_kind(self.operation.as_str())?, 376 outcome: parse_operation_outcome(self.outcome.as_str())?, 377 relay_url: self.relay_url, 378 connection_id: self.connection_id, 379 request_id: self.request_id, 380 attempt_id: self.attempt_id, 381 planned_repair_relays: parse_json_field( 382 path, 383 self.planned_repair_relays_json.as_str(), 384 )?, 385 blocked_relays: parse_json_field(path, self.blocked_relays_json.as_str())?, 386 blocked_reason: self.blocked_reason, 387 delivery_policy: self 388 .delivery_policy 389 .as_deref() 390 .map(parse_delivery_policy) 391 .transpose()?, 392 required_acknowledged_relay_count: self 393 .required_acknowledged_relay_count 394 .map(parse_optional_usize) 395 .transpose()?, 396 publish_attempt_count: self 397 .publish_attempt_count 398 .map(parse_optional_usize) 399 .transpose()?, 400 relay_count: parse_required_usize(self.relay_count, "relay_count")?, 401 acknowledged_relay_count: parse_required_usize( 402 self.acknowledged_relay_count, 403 "acknowledged_relay_count", 404 )?, 405 relay_outcome_summary: self.relay_outcome_summary, 406 }) 407 } 408 } 409 410 fn query_rows<T: DeserializeOwned>( 411 path: &Path, 412 executor: &impl SqlExecutor, 413 sql: &str, 414 params: Value, 415 ) -> Result<Vec<T>, MycError> { 416 let raw = executor 417 .query_raw(sql, params.to_string().as_str()) 418 .map_err(|source| MycError::AuditSql { 419 path: path.to_path_buf(), 420 source, 421 })?; 422 serde_json::from_str(&raw).map_err(|source| MycError::AuditSqlDecode { 423 path: path.to_path_buf(), 424 source, 425 }) 426 } 427 428 fn exec_json( 429 path: &Path, 430 executor: &impl SqlExecutor, 431 sql: &str, 432 params: Value, 433 ) -> Result<(), MycError> { 434 let _ = executor 435 .exec(sql, params.to_string().as_str()) 436 .map_err(|source| MycError::AuditSql { 437 path: path.to_path_buf(), 438 source, 439 })?; 440 Ok(()) 441 } 442 443 fn parse_json_field<T: DeserializeOwned>(path: &Path, value: &str) -> Result<T, MycError> { 444 serde_json::from_str(value).map_err(|source| MycError::AuditSqlDecode { 445 path: path.to_path_buf(), 446 source, 447 }) 448 } 449 450 fn serialize_json_field<T: serde::Serialize>(path: &Path, value: &T) -> Result<String, MycError> { 451 serde_json::to_string(value).map_err(|source| MycError::AuditSerialize { 452 path: path.to_path_buf(), 453 source, 454 }) 455 } 456 457 fn parse_required_usize(value: i64, field: &str) -> Result<usize, MycError> { 458 usize::try_from(value).map_err(|_| { 459 MycError::InvalidOperation(format!( 460 "sqlite runtime audit field `{field}` is out of range for usize" 461 )) 462 }) 463 } 464 465 fn parse_optional_usize(value: i64) -> Result<usize, MycError> { 466 usize::try_from(value).map_err(|_| { 467 MycError::InvalidOperation( 468 "sqlite runtime audit optional integer field is out of range for usize".to_owned(), 469 ) 470 }) 471 } 472 473 fn operation_kind_label(value: MycOperationAuditKind) -> &'static str { 474 match value { 475 MycOperationAuditKind::DeliveryRecovery => "delivery_recovery", 476 MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish", 477 MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish", 478 MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish", 479 MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore", 480 MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch", 481 MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish", 482 MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare", 483 MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh", 484 MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair", 485 } 486 } 487 488 fn parse_operation_kind(value: &str) -> Result<MycOperationAuditKind, MycError> { 489 match value { 490 "delivery_recovery" => Ok(MycOperationAuditKind::DeliveryRecovery), 491 "listener_response_publish" => Ok(MycOperationAuditKind::ListenerResponsePublish), 492 "connect_accept_publish" => Ok(MycOperationAuditKind::ConnectAcceptPublish), 493 "auth_replay_publish" => Ok(MycOperationAuditKind::AuthReplayPublish), 494 "auth_replay_restore" => Ok(MycOperationAuditKind::AuthReplayRestore), 495 "discovery_handler_fetch" => Ok(MycOperationAuditKind::DiscoveryHandlerFetch), 496 "discovery_handler_publish" => Ok(MycOperationAuditKind::DiscoveryHandlerPublish), 497 "discovery_handler_compare" => Ok(MycOperationAuditKind::DiscoveryHandlerCompare), 498 "discovery_handler_refresh" => Ok(MycOperationAuditKind::DiscoveryHandlerRefresh), 499 "discovery_handler_repair" => Ok(MycOperationAuditKind::DiscoveryHandlerRepair), 500 other => Err(MycError::InvalidOperation(format!( 501 "unknown sqlite runtime audit operation `{other}`" 502 ))), 503 } 504 } 505 506 fn operation_outcome_label(value: MycOperationAuditOutcome) -> &'static str { 507 match value { 508 MycOperationAuditOutcome::Succeeded => "succeeded", 509 MycOperationAuditOutcome::Rejected => "rejected", 510 MycOperationAuditOutcome::Restored => "restored", 511 MycOperationAuditOutcome::Unavailable => "unavailable", 512 MycOperationAuditOutcome::Missing => "missing", 513 MycOperationAuditOutcome::Matched => "matched", 514 MycOperationAuditOutcome::Drifted => "drifted", 515 MycOperationAuditOutcome::Conflicted => "conflicted", 516 MycOperationAuditOutcome::Skipped => "skipped", 517 } 518 } 519 520 fn parse_operation_outcome(value: &str) -> Result<MycOperationAuditOutcome, MycError> { 521 match value { 522 "succeeded" => Ok(MycOperationAuditOutcome::Succeeded), 523 "rejected" => Ok(MycOperationAuditOutcome::Rejected), 524 "restored" => Ok(MycOperationAuditOutcome::Restored), 525 "unavailable" => Ok(MycOperationAuditOutcome::Unavailable), 526 "missing" => Ok(MycOperationAuditOutcome::Missing), 527 "matched" => Ok(MycOperationAuditOutcome::Matched), 528 "drifted" => Ok(MycOperationAuditOutcome::Drifted), 529 "conflicted" => Ok(MycOperationAuditOutcome::Conflicted), 530 "skipped" => Ok(MycOperationAuditOutcome::Skipped), 531 other => Err(MycError::InvalidOperation(format!( 532 "unknown sqlite runtime audit outcome `{other}`" 533 ))), 534 } 535 } 536 537 fn parse_delivery_policy(value: &str) -> Result<MycTransportDeliveryPolicy, MycError> { 538 match value { 539 "any" => Ok(MycTransportDeliveryPolicy::Any), 540 "quorum" => Ok(MycTransportDeliveryPolicy::Quorum), 541 "all" => Ok(MycTransportDeliveryPolicy::All), 542 other => Err(MycError::InvalidOperation(format!( 543 "unknown sqlite runtime audit delivery policy `{other}`" 544 ))), 545 } 546 } 547 548 #[cfg(test)] 549 mod tests { 550 use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId; 551 use radroots_sql_core::SqlExecutor; 552 use serde_json::Value; 553 554 use crate::audit::{ 555 MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord, 556 MycOperationAuditStore, 557 }; 558 use crate::config::MycAuditConfig; 559 560 use super::{MycOperationAuditSqliteDb, MycSqliteOperationAuditStore}; 561 562 fn config() -> MycAuditConfig { 563 MycAuditConfig { 564 default_read_limit: 10, 565 max_active_file_bytes: 512, 566 max_archived_files: 2, 567 } 568 } 569 570 fn query_values( 571 store: &MycSqliteOperationAuditStore, 572 sql: &str, 573 ) -> Vec<serde_json::Map<String, Value>> { 574 let raw = store.db.executor().query_raw(sql, "[]").expect("query"); 575 serde_json::from_str(&raw).expect("rows") 576 } 577 578 #[test] 579 fn open_memory_bootstraps_runtime_audit_schema() { 580 let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db"); 581 db.migrate_up().expect("rerun migrations"); 582 583 let raw = db 584 .executor() 585 .query_raw( 586 "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name", 587 "[]", 588 ) 589 .expect("query"); 590 let tables: Vec<serde_json::Map<String, Value>> = 591 serde_json::from_str(&raw).expect("table rows"); 592 let table_names = tables 593 .into_iter() 594 .filter_map(|row| { 595 row.get("name") 596 .and_then(Value::as_str) 597 .map(ToOwned::to_owned) 598 }) 599 .collect::<Vec<_>>(); 600 assert!(table_names.iter().any(|name| name == "__migrations")); 601 assert!(table_names.iter().any(|name| name == "myc_operation_audit")); 602 } 603 604 #[test] 605 fn append_and_list_records_roundtrip_through_sqlite() { 606 let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store"); 607 let connection_id = 608 RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id"); 609 610 store 611 .append( 612 &MycOperationAuditRecord::new( 613 MycOperationAuditKind::ConnectAcceptPublish, 614 MycOperationAuditOutcome::Rejected, 615 Some(&connection_id), 616 Some("request-1"), 617 2, 618 0, 619 "0/2 relays acknowledged publish; failures: relay-a: rejected", 620 ) 621 .with_attempt_id("attempt-1"), 622 ) 623 .expect("append rejected record"); 624 store 625 .append(&MycOperationAuditRecord::new( 626 MycOperationAuditKind::AuthReplayRestore, 627 MycOperationAuditOutcome::Restored, 628 Some(&connection_id), 629 Some("request-1"), 630 0, 631 0, 632 "restored pending auth challenge after replay publish rejection", 633 )) 634 .expect("append restored record"); 635 636 let records = store.list().expect("list records"); 637 assert_eq!(records.len(), 2); 638 assert_eq!( 639 records[0].operation, 640 MycOperationAuditKind::ConnectAcceptPublish 641 ); 642 assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected); 643 assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1")); 644 645 let connection_records = store 646 .list_for_connection(&connection_id) 647 .expect("list connection records"); 648 assert_eq!(connection_records, records); 649 } 650 651 #[test] 652 fn list_for_attempt_and_latest_attempt_work_with_sqlite() { 653 let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store"); 654 655 store 656 .append( 657 &MycOperationAuditRecord::new( 658 MycOperationAuditKind::DiscoveryHandlerRefresh, 659 MycOperationAuditOutcome::Rejected, 660 None, 661 None, 662 2, 663 0, 664 "first attempt rejected", 665 ) 666 .with_attempt_id("attempt-1"), 667 ) 668 .expect("append first attempt"); 669 store 670 .append( 671 &MycOperationAuditRecord::new( 672 MycOperationAuditKind::DiscoveryHandlerRefresh, 673 MycOperationAuditOutcome::Succeeded, 674 None, 675 None, 676 1, 677 1, 678 "second attempt succeeded", 679 ) 680 .with_attempt_id("attempt-2"), 681 ) 682 .expect("append second attempt"); 683 684 let attempt_records = store 685 .list_for_attempt_id("attempt-1") 686 .expect("list attempt records"); 687 assert_eq!(attempt_records.len(), 1); 688 assert_eq!( 689 store 690 .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh) 691 .expect("latest attempt"), 692 Some("attempt-2".to_owned()) 693 ); 694 } 695 696 #[test] 697 fn file_backed_store_reopens_existing_audit_records() { 698 let temp = tempfile::tempdir().expect("tempdir"); 699 let path = temp.path().join("audit"); 700 { 701 let store = MycSqliteOperationAuditStore::open(&path, config()).expect("open store"); 702 store 703 .append( 704 &MycOperationAuditRecord::new( 705 MycOperationAuditKind::ListenerResponsePublish, 706 MycOperationAuditOutcome::Succeeded, 707 None, 708 Some("request-1"), 709 1, 710 1, 711 "relay acknowledged publish", 712 ) 713 .with_attempt_id("attempt-1"), 714 ) 715 .expect("append"); 716 } 717 718 let reopened = MycSqliteOperationAuditStore::open(&path, config()).expect("reopen store"); 719 assert_eq!(reopened.list().expect("reopened list").len(), 1); 720 assert!(reopened.path().ends_with("operations.sqlite")); 721 assert_eq!( 722 reopened 723 .latest_attempt_id_for_operation(MycOperationAuditKind::ListenerResponsePublish) 724 .expect("latest attempt"), 725 Some("attempt-1".to_owned()) 726 ); 727 } 728 729 #[test] 730 fn file_database_uses_wal_mode() { 731 let temp = tempfile::tempdir().expect("tempdir"); 732 let store = 733 MycSqliteOperationAuditStore::open(temp.path().join("audit"), config()).expect("open"); 734 735 let rows = query_values(&store, "PRAGMA journal_mode"); 736 assert_eq!( 737 rows.into_iter() 738 .next() 739 .and_then(|row| row.get("journal_mode").cloned()) 740 .and_then(|value| value.as_str().map(ToOwned::to_owned)) 741 .expect("journal mode"), 742 "wal" 743 ); 744 } 745 746 #[test] 747 fn migrate_down_and_up_roundtrip_restores_schema() { 748 let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db"); 749 db.migrate_down().expect("migrate down"); 750 751 let raw = db 752 .executor() 753 .query_raw( 754 "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name", 755 "[]", 756 ) 757 .expect("query"); 758 let tables: Vec<serde_json::Map<String, Value>> = 759 serde_json::from_str(&raw).expect("table rows"); 760 let table_names = tables 761 .into_iter() 762 .filter_map(|row| { 763 row.get("name") 764 .and_then(Value::as_str) 765 .map(ToOwned::to_owned) 766 }) 767 .collect::<Vec<_>>(); 768 assert_eq!(table_names, vec!["__migrations".to_owned()]); 769 770 db.migrate_up().expect("migrate up"); 771 let raw = db 772 .executor() 773 .query_raw("SELECT COUNT(*) AS row_count FROM __migrations", "[]") 774 .expect("migration count"); 775 let rows: Vec<serde_json::Map<String, Value>> = 776 serde_json::from_str(&raw).expect("migration rows"); 777 assert_eq!( 778 rows.into_iter() 779 .next() 780 .and_then(|row| row.get("row_count").cloned()) 781 .and_then(|value| value.as_i64()) 782 .expect("migration row count"), 783 1 784 ); 785 } 786 }