tangle


git clone https://radroots.dev/git/tangle.git
Log | Files | Refs | README | LICENSE

commit 3ce0ac336b523abb68fb001fb50c674d0c89b2a7
parent 9cdaf9dc25a60c48e7da6d9ec5db32ad4b9935b0
Author: triesap <tyson@radroots.org>
Date:   Sat,  6 Jun 2026 02:03:28 -0700

cli: add event import command

Diffstat:
Mcrates/tangle/src/lib.rs | 138+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
Mcrates/tangle/src/main.rs | 20++++++++++++++++++++
Mcrates/tangle/tests/version.rs | 142++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mcrates/tangle_runtime/src/lib.rs | 202+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 483 insertions(+), 19 deletions(-)

diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs @@ -5,14 +5,13 @@ use std::fmt; pub const PACKAGE_NAME: &str = env!("CARGO_PKG_NAME"); pub const PACKAGE_VERSION: &str = env!("CARGO_PKG_VERSION"); pub const USAGE: &str = "\ -usage: tangle [--version] <command> [--config PATH] - -commands: - migrate - run - event import - event export - projection rebuild"; +usage: + tangle [--version] + tangle migrate --config PATH + tangle run --config PATH + tangle event import --config PATH --input PATH + tangle event export --config PATH + tangle projection rebuild --config PATH"; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TangleCommand { @@ -39,7 +38,10 @@ impl TangleCommand { } pub fn implemented(self) -> bool { - matches!(self, Self::Version | Self::Help | Self::Migrate | Self::Run) + matches!( + self, + Self::Version | Self::Help | Self::Migrate | Self::Run | Self::EventImport + ) } } @@ -47,6 +49,7 @@ impl TangleCommand { pub struct TangleInvocation { command: TangleCommand, config_path: Option<String>, + input_path: Option<String>, } impl TangleInvocation { @@ -54,9 +57,15 @@ impl TangleInvocation { Self { command, config_path, + input_path: None, } } + pub fn with_input_path(mut self, input_path: Option<String>) -> Self { + self.input_path = input_path; + self + } + pub fn command(&self) -> TangleCommand { self.command } @@ -64,6 +73,10 @@ impl TangleInvocation { pub fn config_path(&self) -> Option<&str> { self.config_path.as_deref() } + + pub fn input_path(&self) -> Option<&str> { + self.input_path.as_deref() + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -156,6 +169,7 @@ where _ => return Err(TangleCliError::UnknownCommand(first)), }; let mut config_path = None; + let mut input_path = None; while let Some(argument) = args.next() { match argument.as_str() { "--config" => { @@ -167,6 +181,15 @@ where }; config_path = Some(path); } + "--input" => { + if input_path.is_some() { + return Err(TangleCliError::RepeatedOption("--input")); + } + let Some(path) = args.next() else { + return Err(TangleCliError::MissingOptionValue("--input")); + }; + input_path = Some(path); + } _ => { return Err(TangleCliError::UnexpectedArgument { command: command.as_str().to_owned(), @@ -175,7 +198,13 @@ where } } } - Ok(TangleInvocation::new(command, config_path)) + if input_path.is_some() && command != TangleCommand::EventImport { + return Err(TangleCliError::UnexpectedArgument { + command: command.as_str().to_owned(), + argument: "--input".to_owned(), + }); + } + Ok(TangleInvocation::new(command, config_path).with_input_path(input_path)) } pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { @@ -184,6 +213,12 @@ pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, Tangle .ok_or(TangleCliError::MissingOptionValue("--config")) } +pub fn require_input_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> { + invocation + .input_path() + .ok_or(TangleCliError::MissingOptionValue("--input")) +} + pub fn migrate_output(report: tangle_runtime::RuntimeMigrationReport) -> String { format!( "migrations applied: {}\nmigrations already applied: {}\nmigrations total: {}", @@ -193,6 +228,17 @@ pub fn migrate_output(report: tangle_runtime::RuntimeMigrationReport) -> String ) } +pub fn event_import_output(report: tangle_runtime::RuntimeEventImportReport) -> String { + format!( + "events total: {}\nevents inserted: {}\nevents duplicate: {}\nevents projected: {}\nevents skipped: {}", + report.total(), + report.inserted(), + report.duplicate(), + report.projected(), + report.skipped() + ) +} + pub async fn migrate_with_config(path: &str) -> Result<String, String> { let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?; let report = tangle_runtime::migrate_runtime_database(&config) @@ -201,6 +247,18 @@ pub async fn migrate_with_config(path: &str) -> Result<String, String> { Ok(migrate_output(report)) } +pub async fn event_import_with_config( + config_path: &str, + input_path: &str, +) -> Result<String, String> { + let config = + tangle_runtime::load_runtime_config(config_path).map_err(|error| error.to_string())?; + let report = tangle_runtime::import_events_from_path(&config, input_path) + .await + .map_err(|error| error.to_string())?; + Ok(event_import_output(report)) +} + pub async fn run_with_config(path: &str) -> Result<(), String> { let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?; let (shutdown, _) = tangle_runtime::GracefulShutdownSignal::new(); @@ -220,10 +278,10 @@ pub async fn run_with_config(path: &str) -> Result<(), String> { mod tests { use super::{ PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation, - migrate_output, parse_tangle_command, parse_tangle_invocation, require_config_path, - usage_output, version_output, + event_import_output, migrate_output, parse_tangle_command, parse_tangle_invocation, + require_config_path, require_input_path, usage_output, version_output, }; - use tangle_runtime::RuntimeMigrationReport; + use tangle_runtime::{RuntimeEventImportReport, RuntimeMigrationReport}; #[test] fn package_name_is_tangle() { @@ -244,7 +302,7 @@ mod tests { fn usage_output_lists_supported_command_model() { assert_eq!( usage_output(), - "usage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild" + "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH" ); } @@ -276,6 +334,7 @@ mod tests { | TangleCommand::Help | TangleCommand::Migrate | TangleCommand::Run + | TangleCommand::EventImport ) ); } @@ -303,6 +362,33 @@ mod tests { } #[test] + fn command_model_parses_import_input_option() { + let invocation = parse_tangle_invocation([ + "event", + "import", + "--config", + "runtime.json", + "--input", + "events.jsonl", + ]) + .expect("invocation"); + assert_eq!(invocation.command(), TangleCommand::EventImport); + assert_eq!( + require_config_path(&invocation).expect("config"), + "runtime.json" + ); + assert_eq!( + require_input_path(&invocation).expect("input"), + "events.jsonl" + ); + assert_eq!( + require_input_path(&TangleInvocation::new(TangleCommand::EventImport, None)) + .expect_err("input"), + TangleCliError::MissingOptionValue("--input") + ); + } + + #[test] fn command_model_rejects_unknown_or_extra_arguments() { assert_eq!( parse_tangle_command(["unknown"]).expect_err("unknown"), @@ -332,6 +418,22 @@ mod tests { .expect_err("repeated config"), TangleCliError::RepeatedOption("--config") ); + assert_eq!( + parse_tangle_invocation(["migrate", "--input", "events.jsonl"]).expect_err("input"), + TangleCliError::UnexpectedArgument { + command: "migrate".to_owned(), + argument: "--input".to_owned() + } + ); + assert_eq!( + parse_tangle_invocation(["event", "import", "--input"]).expect_err("missing input"), + TangleCliError::MissingOptionValue("--input") + ); + assert_eq!( + parse_tangle_invocation(["event", "import", "--input", "a", "--input", "b"]) + .expect_err("repeated input"), + TangleCliError::RepeatedOption("--input") + ); } #[test] @@ -341,4 +443,12 @@ mod tests { "migrations applied: 8\nmigrations already applied: 2\nmigrations total: 10" ); } + + #[test] + fn event_import_output_reports_outcome_counts() { + assert_eq!( + event_import_output(RuntimeEventImportReport::new(5, 2, 1, 2, 2)), + "events total: 5\nevents inserted: 2\nevents duplicate: 1\nevents projected: 2\nevents skipped: 2" + ); + } } diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs @@ -38,6 +38,16 @@ fn main() -> ExitCode { ExitCode::from(2) } }, + tangle::TangleCommand::EventImport => match run_event_import(&invocation) { + Ok(output) => { + println!("{output}"); + ExitCode::SUCCESS + } + Err(error) => { + eprintln!("{error}"); + ExitCode::from(2) + } + }, command => { eprintln!("command not implemented: {}", command.as_str()); ExitCode::from(2) @@ -62,3 +72,13 @@ fn run_server(invocation: &tangle::TangleInvocation) -> Result<(), String> { .map_err(|error| format!("failed to start runtime: {error}"))?; runtime.block_on(tangle::run_with_config(config_path)) } + +fn run_event_import(invocation: &tangle::TangleInvocation) -> Result<String, String> { + let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?; + let input_path = tangle::require_input_path(invocation).map_err(|error| error.to_string())?; + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|error| format!("failed to start runtime: {error}"))?; + runtime.block_on(tangle::event_import_with_config(config_path, input_path)) +} diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs @@ -1,6 +1,10 @@ #![forbid(unsafe_code)] use std::process::Command; +use std::time::{Duration, Instant}; +use tangle_protocol::event_to_value; +use tangle_store_surreal::{SurrealConnectionConfig, SurrealStore}; +use tangle_test_support::{FixtureKey, build_fixture_event, valid_public_listing_spec}; #[test] fn tangle_version_command_reports_package_version() { @@ -23,7 +27,7 @@ fn tangle_without_args_reports_usage() { assert!(output.status.success()); assert_eq!( String::from_utf8_lossy(&output.stdout), - "usage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n" + "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH\n" ); assert!(output.stderr.is_empty()); } @@ -39,7 +43,7 @@ fn tangle_unknown_arg_reports_usage_error() { assert!(output.stdout.is_empty()); assert_eq!( String::from_utf8_lossy(&output.stderr), - "unknown command: --unknown\nusage: tangle [--version] <command> [--config PATH]\n\ncommands:\n migrate\n run\n event import\n event export\n projection rebuild\n" + "unknown command: --unknown\nusage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH\n" ); } @@ -86,6 +90,88 @@ fn tangle_migrate_command_applies_configured_migrations() { assert!(output.stderr.is_empty()); } +#[tokio::test] +async fn tangle_event_import_command_imports_canonical_jsonl() { + let listing = build_fixture_event(&valid_public_listing_spec()).expect("listing"); + let root = std::env::temp_dir().join(format!( + "tangle-cli-import-{}-{}", + std::process::id(), + &listing.id().as_str()[..8] + )); + let _ = std::fs::remove_dir_all(&root); + let db_path = root.join("db"); + let config_path = root.join("runtime.json"); + let input_path = root.join("events.jsonl"); + std::fs::create_dir_all(&root).expect("runtime root"); + write_rocksdb_config(&config_path, &db_path, "tangle_cli_import"); + std::fs::write(&input_path, format!("{}\n", event_to_value(&listing))) + .expect("write import file"); + + let first = Command::new(env!("CARGO_BIN_EXE_tangle")) + .args(["event", "import", "--config"]) + .arg(&config_path) + .args(["--input"]) + .arg(&input_path) + .output() + .expect("run tangle event import"); + + assert!(first.status.success()); + assert_eq!( + String::from_utf8_lossy(&first.stdout), + "events total: 1\nevents inserted: 1\nevents duplicate: 0\nevents projected: 1\nevents skipped: 0\n" + ); + assert!(first.stderr.is_empty()); + + let second = Command::new(env!("CARGO_BIN_EXE_tangle")) + .args(["event", "import", "--config"]) + .arg(&config_path) + .args(["--input"]) + .arg(&input_path) + .output() + .expect("rerun tangle event import"); + + assert!(second.status.success()); + assert_eq!( + String::from_utf8_lossy(&second.stdout), + "events total: 1\nevents inserted: 0\nevents duplicate: 1\nevents projected: 0\nevents skipped: 0\n" + ); + assert!(second.stderr.is_empty()); + + let seller = FixtureKey::Seller.public_key(); + let listing_key = format!("30402:{}:listing-a", seller.as_str()); + let store_config = SurrealConnectionConfig::rocksdb( + db_path.to_str().expect("db path"), + "tangle_cli_import", + "relay", + ) + .expect("store config"); + let store = reopen_store(&store_config).await; + assert!( + store + .raw_event_row(listing.id()) + .await + .expect("raw row") + .is_some() + ); + assert!( + store + .listing_current_row(&listing_key) + .await + .expect("listing row") + .is_some() + ); + assert!( + store + .search_document_row(&listing_key) + .await + .expect("search row") + .is_some() + ); + + drop(store); + std::fs::remove_dir_all(&root).expect("remove runtime root"); +} + #[test] fn tangle_migrate_requires_config_path() { let output = Command::new(env!("CARGO_BIN_EXE_tangle")) @@ -104,14 +190,60 @@ fn tangle_migrate_requires_config_path() { #[test] fn tangle_known_future_commands_report_not_implemented() { let output = Command::new(env!("CARGO_BIN_EXE_tangle")) - .args(["event", "import"]) + .args(["event", "export"]) .output() - .expect("run tangle event import"); + .expect("run tangle event export"); assert_eq!(output.status.code(), Some(2)); assert!(output.stdout.is_empty()); assert_eq!( String::from_utf8_lossy(&output.stderr), - "command not implemented: event import\n" + "command not implemented: event export\n" ); } + +fn write_rocksdb_config(path: &std::path::Path, db_path: &std::path::Path, namespace: &str) { + let config = serde_json::json!({ + "server": { + "listen_addr": "127.0.0.1:0", + "relay_url": "wss://relay.radroots.test" + }, + "database": { + "mode": "rocks_db", + "path": db_path.to_str().expect("db path"), + "namespace": namespace, + "database": "relay" + }, + "auth": { + "challenge_ttl_seconds": 300 + }, + "limits": { + "message_rate_limit": { + "limit": 120, + "window_seconds": 60 + } + }, + "policy": { + "approved_sellers": [FixtureKey::Seller.public_key().as_str()] + } + }); + std::fs::write( + path, + serde_json::to_string_pretty(&config).expect("config JSON"), + ) + .expect("write config"); +} + +async fn reopen_store(config: &SurrealConnectionConfig) -> SurrealStore { + let started = Instant::now(); + loop { + match SurrealStore::connect_local(config).await { + Ok(store) => return store, + Err(error) if started.elapsed() < Duration::from_secs(5) => { + let _ = error; + tokio::time::sleep(Duration::from_millis(50)).await; + } + Err(error) => panic!("store reopen failed: {error}"), + } + } +} diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs @@ -371,6 +371,7 @@ impl RuntimeMigrationReport { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RuntimeCommandErrorKind { Unsupported, + Input, Store, } @@ -392,6 +393,10 @@ impl RuntimeCommandError { Self::new(RuntimeCommandErrorKind::Unsupported, message) } + pub fn input(message: impl Into<String>) -> Self { + Self::new(RuntimeCommandErrorKind::Input, message) + } + pub fn store(message: impl Into<String>) -> Self { Self::new(RuntimeCommandErrorKind::Store, message) } @@ -436,6 +441,203 @@ pub async fn migrate_runtime_database( )) } +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct RuntimeEventImportReport { + total: u64, + inserted: u64, + duplicate: u64, + projected: u64, + skipped: u64, +} + +impl RuntimeEventImportReport { + pub fn new(total: u64, inserted: u64, duplicate: u64, projected: u64, skipped: u64) -> Self { + Self { + total, + inserted, + duplicate, + projected, + skipped, + } + } + + pub fn total(self) -> u64 { + self.total + } + + pub fn inserted(self) -> u64 { + self.inserted + } + + pub fn duplicate(self) -> u64 { + self.duplicate + } + + pub fn projected(self) -> u64 { + self.projected + } + + pub fn skipped(self) -> u64 { + self.skipped + } + + fn record(&mut self, outcome: RuntimeEventImportOutcome) { + self.total += 1; + match outcome { + RuntimeEventImportOutcome::Inserted { projected } => { + self.inserted += 1; + if projected { + self.projected += 1; + } + } + RuntimeEventImportOutcome::Duplicate => { + self.duplicate += 1; + } + RuntimeEventImportOutcome::Skipped => { + self.skipped += 1; + } + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum RuntimeEventImportOutcome { + Inserted { projected: bool }, + Duplicate, + Skipped, +} + +pub async fn import_events_from_path( + config: &TangleRuntimeConfig, + path: impl AsRef<FsPath>, +) -> Result<RuntimeEventImportReport, RuntimeCommandError> { + let path = path.as_ref(); + let raw = fs::read_to_string(path).map_err(|error| { + RuntimeCommandError::input(format!( + "failed to read event import file `{}`: {error}", + path.display() + )) + })?; + let events = parse_event_import_document(&raw)?; + let store = connect_runtime_store(config).await?; + store + .apply_plan(&base_migration_plan()) + .await + .map_err(|error| RuntimeCommandError::store(error.to_string()))?; + let validator = EventValidator::new( + config.limits(), + config + .admission_policy() + .clone() + .with_write_auth_required(false), + ); + let mut report = RuntimeEventImportReport::default(); + let now = now_timestamp(); + for event in events { + let outcome = import_single_event(&store, &validator, event, now).await?; + report.record(outcome); + } + Ok(report) +} + +async fn import_single_event( + store: &SurrealStore, + validator: &EventValidator, + event: Event, + now: UnixTimestamp, +) -> Result<RuntimeEventImportOutcome, RuntimeCommandError> { + let validated = match validator.validate(&event, &AdmissionContext::unauthenticated(), now) { + Ok(validated) => validated, + Err(_) => return Ok(RuntimeEventImportOutcome::Skipped), + }; + if validated.admission().effect() == AdmissionEffect::AuthenticateOnly { + return Ok(RuntimeEventImportOutcome::Skipped); + } + if event.unsigned().kind().is_ephemeral() { + return Ok(RuntimeEventImportOutcome::Skipped); + } + let raw_outcome = store + .store_raw_event(&StoredEvent::new(event.clone(), now)) + .await + .map_err(|error| RuntimeCommandError::store(error.to_string()))?; + if raw_outcome == StoreEventOutcome::Duplicate { + return Ok(RuntimeEventImportOutcome::Duplicate); + } + if store.index_event_tags(&event).await.is_err() + || store.maintain_current_event(&event).await.is_err() + || store.apply_deletion_markers(&event).await.is_err() + || store.store_listing_revision(&event, now).await.is_err() + { + return Err(RuntimeCommandError::store("event projection failed")); + } + let projected = + if validated.admission().effect() == AdmissionEffect::StoreRawAndProjectPublicListing { + if store.project_current_listing(&event, now).await.is_err() + || store.project_listing_helpers(&event).await.is_err() + || store.index_listing_search_document(&event).await.is_err() + { + return Err(RuntimeCommandError::store("event projection failed")); + } + true + } else { + false + }; + Ok(RuntimeEventImportOutcome::Inserted { projected }) +} + +fn parse_event_import_document(raw: &str) -> Result<Vec<Event>, RuntimeCommandError> { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Ok(Vec::new()); + } + match serde_json::from_str::<serde_json::Value>(trimmed) { + Ok(serde_json::Value::Array(events)) => events + .iter() + .enumerate() + .map(|(index, value)| event_from_import_value(value, index + 1)) + .collect(), + Ok(value @ serde_json::Value::Object(_)) => { + event_from_import_value(&value, 1).map(|event| vec![event]) + } + Ok(_) => Err(RuntimeCommandError::input( + "event import file must contain event objects", + )), + Err(_) => trimmed + .lines() + .enumerate() + .filter_map(|(index, line)| { + let line = line.trim(); + if line.is_empty() { + None + } else { + Some(event_from_import_line(line, index + 1)) + } + }) + .collect(), + } +} + +fn event_from_import_value( + value: &serde_json::Value, + index: usize, +) -> Result<Event, RuntimeCommandError> { + let raw = RawEventJson::new(&value.to_string()).map_err(|error| { + RuntimeCommandError::input(format!("event import item {index} is invalid: {error}")) + })?; + parse_event_json(&raw).map_err(|error| { + RuntimeCommandError::input(format!("event import item {index} is invalid: {error}")) + }) +} + +fn event_from_import_line(line: &str, index: usize) -> Result<Event, RuntimeCommandError> { + let raw = RawEventJson::new(line).map_err(|error| { + RuntimeCommandError::input(format!("event import line {index} is invalid: {error}")) + })?; + parse_event_json(&raw).map_err(|error| { + RuntimeCommandError::input(format!("event import line {index} is invalid: {error}")) + }) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct RuntimeServerReport { listen_addr: SocketAddr,