commit f8f7d29b8300a0567dfe2ba688b132615c0134e2
parent d260c3884bf78adbfb0abcb08011b2be849ce868
Author: triesap <tyson@radroots.org>
Date: Tue, 7 Apr 2026 07:07:23 +0000
land durable listing publish update and archive
Diffstat:
9 files changed, 1084 insertions(+), 20 deletions(-)
diff --git a/src/cli.rs b/src/cli.rs
@@ -164,10 +164,7 @@ impl Command {
}) | Self::Sync(SyncArgs {
command: SyncCommand::Pull | SyncCommand::Push,
}) | Self::Listing(ListingArgs {
- command: ListingCommand::New(_)
- | ListingCommand::Publish(_)
- | ListingCommand::Update(_)
- | ListingCommand::Archive(_),
+ command: ListingCommand::New(_),
}) | Self::Order(OrderArgs {
command: OrderCommand::New | OrderCommand::Submit | OrderCommand::Cancel(_),
})
@@ -331,9 +328,9 @@ pub enum ListingCommand {
New(ListingNewArgs),
Validate(ListingFileArgs),
Get(RecordKeyArgs),
- Publish(ListingFileArgs),
- Update(RecordKeyArgs),
- Archive(RecordKeyArgs),
+ Publish(ListingMutationArgs),
+ Update(ListingMutationArgs),
+ Archive(ListingMutationArgs),
}
#[derive(Debug, Clone, Args, Default)]
@@ -348,6 +345,17 @@ pub struct ListingFileArgs {
}
#[derive(Debug, Clone, Args)]
+pub struct ListingMutationArgs {
+ pub file: PathBuf,
+ #[arg(long)]
+ pub idempotency_key: Option<String>,
+ #[arg(long = "print-job", action = ArgAction::SetTrue)]
+ pub print_job: bool,
+ #[arg(long = "print-event", action = ArgAction::SetTrue)]
+ pub print_event: bool,
+}
+
+#[derive(Debug, Clone, Args)]
pub struct JobArgs {
#[command(subcommand)]
pub command: JobCommand,
@@ -668,6 +676,32 @@ mod tests {
Command::Listing(args) => match args.command {
ListingCommand::Publish(file) => {
assert_eq!(file.file.to_str(), Some("draft.toml"));
+ assert!(file.idempotency_key.is_none());
+ assert!(!file.print_job);
+ assert!(!file.print_event);
+ }
+ _ => panic!("unexpected listing subcommand"),
+ },
+ _ => panic!("unexpected command variant"),
+ }
+
+ let listing_archive = CliArgs::parse_from([
+ "radroots",
+ "listing",
+ "archive",
+ "--idempotency-key",
+ "archive-key",
+ "--print-job",
+ "--print-event",
+ "draft.toml",
+ ]);
+ match listing_archive.command {
+ Command::Listing(args) => match args.command {
+ ListingCommand::Archive(file) => {
+ assert_eq!(file.file.to_str(), Some("draft.toml"));
+ assert_eq!(file.idempotency_key.as_deref(), Some("archive-key"));
+ assert!(file.print_job);
+ assert!(file.print_event);
}
_ => panic!("unexpected listing subcommand"),
},
diff --git a/src/commands/listing.rs b/src/commands/listing.rs
@@ -1,4 +1,4 @@
-use crate::cli::{ListingFileArgs, ListingNewArgs, RecordKeyArgs};
+use crate::cli::{ListingFileArgs, ListingMutationArgs, ListingNewArgs, RecordKeyArgs};
use crate::domain::runtime::{CommandOutput, CommandView};
use crate::runtime::RuntimeError;
use crate::runtime::config::RuntimeConfig;
@@ -34,3 +34,66 @@ pub fn get(config: &RuntimeConfig, args: &RecordKeyArgs) -> Result<CommandOutput
};
Ok(output)
}
+
+pub fn publish(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::listing::publish(config, args)?;
+ Ok(match view.disposition() {
+ crate::domain::runtime::CommandDisposition::Success => {
+ CommandOutput::success(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::Unconfigured => {
+ CommandOutput::unconfigured(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::ExternalUnavailable => {
+ CommandOutput::external_unavailable(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::InternalError => {
+ CommandOutput::internal_error(CommandView::ListingMutation(view))
+ }
+ })
+}
+
+pub fn update(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::listing::update(config, args)?;
+ Ok(match view.disposition() {
+ crate::domain::runtime::CommandDisposition::Success => {
+ CommandOutput::success(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::Unconfigured => {
+ CommandOutput::unconfigured(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::ExternalUnavailable => {
+ CommandOutput::external_unavailable(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::InternalError => {
+ CommandOutput::internal_error(CommandView::ListingMutation(view))
+ }
+ })
+}
+
+pub fn archive(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<CommandOutput, RuntimeError> {
+ let view = crate::runtime::listing::archive(config, args)?;
+ Ok(match view.disposition() {
+ crate::domain::runtime::CommandDisposition::Success => {
+ CommandOutput::success(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::Unconfigured => {
+ CommandOutput::unconfigured(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::ExternalUnavailable => {
+ CommandOutput::external_unavailable(CommandView::ListingMutation(view))
+ }
+ crate::domain::runtime::CommandDisposition::InternalError => {
+ CommandOutput::internal_error(CommandView::ListingMutation(view))
+ }
+ })
+}
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
@@ -59,9 +59,9 @@ pub fn dispatch(
ListingCommand::New(args) => listing::new(config, args),
ListingCommand::Validate(args) => listing::validate(config, args),
ListingCommand::Get(args) => listing::get(config, args),
- ListingCommand::Publish(_) => unimplemented_command("listing publish"),
- ListingCommand::Update(_) => unimplemented_command("listing update"),
- ListingCommand::Archive(_) => unimplemented_command("listing archive"),
+ ListingCommand::Publish(args) => listing::publish(config, args),
+ ListingCommand::Update(args) => listing::update(config, args),
+ ListingCommand::Archive(args) => listing::archive(config, args),
},
Command::Local(local) => match &local.command {
LocalCommand::Init => local::init(config),
diff --git a/src/domain/runtime.rs b/src/domain/runtime.rs
@@ -79,6 +79,7 @@ pub enum CommandView {
JobList(JobListView),
JobWatch(JobWatchView),
ListingGet(ListingGetView),
+ ListingMutation(ListingMutationView),
ListingNew(ListingNewView),
ListingValidate(ListingValidateView),
LocalBackup(LocalBackupView),
@@ -537,6 +538,76 @@ impl ListingGetView {
}
#[derive(Debug, Clone, Serialize)]
+pub struct ListingMutationView {
+ pub state: String,
+ pub operation: String,
+ pub source: String,
+ pub file: String,
+ pub listing_id: String,
+ pub listing_addr: String,
+ pub seller_pubkey: String,
+ pub event_kind: u32,
+ #[serde(default)]
+ pub dry_run: bool,
+ #[serde(default)]
+ pub deduplicated: bool,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub job_id: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub job_status: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub signer_mode: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub event_id: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub event_addr: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub idempotency_key: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub reason: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub job: Option<ListingMutationJobView>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub event: Option<ListingMutationEventView>,
+ #[serde(default, skip_serializing_if = "Vec::is_empty")]
+ pub actions: Vec<String>,
+}
+
+impl ListingMutationView {
+ pub fn disposition(&self) -> CommandDisposition {
+ match self.state.as_str() {
+ "unconfigured" => CommandDisposition::Unconfigured,
+ "unavailable" => CommandDisposition::ExternalUnavailable,
+ "error" => CommandDisposition::InternalError,
+ _ => CommandDisposition::Success,
+ }
+ }
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct ListingMutationJobView {
+ pub rpc_method: String,
+ pub state: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub job_id: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub idempotency_key: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub signer_mode: Option<String>,
+}
+
+#[derive(Debug, Clone, Serialize)]
+pub struct ListingMutationEventView {
+ pub kind: u32,
+ pub author: String,
+ pub content: String,
+ pub tags: Vec<Vec<String>>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub event_id: Option<String>,
+ pub event_addr: String,
+}
+
+#[derive(Debug, Clone, Serialize)]
pub struct FindResultView {
pub id: String,
pub product_key: String,
diff --git a/src/render/mod.rs b/src/render/mod.rs
@@ -2,10 +2,10 @@ use std::io::{self, Write};
use crate::domain::runtime::{
AccountListView, AccountSummaryView, CommandOutput, CommandView, DoctorCheckView, DoctorView,
- FindView, JobGetView, JobListView, JobWatchView, ListingGetView, ListingNewView,
- ListingValidateView, LocalBackupView, LocalExportView, LocalInitView, LocalStatusView,
- NetStatusView, RelayListView, RpcSessionsView, RpcStatusView, SyncActionView, SyncStatusView,
- SyncWatchView,
+ FindView, JobGetView, JobListView, JobWatchView, ListingGetView, ListingMutationView,
+ ListingNewView, ListingValidateView, LocalBackupView, LocalExportView, LocalInitView,
+ LocalStatusView, NetStatusView, RelayListView, RpcSessionsView, RpcStatusView, SyncActionView,
+ SyncStatusView, SyncWatchView,
};
use crate::runtime::RuntimeError;
use crate::runtime::config::{OutputConfig, OutputFormat};
@@ -105,6 +105,9 @@ fn render_human_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(),
CommandView::ListingGet(view) => {
render_listing_get(stdout, view)?;
}
+ CommandView::ListingMutation(view) => {
+ render_listing_mutation(stdout, view)?;
+ }
CommandView::ListingNew(view) => {
render_listing_new(stdout, view)?;
}
@@ -241,6 +244,10 @@ fn render_json_to(stdout: &mut dyn Write, output: &CommandOutput) -> Result<(),
serde_json::to_writer_pretty(&mut *stdout, view)?;
writeln!(stdout)?;
}
+ CommandView::ListingMutation(view) => {
+ serde_json::to_writer_pretty(&mut *stdout, view)?;
+ writeln!(stdout)?;
+ }
CommandView::ListingNew(view) => {
serde_json::to_writer_pretty(&mut *stdout, view)?;
writeln!(stdout)?;
@@ -861,6 +868,64 @@ fn render_listing_get(stdout: &mut dyn Write, view: &ListingGetView) -> Result<(
Ok(())
}
+fn render_listing_mutation(
+ stdout: &mut dyn Write,
+ view: &ListingMutationView,
+) -> Result<(), RuntimeError> {
+ let context = match view.state.as_str() {
+ "dry_run" => format!("listing · {} dry run", view.operation),
+ "deduplicated" => format!("listing · {} deduplicated", view.operation),
+ "published" => format!("listing · {} completed", view.operation),
+ "failed" | "unavailable" => format!("listing · {} unavailable", view.operation),
+ "unconfigured" => format!("listing · {} unconfigured", view.operation),
+ "error" => format!("listing · {} error", view.operation),
+ other => format!("listing · {} {other}", view.operation),
+ };
+ write_context(stdout, context.as_str())?;
+
+ let mut rows = vec![
+ ("file", view.file.as_str()),
+ ("listing id", view.listing_id.as_str()),
+ ("event addr", view.listing_addr.as_str()),
+ ];
+ if let Some(job_id) = &view.job_id {
+ rows.push(("job id", job_id.as_str()));
+ }
+ if let Some(job_status) = &view.job_status {
+ rows.push(("status", job_status.as_str()));
+ }
+ if let Some(event_id) = &view.event_id {
+ rows.push(("event id", event_id.as_str()));
+ }
+ if let Some(signer_mode) = &view.signer_mode {
+ rows.push(("signer", signer_mode.as_str()));
+ }
+ render_pairs(stdout, "listing", rows.as_slice())?;
+ if let Some(reason) = &view.reason {
+ writeln!(stdout, "reason: {reason}")?;
+ }
+ writeln!(stdout, "source: {}", view.source)?;
+
+ if let Some(job) = &view.job {
+ writeln!(stdout)?;
+ writeln!(stdout, "job preview")?;
+ let job_json = serde_json::to_string_pretty(job)?;
+ for line in job_json.lines() {
+ writeln!(stdout, " {line}")?;
+ }
+ }
+ if let Some(event) = &view.event {
+ writeln!(stdout)?;
+ writeln!(stdout, "event preview")?;
+ let event_json = serde_json::to_string_pretty(event)?;
+ for line in event_json.lines() {
+ writeln!(stdout, " {line}")?;
+ }
+ }
+ render_actions(stdout, &view.actions)?;
+ Ok(())
+}
+
fn render_relay_list(stdout: &mut dyn Write, view: &RelayListView) -> Result<(), RuntimeError> {
write_context(
stdout,
@@ -1462,6 +1527,12 @@ fn human_command_name(view: &CommandView) -> &'static str {
CommandView::JobList(_) => "job ls",
CommandView::JobWatch(_) => "job watch",
CommandView::ListingGet(_) => "listing get",
+ CommandView::ListingMutation(view) => match view.operation.as_str() {
+ "publish" => "listing publish",
+ "update" => "listing update",
+ "archive" => "listing archive",
+ _ => "listing publish",
+ },
CommandView::ListingNew(_) => "listing new",
CommandView::ListingValidate(_) => "listing validate",
CommandView::LocalBackup(_) => "local backup",
diff --git a/src/runtime/daemon.rs b/src/runtime/daemon.rs
@@ -1,5 +1,6 @@
use std::time::Duration;
+use radroots_events::listing::RadrootsListing;
use reqwest::blocking::Client;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@@ -78,12 +79,16 @@ struct BridgeStatusRemote {
struct BridgeJobRemote {
job_id: String,
command: String,
+ #[serde(default)]
+ idempotency_key: Option<String>,
status: String,
terminal: bool,
recovered_after_restart: bool,
requested_at_unix: u64,
completed_at_unix: Option<u64>,
signer_mode: String,
+ #[serde(default)]
+ event_kind: Option<u32>,
event_id: Option<String>,
event_addr: Option<String>,
delivery_policy: String,
@@ -113,6 +118,24 @@ struct Nip46SessionRemote {
expires_in_secs: Option<u64>,
}
+#[derive(Debug, Clone, Deserialize)]
+struct BridgePublishResponseRemote {
+ deduplicated: bool,
+ job: BridgeJobRemote,
+}
+
+#[derive(Debug, Clone)]
+pub struct BridgeListingPublishResult {
+ pub deduplicated: bool,
+ pub job_id: String,
+ pub idempotency_key: Option<String>,
+ pub status: String,
+ pub signer_mode: String,
+ pub event_kind: Option<u32>,
+ pub event_id: Option<String>,
+ pub event_addr: Option<String>,
+}
+
pub fn status(config: &RuntimeConfig) -> CommandOutput {
match bridge_status(config) {
Ok(status) => CommandOutput::success(CommandView::RpcStatus(RpcStatusView {
@@ -334,6 +357,34 @@ pub fn bridge_job(
}
}
+pub fn bridge_listing_publish(
+ config: &RuntimeConfig,
+ listing: &RadrootsListing,
+ kind: u32,
+ idempotency_key: Option<&str>,
+) -> Result<BridgeListingPublishResult, DaemonRpcError> {
+ let response: BridgePublishResponseRemote = call(
+ config,
+ "bridge.listing.publish",
+ Some(serde_json::json!({
+ "listing": listing,
+ "kind": kind,
+ "idempotency_key": idempotency_key,
+ })),
+ RpcAuthMode::BridgeBearer,
+ )?;
+ Ok(BridgeListingPublishResult {
+ deduplicated: response.deduplicated,
+ job_id: response.job.job_id,
+ idempotency_key: response.job.idempotency_key,
+ status: response.job.status,
+ signer_mode: response.job.signer_mode,
+ event_kind: response.job.event_kind,
+ event_id: response.job.event_id,
+ event_addr: response.job.event_addr,
+ })
+}
+
fn bridge_status(config: &RuntimeConfig) -> Result<BridgeStatusRemote, DaemonRpcError> {
call(config, "bridge.status", None, RpcAuthMode::BridgeBearer)
}
diff --git a/src/runtime/listing.rs b/src/runtime/listing.rs
@@ -8,7 +8,7 @@ use radroots_core::{
RadrootsCoreQuantityPrice, RadrootsCoreUnit,
};
use radroots_events::RadrootsNostrEvent;
-use radroots_events::kinds::KIND_LISTING_DRAFT;
+use radroots_events::kinds::{KIND_LISTING, KIND_LISTING_DRAFT};
use radroots_events::listing::{
RadrootsListing, RadrootsListingAvailability, RadrootsListingBin,
RadrootsListingDeliveryMethod, RadrootsListingFarmRef, RadrootsListingLocation,
@@ -22,19 +22,23 @@ use radroots_trade::listing::validation::validate_listing_event;
use serde::{Deserialize, Serialize};
use serde_json::Value;
-use crate::cli::{ListingFileArgs, ListingNewArgs, RecordKeyArgs};
+use crate::cli::{ListingFileArgs, ListingMutationArgs, ListingNewArgs, RecordKeyArgs};
use crate::domain::runtime::{
- FindPriceView, FindQuantityView, FindResultProvenanceView, ListingGetView, ListingNewView,
+ FindPriceView, FindQuantityView, FindResultProvenanceView, ListingGetView,
+ ListingMutationEventView, ListingMutationJobView, ListingMutationView, ListingNewView,
ListingValidateView, ListingValidationIssueView, SyncFreshnessView,
};
use crate::runtime::RuntimeError;
use crate::runtime::accounts;
use crate::runtime::config::RuntimeConfig;
+use crate::runtime::daemon;
+use crate::runtime::daemon::DaemonRpcError;
use crate::runtime::sync::freshness_from_executor;
const DRAFT_KIND: &str = "listing_draft_v1";
const LISTING_SOURCE: &str = "local draft · local first";
const LISTING_READ_SOURCE: &str = "local replica · local first";
+const LISTING_WRITE_SOURCE: &str = "daemon bridge · durable write plane";
static D_TAG_COUNTER: AtomicU64 = AtomicU64::new(0);
@@ -158,6 +162,23 @@ struct FarmRow {
d_tag: String,
}
+#[derive(Debug, Clone, Copy)]
+pub enum ListingMutationOperation {
+ Publish,
+ Update,
+ Archive,
+}
+
+impl ListingMutationOperation {
+ fn as_str(self) -> &'static str {
+ match self {
+ Self::Publish => "publish",
+ Self::Update => "update",
+ Self::Archive => "archive",
+ }
+ }
+}
+
pub fn scaffold(
config: &RuntimeConfig,
args: &ListingNewArgs,
@@ -430,6 +451,169 @@ pub fn get(config: &RuntimeConfig, args: &RecordKeyArgs) -> Result<ListingGetVie
})
}
+pub fn publish(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<ListingMutationView, RuntimeError> {
+ mutate(config, args, ListingMutationOperation::Publish)
+}
+
+pub fn update(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<ListingMutationView, RuntimeError> {
+ mutate(config, args, ListingMutationOperation::Update)
+}
+
+pub fn archive(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+) -> Result<ListingMutationView, RuntimeError> {
+ mutate(config, args, ListingMutationOperation::Archive)
+}
+
+fn mutate(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+ operation: ListingMutationOperation,
+) -> Result<ListingMutationView, RuntimeError> {
+ let contents = fs::read_to_string(&args.file)?;
+ let parsed = toml::from_str::<ListingDraftDocument>(&contents).map_err(|error| {
+ RuntimeError::Config(format!(
+ "invalid listing draft {}: {error}",
+ args.file.display()
+ ))
+ })?;
+ let context = validation_context(config)?;
+ let mut canonical = canonicalize_draft(&parsed, &contents, &context).map_err(|issue| {
+ RuntimeError::Config(format!(
+ "invalid listing draft {}: {} ({})",
+ args.file.display(),
+ issue.message,
+ issue.field
+ ))
+ })?;
+
+ if matches!(operation, ListingMutationOperation::Archive) {
+ canonical.listing.availability = Some(RadrootsListingAvailability::Status {
+ status: RadrootsListingStatus::Other {
+ value: "archived".to_owned(),
+ },
+ });
+ }
+
+ let (event_preview, listing_addr) = build_listing_event_preview(&canonical)?;
+
+ if config.output.dry_run {
+ return Ok(ListingMutationView {
+ state: "dry_run".to_owned(),
+ operation: operation.as_str().to_owned(),
+ source: LISTING_WRITE_SOURCE.to_owned(),
+ file: args.file.display().to_string(),
+ listing_id: canonical.listing_id.clone(),
+ listing_addr: listing_addr.clone(),
+ seller_pubkey: canonical.seller_pubkey.clone(),
+ event_kind: KIND_LISTING,
+ dry_run: true,
+ deduplicated: false,
+ job_id: None,
+ job_status: None,
+ signer_mode: None,
+ event_id: None,
+ event_addr: Some(listing_addr.clone()),
+ idempotency_key: args.idempotency_key.clone(),
+ reason: Some("dry run requested; daemon publish skipped".to_owned()),
+ job: args.print_job.then(|| ListingMutationJobView {
+ rpc_method: "bridge.listing.publish".to_owned(),
+ state: "not_submitted".to_owned(),
+ job_id: None,
+ idempotency_key: args.idempotency_key.clone(),
+ signer_mode: Some(config.signer.backend.as_str().to_owned()),
+ }),
+ event: args.print_event.then_some(event_preview),
+ actions: vec![format!(
+ "radroots listing {} {}",
+ operation.as_str(),
+ args.file.display()
+ )],
+ });
+ }
+
+ match daemon::bridge_listing_publish(
+ config,
+ &canonical.listing,
+ KIND_LISTING,
+ args.idempotency_key.as_deref(),
+ ) {
+ Ok(result) => {
+ let failed = result.status == "failed";
+ let mut actions = Vec::new();
+ if failed {
+ if let Some(job_id) = &Some(result.job_id.clone()) {
+ actions.push(format!("radroots job get {job_id}"));
+ }
+ actions.push("radroots rpc status".to_owned());
+ } else {
+ actions.push(format!("radroots job get {}", result.job_id));
+ actions.push(format!("radroots job watch {}", result.job_id));
+ }
+
+ Ok(ListingMutationView {
+ state: if failed {
+ "unavailable".to_owned()
+ } else if result.deduplicated {
+ "deduplicated".to_owned()
+ } else {
+ result.status.clone()
+ },
+ operation: operation.as_str().to_owned(),
+ source: LISTING_WRITE_SOURCE.to_owned(),
+ file: args.file.display().to_string(),
+ listing_id: canonical.listing_id,
+ listing_addr: listing_addr.clone(),
+ seller_pubkey: canonical.seller_pubkey.clone(),
+ event_kind: result.event_kind.unwrap_or(KIND_LISTING),
+ dry_run: false,
+ deduplicated: result.deduplicated,
+ job_id: Some(result.job_id.clone()),
+ job_status: Some(result.status.clone()),
+ signer_mode: Some(result.signer_mode.clone()),
+ event_id: result.event_id.clone(),
+ event_addr: result
+ .event_addr
+ .clone()
+ .or_else(|| Some(listing_addr.clone())),
+ idempotency_key: result.idempotency_key.clone(),
+ reason: failed.then(|| {
+ "daemon publish job failed before relay delivery completed".to_owned()
+ }),
+ job: args.print_job.then(|| ListingMutationJobView {
+ rpc_method: "bridge.listing.publish".to_owned(),
+ state: result.status,
+ job_id: Some(result.job_id),
+ idempotency_key: result.idempotency_key,
+ signer_mode: Some(result.signer_mode),
+ }),
+ event: args.print_event.then(|| ListingMutationEventView {
+ event_id: result.event_id,
+ event_addr: result.event_addr.unwrap_or(listing_addr),
+ ..event_preview
+ }),
+ actions,
+ })
+ }
+ Err(error) => Ok(daemon_error_view(
+ config,
+ args,
+ operation,
+ &canonical,
+ listing_addr,
+ event_preview,
+ error,
+ )),
+ }
+}
+
fn scaffold_contents(draft: &ListingDraftDocument) -> Result<String, RuntimeError> {
let toml = toml::to_string_pretty(draft).map_err(|error| {
RuntimeError::Config(format!("failed to render listing draft: {error}"))
@@ -732,6 +916,139 @@ fn invalid_validation_view(
}
}
+fn build_listing_event_preview(
+ canonical: &CanonicalListingDraft,
+) -> Result<(ListingMutationEventView, String), RuntimeError> {
+ let parts = to_wire_parts_with_kind(&canonical.listing, KIND_LISTING)
+ .map_err(|error| RuntimeError::Config(format!("invalid listing contract: {error}")))?;
+ let event = RadrootsNostrEvent {
+ id: String::new(),
+ author: canonical.seller_pubkey.clone(),
+ created_at: 0,
+ kind: KIND_LISTING,
+ tags: parts.tags.clone(),
+ content: parts.content.clone(),
+ sig: String::new(),
+ };
+ let validated = validate_listing_event(&event)
+ .map_err(|error| RuntimeError::Config(format!("invalid listing contract: {error}")))?;
+ Ok((
+ ListingMutationEventView {
+ kind: KIND_LISTING,
+ author: canonical.seller_pubkey.clone(),
+ content: parts.content,
+ tags: parts.tags,
+ event_id: None,
+ event_addr: validated.listing_addr.clone(),
+ },
+ validated.listing_addr,
+ ))
+}
+
+fn daemon_error_view(
+ config: &RuntimeConfig,
+ args: &ListingMutationArgs,
+ operation: ListingMutationOperation,
+ canonical: &CanonicalListingDraft,
+ listing_addr: String,
+ event_preview: ListingMutationEventView,
+ error: DaemonRpcError,
+) -> ListingMutationView {
+ match error {
+ DaemonRpcError::Unconfigured(reason)
+ | DaemonRpcError::Unauthorized(reason)
+ | DaemonRpcError::MethodUnavailable(reason) => ListingMutationView {
+ state: "unconfigured".to_owned(),
+ operation: operation.as_str().to_owned(),
+ source: LISTING_WRITE_SOURCE.to_owned(),
+ file: args.file.display().to_string(),
+ listing_id: canonical.listing_id.clone(),
+ listing_addr,
+ seller_pubkey: canonical.seller_pubkey.clone(),
+ event_kind: KIND_LISTING,
+ dry_run: false,
+ deduplicated: false,
+ job_id: None,
+ job_status: None,
+ signer_mode: None,
+ event_id: None,
+ event_addr: None,
+ idempotency_key: args.idempotency_key.clone(),
+ reason: Some(reason),
+ job: args.print_job.then(|| ListingMutationJobView {
+ rpc_method: "bridge.listing.publish".to_owned(),
+ state: "unconfigured".to_owned(),
+ job_id: None,
+ idempotency_key: args.idempotency_key.clone(),
+ signer_mode: Some(config.signer.backend.as_str().to_owned()),
+ }),
+ event: args.print_event.then_some(event_preview),
+ actions: vec![
+ "set RADROOTS_RPC_BEARER_TOKEN in .env or your shell".to_owned(),
+ "start radrootsd with bridge ingress enabled".to_owned(),
+ ],
+ },
+ DaemonRpcError::External(reason) => ListingMutationView {
+ state: "unavailable".to_owned(),
+ operation: operation.as_str().to_owned(),
+ source: LISTING_WRITE_SOURCE.to_owned(),
+ file: args.file.display().to_string(),
+ listing_id: canonical.listing_id.clone(),
+ listing_addr,
+ seller_pubkey: canonical.seller_pubkey.clone(),
+ event_kind: KIND_LISTING,
+ dry_run: false,
+ deduplicated: false,
+ job_id: None,
+ job_status: None,
+ signer_mode: None,
+ event_id: None,
+ event_addr: None,
+ idempotency_key: args.idempotency_key.clone(),
+ reason: Some(reason),
+ job: args.print_job.then(|| ListingMutationJobView {
+ rpc_method: "bridge.listing.publish".to_owned(),
+ state: "unavailable".to_owned(),
+ job_id: None,
+ idempotency_key: args.idempotency_key.clone(),
+ signer_mode: Some(config.signer.backend.as_str().to_owned()),
+ }),
+ event: args.print_event.then_some(event_preview),
+ actions: vec!["start radrootsd and verify the rpc url".to_owned()],
+ },
+ DaemonRpcError::InvalidResponse(reason)
+ | DaemonRpcError::Remote(reason)
+ | DaemonRpcError::UnknownJob(reason) => ListingMutationView {
+ state: "error".to_owned(),
+ operation: operation.as_str().to_owned(),
+ source: LISTING_WRITE_SOURCE.to_owned(),
+ file: args.file.display().to_string(),
+ listing_id: canonical.listing_id.clone(),
+ listing_addr,
+ seller_pubkey: canonical.seller_pubkey.clone(),
+ event_kind: KIND_LISTING,
+ dry_run: false,
+ deduplicated: false,
+ job_id: None,
+ job_status: None,
+ signer_mode: None,
+ event_id: None,
+ event_addr: None,
+ idempotency_key: args.idempotency_key.clone(),
+ reason: Some(reason),
+ job: args.print_job.then(|| ListingMutationJobView {
+ rpc_method: "bridge.listing.publish".to_owned(),
+ state: "error".to_owned(),
+ job_id: None,
+ idempotency_key: args.idempotency_key.clone(),
+ signer_mode: Some(config.signer.backend.as_str().to_owned()),
+ }),
+ event: args.print_event.then_some(event_preview),
+ actions: vec!["inspect the daemon rpc response contract".to_owned()],
+ },
+ }
+}
+
fn issue_from_trade_validation(
error: RadrootsTradeListingValidationError,
contents: &str,
diff --git a/tests/job_rpc.rs b/tests/job_rpc.rs
@@ -3,7 +3,7 @@ use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::process::Command;
use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
use std::thread::{self, JoinHandle};
use std::time::Duration;
@@ -37,6 +37,13 @@ fn job_rpc_command_in(workdir: &Path) -> Command {
command
}
+fn job_rpc_test_guard() -> MutexGuard<'static, ()> {
+ static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+ LOCK.get_or_init(|| Mutex::new(()))
+ .lock()
+ .expect("job rpc test lock")
+}
+
#[derive(Debug, Clone)]
struct MockRpcRequest {
method: String,
@@ -298,6 +305,7 @@ fn sample_job(job_id: &str, state: &str, terminal: bool, completed_at_unix: Opti
#[test]
fn rpc_status_reports_bridge_ready_via_daemon_rpc() {
+ let _guard = job_rpc_test_guard();
let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new()));
let recorded = Arc::clone(&requests);
let server = MockRpcServer::start(move |method, auth_header| {
@@ -338,6 +346,7 @@ fn rpc_status_reports_bridge_ready_via_daemon_rpc() {
#[test]
fn rpc_sessions_ndjson_emits_public_session_entries() {
+ let _guard = job_rpc_test_guard();
let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new()));
let recorded = Arc::clone(&requests);
let server = MockRpcServer::start(move |method, auth_header| {
@@ -401,6 +410,7 @@ fn rpc_sessions_ndjson_emits_public_session_entries() {
#[test]
fn job_commands_require_bridge_bearer_token() {
+ let _guard = job_rpc_test_guard();
let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new()));
let recorded = Arc::clone(&requests);
let server = MockRpcServer::start(move |method, auth_header| {
@@ -435,6 +445,7 @@ fn job_commands_require_bridge_bearer_token() {
#[test]
fn job_ls_and_get_report_retained_bridge_jobs() {
+ let _guard = job_rpc_test_guard();
let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new()));
let recorded = Arc::clone(&requests);
let server = MockRpcServer::start(move |method, auth_header| {
@@ -499,6 +510,7 @@ fn job_ls_and_get_report_retained_bridge_jobs() {
#[test]
fn job_watch_ndjson_emits_one_frame_per_poll_until_terminal() {
+ let _guard = job_rpc_test_guard();
let sequence = Arc::new(Mutex::new(0_usize));
let requests = Arc::new(Mutex::new(Vec::<MockRpcRequest>::new()));
let observed = Arc::clone(&requests);
@@ -542,7 +554,7 @@ fn job_watch_ndjson_emits_one_frame_per_poll_until_terminal() {
"--frames",
"3",
"--interval-ms",
- "1",
+ "5",
])
.output()
.expect("run job watch");
diff --git a/tests/listing.rs b/tests/listing.rs
@@ -1,6 +1,12 @@
use std::fs;
+use std::io::{Read, Write};
+use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::process::Command;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
+use std::thread::{self, JoinHandle};
+use std::time::Duration;
use assert_cmd::prelude::*;
use radroots_sql_core::{SqlExecutor, SqliteExecutor};
@@ -33,8 +39,16 @@ fn cli_command_in(workdir: &Path) -> Command {
command
}
+fn listing_test_guard() -> MutexGuard<'static, ()> {
+ static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
+ LOCK.get_or_init(|| Mutex::new(()))
+ .lock()
+ .expect("listing test lock")
+}
+
#[test]
fn listing_new_scaffolds_a_toml_draft_with_account_and_farm_defaults() {
+ let _guard = listing_test_guard();
let dir = tempdir().expect("tempdir");
let init = cli_command_in(dir.path())
.args(["local", "init"])
@@ -75,6 +89,7 @@ fn listing_new_scaffolds_a_toml_draft_with_account_and_farm_defaults() {
#[test]
fn listing_validate_resolves_selected_account_and_matching_farm() {
+ let _guard = listing_test_guard();
let dir = tempdir().expect("tempdir");
let init = cli_command_in(dir.path())
.args(["local", "init"])
@@ -138,6 +153,7 @@ fn listing_validate_resolves_selected_account_and_matching_farm() {
#[test]
fn listing_validate_reports_invalid_drafts_with_field_lines() {
+ let _guard = listing_test_guard();
let dir = tempdir().expect("tempdir");
let draft_path = dir.path().join("invalid.toml");
fs::write(
@@ -182,6 +198,7 @@ fn listing_validate_reports_invalid_drafts_with_field_lines() {
#[test]
fn listing_get_reads_real_local_rows_and_reports_missing() {
+ let _guard = listing_test_guard();
let dir = tempdir().expect("tempdir");
let init = cli_command_in(dir.path())
.args(["local", "init"])
@@ -233,6 +250,219 @@ fn listing_get_reads_real_local_rows_and_reports_missing() {
assert_eq!(missing_json["state"], "missing");
}
+#[test]
+fn listing_publish_and_update_use_durable_bridge_publish() {
+ let _guard = listing_test_guard();
+ let dir = tempdir().expect("tempdir");
+ let init = cli_command_in(dir.path())
+ .args(["local", "init"])
+ .output()
+ .expect("run local init");
+ assert!(init.status.success());
+
+ let account_output = cli_command_in(dir.path())
+ .args(["--json", "account", "new"])
+ .output()
+ .expect("run account new");
+ assert!(account_output.status.success());
+ let account_json: Value =
+ serde_json::from_slice(account_output.stdout.as_slice()).expect("account json");
+ let seller_pubkey = account_json["public_identity"]["public_key_hex"]
+ .as_str()
+ .expect("seller pubkey");
+ let farm_d_tag = "AAAAAAAAAAAAAAAAAAAAAw";
+ seed_farm(dir.path(), seller_pubkey, farm_d_tag, "La Huerta");
+
+ let draft_path = dir.path().join("eggs.toml");
+ fs::write(
+ &draft_path,
+ valid_listing_draft(
+ "AAAAAAAAAAAAAAAAAAAAAg",
+ "",
+ "",
+ "eggs",
+ "Pasture eggs",
+ "Protein",
+ "Fresh pasture-raised eggs collected daily.",
+ "12",
+ "each",
+ "4.50",
+ "USD",
+ "1",
+ "each",
+ "18",
+ "pickup",
+ "La Huerta del Sur",
+ ),
+ )
+ .expect("write listing draft");
+
+ let requests = Arc::new(Mutex::new(Vec::<Value>::new()));
+ let recorded = Arc::clone(&requests);
+ let server = MockRpcServer::start(move |body, auth_header| {
+ recorded.lock().expect("recorded").push(body.clone());
+ assert_eq!(auth_header.as_deref(), Some("Bearer bridge-secret"));
+ match body["method"].as_str().unwrap_or_default() {
+ "bridge.listing.publish" => MockRpcResponse::success(json!({
+ "deduplicated": false,
+ "job": sample_listing_job("job_listing_01", "published", "event_listing_01", "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAg")
+ })),
+ other => MockRpcResponse::rpc_error(-32601, &format!("unexpected method: {other}")),
+ }
+ });
+
+ let publish_output = cli_command_in(dir.path())
+ .env("RADROOTS_RPC_URL", server.url())
+ .env("RADROOTS_RPC_BEARER_TOKEN", "bridge-secret")
+ .args([
+ "--json",
+ "listing",
+ "publish",
+ "--idempotency-key",
+ "publish-key",
+ "--print-job",
+ "--print-event",
+ draft_path.to_str().expect("draft path"),
+ ])
+ .output()
+ .expect("run listing publish");
+ assert!(publish_output.status.success());
+ let publish_json: Value =
+ serde_json::from_slice(publish_output.stdout.as_slice()).expect("publish json");
+ assert_eq!(publish_json["operation"], "publish");
+ assert_eq!(publish_json["job_id"], "job_listing_01");
+ assert_eq!(publish_json["job_status"], "published");
+ assert_eq!(publish_json["event_id"], "event_listing_01");
+ assert_eq!(publish_json["event"]["kind"], 30402);
+ assert_eq!(publish_json["job"]["rpc_method"], "bridge.listing.publish");
+
+ let update_output = cli_command_in(dir.path())
+ .env("RADROOTS_RPC_URL", server.url())
+ .env("RADROOTS_RPC_BEARER_TOKEN", "bridge-secret")
+ .args([
+ "--json",
+ "listing",
+ "update",
+ draft_path.to_str().expect("draft path"),
+ ])
+ .output()
+ .expect("run listing update");
+ assert!(update_output.status.success());
+ let update_json: Value =
+ serde_json::from_slice(update_output.stdout.as_slice()).expect("update json");
+ assert_eq!(update_json["operation"], "update");
+
+ let recorded = requests.lock().expect("requests");
+ assert_eq!(recorded.len(), 2);
+ assert_eq!(recorded[0]["params"]["kind"], 30402);
+ assert_eq!(recorded[0]["params"]["idempotency_key"], "publish-key");
+ assert_eq!(recorded[1]["params"]["kind"], 30402);
+}
+
+#[test]
+fn listing_archive_and_dry_run_are_truthful() {
+ let _guard = listing_test_guard();
+ let dir = tempdir().expect("tempdir");
+ let init = cli_command_in(dir.path())
+ .args(["local", "init"])
+ .output()
+ .expect("run local init");
+ assert!(init.status.success());
+
+ let account_output = cli_command_in(dir.path())
+ .args(["--json", "account", "new"])
+ .output()
+ .expect("run account new");
+ assert!(account_output.status.success());
+ let account_json: Value =
+ serde_json::from_slice(account_output.stdout.as_slice()).expect("account json");
+ let seller_pubkey = account_json["public_identity"]["public_key_hex"]
+ .as_str()
+ .expect("seller pubkey");
+ seed_farm(
+ dir.path(),
+ seller_pubkey,
+ "AAAAAAAAAAAAAAAAAAAAAw",
+ "La Huerta",
+ );
+
+ let draft_path = dir.path().join("archive.toml");
+ fs::write(
+ &draft_path,
+ valid_listing_draft(
+ "AAAAAAAAAAAAAAAAAAAAAg",
+ "",
+ "",
+ "eggs",
+ "Pasture eggs",
+ "Protein",
+ "Fresh pasture-raised eggs collected daily.",
+ "12",
+ "each",
+ "4.50",
+ "USD",
+ "1",
+ "each",
+ "18",
+ "pickup",
+ "La Huerta del Sur",
+ ),
+ )
+ .expect("write listing draft");
+
+ let requests = Arc::new(Mutex::new(Vec::<String>::new()));
+ let recorded = Arc::clone(&requests);
+ let server = MockRpcServer::start(move |body, _auth_header| {
+ recorded.lock().expect("recorded").push(body.to_string());
+ MockRpcResponse::success(json!({
+ "deduplicated": false,
+ "job": sample_listing_job("job_listing_archive", "published", "event_listing_archive", "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAg")
+ }))
+ });
+
+ let archive_output = cli_command_in(dir.path())
+ .env("RADROOTS_RPC_URL", server.url())
+ .env("RADROOTS_RPC_BEARER_TOKEN", "bridge-secret")
+ .args([
+ "--json",
+ "listing",
+ "archive",
+ draft_path.to_str().expect("draft path"),
+ ])
+ .output()
+ .expect("run listing archive");
+ assert!(archive_output.status.success());
+ let archive_json: Value =
+ serde_json::from_slice(archive_output.stdout.as_slice()).expect("archive json");
+ assert_eq!(archive_json["operation"], "archive");
+ assert_eq!(archive_json["job_status"], "published");
+
+ let dry_run_output = cli_command_in(dir.path())
+ .args([
+ "--json",
+ "--dry-run",
+ "listing",
+ "publish",
+ "--print-event",
+ "--print-job",
+ draft_path.to_str().expect("draft path"),
+ ])
+ .output()
+ .expect("run listing publish dry run");
+ assert!(dry_run_output.status.success());
+ let dry_run_json: Value =
+ serde_json::from_slice(dry_run_output.stdout.as_slice()).expect("dry run json");
+ assert_eq!(dry_run_json["state"], "dry_run");
+ assert_eq!(dry_run_json["dry_run"], true);
+ assert_eq!(dry_run_json["job"]["state"], "not_submitted");
+ assert_eq!(dry_run_json["event"]["kind"], 30402);
+ assert!(dry_run_json["event"]["event_id"].is_null());
+
+ let recorded = requests.lock().expect("requests");
+ assert_eq!(recorded.len(), 1);
+ assert!(recorded[0].contains("archived"));
+}
+
fn seed_farm(workdir: &Path, pubkey: &str, d_tag: &str, name: &str) {
let replica_db = workdir
.join("home")
@@ -264,6 +494,221 @@ fn seed_farm(workdir: &Path, pubkey: &str, d_tag: &str, name: &str) {
.expect("insert farm");
}
+#[derive(Debug, Clone)]
+struct MockRpcRequest {
+ body: Value,
+ auth_header: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+struct MockRpcResponse {
+ body: Value,
+}
+
+impl MockRpcResponse {
+ fn success(result: Value) -> Self {
+ Self {
+ body: json!({
+ "jsonrpc": "2.0",
+ "id": 1,
+ "result": result,
+ }),
+ }
+ }
+
+ fn rpc_error(code: i64, message: &str) -> Self {
+ Self {
+ body: json!({
+ "jsonrpc": "2.0",
+ "id": 1,
+ "error": {
+ "code": code,
+ "message": message,
+ }
+ }),
+ }
+ }
+}
+
+struct MockRpcServer {
+ address: String,
+ shutdown: Arc<AtomicBool>,
+ handle: Option<JoinHandle<()>>,
+}
+
+impl MockRpcServer {
+ fn start<F>(handler: F) -> Self
+ where
+ F: Fn(Value, Option<String>) -> MockRpcResponse + Send + Sync + 'static,
+ {
+ let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock rpc listener");
+ listener
+ .set_nonblocking(true)
+ .expect("set listener nonblocking");
+ let address = listener.local_addr().expect("local addr").to_string();
+ let shutdown = Arc::new(AtomicBool::new(false));
+ let shutdown_flag = Arc::clone(&shutdown);
+ let handler: Arc<dyn Fn(Value, Option<String>) -> MockRpcResponse + Send + Sync> =
+ Arc::new(handler);
+ let handle = thread::spawn(move || {
+ while !shutdown_flag.load(Ordering::SeqCst) {
+ match listener.accept() {
+ Ok((mut stream, _)) => {
+ if let Ok(request) = read_request(&mut stream) {
+ let response =
+ handler(request.body.clone(), request.auth_header.clone());
+ let _ = write_response(&mut stream, &response);
+ }
+ }
+ Err(error) if error.kind() == std::io::ErrorKind::WouldBlock => {
+ thread::sleep(Duration::from_millis(10));
+ }
+ Err(_) => break,
+ }
+ }
+ });
+ Self {
+ address,
+ shutdown,
+ handle: Some(handle),
+ }
+ }
+
+ fn url(&self) -> String {
+ format!("http://{}", self.address)
+ }
+}
+
+impl Drop for MockRpcServer {
+ fn drop(&mut self) {
+ self.shutdown.store(true, Ordering::SeqCst);
+ let _ = TcpStream::connect(&self.address);
+ if let Some(handle) = self.handle.take() {
+ handle.join().expect("join mock rpc server");
+ }
+ }
+}
+
+fn read_request(stream: &mut TcpStream) -> Result<MockRpcRequest, String> {
+ stream
+ .set_read_timeout(Some(Duration::from_secs(2)))
+ .map_err(|error| format!("set mock rpc read timeout: {error}"))?;
+ let mut buffer = Vec::new();
+ let mut chunk = [0_u8; 4096];
+ let mut header_end = None;
+ let mut content_length = 0usize;
+
+ loop {
+ let read = stream
+ .read(&mut chunk)
+ .map_err(|error| format!("read mock rpc request: {error}"))?;
+ if read == 0 {
+ break;
+ }
+ buffer.extend_from_slice(&chunk[..read]);
+ if header_end.is_none() {
+ header_end = find_subslice(&buffer, b"\r\n\r\n").map(|index| index + 4);
+ if let Some(end) = header_end {
+ content_length = parse_content_length(&buffer[..end])?;
+ if buffer.len() >= end + content_length {
+ break;
+ }
+ }
+ } else if let Some(end) = header_end {
+ if buffer.len() >= end + content_length {
+ break;
+ }
+ }
+ }
+
+ let end = header_end.ok_or_else(|| "mock rpc request missing headers".to_owned())?;
+ let headers = std::str::from_utf8(&buffer[..end])
+ .map_err(|error| format!("mock rpc headers not utf-8: {error}"))?;
+ let auth_header = parse_header(headers, "authorization");
+ let body = std::str::from_utf8(&buffer[end..end + content_length])
+ .map_err(|error| format!("mock rpc body not utf-8: {error}"))?;
+ let json: Value =
+ serde_json::from_str(body).map_err(|error| format!("parse mock rpc body: {error}"))?;
+
+ Ok(MockRpcRequest {
+ body: json,
+ auth_header,
+ })
+}
+
+fn parse_content_length(headers: &[u8]) -> Result<usize, String> {
+ let text = std::str::from_utf8(headers)
+ .map_err(|error| format!("header utf-8 parse failed: {error}"))?;
+ for line in text.lines() {
+ if let Some((name, value)) = line.split_once(':') {
+ if name.trim().eq_ignore_ascii_case("content-length") {
+ return value
+ .trim()
+ .parse::<usize>()
+ .map_err(|error| format!("content-length parse failed: {error}"));
+ }
+ }
+ }
+ Ok(0)
+}
+
+fn parse_header(headers: &str, wanted: &str) -> Option<String> {
+ headers.lines().find_map(|line| {
+ let (name, value) = line.split_once(':')?;
+ if name.trim().eq_ignore_ascii_case(wanted) {
+ Some(value.trim().to_owned())
+ } else {
+ None
+ }
+ })
+}
+
+fn find_subslice(haystack: &[u8], needle: &[u8]) -> Option<usize> {
+ haystack
+ .windows(needle.len())
+ .position(|window| window == needle)
+}
+
+fn write_response(stream: &mut TcpStream, response: &MockRpcResponse) -> Result<(), String> {
+ let body = response.body.to_string();
+ write!(
+ stream,
+ "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
+ body.len(),
+ body
+ )
+ .map_err(|error| format!("write mock rpc response: {error}"))?;
+ stream
+ .flush()
+ .map_err(|error| format!("flush mock rpc response: {error}"))
+}
+
+fn sample_listing_job(job_id: &str, status: &str, event_id: &str, event_addr: &str) -> Value {
+ json!({
+ "job_id": job_id,
+ "command": "bridge.listing.publish",
+ "idempotency_key": "publish-key",
+ "status": status,
+ "terminal": status != "accepted",
+ "recovered_after_restart": false,
+ "requested_at_unix": 1_712_720_000,
+ "completed_at_unix": 1_712_720_010,
+ "signer_mode": "local",
+ "event_kind": 30402,
+ "event_id": event_id,
+ "event_addr": event_addr,
+ "delivery_policy": "best_effort",
+ "delivery_quorum": 2,
+ "relay_count": 2,
+ "acknowledged_relay_count": 2,
+ "required_acknowledged_relay_count": 2,
+ "attempt_count": 1,
+ "attempt_summaries": ["attempt 1: relay.one accepted"],
+ "relay_results": [],
+ "relay_outcome_summary": "published to 2 relays"
+ })
+}
+
fn seed_trade_product(
workdir: &Path,
product_id: &str,