tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

export.rs (14618B)


      1 #![forbid(unsafe_code)]
      2 
      3 use crate::{
      4     TANGLE_RELAY_VERSION,
      5     backup::{
      6         TANGLE_SPEC_VERSION, file_sha256_hex, load_selected_tenant_config, now_unix_seconds,
      7         tenant_manifest_source, write_json_file,
      8     },
      9 };
     10 use serde::{Deserialize, Serialize};
     11 use std::{
     12     fs::{self, File},
     13     io::Write,
     14     path::{Path, PathBuf},
     15     str,
     16 };
     17 use tangle_store_pocket::{PocketEvent, PocketStoreHandle};
     18 
     19 const EXPORT_SCHEMA: &str = "tangle.tenant.export.v1";
     20 
     21 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     22 pub struct TenantExportRequest<'a> {
     23     pub config_path: &'a str,
     24     pub tenant_id: &'a str,
     25     pub output: &'a str,
     26 }
     27 
     28 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     29 pub struct TenantExportReport {
     30     pub tenant_id: String,
     31     pub output_path: String,
     32     pub manifest_path: String,
     33     pub event_count: u64,
     34     pub sha256: String,
     35 }
     36 
     37 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     38 struct TenantExportManifest {
     39     schema: String,
     40     tangle_version: String,
     41     tangle_spec_version: String,
     42     created_at: u64,
     43     source: crate::backup::TenantManifestSource,
     44     events: TenantExportEventsManifest,
     45 }
     46 
     47 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
     48 struct TenantExportEventsManifest {
     49     path: String,
     50     count: u64,
     51     sha256: String,
     52     size_bytes: u64,
     53 }
     54 
     55 pub fn export_tenant(request: TenantExportRequest<'_>) -> Result<TenantExportReport, String> {
     56     let tenant = load_selected_tenant_config(request.config_path, request.tenant_id)?;
     57     if !tenant.backup_export().export_enabled() {
     58         return Err(format!(
     59             "tenant export is disabled for {}",
     60             tenant.tenant_id().as_str()
     61         ));
     62     }
     63     let output = PathBuf::from(request.output);
     64     if output.exists() {
     65         return Err(format!(
     66             "export output already exists: {}",
     67             output.display()
     68         ));
     69     }
     70     let manifest_path = export_manifest_path(&output)?;
     71     if manifest_path.exists() {
     72         return Err(format!(
     73             "export manifest already exists: {}",
     74             manifest_path.display()
     75         ));
     76     }
     77     if let Some(parent) = output.parent()
     78         && !parent.as_os_str().is_empty()
     79     {
     80         fs::create_dir_all(parent)
     81             .map_err(|error| format!("failed to create {}: {error}", parent.display()))?;
     82     }
     83     let handle =
     84         PocketStoreHandle::open(tenant.pocket_config()).map_err(|error| error.to_string())?;
     85     let events = handle.scan_events().map_err(|error| error.to_string())?;
     86     let mut file = File::create(&output)
     87         .map_err(|error| format!("failed to create {}: {error}", output.display()))?;
     88     for stored in &events {
     89         let raw = serde_json::to_string(&pocket_event_json(stored.event())?)
     90             .map_err(|error| error.to_string())?;
     91         file.write_all(raw.as_bytes())
     92             .map_err(|error| format!("failed to write {}: {error}", output.display()))?;
     93         file.write_all(b"\n")
     94             .map_err(|error| format!("failed to write {}: {error}", output.display()))?;
     95     }
     96     file.sync_all()
     97         .map_err(|error| format!("failed to sync {}: {error}", output.display()))?;
     98     drop(file);
     99     let (sha256, size_bytes) = file_sha256_hex(&output)?;
    100     let event_count = u64::try_from(events.len()).expect("event count fits u64");
    101     let manifest = TenantExportManifest {
    102         schema: EXPORT_SCHEMA.to_owned(),
    103         tangle_version: TANGLE_RELAY_VERSION.to_owned(),
    104         tangle_spec_version: TANGLE_SPEC_VERSION.to_owned(),
    105         created_at: now_unix_seconds()?,
    106         source: tenant_manifest_source(&tenant)?,
    107         events: TenantExportEventsManifest {
    108             path: output
    109                 .file_name()
    110                 .and_then(|name| name.to_str())
    111                 .ok_or_else(|| {
    112                     format!("export output has no UTF-8 file name: {}", output.display())
    113                 })?
    114                 .to_owned(),
    115             count: event_count,
    116             sha256: sha256.clone(),
    117             size_bytes,
    118         },
    119     };
    120     write_json_file(&manifest_path, &manifest)?;
    121     Ok(TenantExportReport {
    122         tenant_id: tenant.tenant_id().as_str().to_owned(),
    123         output_path: output.display().to_string(),
    124         manifest_path: manifest_path.display().to_string(),
    125         event_count,
    126         sha256,
    127     })
    128 }
    129 
    130 pub fn export_manifest_path(output: &Path) -> Result<PathBuf, String> {
    131     let file_name = output
    132         .file_name()
    133         .and_then(|name| name.to_str())
    134         .ok_or_else(|| format!("export output has no UTF-8 file name: {}", output.display()))?;
    135     Ok(output.with_file_name(format!("{file_name}.manifest.json")))
    136 }
    137 
    138 fn pocket_event_json(event: &PocketEvent) -> Result<serde_json::Value, String> {
    139     let tags = event
    140         .tags()
    141         .map_err(|error| error.to_string())?
    142         .iter()
    143         .map(|tag| {
    144             tag.map(|value| {
    145                 str::from_utf8(value)
    146                     .map(str::to_owned)
    147                     .map_err(|error| error.to_string())
    148             })
    149             .collect::<Result<Vec<_>, _>>()
    150         })
    151         .collect::<Result<Vec<_>, _>>()?;
    152     let content = str::from_utf8(event.content()).map_err(|error| error.to_string())?;
    153     Ok(serde_json::json!({
    154         "id": event.id().as_hex_string(),
    155         "pubkey": event.pubkey().as_hex_string(),
    156         "created_at": event.created_at().as_u64(),
    157         "kind": event.kind().as_u16(),
    158         "tags": tags,
    159         "content": content,
    160         "sig": event.sig().to_string()
    161     }))
    162 }
    163 
    164 #[cfg(test)]
    165 mod tests {
    166     use super::{TenantExportRequest, export_manifest_path, export_tenant};
    167     use crate::pocket_conversion::tangle_event_to_pocket;
    168     use serde_json::{Value, json};
    169     use std::path::{Path, PathBuf};
    170     use tangle_protocol::Tag;
    171     use tangle_store_pocket::{PocketStoreConfig, PocketStoreHandle, PocketSyncPolicy};
    172     use tangle_test_support::{FixtureKey, tangle_v2_event};
    173 
    174     #[test]
    175     fn export_writes_selected_tenant_jsonl_and_manifest() {
    176         let fixture = ExportFixture::new("export-selected");
    177         fixture.write_config();
    178         fixture.store_event(&fixture.alpha_store, "alpha note", 1_714_400_001);
    179         fixture.store_event(&fixture.beta_store, "beta note", 1_714_400_002);
    180         let report = export_tenant(TenantExportRequest {
    181             config_path: fixture.host_config.to_str().expect("config"),
    182             tenant_id: "alpha",
    183             output: fixture.output.to_str().expect("output"),
    184         })
    185         .expect("export");
    186         let jsonl = std::fs::read_to_string(&fixture.output).expect("jsonl");
    187         let manifest = read_json(&export_manifest_path(&fixture.output).expect("manifest path"));
    188 
    189         assert_eq!(report.tenant_id, "alpha");
    190         assert_eq!(report.event_count, 1);
    191         assert!(jsonl.contains("alpha note"));
    192         assert!(!jsonl.contains("beta note"));
    193         assert_eq!(manifest["schema"], "tangle.tenant.export.v1");
    194         assert_eq!(manifest["source"]["tenant_id"], "alpha");
    195         assert_eq!(manifest["events"]["count"], 1);
    196         assert_eq!(manifest["events"]["sha256"], report.sha256);
    197 
    198         fixture.cleanup();
    199     }
    200 
    201     #[test]
    202     fn export_refuses_existing_outputs() {
    203         let fixture = ExportFixture::new("export-existing");
    204         fixture.write_config();
    205         std::fs::create_dir_all(fixture.output.parent().expect("parent")).expect("parent");
    206         std::fs::write(&fixture.output, b"exists").expect("output");
    207         let error = export_tenant(TenantExportRequest {
    208             config_path: fixture.host_config.to_str().expect("config"),
    209             tenant_id: "alpha",
    210             output: fixture.output.to_str().expect("output"),
    211         })
    212         .expect_err("existing");
    213 
    214         assert!(error.contains("export output already exists"));
    215 
    216         fixture.cleanup();
    217     }
    218 
    219     struct ExportFixture {
    220         root: PathBuf,
    221         host_config: PathBuf,
    222         alpha_store: PathBuf,
    223         beta_store: PathBuf,
    224         output: PathBuf,
    225     }
    226 
    227     impl ExportFixture {
    228         fn new(name: &str) -> Self {
    229             let root = temp_root(name);
    230             let _ = std::fs::remove_dir_all(&root);
    231             Self {
    232                 host_config: root.join("host.json"),
    233                 alpha_store: root.join("alpha-pocket"),
    234                 beta_store: root.join("beta-pocket"),
    235                 output: root.join("exports").join("alpha.jsonl"),
    236                 root,
    237             }
    238         }
    239 
    240         fn write_config(&self) {
    241             std::fs::create_dir_all(self.root.join("tenants")).expect("tenants");
    242             std::fs::write(
    243                 &self.host_config,
    244                 json!({
    245                     "listen_addr": "127.0.0.1:0",
    246                     "tenant_config_dir": "tenants"
    247                 })
    248                 .to_string(),
    249             )
    250             .expect("host");
    251             std::fs::write(
    252                 self.root.join("tenants").join("alpha.json"),
    253                 tenant_config_json("alpha", "alpha.test", &self.alpha_store).to_string(),
    254             )
    255             .expect("alpha tenant");
    256             std::fs::write(
    257                 self.root.join("tenants").join("beta.json"),
    258                 tenant_config_json("beta", "beta.test", &self.beta_store).to_string(),
    259             )
    260             .expect("beta tenant");
    261         }
    262 
    263         fn store_event(&self, store: &Path, content: &str, created_at: u64) {
    264             let config =
    265                 PocketStoreConfig::new(store, PocketSyncPolicy::FlushOnShutdown).expect("config");
    266             let handle = PocketStoreHandle::open(&config).expect("open");
    267             let event = tangle_v2_event(
    268                 FixtureKey::Member,
    269                 created_at,
    270                 1,
    271                 vec![Tag::from_parts("t", &[content]).expect("tag")],
    272                 content,
    273             )
    274             .expect("event");
    275             let pocket = tangle_event_to_pocket(&event).expect("pocket");
    276             handle.store_event(&pocket).expect("store");
    277             handle.sync().expect("sync");
    278         }
    279 
    280         fn cleanup(self) {
    281             let _ = std::fs::remove_dir_all(self.root);
    282         }
    283     }
    284 
    285     fn tenant_config_json(tenant_id: &str, host: &str, store: &Path) -> Value {
    286         let relay_secret = if tenant_id == "alpha" {
    287             "7777777777777777777777777777777777777777777777777777777777777777"
    288         } else {
    289             "8888888888888888888888888888888888888888888888888888888888888888"
    290         };
    291         json!({
    292             "tenant_id": tenant_id,
    293             "tenant_schema": tenant_id,
    294             "host": host,
    295             "relay_url": format!("wss://{host}"),
    296             "info": {"name": format!("{tenant_id} relay")},
    297             "pocket": {
    298                 "data_directory": store,
    299                 "sync_policy": "flush_on_shutdown"
    300             },
    301             "pocket_query": {
    302                 "allow_scraping": false,
    303                 "allow_scrape_if_limited_to": 100,
    304                 "allow_scrape_if_max_seconds": 3600
    305             },
    306             "groups": {
    307                 "enabled": true,
    308                 "canonical_relay_url": format!("wss://{host}"),
    309                 "relay_secret": relay_secret,
    310                 "owner_pubkeys": [FixtureKey::Owner.public_key().as_str()],
    311                 "admin_pubkeys": [FixtureKey::Admin.public_key().as_str()]
    312             },
    313             "auth": {
    314                 "challenge_ttl_seconds": 300,
    315                 "created_at_skew_seconds": 600
    316             },
    317             "limits": {
    318                 "max_message_length": 1048576,
    319                 "max_subid_length": 64,
    320                 "max_subscriptions_per_connection": 64,
    321                 "max_filters_per_request": 10,
    322                 "max_tag_values_per_filter": 100,
    323                 "max_query_complexity": 2048,
    324                 "max_limit": 500,
    325                 "default_limit": 100,
    326                 "max_event_tags": 200,
    327                 "max_content_length": 65536,
    328                 "broadcast_channel_capacity": 16,
    329                 "per_connection_outbound_queue": 16
    330             },
    331             "rate_limits": {
    332                 "auth": {
    333                     "per_ip": {"window_seconds": 60, "max_hits": 120},
    334                     "per_pubkey": {"window_seconds": 60, "max_hits": 30},
    335                     "failures": {"window_seconds": 300, "max_hits": 5},
    336                     "failures_per_ip": {"window_seconds": 300, "max_hits": 20}
    337                 },
    338                 "event": {
    339                     "per_ip": {"window_seconds": 60, "max_hits": 600},
    340                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    341                     "per_kind": {"window_seconds": 60, "max_hits": 1000}
    342                 },
    343                 "group": {
    344                     "write_per_ip": {"window_seconds": 60, "max_hits": 300},
    345                     "write_per_pubkey": {"window_seconds": 60, "max_hits": 60},
    346                     "write_per_group": {"window_seconds": 60, "max_hits": 90},
    347                     "write_per_kind": {"window_seconds": 60, "max_hits": 300},
    348                     "join_flow": {"window_seconds": 300, "max_hits": 10},
    349                     "join_flow_per_ip": {"window_seconds": 300, "max_hits": 30}
    350                 },
    351                 "req": {
    352                     "per_ip": {"window_seconds": 60, "max_hits": 600},
    353                     "per_connection": {"window_seconds": 60, "max_hits": 120},
    354                     "per_pubkey": {"window_seconds": 60, "max_hits": 240},
    355                     "per_group": {"window_seconds": 60, "max_hits": 240},
    356                     "per_kind": {"window_seconds": 60, "max_hits": 500},
    357                     "broad": {"window_seconds": 60, "max_hits": 30}
    358                 },
    359                 "count": {
    360                     "per_ip": {"window_seconds": 60, "max_hits": 300},
    361                     "per_connection": {"window_seconds": 60, "max_hits": 60},
    362                     "per_pubkey": {"window_seconds": 60, "max_hits": 120},
    363                     "per_group": {"window_seconds": 60, "max_hits": 120},
    364                     "per_kind": {"window_seconds": 60, "max_hits": 240},
    365                     "broad": {"window_seconds": 60, "max_hits": 20}
    366                 }
    367             },
    368             "backup_export": {
    369                 "backup_enabled": true,
    370                 "export_enabled": true
    371             }
    372         })
    373     }
    374 
    375     fn read_json(path: &Path) -> Value {
    376         serde_json::from_str(&std::fs::read_to_string(path).expect("read")).expect("json")
    377     }
    378 
    379     fn temp_root(name: &str) -> PathBuf {
    380         std::env::temp_dir().join(format!("tangle-runtime-{name}-{}", std::process::id()))
    381     }
    382 }