commit 85faca6f29319af61eefdabfae659d406fec8888
parent f8b10791fad18e96e8a1b9bc38cb17a5f0464b65
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 13:10:45 -0700
ops: add restore command
- add ops restore CLI dispatch and output reporting
- validate backup manifests before importing raw event JSONL
- rebuild projections after restore on the active store handle
- prove fresh-database restore with runtime and binary tests
Diffstat:
4 files changed, 484 insertions(+), 29 deletions(-)
diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs
@@ -12,7 +12,8 @@ usage:
tangle event import --config PATH --input PATH
tangle event export --config PATH --output PATH
tangle projection rebuild --config PATH
- tangle ops backup --config PATH --output DIR";
+ tangle ops backup --config PATH --output DIR
+ tangle ops restore --config PATH --input DIR";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TangleCommand {
@@ -24,6 +25,7 @@ pub enum TangleCommand {
EventExport,
ProjectionRebuild,
OpsBackup,
+ OpsRestore,
}
impl TangleCommand {
@@ -37,6 +39,7 @@ impl TangleCommand {
Self::EventExport => "event export",
Self::ProjectionRebuild => "projection rebuild",
Self::OpsBackup => "ops backup",
+ Self::OpsRestore => "ops restore",
}
}
@@ -51,6 +54,7 @@ impl TangleCommand {
| Self::EventExport
| Self::ProjectionRebuild
| Self::OpsBackup
+ | Self::OpsRestore
)
}
}
@@ -193,6 +197,7 @@ where
};
match nested.as_str() {
"backup" => TangleCommand::OpsBackup,
+ "restore" => TangleCommand::OpsRestore,
_ => return Err(TangleCliError::UnknownCommand(format!("ops {nested}"))),
}
}
@@ -238,7 +243,12 @@ where
}
}
}
- if input_path.is_some() && command != TangleCommand::EventImport {
+ if input_path.is_some()
+ && !matches!(
+ command,
+ TangleCommand::EventImport | TangleCommand::OpsRestore
+ )
+ {
return Err(TangleCliError::UnexpectedArgument {
command: command.as_str().to_owned(),
argument: "--input".to_owned(),
@@ -324,6 +334,20 @@ pub fn ops_backup_output(report: &tangle_runtime::RuntimeBackupReport) -> String
)
}
+pub fn ops_restore_output(report: &tangle_runtime::RuntimeRestoreReport) -> String {
+ format!(
+ "restore directory: {}\nraw events: {}\nraw events sha256: {}\nevents inserted: {}\nevents duplicate: {}\nevents rebuilt: {}\nlistings projected: {}\nevents skipped: {}",
+ report.input_dir().display(),
+ report.raw_event_count(),
+ report.raw_events_sha256(),
+ report.import_report().inserted(),
+ report.import_report().duplicate(),
+ report.rebuild_report().rebuilt(),
+ report.rebuild_report().projected(),
+ report.import_report().skipped() + report.rebuild_report().skipped()
+ )
+}
+
pub async fn migrate_with_config(path: &str) -> Result<String, String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
initialize_tracing(config.tracing_config())?;
@@ -379,6 +403,16 @@ pub async fn ops_backup_with_config(config_path: &str, output_dir: &str) -> Resu
Ok(ops_backup_output(&report))
}
+pub async fn ops_restore_with_config(config_path: &str, input_dir: &str) -> Result<String, String> {
+ let config =
+ tangle_runtime::load_runtime_config(config_path).map_err(|error| error.to_string())?;
+ initialize_tracing(config.tracing_config())?;
+ let report = tangle_runtime::restore_runtime_database(&config, input_dir)
+ .await
+ .map_err(|error| error.to_string())?;
+ Ok(ops_restore_output(&report))
+}
+
pub async fn run_with_config(path: &str) -> Result<(), String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
initialize_tracing(config.tracing_config())?;
@@ -429,15 +463,15 @@ mod tests {
use super::{
PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation,
event_export_output, event_import_output, initialize_tracing, migrate_output,
- ops_backup_output, parse_tangle_command, parse_tangle_invocation,
+ ops_backup_output, ops_restore_output, parse_tangle_command, parse_tangle_invocation,
projection_rebuild_output, require_config_path, require_input_path, require_output_path,
usage_output, version_output,
};
use std::path::PathBuf;
use tangle_runtime::{
RuntimeBackupReport, RuntimeEventExportReport, RuntimeEventImportReport,
- RuntimeMigrationReport, RuntimeProjectionRebuildReport, RuntimeTracingConfig,
- RuntimeTracingFormat,
+ RuntimeMigrationReport, RuntimeProjectionRebuildReport, RuntimeRestoreReport,
+ RuntimeTracingConfig, RuntimeTracingFormat,
};
#[test]
@@ -474,7 +508,7 @@ mod tests {
fn usage_output_lists_supported_command_model() {
assert_eq!(
usage_output(),
- "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR"
+ "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR\n tangle ops restore --config PATH --input DIR"
);
}
@@ -495,6 +529,7 @@ mod tests {
TangleCommand::ProjectionRebuild,
),
(vec!["ops", "backup"], TangleCommand::OpsBackup),
+ (vec!["ops", "restore"], TangleCommand::OpsRestore),
];
for (args, expected) in cases {
@@ -511,6 +546,7 @@ mod tests {
| TangleCommand::EventExport
| TangleCommand::ProjectionRebuild
| TangleCommand::OpsBackup
+ | TangleCommand::OpsRestore
)
);
}
@@ -539,6 +575,28 @@ mod tests {
}
#[test]
+ fn command_model_parses_ops_restore_input_option() {
+ let invocation = parse_tangle_invocation([
+ "ops",
+ "restore",
+ "--config",
+ "runtime.json",
+ "--input",
+ "backup-dir",
+ ])
+ .expect("invocation");
+ assert_eq!(invocation.command(), TangleCommand::OpsRestore);
+ assert_eq!(
+ require_config_path(&invocation).expect("config"),
+ "runtime.json"
+ );
+ assert_eq!(
+ require_input_path(&invocation).expect("input"),
+ "backup-dir"
+ );
+ }
+
+ #[test]
fn command_model_parses_export_output_option() {
let invocation = parse_tangle_invocation([
"event",
@@ -738,4 +796,23 @@ mod tests {
)
);
}
+
+ #[test]
+ fn ops_restore_output_reports_import_and_rebuild_counts() {
+ let report = RuntimeRestoreReport::new(
+ PathBuf::from("backup"),
+ 3,
+ "c".repeat(64),
+ RuntimeEventImportReport::new(3, 2, 1, 2, 0),
+ RuntimeProjectionRebuildReport::new(3, 3, 2, 0),
+ );
+
+ assert_eq!(
+ ops_restore_output(&report),
+ format!(
+ "restore directory: backup\nraw events: 3\nraw events sha256: {}\nevents inserted: 2\nevents duplicate: 1\nevents rebuilt: 3\nlistings projected: 2\nevents skipped: 0",
+ "c".repeat(64)
+ )
+ );
+ }
}
diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs
@@ -78,6 +78,16 @@ fn main() -> ExitCode {
ExitCode::from(2)
}
},
+ tangle::TangleCommand::OpsRestore => match run_ops_restore(&invocation) {
+ Ok(output) => {
+ println!("{output}");
+ ExitCode::SUCCESS
+ }
+ Err(error) => {
+ eprintln!("{error}");
+ ExitCode::from(2)
+ }
+ },
}
}
@@ -137,3 +147,13 @@ fn run_ops_backup(invocation: &tangle::TangleInvocation) -> Result<String, Strin
.map_err(|error| format!("failed to start runtime: {error}"))?;
runtime.block_on(tangle::ops_backup_with_config(config_path, output_path))
}
+
+fn run_ops_restore(invocation: &tangle::TangleInvocation) -> Result<String, String> {
+ let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?;
+ let input_path = tangle::require_input_path(invocation).map_err(|error| error.to_string())?;
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .map_err(|error| format!("failed to start runtime: {error}"))?;
+ runtime.block_on(tangle::ops_restore_with_config(config_path, input_path))
+}
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -27,7 +27,7 @@ fn tangle_without_args_reports_usage() {
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout),
- "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR\n"
+ "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR\n tangle ops restore --config PATH --input DIR\n"
);
assert!(output.stderr.is_empty());
}
@@ -43,7 +43,7 @@ fn tangle_unknown_arg_reports_usage_error() {
assert!(output.stdout.is_empty());
assert_eq!(
String::from_utf8_lossy(&output.stderr),
- "unknown command: --unknown\nusage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR\n"
+ "unknown command: --unknown\nusage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n tangle ops backup --config PATH --output DIR\n tangle ops restore --config PATH --input DIR\n"
);
}
@@ -107,6 +107,8 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
let input_path = root.join("events.jsonl");
let output_path = root.join("exported.jsonl");
let backup_path = root.join("backup");
+ let restore_db_path = root.join("restore-db");
+ let restore_config_path = root.join("restore-runtime.json");
std::fs::create_dir_all(&root).expect("runtime root");
write_rocksdb_config(&config_path, &db_path, "tangle_cli_import");
std::fs::write(&input_path, format!("{}\n", event_to_value(&listing)))
@@ -204,6 +206,26 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
assert!(manifest["surrealdb_export"]["path"].is_null());
assert!(manifest["surrealdb_export"]["sha256"].is_null());
+ write_rocksdb_config(&restore_config_path, &restore_db_path, "tangle_cli_restore");
+ let restore = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .args(["ops", "restore", "--config"])
+ .arg(&restore_config_path)
+ .args(["--input"])
+ .arg(&backup_path)
+ .output()
+ .expect("run tangle ops restore");
+
+ assert!(restore.status.success());
+ assert!(restore.stderr.is_empty());
+ let restore_stdout = String::from_utf8_lossy(&restore.stdout);
+ assert!(restore_stdout.starts_with(&format!(
+ "restore directory: {}\nraw events: 1\nraw events sha256: ",
+ backup_path.display()
+ )));
+ assert!(restore_stdout.contains(
+ "\nevents inserted: 1\nevents duplicate: 0\nevents rebuilt: 1\nlistings projected: 1\nevents skipped: 0\n"
+ ));
+
let rebuild = Command::new(env!("CARGO_BIN_EXE_tangle"))
.args(["projection", "rebuild", "--config"])
.arg(&config_path)
@@ -232,6 +254,36 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
let seller = FixtureKey::Seller.public_key();
let listing_key = format!("30402:{}:listing-a", seller.as_str());
+ let restore_store_config = SurrealConnectionConfig::rocksdb(
+ restore_db_path.to_str().expect("restore db path"),
+ "tangle_cli_restore",
+ "relay",
+ )
+ .expect("restore store config");
+ let restore_store = reopen_store(&restore_store_config).await;
+ assert!(
+ restore_store
+ .raw_event_row(listing.id())
+ .await
+ .expect("restore raw row")
+ .is_some()
+ );
+ assert!(
+ restore_store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("restore listing row")
+ .is_some()
+ );
+ assert!(
+ restore_store
+ .search_document_row(&listing_key)
+ .await
+ .expect("restore search row")
+ .is_some()
+ );
+ drop(restore_store);
+
let store_config = SurrealConnectionConfig::rocksdb(
db_path.to_str().expect("db path"),
"tangle_cli_import",
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -15,7 +15,7 @@ use std::{
collections::BTreeSet,
fs,
net::SocketAddr,
- path::{Path as FsPath, PathBuf},
+ path::{Component, Path as FsPath, PathBuf},
sync::{
Arc,
atomic::{AtomicU64, Ordering},
@@ -621,6 +621,24 @@ pub async fn import_events_from_path(
})?;
let events = parse_event_import_document(&raw)?;
let store = connect_runtime_store(config).await?;
+ let report = import_events_into_store(config, &store, events).await?;
+ tracing::info!(
+ command = "event import",
+ total = report.total(),
+ inserted = report.inserted(),
+ duplicate = report.duplicate(),
+ projected = report.projected(),
+ skipped = report.skipped(),
+ "finished event import"
+ );
+ Ok(report)
+}
+
+async fn import_events_into_store(
+ config: &TangleRuntimeConfig,
+ store: &SurrealStore,
+ events: Vec<Event>,
+) -> Result<RuntimeEventImportReport, RuntimeCommandError> {
store
.apply_plan(&base_migration_plan())
.await
@@ -635,18 +653,9 @@ pub async fn import_events_from_path(
let mut report = RuntimeEventImportReport::default();
let now = now_timestamp();
for event in events {
- let outcome = import_single_event(&store, &validator, event, now).await?;
+ let outcome = import_single_event(store, &validator, event, now).await?;
report.record(outcome);
}
- tracing::info!(
- command = "event import",
- total = report.total(),
- inserted = report.inserted(),
- duplicate = report.duplicate(),
- projected = report.projected(),
- skipped = report.skipped(),
- "finished event import"
- );
Ok(report)
}
@@ -1037,6 +1046,161 @@ async fn backup_runtime_store(
))
}
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RuntimeRestoreReport {
+ input_dir: PathBuf,
+ raw_event_count: u64,
+ raw_events_sha256: String,
+ import_report: RuntimeEventImportReport,
+ rebuild_report: RuntimeProjectionRebuildReport,
+}
+
+impl RuntimeRestoreReport {
+ pub fn new(
+ input_dir: PathBuf,
+ raw_event_count: u64,
+ raw_events_sha256: String,
+ import_report: RuntimeEventImportReport,
+ rebuild_report: RuntimeProjectionRebuildReport,
+ ) -> Self {
+ Self {
+ input_dir,
+ raw_event_count,
+ raw_events_sha256,
+ import_report,
+ rebuild_report,
+ }
+ }
+
+ pub fn input_dir(&self) -> &FsPath {
+ &self.input_dir
+ }
+
+ pub fn raw_event_count(&self) -> u64 {
+ self.raw_event_count
+ }
+
+ pub fn raw_events_sha256(&self) -> &str {
+ &self.raw_events_sha256
+ }
+
+ pub fn import_report(&self) -> RuntimeEventImportReport {
+ self.import_report
+ }
+
+ pub fn rebuild_report(&self) -> RuntimeProjectionRebuildReport {
+ self.rebuild_report
+ }
+}
+
+pub async fn restore_runtime_database(
+ config: &TangleRuntimeConfig,
+ input_dir: impl AsRef<FsPath>,
+) -> Result<RuntimeRestoreReport, RuntimeCommandError> {
+ let input_dir = input_dir.as_ref();
+ tracing::info!(
+ command = "ops restore",
+ input_dir = input_dir.display().to_string(),
+ "starting runtime restore"
+ );
+ let store = connect_runtime_store(config).await?;
+ let report = restore_runtime_store(config, &store, input_dir).await?;
+ tracing::info!(
+ command = "ops restore",
+ raw_event_count = report.raw_event_count(),
+ raw_events_sha256 = report.raw_events_sha256(),
+ inserted = report.import_report().inserted(),
+ duplicate = report.import_report().duplicate(),
+ rebuilt = report.rebuild_report().rebuilt(),
+ "finished runtime restore"
+ );
+ Ok(report)
+}
+
+async fn restore_runtime_store(
+ config: &TangleRuntimeConfig,
+ store: &SurrealStore,
+ input_dir: &FsPath,
+) -> Result<RuntimeRestoreReport, RuntimeCommandError> {
+ let manifest_path = input_dir.join("manifest.json");
+ let manifest_raw = fs::read_to_string(&manifest_path).map_err(|error| {
+ RuntimeCommandError::input(format!(
+ "failed to read backup manifest file `{}`: {error}",
+ manifest_path.display()
+ ))
+ })?;
+ let manifest: RuntimeBackupManifestDocument =
+ serde_json::from_str(&manifest_raw).map_err(|error| {
+ RuntimeCommandError::input(format!("backup manifest JSON is invalid: {error}"))
+ })?;
+ validate_backup_manifest(&manifest)?;
+ let raw_events_path = backup_artifact_path(input_dir, &manifest.raw_events.path)?;
+ let raw_events = fs::read_to_string(&raw_events_path).map_err(|error| {
+ RuntimeCommandError::input(format!(
+ "failed to read backup raw events file `{}`: {error}",
+ raw_events_path.display()
+ ))
+ })?;
+ let raw_events_sha256 = sha256_hex(raw_events.as_bytes());
+ if raw_events_sha256 != manifest.raw_events.sha256 {
+ return Err(RuntimeCommandError::input(format!(
+ "backup raw events checksum mismatch: expected {}, got {}",
+ manifest.raw_events.sha256, raw_events_sha256
+ )));
+ }
+ let events = parse_event_import_document(&raw_events)?;
+ if events.len() as u64 != manifest.raw_events.count {
+ return Err(RuntimeCommandError::input(format!(
+ "backup raw events count mismatch: expected {}, got {}",
+ manifest.raw_events.count,
+ events.len()
+ )));
+ }
+ let import_report = import_events_into_store(config, store, events).await?;
+ let rebuild_report = rebuild_projections_in_store(config, store).await?;
+ Ok(RuntimeRestoreReport::new(
+ input_dir.to_path_buf(),
+ manifest.raw_events.count,
+ raw_events_sha256,
+ import_report,
+ rebuild_report,
+ ))
+}
+
+fn validate_backup_manifest(
+ manifest: &RuntimeBackupManifestDocument,
+) -> Result<(), RuntimeCommandError> {
+ if manifest.format != "tangle-backup-v1" {
+ return Err(RuntimeCommandError::input(format!(
+ "backup manifest format is unsupported: {}",
+ manifest.format
+ )));
+ }
+ if manifest.raw_events.path.trim().is_empty() {
+ return Err(RuntimeCommandError::input(
+ "backup manifest raw_events.path must not be empty",
+ ));
+ }
+ Ok(())
+}
+
+fn backup_artifact_path(
+ input_dir: &FsPath,
+ artifact: &str,
+) -> Result<PathBuf, RuntimeCommandError> {
+ let path = FsPath::new(artifact);
+ if path.is_absolute()
+ || path
+ .components()
+ .any(|component| matches!(component, Component::ParentDir))
+ {
+ return Err(RuntimeCommandError::input(
+ "backup manifest artifact paths must be relative to the backup directory",
+ ));
+ }
+ Ok(input_dir.join(path))
+}
+
fn runtime_row_string(
row: &serde_json::Value,
field: &'static str,
@@ -1124,6 +1288,22 @@ pub async fn rebuild_projections(
"starting projection rebuild"
);
let store = connect_runtime_store(config).await?;
+ let report = rebuild_projections_in_store(config, &store).await?;
+ tracing::info!(
+ command = "projection rebuild",
+ scanned = report.scanned(),
+ rebuilt = report.rebuilt(),
+ projected = report.projected(),
+ skipped = report.skipped(),
+ "finished projection rebuild"
+ );
+ Ok(report)
+}
+
+async fn rebuild_projections_in_store(
+ config: &TangleRuntimeConfig,
+ store: &SurrealStore,
+) -> Result<RuntimeProjectionRebuildReport, RuntimeCommandError> {
store
.apply_plan(&base_migration_plan())
.await
@@ -1146,17 +1326,9 @@ pub async fn rebuild_projections(
.map_err(|error| RuntimeCommandError::store(error.to_string()))?;
let event = parse_event_json(&raw)
.map_err(|error| RuntimeCommandError::store(error.to_string()))?;
- let outcome = rebuild_single_event_projection(&store, &validator, event, now).await?;
+ let outcome = rebuild_single_event_projection(store, &validator, event, now).await?;
report.record(outcome);
}
- tracing::info!(
- command = "projection rebuild",
- scanned = report.scanned(),
- rebuilt = report.rebuilt(),
- projected = report.projected(),
- skipped = report.skipped(),
- "finished projection rebuild"
- );
Ok(report)
}
@@ -4725,7 +4897,7 @@ mod tests {
listing_item_document, listing_projection_query, listings_router, load_runtime_config,
metrics_router, migrate_runtime_database, parse_listing_query,
parse_marketplace_search_query, parse_runtime_config_json, relay_info_router,
- search_document_query, websocket_router,
+ restore_runtime_store, search_document_query, websocket_router,
};
use axum::{body::Body, response::IntoResponse};
use http::{HeaderValue, Request, StatusCode, header};
@@ -5467,6 +5639,140 @@ mod tests {
}
#[tokio::test]
+ async fn runtime_restore_command_imports_backup_and_rebuilds_projection_state() {
+ let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing");
+ let root = std::env::temp_dir().join(format!(
+ "tangle-runtime-restore-{}-{}",
+ std::process::id(),
+ &listing.id().as_str()[..8]
+ ));
+ let _ = std::fs::remove_dir_all(&root);
+ let source_db_path = root.join("source-db");
+ let restore_db_path = root.join("restore-db");
+ let backup_path = root.join("backup");
+ std::fs::create_dir_all(&root).expect("runtime root");
+ let source_config_json = serde_json::json!({
+ "server": {
+ "listen_addr": "127.0.0.1:7302",
+ "relay_url": "ws://127.0.0.1:7302"
+ },
+ "database": {
+ "mode": "rocks_db",
+ "path": source_db_path.to_str().expect("source db path"),
+ "namespace": "tangle_restore_source",
+ "database": "relay"
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300
+ },
+ "limits": {
+ "message_rate_limit": {
+ "limit": 120,
+ "window_seconds": 60
+ }
+ },
+ "policy": {
+ "approved_sellers": [FixtureKey::Seller.public_key().as_str()]
+ }
+ });
+ let restore_config_json = serde_json::json!({
+ "server": {
+ "listen_addr": "127.0.0.1:7303",
+ "relay_url": "ws://127.0.0.1:7303"
+ },
+ "database": {
+ "mode": "rocks_db",
+ "path": restore_db_path.to_str().expect("restore db path"),
+ "namespace": "tangle_restore_destination",
+ "database": "relay"
+ },
+ "auth": {
+ "challenge_ttl_seconds": 300
+ },
+ "limits": {
+ "message_rate_limit": {
+ "limit": 120,
+ "window_seconds": 60
+ }
+ },
+ "policy": {
+ "approved_sellers": [FixtureKey::Seller.public_key().as_str()]
+ }
+ });
+ let source_config = parse_runtime_config_json(
+ &serde_json::to_string(&source_config_json).expect("source config JSON"),
+ )
+ .expect("source config");
+ let restore_config = parse_runtime_config_json(
+ &serde_json::to_string(&restore_config_json).expect("restore config JSON"),
+ )
+ .expect("restore config");
+ let source_store = SurrealStore::connect(source_config.database_config())
+ .await
+ .expect("source store");
+ source_store
+ .apply_plan(&base_migration_plan())
+ .await
+ .expect("source migrations");
+ assert_eq!(
+ source_store
+ .store_raw_event(&StoredEvent::new(
+ listing.clone(),
+ UnixTimestamp::new(1_714_124_500)
+ ))
+ .await
+ .expect("source raw event"),
+ StoreEventOutcome::Inserted
+ );
+ let backup_report = backup_runtime_store(&source_config, &source_store, &backup_path)
+ .await
+ .expect("backup");
+ assert_eq!(backup_report.raw_event_count(), 1);
+ drop(source_store);
+
+ let restore_store = SurrealStore::connect(restore_config.database_config())
+ .await
+ .expect("restore store");
+ let restore_report = restore_runtime_store(&restore_config, &restore_store, &backup_path)
+ .await
+ .expect("restore");
+ assert_eq!(restore_report.raw_event_count(), 1);
+ assert_eq!(restore_report.import_report().inserted(), 1);
+ assert_eq!(restore_report.import_report().duplicate(), 0);
+ assert_eq!(restore_report.rebuild_report().rebuilt(), 1);
+ assert_eq!(restore_report.rebuild_report().projected(), 1);
+ assert_eq!(
+ restore_report.raw_events_sha256(),
+ backup_report.raw_events_sha256()
+ );
+ let seller = FixtureKey::Seller.public_key();
+ let listing_key = format!("30402:{}:listing-a", seller.as_str());
+ assert!(
+ restore_store
+ .raw_event_row(listing.id())
+ .await
+ .expect("raw row")
+ .is_some()
+ );
+ assert!(
+ restore_store
+ .listing_current_row(&listing_key)
+ .await
+ .expect("listing row")
+ .is_some()
+ );
+ assert!(
+ restore_store
+ .search_document_row(&listing_key)
+ .await
+ .expect("search row")
+ .is_some()
+ );
+ drop(restore_store);
+ std::fs::remove_dir_all(&root).expect("remove runtime root");
+ }
+
+ #[tokio::test]
async fn websocket_route_requires_upgrade_headers() {
let response = websocket_router(WebSocketHttpState::default())
.oneshot(