tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 9035a5f813ace390e4ac29f96da367a4839b06ea
parent 4b49dfefdf4106dc7f10ad4af00e9d38d53dcf9d
Author: triesap <tyson@radroots.org>
Date:   Wed, 17 Jun 2026 16:36:00 -0700

feat: add tenant backup restore export

- add tenant-scoped backup bundles with redacted config and checksums

- add checksum-verified clean-target restore

- add tenant-local JSONL export with manifests

- expose the approved tenant admin CLI commands

Diffstat:
MCargo.lock | 1+
Mcrates/tangle/src/lib.rs | 372+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Mcrates/tangle/src/main.rs | 81+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle/tests/version.rs | 4++--
Mcrates/tangle_runtime/Cargo.toml | 1+
Acrates/tangle_runtime/src/backup.rs | 815+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acrates/tangle_runtime/src/export.rs | 382+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/tangle_runtime/src/lib.rs | 2++
Mcrates/tangle_runtime/src/pocket_conversion.rs | 6+++---
9 files changed, 1653 insertions(+), 11 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -1328,6 +1328,7 @@ dependencies = [ "http", "serde", "serde_json", + "sha2", "tangle_crypto", "tangle_groups", "tangle_protocol", diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -10,7 +10,10 @@ usage: tangle run --config PATH tangle config validate --config PATH tangle config inspect --config PATH --redacted - tangle tenant list --config PATH"; + tangle tenant list --config PATH + tangle tenant backup --config PATH --tenant TENANT_ID --output PATH + tangle tenant restore --config PATH --tenant TENANT_ID --input PATH --target-data-dir PATH + tangle tenant export --config PATH --tenant TENANT_ID --output PATH"; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TangleCommand { @@ -20,6 +23,9 @@ pub enum TangleCommand { ConfigValidate, ConfigInspect, TenantList, + TenantBackup, + TenantRestore, + TenantExport, } impl TangleCommand { @@ -31,6 +37,9 @@ impl TangleCommand { Self::ConfigValidate => "config validate", Self::ConfigInspect => "config inspect", Self::TenantList => "tenant list", + Self::TenantBackup => "tenant backup", + Self::TenantRestore => "tenant restore", + Self::TenantExport => "tenant export", } } } @@ -39,7 +48,12 @@ impl TangleCommand { pub struct TangleInvocation { command: TangleCommand, config_path: Option<String>, + tenant_id: Option<String>, + input_path: Option<String>, + output_path: Option<String>, + target_data_dir: Option<String>, redacted: bool, + include_secrets: bool, } impl TangleInvocation { @@ -55,7 +69,33 @@ impl TangleInvocation { Self { command, config_path, + tenant_id: None, + input_path: None, + output_path: None, + target_data_dir: None, redacted, + include_secrets: false, + } + } + + pub fn new_with_admin_options( + command: TangleCommand, + config_path: Option<String>, + tenant_id: Option<String>, + input_path: Option<String>, + output_path: Option<String>, + target_data_dir: Option<String>, + include_secrets: bool, + ) -> Self { + Self { + command, + config_path, + tenant_id, + input_path, + output_path, + target_data_dir, + redacted: false, + include_secrets, } } @@ -67,9 +107,29 @@ impl TangleInvocation { self.config_path.as_deref() } + pub fn tenant_id(&self) -> Option<&str> { + self.tenant_id.as_deref() + } + + pub fn input_path(&self) -> Option<&str> { + self.input_path.as_deref() + } + + pub fn output_path(&self) -> Option<&str> { + self.output_path.as_deref() + } + + pub fn target_data_dir(&self) -> Option<&str> { + self.target_data_dir.as_deref() + } + pub fn redacted(&self) -> bool { self.redacted } + + pub fn include_secrets(&self) -> bool { + self.include_secrets + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -141,6 +201,9 @@ where }, "tenant" => match args.next().as_deref() { Some("list") => TangleCommand::TenantList, + Some("backup") => TangleCommand::TenantBackup, + Some("restore") => TangleCommand::TenantRestore, + Some("export") => TangleCommand::TenantExport, Some(command) => { return Err(TangleCliError::UnknownCommand(format!("tenant {command}"))); } @@ -149,7 +212,12 @@ where _ => return Err(TangleCliError::UnknownCommand(first)), }; let mut config_path = None; + let mut tenant_id = None; + let mut input_path = None; + let mut output_path = None; + let mut target_data_dir = None; let mut redacted = false; + let mut include_secrets = false; while let Some(argument) = args.next() { match argument.as_str() { "--config" => { @@ -161,9 +229,48 @@ where }; config_path = Some(path); } + "--tenant" if command_accepts_tenant(command) => { + if tenant_id.is_some() { + return Err(TangleCliError::RepeatedOption("--tenant")); + } + let Some(value) = args.next() else { + return Err(TangleCliError::MissingOptionValue("--tenant")); + }; + tenant_id = Some(value); + } + "--input" if command == TangleCommand::TenantRestore => { + if input_path.is_some() { + return Err(TangleCliError::RepeatedOption("--input")); + } + let Some(path) = args.next() else { + return Err(TangleCliError::MissingOptionValue("--input")); + }; + input_path = Some(path); + } + "--output" if command_accepts_output(command) => { + if output_path.is_some() { + return Err(TangleCliError::RepeatedOption("--output")); + } + let Some(path) = args.next() else { + return Err(TangleCliError::MissingOptionValue("--output")); + }; + output_path = Some(path); + } + "--target-data-dir" if command == TangleCommand::TenantRestore => { + if target_data_dir.is_some() { + return Err(TangleCliError::RepeatedOption("--target-data-dir")); + } + let Some(path) = args.next() else { + return Err(TangleCliError::MissingOptionValue("--target-data-dir")); + }; + target_data_dir = Some(path); + } "--redacted" if command == TangleCommand::ConfigInspect => { redacted = true; } + "--include-secrets" if command == TangleCommand::TenantBackup => { + include_secrets = true; + } _ => { return Err(TangleCliError::UnexpectedArgument { command: command.as_str().to_owned(), @@ -178,11 +285,16 @@ where argument: "--config".to_owned(), }); } - Ok(TangleInvocation::new_with_options( + Ok(TangleInvocation { command, config_path, + tenant_id, + input_path, + output_path, + target_data_dir, redacted, - )) + include_secrets, + }) } pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { @@ -191,6 +303,30 @@ pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, Tangle .ok_or(TangleCliError::MissingOptionValue("--config")) } +pub fn require_tenant_id(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { + invocation + .tenant_id() + .ok_or(TangleCliError::MissingOptionValue("--tenant")) +} + +pub fn require_input_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { + invocation + .input_path() + .ok_or(TangleCliError::MissingOptionValue("--input")) +} + +pub fn require_output_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { + invocation + .output_path() + .ok_or(TangleCliError::MissingOptionValue("--output")) +} + +pub fn require_target_data_dir(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { + invocation + .target_data_dir() + .ok_or(TangleCliError::MissingOptionValue("--target-data-dir")) +} + pub async fn run_with_config( config_path: &str, ) -> Result<tangle_runtime::server::TangleServeReport, String> { @@ -297,6 +433,48 @@ pub fn list_tenants(config_path: &str) -> Result<String, String> { Ok(lines.join("\n")) } +pub fn backup_tenant( + config_path: &str, + tenant_id: &str, + output: &str, + include_secrets: bool, +) -> Result<String, String> { + let report = + tangle_runtime::backup::backup_tenant(tangle_runtime::backup::TenantBackupRequest { + config_path, + tenant_id, + output, + include_secrets, + })?; + serde_json::to_string_pretty(&report).map_err(|error| error.to_string()) +} + +pub fn restore_tenant( + config_path: &str, + tenant_id: &str, + input: &str, + target_data_dir: &str, +) -> Result<String, String> { + let report = + tangle_runtime::backup::restore_tenant(tangle_runtime::backup::TenantRestoreRequest { + config_path, + tenant_id, + input, + target_data_dir, + })?; + serde_json::to_string_pretty(&report).map_err(|error| error.to_string()) +} + +pub fn export_tenant(config_path: &str, tenant_id: &str, output: &str) -> Result<String, String> { + let report = + tangle_runtime::export::export_tenant(tangle_runtime::export::TenantExportRequest { + config_path, + tenant_id, + output, + })?; + serde_json::to_string_pretty(&report).map_err(|error| error.to_string()) +} + fn command_accepts_config(command: TangleCommand) -> bool { matches!( command, @@ -304,6 +482,23 @@ fn command_accepts_config(command: TangleCommand) -> bool { | TangleCommand::ConfigValidate | TangleCommand::ConfigInspect | TangleCommand::TenantList + | TangleCommand::TenantBackup + | TangleCommand::TenantRestore + | TangleCommand::TenantExport + ) +} + +fn command_accepts_tenant(command: TangleCommand) -> bool { + matches!( + command, + TangleCommand::TenantBackup | TangleCommand::TenantRestore | TangleCommand::TenantExport + ) +} + +fn command_accepts_output(command: TangleCommand) -> bool { + matches!( + command, + TangleCommand::TenantBackup | TangleCommand::TenantExport ) } @@ -311,8 +506,9 @@ fn command_accepts_config(command: TangleCommand) -> bool { mod tests { use super::{ PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation, - inspect_config, list_tenants, parse_tangle_invocation, require_config_path, usage_output, - validate_config, version_output, + inspect_config, list_tenants, parse_tangle_invocation, require_config_path, + require_input_path, require_output_path, require_target_data_dir, require_tenant_id, + usage_output, validate_config, version_output, }; #[test] @@ -326,7 +522,7 @@ mod tests { fn usage_lists_only_v2_command_surface() { assert_eq!( usage_output(), - "usage:\n tangle [--version]\n tangle run --config PATH\n tangle config validate --config PATH\n tangle config inspect --config PATH --redacted\n tangle tenant list --config PATH" + "usage:\n tangle [--version]\n tangle run --config PATH\n tangle config validate --config PATH\n tangle config inspect --config PATH --redacted\n tangle tenant list --config PATH\n tangle tenant backup --config PATH --tenant TENANT_ID --output PATH\n tangle tenant restore --config PATH --tenant TENANT_ID --input PATH --target-data-dir PATH\n tangle tenant export --config PATH --tenant TENANT_ID --output PATH" ); } @@ -389,6 +585,74 @@ mod tests { Some("config/tangle.host.example.json".to_owned()) ) ); + assert_eq!( + parse_tangle_invocation([ + "tenant", + "backup", + "--config", + "config/tangle.host.example.json", + "--tenant", + "farmers-market", + "--output", + "backup" + ]) + .expect("tenant backup"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantBackup, + Some("config/tangle.host.example.json".to_owned()), + Some("farmers-market".to_owned()), + None, + Some("backup".to_owned()), + None, + false + ) + ); + assert_eq!( + parse_tangle_invocation([ + "tenant", + "restore", + "--config", + "config/tangle.host.example.json", + "--tenant", + "farmers-market", + "--input", + "backup", + "--target-data-dir", + "runtime/restored" + ]) + .expect("tenant restore"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantRestore, + Some("config/tangle.host.example.json".to_owned()), + Some("farmers-market".to_owned()), + Some("backup".to_owned()), + None, + Some("runtime/restored".to_owned()), + false + ) + ); + assert_eq!( + parse_tangle_invocation([ + "tenant", + "export", + "--config", + "config/tangle.host.example.json", + "--tenant", + "farmers-market", + "--output", + "events.jsonl" + ]) + .expect("tenant export"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantExport, + Some("config/tangle.host.example.json".to_owned()), + Some("farmers-market".to_owned()), + None, + Some("events.jsonl".to_owned()), + None, + false + ) + ); } #[test] @@ -446,6 +710,102 @@ mod tests { } #[test] + fn tenant_admin_required_options_report_missing_values() { + let backup = TangleInvocation::new(TangleCommand::TenantBackup, None); + let restore = TangleInvocation::new(TangleCommand::TenantRestore, None); + + assert_eq!( + require_tenant_id(&backup).expect_err("tenant"), + TangleCliError::MissingOptionValue("--tenant") + ); + assert_eq!( + require_output_path(&backup).expect_err("output"), + TangleCliError::MissingOptionValue("--output") + ); + assert_eq!( + require_input_path(&restore).expect_err("input"), + TangleCliError::MissingOptionValue("--input") + ); + assert_eq!( + require_target_data_dir(&restore).expect_err("target"), + TangleCliError::MissingOptionValue("--target-data-dir") + ); + } + + #[test] + fn tenant_admin_parser_accepts_backup_restore_export_commands() { + assert_eq!( + parse_tangle_invocation([ + "tenant", + "backup", + "--config", + "host.json", + "--tenant", + "market", + "--output", + "backup", + "--include-secrets" + ]) + .expect("backup"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantBackup, + Some("host.json".to_owned()), + Some("market".to_owned()), + None, + Some("backup".to_owned()), + None, + true + ) + ); + assert_eq!( + parse_tangle_invocation([ + "tenant", + "restore", + "--config", + "host.json", + "--tenant", + "market", + "--input", + "backup", + "--target-data-dir", + "runtime/market" + ]) + .expect("restore"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantRestore, + Some("host.json".to_owned()), + Some("market".to_owned()), + Some("backup".to_owned()), + None, + Some("runtime/market".to_owned()), + false + ) + ); + assert_eq!( + parse_tangle_invocation([ + "tenant", + "export", + "--config", + "host.json", + "--tenant", + "market", + "--output", + "events.jsonl" + ]) + .expect("export"), + TangleInvocation::new_with_admin_options( + TangleCommand::TenantExport, + Some("host.json".to_owned()), + Some("market".to_owned()), + None, + Some("events.jsonl".to_owned()), + None, + false + ) + ); + } + + #[test] fn config_commands_use_new_host_config_surface() { let config_path = workspace_file("config/tangle.host.example.json"); validate_config(&config_path).expect("validate"); diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs @@ -65,6 +65,36 @@ async fn main() -> ExitCode { } } } + tangle::TangleCommand::TenantBackup => match run_tenant_backup(&invocation) { + Ok(output) => { + println!("{output}"); + ExitCode::SUCCESS + } + Err(error) => { + eprintln!("{error}"); + ExitCode::from(2) + } + }, + tangle::TangleCommand::TenantRestore => match run_tenant_restore(&invocation) { + Ok(output) => { + println!("{output}"); + ExitCode::SUCCESS + } + Err(error) => { + eprintln!("{error}"); + ExitCode::from(2) + } + }, + tangle::TangleCommand::TenantExport => match run_tenant_export(&invocation) { + Ok(output) => { + println!("{output}"); + ExitCode::SUCCESS + } + Err(error) => { + eprintln!("{error}"); + ExitCode::from(2) + } + }, } } @@ -79,6 +109,52 @@ fn config_path_or_error(invocation: &tangle::TangleInvocation) -> Result<String, .map_err(|error| error.to_string()) } +fn tenant_id_or_error(invocation: &tangle::TangleInvocation) -> Result<String, String> { + tangle::require_tenant_id(invocation) + .map(str::to_owned) + .map_err(|error| error.to_string()) +} + +fn output_path_or_error(invocation: &tangle::TangleInvocation) -> Result<String, String> { + tangle::require_output_path(invocation) + .map(str::to_owned) + .map_err(|error| error.to_string()) +} + +fn input_path_or_error(invocation: &tangle::TangleInvocation) -> Result<String, String> { + tangle::require_input_path(invocation) + .map(str::to_owned) + .map_err(|error| error.to_string()) +} + +fn target_data_dir_or_error(invocation: &tangle::TangleInvocation) -> Result<String, String> { + tangle::require_target_data_dir(invocation) + .map(str::to_owned) + .map_err(|error| error.to_string()) +} + +fn run_tenant_backup(invocation: &tangle::TangleInvocation) -> Result<String, String> { + let config = config_path_or_error(invocation)?; + let tenant = tenant_id_or_error(invocation)?; + let output = output_path_or_error(invocation)?; + tangle::backup_tenant(&config, &tenant, &output, invocation.include_secrets()) +} + +fn run_tenant_restore(invocation: &tangle::TangleInvocation) -> Result<String, String> { + let config = config_path_or_error(invocation)?; + let tenant = tenant_id_or_error(invocation)?; + let input = input_path_or_error(invocation)?; + let target_data_dir = target_data_dir_or_error(invocation)?; + tangle::restore_tenant(&config, &tenant, &input, &target_data_dir) +} + +fn run_tenant_export(invocation: &tangle::TangleInvocation) -> Result<String, String> { + let config = config_path_or_error(invocation)?; + let tenant = tenant_id_or_error(invocation)?; + let output = output_path_or_error(invocation)?; + tangle::export_tenant(&config, &tenant, &output) +} + #[cfg(test)] mod tests { #[tokio::test] @@ -88,5 +164,10 @@ mod tests { super::run_server(&run).await.expect_err("run config"), "--config requires a value" ); + let backup = tangle::TangleInvocation::new(tangle::TangleCommand::TenantBackup, None); + assert_eq!( + super::run_tenant_backup(&backup).expect_err("backup config"), + "--config requires a value" + ); } } diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -29,7 +29,7 @@ fn tangle_without_args_reports_usage() { assert!(output.status.success()); assert_eq!( String::from_utf8_lossy(&output.stdout), - "usage:\n tangle [--version]\n tangle run --config PATH\n tangle config validate --config PATH\n tangle config inspect --config PATH --redacted\n tangle tenant list --config PATH\n" + format!("{}\n", tangle::usage_output()) ); assert!(output.stderr.is_empty()); } @@ -45,7 +45,7 @@ fn tangle_unknown_arg_reports_usage_error() { assert!(output.stdout.is_empty()); assert_eq!( String::from_utf8_lossy(&output.stderr), - "unknown command: --unknown\nusage:\n tangle [--version]\n tangle run --config PATH\n tangle config validate --config PATH\n tangle config inspect --config PATH --redacted\n tangle tenant list --config PATH\n" + format!("unknown command: --unknown\n{}\n", tangle::usage_output()) ); } diff --git a/crates/tangle_runtime/Cargo.toml b/crates/tangle_runtime/Cargo.toml @@ -13,6 +13,7 @@ getrandom = "0.3" http = "1" serde = { version = "1", features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } +sha2 = "0.10" tangle_crypto = { path = "../tangle_crypto" } tangle_groups = { path = "../tangle_groups" } tangle_protocol = { path = "../tangle_protocol" } diff --git a/crates/tangle_runtime/src/backup.rs b/crates/tangle_runtime/src/backup.rs @@ -0,0 +1,815 @@ +#![forbid(unsafe_code)] + +use crate::{TANGLE_RELAY_VERSION, config::TenantRuntimeConfig, load_tangle_host_runtime_config}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::{ + fs, + io::Read, + path::{Component, Path, PathBuf}, + time::{SystemTime, UNIX_EPOCH}, +}; +use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle}; + +pub const TANGLE_SPEC_VERSION: &str = "tangle_v1_mvp"; +const BACKUP_SCHEMA: &str = "tangle.tenant.backup.v1"; +const CHECKSUM_SCHEMA: &str = "tangle.tenant.checksums.v1"; +const POCKET_STORE_DIR: &str = "pocket_store"; +const REDACTED_TENANT_CONFIG: &str = "tenant_config.redacted.json"; +const BACKUP_MANIFEST: &str = "backup_manifest.json"; +const CHECKSUM_MANIFEST: &str = "checksums.json"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TenantBackupRequest<'a> { + pub config_path: &'a str, + pub tenant_id: &'a str, + pub output: &'a str, + pub include_secrets: bool, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TenantRestoreRequest<'a> { + pub config_path: &'a str, + pub tenant_id: &'a str, + pub input: &'a str, + pub target_data_dir: &'a str, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenantBackupReport { + pub tenant_id: String, + pub output_path: String, + pub manifest_path: String, + pub checksum_manifest_path: String, + pub checksum_file_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenantRestoreReport { + pub tenant_id: String, + pub input_path: String, + pub target_data_dir: String, + pub restored_file_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct ChecksumManifest { + schema: String, + algorithm: String, + files: Vec<ChecksumFile>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct ChecksumFile { + path: String, + sha256: String, + size_bytes: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct TenantBackupManifest { + schema: String, + tangle_version: String, + tangle_spec_version: String, + created_at: u64, + source: TenantManifestSource, + store: TenantStoreManifest, + redacted_tenant_config_path: String, + checksum_manifest_path: String, + checksum_manifest_sha256: String, + checksum_file_count: usize, + includes_secrets: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct TenantManifestSource { + tenant_id: String, + tenant_schema: String, + host: String, + relay_url: String, + relay_self_pubkey: Option<String>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct TenantStoreManifest { + source_data_directory: String, + snapshot_path: String, +} + +pub fn backup_tenant(request: TenantBackupRequest<'_>) -> Result<TenantBackupReport, String> { + if request.include_secrets { + return Err("including tenant secrets in backups is unsupported".to_owned()); + } + let tenant = load_selected_tenant_config(request.config_path, request.tenant_id)?; + if !tenant.backup_export().backup_enabled() { + return Err(format!( + "tenant backup is disabled for {}", + tenant.tenant_id().as_str() + )); + } + let output = PathBuf::from(request.output); + prepare_empty_directory(&output, "backup output")?; + let snapshot_path = output.join(POCKET_STORE_DIR); + copy_directory(tenant.pocket_config().data_directory(), &snapshot_path)?; + let redacted_path = output.join(REDACTED_TENANT_CONFIG); + write_json_file(&redacted_path, &redacted_tenant_config_value(&tenant)?)?; + let checksums = ChecksumManifest { + schema: CHECKSUM_SCHEMA.to_owned(), + algorithm: "sha256".to_owned(), + files: collect_checksums(&output)?, + }; + let checksum_path = output.join(CHECKSUM_MANIFEST); + write_json_file(&checksum_path, &checksums)?; + let (checksum_manifest_sha256, _) = file_sha256_hex(&checksum_path)?; + let source = tenant_manifest_source(&tenant)?; + let manifest = TenantBackupManifest { + schema: BACKUP_SCHEMA.to_owned(), + tangle_version: TANGLE_RELAY_VERSION.to_owned(), + tangle_spec_version: TANGLE_SPEC_VERSION.to_owned(), + created_at: now_unix_seconds()?, + source, + store: TenantStoreManifest { + source_data_directory: tenant + .pocket_config() + .data_directory() + .display() + .to_string(), + snapshot_path: POCKET_STORE_DIR.to_owned(), + }, + redacted_tenant_config_path: REDACTED_TENANT_CONFIG.to_owned(), + checksum_manifest_path: CHECKSUM_MANIFEST.to_owned(), + checksum_manifest_sha256, + checksum_file_count: checksums.files.len(), + includes_secrets: false, + }; + let manifest_path = output.join(BACKUP_MANIFEST); + write_json_file(&manifest_path, &manifest)?; + Ok(TenantBackupReport { + tenant_id: tenant.tenant_id().as_str().to_owned(), + output_path: output.display().to_string(), + manifest_path: manifest_path.display().to_string(), + checksum_manifest_path: checksum_path.display().to_string(), + checksum_file_count: checksums.files.len(), + }) +} + +pub fn restore_tenant(request: TenantRestoreRequest<'_>) -> Result<TenantRestoreReport, String> { + let tenant = load_selected_tenant_config(request.config_path, request.tenant_id)?; + if !tenant.backup_export().backup_enabled() { + return Err(format!( + "tenant backup is disabled for {}", + tenant.tenant_id().as_str() + )); + } + let input = PathBuf::from(request.input); + let manifest = read_backup_manifest(&input.join(BACKUP_MANIFEST))?; + if manifest.schema != BACKUP_SCHEMA { + return Err(format!("unsupported backup schema: {}", manifest.schema)); + } + if manifest.source.tenant_id != tenant.tenant_id().as_str() { + return Err(format!( + "backup tenant {} does not match requested tenant {}", + manifest.source.tenant_id, + tenant.tenant_id().as_str() + )); + } + let checksum_path = input.join(&manifest.checksum_manifest_path); + let (actual_checksum_manifest_sha256, _) = file_sha256_hex(&checksum_path)?; + if actual_checksum_manifest_sha256 != manifest.checksum_manifest_sha256 { + return Err("backup checksum manifest digest mismatch".to_owned()); + } + let checksum_manifest = read_checksum_manifest(&checksum_path)?; + verify_checksums(&input, &checksum_manifest.files)?; + let target = PathBuf::from(request.target_data_dir); + prepare_empty_directory(&target, "restore target data directory")?; + copy_directory(&input.join(&manifest.store.snapshot_path), &target)?; + let restored_config = PocketStoreConfig::new(&target, tenant.pocket_config().sync_policy()) + .map_err(|error| error.to_string())?; + let restored = PocketStoreHandle::open(&restored_config).map_err(|error| error.to_string())?; + let restored_file_count = collect_files(&target)?.len(); + restored.scan_events().map_err(|error| error.to_string())?; + Ok(TenantRestoreReport { + tenant_id: tenant.tenant_id().as_str().to_owned(), + input_path: input.display().to_string(), + target_data_dir: target.display().to_string(), + restored_file_count, + }) +} + +pub(crate) fn load_selected_tenant_config( + config_path: &str, + tenant_id: &str, +) -> Result<TenantRuntimeConfig, String> { + let config = load_tangle_host_runtime_config(config_path).map_err(|error| error.to_string())?; + config + .tenants() + .iter() + .find(|tenant| tenant.tenant_id().as_str() == tenant_id) + .cloned() + .ok_or_else(|| format!("tenant not found: {tenant_id}")) +} + +pub(crate) fn tenant_manifest_source( + tenant: &TenantRuntimeConfig, +) -> Result<TenantManifestSource, String> { + Ok(TenantManifestSource { + tenant_id: tenant.tenant_id().as_str().to_owned(), + tenant_schema: tenant.tenant_schema().as_str().to_owned(), + host: tenant.host().as_str().to_owned(), + relay_url: tenant.relay_url().as_str().to_owned(), + relay_self_pubkey: tenant + .relay_self_pubkey() + .map_err(|error| error.to_string())? + .map(|pubkey| pubkey.as_str().to_owned()), + }) +} + +pub(crate) fn redacted_tenant_config_value( + tenant: &TenantRuntimeConfig, +) -> Result<serde_json::Value, String> { + Ok(serde_json::json!({ + "tenant_id": tenant.tenant_id().as_str(), + "tenant_schema": tenant.tenant_schema().as_str(), + "host": tenant.host().as_str(), + "relay_url": tenant.relay_url().as_str(), + "inactive": tenant.inactive(), + "info": { + "name": tenant.info().name(), + "description": tenant.info().description(), + "contact": tenant.info().contact(), + "icon": tenant.info().icon() + }, + "pocket": { + "data_directory": tenant.pocket_config().data_directory().display().to_string(), + "sync_policy": format!("{:?}", tenant.pocket_config().sync_policy()) + }, + "groups": { + "enabled": tenant.groups().enabled(), + "relay_secret": "<redacted>", + "relay_self": tenant.relay_self_pubkey().map_err(|error| error.to_string())?.map(|pubkey| pubkey.as_str().to_owned()) + }, + "backup_export": { + "backup_enabled": tenant.backup_export().backup_enabled(), + "export_enabled": tenant.backup_export().export_enabled() + } + })) +} + +pub(crate) fn write_json_file<T>(path: &Path, value: &T) -> Result<(), String> +where + T: Serialize, +{ + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + { + fs::create_dir_all(parent) + .map_err(|error| format!("failed to create {}: {error}", parent.display()))?; + } + let raw = serde_json::to_vec_pretty(value).map_err(|error| error.to_string())?; + fs::write(path, raw).map_err(|error| format!("failed to write {}: {error}", path.display())) +} + +pub(crate) fn file_sha256_hex(path: &Path) -> Result<(String, u64), String> { + let mut file = fs::File::open(path) + .map_err(|error| format!("failed to open {}: {error}", path.display()))?; + let mut hasher = Sha256::new(); + let mut size = 0_u64; + let mut buffer = [0_u8; 16 * 1024]; + loop { + let read = file + .read(&mut buffer) + .map_err(|error| format!("failed to read {}: {error}", path.display()))?; + if read == 0 { + break; + } + hasher.update(&buffer[..read]); + size = size + .checked_add(u64::try_from(read).expect("read size fits u64")) + .ok_or_else(|| format!("file {} exceeds u64 size", path.display()))?; + } + Ok((lower_hex(&hasher.finalize()), size)) +} + +pub(crate) fn now_unix_seconds() -> Result<u64, String> { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_secs()) + .map_err(|error| error.to_string()) +} + +pub(crate) fn collect_files(root: &Path) -> Result<Vec<PathBuf>, String> { + let mut files = Vec::new(); + collect_files_into(root, root, &mut files)?; + files.sort(); + Ok(files) +} + +pub(crate) fn lower_hex(bytes: &[u8]) -> String { + const HEX: &[u8; 16] = b"0123456789abcdef"; + let mut output = String::with_capacity(bytes.len() * 2); + for byte in bytes { + output.push(char::from(HEX[usize::from(byte >> 4)])); + output.push(char::from(HEX[usize::from(byte & 0x0f)])); + } + output +} + +fn prepare_empty_directory(path: &Path, label: &str) -> Result<(), String> { + if path.exists() { + if !path.is_dir() { + return Err(format!("{label} is not a directory: {}", path.display())); + } + if fs::read_dir(path) + .map_err(|error| format!("failed to read {}: {error}", path.display()))? + .next() + .transpose() + .map_err(|error| format!("failed to read {}: {error}", path.display()))? + .is_some() + { + return Err(format!("{label} must be empty: {}", path.display())); + } + } + fs::create_dir_all(path) + .map_err(|error| format!("failed to create {}: {error}", path.display())) +} + +fn copy_directory(source: &Path, target: &Path) -> Result<(), String> { + if !source.is_dir() { + return Err(format!( + "source directory does not exist: {}", + source.display() + )); + } + fs::create_dir_all(target) + .map_err(|error| format!("failed to create {}: {error}", target.display()))?; + let mut entries = fs::read_dir(source) + .map_err(|error| format!("failed to read {}: {error}", source.display()))? + .collect::<Result<Vec<_>, _>>() + .map_err(|error| format!("failed to read {}: {error}", source.display()))?; + entries.sort_by_key(|entry| entry.path()); + for entry in entries { + let source_path = entry.path(); + let target_path = target.join(entry.file_name()); + let metadata = fs::symlink_metadata(&source_path) + .map_err(|error| format!("failed to stat {}: {error}", source_path.display()))?; + let file_type = metadata.file_type(); + if file_type.is_symlink() { + return Err(format!( + "symlink is not supported in backup bundles: {}", + source_path.display() + )); + } + if file_type.is_dir() { + copy_directory(&source_path, &target_path)?; + } else if file_type.is_file() { + fs::copy(&source_path, &target_path).map_err(|error| { + format!( + "failed to copy {} to {}: {error}", + source_path.display(), + target_path.display() + ) + })?; + } else { + return Err(format!( + "special file is not supported in backup bundles: {}", + source_path.display() + )); + } + } + Ok(()) +} + +fn collect_checksums(root: &Path) -> Result<Vec<ChecksumFile>, String> { + collect_files(root)? + .into_iter() + .map(|path| { + let relative = path + .strip_prefix(root) + .map_err(|error| error.to_string()) + .and_then(relative_path_string)?; + let (sha256, size_bytes) = file_sha256_hex(&path)?; + Ok(ChecksumFile { + path: relative, + sha256, + size_bytes, + }) + }) + .collect() +} + +fn collect_files_into(root: &Path, path: &Path, files: &mut Vec<PathBuf>) -> Result<(), String> { + if !path.exists() { + return Ok(()); + } + let metadata = fs::symlink_metadata(path) + .map_err(|error| format!("failed to stat {}: {error}", path.display()))?; + let file_type = metadata.file_type(); + if file_type.is_symlink() { + return Err(format!( + "symlink is not supported in backup bundles: {}", + path.display() + )); + } + if file_type.is_file() { + files.push(path.to_path_buf()); + return Ok(()); + } + if !file_type.is_dir() { + return Err(format!( + "special file is not supported in backup bundles: {}", + path.display() + )); + } + let mut entries = fs::read_dir(path) + .map_err(|error| format!("failed to read {}: {error}", path.display()))? + .collect::<Result<Vec<_>, _>>() + .map_err(|error| format!("failed to read {}: {error}", path.display()))?; + entries.sort_by_key(|entry| entry.path()); + for entry in entries { + let child = entry.path(); + if child == root.join(BACKUP_MANIFEST) || child == root.join(CHECKSUM_MANIFEST) { + continue; + } + collect_files_into(root, &child, files)?; + } + Ok(()) +} + +fn relative_path_string(path: &Path) -> Result<String, String> { + let mut parts = Vec::new(); + for component in path.components() { + match component { + Component::Normal(part) => { + parts.push( + part.to_str() + .ok_or_else(|| format!("path is not UTF-8: {}", path.display()))? + .to_owned(), + ); + } + Component::CurDir => {} + Component::ParentDir | Component::RootDir | Component::Prefix(_) => { + return Err(format!("path is not relative: {}", path.display())); + } + } + } + Ok(parts.join("/")) +} + +fn read_backup_manifest(path: &Path) -> Result<TenantBackupManifest, String> { + let raw = fs::read_to_string(path) + .map_err(|error| format!("failed to read {}: {error}", path.display()))?; + serde_json::from_str(&raw).map_err(|error| format!("backup manifest JSON is invalid: {error}")) +} + +fn read_checksum_manifest(path: &Path) -> Result<ChecksumManifest, String> { + let raw = fs::read_to_string(path) + .map_err(|error| format!("failed to read {}: {error}", path.display()))?; + let manifest: ChecksumManifest = serde_json::from_str(&raw) + .map_err(|error| format!("checksum manifest JSON is invalid: {error}"))?; + if manifest.schema != CHECKSUM_SCHEMA { + return Err(format!("unsupported checksum schema: {}", manifest.schema)); + } + if manifest.algorithm != "sha256" { + return Err(format!( + "unsupported checksum algorithm: {}", + manifest.algorithm + )); + } + Ok(manifest) +} + +fn verify_checksums(root: &Path, files: &[ChecksumFile]) -> Result<(), String> { + for expected in files { + let path = root.join(&expected.path); + let (sha256, size_bytes) = file_sha256_hex(&path)?; + if sha256 != expected.sha256 || size_bytes != expected.size_bytes { + return Err(format!("backup checksum mismatch for {}", expected.path)); + } + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::{TenantBackupRequest, TenantRestoreRequest, backup_tenant, restore_tenant}; + use crate::{ + backup::{BACKUP_MANIFEST, CHECKSUM_MANIFEST, REDACTED_TENANT_CONFIG}, + pocket_conversion::tangle_event_to_pocket, + }; + use serde_json::{Value, json}; + use std::path::{Path, PathBuf}; + use tangle_protocol::Tag; + use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy}; + use tangle_test_support::{FixtureKey, tangle_v2_event}; + + #[test] + fn backup_creates_manifest_redacted_config_checksum_and_store_snapshot() { + let fixture = BackupFixture::new("backup-create"); + fixture.write_config(); + fixture.store_event("alpha event", 1_714_300_001); + let report = backup_tenant(TenantBackupRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.backup_dir.to_str().expect("backup"), + include_secrets: false, + }) + .expect("backup"); + + assert_eq!(report.tenant_id, "alpha"); + let manifest = read_json(&fixture.backup_dir.join(BACKUP_MANIFEST)); + assert_eq!(manifest["schema"], "tangle.tenant.backup.v1"); + assert_eq!(manifest["source"]["tenant_id"], "alpha"); + assert_eq!(manifest["includes_secrets"], false); + assert!(manifest["checksum_file_count"].as_u64().expect("count") >= 3); + assert!( + fixture + .backup_dir + .join("pocket_store") + .join("event.map") + .exists() + ); + assert!( + fixture + .backup_dir + .join("pocket_store") + .join("lmdb") + .join("data.mdb") + .exists() + ); + assert!(fixture.backup_dir.join(CHECKSUM_MANIFEST).exists()); + let redacted = fs_read(&fixture.backup_dir.join(REDACTED_TENANT_CONFIG)); + assert!(redacted.contains("\"relay_secret\": \"<redacted>\"")); + assert!( + !redacted.contains("7777777777777777777777777777777777777777777777777777777777777777") + ); + + fixture.cleanup(); + } + + #[test] + fn backup_rejects_secret_inclusion_requests() { + let fixture = BackupFixture::new("backup-secrets"); + fixture.write_config(); + fixture.store_event("alpha event", 1_714_300_011); + let error = backup_tenant(TenantBackupRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.backup_dir.to_str().expect("backup"), + include_secrets: true, + }) + .expect_err("secrets unsupported"); + + assert_eq!(error, "including tenant secrets in backups is unsupported"); + + fixture.cleanup(); + } + + #[test] + fn restore_verifies_checksums_and_recreates_usable_store() { + let fixture = BackupFixture::new("backup-restore"); + fixture.write_config(); + fixture.store_event("alpha event", 1_714_300_021); + backup_tenant(TenantBackupRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.backup_dir.to_str().expect("backup"), + include_secrets: false, + }) + .expect("backup"); + let report = restore_tenant(TenantRestoreRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + input: fixture.backup_dir.to_str().expect("backup"), + target_data_dir: fixture.restore_dir.to_str().expect("restore"), + }) + .expect("restore"); + let restored_config = + PocketStoreConfig::new(&fixture.restore_dir, PocketSyncPolicy::FlushOnShutdown) + .expect("config"); + let restored = PocketStoreHandle::open(&restored_config).expect("open"); + let events = restored.scan_events().expect("scan"); + + assert_eq!(report.tenant_id, "alpha"); + assert_eq!(events.len(), 1); + assert_eq!(event_content(events[0].event()), "alpha event"); + + fixture.cleanup(); + } + + #[test] + fn restore_refuses_non_empty_targets_and_corrupt_backup_files() { + let fixture = BackupFixture::new("backup-corrupt"); + fixture.write_config(); + fixture.store_event("alpha event", 1_714_300_031); + backup_tenant(TenantBackupRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.backup_dir.to_str().expect("backup"), + include_secrets: false, + }) + .expect("backup"); + std::fs::create_dir_all(&fixture.restore_dir).expect("restore dir"); + std::fs::write(fixture.restore_dir.join("existing"), b"present").expect("existing"); + let dirty_error = restore_tenant(TenantRestoreRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + input: fixture.backup_dir.to_str().expect("backup"), + target_data_dir: fixture.restore_dir.to_str().expect("restore"), + }) + .expect_err("dirty target"); + + assert!(dirty_error.contains("restore target data directory must be empty")); + std::fs::remove_dir_all(&fixture.restore_dir).expect("clean target"); + std::fs::write( + fixture.backup_dir.join("pocket_store").join("event.map"), + b"corrupt", + ) + .expect("corrupt"); + let corrupt_error = restore_tenant(TenantRestoreRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + input: fixture.backup_dir.to_str().expect("backup"), + target_data_dir: fixture.restore_dir.to_str().expect("restore"), + }) + .expect_err("corrupt"); + + assert!(corrupt_error.contains("backup checksum mismatch")); + + fixture.cleanup(); + } + + struct BackupFixture { + root: PathBuf, + host_config: PathBuf, + alpha_store: PathBuf, + backup_dir: PathBuf, + restore_dir: PathBuf, + } + + impl BackupFixture { + fn new(name: &str) -> Self { + let root = temp_root(name); + let _ = std::fs::remove_dir_all(&root); + Self { + host_config: root.join("host.json"), + alpha_store: root.join("alpha-pocket"), + backup_dir: root.join("backup"), + restore_dir: root.join("restore-pocket"), + root, + } + } + + fn write_config(&self) { + std::fs::create_dir_all(self.root.join("tenants")).expect("tenants"); + std::fs::write( + &self.host_config, + json!({ + "listen_addr": "127.0.0.1:0", + "tenant_config_dir": "tenants" + }) + .to_string(), + ) + .expect("host"); + std::fs::write( + self.root.join("tenants").join("alpha.json"), + tenant_config_json("alpha", "alpha.test", &self.alpha_store).to_string(), + ) + .expect("alpha tenant"); + std::fs::write( + self.root.join("tenants").join("beta.json"), + tenant_config_json("beta", "beta.test", &self.root.join("beta-pocket")).to_string(), + ) + .expect("beta tenant"); + } + + fn store_event(&self, content: &str, created_at: u64) { + let config = + PocketStoreConfig::new(&self.alpha_store, PocketSyncPolicy::FlushOnShutdown) + .expect("config"); + let handle = PocketStoreHandle::open(&config).expect("open"); + let event = tangle_v2_event( + FixtureKey::Member, + created_at, + 1, + vec![Tag::from_parts("t", &["alpha"]).expect("tag")], + content, + ) + .expect("event"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + handle.store_event(&pocket).expect("store"); + handle.sync().expect("sync"); + } + + fn cleanup(self) { + let _ = std::fs::remove_dir_all(self.root); + } + } + + fn tenant_config_json(tenant_id: &str, host: &str, store: &Path) -> Value { + let relay_secret = if tenant_id == "alpha" { + "7777777777777777777777777777777777777777777777777777777777777777" + } else { + "8888888888888888888888888888888888888888888888888888888888888888" + }; + json!({ + "tenant_id": tenant_id, + "tenant_schema": tenant_id, + "host": host, + "relay_url": format!("wss://{host}"), + "info": {"name": format!("{tenant_id} relay")}, + "pocket": { + "data_directory": store, + "sync_policy": "flush_on_shutdown" + }, + "pocket_query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 + }, + "groups": { + "enabled": true, + "canonical_relay_url": format!("wss://{host}"), + "relay_secret": relay_secret, + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], + "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()] + }, + "auth": { + "challenge_ttl_seconds": 300, + "created_at_skew_seconds": 600 + }, + "limits": { + "max_message_length": 1048576, + "max_subid_length": 64, + "max_subscriptions_per_connection": 64, + "max_filters_per_request": 10, + "max_tag_values_per_filter": 100, + "max_query_complexity": 2048, + "max_limit": 500, + "default_limit": 100, + "max_event_tags": 200, + "max_content_length": 65536, + "broadcast_channel_capacity": 16, + "per_connection_outbound_queue": 16 + }, + "rate_limits": { + "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} + }, + "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} + }, + "req": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_connection": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 240}, + "per_group": {"window_seconds": 60, "max_hits": 240}, + "per_kind": {"window_seconds": 60, "max_hits": 500}, + "broad": {"window_seconds": 60, "max_hits": 30} + }, + "count": { + "per_ip": {"window_seconds": 60, "max_hits": 300}, + "per_connection": {"window_seconds": 60, "max_hits": 60}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_group": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 240}, + "broad": {"window_seconds": 60, "max_hits": 20} + } + }, + "backup_export": { + "backup_enabled": true, + "export_enabled": true + } + }) + } + + fn read_json(path: &Path) -> Value { + serde_json::from_str(&fs_read(path)).expect("json") + } + + fn fs_read(path: &Path) -> String { + std::fs::read_to_string(path).expect("read") + } + + fn event_content(event: &tangle_store_pocket::PocketEvent) -> String { + std::str::from_utf8(event.content()) + .expect("utf8") + .to_owned() + } + + fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) + } +} diff --git a/crates/tangle_runtime/src/export.rs b/crates/tangle_runtime/src/export.rs @@ -0,0 +1,382 @@ +#![forbid(unsafe_code)] + +use crate::{ + TANGLE_RELAY_VERSION, + backup::{ + TANGLE_SPEC_VERSION, file_sha256_hex, load_selected_tenant_config, now_unix_seconds, + tenant_manifest_source, write_json_file, + }, +}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{self, File}, + io::Write, + path::{Path, PathBuf}, + str, +}; +use tangle_store_pocket::{PocketEvent, PocketStoreHandle}; + +const EXPORT_SCHEMA: &str = "tangle.tenant.export.v1"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TenantExportRequest<'a> { + pub config_path: &'a str, + pub tenant_id: &'a str, + pub output: &'a str, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TenantExportReport { + pub tenant_id: String, + pub output_path: String, + pub manifest_path: String, + pub event_count: u64, + pub sha256: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct TenantExportManifest { + schema: String, + tangle_version: String, + tangle_spec_version: String, + created_at: u64, + source: crate::backup::TenantManifestSource, + events: TenantExportEventsManifest, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +struct TenantExportEventsManifest { + path: String, + count: u64, + sha256: String, + size_bytes: u64, +} + +pub fn export_tenant(request: TenantExportRequest<'_>) -> Result<TenantExportReport, String> { + let tenant = load_selected_tenant_config(request.config_path, request.tenant_id)?; + if !tenant.backup_export().export_enabled() { + return Err(format!( + "tenant export is disabled for {}", + tenant.tenant_id().as_str() + )); + } + let output = PathBuf::from(request.output); + if output.exists() { + return Err(format!( + "export output already exists: {}", + output.display() + )); + } + let manifest_path = export_manifest_path(&output)?; + if manifest_path.exists() { + return Err(format!( + "export manifest already exists: {}", + manifest_path.display() + )); + } + if let Some(parent) = output.parent() + && !parent.as_os_str().is_empty() + { + fs::create_dir_all(parent) + .map_err(|error| format!("failed to create {}: {error}", parent.display()))?; + } + let handle = + PocketStoreHandle::open(tenant.pocket_config()).map_err(|error| error.to_string())?; + let events = handle.scan_events().map_err(|error| error.to_string())?; + let mut file = File::create(&output) + .map_err(|error| format!("failed to create {}: {error}", output.display()))?; + for stored in &events { + let raw = serde_json::to_string(&pocket_event_json(stored.event())?) + .map_err(|error| error.to_string())?; + file.write_all(raw.as_bytes()) + .map_err(|error| format!("failed to write {}: {error}", output.display()))?; + file.write_all(b"\n") + .map_err(|error| format!("failed to write {}: {error}", output.display()))?; + } + file.sync_all() + .map_err(|error| format!("failed to sync {}: {error}", output.display()))?; + drop(file); + let (sha256, size_bytes) = file_sha256_hex(&output)?; + let event_count = u64::try_from(events.len()).expect("event count fits u64"); + let manifest = TenantExportManifest { + schema: EXPORT_SCHEMA.to_owned(), + tangle_version: TANGLE_RELAY_VERSION.to_owned(), + tangle_spec_version: TANGLE_SPEC_VERSION.to_owned(), + created_at: now_unix_seconds()?, + source: tenant_manifest_source(&tenant)?, + events: TenantExportEventsManifest { + path: output + .file_name() + .and_then(|name| name.to_str()) + .ok_or_else(|| { + format!("export output has no UTF-8 file name: {}", output.display()) + })? + .to_owned(), + count: event_count, + sha256: sha256.clone(), + size_bytes, + }, + }; + write_json_file(&manifest_path, &manifest)?; + Ok(TenantExportReport { + tenant_id: tenant.tenant_id().as_str().to_owned(), + output_path: output.display().to_string(), + manifest_path: manifest_path.display().to_string(), + event_count, + sha256, + }) +} + +pub fn export_manifest_path(output: &Path) -> Result<PathBuf, String> { + let file_name = output + .file_name() + .and_then(|name| name.to_str()) + .ok_or_else(|| format!("export output has no UTF-8 file name: {}", output.display()))?; + Ok(output.with_file_name(format!("{file_name}.manifest.json"))) +} + +fn pocket_event_json(event: &PocketEvent) -> Result<serde_json::Value, String> { + let tags = event + .tags() + .map_err(|error| error.to_string())? + .iter() + .map(|tag| { + tag.map(|value| { + str::from_utf8(value) + .map(str::to_owned) + .map_err(|error| error.to_string()) + }) + .collect::<Result<Vec<_>, _>>() + }) + .collect::<Result<Vec<_>, _>>()?; + let content = str::from_utf8(event.content()).map_err(|error| error.to_string())?; + Ok(serde_json::json!({ + "id": event.id().as_hex_string(), + "pubkey": event.pubkey().as_hex_string(), + "created_at": event.created_at().as_u64(), + "kind": event.kind().as_u16(), + "tags": tags, + "content": content, + "sig": event.sig().to_string() + })) +} + +#[cfg(test)] +mod tests { + use super::{TenantExportRequest, export_manifest_path, export_tenant}; + use crate::pocket_conversion::tangle_event_to_pocket; + use serde_json::{Value, json}; + use std::path::{Path, PathBuf}; + use tangle_protocol::Tag; + use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy}; + use tangle_test_support::{FixtureKey, tangle_v2_event}; + + #[test] + fn export_writes_selected_tenant_jsonl_and_manifest() { + let fixture = ExportFixture::new("export-selected"); + fixture.write_config(); + fixture.store_event(&fixture.alpha_store, "alpha note", 1_714_400_001); + fixture.store_event(&fixture.beta_store, "beta note", 1_714_400_002); + let report = export_tenant(TenantExportRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.output.to_str().expect("output"), + }) + .expect("export"); + let jsonl = std::fs::read_to_string(&fixture.output).expect("jsonl"); + let manifest = read_json(&export_manifest_path(&fixture.output).expect("manifest path")); + + assert_eq!(report.tenant_id, "alpha"); + assert_eq!(report.event_count, 1); + assert!(jsonl.contains("alpha note")); + assert!(!jsonl.contains("beta note")); + assert_eq!(manifest["schema"], "tangle.tenant.export.v1"); + assert_eq!(manifest["source"]["tenant_id"], "alpha"); + assert_eq!(manifest["events"]["count"], 1); + assert_eq!(manifest["events"]["sha256"], report.sha256); + + fixture.cleanup(); + } + + #[test] + fn export_refuses_existing_outputs() { + let fixture = ExportFixture::new("export-existing"); + fixture.write_config(); + std::fs::create_dir_all(fixture.output.parent().expect("parent")).expect("parent"); + std::fs::write(&fixture.output, b"exists").expect("output"); + let error = export_tenant(TenantExportRequest { + config_path: fixture.host_config.to_str().expect("config"), + tenant_id: "alpha", + output: fixture.output.to_str().expect("output"), + }) + .expect_err("existing"); + + assert!(error.contains("export output already exists")); + + fixture.cleanup(); + } + + struct ExportFixture { + root: PathBuf, + host_config: PathBuf, + alpha_store: PathBuf, + beta_store: PathBuf, + output: PathBuf, + } + + impl ExportFixture { + fn new(name: &str) -> Self { + let root = temp_root(name); + let _ = std::fs::remove_dir_all(&root); + Self { + host_config: root.join("host.json"), + alpha_store: root.join("alpha-pocket"), + beta_store: root.join("beta-pocket"), + output: root.join("exports").join("alpha.jsonl"), + root, + } + } + + fn write_config(&self) { + std::fs::create_dir_all(self.root.join("tenants")).expect("tenants"); + std::fs::write( + &self.host_config, + json!({ + "listen_addr": "127.0.0.1:0", + "tenant_config_dir": "tenants" + }) + .to_string(), + ) + .expect("host"); + std::fs::write( + self.root.join("tenants").join("alpha.json"), + tenant_config_json("alpha", "alpha.test", &self.alpha_store).to_string(), + ) + .expect("alpha tenant"); + std::fs::write( + self.root.join("tenants").join("beta.json"), + tenant_config_json("beta", "beta.test", &self.beta_store).to_string(), + ) + .expect("beta tenant"); + } + + fn store_event(&self, store: &Path, content: &str, created_at: u64) { + let config = + PocketStoreConfig::new(store, PocketSyncPolicy::FlushOnShutdown).expect("config"); + let handle = PocketStoreHandle::open(&config).expect("open"); + let event = tangle_v2_event( + FixtureKey::Member, + created_at, + 1, + vec![Tag::from_parts("t", &[content]).expect("tag")], + content, + ) + .expect("event"); + let pocket = tangle_event_to_pocket(&event).expect("pocket"); + handle.store_event(&pocket).expect("store"); + handle.sync().expect("sync"); + } + + fn cleanup(self) { + let _ = std::fs::remove_dir_all(self.root); + } + } + + fn tenant_config_json(tenant_id: &str, host: &str, store: &Path) -> Value { + let relay_secret = if tenant_id == "alpha" { + "7777777777777777777777777777777777777777777777777777777777777777" + } else { + "8888888888888888888888888888888888888888888888888888888888888888" + }; + json!({ + "tenant_id": tenant_id, + "tenant_schema": tenant_id, + "host": host, + "relay_url": format!("wss://{host}"), + "info": {"name": format!("{tenant_id} relay")}, + "pocket": { + "data_directory": store, + "sync_policy": "flush_on_shutdown" + }, + "pocket_query": { + "allow_scraping": false, + "allow_scrape_if_limited_to": 100, + "allow_scrape_if_max_seconds": 3600 + }, + "groups": { + "enabled": true, + "canonical_relay_url": format!("wss://{host}"), + "relay_secret": relay_secret, + "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], + "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()] + }, + "auth": { + "challenge_ttl_seconds": 300, + "created_at_skew_seconds": 600 + }, + "limits": { + "max_message_length": 1048576, + "max_subid_length": 64, + "max_subscriptions_per_connection": 64, + "max_filters_per_request": 10, + "max_tag_values_per_filter": 100, + "max_query_complexity": 2048, + "max_limit": 500, + "default_limit": 100, + "max_event_tags": 200, + "max_content_length": 65536, + "broadcast_channel_capacity": 16, + "per_connection_outbound_queue": 16 + }, + "rate_limits": { + "auth": { + "per_ip": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 30}, + "failures": {"window_seconds": 300, "max_hits": 5}, + "failures_per_ip": {"window_seconds": 300, "max_hits": 20} + }, + "event": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 1000} + }, + "group": { + "write_per_ip": {"window_seconds": 60, "max_hits": 300}, + "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, + "write_per_group": {"window_seconds": 60, "max_hits": 90}, + "write_per_kind": {"window_seconds": 60, "max_hits": 300}, + "join_flow": {"window_seconds": 300, "max_hits": 10}, + "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} + }, + "req": { + "per_ip": {"window_seconds": 60, "max_hits": 600}, + "per_connection": {"window_seconds": 60, "max_hits": 120}, + "per_pubkey": {"window_seconds": 60, "max_hits": 240}, + "per_group": {"window_seconds": 60, "max_hits": 240}, + "per_kind": {"window_seconds": 60, "max_hits": 500}, + "broad": {"window_seconds": 60, "max_hits": 30} + }, + "count": { + "per_ip": {"window_seconds": 60, "max_hits": 300}, + "per_connection": {"window_seconds": 60, "max_hits": 60}, + "per_pubkey": {"window_seconds": 60, "max_hits": 120}, + "per_group": {"window_seconds": 60, "max_hits": 120}, + "per_kind": {"window_seconds": 60, "max_hits": 240}, + "broad": {"window_seconds": 60, "max_hits": 20} + } + }, + "backup_export": { + "backup_enabled": true, + "export_enabled": true + } + }) + } + + fn read_json(path: &Path) -> Value { + serde_json::from_str(&std::fs::read_to_string(path).expect("read")).expect("json") + } + + fn temp_root(name: &str) -> PathBuf { + std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) + } +} diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -1,9 +1,11 @@ #![forbid(unsafe_code)] +pub mod backup; pub(crate) mod client_message; pub mod config; pub mod errors; pub mod event_bus; +pub mod export; pub mod groups; pub mod host; pub mod logging; diff --git a/crates/tangle_runtime/src/pocket_conversion.rs b/crates/tangle_runtime/src/pocket_conversion.rs @@ -5,9 +5,9 @@ use crate::errors::BaseRelayError; use std::str; use tangle_protocol::EventId; #[cfg(test)] -use tangle_protocol::Filter; -#[cfg(test)] -use tangle_protocol::{Event, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent}; +use tangle_protocol::{ + Event, Filter, Kind, PublicKeyHex, SignatureHex, Tag, UnixTimestamp, UnsignedEvent, +}; #[cfg(test)] use tangle_store_pocket::PocketEvent; use tangle_store_pocket::PocketEventId;