commit 6d2bdc5fb5172de8e369a4658399d1eaabffbab3
parent 75754382516b726699b43ac708728ebb33dee404
Author: triesap <tyson@radroots.org>
Date: Thu, 26 Mar 2026 15:50:50 +0000
audit: add sqlite runtime store
Diffstat:
9 files changed, 1000 insertions(+), 8 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -666,6 +666,18 @@ dependencies = [
]
[[package]]
+name = "fallible-iterator"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
+
+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
+[[package]]
name = "fastrand"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -684,6 +696,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
+name = "foldhash"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
+
+[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -859,7 +877,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
- "foldhash",
+ "foldhash 0.1.5",
]
[[package]]
@@ -867,6 +885,9 @@ name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
+dependencies = [
+ "foldhash 0.2.0",
+]
[[package]]
name = "hashlink"
@@ -1233,6 +1254,17 @@ dependencies = [
]
[[package]]
+name = "libsqlite3-sys"
+version = "0.37.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1f111c8c41e7c61a49cd34e44c7619462967221a6443b0ec299e0ac30cfb9b1"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
+[[package]]
name = "linux-keyutils"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1324,6 +1356,7 @@ dependencies = [
"radroots-nostr-accounts",
"radroots-nostr-connect",
"radroots-nostr-signer",
+ "radroots-sql-core",
"serde",
"serde_json",
"tempfile",
@@ -1796,6 +1829,7 @@ dependencies = [
"radroots-nostr-connect",
"radroots-runtime",
"serde",
+ "serde_json",
"sha2",
"thiserror 1.0.69",
"url",
@@ -1819,6 +1853,18 @@ dependencies = [
]
[[package]]
+name = "radroots-sql-core"
+version = "0.1.0-alpha.1"
+dependencies = [
+ "chrono",
+ "rusqlite",
+ "serde",
+ "serde_json",
+ "thiserror 1.0.69",
+ "uuid",
+]
+
+[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1921,6 +1967,30 @@ dependencies = [
]
[[package]]
+name = "rsqlite-vfs"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8a1f2315036ef6b1fbacd1972e8ee7688030b0a2121edfc2a6550febd41574d"
+dependencies = [
+ "hashbrown 0.16.1",
+ "thiserror 2.0.18",
+]
+
+[[package]]
+name = "rusqlite"
+version = "0.39.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0d2b0146dd9661bf67bb107c0bb2a55064d556eeb3fc314151b957f313bcd4e"
+dependencies = [
+ "bitflags",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "libsqlite3-sys",
+ "smallvec",
+ "sqlite-wasm-rs",
+]
+
+[[package]]
name = "rust-ini"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2220,6 +2290,18 @@ dependencies = [
]
[[package]]
+name = "sqlite-wasm-rs"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2f4206ed3a67690b9c29b77d728f6acc3ce78f16bf846d83c94f76400320181b"
+dependencies = [
+ "cc",
+ "js-sys",
+ "rsqlite-vfs",
+ "wasm-bindgen",
+]
+
+[[package]]
name = "stable_deref_trait"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
@@ -23,6 +23,7 @@ radroots-nostr-accounts = { path = "../lib/crates/nostr-accounts", default-featu
radroots-nostr = { path = "../lib/crates/nostr", features = ["client", "events"] }
radroots-nostr-connect = { path = "../lib/crates/nostr-connect" }
radroots-nostr-signer = { path = "../lib/crates/nostr-signer" }
+radroots-sql-core = { path = "../lib/crates/sql-core", features = ["native"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "2.0"
diff --git a/migrations/0000_runtime_audit_init.down.sql b/migrations/0000_runtime_audit_init.down.sql
@@ -0,0 +1,5 @@
+DROP INDEX IF EXISTS idx_myc_operation_audit_operation_attempt;
+DROP INDEX IF EXISTS idx_myc_operation_audit_attempt_id;
+DROP INDEX IF EXISTS idx_myc_operation_audit_connection_id;
+DROP INDEX IF EXISTS idx_myc_operation_audit_recorded_at;
+DROP TABLE IF EXISTS myc_operation_audit;
diff --git a/migrations/0000_runtime_audit_init.up.sql b/migrations/0000_runtime_audit_init.up.sql
@@ -0,0 +1,31 @@
+CREATE TABLE myc_operation_audit (
+ audit_record_id INTEGER PRIMARY KEY,
+ recorded_at_unix INTEGER NOT NULL,
+ operation TEXT NOT NULL,
+ outcome TEXT NOT NULL,
+ relay_url TEXT,
+ connection_id TEXT,
+ request_id TEXT,
+ attempt_id TEXT,
+ planned_repair_relays_json TEXT NOT NULL,
+ blocked_relays_json TEXT NOT NULL,
+ blocked_reason TEXT,
+ delivery_policy TEXT,
+ required_acknowledged_relay_count INTEGER,
+ publish_attempt_count INTEGER,
+ relay_count INTEGER NOT NULL,
+ acknowledged_relay_count INTEGER NOT NULL,
+ relay_outcome_summary TEXT NOT NULL
+);
+
+CREATE INDEX idx_myc_operation_audit_recorded_at
+ ON myc_operation_audit(recorded_at_unix, audit_record_id);
+
+CREATE INDEX idx_myc_operation_audit_connection_id
+ ON myc_operation_audit(connection_id, recorded_at_unix, audit_record_id);
+
+CREATE INDEX idx_myc_operation_audit_attempt_id
+ ON myc_operation_audit(attempt_id, recorded_at_unix, audit_record_id);
+
+CREATE INDEX idx_myc_operation_audit_operation_attempt
+ ON myc_operation_audit(operation, recorded_at_unix, audit_record_id);
diff --git a/src/app/runtime.rs b/src/app/runtime.rs
@@ -5,6 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::audit::{MycJsonlOperationAuditStore, MycOperationAuditRecord, MycOperationAuditStore};
+use crate::audit_sqlite::MycSqliteOperationAuditStore;
use crate::config::{
MycAuditConfig, MycConfig, MycIdentitySourceSpec, MycPersistenceConfig, MycRuntimeAuditBackend,
MycSignerStateBackend,
@@ -386,7 +387,7 @@ impl MycSignerContext {
let user_identity = user_identity_provider.load_identity()?;
let signer_store = Self::build_signer_store(persistence, &paths.signer_state_path);
let operation_audit_store =
- Self::build_operation_audit_store(persistence, &paths.audit_dir, audit_config);
+ Self::build_operation_audit_store(persistence, &paths.audit_dir, audit_config)?;
let manager = Self::load_signer_manager_from_store(signer_store.clone())?;
let configured_public = signer_identity.to_public();
@@ -428,11 +429,16 @@ impl MycSignerContext {
persistence: &MycPersistenceConfig,
audit_dir: &Path,
audit_config: MycAuditConfig,
- ) -> Arc<dyn MycOperationAuditStore> {
+ ) -> Result<Arc<dyn MycOperationAuditStore>, MycError> {
match persistence.runtime_audit_backend {
- MycRuntimeAuditBackend::JsonlFile => {
- Arc::new(MycJsonlOperationAuditStore::new(audit_dir, audit_config))
- }
+ MycRuntimeAuditBackend::JsonlFile => Ok(Arc::new(MycJsonlOperationAuditStore::new(
+ audit_dir,
+ audit_config,
+ ))),
+ MycRuntimeAuditBackend::Sqlite => Ok(Arc::new(MycSqliteOperationAuditStore::open(
+ audit_dir,
+ audit_config,
+ )?)),
}
}
@@ -495,7 +501,8 @@ mod tests {
RadrootsNostrFileSignerStore, RadrootsNostrSignerManager,
};
- use crate::config::MycConfig;
+ use crate::audit::{MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord};
+ use crate::config::{MycConfig, MycRuntimeAuditBackend};
use crate::error::MycError;
use super::MycRuntime;
@@ -663,4 +670,46 @@ mod tests {
assert_eq!(runtime.snapshot().transport.relay_count, 1);
assert_eq!(runtime.snapshot().transport.connect_timeout_secs, 15);
}
+
+ #[test]
+ fn bootstrap_supports_sqlite_operation_audit_backend() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let mut config = MycConfig::default();
+ config.paths.state_dir = temp.path().join("state");
+ config.paths.signer_identity_path = temp.path().join("signer.json");
+ config.paths.user_identity_path = temp.path().join("user.json");
+ config.persistence.runtime_audit_backend = MycRuntimeAuditBackend::Sqlite;
+ write_test_identity(
+ &config.paths.signer_identity_path,
+ "1111111111111111111111111111111111111111111111111111111111111111",
+ );
+ write_test_identity(
+ &config.paths.user_identity_path,
+ "2222222222222222222222222222222222222222222222222222222222222222",
+ );
+
+ let runtime = MycRuntime::bootstrap(config).expect("runtime");
+ runtime.record_operation_audit(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ Some("request-1"),
+ 1,
+ 1,
+ "relay acknowledged publish",
+ ));
+
+ let records = runtime
+ .operation_audit_store()
+ .list()
+ .expect("list runtime audit");
+ assert_eq!(records.len(), 1);
+ assert!(
+ runtime
+ .paths()
+ .audit_dir
+ .join("operations.sqlite")
+ .is_file()
+ );
+ }
}
diff --git a/src/audit_sqlite.rs b/src/audit_sqlite.rs
@@ -0,0 +1,783 @@
+use std::path::{Path, PathBuf};
+
+use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
+use radroots_sql_core::migrations::{Migration, migrations_run_all_up};
+use radroots_sql_core::{SqlExecutor, SqliteExecutor};
+use serde::Deserialize;
+use serde::de::DeserializeOwned;
+use serde_json::{Value, json};
+
+use crate::audit::{
+ MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord,
+ MycOperationAuditStore,
+};
+use crate::config::{MycAuditConfig, MycTransportDeliveryPolicy};
+use crate::error::MycError;
+
+const MYC_OPERATION_AUDIT_SQLITE_FILE_NAME: &str = "operations.sqlite";
+#[cfg(test)]
+const MYC_OPERATION_AUDIT_MEMORY_PATH: &str = ":memory:";
+
+static MYC_OPERATION_AUDIT_MIGRATIONS: &[Migration] = &[Migration {
+ name: "0000_runtime_audit_init",
+ up_sql: include_str!("../migrations/0000_runtime_audit_init.up.sql"),
+ down_sql: include_str!("../migrations/0000_runtime_audit_init.down.sql"),
+}];
+
+pub struct MycSqliteOperationAuditStore {
+ db: MycOperationAuditSqliteDb,
+ config: MycAuditConfig,
+}
+
+struct MycOperationAuditSqliteDb {
+ path: PathBuf,
+ executor: SqliteExecutor,
+ file_backed: bool,
+}
+
+#[derive(Debug, Deserialize)]
+struct MycOperationAuditRow {
+ audit_record_id: i64,
+ recorded_at_unix: u64,
+ operation: String,
+ outcome: String,
+ relay_url: Option<String>,
+ connection_id: Option<String>,
+ request_id: Option<String>,
+ attempt_id: Option<String>,
+ planned_repair_relays_json: String,
+ blocked_relays_json: String,
+ blocked_reason: Option<String>,
+ delivery_policy: Option<String>,
+ required_acknowledged_relay_count: Option<i64>,
+ publish_attempt_count: Option<i64>,
+ relay_count: i64,
+ acknowledged_relay_count: i64,
+ relay_outcome_summary: String,
+}
+
+#[derive(Debug, Deserialize)]
+struct MycLatestAttemptRow {
+ attempt_id: String,
+}
+
+impl MycSqliteOperationAuditStore {
+ pub fn open(audit_dir: impl AsRef<Path>, config: MycAuditConfig) -> Result<Self, MycError> {
+ let db = MycOperationAuditSqliteDb::open(
+ audit_dir
+ .as_ref()
+ .join(MYC_OPERATION_AUDIT_SQLITE_FILE_NAME),
+ )?;
+ Ok(Self { db, config })
+ }
+
+ #[cfg(test)]
+ pub fn open_memory(config: MycAuditConfig) -> Result<Self, MycError> {
+ let db = MycOperationAuditSqliteDb::open_memory()?;
+ Ok(Self { db, config })
+ }
+
+ pub fn path(&self) -> &Path {
+ self.db.path()
+ }
+
+ pub fn config(&self) -> &MycAuditConfig {
+ &self.config
+ }
+
+ pub fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
+ let planned_repair_relays_json =
+ serialize_json_field(self.db.path(), &record.planned_repair_relays)?;
+ let blocked_relays_json = serialize_json_field(self.db.path(), &record.blocked_relays)?;
+ exec_json(
+ self.db.path(),
+ self.db.executor(),
+ "INSERT INTO myc_operation_audit(recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ json!([
+ record.recorded_at_unix,
+ operation_kind_label(record.operation),
+ operation_outcome_label(record.outcome),
+ record.relay_url.clone(),
+ record.connection_id.clone(),
+ record.request_id.clone(),
+ record.attempt_id.clone(),
+ planned_repair_relays_json,
+ blocked_relays_json,
+ record.blocked_reason.clone(),
+ record
+ .delivery_policy
+ .map(MycTransportDeliveryPolicy::as_str),
+ record.required_acknowledged_relay_count,
+ record.publish_attempt_count,
+ record.relay_count,
+ record.acknowledged_relay_count,
+ record.relay_outcome_summary.clone(),
+ ]),
+ )
+ }
+
+ pub fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ self.query_records(
+ "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix ASC, audit_record_id ASC",
+ json!([]),
+ )
+ }
+
+ pub fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ if limit == 0 {
+ return Ok(Vec::new());
+ }
+
+ let mut records = self.query_records_with_limit(
+ "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit ORDER BY recorded_at_unix DESC, audit_record_id DESC",
+ json!([]),
+ limit,
+ )?;
+ records.reverse();
+ Ok(records)
+ }
+
+ pub fn list_for_connection_with_limit(
+ &self,
+ connection_id: &RadrootsNostrSignerConnectionId,
+ limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ if limit == 0 {
+ return Ok(Vec::new());
+ }
+
+ let mut records = self.query_records_with_limit(
+ "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE connection_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC",
+ json!([connection_id.as_str()]),
+ limit,
+ )?;
+ records.reverse();
+ Ok(records)
+ }
+
+ pub fn list_for_attempt_id_with_limit(
+ &self,
+ attempt_id: &str,
+ limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ if limit == 0 {
+ return Ok(Vec::new());
+ }
+
+ let mut records = self.query_records_with_limit(
+ "SELECT audit_record_id, recorded_at_unix, operation, outcome, relay_url, connection_id, request_id, attempt_id, planned_repair_relays_json, blocked_relays_json, blocked_reason, delivery_policy, required_acknowledged_relay_count, publish_attempt_count, relay_count, acknowledged_relay_count, relay_outcome_summary FROM myc_operation_audit WHERE attempt_id = ? ORDER BY recorded_at_unix DESC, audit_record_id DESC",
+ json!([attempt_id]),
+ limit,
+ )?;
+ records.reverse();
+ Ok(records)
+ }
+
+ pub fn latest_attempt_id_for_operation(
+ &self,
+ operation: MycOperationAuditKind,
+ ) -> Result<Option<String>, MycError> {
+ let rows: Vec<MycLatestAttemptRow> = query_rows(
+ self.db.path(),
+ self.db.executor(),
+ "SELECT attempt_id FROM myc_operation_audit WHERE operation = ? AND attempt_id IS NOT NULL ORDER BY recorded_at_unix DESC, audit_record_id DESC LIMIT 1",
+ json!([operation_kind_label(operation)]),
+ )?;
+ Ok(rows.into_iter().next().map(|row| row.attempt_id))
+ }
+
+ fn query_records(
+ &self,
+ sql: &str,
+ params: Value,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ let rows: Vec<MycOperationAuditRow> =
+ query_rows(self.db.path(), self.db.executor(), sql, params)?;
+ rows.into_iter()
+ .map(|row| row.into_record(self.db.path()))
+ .collect()
+ }
+
+ fn query_records_with_limit(
+ &self,
+ base_sql: &str,
+ params: Value,
+ limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ if limit == usize::MAX {
+ return self.query_records(base_sql, params);
+ }
+
+ let limit = i64::try_from(limit).map_err(|_| {
+ MycError::InvalidOperation("audit read limit exceeds sqlite range".to_owned())
+ })?;
+ let mut params = params.as_array().cloned().unwrap_or_default();
+ params.push(Value::from(limit));
+ let sql = format!("{base_sql} LIMIT ?");
+ self.query_records(sql.as_str(), Value::Array(params))
+ }
+}
+
+impl MycOperationAuditStore for MycSqliteOperationAuditStore {
+ fn config(&self) -> &MycAuditConfig {
+ &self.config
+ }
+
+ fn append(&self, record: &MycOperationAuditRecord) -> Result<(), MycError> {
+ MycSqliteOperationAuditStore::append(self, record)
+ }
+
+ fn list_all(&self) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ MycSqliteOperationAuditStore::list_all(self)
+ }
+
+ fn list_with_limit(&self, limit: usize) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ MycSqliteOperationAuditStore::list_with_limit(self, limit)
+ }
+
+ fn list_for_connection_with_limit(
+ &self,
+ connection_id: &RadrootsNostrSignerConnectionId,
+ limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ MycSqliteOperationAuditStore::list_for_connection_with_limit(self, connection_id, limit)
+ }
+
+ fn list_for_attempt_id_with_limit(
+ &self,
+ attempt_id: &str,
+ limit: usize,
+ ) -> Result<Vec<MycOperationAuditRecord>, MycError> {
+ MycSqliteOperationAuditStore::list_for_attempt_id_with_limit(self, attempt_id, limit)
+ }
+
+ fn latest_attempt_id_for_operation(
+ &self,
+ operation: MycOperationAuditKind,
+ ) -> Result<Option<String>, MycError> {
+ MycSqliteOperationAuditStore::latest_attempt_id_for_operation(self, operation)
+ }
+}
+
+impl MycOperationAuditSqliteDb {
+ fn open(path: impl AsRef<Path>) -> Result<Self, MycError> {
+ let path = path.as_ref().to_path_buf();
+ if let Some(parent) = path.parent()
+ && !parent.as_os_str().is_empty()
+ {
+ std::fs::create_dir_all(parent).map_err(|source| MycError::CreateDir {
+ path: parent.to_path_buf(),
+ source,
+ })?;
+ }
+ let executor = SqliteExecutor::open(&path).map_err(|source| MycError::AuditSql {
+ path: path.clone(),
+ source,
+ })?;
+ let db = Self {
+ path,
+ executor,
+ file_backed: true,
+ };
+ db.configure()?;
+ db.migrate_up()?;
+ Ok(db)
+ }
+
+ #[cfg(test)]
+ fn open_memory() -> Result<Self, MycError> {
+ let path = PathBuf::from(MYC_OPERATION_AUDIT_MEMORY_PATH);
+ let executor = SqliteExecutor::open_memory().map_err(|source| MycError::AuditSql {
+ path: path.clone(),
+ source,
+ })?;
+ let db = Self {
+ path,
+ executor,
+ file_backed: false,
+ };
+ db.configure()?;
+ db.migrate_up()?;
+ Ok(db)
+ }
+
+ fn path(&self) -> &Path {
+ &self.path
+ }
+
+ fn executor(&self) -> &SqliteExecutor {
+ &self.executor
+ }
+
+ fn migrate_up(&self) -> Result<(), MycError> {
+ migrations_run_all_up(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| {
+ MycError::AuditSql {
+ path: self.path.clone(),
+ source,
+ }
+ })
+ }
+
+ #[cfg(test)]
+ fn migrate_down(&self) -> Result<(), MycError> {
+ use radroots_sql_core::migrations::migrations_run_all_down;
+
+ migrations_run_all_down(&self.executor, MYC_OPERATION_AUDIT_MIGRATIONS).map_err(|source| {
+ MycError::AuditSql {
+ path: self.path.clone(),
+ source,
+ }
+ })
+ }
+
+ fn configure(&self) -> Result<(), MycError> {
+ let pragma_batch = if self.file_backed {
+ "PRAGMA foreign_keys = ON;
+ PRAGMA synchronous = FULL;
+ PRAGMA wal_autocheckpoint = 1000;
+ PRAGMA busy_timeout = 5000;
+ PRAGMA temp_store = MEMORY;"
+ } else {
+ "PRAGMA foreign_keys = ON;
+ PRAGMA synchronous = NORMAL;
+ PRAGMA busy_timeout = 5000;
+ PRAGMA temp_store = MEMORY;"
+ };
+ let _ = self
+ .executor
+ .exec(pragma_batch, "[]")
+ .map_err(|source| MycError::AuditSql {
+ path: self.path.clone(),
+ source,
+ })?;
+ let journal_mode_sql = if self.file_backed {
+ "PRAGMA journal_mode = WAL"
+ } else {
+ "PRAGMA journal_mode = MEMORY"
+ };
+ let _ = self
+ .executor
+ .query_raw(journal_mode_sql, "[]")
+ .map_err(|source| MycError::AuditSql {
+ path: self.path.clone(),
+ source,
+ })?;
+ Ok(())
+ }
+}
+
+impl MycOperationAuditRow {
+ fn into_record(self, path: &Path) -> Result<MycOperationAuditRecord, MycError> {
+ let _audit_record_id = self.audit_record_id;
+ Ok(MycOperationAuditRecord {
+ recorded_at_unix: self.recorded_at_unix,
+ operation: parse_operation_kind(self.operation.as_str())?,
+ outcome: parse_operation_outcome(self.outcome.as_str())?,
+ relay_url: self.relay_url,
+ connection_id: self.connection_id,
+ request_id: self.request_id,
+ attempt_id: self.attempt_id,
+ planned_repair_relays: parse_json_field(
+ path,
+ self.planned_repair_relays_json.as_str(),
+ )?,
+ blocked_relays: parse_json_field(path, self.blocked_relays_json.as_str())?,
+ blocked_reason: self.blocked_reason,
+ delivery_policy: self
+ .delivery_policy
+ .as_deref()
+ .map(parse_delivery_policy)
+ .transpose()?,
+ required_acknowledged_relay_count: self
+ .required_acknowledged_relay_count
+ .map(parse_optional_usize)
+ .transpose()?,
+ publish_attempt_count: self
+ .publish_attempt_count
+ .map(parse_optional_usize)
+ .transpose()?,
+ relay_count: parse_required_usize(self.relay_count, "relay_count")?,
+ acknowledged_relay_count: parse_required_usize(
+ self.acknowledged_relay_count,
+ "acknowledged_relay_count",
+ )?,
+ relay_outcome_summary: self.relay_outcome_summary,
+ })
+ }
+}
+
+fn query_rows<T: DeserializeOwned>(
+ path: &Path,
+ executor: &impl SqlExecutor,
+ sql: &str,
+ params: Value,
+) -> Result<Vec<T>, MycError> {
+ let raw = executor
+ .query_raw(sql, params.to_string().as_str())
+ .map_err(|source| MycError::AuditSql {
+ path: path.to_path_buf(),
+ source,
+ })?;
+ serde_json::from_str(&raw).map_err(|source| MycError::AuditSqlDecode {
+ path: path.to_path_buf(),
+ source,
+ })
+}
+
+fn exec_json(
+ path: &Path,
+ executor: &impl SqlExecutor,
+ sql: &str,
+ params: Value,
+) -> Result<(), MycError> {
+ let _ = executor
+ .exec(sql, params.to_string().as_str())
+ .map_err(|source| MycError::AuditSql {
+ path: path.to_path_buf(),
+ source,
+ })?;
+ Ok(())
+}
+
+fn parse_json_field<T: DeserializeOwned>(path: &Path, value: &str) -> Result<T, MycError> {
+ serde_json::from_str(value).map_err(|source| MycError::AuditSqlDecode {
+ path: path.to_path_buf(),
+ source,
+ })
+}
+
+fn serialize_json_field<T: serde::Serialize>(path: &Path, value: &T) -> Result<String, MycError> {
+ serde_json::to_string(value).map_err(|source| MycError::AuditSerialize {
+ path: path.to_path_buf(),
+ source,
+ })
+}
+
+fn parse_required_usize(value: i64, field: &str) -> Result<usize, MycError> {
+ usize::try_from(value).map_err(|_| {
+ MycError::InvalidOperation(format!(
+ "sqlite runtime audit field `{field}` is out of range for usize"
+ ))
+ })
+}
+
+fn parse_optional_usize(value: i64) -> Result<usize, MycError> {
+ usize::try_from(value).map_err(|_| {
+ MycError::InvalidOperation(
+ "sqlite runtime audit optional integer field is out of range for usize".to_owned(),
+ )
+ })
+}
+
+fn operation_kind_label(value: MycOperationAuditKind) -> &'static str {
+ match value {
+ MycOperationAuditKind::ListenerResponsePublish => "listener_response_publish",
+ MycOperationAuditKind::ConnectAcceptPublish => "connect_accept_publish",
+ MycOperationAuditKind::AuthReplayPublish => "auth_replay_publish",
+ MycOperationAuditKind::AuthReplayRestore => "auth_replay_restore",
+ MycOperationAuditKind::DiscoveryHandlerFetch => "discovery_handler_fetch",
+ MycOperationAuditKind::DiscoveryHandlerPublish => "discovery_handler_publish",
+ MycOperationAuditKind::DiscoveryHandlerCompare => "discovery_handler_compare",
+ MycOperationAuditKind::DiscoveryHandlerRefresh => "discovery_handler_refresh",
+ MycOperationAuditKind::DiscoveryHandlerRepair => "discovery_handler_repair",
+ }
+}
+
+fn parse_operation_kind(value: &str) -> Result<MycOperationAuditKind, MycError> {
+ match value {
+ "listener_response_publish" => Ok(MycOperationAuditKind::ListenerResponsePublish),
+ "connect_accept_publish" => Ok(MycOperationAuditKind::ConnectAcceptPublish),
+ "auth_replay_publish" => Ok(MycOperationAuditKind::AuthReplayPublish),
+ "auth_replay_restore" => Ok(MycOperationAuditKind::AuthReplayRestore),
+ "discovery_handler_fetch" => Ok(MycOperationAuditKind::DiscoveryHandlerFetch),
+ "discovery_handler_publish" => Ok(MycOperationAuditKind::DiscoveryHandlerPublish),
+ "discovery_handler_compare" => Ok(MycOperationAuditKind::DiscoveryHandlerCompare),
+ "discovery_handler_refresh" => Ok(MycOperationAuditKind::DiscoveryHandlerRefresh),
+ "discovery_handler_repair" => Ok(MycOperationAuditKind::DiscoveryHandlerRepair),
+ other => Err(MycError::InvalidOperation(format!(
+ "unknown sqlite runtime audit operation `{other}`"
+ ))),
+ }
+}
+
+fn operation_outcome_label(value: MycOperationAuditOutcome) -> &'static str {
+ match value {
+ MycOperationAuditOutcome::Succeeded => "succeeded",
+ MycOperationAuditOutcome::Rejected => "rejected",
+ MycOperationAuditOutcome::Restored => "restored",
+ MycOperationAuditOutcome::Unavailable => "unavailable",
+ MycOperationAuditOutcome::Missing => "missing",
+ MycOperationAuditOutcome::Matched => "matched",
+ MycOperationAuditOutcome::Drifted => "drifted",
+ MycOperationAuditOutcome::Conflicted => "conflicted",
+ MycOperationAuditOutcome::Skipped => "skipped",
+ }
+}
+
+fn parse_operation_outcome(value: &str) -> Result<MycOperationAuditOutcome, MycError> {
+ match value {
+ "succeeded" => Ok(MycOperationAuditOutcome::Succeeded),
+ "rejected" => Ok(MycOperationAuditOutcome::Rejected),
+ "restored" => Ok(MycOperationAuditOutcome::Restored),
+ "unavailable" => Ok(MycOperationAuditOutcome::Unavailable),
+ "missing" => Ok(MycOperationAuditOutcome::Missing),
+ "matched" => Ok(MycOperationAuditOutcome::Matched),
+ "drifted" => Ok(MycOperationAuditOutcome::Drifted),
+ "conflicted" => Ok(MycOperationAuditOutcome::Conflicted),
+ "skipped" => Ok(MycOperationAuditOutcome::Skipped),
+ other => Err(MycError::InvalidOperation(format!(
+ "unknown sqlite runtime audit outcome `{other}`"
+ ))),
+ }
+}
+
+fn parse_delivery_policy(value: &str) -> Result<MycTransportDeliveryPolicy, MycError> {
+ match value {
+ "any" => Ok(MycTransportDeliveryPolicy::Any),
+ "quorum" => Ok(MycTransportDeliveryPolicy::Quorum),
+ "all" => Ok(MycTransportDeliveryPolicy::All),
+ other => Err(MycError::InvalidOperation(format!(
+ "unknown sqlite runtime audit delivery policy `{other}`"
+ ))),
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use radroots_nostr_signer::prelude::RadrootsNostrSignerConnectionId;
+ use radroots_sql_core::SqlExecutor;
+ use serde_json::Value;
+
+ use crate::audit::{
+ MycOperationAuditKind, MycOperationAuditOutcome, MycOperationAuditRecord,
+ MycOperationAuditStore,
+ };
+ use crate::config::MycAuditConfig;
+
+ use super::{MycOperationAuditSqliteDb, MycSqliteOperationAuditStore};
+
+ fn config() -> MycAuditConfig {
+ MycAuditConfig {
+ default_read_limit: 10,
+ max_active_file_bytes: 512,
+ max_archived_files: 2,
+ }
+ }
+
+ fn query_values(
+ store: &MycSqliteOperationAuditStore,
+ sql: &str,
+ ) -> Vec<serde_json::Map<String, Value>> {
+ let raw = store.db.executor().query_raw(sql, "[]").expect("query");
+ serde_json::from_str(&raw).expect("rows")
+ }
+
+ #[test]
+ fn open_memory_bootstraps_runtime_audit_schema() {
+ let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db");
+ db.migrate_up().expect("rerun migrations");
+
+ let raw = db
+ .executor()
+ .query_raw(
+ "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
+ "[]",
+ )
+ .expect("query");
+ let tables: Vec<serde_json::Map<String, Value>> =
+ serde_json::from_str(&raw).expect("table rows");
+ let table_names = tables
+ .into_iter()
+ .filter_map(|row| {
+ row.get("name")
+ .and_then(Value::as_str)
+ .map(ToOwned::to_owned)
+ })
+ .collect::<Vec<_>>();
+ assert!(table_names.iter().any(|name| name == "__migrations"));
+ assert!(table_names.iter().any(|name| name == "myc_operation_audit"));
+ }
+
+ #[test]
+ fn append_and_list_records_roundtrip_through_sqlite() {
+ let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store");
+ let connection_id =
+ RadrootsNostrSignerConnectionId::parse("connection-1").expect("connection id");
+
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::ConnectAcceptPublish,
+ MycOperationAuditOutcome::Rejected,
+ Some(&connection_id),
+ Some("request-1"),
+ 2,
+ 0,
+ "0/2 relays acknowledged publish; failures: relay-a: rejected",
+ )
+ .with_attempt_id("attempt-1"),
+ )
+ .expect("append rejected record");
+ store
+ .append(&MycOperationAuditRecord::new(
+ MycOperationAuditKind::AuthReplayRestore,
+ MycOperationAuditOutcome::Restored,
+ Some(&connection_id),
+ Some("request-1"),
+ 0,
+ 0,
+ "restored pending auth challenge after replay publish rejection",
+ ))
+ .expect("append restored record");
+
+ let records = store.list().expect("list records");
+ assert_eq!(records.len(), 2);
+ assert_eq!(
+ records[0].operation,
+ MycOperationAuditKind::ConnectAcceptPublish
+ );
+ assert_eq!(records[0].outcome, MycOperationAuditOutcome::Rejected);
+ assert_eq!(records[0].attempt_id.as_deref(), Some("attempt-1"));
+
+ let connection_records = store
+ .list_for_connection(&connection_id)
+ .expect("list connection records");
+ assert_eq!(connection_records, records);
+ }
+
+ #[test]
+ fn list_for_attempt_and_latest_attempt_work_with_sqlite() {
+ let store = MycSqliteOperationAuditStore::open_memory(config()).expect("sqlite store");
+
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerRefresh,
+ MycOperationAuditOutcome::Rejected,
+ None,
+ None,
+ 2,
+ 0,
+ "first attempt rejected",
+ )
+ .with_attempt_id("attempt-1"),
+ )
+ .expect("append first attempt");
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::DiscoveryHandlerRefresh,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ None,
+ 1,
+ 1,
+ "second attempt succeeded",
+ )
+ .with_attempt_id("attempt-2"),
+ )
+ .expect("append second attempt");
+
+ let attempt_records = store
+ .list_for_attempt_id("attempt-1")
+ .expect("list attempt records");
+ assert_eq!(attempt_records.len(), 1);
+ assert_eq!(
+ store
+ .latest_attempt_id_for_operation(MycOperationAuditKind::DiscoveryHandlerRefresh)
+ .expect("latest attempt"),
+ Some("attempt-2".to_owned())
+ );
+ }
+
+ #[test]
+ fn file_backed_store_reopens_existing_audit_records() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let path = temp.path().join("audit");
+ {
+ let store = MycSqliteOperationAuditStore::open(&path, config()).expect("open store");
+ store
+ .append(
+ &MycOperationAuditRecord::new(
+ MycOperationAuditKind::ListenerResponsePublish,
+ MycOperationAuditOutcome::Succeeded,
+ None,
+ Some("request-1"),
+ 1,
+ 1,
+ "relay acknowledged publish",
+ )
+ .with_attempt_id("attempt-1"),
+ )
+ .expect("append");
+ }
+
+ let reopened = MycSqliteOperationAuditStore::open(&path, config()).expect("reopen store");
+ assert_eq!(reopened.list().expect("reopened list").len(), 1);
+ assert!(reopened.path().ends_with("operations.sqlite"));
+ assert_eq!(
+ reopened
+ .latest_attempt_id_for_operation(MycOperationAuditKind::ListenerResponsePublish)
+ .expect("latest attempt"),
+ Some("attempt-1".to_owned())
+ );
+ }
+
+ #[test]
+ fn file_database_uses_wal_mode() {
+ let temp = tempfile::tempdir().expect("tempdir");
+ let store =
+ MycSqliteOperationAuditStore::open(temp.path().join("audit"), config()).expect("open");
+
+ let rows = query_values(&store, "PRAGMA journal_mode");
+ assert_eq!(
+ rows.into_iter()
+ .next()
+ .and_then(|row| row.get("journal_mode").cloned())
+ .and_then(|value| value.as_str().map(ToOwned::to_owned))
+ .expect("journal mode"),
+ "wal"
+ );
+ }
+
+ #[test]
+ fn migrate_down_and_up_roundtrip_restores_schema() {
+ let db = MycOperationAuditSqliteDb::open_memory().expect("open memory db");
+ db.migrate_down().expect("migrate down");
+
+ let raw = db
+ .executor()
+ .query_raw(
+ "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
+ "[]",
+ )
+ .expect("query");
+ let tables: Vec<serde_json::Map<String, Value>> =
+ serde_json::from_str(&raw).expect("table rows");
+ let table_names = tables
+ .into_iter()
+ .filter_map(|row| {
+ row.get("name")
+ .and_then(Value::as_str)
+ .map(ToOwned::to_owned)
+ })
+ .collect::<Vec<_>>();
+ assert_eq!(table_names, vec!["__migrations".to_owned()]);
+
+ db.migrate_up().expect("migrate up");
+ let raw = db
+ .executor()
+ .query_raw("SELECT COUNT(*) AS row_count FROM __migrations", "[]")
+ .expect("migration count");
+ let rows: Vec<serde_json::Map<String, Value>> =
+ serde_json::from_str(&raw).expect("migration rows");
+ assert_eq!(
+ rows.into_iter()
+ .next()
+ .and_then(|row| row.get("row_count").cloned())
+ .and_then(|value| value.as_i64())
+ .expect("migration row count"),
+ 1
+ );
+ }
+}
diff --git a/src/config.rs b/src/config.rs
@@ -147,6 +147,7 @@ pub enum MycSignerStateBackend {
#[serde(rename_all = "snake_case")]
pub enum MycRuntimeAuditBackend {
JsonlFile,
+ Sqlite,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@@ -375,6 +376,7 @@ impl MycRuntimeAuditBackend {
pub fn as_str(self) -> &'static str {
match self {
Self::JsonlFile => "jsonl_file",
+ Self::Sqlite => "sqlite",
}
}
}
@@ -1366,10 +1368,11 @@ fn parse_runtime_audit_backend_env(
) -> Result<MycRuntimeAuditBackend, MycError> {
match value {
"jsonl_file" => Ok(MycRuntimeAuditBackend::JsonlFile),
+ "sqlite" => Ok(MycRuntimeAuditBackend::Sqlite),
_ => Err(config_parse_error(
path,
line_number,
- format!("{key} must be `jsonl_file`"),
+ format!("{key} must be `jsonl_file` or `sqlite`"),
)),
}
}
@@ -2288,4 +2291,27 @@ MYC_TRANSPORT_PUBLISH_MAX_BACKOFF_MILLIS=800
assert_eq!(reparsed, config);
}
+
+ #[test]
+ fn parse_runtime_audit_backend_supports_sqlite() {
+ let config = MycConfig::from_env_str(
+ r#"
+MYC_PATHS_SIGNER_IDENTITY_PATH=/tmp/signer.json
+MYC_PATHS_USER_IDENTITY_PATH=/tmp/user.json
+MYC_PERSISTENCE_RUNTIME_AUDIT_BACKEND=sqlite
+ "#,
+ )
+ .expect("config");
+
+ assert_eq!(
+ config.persistence.runtime_audit_backend,
+ MycRuntimeAuditBackend::Sqlite
+ );
+ assert!(
+ config
+ .to_env_string()
+ .expect("render env")
+ .contains("MYC_PERSISTENCE_RUNTIME_AUDIT_BACKEND=sqlite")
+ );
+ }
}
diff --git a/src/error.rs b/src/error.rs
@@ -6,6 +6,7 @@ use radroots_nostr::prelude::RadrootsNostrError;
use radroots_nostr_accounts::prelude::RadrootsNostrAccountsError;
use radroots_nostr_connect::prelude::RadrootsNostrConnectError;
use radroots_nostr_signer::prelude::RadrootsNostrSignerError;
+use radroots_sql_core::error::SqlError;
use thiserror::Error;
use crate::config::MycTransportDeliveryPolicy;
@@ -73,6 +74,18 @@ pub enum MycError {
#[source]
source: serde_json::Error,
},
+ #[error("audit sqlite error at {path}: {source}")]
+ AuditSql {
+ path: PathBuf,
+ #[source]
+ source: SqlError,
+ },
+ #[error("audit sqlite decode error at {path}: {source}")]
+ AuditSqlDecode {
+ path: PathBuf,
+ #[source]
+ source: serde_json::Error,
+ },
#[error("discovery io error at {path}: {source}")]
DiscoveryIo {
path: PathBuf,
diff --git a/src/lib.rs b/src/lib.rs
@@ -2,6 +2,7 @@
pub mod app;
pub mod audit;
+mod audit_sqlite;
pub mod cli;
pub mod config;
pub mod control;
@@ -18,6 +19,7 @@ pub use audit::{
MycJsonlOperationAuditStore, MycOperationAuditKind, MycOperationAuditOutcome,
MycOperationAuditRecord, MycOperationAuditStore,
};
+pub use audit_sqlite::MycSqliteOperationAuditStore;
pub use config::{
DEFAULT_ENV_PATH, MycAuditConfig, MycConfig, MycConnectionApproval, MycDiscoveryConfig,
MycDiscoveryMetadataConfig, MycIdentityBackend, MycIdentitySourceSpec, MycLoggingConfig,