commit 26cdd84e134cfdadef4f24d764a9c25be7cd205c
parent ba46c1912551820a6670aaddbcbf18306738be71
Author: triesap <tyson@radroots.org>
Date: Mon, 27 Apr 2026 08:44:26 +0000
cli: implement ndjson output frames
- generate invocation-unique request ids for output envelopes
- render supported ndjson operations as started and terminal frames
- keep unsupported ndjson failures structured in frame output
- add process coverage for frame output and request id fields
Diffstat:
4 files changed, 163 insertions(+), 10 deletions(-)
diff --git a/src/main.rs b/src/main.rs
@@ -17,6 +17,8 @@ mod target_cli;
use std::io::Write;
use std::process::ExitCode;
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::time::{SystemTime, UNIX_EPOCH};
use clap::Parser;
@@ -38,6 +40,8 @@ use crate::runtime::logging::initialize_logging;
use crate::runtime_args::{RuntimeInvocationArgs, RuntimeOutputFormatArg};
use crate::target_cli::{TargetCliArgs, TargetOutputFormat};
+static REQUEST_SEQUENCE: AtomicU64 = AtomicU64::new(0);
+
fn main() -> ExitCode {
match run() {
Ok(exit_code) => exit_code,
@@ -301,7 +305,7 @@ where
let operation_id = request.operation_id().to_owned();
let envelope_context = request
.context
- .envelope_context(format!("req_{}", operation_id.replace('.', "_")));
+ .envelope_context(next_request_id(&operation_id));
match OperationAdapter::new(service)
.execute(request)
.and_then(|result| result.to_envelope(envelope_context.clone()))
@@ -416,7 +420,22 @@ fn failure_envelope(
error.to_output_error(),
request
.context()
- .envelope_context(format!("req_{}", request.operation_id().replace('.', "_"))),
+ .envelope_context(next_request_id(request.operation_id())),
+ )
+}
+
+fn next_request_id(operation_id: &str) -> String {
+ let sequence = REQUEST_SEQUENCE.fetch_add(1, Ordering::Relaxed);
+ let timestamp = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .map(|duration| duration.as_nanos())
+ .unwrap_or_default();
+ format!(
+ "req_{}_{}_{}_{}",
+ operation_id.replace('.', "_"),
+ std::process::id(),
+ timestamp,
+ sequence
)
}
@@ -431,7 +450,11 @@ fn render_envelope(
serde_json::to_writer_pretty(&mut handle, envelope)?;
}
TargetOutputFormat::Ndjson => {
- serde_json::to_writer(&mut handle, envelope)?;
+ for frame in envelope.to_ndjson_frames() {
+ serde_json::to_writer(&mut handle, &frame)?;
+ writeln!(handle)?;
+ }
+ return Ok(());
}
}
writeln!(handle)?;
diff --git a/src/output_contract.rs b/src/output_contract.rs
@@ -1,7 +1,7 @@
#![allow(dead_code)]
use serde::Serialize;
-use serde_json::Value;
+use serde_json::{Value, json};
pub const OUTPUT_SCHEMA_VERSION: &str = "radroots.cli.output.v1";
@@ -92,6 +92,43 @@ impl OutputEnvelope {
next_actions: Vec::new(),
}
}
+
+ pub fn to_ndjson_frames(&self) -> Vec<NdjsonFrame> {
+ let started = NdjsonFrame::new(
+ self.operation_id.clone(),
+ self.request_id.clone(),
+ 0,
+ NdjsonFrameType::Started,
+ json!({
+ "state": "started",
+ "dry_run": self.dry_run,
+ "correlation_id": &self.correlation_id,
+ "idempotency_key": &self.idempotency_key,
+ "actor": &self.actor,
+ }),
+ );
+ let mut terminal = NdjsonFrame::new(
+ self.operation_id.clone(),
+ self.request_id.clone(),
+ 1,
+ if self.errors.is_empty() {
+ NdjsonFrameType::Completed
+ } else {
+ NdjsonFrameType::Error
+ },
+ json!({
+ "result": &self.result,
+ "next_actions": &self.next_actions,
+ "dry_run": self.dry_run,
+ "correlation_id": &self.correlation_id,
+ "idempotency_key": &self.idempotency_key,
+ "actor": &self.actor,
+ }),
+ );
+ terminal.warnings = self.warnings.clone();
+ terminal.errors = self.errors.clone();
+ vec![started, terminal]
+ }
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
diff --git a/tests/support/mod.rs b/tests/support/mod.rs
@@ -29,6 +29,24 @@ pub fn json_from_stdout(output: &Output) -> Value {
})
}
+pub fn ndjson_from_stdout(output: &Output) -> Vec<Value> {
+ let stdout = String::from_utf8_lossy(&output.stdout);
+ let frames = stdout
+ .lines()
+ .filter(|line| !line.trim().is_empty())
+ .map(|line| {
+ serde_json::from_str::<Value>(line).unwrap_or_else(|error| {
+ panic!(
+ "stdout line was not json: {error}; stderr `{}`; line `{line}`; stdout `{stdout}`",
+ String::from_utf8_lossy(&output.stderr)
+ )
+ })
+ })
+ .collect::<Vec<_>>();
+ assert!(!frames.is_empty(), "stdout should contain ndjson frames");
+ frames
+}
+
pub struct RadrootsCliSandbox {
root: TempDir,
}
diff --git a/tests/target_cli.rs b/tests/target_cli.rs
@@ -4,7 +4,7 @@ use serde_json::Value;
use support::{
RadrootsCliSandbox, assert_no_removed_command_reference, create_listing_draft,
- make_listing_publishable, radroots,
+ make_listing_publishable, ndjson_from_stdout, radroots,
};
const LISTING_ADDR: &str =
@@ -171,6 +171,77 @@ fn target_command_outputs_standard_json_envelope() {
}
#[test]
+fn request_ids_are_invocation_unique_and_preserve_caller_fields() {
+ let first = radroots()
+ .args([
+ "--format",
+ "json",
+ "--correlation-id",
+ "corr_test",
+ "--idempotency-key",
+ "idem_test",
+ "workspace",
+ "get",
+ ])
+ .output()
+ .expect("run first workspace get");
+ let second = radroots()
+ .args([
+ "--format",
+ "json",
+ "--correlation-id",
+ "corr_test",
+ "--idempotency-key",
+ "idem_test",
+ "workspace",
+ "get",
+ ])
+ .output()
+ .expect("run second workspace get");
+
+ assert!(first.status.success());
+ assert!(second.status.success());
+ let first: Value = serde_json::from_slice(&first.stdout).expect("first json envelope");
+ let second: Value = serde_json::from_slice(&second.stdout).expect("second json envelope");
+
+ assert_eq!(first["correlation_id"], "corr_test");
+ assert_eq!(first["idempotency_key"], "idem_test");
+ assert_eq!(second["correlation_id"], "corr_test");
+ assert_eq!(second["idempotency_key"], "idem_test");
+ assert!(
+ first["request_id"]
+ .as_str()
+ .expect("first request id")
+ .starts_with("req_workspace_get_")
+ );
+ assert_ne!(first["request_id"], second["request_id"]);
+}
+
+#[test]
+fn supported_ndjson_outputs_started_and_completed_frames() {
+ let sandbox = RadrootsCliSandbox::new();
+ let output = sandbox
+ .command()
+ .args(["--format", "ndjson", "account", "list"])
+ .output()
+ .expect("run account list ndjson");
+
+ assert!(output.status.success());
+ let frames = ndjson_from_stdout(&output);
+
+ assert_eq!(frames.len(), 2);
+ assert_eq!(frames[0]["schema_version"], "radroots.cli.output.v1");
+ assert_eq!(frames[0]["operation_id"], "account.list");
+ assert_eq!(frames[0]["frame_type"], "started");
+ assert_eq!(frames[0]["sequence"], 0);
+ assert_eq!(frames[1]["operation_id"], "account.list");
+ assert_eq!(frames[1]["frame_type"], "completed");
+ assert_eq!(frames[1]["sequence"], 1);
+ assert_eq!(frames[1]["errors"].as_array().expect("errors").len(), 0);
+ assert_eq!(frames[0]["request_id"], frames[1]["request_id"]);
+}
+
+#[test]
fn unsupported_ndjson_returns_structured_invalid_input() {
let output = radroots()
.args(["--format", "ndjson", "workspace", "get"])
@@ -178,11 +249,15 @@ fn unsupported_ndjson_returns_structured_invalid_input() {
.expect("run workspace get ndjson");
assert_eq!(output.status.code(), Some(2));
- let value: Value = serde_json::from_slice(&output.stdout).expect("json envelope");
-
- assert_eq!(value["operation_id"], "workspace.get");
- assert_eq!(value["errors"][0]["code"], "invalid_input");
- assert_eq!(value["errors"][0]["exit_code"], 2);
+ let frames = ndjson_from_stdout(&output);
+
+ assert_eq!(frames.len(), 2);
+ assert_eq!(frames[0]["operation_id"], "workspace.get");
+ assert_eq!(frames[0]["frame_type"], "started");
+ assert_eq!(frames[1]["operation_id"], "workspace.get");
+ assert_eq!(frames[1]["frame_type"], "error");
+ assert_eq!(frames[1]["errors"][0]["code"], "invalid_input");
+ assert_eq!(frames[1]["errors"][0]["exit_code"], 2);
}
#[test]