export.rs (14618B)
1 #![forbid(unsafe_code)] 2 3 use crate::{ 4 TANGLE_RELAY_VERSION, 5 backup::{ 6 TANGLE_SPEC_VERSION, file_sha256_hex, load_selected_tenant_config, now_unix_seconds, 7 tenant_manifest_source, write_json_file, 8 }, 9 }; 10 use serde::{Deserialize, Serialize}; 11 use std::{ 12 fs::{self, File}, 13 io::Write, 14 path::{Path, PathBuf}, 15 str, 16 }; 17 use tangle_store_pocket::{PocketEvent, PocketStoreHandle}; 18 19 const EXPORT_SCHEMA: &str = "tangle.tenant.export.v1"; 20 21 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 22 pub struct TenantExportRequest<'a> { 23 pub config_path: &'a str, 24 pub tenant_id: &'a str, 25 pub output: &'a str, 26 } 27 28 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 29 pub struct TenantExportReport { 30 pub tenant_id: String, 31 pub output_path: String, 32 pub manifest_path: String, 33 pub event_count: u64, 34 pub sha256: String, 35 } 36 37 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 38 struct TenantExportManifest { 39 schema: String, 40 tangle_version: String, 41 tangle_spec_version: String, 42 created_at: u64, 43 source: crate::backup::TenantManifestSource, 44 events: TenantExportEventsManifest, 45 } 46 47 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] 48 struct TenantExportEventsManifest { 49 path: String, 50 count: u64, 51 sha256: String, 52 size_bytes: u64, 53 } 54 55 pub fn export_tenant(request: TenantExportRequest<'_>) -> Result<TenantExportReport, String> { 56 let tenant = load_selected_tenant_config(request.config_path, request.tenant_id)?; 57 if !tenant.backup_export().export_enabled() { 58 return Err(format!( 59 "tenant export is disabled for {}", 60 tenant.tenant_id().as_str() 61 )); 62 } 63 let output = PathBuf::from(request.output); 64 if output.exists() { 65 return Err(format!( 66 "export output already exists: {}", 67 output.display() 68 )); 69 } 70 let manifest_path = export_manifest_path(&output)?; 71 if manifest_path.exists() { 72 return Err(format!( 73 "export manifest already exists: {}", 74 manifest_path.display() 75 )); 76 } 77 if let Some(parent) = output.parent() 78 && !parent.as_os_str().is_empty() 79 { 80 fs::create_dir_all(parent) 81 .map_err(|error| format!("failed to create {}: {error}", parent.display()))?; 82 } 83 let handle = 84 PocketStoreHandle::open(tenant.pocket_config()).map_err(|error| error.to_string())?; 85 let events = handle.scan_events().map_err(|error| error.to_string())?; 86 let mut file = File::create(&output) 87 .map_err(|error| format!("failed to create {}: {error}", output.display()))?; 88 for stored in &events { 89 let raw = serde_json::to_string(&pocket_event_json(stored.event())?) 90 .map_err(|error| error.to_string())?; 91 file.write_all(raw.as_bytes()) 92 .map_err(|error| format!("failed to write {}: {error}", output.display()))?; 93 file.write_all(b"\n") 94 .map_err(|error| format!("failed to write {}: {error}", output.display()))?; 95 } 96 file.sync_all() 97 .map_err(|error| format!("failed to sync {}: {error}", output.display()))?; 98 drop(file); 99 let (sha256, size_bytes) = file_sha256_hex(&output)?; 100 let event_count = u64::try_from(events.len()).expect("event count fits u64"); 101 let manifest = TenantExportManifest { 102 schema: EXPORT_SCHEMA.to_owned(), 103 tangle_version: TANGLE_RELAY_VERSION.to_owned(), 104 tangle_spec_version: TANGLE_SPEC_VERSION.to_owned(), 105 created_at: now_unix_seconds()?, 106 source: tenant_manifest_source(&tenant)?, 107 events: TenantExportEventsManifest { 108 path: output 109 .file_name() 110 .and_then(|name| name.to_str()) 111 .ok_or_else(|| { 112 format!("export output has no UTF-8 file name: {}", output.display()) 113 })? 114 .to_owned(), 115 count: event_count, 116 sha256: sha256.clone(), 117 size_bytes, 118 }, 119 }; 120 write_json_file(&manifest_path, &manifest)?; 121 Ok(TenantExportReport { 122 tenant_id: tenant.tenant_id().as_str().to_owned(), 123 output_path: output.display().to_string(), 124 manifest_path: manifest_path.display().to_string(), 125 event_count, 126 sha256, 127 }) 128 } 129 130 pub fn export_manifest_path(output: &Path) -> Result<PathBuf, String> { 131 let file_name = output 132 .file_name() 133 .and_then(|name| name.to_str()) 134 .ok_or_else(|| format!("export output has no UTF-8 file name: {}", output.display()))?; 135 Ok(output.with_file_name(format!("{file_name}.manifest.json"))) 136 } 137 138 fn pocket_event_json(event: &PocketEvent) -> Result<serde_json::Value, String> { 139 let tags = event 140 .tags() 141 .map_err(|error| error.to_string())? 142 .iter() 143 .map(|tag| { 144 tag.map(|value| { 145 str::from_utf8(value) 146 .map(str::to_owned) 147 .map_err(|error| error.to_string()) 148 }) 149 .collect::<Result<Vec<_>, _>>() 150 }) 151 .collect::<Result<Vec<_>, _>>()?; 152 let content = str::from_utf8(event.content()).map_err(|error| error.to_string())?; 153 Ok(serde_json::json!({ 154 "id": event.id().as_hex_string(), 155 "pubkey": event.pubkey().as_hex_string(), 156 "created_at": event.created_at().as_u64(), 157 "kind": event.kind().as_u16(), 158 "tags": tags, 159 "content": content, 160 "sig": event.sig().to_string() 161 })) 162 } 163 164 #[cfg(test)] 165 mod tests { 166 use super::{TenantExportRequest, export_manifest_path, export_tenant}; 167 use crate::pocket_conversion::tangle_event_to_pocket; 168 use serde_json::{Value, json}; 169 use std::path::{Path, PathBuf}; 170 use tangle_protocol::Tag; 171 use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy}; 172 use tangle_test_support::{FixtureKey, tangle_v2_event}; 173 174 #[test] 175 fn export_writes_selected_tenant_jsonl_and_manifest() { 176 let fixture = ExportFixture::new("export-selected"); 177 fixture.write_config(); 178 fixture.store_event(&fixture.alpha_store, "alpha note", 1_714_400_001); 179 fixture.store_event(&fixture.beta_store, "beta note", 1_714_400_002); 180 let report = export_tenant(TenantExportRequest { 181 config_path: fixture.host_config.to_str().expect("config"), 182 tenant_id: "alpha", 183 output: fixture.output.to_str().expect("output"), 184 }) 185 .expect("export"); 186 let jsonl = std::fs::read_to_string(&fixture.output).expect("jsonl"); 187 let manifest = read_json(&export_manifest_path(&fixture.output).expect("manifest path")); 188 189 assert_eq!(report.tenant_id, "alpha"); 190 assert_eq!(report.event_count, 1); 191 assert!(jsonl.contains("alpha note")); 192 assert!(!jsonl.contains("beta note")); 193 assert_eq!(manifest["schema"], "tangle.tenant.export.v1"); 194 assert_eq!(manifest["source"]["tenant_id"], "alpha"); 195 assert_eq!(manifest["events"]["count"], 1); 196 assert_eq!(manifest["events"]["sha256"], report.sha256); 197 198 fixture.cleanup(); 199 } 200 201 #[test] 202 fn export_refuses_existing_outputs() { 203 let fixture = ExportFixture::new("export-existing"); 204 fixture.write_config(); 205 std::fs::create_dir_all(fixture.output.parent().expect("parent")).expect("parent"); 206 std::fs::write(&fixture.output, b"exists").expect("output"); 207 let error = export_tenant(TenantExportRequest { 208 config_path: fixture.host_config.to_str().expect("config"), 209 tenant_id: "alpha", 210 output: fixture.output.to_str().expect("output"), 211 }) 212 .expect_err("existing"); 213 214 assert!(error.contains("export output already exists")); 215 216 fixture.cleanup(); 217 } 218 219 struct ExportFixture { 220 root: PathBuf, 221 host_config: PathBuf, 222 alpha_store: PathBuf, 223 beta_store: PathBuf, 224 output: PathBuf, 225 } 226 227 impl ExportFixture { 228 fn new(name: &str) -> Self { 229 let root = temp_root(name); 230 let _ = std::fs::remove_dir_all(&root); 231 Self { 232 host_config: root.join("host.json"), 233 alpha_store: root.join("alpha-pocket"), 234 beta_store: root.join("beta-pocket"), 235 output: root.join("exports").join("alpha.jsonl"), 236 root, 237 } 238 } 239 240 fn write_config(&self) { 241 std::fs::create_dir_all(self.root.join("tenants")).expect("tenants"); 242 std::fs::write( 243 &self.host_config, 244 json!({ 245 "listen_addr": "127.0.0.1:0", 246 "tenant_config_dir": "tenants" 247 }) 248 .to_string(), 249 ) 250 .expect("host"); 251 std::fs::write( 252 self.root.join("tenants").join("alpha.json"), 253 tenant_config_json("alpha", "alpha.test", &self.alpha_store).to_string(), 254 ) 255 .expect("alpha tenant"); 256 std::fs::write( 257 self.root.join("tenants").join("beta.json"), 258 tenant_config_json("beta", "beta.test", &self.beta_store).to_string(), 259 ) 260 .expect("beta tenant"); 261 } 262 263 fn store_event(&self, store: &Path, content: &str, created_at: u64) { 264 let config = 265 PocketStoreConfig::new(store, PocketSyncPolicy::FlushOnShutdown).expect("config"); 266 let handle = PocketStoreHandle::open(&config).expect("open"); 267 let event = tangle_v2_event( 268 FixtureKey::Member, 269 created_at, 270 1, 271 vec![Tag::from_parts("t", &[content]).expect("tag")], 272 content, 273 ) 274 .expect("event"); 275 let pocket = tangle_event_to_pocket(&event).expect("pocket"); 276 handle.store_event(&pocket).expect("store"); 277 handle.sync().expect("sync"); 278 } 279 280 fn cleanup(self) { 281 let _ = std::fs::remove_dir_all(self.root); 282 } 283 } 284 285 fn tenant_config_json(tenant_id: &str, host: &str, store: &Path) -> Value { 286 let relay_secret = if tenant_id == "alpha" { 287 "7777777777777777777777777777777777777777777777777777777777777777" 288 } else { 289 "8888888888888888888888888888888888888888888888888888888888888888" 290 }; 291 json!({ 292 "tenant_id": tenant_id, 293 "tenant_schema": tenant_id, 294 "host": host, 295 "relay_url": format!("wss://{host}"), 296 "info": {"name": format!("{tenant_id} relay")}, 297 "pocket": { 298 "data_directory": store, 299 "sync_policy": "flush_on_shutdown" 300 }, 301 "pocket_query": { 302 "allow_scraping": false, 303 "allow_scrape_if_limited_to": 100, 304 "allow_scrape_if_max_seconds": 3600 305 }, 306 "groups": { 307 "enabled": true, 308 "canonical_relay_url": format!("wss://{host}"), 309 "relay_secret": relay_secret, 310 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()], 311 "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()] 312 }, 313 "auth": { 314 "challenge_ttl_seconds": 300, 315 "created_at_skew_seconds": 600 316 }, 317 "limits": { 318 "max_message_length": 1048576, 319 "max_subid_length": 64, 320 "max_subscriptions_per_connection": 64, 321 "max_filters_per_request": 10, 322 "max_tag_values_per_filter": 100, 323 "max_query_complexity": 2048, 324 "max_limit": 500, 325 "default_limit": 100, 326 "max_event_tags": 200, 327 "max_content_length": 65536, 328 "broadcast_channel_capacity": 16, 329 "per_connection_outbound_queue": 16 330 }, 331 "rate_limits": { 332 "auth": { 333 "per_ip": {"window_seconds": 60, "max_hits": 120}, 334 "per_pubkey": {"window_seconds": 60, "max_hits": 30}, 335 "failures": {"window_seconds": 300, "max_hits": 5}, 336 "failures_per_ip": {"window_seconds": 300, "max_hits": 20} 337 }, 338 "event": { 339 "per_ip": {"window_seconds": 60, "max_hits": 600}, 340 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 341 "per_kind": {"window_seconds": 60, "max_hits": 1000} 342 }, 343 "group": { 344 "write_per_ip": {"window_seconds": 60, "max_hits": 300}, 345 "write_per_pubkey": {"window_seconds": 60, "max_hits": 60}, 346 "write_per_group": {"window_seconds": 60, "max_hits": 90}, 347 "write_per_kind": {"window_seconds": 60, "max_hits": 300}, 348 "join_flow": {"window_seconds": 300, "max_hits": 10}, 349 "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30} 350 }, 351 "req": { 352 "per_ip": {"window_seconds": 60, "max_hits": 600}, 353 "per_connection": {"window_seconds": 60, "max_hits": 120}, 354 "per_pubkey": {"window_seconds": 60, "max_hits": 240}, 355 "per_group": {"window_seconds": 60, "max_hits": 240}, 356 "per_kind": {"window_seconds": 60, "max_hits": 500}, 357 "broad": {"window_seconds": 60, "max_hits": 30} 358 }, 359 "count": { 360 "per_ip": {"window_seconds": 60, "max_hits": 300}, 361 "per_connection": {"window_seconds": 60, "max_hits": 60}, 362 "per_pubkey": {"window_seconds": 60, "max_hits": 120}, 363 "per_group": {"window_seconds": 60, "max_hits": 120}, 364 "per_kind": {"window_seconds": 60, "max_hits": 240}, 365 "broad": {"window_seconds": 60, "max_hits": 20} 366 } 367 }, 368 "backup_export": { 369 "backup_enabled": true, 370 "export_enabled": true 371 } 372 }) 373 } 374 375 fn read_json(path: &Path) -> Value { 376 serde_json::from_str(&std::fs::read_to_string(path).expect("read")).expect("json") 377 } 378 379 fn temp_root(name: &str) -> PathBuf { 380 std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id())) 381 } 382 }