commit 4205ebd96701352446a0383636305b2bfcd4feb5
parent 5fe78da93769cbf66df9b1eb91460cdf1c21fa2c
Author: triesap <tyson@radroots.org>
Date: Wed, 15 Apr 2026 05:35:01 +0000
cli: productize localhost workflow truth
Diffstat:
4 files changed, 354 insertions(+), 40 deletions(-)
diff --git a/src/commands/doctor.rs b/src/commands/doctor.rs
@@ -305,8 +305,8 @@ fn hyf_check(hyf: &crate::runtime::provider::HyfProviderView) -> EvaluatedCheck
fn workflow_check(workflow: &crate::runtime::provider::WorkflowProviderView) -> EvaluatedCheck {
let severity = match workflow.state.as_str() {
"ready" => DoctorSeverity::Ok,
- "not_configured" => DoctorSeverity::Warn,
- "unsupported" | "unavailable" | "incompatible" => DoctorSeverity::ExternalFail,
+ "not_configured" | "disabled" | "unavailable" => DoctorSeverity::Warn,
+ "unsupported" | "incompatible" => DoctorSeverity::ExternalFail,
_ => DoctorSeverity::InternalFail,
};
diff --git a/src/runtime/order.rs b/src/runtime/order.rs
@@ -45,7 +45,7 @@ const WORKFLOW_TARGET: &str = "workflow-default";
const WORKFLOW_STATE_DIR_NAME: &str = "trade-listing";
const WORKFLOW_STATE_FILE_NAME: &str = "state.json";
const WORKFLOW_IDENTITY_FILE_NAME: &str = "identity.secret.json";
-const WORKFLOW_FETCH_TIMEOUT: Duration = Duration::from_secs(30);
+const WORKFLOW_FETCH_TIMEOUT: Duration = Duration::from_secs(60);
const WORKFLOW_POLL_INTERVAL: Duration = Duration::from_millis(250);
const WORKFLOW_REPLAY_WINDOW_SECS: u64 = 24 * 60 * 60;
const WORKFLOW_REPLAY_OVERLAP_SECS: u64 = 5 * 60;
@@ -1452,9 +1452,10 @@ fn wait_for_order_workflow_truth(
})?;
runtime.block_on(async move {
+ let replay_window_secs = workflow_replay_window_secs(document);
let trade_listing_runtime = TradeListingRuntime::load(TradeListingRuntimeConfig {
state_path: context.state_path.clone(),
- replay_window_secs: WORKFLOW_REPLAY_WINDOW_SECS,
+ replay_window_secs,
replay_overlap_secs: WORKFLOW_REPLAY_OVERLAP_SECS,
})
.await
@@ -1531,6 +1532,26 @@ fn wait_for_order_workflow_truth(
})
}
+fn workflow_replay_window_secs(document: &OrderDraftDocument) -> u64 {
+ let Some(submitted_at_unix) = document
+ .submission
+ .as_ref()
+ .and_then(|submission| submission.submitted_at_unix)
+ else {
+ return WORKFLOW_REPLAY_WINDOW_SECS;
+ };
+
+ let now = now_unix();
+ if submitted_at_unix >= now {
+ return WORKFLOW_REPLAY_OVERLAP_SECS.max(1);
+ }
+
+ let recent_window = now
+ .saturating_sub(submitted_at_unix)
+ .saturating_add(WORKFLOW_REPLAY_OVERLAP_SECS);
+ recent_window.clamp(1, WORKFLOW_REPLAY_WINDOW_SECS)
+}
+
fn resolve_workflow_context(
config: &RuntimeConfig,
) -> Result<Option<WorkflowContext>, WorkflowResolutionError> {
@@ -1967,7 +1988,8 @@ impl From<OrderGetView> for OrderNewView {
mod tests {
use super::{
ORDER_DRAFT_KIND, OrderDraft, OrderDraftDocument, OrderDraftItem, OrderDraftSubmission,
- next_order_id,
+ WORKFLOW_REPLAY_OVERLAP_SECS, WORKFLOW_REPLAY_WINDOW_SECS, next_order_id, now_unix,
+ workflow_replay_window_secs,
};
#[test]
@@ -2011,4 +2033,78 @@ mod tests {
assert!(rendered.contains("order_id = \"ord_AAAAAAAAAAAAAAAAAAAAAg\""));
assert!(rendered.contains("job_id = \"job_01\""));
}
+
+ #[test]
+ fn workflow_replay_window_prefers_recent_submission_age_plus_overlap() {
+ let now = now_unix();
+ let document = OrderDraftDocument {
+ version: 1,
+ kind: ORDER_DRAFT_KIND.to_owned(),
+ order: OrderDraft {
+ order_id: "ord_AAAAAAAAAAAAAAAAAAAAAg".to_owned(),
+ listing_addr: "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAg".to_owned(),
+ buyer_pubkey: "a".repeat(64),
+ seller_pubkey: "b".repeat(64),
+ items: vec![OrderDraftItem {
+ bin_id: "bin-1".to_owned(),
+ bin_count: 2,
+ }],
+ },
+ listing_lookup: Some("fresh-eggs".to_owned()),
+ buyer_account_id: Some("acct_demo".to_owned()),
+ submission: Some(OrderDraftSubmission {
+ job_id: "job_01".to_owned(),
+ state: Some("accepted".to_owned()),
+ signer_mode: Some("embedded_service_identity".to_owned()),
+ signer_session_id: None,
+ command: Some("order.submit".to_owned()),
+ event_id: None,
+ event_addr: None,
+ submitted_at_unix: Some(now.saturating_sub(42)),
+ }),
+ };
+
+ assert_eq!(
+ workflow_replay_window_secs(&document),
+ 42 + WORKFLOW_REPLAY_OVERLAP_SECS
+ );
+ }
+
+ #[test]
+ fn workflow_replay_window_caps_at_default_window_for_old_orders() {
+ let now = now_unix();
+ let document = OrderDraftDocument {
+ version: 1,
+ kind: ORDER_DRAFT_KIND.to_owned(),
+ order: OrderDraft {
+ order_id: "ord_AAAAAAAAAAAAAAAAAAAAAg".to_owned(),
+ listing_addr: "30402:deadbeef:AAAAAAAAAAAAAAAAAAAAAg".to_owned(),
+ buyer_pubkey: "a".repeat(64),
+ seller_pubkey: "b".repeat(64),
+ items: vec![OrderDraftItem {
+ bin_id: "bin-1".to_owned(),
+ bin_count: 2,
+ }],
+ },
+ listing_lookup: Some("fresh-eggs".to_owned()),
+ buyer_account_id: Some("acct_demo".to_owned()),
+ submission: Some(OrderDraftSubmission {
+ job_id: "job_01".to_owned(),
+ state: Some("accepted".to_owned()),
+ signer_mode: Some("embedded_service_identity".to_owned()),
+ signer_session_id: None,
+ command: Some("order.submit".to_owned()),
+ event_id: None,
+ event_addr: None,
+ submitted_at_unix: Some(
+ now.saturating_sub(WORKFLOW_REPLAY_WINDOW_SECS + WORKFLOW_REPLAY_OVERLAP_SECS),
+ ),
+ }),
+ };
+
+ assert_eq!(
+ workflow_replay_window_secs(&document),
+ WORKFLOW_REPLAY_WINDOW_SECS
+ );
+ }
}
diff --git a/src/runtime/provider.rs b/src/runtime/provider.rs
@@ -1,6 +1,10 @@
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use radroots_runtime_manager::{ManagedRuntimeInstallState, load_registry, read_secret_file};
+use radroots_runtime_paths::{
+ RadrootsPathOverrides, RadrootsPathProfile, RadrootsPathResolver, RadrootsRuntimeNamespace,
+};
+use radroots_sdk::RadrootsSdkConfig;
use url::Url;
use crate::runtime::config::{
@@ -10,6 +14,10 @@ use crate::runtime::config::{
};
use crate::runtime::hyf;
+const WORKFLOW_PROVIDER_RUNTIME_ID: &str = "rhi";
+const WORKFLOW_TARGET: &str = "workflow-default";
+const WORKFLOW_IDENTITY_FILE_NAME: &str = "identity.secret.json";
+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProviderProvenance {
ExplicitBinding,
@@ -73,25 +81,24 @@ pub struct WorkflowProviderView {
pub target: Option<String>,
pub hyf_helper_state: String,
pub hyf_helper_detail: String,
+ pub reason: Option<String>,
}
impl WorkflowProviderView {
pub fn detail(&self) -> String {
- match (self.state.as_str(), self.target_kind.as_deref(), self.target.as_deref()) {
- ("not_configured", _, _) => {
- "optional workflow provider is not configured; standalone rhi remains outside the cli composed-provider path in this wave"
+ match self.state.as_str() {
+ "not_configured" => {
+ "optional workflow provider is not configured; canonical localhost workflow progression remains inactive"
.to_owned()
}
- ("unsupported", Some(target_kind), Some(target)) => {
- format!(
- "configured workflow binding via {} {} is not executable through the cli composed-provider path in this wave; standalone rhi remains a separate workflow worker",
- target_kind, target
- )
- }
- ("unsupported", _, _) => {
- "configured workflow binding is not executable through the cli composed-provider path in this wave; standalone rhi remains a separate workflow worker"
+ "disabled" => "workflow provider is disabled by config".to_owned(),
+ "ready" => self.reason.clone().unwrap_or_else(|| {
+ "canonical repo-local localhost workflow progression is executable".to_owned()
+ }),
+ "unavailable" | "incompatible" | "unsupported" => self.reason.clone().unwrap_or_else(|| {
+ "configured workflow binding is not currently executable on canonical localhost"
.to_owned()
- }
+ }),
_ => self.source.clone(),
}
}
@@ -138,18 +145,24 @@ pub fn resolve_actor_write_plane_target(
pub fn resolve_workflow_provider(config: &RuntimeConfig) -> WorkflowProviderView {
let binding = inspect_binding(config, WORKFLOW_TRADE_CAPABILITY);
- let (state, provenance) = match binding.state {
- CapabilityBindingInspectionState::Configured => (
- "unsupported".to_owned(),
- ProviderProvenance::ExplicitBinding.as_str().to_owned(),
- ),
+ let (state, provenance, reason) = match binding.state {
+ CapabilityBindingInspectionState::Configured => {
+ let (state, reason) = resolve_workflow_execution_state(config, &binding);
+ (
+ state,
+ binding_provenance(&binding).as_str().to_owned(),
+ Some(reason),
+ )
+ }
CapabilityBindingInspectionState::Disabled => (
"disabled".to_owned(),
ProviderProvenance::Disabled.as_str().to_owned(),
+ None,
),
CapabilityBindingInspectionState::NotConfigured => (
"not_configured".to_owned(),
ProviderProvenance::Unavailable.as_str().to_owned(),
+ None,
),
};
@@ -165,6 +178,7 @@ pub fn resolve_workflow_provider(config: &RuntimeConfig) -> WorkflowProviderView
hyf_helper_detail:
"cli bindings do not imply an rhi -> hyf helper path; any worker helper remains explicit and optional"
.to_owned(),
+ reason,
}
}
@@ -476,6 +490,167 @@ fn validate_write_plane_url(value: &str) -> Result<String, String> {
Ok(trimmed.to_owned())
}
+fn resolve_workflow_execution_state(
+ config: &RuntimeConfig,
+ binding: &CapabilityBindingInspection,
+) -> (String, String) {
+ if binding.provider_runtime_id != WORKFLOW_PROVIDER_RUNTIME_ID {
+ return (
+ "incompatible".to_owned(),
+ format!(
+ "workflow.trade binding must use provider `{WORKFLOW_PROVIDER_RUNTIME_ID}`, got `{}`",
+ binding.provider_runtime_id
+ ),
+ );
+ }
+ if binding.target_kind.as_deref() != Some("managed_instance") {
+ return (
+ "incompatible".to_owned(),
+ format!(
+ "workflow.trade binding must use target_kind `managed_instance`, got `{}`",
+ binding.target_kind.as_deref().unwrap_or("unknown"),
+ ),
+ );
+ }
+ if binding.target.as_deref() != Some(WORKFLOW_TARGET) {
+ return (
+ "incompatible".to_owned(),
+ format!(
+ "workflow.trade binding must target `{WORKFLOW_TARGET}`, got `{}`",
+ binding.target.as_deref().unwrap_or(""),
+ ),
+ );
+ }
+ if config.paths.profile != "repo_local" {
+ return (
+ "unavailable".to_owned(),
+ "workflow.trade progression requires RADROOTS_CLI_PATHS_PROFILE=repo_local".to_owned(),
+ );
+ }
+ let Some(repo_local_root) = config.paths.repo_local_root.as_ref() else {
+ return (
+ "unavailable".to_owned(),
+ "workflow.trade progression requires a repo-local cli root".to_owned(),
+ );
+ };
+
+ let canonical_relay_url = match canonical_local_relay_url() {
+ Ok(url) => url,
+ Err(error) => return ("unavailable".to_owned(), error),
+ };
+ if !config
+ .relay
+ .urls
+ .iter()
+ .any(|configured| loopback_endpoint_matches(configured, canonical_relay_url.as_str()))
+ {
+ return (
+ "unavailable".to_owned(),
+ format!(
+ "workflow.trade progression requires canonical localhost relay `{canonical_relay_url}`"
+ ),
+ );
+ }
+
+ let canonical_rpc_url = match canonical_local_radrootsd_url() {
+ Ok(url) => url,
+ Err(error) => return ("unavailable".to_owned(), error),
+ };
+ if !loopback_endpoint_matches(config.rpc.url.as_str(), canonical_rpc_url.as_str()) {
+ return (
+ "unavailable".to_owned(),
+ format!(
+ "workflow.trade progression requires canonical localhost radrootsd `{canonical_rpc_url}`"
+ ),
+ );
+ }
+
+ let identity_path = match workflow_identity_path(repo_local_root) {
+ Ok(path) => path,
+ Err(error) => return ("unavailable".to_owned(), error),
+ };
+ if !identity_path.is_file() {
+ return (
+ "unavailable".to_owned(),
+ format!(
+ "workflow.trade progression requires repo-local rhi identity at {}",
+ identity_path.display()
+ ),
+ );
+ }
+
+ (
+ "ready".to_owned(),
+ format!(
+ "canonical repo-local localhost workflow progression is executable via `{WORKFLOW_PROVIDER_RUNTIME_ID}` and `{canonical_relay_url}`"
+ ),
+ )
+}
+
+fn canonical_local_relay_url() -> Result<String, String> {
+ let config = RadrootsSdkConfig::local();
+ let relays = config
+ .resolved_relay_urls()
+ .map_err(|error| format!("resolve canonical localhost relay url: {error}"))?;
+ relays
+ .into_iter()
+ .next()
+ .ok_or_else(|| "canonical localhost relay config did not define any relay urls".to_owned())
+}
+
+fn canonical_local_radrootsd_url() -> Result<String, String> {
+ RadrootsSdkConfig::local()
+ .resolved_radrootsd_endpoint()
+ .map_err(|error| format!("resolve canonical localhost radrootsd endpoint: {error}"))
+}
+
+fn workflow_identity_path(repo_local_root: &Path) -> Result<PathBuf, String> {
+ let base_paths = RadrootsPathResolver::current()
+ .resolve(
+ RadrootsPathProfile::RepoLocal,
+ &RadrootsPathOverrides::repo_local(repo_local_root),
+ )
+ .map_err(|error| {
+ format!(
+ "resolve repo-local workflow verifier roots from {}: {error}",
+ repo_local_root.display()
+ )
+ })?;
+ let worker_namespace = RadrootsRuntimeNamespace::worker(WORKFLOW_PROVIDER_RUNTIME_ID)
+ .map_err(|error| format!("resolve worker namespace `{WORKFLOW_PROVIDER_RUNTIME_ID}`: {error}"))?;
+ Ok(base_paths
+ .namespaced(&worker_namespace)
+ .secrets
+ .join(WORKFLOW_IDENTITY_FILE_NAME))
+}
+
+fn loopback_endpoint_matches(configured: &str, canonical: &str) -> bool {
+ let Ok(configured_url) = Url::parse(configured) else {
+ return false;
+ };
+ let Ok(canonical_url) = Url::parse(canonical) else {
+ return false;
+ };
+
+ configured_url.port_or_known_default() == canonical_url.port_or_known_default()
+ && loopback_host_matches(configured_url.host_str(), canonical_url.host_str())
+}
+
+fn loopback_host_matches(left: Option<&str>, right: Option<&str>) -> bool {
+ match (left, right) {
+ (Some(left), Some(right)) => normalize_loopback_host(left) == normalize_loopback_host(right),
+ _ => false,
+ }
+}
+
+fn normalize_loopback_host(host: &str) -> &str {
+ if host.eq_ignore_ascii_case("localhost") {
+ "127.0.0.1"
+ } else {
+ host
+ }
+}
+
fn inspect_binding(config: &RuntimeConfig, capability_id: &str) -> CapabilityBindingInspection {
config
.inspect_capability_bindings()
@@ -542,7 +717,10 @@ mod tests {
use std::fs;
use std::path::PathBuf;
- use radroots_runtime_paths::RadrootsMigrationReport;
+ use radroots_runtime_paths::{
+ RadrootsMigrationReport, RadrootsPathOverrides, RadrootsPathProfile, RadrootsPathResolver,
+ RadrootsRuntimeNamespace,
+ };
use radroots_secret_vault::RadrootsSecretBackend;
use tempfile::tempdir;
@@ -639,6 +817,19 @@ mod tests {
}
}
+ fn sample_workflow_binding() -> CapabilityBindingConfig {
+ CapabilityBindingConfig {
+ capability_id: "workflow.trade".into(),
+ provider_runtime_id: "rhi".into(),
+ binding_model: "out_of_process_worker".into(),
+ source: CapabilityBindingSource::WorkspaceConfig,
+ target_kind: CapabilityBindingTargetKind::ManagedInstance,
+ target: "workflow-default".into(),
+ managed_account_ref: None,
+ signer_session_ref: None,
+ }
+ }
+
#[test]
fn write_plane_requires_authoritative_binding_or_managed_default() {
let view = resolve_write_plane_provider(&sample_config(Vec::new(), false));
@@ -648,24 +839,51 @@ mod tests {
}
#[test]
- fn workflow_uses_explicit_binding_provenance_when_configured() {
- let binding = CapabilityBindingConfig {
- capability_id: "workflow.trade".into(),
- provider_runtime_id: "rhi".into(),
- binding_model: "out_of_process_worker".into(),
- source: CapabilityBindingSource::WorkspaceConfig,
- target_kind: CapabilityBindingTargetKind::ExplicitEndpoint,
- target: "/tmp/rhi".into(),
- managed_account_ref: None,
- signer_session_ref: None,
- };
+ fn workflow_reports_unavailable_when_repo_local_posture_is_missing() {
+ let binding = sample_workflow_binding();
let view = resolve_workflow_provider(&sample_config(vec![binding], false));
- assert_eq!(view.state, "unsupported");
+ assert_eq!(view.state, "unavailable");
assert_eq!(
view.provenance,
- ProviderProvenance::ExplicitBinding.as_str()
+ ProviderProvenance::ManagedDefault.as_str()
);
- assert_eq!(view.target_kind.as_deref(), Some("explicit_endpoint"));
+ assert!(
+ view.detail()
+ .contains("RADROOTS_CLI_PATHS_PROFILE=repo_local")
+ );
+ }
+
+ #[test]
+ fn workflow_reports_ready_for_canonical_repo_local_localhost_posture() {
+ let dir = tempdir().expect("tempdir");
+ let repo_local_root = dir.path().join(".radroots");
+ let base_paths = RadrootsPathResolver::current()
+ .resolve(
+ RadrootsPathProfile::RepoLocal,
+ &RadrootsPathOverrides::repo_local(&repo_local_root),
+ )
+ .expect("resolve repo_local paths");
+ let worker_namespace = RadrootsRuntimeNamespace::worker("rhi").expect("rhi namespace");
+ let worker_paths = base_paths.namespaced(&worker_namespace);
+ fs::create_dir_all(&worker_paths.secrets).expect("create worker secrets dir");
+ fs::write(
+ worker_paths.secrets.join("identity.secret.json"),
+ r#"{"secret_key_hex":"1111111111111111111111111111111111111111111111111111111111111111"}"#,
+ )
+ .expect("write repo_local rhi identity");
+
+ let mut config = sample_config(vec![sample_workflow_binding()], false);
+ config.paths.profile = "repo_local".into();
+ config.paths.repo_local_root = Some(repo_local_root);
+ config.relay.urls = vec!["ws://127.0.0.1:8080".into()];
+
+ let view = resolve_workflow_provider(&config);
+ assert_eq!(view.state, "ready");
+ assert_eq!(
+ view.provenance,
+ ProviderProvenance::ManagedDefault.as_str()
+ );
+ assert!(view.detail().contains("canonical repo-local localhost workflow progression"));
}
#[test]
diff --git a/tests/runtime_show.rs b/tests/runtime_show.rs
@@ -683,8 +683,8 @@ target = "bin/hyfd-user"
assert_eq!(workflow["target_kind"], "managed_instance");
assert_eq!(workflow["target"], "workflow-default");
assert_eq!(json["workflow"]["provider_runtime_id"], "rhi");
- assert_eq!(json["workflow"]["state"], "unsupported");
- assert_eq!(json["workflow"]["provenance"], "explicit_binding");
+ assert_eq!(json["workflow"]["state"], "unavailable");
+ assert_eq!(json["workflow"]["provenance"], "managed_default");
assert_eq!(
json["workflow"]["source"],
"user config [[capability_binding]]"