commit 3258c607fd16ce83a49dc03115294326b1d09851
parent 3ce0ac336b523abb68fb001fb50c674d0c89b2a7
Author: triesap <tyson@radroots.org>
Date: Sat, 6 Jun 2026 02:06:03 -0700
cli: add event export command
Diffstat:
4 files changed, 216 insertions(+), 12 deletions(-)
diff --git a/crates/tangle/src/lib.rs b/crates/tangle/src/lib.rs
@@ -10,7 +10,7 @@ usage:
tangle migrate --config PATH
tangle run --config PATH
tangle event import --config PATH --input PATH
- tangle event export --config PATH
+ tangle event export --config PATH --output PATH
tangle projection rebuild --config PATH";
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -40,7 +40,12 @@ impl TangleCommand {
pub fn implemented(self) -> bool {
matches!(
self,
- Self::Version | Self::Help | Self::Migrate | Self::Run | Self::EventImport
+ Self::Version
+ | Self::Help
+ | Self::Migrate
+ | Self::Run
+ | Self::EventImport
+ | Self::EventExport
)
}
}
@@ -50,6 +55,7 @@ pub struct TangleInvocation {
command: TangleCommand,
config_path: Option<String>,
input_path: Option<String>,
+ output_path: Option<String>,
}
impl TangleInvocation {
@@ -58,6 +64,7 @@ impl TangleInvocation {
command,
config_path,
input_path: None,
+ output_path: None,
}
}
@@ -66,6 +73,11 @@ impl TangleInvocation {
self
}
+ pub fn with_output_path(mut self, output_path: Option<String>) -> Self {
+ self.output_path = output_path;
+ self
+ }
+
pub fn command(&self) -> TangleCommand {
self.command
}
@@ -77,6 +89,10 @@ impl TangleInvocation {
pub fn input_path(&self) -> Option<&str> {
self.input_path.as_deref()
}
+
+ pub fn output_path(&self) -> Option<&str> {
+ self.output_path.as_deref()
+ }
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -170,6 +186,7 @@ where
};
let mut config_path = None;
let mut input_path = None;
+ let mut output_path = None;
while let Some(argument) = args.next() {
match argument.as_str() {
"--config" => {
@@ -190,6 +207,15 @@ where
};
input_path = Some(path);
}
+ "--output" => {
+ if output_path.is_some() {
+ return Err(TangleCliError::RepeatedOption("--output"));
+ }
+ let Some(path) = args.next() else {
+ return Err(TangleCliError::MissingOptionValue("--output"));
+ };
+ output_path = Some(path);
+ }
_ => {
return Err(TangleCliError::UnexpectedArgument {
command: command.as_str().to_owned(),
@@ -204,7 +230,15 @@ where
argument: "--input".to_owned(),
});
}
- Ok(TangleInvocation::new(command, config_path).with_input_path(input_path))
+ if output_path.is_some() && command != TangleCommand::EventExport {
+ return Err(TangleCliError::UnexpectedArgument {
+ command: command.as_str().to_owned(),
+ argument: "--output".to_owned(),
+ });
+ }
+ Ok(TangleInvocation::new(command, config_path)
+ .with_input_path(input_path)
+ .with_output_path(output_path))
}
pub fn require_config_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> {
@@ -219,6 +253,12 @@ pub fn require_input_path(invocation: &TangleInvocation) -> Result<&str, TangleC
.ok_or(TangleCliError::MissingOptionValue("--input"))
}
+pub fn require_output_path(invocation: &TangleInvocation) -> Result<&str, TangleCliError> {
+ invocation
+ .output_path()
+ .ok_or(TangleCliError::MissingOptionValue("--output"))
+}
+
pub fn migrate_output(report: tangle_runtime::RuntimeMigrationReport) -> String {
format!(
"migrations applied: {}\nmigrations already applied: {}\nmigrations total: {}",
@@ -239,6 +279,10 @@ pub fn event_import_output(report: tangle_runtime::RuntimeEventImportReport) ->
)
}
+pub fn event_export_output(report: tangle_runtime::RuntimeEventExportReport) -> String {
+ format!("events exported: {}", report.exported())
+}
+
pub async fn migrate_with_config(path: &str) -> Result<String, String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
let report = tangle_runtime::migrate_runtime_database(&config)
@@ -259,6 +303,18 @@ pub async fn event_import_with_config(
Ok(event_import_output(report))
}
+pub async fn event_export_with_config(
+ config_path: &str,
+ output_path: &str,
+) -> Result<String, String> {
+ let config =
+ tangle_runtime::load_runtime_config(config_path).map_err(|error| error.to_string())?;
+ let report = tangle_runtime::export_events_to_path(&config, output_path)
+ .await
+ .map_err(|error| error.to_string())?;
+ Ok(event_export_output(report))
+}
+
pub async fn run_with_config(path: &str) -> Result<(), String> {
let config = tangle_runtime::load_runtime_config(path).map_err(|error| error.to_string())?;
let (shutdown, _) = tangle_runtime::GracefulShutdownSignal::new();
@@ -278,10 +334,13 @@ pub async fn run_with_config(path: &str) -> Result<(), String> {
mod tests {
use super::{
PACKAGE_NAME, PACKAGE_VERSION, TangleCliError, TangleCommand, TangleInvocation,
- event_import_output, migrate_output, parse_tangle_command, parse_tangle_invocation,
- require_config_path, require_input_path, usage_output, version_output,
+ event_export_output, event_import_output, migrate_output, parse_tangle_command,
+ parse_tangle_invocation, require_config_path, require_input_path, require_output_path,
+ usage_output, version_output,
+ };
+ use tangle_runtime::{
+ RuntimeEventExportReport, RuntimeEventImportReport, RuntimeMigrationReport,
};
- use tangle_runtime::{RuntimeEventImportReport, RuntimeMigrationReport};
#[test]
fn package_name_is_tangle() {
@@ -302,7 +361,7 @@ mod tests {
fn usage_output_lists_supported_command_model() {
assert_eq!(
usage_output(),
- "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH"
+ "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH"
);
}
@@ -335,12 +394,40 @@ mod tests {
| TangleCommand::Migrate
| TangleCommand::Run
| TangleCommand::EventImport
+ | TangleCommand::EventExport
)
);
}
}
#[test]
+ fn command_model_parses_export_output_option() {
+ let invocation = parse_tangle_invocation([
+ "event",
+ "export",
+ "--config",
+ "runtime.json",
+ "--output",
+ "events.jsonl",
+ ])
+ .expect("invocation");
+ assert_eq!(invocation.command(), TangleCommand::EventExport);
+ assert_eq!(
+ require_config_path(&invocation).expect("config"),
+ "runtime.json"
+ );
+ assert_eq!(
+ require_output_path(&invocation).expect("output"),
+ "events.jsonl"
+ );
+ assert_eq!(
+ require_output_path(&TangleInvocation::new(TangleCommand::EventExport, None))
+ .expect_err("output"),
+ TangleCliError::MissingOptionValue("--output")
+ );
+ }
+
+ #[test]
fn command_model_parses_common_config_option() {
assert_eq!(
parse_tangle_invocation(["migrate", "--config", "runtime.json"]).expect("invocation"),
@@ -434,6 +521,22 @@ mod tests {
.expect_err("repeated input"),
TangleCliError::RepeatedOption("--input")
);
+ assert_eq!(
+ parse_tangle_invocation(["run", "--output", "events.jsonl"]).expect_err("output"),
+ TangleCliError::UnexpectedArgument {
+ command: "run".to_owned(),
+ argument: "--output".to_owned()
+ }
+ );
+ assert_eq!(
+ parse_tangle_invocation(["event", "export", "--output"]).expect_err("missing output"),
+ TangleCliError::MissingOptionValue("--output")
+ );
+ assert_eq!(
+ parse_tangle_invocation(["event", "export", "--output", "a", "--output", "b"])
+ .expect_err("repeated output"),
+ TangleCliError::RepeatedOption("--output")
+ );
}
#[test]
@@ -451,4 +554,12 @@ mod tests {
"events total: 5\nevents inserted: 2\nevents duplicate: 1\nevents projected: 2\nevents skipped: 2"
);
}
+
+ #[test]
+ fn event_export_output_reports_outcome_counts() {
+ assert_eq!(
+ event_export_output(RuntimeEventExportReport::new(3)),
+ "events exported: 3"
+ );
+ }
}
diff --git a/crates/tangle/src/main.rs b/crates/tangle/src/main.rs
@@ -48,6 +48,16 @@ fn main() -> ExitCode {
ExitCode::from(2)
}
},
+ tangle::TangleCommand::EventExport => match run_event_export(&invocation) {
+ Ok(output) => {
+ println!("{output}");
+ ExitCode::SUCCESS
+ }
+ Err(error) => {
+ eprintln!("{error}");
+ ExitCode::from(2)
+ }
+ },
command => {
eprintln!("command not implemented: {}", command.as_str());
ExitCode::from(2)
@@ -82,3 +92,13 @@ fn run_event_import(invocation: &tangle::TangleInvocation) -> Result<String, Str
.map_err(|error| format!("failed to start runtime: {error}"))?;
runtime.block_on(tangle::event_import_with_config(config_path, input_path))
}
+
+fn run_event_export(invocation: &tangle::TangleInvocation) -> Result<String, String> {
+ let config_path = tangle::require_config_path(invocation).map_err(|error| error.to_string())?;
+ let output_path = tangle::require_output_path(invocation).map_err(|error| error.to_string())?;
+ let runtime = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .map_err(|error| format!("failed to start runtime: {error}"))?;
+ runtime.block_on(tangle::event_export_with_config(config_path, output_path))
+}
diff --git a/crates/tangle/tests/version.rs b/crates/tangle/tests/version.rs
@@ -27,7 +27,7 @@ fn tangle_without_args_reports_usage() {
assert!(output.status.success());
assert_eq!(
String::from_utf8_lossy(&output.stdout),
- "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH\n"
+ "usage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n"
);
assert!(output.stderr.is_empty());
}
@@ -43,7 +43,7 @@ fn tangle_unknown_arg_reports_usage_error() {
assert!(output.stdout.is_empty());
assert_eq!(
String::from_utf8_lossy(&output.stderr),
- "unknown command: --unknown\nusage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH\n tangle projection rebuild --config PATH\n"
+ "unknown command: --unknown\nusage:\n tangle [--version]\n tangle migrate --config PATH\n tangle run --config PATH\n tangle event import --config PATH --input PATH\n tangle event export --config PATH --output PATH\n tangle projection rebuild --config PATH\n"
);
}
@@ -102,6 +102,7 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
let db_path = root.join("db");
let config_path = root.join("runtime.json");
let input_path = root.join("events.jsonl");
+ let output_path = root.join("exported.jsonl");
std::fs::create_dir_all(&root).expect("runtime root");
write_rocksdb_config(&config_path, &db_path, "tangle_cli_import");
std::fs::write(&input_path, format!("{}\n", event_to_value(&listing)))
@@ -137,6 +138,25 @@ async fn tangle_event_import_command_imports_canonical_jsonl() {
);
assert!(second.stderr.is_empty());
+ let export = Command::new(env!("CARGO_BIN_EXE_tangle"))
+ .args(["event", "export", "--config"])
+ .arg(&config_path)
+ .args(["--output"])
+ .arg(&output_path)
+ .output()
+ .expect("run tangle event export");
+
+ assert!(export.status.success());
+ assert_eq!(
+ String::from_utf8_lossy(&export.stdout),
+ "events exported: 1\n"
+ );
+ assert!(export.stderr.is_empty());
+ assert_eq!(
+ std::fs::read_to_string(&output_path).expect("export file"),
+ format!("{}\n", event_to_value(&listing))
+ );
+
let seller = FixtureKey::Seller.public_key();
let listing_key = format!("30402:{}:listing-a", seller.as_str());
let store_config = SurrealConnectionConfig::rocksdb(
@@ -190,15 +210,15 @@ fn tangle_migrate_requires_config_path() {
#[test]
fn tangle_known_future_commands_report_not_implemented() {
let output = Command::new(env!("CARGO_BIN_EXE_tangle"))
- .args(["event", "export"])
+ .args(["projection", "rebuild"])
.output()
- .expect("run tangle event export");
+ .expect("run tangle projection rebuild");
assert_eq!(output.status.code(), Some(2));
assert!(output.stdout.is_empty());
assert_eq!(
String::from_utf8_lossy(&output.stderr),
- "command not implemented: event export\n"
+ "command not implemented: projection rebuild\n"
);
}
diff --git a/crates/tangle_runtime/src/lib.rs b/crates/tangle_runtime/src/lib.rs
@@ -639,6 +639,59 @@ fn event_from_import_line(line: &str, index: usize) -> Result<Event, RuntimeComm
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct RuntimeEventExportReport {
+ exported: u64,
+}
+
+impl RuntimeEventExportReport {
+ pub fn new(exported: u64) -> Self {
+ Self { exported }
+ }
+
+ pub fn exported(self) -> u64 {
+ self.exported
+ }
+}
+
+pub async fn export_events_to_path(
+ config: &TangleRuntimeConfig,
+ path: impl AsRef<FsPath>,
+) -> Result<RuntimeEventExportReport, RuntimeCommandError> {
+ let store = connect_runtime_store(config).await?;
+ store
+ .apply_plan(&base_migration_plan())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let rows = store
+ .query_raw_events(&Filter::empty())
+ .await
+ .map_err(|error| RuntimeCommandError::store(error.to_string()))?;
+ let mut output = String::new();
+ for row in &rows {
+ output.push_str(&runtime_row_string(row, "raw_json")?);
+ output.push('\n');
+ }
+ let path = path.as_ref();
+ fs::write(path, output).map_err(|error| {
+ RuntimeCommandError::input(format!(
+ "failed to write event export file `{}`: {error}",
+ path.display()
+ ))
+ })?;
+ Ok(RuntimeEventExportReport::new(rows.len() as u64))
+}
+
+fn runtime_row_string(
+ row: &serde_json::Value,
+ field: &'static str,
+) -> Result<String, RuntimeCommandError> {
+ row.get(field)
+ .and_then(serde_json::Value::as_str)
+ .map(str::to_owned)
+ .ok_or_else(|| RuntimeCommandError::store(format!("stored row field `{field}` is invalid")))
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RuntimeServerReport {
listen_addr: SocketAddr,
}