cli

Command-line interface for Radroots
git clone https://radroots.dev/git/cli.git
Log | Files | Refs | README | LICENSE

commit 060130e8975a1d676870ab949433cc7c27fb04c4
parent c5a3e754be55bcbdcb695047071bea119a244a25
Author: triesap <tyson@radroots.org>
Date:   Tue,  7 Apr 2026 08:04:50 +0000

land durable order submit watch history and narrowed cancel

Diffstat:
Msrc/cli.rs | 86++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Msrc/commands/mod.rs | 14++++----------
Msrc/commands/order.rs | 69++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/domain/runtime.rs | 150+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Msrc/render/mod.rs | 236++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Msrc/runtime/daemon.rs | 37+++++++++++++++++++++++++++++++++++++
Msrc/runtime/order.rs | 678++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtests/order.rs | 476++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
8 files changed, 1702 insertions(+), 44 deletions(-)

diff --git a/src/cli.rs b/src/cli.rs @@ -106,7 +106,7 @@ impl Command { OrderCommand::New(_) => "order new", OrderCommand::Get(_) => "order get", OrderCommand::Ls => "order ls", - OrderCommand::Submit => "order submit", + OrderCommand::Submit(_) => "order submit", OrderCommand::Watch(_) => "order watch", OrderCommand::Cancel(_) => "order cancel", OrderCommand::History => "order history", @@ -146,7 +146,7 @@ impl Command { }) | Self::Rpc(RpcArgs { command: RpcCommand::Sessions, }) | Self::Order(OrderArgs { - command: OrderCommand::Ls | OrderCommand::History, + command: OrderCommand::Ls | OrderCommand::Watch(_) | OrderCommand::History, }) | Self::Sync(SyncArgs { command: SyncCommand::Watch(_), }) | Self::Find(_) @@ -166,7 +166,7 @@ impl Command { }) | Self::Listing(ListingArgs { command: ListingCommand::New(_), }) | Self::Order(OrderArgs { - command: OrderCommand::New(_) | OrderCommand::Submit | OrderCommand::Cancel(_), + command: OrderCommand::New(_) | OrderCommand::Cancel(_), }) ) } @@ -400,8 +400,8 @@ pub enum OrderCommand { New(OrderNewArgs), Get(RecordKeyArgs), Ls, - Submit, - Watch(RecordKeyArgs), + Submit(OrderSubmitArgs), + Watch(OrderWatchArgs), Cancel(RecordKeyArgs), History, } @@ -419,6 +419,22 @@ pub struct OrderNewArgs { } #[derive(Debug, Clone, Args)] +pub struct OrderSubmitArgs { + pub key: String, + #[arg(long)] + pub idempotency_key: Option<String>, +} + +#[derive(Debug, Clone, Args)] +pub struct OrderWatchArgs { + pub key: String, + #[arg(long)] + pub frames: Option<usize>, + #[arg(long, default_value_t = 1_000)] + pub interval_ms: u64, +} + +#[derive(Debug, Clone, Args)] pub struct RecordKeyArgs { pub key: String, } @@ -427,8 +443,8 @@ pub struct RecordKeyArgs { mod tests { use super::{ AccountCommand, CliArgs, Command, ConfigCommand, JobCommand, JobWatchArgs, ListingCommand, - LocalCommand, LocalExportFormatArg, MycCommand, NetCommand, OrderCommand, RelayCommand, - RpcCommand, SignerCommand, SyncCommand, SyncWatchArgs, + LocalCommand, LocalExportFormatArg, MycCommand, NetCommand, OrderCommand, OrderWatchArgs, + RelayCommand, RpcCommand, SignerCommand, SyncCommand, SyncWatchArgs, }; use crate::runtime::config::OutputFormat; use clap::Parser; @@ -819,6 +835,51 @@ mod tests { }, _ => panic!("unexpected command variant"), } + + let order_submit = CliArgs::parse_from([ + "radroots", + "order", + "submit", + "ord_demo", + "--idempotency-key", + "submit-1", + ]); + match order_submit.command { + Command::Order(args) => match args.command { + OrderCommand::Submit(submit) => { + assert_eq!(submit.key, "ord_demo"); + assert_eq!(submit.idempotency_key.as_deref(), Some("submit-1")); + } + _ => panic!("unexpected order subcommand"), + }, + _ => panic!("unexpected command variant"), + } + + let order_watch = CliArgs::parse_from([ + "radroots", + "order", + "watch", + "ord_demo", + "--frames", + "3", + "--interval-ms", + "25", + ]); + match order_watch.command { + Command::Order(args) => match args.command { + OrderCommand::Watch(OrderWatchArgs { + key, + frames, + interval_ms, + }) => { + assert_eq!(key, "ord_demo"); + assert_eq!(frames, Some(3)); + assert_eq!(interval_ms, 25); + } + _ => panic!("unexpected order subcommand"), + }, + _ => panic!("unexpected command variant"), + } } #[test] @@ -854,5 +915,16 @@ mod tests { .command .supports_output_format(OutputFormat::Ndjson) ); + + let order_watch = CliArgs::parse_from(["radroots", "order", "watch", "ord_demo"]); + assert!( + order_watch + .command + .supports_output_format(OutputFormat::Ndjson) + ); + + let order_submit = CliArgs::parse_from(["radroots", "order", "submit", "ord_demo"]); + assert_eq!(order_submit.command.display_name(), "order submit"); + assert!(order_submit.command.supports_dry_run()); } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs @@ -77,10 +77,10 @@ pub fn dispatch( OrderCommand::New(args) => order::new(config, args), OrderCommand::Get(args) => order::get(config, args), OrderCommand::Ls => order::list(config), - OrderCommand::Submit => unimplemented_command("order submit"), - OrderCommand::Watch(_) => unimplemented_command("order watch"), - OrderCommand::Cancel(_) => unimplemented_command("order cancel"), - OrderCommand::History => unimplemented_command("order history"), + OrderCommand::Submit(args) => order::submit(config, args), + OrderCommand::Watch(args) => order::watch(config, args), + OrderCommand::Cancel(args) => order::cancel(config, args), + OrderCommand::History => order::history(config), }, Command::Relay(relay) => match &relay.command { RelayCommand::Ls => Ok(relay::list(config)), @@ -97,9 +97,3 @@ pub fn dispatch( }, } } - -fn unimplemented_command(name: &str) -> Result<CommandOutput, RuntimeError> { - Err(RuntimeError::Config(format!( - "`{name}` is not implemented yet" - ))) -} diff --git a/src/commands/order.rs b/src/commands/order.rs @@ -1,4 +1,4 @@ -use crate::cli::{OrderNewArgs, RecordKeyArgs}; +use crate::cli::{OrderNewArgs, OrderSubmitArgs, OrderWatchArgs, RecordKeyArgs}; use crate::domain::runtime::{CommandDisposition, CommandOutput, CommandView}; use crate::runtime::RuntimeError; use crate::runtime::config::RuntimeConfig; @@ -50,3 +50,70 @@ pub fn list(config: &RuntimeConfig) -> Result<CommandOutput, RuntimeError> { } }) } + +pub fn submit( + config: &RuntimeConfig, + args: &OrderSubmitArgs, +) -> Result<CommandOutput, RuntimeError> { + let view = crate::runtime::order::submit(config, args)?; + Ok(match view.disposition() { + CommandDisposition::Success => CommandOutput::success(CommandView::OrderSubmit(view)), + CommandDisposition::Unconfigured => { + CommandOutput::unconfigured(CommandView::OrderSubmit(view)) + } + CommandDisposition::ExternalUnavailable => { + CommandOutput::external_unavailable(CommandView::OrderSubmit(view)) + } + CommandDisposition::InternalError => { + CommandOutput::internal_error(CommandView::OrderSubmit(view)) + } + }) +} + +pub fn watch(config: &RuntimeConfig, args: &OrderWatchArgs) -> Result<CommandOutput, RuntimeError> { + let view = crate::runtime::order::watch(config, args)?; + Ok(match view.disposition() { + CommandDisposition::Success => CommandOutput::success(CommandView::OrderWatch(view)), + CommandDisposition::Unconfigured => { + CommandOutput::unconfigured(CommandView::OrderWatch(view)) + } + CommandDisposition::ExternalUnavailable => { + CommandOutput::external_unavailable(CommandView::OrderWatch(view)) + } + CommandDisposition::InternalError => { + CommandOutput::internal_error(CommandView::OrderWatch(view)) + } + }) +} + +pub fn cancel(config: &RuntimeConfig, args: &RecordKeyArgs) -> Result<CommandOutput, RuntimeError> { + let view = crate::runtime::order::cancel(config, args)?; + Ok(match view.disposition() { + CommandDisposition::Success => CommandOutput::success(CommandView::OrderCancel(view)), + CommandDisposition::Unconfigured => { + CommandOutput::unconfigured(CommandView::OrderCancel(view)) + } + CommandDisposition::ExternalUnavailable => { + CommandOutput::external_unavailable(CommandView::OrderCancel(view)) + } + CommandDisposition::InternalError => { + CommandOutput::internal_error(CommandView::OrderCancel(view)) + } + }) +} + +pub fn history(config: &RuntimeConfig) -> Result<CommandOutput, RuntimeError> { + let view = crate::runtime::order::history(config)?; + Ok(match view.disposition() { + CommandDisposition::Success => CommandOutput::success(CommandView::OrderHistory(view)), + CommandDisposition::Unconfigured => { + CommandOutput::unconfigured(CommandView::OrderHistory(view)) + } + CommandDisposition::ExternalUnavailable => { + CommandOutput::external_unavailable(CommandView::OrderHistory(view)) + } + CommandDisposition::InternalError => { + CommandOutput::internal_error(CommandView::OrderHistory(view)) + } + }) +} diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs @@ -88,9 +88,13 @@ pub enum CommandView { LocalStatus(LocalStatusView), MycStatus(MycStatusView), NetStatus(NetStatusView), + OrderCancel(OrderCancelView), OrderGet(OrderGetView), + OrderHistory(OrderHistoryView), OrderList(OrderListView), OrderNew(OrderNewView), + OrderSubmit(OrderSubmitView), + OrderWatch(OrderWatchView), RpcSessions(RpcSessionsView), RpcStatus(RpcStatusView), RelayList(RelayListView), @@ -559,6 +563,152 @@ impl OrderListView { } #[derive(Debug, Clone, Serialize)] +pub struct OrderSubmitView { + pub state: String, + pub source: String, + pub order_id: String, + pub file: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub listing_lookup: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub listing_addr: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub buyer_account_id: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub buyer_pubkey: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub seller_pubkey: Option<String>, + #[serde(default)] + pub dry_run: bool, + #[serde(default)] + pub deduplicated: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub idempotency_key: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub job: Option<OrderJobView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub issues: Vec<OrderIssueView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub actions: Vec<String>, +} + +impl OrderSubmitView { + pub fn disposition(&self) -> CommandDisposition { + match self.state.as_str() { + "unconfigured" => CommandDisposition::Unconfigured, + "unavailable" => CommandDisposition::ExternalUnavailable, + "error" => CommandDisposition::InternalError, + _ => CommandDisposition::Success, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct OrderWatchView { + pub state: String, + pub source: String, + pub order_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub job_id: Option<String>, + pub interval_ms: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub frames: Vec<OrderWatchFrameView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub actions: Vec<String>, +} + +impl OrderWatchView { + pub fn disposition(&self) -> CommandDisposition { + match self.state.as_str() { + "unconfigured" => CommandDisposition::Unconfigured, + "unavailable" => CommandDisposition::ExternalUnavailable, + "error" => CommandDisposition::InternalError, + _ => CommandDisposition::Success, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct OrderWatchFrameView { + pub sequence: usize, + pub observed_at_unix: u64, + pub state: String, + pub terminal: bool, + pub summary: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct OrderHistoryView { + pub state: String, + pub source: String, + pub count: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub orders: Vec<OrderHistoryEntryView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub actions: Vec<String>, +} + +impl OrderHistoryView { + pub fn disposition(&self) -> CommandDisposition { + match self.state.as_str() { + "error" => CommandDisposition::InternalError, + _ => CommandDisposition::Success, + } + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct OrderHistoryEntryView { + pub id: String, + pub state: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub listing_lookup: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub listing_addr: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub buyer_account_id: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub submitted_at_unix: Option<u64>, + pub updated_at_unix: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub job: Option<OrderJobView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub issues: Vec<OrderIssueView>, +} + +#[derive(Debug, Clone, Serialize)] +pub struct OrderCancelView { + pub state: String, + pub source: String, + pub lookup: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub order_id: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub reason: Option<String>, + #[serde(skip_serializing_if = "Option::is_none")] + pub job: Option<OrderJobView>, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub actions: Vec<String>, +} + +impl OrderCancelView { + pub fn disposition(&self) -> CommandDisposition { + match self.state.as_str() { + "unconfigured" => CommandDisposition::Unconfigured, + "unavailable" => CommandDisposition::ExternalUnavailable, + "error" => CommandDisposition::InternalError, + _ => CommandDisposition::Success, + } + } +} + +#[derive(Debug, Clone, Serialize)] pub struct OrderSummaryView { pub id: String, pub state: String, diff --git a/src/render/mod.rs b/src/render/mod.rs @@ -4,9 +4,9 @@ use crate::domain::runtime::{ AccountListView, AccountSummaryView, CommandOutput, CommandView, DoctorCheckView, DoctorView, FindView, JobGetView, JobListView, JobWatchView, ListingGetView, ListingMutationView, ListingNewView, ListingValidateView, LocalBackupView, LocalExportView, LocalInitView, - LocalStatusView, NetStatusView, OrderDraftItemView, OrderGetView, OrderJobView, OrderListView, - OrderNewView, RelayListView, RpcSessionsView, RpcStatusView, SyncActionView, SyncStatusView, - SyncWatchView, + LocalStatusView, NetStatusView, OrderCancelView, OrderDraftItemView, OrderGetView, + OrderHistoryView, OrderJobView, OrderListView, OrderNewView, OrderSubmitView, OrderWatchView, + RelayListView, RpcSessionsView, RpcStatusView, SyncActionView, SyncStatusView, SyncWatchView, }; use crate::runtime::RuntimeError; use crate::runtime::config::{OutputConfig, OutputFormat}; @@ -79,15 +79,27 @@ fn render_human_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(), CommandView::NetStatus(view) => { render_net_status(stdout, view)?; } + CommandView::OrderCancel(view) => { + render_order_cancel(stdout, view)?; + } CommandView::OrderGet(view) => { render_order_get(stdout, view)?; } + CommandView::OrderHistory(view) => { + render_order_history(stdout, view)?; + } CommandView::OrderList(view) => { render_order_list(stdout, view)?; } CommandView::OrderNew(view) => { render_order_new(stdout, view)?; } + CommandView::OrderSubmit(view) => { + render_order_submit(stdout, view)?; + } + CommandView::OrderWatch(view) => { + render_order_watch(stdout, view)?; + } CommandView::RpcSessions(view) => { render_rpc_sessions(stdout, view)?; } @@ -218,10 +230,18 @@ fn render_json_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(), serde_json::to_writer_pretty(&mut *stdout, view)?; writeln!(stdout)?; } + CommandView::OrderCancel(view) => { + serde_json::to_writer_pretty(&mut *stdout, view)?; + writeln!(stdout)?; + } CommandView::OrderGet(view) => { serde_json::to_writer_pretty(&mut *stdout, view)?; writeln!(stdout)?; } + CommandView::OrderHistory(view) => { + serde_json::to_writer_pretty(&mut *stdout, view)?; + writeln!(stdout)?; + } CommandView::OrderList(view) => { serde_json::to_writer_pretty(&mut *stdout, view)?; writeln!(stdout)?; @@ -230,6 +250,14 @@ fn render_json_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(), serde_json::to_writer_pretty(&mut *stdout, view)?; writeln!(stdout)?; } + CommandView::OrderSubmit(view) => { + serde_json::to_writer_pretty(&mut *stdout, view)?; + writeln!(stdout)?; + } + CommandView::OrderWatch(view) => { + serde_json::to_writer_pretty(&mut *stdout, view)?; + writeln!(stdout)?; + } CommandView::RpcSessions(view) => { serde_json::to_writer_pretty(&mut *stdout, view)?; writeln!(stdout)?; @@ -364,6 +392,13 @@ fn render_ndjson_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<() } Ok(()) } + CommandView::OrderHistory(view) => { + for order in &view.orders { + serde_json::to_writer(&mut *stdout, order)?; + writeln!(stdout)?; + } + Ok(()) + } CommandView::OrderList(view) => { for order in &view.orders { serde_json::to_writer(&mut *stdout, order)?; @@ -371,6 +406,13 @@ fn render_ndjson_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<() } Ok(()) } + CommandView::OrderWatch(view) => { + for frame in &view.frames { + serde_json::to_writer(&mut *stdout, frame)?; + writeln!(stdout)?; + } + Ok(()) + } CommandView::RpcSessions(view) => { for session in &view.sessions { serde_json::to_writer(&mut *stdout, session)?; @@ -893,6 +935,190 @@ fn render_order_list(stdout: &mut dyn Write, view: &OrderListView) -> Result<(), Ok(()) } +fn render_order_submit(stdout: &mut dyn Write, view: &OrderSubmitView) -> Result<(), RuntimeError> { + let context = match view.state.as_str() { + "missing" => format!("order · {} missing", view.order_id), + "already_submitted" => format!("order · {} already submitted", view.order_id), + "unconfigured" => format!("order · {} not ready", view.order_id), + "unavailable" => format!("order · {} submit unavailable", view.order_id), + "error" => format!("order · {} error", view.order_id), + "dry_run" => format!("order · {} dry run", view.order_id), + "deduplicated" => format!("order · {} deduplicated", view.order_id), + _ => format!("order · {} submitted", view.order_id), + }; + write_context(stdout, context.as_str())?; + + let mut rows = vec![ + ("order id", view.order_id.as_str()), + ("file", view.file.as_str()), + ]; + if let Some(listing_lookup) = &view.listing_lookup { + rows.push(("listing", listing_lookup.as_str())); + } + if let Some(listing_addr) = &view.listing_addr { + rows.push(("listing addr", listing_addr.as_str())); + } + if let Some(account_id) = &view.buyer_account_id { + rows.push(("buyer account", account_id.as_str())); + } + if let Some(buyer_pubkey) = &view.buyer_pubkey { + rows.push(("buyer pubkey", buyer_pubkey.as_str())); + } + if let Some(seller_pubkey) = &view.seller_pubkey { + rows.push(("seller pubkey", seller_pubkey.as_str())); + } + if view.dry_run { + rows.push(("dry run", yes_no(true))); + } + if view.deduplicated { + rows.push(("deduplicated", yes_no(true))); + } + if let Some(idempotency_key) = &view.idempotency_key { + rows.push(("idempotency key", idempotency_key.as_str())); + } + render_pairs(stdout, "order", rows.as_slice())?; + if let Some(job) = &view.job { + render_order_job(stdout, job)?; + } + render_order_issues(stdout, &view.issues)?; + if let Some(reason) = &view.reason { + writeln!(stdout, "reason: {reason}")?; + } + writeln!(stdout, "source: {}", view.source)?; + render_actions(stdout, &view.actions)?; + Ok(()) +} + +fn render_order_watch(stdout: &mut dyn Write, view: &OrderWatchView) -> Result<(), RuntimeError> { + let context = match view.state.as_str() { + "missing" => format!("order · {} watch missing", view.order_id), + "not_submitted" => format!("order · {} not submitted", view.order_id), + "unconfigured" => format!("order · {} watch unconfigured", view.order_id), + "unavailable" => format!("order · {} watch unavailable", view.order_id), + "error" => format!("order · {} watch error", view.order_id), + "watching" => format!("order · {} watching", view.order_id), + _ => format!("order · {} {}", view.order_id, view.state), + }; + write_context(stdout, context.as_str())?; + + let interval = format!("{} ms", view.interval_ms); + let mut rows = vec![("order id", view.order_id.clone()), ("interval", interval)]; + if let Some(job_id) = &view.job_id { + rows.push(("job id", job_id.clone())); + } + render_owned_pairs(stdout, "watch", rows.as_slice())?; + if !view.frames.is_empty() { + let table = Table { + headers: &["frame", "time", "state", "terminal", "summary"], + rows: view + .frames + .iter() + .map(|frame| { + vec![ + frame.sequence.to_string(), + crate::runtime::job::format_clock(frame.observed_at_unix), + frame.state.clone(), + yes_no(frame.terminal).to_owned(), + frame.summary.clone(), + ] + }) + .collect(), + }; + render_table(stdout, &table)?; + writeln!(stdout)?; + } + if let Some(reason) = &view.reason { + writeln!(stdout, "reason: {reason}")?; + } + writeln!(stdout, "source: {}", view.source)?; + render_actions(stdout, &view.actions)?; + Ok(()) +} + +fn render_order_history( + stdout: &mut dyn Write, + view: &OrderHistoryView, +) -> Result<(), RuntimeError> { + let context = match view.state.as_str() { + "empty" => "order history · no submitted orders".to_owned(), + _ => format!( + "order history · {} submitted order{}", + view.count, + if view.count == 1 { "" } else { "s" } + ), + }; + write_context(stdout, context.as_str())?; + if view.orders.is_empty() { + if let Some(reason) = &view.reason { + writeln!(stdout, "{reason}")?; + writeln!(stdout)?; + } + } else { + let table = Table { + headers: &["order", "listing", "state", "job", "submitted", "updated"], + rows: view + .orders + .iter() + .map(|order| { + vec![ + order.id.clone(), + order + .listing_lookup + .clone() + .or_else(|| order.listing_addr.clone()) + .unwrap_or_default(), + order.state.clone(), + order + .job + .as_ref() + .map(|job| job.job_id.clone()) + .unwrap_or_default(), + order + .submitted_at_unix + .map(crate::runtime::job::format_timestamp) + .unwrap_or_default(), + crate::runtime::job::format_timestamp(order.updated_at_unix), + ] + }) + .collect(), + }; + render_table(stdout, &table)?; + writeln!(stdout)?; + if let Some(reason) = &view.reason { + writeln!(stdout, "note: {reason}")?; + } + } + writeln!(stdout, "source: {}", view.source)?; + render_actions(stdout, &view.actions)?; + Ok(()) +} + +fn render_order_cancel(stdout: &mut dyn Write, view: &OrderCancelView) -> Result<(), RuntimeError> { + let context = match view.state.as_str() { + "missing" => format!("order · {} missing", view.lookup), + "not_submitted" => format!("order · {} not submitted", view.lookup), + "unconfigured" => format!("order · {} cancel unavailable", view.lookup), + "unavailable" => format!("order · {} cancel unavailable", view.lookup), + "error" => format!("order · {} cancel error", view.lookup), + _ => format!("order · {} cancel", view.lookup), + }; + write_context(stdout, context.as_str())?; + let mut rows = vec![("lookup", view.lookup.as_str())]; + if let Some(order_id) = &view.order_id { + rows.push(("order id", order_id.as_str())); + } + render_pairs(stdout, "order", rows.as_slice())?; + if let Some(job) = &view.job { + render_order_job(stdout, job)?; + } + if let Some(reason) = &view.reason { + writeln!(stdout, "reason: {reason}")?; + } + writeln!(stdout, "source: {}", view.source)?; + render_actions(stdout, &view.actions)?; + Ok(()) +} + fn render_order_items( stdout: &mut dyn Write, items: &[OrderDraftItemView], @@ -1775,9 +2001,13 @@ fn human_command_name(view: &CommandView) -> &'static str { CommandView::LocalStatus(_) => "local status", CommandView::MycStatus(_) => "myc status", CommandView::NetStatus(_) => "net status", + CommandView::OrderCancel(_) => "order cancel", CommandView::OrderGet(_) => "order get", + CommandView::OrderHistory(_) => "order history", CommandView::OrderList(_) => "order ls", CommandView::OrderNew(_) => "order new", + CommandView::OrderSubmit(_) => "order submit", + CommandView::OrderWatch(_) => "order watch", CommandView::RpcSessions(_) => "rpc sessions", CommandView::RpcStatus(_) => "rpc status", CommandView::RelayList(_) => "relay ls", diff --git a/src/runtime/daemon.rs b/src/runtime/daemon.rs @@ -1,6 +1,7 @@ use std::time::Duration; use radroots_events::listing::RadrootsListing; +use radroots_events::trade::RadrootsTradeOrder; use reqwest::blocking::Client; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -136,6 +137,17 @@ pub struct BridgeListingPublishResult { pub event_addr: Option<String>, } +#[derive(Debug, Clone)] +pub struct BridgeOrderRequestResult { + pub deduplicated: bool, + pub job_id: String, + pub idempotency_key: Option<String>, + pub status: String, + pub signer_mode: String, + pub event_id: Option<String>, + pub event_addr: Option<String>, +} + pub fn status(config: &RuntimeConfig) -> CommandOutput { match bridge_status(config) { Ok(status) => CommandOutput::success(CommandView::RpcStatus(RpcStatusView { @@ -385,6 +397,31 @@ pub fn bridge_listing_publish( }) } +pub fn bridge_order_request( + config: &RuntimeConfig, + order: &RadrootsTradeOrder, + idempotency_key: Option<&str>, +) -> Result<BridgeOrderRequestResult, DaemonRpcError> { + let response: BridgePublishResponseRemote = call( + config, + "bridge.order.request", + Some(serde_json::json!({ + "order": order, + "idempotency_key": idempotency_key, + })), + RpcAuthMode::BridgeBearer, + )?; + Ok(BridgeOrderRequestResult { + deduplicated: response.deduplicated, + job_id: response.job.job_id, + idempotency_key: response.job.idempotency_key, + status: response.job.status, + signer_mode: response.job.signer_mode, + event_id: response.job.event_id, + event_addr: response.job.event_addr, + }) +} + fn bridge_status(config: &RuntimeConfig) -> Result<BridgeStatusRemote, DaemonRpcError> { call(config, "bridge.status", None, RpcAuthMode::BridgeBearer) } diff --git a/src/runtime/order.rs b/src/runtime/order.rs @@ -1,17 +1,20 @@ use std::fs; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::thread; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use radroots_events::kinds::KIND_LISTING; +use radroots_events::trade::{RadrootsTradeOrder, RadrootsTradeOrderItem}; use radroots_events_codec::d_tag::is_d_tag_base64url; use radroots_events_codec::trade::RadrootsTradeListingAddress; use serde::{Deserialize, Serialize}; -use crate::cli::{OrderNewArgs, RecordKeyArgs}; +use crate::cli::{OrderNewArgs, OrderSubmitArgs, OrderWatchArgs, RecordKeyArgs}; use crate::domain::runtime::{ - OrderDraftItemView, OrderGetView, OrderIssueView, OrderJobView, OrderListView, OrderNewView, - OrderSummaryView, + OrderCancelView, OrderDraftItemView, OrderGetView, OrderHistoryEntryView, OrderHistoryView, + OrderIssueView, OrderJobView, OrderListView, OrderNewView, OrderSubmitView, OrderSummaryView, + OrderWatchFrameView, OrderWatchView, }; use crate::runtime::RuntimeError; use crate::runtime::accounts; @@ -20,6 +23,7 @@ use crate::runtime::daemon::{self, DaemonRpcError}; const ORDER_DRAFT_KIND: &str = "order_draft_v1"; const ORDER_SOURCE: &str = "local order drafts · local first"; +const ORDER_LIFECYCLE_SOURCE: &str = "local order drafts · durable job lifecycle"; const ORDERS_DIR: &str = "orders/drafts"; static ORDER_COUNTER: AtomicU64 = AtomicU64::new(0); @@ -63,6 +67,18 @@ struct OrderDraftItem { #[serde(deny_unknown_fields)] struct OrderDraftSubmission { job_id: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + state: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + signer_mode: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + command: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + event_id: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + event_addr: Option<String>, + #[serde(default, skip_serializing_if = "Option::is_none")] + submitted_at_unix: Option<u64>, } #[derive(Debug, Clone)] @@ -119,7 +135,7 @@ pub fn scaffold(config: &RuntimeConfig, args: &OrderNewArgs) -> Result<OrderNewV buyer_account_id, submission: None, }; - fs::write(&file, scaffold_contents(&document)?)?; + save_draft(file.as_path(), &document)?; let mut view: OrderNewView = view_from_loaded( config, @@ -243,6 +259,459 @@ pub fn list(config: &RuntimeConfig) -> Result<OrderListView, RuntimeError> { }) } +pub fn submit( + config: &RuntimeConfig, + args: &OrderSubmitArgs, +) -> Result<OrderSubmitView, RuntimeError> { + let file = draft_lookup_path(config, args.key.as_str()); + if !file.exists() { + return Ok(OrderSubmitView { + state: "missing".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: args.key.clone(), + file: file.display().to_string(), + listing_lookup: None, + listing_addr: None, + buyer_account_id: None, + buyer_pubkey: None, + seller_pubkey: None, + dry_run: false, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some(format!("order draft `{}` was not found", args.key)), + job: None, + issues: Vec::new(), + actions: vec![ + "radroots order ls".to_owned(), + "radroots order new".to_owned(), + ], + }); + } + + let loaded = match load_draft(file.as_path()) { + Ok(loaded) => loaded, + Err(reason) => { + return Ok(OrderSubmitView { + state: "error".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: args.key.clone(), + file: file.display().to_string(), + listing_lookup: None, + listing_addr: None, + buyer_account_id: None, + buyer_pubkey: None, + seller_pubkey: None, + dry_run: false, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some(reason), + job: None, + issues: Vec::new(), + actions: Vec::new(), + }); + } + }; + + if let Some(job) = submission_job_view(config, &loaded.document, true) { + let mut actions = vec![ + format!("radroots order watch {}", loaded.document.order.order_id), + format!("radroots order history"), + ]; + actions.push(format!("radroots job get {}", job.job_id)); + return Ok(OrderSubmitView { + state: "already_submitted".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + file: loaded.file.display().to_string(), + listing_lookup: loaded.document.listing_lookup.clone(), + listing_addr: non_empty_string(loaded.document.order.listing_addr.clone()), + buyer_account_id: loaded.document.buyer_account_id.clone(), + buyer_pubkey: non_empty_string(loaded.document.order.buyer_pubkey.clone()), + seller_pubkey: non_empty_string(loaded.document.order.seller_pubkey.clone()), + dry_run: false, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some("order draft already has a recorded submission job".to_owned()), + job: Some(job), + issues: Vec::new(), + actions, + }); + } + + let issues = collect_issues(&loaded.document); + if !issues.is_empty() { + let mut actions = actions_for_document(&loaded.document, loaded.file.as_path(), &issues); + actions.push(format!( + "radroots order get {}", + loaded.document.order.order_id + )); + return Ok(OrderSubmitView { + state: "unconfigured".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + file: loaded.file.display().to_string(), + listing_lookup: loaded.document.listing_lookup.clone(), + listing_addr: non_empty_string(loaded.document.order.listing_addr.clone()), + buyer_account_id: loaded.document.buyer_account_id.clone(), + buyer_pubkey: non_empty_string(loaded.document.order.buyer_pubkey.clone()), + seller_pubkey: non_empty_string(loaded.document.order.seller_pubkey.clone()), + dry_run: false, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some("order draft is not ready for durable submit".to_owned()), + job: None, + issues, + actions, + }); + } + + if config.output.dry_run { + return Ok(OrderSubmitView { + state: "dry_run".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + file: loaded.file.display().to_string(), + listing_lookup: loaded.document.listing_lookup.clone(), + listing_addr: non_empty_string(loaded.document.order.listing_addr.clone()), + buyer_account_id: loaded.document.buyer_account_id.clone(), + buyer_pubkey: non_empty_string(loaded.document.order.buyer_pubkey.clone()), + seller_pubkey: non_empty_string(loaded.document.order.seller_pubkey.clone()), + dry_run: true, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some("dry run requested; daemon order submission skipped".to_owned()), + job: Some(OrderJobView { + job_id: "not_submitted".to_owned(), + state: "not_submitted".to_owned(), + command: Some("order.submit".to_owned()), + event_id: None, + event_addr: None, + reason: None, + }), + issues: Vec::new(), + actions: vec![format!( + "radroots order submit {}", + loaded.document.order.order_id + )], + }); + } + + let order = trade_order_from_document(&loaded.document); + match daemon::bridge_order_request(config, &order, args.idempotency_key.as_deref()) { + Ok(result) => { + let mut updated = loaded.document.clone(); + updated.submission = Some(OrderDraftSubmission { + job_id: result.job_id.clone(), + state: Some(result.status.clone()), + signer_mode: Some(result.signer_mode.clone()), + command: Some("order.submit".to_owned()), + event_id: result.event_id.clone(), + event_addr: result.event_addr.clone(), + submitted_at_unix: Some(now_unix()), + }); + save_draft(loaded.file.as_path(), &updated)?; + + let failed = result.status == "failed"; + let mut actions = Vec::new(); + if failed { + actions.push(format!("radroots job get {}", result.job_id)); + actions.push("radroots rpc status".to_owned()); + actions.push("radroots order history".to_owned()); + } else { + actions.push(format!("radroots order watch {}", updated.order.order_id)); + actions.push(format!("radroots job get {}", result.job_id)); + actions.push("radroots order history".to_owned()); + } + + Ok(OrderSubmitView { + state: if failed { + "unavailable".to_owned() + } else if result.deduplicated { + "deduplicated".to_owned() + } else { + result.status.clone() + }, + source: daemon::bridge_source().to_owned(), + order_id: updated.order.order_id.clone(), + file: loaded.file.display().to_string(), + listing_lookup: updated.listing_lookup.clone(), + listing_addr: non_empty_string(updated.order.listing_addr.clone()), + buyer_account_id: updated.buyer_account_id.clone(), + buyer_pubkey: non_empty_string(updated.order.buyer_pubkey.clone()), + seller_pubkey: non_empty_string(updated.order.seller_pubkey.clone()), + dry_run: false, + deduplicated: result.deduplicated, + idempotency_key: result.idempotency_key.clone(), + reason: failed.then(|| { + "daemon order request failed before relay delivery completed".to_owned() + }), + job: Some(OrderJobView { + job_id: result.job_id, + state: result.status, + command: Some("order.submit".to_owned()), + event_id: result.event_id, + event_addr: result.event_addr, + reason: None, + }), + issues: Vec::new(), + actions, + }) + } + Err(error) => Ok(order_submit_error_view(&loaded, args, error)), + } +} + +pub fn watch( + config: &RuntimeConfig, + args: &OrderWatchArgs, +) -> Result<OrderWatchView, RuntimeError> { + if args.frames == Some(0) { + return Err(RuntimeError::Config( + "--frames must be greater than zero when provided".to_owned(), + )); + } + + let file = draft_lookup_path(config, args.key.as_str()); + if !file.exists() { + return Ok(OrderWatchView { + state: "missing".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: args.key.clone(), + job_id: None, + interval_ms: args.interval_ms, + reason: Some(format!("order draft `{}` was not found", args.key)), + frames: Vec::new(), + actions: vec!["radroots order ls".to_owned()], + }); + } + + let loaded = match load_draft(file.as_path()) { + Ok(loaded) => loaded, + Err(reason) => { + return Ok(OrderWatchView { + state: "error".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: args.key.clone(), + job_id: None, + interval_ms: args.interval_ms, + reason: Some(reason), + frames: Vec::new(), + actions: Vec::new(), + }); + } + }; + + let Some(submission) = loaded.document.submission.as_ref() else { + return Ok(OrderWatchView { + state: "not_submitted".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + job_id: None, + interval_ms: args.interval_ms, + reason: Some("order draft does not have a recorded submission job yet".to_owned()), + frames: Vec::new(), + actions: vec![format!( + "radroots order submit {}", + loaded.document.order.order_id + )], + }); + }; + + let job_id = submission.job_id.clone(); + let max_frames = args.frames.unwrap_or(usize::MAX); + let mut frames = Vec::new(); + loop { + match daemon::bridge_job(config, job_id.as_str()) { + Ok(Some(job)) => { + frames.push(OrderWatchFrameView { + sequence: frames.len() + 1, + observed_at_unix: job.completed_at_unix.unwrap_or(job.requested_at_unix), + state: job.state.clone(), + terminal: job.terminal, + summary: job.relay_outcome_summary.clone(), + }); + if job.terminal || frames.len() >= max_frames { + return Ok(OrderWatchView { + state: if job.terminal { + job.state + } else { + "watching".to_owned() + }, + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + job_id: Some(job_id.clone()), + interval_ms: args.interval_ms, + reason: None, + frames, + actions: vec!["radroots order history".to_owned()], + }); + } + } + Ok(None) => { + return Ok(OrderWatchView { + state: "missing".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + job_id: Some(job_id.clone()), + interval_ms: args.interval_ms, + reason: Some("recorded job id was not found in radrootsd".to_owned()), + frames, + actions: vec!["radroots order history".to_owned()], + }); + } + Err(error) => return Ok(order_watch_error_view(&loaded, args, job_id, frames, error)), + } + + thread::sleep(Duration::from_millis(args.interval_ms)); + } +} + +pub fn history(config: &RuntimeConfig) -> Result<OrderHistoryView, RuntimeError> { + let dir = drafts_dir(config); + if !dir.exists() { + return Ok(OrderHistoryView { + state: "empty".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + count: 0, + reason: Some("no submitted order drafts recorded yet".to_owned()), + orders: Vec::new(), + actions: vec!["radroots order ls".to_owned()], + }); + } + + let mut orders = Vec::new(); + let mut invalid_count = 0usize; + for entry in fs::read_dir(&dir)? { + let entry = entry?; + let path = entry.path(); + if path.extension().and_then(|value| value.to_str()) != Some("toml") { + continue; + } + match load_draft(path.as_path()) { + Ok(loaded) => { + if loaded.document.submission.is_some() { + orders.push(history_entry_from_loaded(config, &loaded)); + } + } + Err(_) => { + invalid_count += 1; + } + } + } + + orders.sort_by(|left, right| { + right + .submitted_at_unix + .unwrap_or(right.updated_at_unix) + .cmp(&left.submitted_at_unix.unwrap_or(left.updated_at_unix)) + .then_with(|| left.id.cmp(&right.id)) + }); + + let state = if orders.is_empty() { + "empty" + } else if invalid_count > 0 + || orders + .iter() + .any(|order| matches!(order.state.as_str(), "error" | "unavailable")) + { + "degraded" + } else { + "ready" + }; + + let reason = if orders.is_empty() { + Some("no submitted order drafts recorded yet".to_owned()) + } else if invalid_count > 0 { + Some(format!( + "{invalid_count} invalid order draft file{} skipped while building history", + if invalid_count == 1 { "" } else { "s" } + )) + } else { + None + }; + + Ok(OrderHistoryView { + state: state.to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + count: orders.len(), + reason, + orders, + actions: if state == "empty" { + vec!["radroots order new".to_owned()] + } else { + Vec::new() + }, + }) +} + +pub fn cancel( + config: &RuntimeConfig, + args: &RecordKeyArgs, +) -> Result<OrderCancelView, RuntimeError> { + let file = draft_lookup_path(config, args.key.as_str()); + if !file.exists() { + return Ok(OrderCancelView { + state: "missing".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + lookup: args.key.clone(), + order_id: None, + reason: Some(format!("order draft `{}` was not found", args.key)), + job: None, + actions: vec!["radroots order ls".to_owned()], + }); + } + + let loaded = match load_draft(file.as_path()) { + Ok(loaded) => loaded, + Err(reason) => { + return Ok(OrderCancelView { + state: "error".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + lookup: args.key.clone(), + order_id: None, + reason: Some(reason), + job: None, + actions: Vec::new(), + }); + } + }; + + let Some(job) = submission_job_view(config, &loaded.document, false) else { + return Ok(OrderCancelView { + state: "not_submitted".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + lookup: args.key.clone(), + order_id: Some(loaded.document.order.order_id.clone()), + reason: Some("order draft has not been submitted yet".to_owned()), + job: None, + actions: vec![format!( + "radroots order submit {}", + loaded.document.order.order_id + )], + }); + }; + + let job_id = loaded + .document + .submission + .as_ref() + .map(|submission| submission.job_id.clone()); + Ok(OrderCancelView { + state: "unconfigured".to_owned(), + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + lookup: args.key.clone(), + order_id: Some(loaded.document.order.order_id.clone()), + reason: Some( + "durable order cancel needs trade-chain root and previous event refs that the current local order read plane does not persist yet".to_owned(), + ), + job: Some(job), + actions: vec![ + "radroots order history".to_owned(), + format!("radroots job get {}", job_id.unwrap_or_default()), + ], + }) +} + fn validate_scaffold_args(args: &OrderNewArgs) -> Result<(), RuntimeError> { match (normalize_optional(args.bin_id.as_deref()), args.bin_count) { (None, Some(_)) => Err(RuntimeError::Config( @@ -273,6 +742,7 @@ fn view_from_loaded( actions_for_document(&loaded.document, loaded.file.as_path(), issues.as_slice()); if let Some(job) = &job { actions.push(format!("radroots job get {}", job.job_id)); + actions.push("radroots order history".to_owned()); } OrderGetView { @@ -330,6 +800,32 @@ fn summary_from_loaded(config: &RuntimeConfig, loaded: &LoadedOrderDraft) -> Ord } } +fn history_entry_from_loaded( + config: &RuntimeConfig, + loaded: &LoadedOrderDraft, +) -> OrderHistoryEntryView { + let job = submission_job_view(config, &loaded.document, true); + let submitted_at_unix = loaded + .document + .submission + .as_ref() + .and_then(|submission| submission.submitted_at_unix); + OrderHistoryEntryView { + id: loaded.document.order.order_id.clone(), + state: job + .as_ref() + .map(|job| job.state.clone()) + .unwrap_or_else(|| "recorded".to_owned()), + listing_lookup: loaded.document.listing_lookup.clone(), + listing_addr: non_empty_string(loaded.document.order.listing_addr.clone()), + buyer_account_id: loaded.document.buyer_account_id.clone(), + submitted_at_unix, + updated_at_unix: loaded.updated_at_unix, + job, + issues: Vec::new(), + } +} + fn summary_for_invalid_file(path: &Path, reason: String) -> OrderSummaryView { let id = path .file_stem() @@ -498,19 +994,10 @@ fn submission_job_view( document: &OrderDraftDocument, enrich: bool, ) -> Option<OrderJobView> { - let job_id = document - .submission - .as_ref() - .and_then(|submission| normalize_optional(Some(submission.job_id.as_str())))?; + let submission = document.submission.as_ref()?; + let job_id = normalize_optional(Some(submission.job_id.as_str()))?; if !enrich || config.rpc.bridge_bearer_token.is_none() { - return Some(OrderJobView { - job_id, - state: "recorded".to_owned(), - command: None, - event_id: None, - event_addr: None, - reason: None, - }); + return Some(recorded_job_view(submission, job_id)); } match daemon::bridge_job(config, job_id.as_str()) { @@ -525,15 +1012,29 @@ fn submission_job_view( Ok(None) => Some(OrderJobView { job_id, state: "missing".to_owned(), - command: None, - event_id: None, - event_addr: None, + command: submission.command.clone(), + event_id: submission.event_id.clone(), + event_addr: submission.event_addr.clone(), reason: Some("recorded job id was not found in radrootsd".to_owned()), }), Err(error) => Some(job_view_from_error(job_id, error)), } } +fn recorded_job_view(submission: &OrderDraftSubmission, job_id: String) -> OrderJobView { + OrderJobView { + job_id, + state: submission + .state + .clone() + .unwrap_or_else(|| "recorded".to_owned()), + command: submission.command.clone(), + event_id: submission.event_id.clone(), + event_addr: submission.event_addr.clone(), + reason: None, + } +} + fn job_view_from_error(job_id: String, error: DaemonRpcError) -> OrderJobView { match error { DaemonRpcError::Unconfigured(reason) @@ -567,6 +1068,119 @@ fn job_view_from_error(job_id: String, error: DaemonRpcError) -> OrderJobView { } } +fn order_submit_error_view( + loaded: &LoadedOrderDraft, + args: &OrderSubmitArgs, + error: DaemonRpcError, +) -> OrderSubmitView { + let (state, reason, mut actions) = match error { + DaemonRpcError::Unconfigured(reason) + | DaemonRpcError::Unauthorized(reason) + | DaemonRpcError::MethodUnavailable(reason) => ( + "unconfigured".to_owned(), + reason, + vec![ + "set RADROOTS_RPC_BEARER_TOKEN in .env or your shell".to_owned(), + "start radrootsd with bridge ingress enabled".to_owned(), + ], + ), + DaemonRpcError::External(reason) => ( + "unavailable".to_owned(), + reason, + vec!["start radrootsd and verify the rpc url".to_owned()], + ), + DaemonRpcError::InvalidResponse(reason) + | DaemonRpcError::Remote(reason) + | DaemonRpcError::UnknownJob(reason) => ("error".to_owned(), reason, Vec::new()), + }; + actions.push(format!( + "radroots order get {}", + loaded.document.order.order_id + )); + + OrderSubmitView { + state, + source: daemon::bridge_source().to_owned(), + order_id: loaded.document.order.order_id.clone(), + file: loaded.file.display().to_string(), + listing_lookup: loaded.document.listing_lookup.clone(), + listing_addr: non_empty_string(loaded.document.order.listing_addr.clone()), + buyer_account_id: loaded.document.buyer_account_id.clone(), + buyer_pubkey: non_empty_string(loaded.document.order.buyer_pubkey.clone()), + seller_pubkey: non_empty_string(loaded.document.order.seller_pubkey.clone()), + dry_run: false, + deduplicated: false, + idempotency_key: args.idempotency_key.clone(), + reason: Some(reason), + job: None, + issues: Vec::new(), + actions, + } +} + +fn order_watch_error_view( + loaded: &LoadedOrderDraft, + args: &OrderWatchArgs, + job_id: String, + frames: Vec<OrderWatchFrameView>, + error: DaemonRpcError, +) -> OrderWatchView { + let (state, reason, actions) = match error { + DaemonRpcError::Unconfigured(reason) + | DaemonRpcError::Unauthorized(reason) + | DaemonRpcError::MethodUnavailable(reason) => ( + "unconfigured".to_owned(), + reason, + vec![ + "set RADROOTS_RPC_BEARER_TOKEN in .env or your shell".to_owned(), + "start radrootsd with bridge ingress enabled".to_owned(), + ], + ), + DaemonRpcError::External(reason) => ( + "unavailable".to_owned(), + reason, + vec!["start radrootsd and verify the rpc url".to_owned()], + ), + DaemonRpcError::InvalidResponse(reason) + | DaemonRpcError::Remote(reason) + | DaemonRpcError::UnknownJob(reason) => ("error".to_owned(), reason, Vec::new()), + }; + + OrderWatchView { + state, + source: ORDER_LIFECYCLE_SOURCE.to_owned(), + order_id: loaded.document.order.order_id.clone(), + job_id: Some(job_id), + interval_ms: args.interval_ms, + reason: Some(reason), + frames, + actions: if actions.is_empty() { + Vec::new() + } else { + actions + }, + } +} + +fn trade_order_from_document(document: &OrderDraftDocument) -> RadrootsTradeOrder { + RadrootsTradeOrder { + order_id: document.order.order_id.clone(), + listing_addr: document.order.listing_addr.clone(), + buyer_pubkey: document.order.buyer_pubkey.clone(), + seller_pubkey: document.order.seller_pubkey.clone(), + items: document + .order + .items + .iter() + .map(|item| RadrootsTradeOrderItem { + bin_id: item.bin_id.clone(), + bin_count: item.bin_count, + }) + .collect(), + discounts: None, + } +} + fn load_draft(path: &Path) -> Result<LoadedOrderDraft, String> { let contents = fs::read_to_string(path) .map_err(|error| format!("read order draft {}: {error}", path.display()))?; @@ -579,6 +1193,14 @@ fn load_draft(path: &Path) -> Result<LoadedOrderDraft, String> { }) } +fn save_draft(path: &Path, draft: &OrderDraftDocument) -> Result<(), RuntimeError> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + fs::write(path, scaffold_contents(draft)?)?; + Ok(()) +} + fn scaffold_contents(draft: &OrderDraftDocument) -> Result<String, RuntimeError> { let toml = toml::to_string_pretty(draft) .map_err(|error| RuntimeError::Config(format!("render order draft: {error}")))?; @@ -727,7 +1349,10 @@ impl From<OrderGetView> for OrderNewView { #[cfg(test)] mod tests { - use super::{ORDER_DRAFT_KIND, OrderDraft, OrderDraftDocument, OrderDraftItem, next_order_id}; + use super::{ + ORDER_DRAFT_KIND, OrderDraft, OrderDraftDocument, OrderDraftItem, OrderDraftSubmission, + next_order_id, + }; #[test] fn generated_order_id_uses_stable_prefix() { @@ -753,11 +1378,20 @@ mod tests { }, listing_lookup: Some("fresh-eggs".to_owned()), buyer_account_id: Some("acct_demo".to_owned()), - submission: None, + submission: Some(OrderDraftSubmission { + job_id: "job_01".to_owned(), + state: Some("accepted".to_owned()), + signer_mode: Some("embedded_service_identity".to_owned()), + command: Some("order.submit".to_owned()), + event_id: None, + event_addr: None, + submitted_at_unix: Some(1), + }), }; let rendered = toml::to_string_pretty(&document).expect("render draft"); assert!(rendered.contains("kind = \"order_draft_v1\"")); assert!(rendered.contains("order_id = \"ord_AAAAAAAAAAAAAAAAAAAAAg\"")); + assert!(rendered.contains("job_id = \"job_01\"")); } } diff --git a/tests/order.rs b/tests/order.rs @@ -1,7 +1,12 @@ use std::fs; +use std::io::{Read, Write}; +use std::net::{TcpListener, TcpStream}; use std::path::Path; use std::process::Command; -use std::sync::{Mutex, MutexGuard, OnceLock}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard, OnceLock}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; use assert_cmd::prelude::*; use serde_json::Value; @@ -40,6 +45,220 @@ fn order_test_guard() -> MutexGuard<'static, ()> { .expect("order test lock") } +#[derive(Debug, Clone)] +struct MockRpcRequest { + method: String, + auth_header: Option<String>, +} + +#[derive(Debug, Clone)] +struct MockRpcResponse { + body: Value, +} + +impl MockRpcResponse { + fn success(result: Value) -> Self { + Self { + body: serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "result": result, + }), + } + } +} + +struct MockRpcServer { + address: String, + shutdown: Arc<AtomicBool>, + handle: Option<JoinHandle<()>>, +} + +impl MockRpcServer { + fn start<F>(handler: F) -> Self + where + F: Fn(String, Option<String>) -> MockRpcResponse + Send + Sync + 'static, + { + let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock rpc listener"); + listener + .set_nonblocking(true) + .expect("set mock rpc listener nonblocking"); + let address = listener + .local_addr() + .expect("mock rpc local addr") + .to_string(); + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_flag = Arc::clone(&shutdown); + let handler: Arc<dyn Fn(String, Option<String>) -> MockRpcResponse + Send + Sync> = + Arc::new(handler); + let handle = thread::spawn(move || { + while !shutdown_flag.load(Ordering::SeqCst) { + match listener.accept() { + Ok((mut stream, _)) => { + if let Ok(request) = read_request(&mut stream) { + let response = + handler(request.method.clone(), request.auth_header.clone()); + let _ = write_response(&mut stream, &response); + } + } + Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(Duration::from_millis(10)); + } + Err(_) => break, + } + } + }); + + Self { + address, + shutdown, + handle: Some(handle), + } + } + + fn url(&self) -> String { + format!("http://{}", self.address) + } +} + +impl Drop for MockRpcServer { + fn drop(&mut self) { + self.shutdown.store(true, Ordering::SeqCst); + let _ = TcpStream::connect(&self.address); + if let Some(handle) = self.handle.take() { + handle.join().expect("join mock rpc thread"); + } + } +} + +fn read_request(stream: &mut TcpStream) -> Result<MockRpcRequest, String> { + stream + .set_read_timeout(Some(Duration::from_secs(2))) + .map_err(|error| format!("set mock rpc read timeout: {error}"))?; + + let mut buffer = Vec::new(); + let mut chunk = [0_u8; 4096]; + let mut header_end = None; + let mut content_length = 0_usize; + + loop { + let read = stream + .read(&mut chunk) + .map_err(|error| format!("read mock rpc request: {error}"))?; + if read == 0 { + break; + } + buffer.extend_from_slice(&chunk[..read]); + if header_end.is_none() { + header_end = find_subslice(&buffer, b"\r\n\r\n").map(|index| index + 4); + if let Some(end) = header_end { + content_length = parse_content_length(&buffer[..end])?; + if buffer.len() >= end + content_length { + break; + } + } + } else if let Some(end) = header_end { + if buffer.len() >= end + content_length { + break; + } + } + } + + let end = header_end.ok_or_else(|| "mock rpc request did not include headers".to_owned())?; + let headers = std::str::from_utf8(&buffer[..end]) + .map_err(|error| format!("mock rpc headers were not utf-8: {error}"))?; + let auth_header = parse_header(headers, "authorization"); + let body = std::str::from_utf8(&buffer[end..end + content_length]) + .map_err(|error| format!("mock rpc body was not utf-8: {error}"))?; + let envelope: Value = + serde_json::from_str(body).map_err(|error| format!("parse mock rpc body: {error}"))?; + let method = envelope["method"] + .as_str() + .ok_or_else(|| "mock rpc body did not include method".to_owned())? + .to_owned(); + + Ok(MockRpcRequest { + method, + auth_header, + }) +} + +fn parse_content_length(headers: &[u8]) -> Result<usize, String> { + let text = + std::str::from_utf8(headers).map_err(|error| format!("parse mock rpc headers: {error}"))?; + for line in text.lines() { + if let Some((name, value)) = line.split_once(':') { + if name.trim().eq_ignore_ascii_case("content-length") { + return value + .trim() + .parse::<usize>() + .map_err(|error| format!("parse content-length: {error}")); + } + } + } + Ok(0) +} + +fn parse_header(headers: &str, wanted: &str) -> Option<String> { + headers.lines().find_map(|line| { + let (name, value) = line.split_once(':')?; + if name.trim().eq_ignore_ascii_case(wanted) { + Some(value.trim().to_owned()) + } else { + None + } + }) +} + +fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> { + haystack + .windows(needle.len()) + .position(|window| window == needle) +} + +fn write_response(stream: &mut TcpStream, response: &MockRpcResponse) -> Result<(), String> { + let body = response.body.to_string(); + write!( + stream, + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + body.len(), + body + ) + .map_err(|error| format!("write mock rpc response: {error}"))?; + stream + .flush() + .map_err(|error| format!("flush mock rpc response: {error}")) +} + +fn sample_bridge_job(job_id: &str, state: &str, terminal: bool) -> Value { + serde_json::json!({ + "job_id": job_id, + "command": "bridge.order.request", + "idempotency_key": "order-submit-1", + "status": state, + "terminal": terminal, + "recovered_after_restart": false, + "requested_at_unix": 1_712_720_000, + "completed_at_unix": terminal.then_some(1_712_720_030), + "signer_mode": "embedded_service_identity", + "event_kind": 30420, + "event_id": "evt_order_01", + "event_addr": "30402:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef:AAAAAAAAAAAAAAAAAAAAAg", + "delivery_policy": "best_effort", + "delivery_quorum": 2, + "relay_count": 2, + "acknowledged_relay_count": if terminal { 2 } else { 1 }, + "required_acknowledged_relay_count": 2, + "attempt_count": if terminal { 2 } else { 1 }, + "relay_outcome_summary": if terminal { "submitted to 2 relays" } else { "awaiting relay quorum" }, + "attempt_summaries": if terminal { + serde_json::json!(["attempt 1: relay.one accepted", "attempt 2: relay.two accepted"]) + } else { + serde_json::json!(["attempt 1: relay.one accepted"]) + } + }) +} + #[test] fn order_new_creates_a_local_draft_with_selected_account_defaults() { let _guard = order_test_guard(); @@ -212,3 +431,258 @@ job_id = "job_order_01" assert_eq!(json["job"]["state"], "recorded"); assert_eq!(json["ready_for_submit"], false); } + +#[test] +fn order_submit_persists_submission_metadata_and_reports_job() { + let _guard = order_test_guard(); + let dir = tempdir().expect("tempdir"); + let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new())); + let recorded = Arc::clone(&requests); + let server = MockRpcServer::start(move |method, auth_header| { + recorded + .lock() + .expect("recorded requests lock") + .push(MockRpcRequest { + method: method.clone(), + auth_header, + }); + match method.as_str() { + "bridge.order.request" => MockRpcResponse::success(serde_json::json!({ + "deduplicated": false, + "job": sample_bridge_job("job_order_01", "accepted", false), + })), + "bridge.job.status" => { + MockRpcResponse::success(sample_bridge_job("job_order_01", "accepted", false)) + } + other => panic!("unexpected mock rpc method {other}"), + } + }); + + let account_output = order_command_in(dir.path()) + .args(["--json", "account", "new"]) + .output() + .expect("run account new"); + assert!(account_output.status.success()); + + let new_output = order_command_in(dir.path()) + .args([ + "--json", + "order", + "new", + "--listing", + "pasture-eggs", + "--listing-addr", + "30402:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef:AAAAAAAAAAAAAAAAAAAAAg", + "--bin", + "bin-1", + ]) + .output() + .expect("run order new"); + assert!(new_output.status.success()); + let new_json: Value = serde_json::from_slice(new_output.stdout.as_slice()).expect("new json"); + let order_id = new_json["order_id"].as_str().expect("order id"); + let file = new_json["file"].as_str().expect("file"); + + let submit_output = order_command_in(dir.path()) + .env("RADROOTS_RPC_URL", server.url()) + .env("RADROOTS_RPC_BEARER_TOKEN", "test-token") + .args([ + "--json", + "order", + "submit", + order_id, + "--idempotency-key", + "order-submit-1", + ]) + .output() + .expect("run order submit"); + assert!(submit_output.status.success()); + let submit_json: Value = + serde_json::from_slice(submit_output.stdout.as_slice()).expect("submit json"); + assert_eq!(submit_json["state"], "accepted"); + assert_eq!(submit_json["job"]["job_id"], "job_order_01"); + assert_eq!(submit_json["job"]["command"], "order.submit"); + + let contents = fs::read_to_string(file).expect("read updated order draft"); + assert!(contents.contains("job_id = \"job_order_01\"")); + assert!(contents.contains("state = \"accepted\"")); + assert!(contents.contains("command = \"order.submit\"")); + + let recorded_requests = requests.lock().expect("requests lock"); + assert!( + recorded_requests + .iter() + .any(|request| request.method == "bridge.order.request") + ); + assert!( + recorded_requests + .iter() + .any(|request| { request.auth_header.as_deref() == Some("Bearer test-token") }) + ); +} + +#[test] +fn order_watch_reports_job_frames_for_submitted_order() { + let _guard = order_test_guard(); + let dir = tempdir().expect("tempdir"); + let polls = Arc::new(Mutex::new(0usize)); + let watch_polls = Arc::clone(&polls); + let server = MockRpcServer::start(move |method, _auth_header| match method.as_str() { + "bridge.job.status" => { + let mut count = watch_polls.lock().expect("watch polls lock"); + *count += 1; + if *count == 1 { + MockRpcResponse::success(sample_bridge_job("job_watch_01", "accepted", false)) + } else { + MockRpcResponse::success(sample_bridge_job("job_watch_01", "completed", true)) + } + } + other => panic!("unexpected mock rpc method {other}"), + }); + + let drafts_dir = dir.path().join("home/.local/share/radroots/orders/drafts"); + fs::create_dir_all(&drafts_dir).expect("create drafts dir"); + fs::write( + drafts_dir.join("ord_AAAAAAAAAAAAAAAAAAAAAg.toml"), + r#"version = 1 +kind = "order_draft_v1" +listing_lookup = "fresh-eggs" +buyer_account_id = "acct_demo" + +[order] +order_id = "ord_AAAAAAAAAAAAAAAAAAAAAg" +listing_addr = "30402:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef:AAAAAAAAAAAAAAAAAAAAAg" +buyer_pubkey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +seller_pubkey = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + +[[order.items]] +bin_id = "bin-1" +bin_count = 2 + +[submission] +job_id = "job_watch_01" +"#, + ) + .expect("write watch draft"); + + let output = order_command_in(dir.path()) + .env("RADROOTS_RPC_URL", server.url()) + .env("RADROOTS_RPC_BEARER_TOKEN", "watch-token") + .args([ + "--json", + "order", + "watch", + "ord_AAAAAAAAAAAAAAAAAAAAAg", + "--frames", + "2", + "--interval-ms", + "1", + ]) + .output() + .expect("run order watch"); + assert!(output.status.success()); + let json: Value = serde_json::from_slice(output.stdout.as_slice()).expect("watch json"); + assert_eq!(json["state"], "completed"); + assert_eq!(json["frames"].as_array().map(Vec::len), Some(2)); + assert_eq!(json["frames"][0]["state"], "accepted"); + assert_eq!(json["frames"][1]["state"], "completed"); +} + +#[test] +fn order_history_lists_submitted_order_drafts() { + let _guard = order_test_guard(); + let dir = tempdir().expect("tempdir"); + let drafts_dir = dir.path().join("home/.local/share/radroots/orders/drafts"); + fs::create_dir_all(&drafts_dir).expect("create drafts dir"); + fs::write( + drafts_dir.join("ord_AAAAAAAAAAAAAAAAAAAAAg.toml"), + r#"version = 1 +kind = "order_draft_v1" +listing_lookup = "fresh-eggs" +buyer_account_id = "acct_demo" + +[order] +order_id = "ord_AAAAAAAAAAAAAAAAAAAAAg" +listing_addr = "30402:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef:AAAAAAAAAAAAAAAAAAAAAg" +buyer_pubkey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +seller_pubkey = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + +[[order.items]] +bin_id = "bin-1" +bin_count = 2 + +[submission] +job_id = "job_order_01" +state = "accepted" +command = "order.submit" +submitted_at_unix = 1712720000 +"#, + ) + .expect("write history draft"); + + let json_output = order_command_in(dir.path()) + .args(["--json", "order", "history"]) + .output() + .expect("run order history json"); + assert!(json_output.status.success()); + let json: Value = serde_json::from_slice(json_output.stdout.as_slice()).expect("history json"); + assert_eq!(json["count"], 1); + assert_eq!(json["orders"][0]["id"], "ord_AAAAAAAAAAAAAAAAAAAAAg"); + assert_eq!(json["orders"][0]["state"], "accepted"); + + let ndjson_output = order_command_in(dir.path()) + .args(["--ndjson", "order", "history"]) + .output() + .expect("run order history ndjson"); + assert!(ndjson_output.status.success()); + let ndjson = String::from_utf8(ndjson_output.stdout).expect("history ndjson"); + let lines = ndjson.lines().collect::<Vec<_>>(); + assert_eq!(lines.len(), 1); + assert!(lines[0].contains("ord_AAAAAAAAAAAAAAAAAAAAAg")); +} + +#[test] +fn order_cancel_is_truthfully_narrowed_when_trade_chain_state_is_unavailable() { + let _guard = order_test_guard(); + let dir = tempdir().expect("tempdir"); + let drafts_dir = dir.path().join("home/.local/share/radroots/orders/drafts"); + fs::create_dir_all(&drafts_dir).expect("create drafts dir"); + fs::write( + drafts_dir.join("ord_AAAAAAAAAAAAAAAAAAAAAg.toml"), + r#"version = 1 +kind = "order_draft_v1" +listing_lookup = "fresh-eggs" +buyer_account_id = "acct_demo" + +[order] +order_id = "ord_AAAAAAAAAAAAAAAAAAAAAg" +listing_addr = "30402:deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef:AAAAAAAAAAAAAAAAAAAAAg" +buyer_pubkey = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" +seller_pubkey = "deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef" + +[[order.items]] +bin_id = "bin-1" +bin_count = 2 + +[submission] +job_id = "job_order_01" +state = "accepted" +command = "order.submit" +"#, + ) + .expect("write cancel draft"); + + let output = order_command_in(dir.path()) + .args(["--json", "order", "cancel", "ord_AAAAAAAAAAAAAAAAAAAAAg"]) + .output() + .expect("run order cancel"); + assert_eq!(output.status.code(), Some(3)); + let json: Value = serde_json::from_slice(output.stdout.as_slice()).expect("cancel json"); + assert_eq!(json["state"], "unconfigured"); + assert!( + json["reason"] + .as_str() + .expect("cancel reason") + .contains("trade-chain") + ); +}