hyf.rs (39153B)
1 #![cfg_attr(not(test), allow(dead_code))] 2 3 use std::collections::BTreeMap; 4 use std::io::{Read, Write}; 5 use std::path::{Path, PathBuf}; 6 use std::process::{Child, Command, ExitStatus, Output, Stdio}; 7 use std::thread; 8 use std::time::{Duration, Instant}; 9 10 use serde::{Deserialize, Serialize}; 11 use serde_json::Value; 12 13 use crate::runtime::config::{ 14 CapabilityBindingTargetKind, HyfConfig, INFERENCE_HYF_STDIO_CAPABILITY, RuntimeConfig, 15 }; 16 17 const HYF_CONTROL_TIMEOUT: Duration = Duration::from_secs(2); 18 const HYF_BUSINESS_TIMEOUT: Duration = Duration::from_secs(4); 19 const HYF_TIMEOUT_POLL_INTERVAL: Duration = Duration::from_millis(10); 20 const HYF_STATUS_REQUEST_ID: &str = "cli-doctor-hyf-status"; 21 const HYF_CAPABILITIES_REQUEST_ID: &str = "cli-runtime-hyf-capabilities"; 22 const HYF_SOURCE: &str = "hyf status control request · local first"; 23 const HYF_PROTOCOL_VERSION: u64 = 1; 24 const HYF_CONSUMER: &str = "radroots-cli"; 25 26 #[derive(Debug, Clone, PartialEq, Eq)] 27 pub struct HyfStatusView { 28 pub executable: String, 29 pub state: String, 30 pub source: String, 31 pub reason: Option<String>, 32 pub protocol_version: Option<u64>, 33 pub deterministic_available: Option<bool>, 34 } 35 36 #[derive(Debug, Clone, PartialEq, Eq)] 37 pub struct HyfClient { 38 executable: PathBuf, 39 timeouts: HyfClientTimeouts, 40 } 41 42 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 43 struct HyfClientTimeouts { 44 control: Duration, 45 business: Duration, 46 } 47 48 impl Default for HyfClientTimeouts { 49 fn default() -> Self { 50 Self { 51 control: HYF_CONTROL_TIMEOUT, 52 business: HYF_BUSINESS_TIMEOUT, 53 } 54 } 55 } 56 57 impl HyfClient { 58 pub fn new(executable: PathBuf) -> Self { 59 Self { 60 executable, 61 timeouts: HyfClientTimeouts::default(), 62 } 63 } 64 65 #[cfg(test)] 66 fn with_timeouts(executable: PathBuf, control: Duration, business: Duration) -> Self { 67 Self { 68 executable, 69 timeouts: HyfClientTimeouts { control, business }, 70 } 71 } 72 73 pub fn executable(&self) -> &Path { 74 self.executable.as_path() 75 } 76 77 pub fn status(&self) -> Result<HyfSuccess<HyfStatusOutput>, HyfClientError> { 78 self.call( 79 HYF_STATUS_REQUEST_ID, 80 Some(HYF_STATUS_REQUEST_ID), 81 "sys.status", 82 None, 83 &HyfEmptyInput::default(), 84 self.timeouts.control, 85 ) 86 } 87 88 pub fn capabilities(&self) -> Result<HyfSuccess<HyfCapabilitiesOutput>, HyfClientError> { 89 self.call( 90 HYF_CAPABILITIES_REQUEST_ID, 91 None, 92 "sys.capabilities", 93 None, 94 &HyfEmptyInput::default(), 95 self.timeouts.control, 96 ) 97 } 98 99 pub fn query_rewrite( 100 &self, 101 request_id: &str, 102 trace_id: Option<&str>, 103 context: &HyfRequestContext, 104 request: &HyfQueryRewriteRequest, 105 ) -> Result<HyfSuccess<HyfQueryRewriteOutput>, HyfClientError> { 106 self.call( 107 request_id, 108 trace_id, 109 "query_rewrite", 110 Some(context), 111 request, 112 self.timeouts.business, 113 ) 114 } 115 116 pub fn semantic_rank( 117 &self, 118 request_id: &str, 119 trace_id: Option<&str>, 120 context: &HyfRequestContext, 121 request: &HyfSemanticRankRequest, 122 ) -> Result<HyfSuccess<HyfSemanticRankOutput>, HyfClientError> { 123 self.call( 124 request_id, 125 trace_id, 126 "semantic_rank", 127 Some(context), 128 request, 129 self.timeouts.business, 130 ) 131 } 132 133 pub fn explain_result( 134 &self, 135 request_id: &str, 136 trace_id: Option<&str>, 137 context: &HyfRequestContext, 138 request: &HyfExplainResultRequest, 139 ) -> Result<HyfSuccess<HyfExplainResultOutput>, HyfClientError> { 140 self.call( 141 request_id, 142 trace_id, 143 "explain_result", 144 Some(context), 145 request, 146 self.timeouts.business, 147 ) 148 } 149 150 fn call<TRequest, TResponse>( 151 &self, 152 request_id: &str, 153 trace_id: Option<&str>, 154 capability: &str, 155 context: Option<&HyfRequestContext>, 156 input: &TRequest, 157 timeout: Duration, 158 ) -> Result<HyfSuccess<TResponse>, HyfClientError> 159 where 160 TRequest: Serialize, 161 TResponse: for<'de> Deserialize<'de>, 162 { 163 let request = serialize_request(request_id, trace_id, capability, context, input) 164 .map_err(HyfClientError::SerializeRequest)?; 165 166 let output = self.run_request(request.as_str(), timeout)?; 167 let stdout = String::from_utf8(output.stdout).map_err(HyfClientError::InvalidUtf8)?; 168 let response: HyfWireResponse<TResponse> = 169 serde_json::from_str(stdout.as_str()).map_err(HyfClientError::InvalidJson)?; 170 171 if !response.ok { 172 return Err(HyfClientError::RemoteError { 173 code: response.error.as_ref().and_then(|error| error.code.clone()), 174 message: response 175 .error 176 .as_ref() 177 .and_then(|error| error.message.clone()), 178 }); 179 } 180 181 let Some(output) = response.output else { 182 return Err(HyfClientError::InvalidResponse( 183 "hyf response omitted output for a successful request".to_owned(), 184 )); 185 }; 186 187 Ok(HyfSuccess { 188 version: response.version, 189 request_id: response.request_id, 190 trace_id: response.trace_id, 191 output, 192 meta: response.meta, 193 }) 194 } 195 196 fn run_request(&self, request: &str, timeout: Duration) -> Result<Output, HyfClientError> { 197 let mut child = Command::new(&self.executable) 198 .stdin(Stdio::piped()) 199 .stdout(Stdio::piped()) 200 .stderr(Stdio::piped()) 201 .spawn() 202 .map_err(|error| match error.kind() { 203 std::io::ErrorKind::NotFound => HyfClientError::NotFound, 204 _ => HyfClientError::Start(error), 205 })?; 206 207 if let Some(mut stdin) = child.stdin.take() { 208 writeln!(stdin, "{request}").map_err(HyfClientError::Write)?; 209 } 210 211 let output = collect_output_with_timeout(child, timeout)?; 212 if !output.status.success() { 213 return Err(HyfClientError::NonZeroExit { 214 status: output.status.code(), 215 stderr: String::from_utf8_lossy(&output.stderr).trim().to_owned(), 216 }); 217 } 218 Ok(output) 219 } 220 } 221 222 fn serialize_request<TRequest: Serialize>( 223 request_id: &str, 224 trace_id: Option<&str>, 225 capability: &str, 226 context: Option<&HyfRequestContext>, 227 input: &TRequest, 228 ) -> Result<String, serde_json::Error> { 229 serde_json::to_string(&HyfRequestEnvelope { 230 version: HYF_PROTOCOL_VERSION, 231 request_id, 232 trace_id, 233 capability, 234 context, 235 input, 236 }) 237 } 238 239 #[derive(Debug, Clone, PartialEq, Eq)] 240 pub struct HyfSuccess<T> { 241 pub version: u64, 242 pub request_id: String, 243 pub trace_id: Option<String>, 244 pub output: T, 245 pub meta: Option<Value>, 246 } 247 248 #[derive(Debug, thiserror::Error)] 249 pub enum HyfClientError { 250 #[error("hyf executable was not found")] 251 NotFound, 252 #[error("failed to start hyf request: {0}")] 253 Start(std::io::Error), 254 #[error("failed to write hyf request stdin: {0}")] 255 Write(std::io::Error), 256 #[error("failed to wait on hyf request: {0}")] 257 Wait(std::io::Error), 258 #[error("failed to read hyf request output: {0}")] 259 Read(std::io::Error), 260 #[error("hyf request timed out after {0}ms")] 261 Timeout(u128), 262 #[error("hyf request exited unsuccessfully")] 263 NonZeroExit { status: Option<i32>, stderr: String }, 264 #[error("failed to serialize hyf request: {0}")] 265 SerializeRequest(serde_json::Error), 266 #[error("hyf response was not valid UTF-8: {0}")] 267 InvalidUtf8(std::string::FromUtf8Error), 268 #[error("hyf response was not valid JSON: {0}")] 269 InvalidJson(serde_json::Error), 270 #[error("{0}")] 271 InvalidResponse(String), 272 #[error("hyf request returned a remote error")] 273 RemoteError { 274 code: Option<String>, 275 message: Option<String>, 276 }, 277 } 278 279 #[derive(Debug, Clone, Default, Serialize, PartialEq, Eq)] 280 pub struct HyfRequestContext { 281 #[serde(skip_serializing_if = "Option::is_none")] 282 pub consumer: Option<String>, 283 #[serde(skip_serializing_if = "Option::is_none")] 284 pub execution_mode_preference: Option<String>, 285 #[serde(skip_serializing_if = "Option::is_none")] 286 pub scope: Option<HyfRequestScope>, 287 #[serde(skip_serializing_if = "Option::is_none")] 288 pub return_provenance: Option<bool>, 289 } 290 291 impl HyfRequestContext { 292 pub fn deterministic_cli() -> Self { 293 Self { 294 consumer: Some(HYF_CONSUMER.to_owned()), 295 execution_mode_preference: Some("deterministic".to_owned()), 296 scope: None, 297 return_provenance: None, 298 } 299 } 300 301 pub fn with_return_provenance(mut self, return_provenance: bool) -> Self { 302 self.return_provenance = Some(return_provenance); 303 self 304 } 305 306 pub fn with_listing_scope(mut self, listing_ids: Vec<String>) -> Self { 307 self.scope = if listing_ids.is_empty() { 308 None 309 } else { 310 Some(HyfRequestScope { listing_ids }) 311 }; 312 self 313 } 314 } 315 316 #[derive(Debug, Clone, Default, Serialize, PartialEq, Eq)] 317 pub struct HyfRequestScope { 318 #[serde(default, skip_serializing_if = "Vec::is_empty")] 319 pub listing_ids: Vec<String>, 320 } 321 322 #[derive(Debug, Clone, Serialize, PartialEq, Eq)] 323 pub struct HyfQueryRewriteRequest { 324 pub query: String, 325 } 326 327 impl HyfQueryRewriteRequest { 328 pub fn new(query: impl Into<String>) -> Self { 329 Self { 330 query: query.into(), 331 } 332 } 333 } 334 335 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 336 pub struct HyfSemanticCandidate { 337 pub id: String, 338 pub title: String, 339 pub farm: String, 340 pub delivery: String, 341 pub distance_km: f64, 342 pub freshness_minutes: i64, 343 } 344 345 #[derive(Debug, Clone, Serialize, PartialEq)] 346 pub struct HyfSemanticRankRequest { 347 pub query: String, 348 pub candidates: Vec<HyfSemanticCandidate>, 349 } 350 351 impl HyfSemanticRankRequest { 352 pub fn new(query: impl Into<String>, candidates: Vec<HyfSemanticCandidate>) -> Self { 353 Self { 354 query: query.into(), 355 candidates, 356 } 357 } 358 } 359 360 #[derive(Debug, Clone, Serialize, PartialEq)] 361 pub struct HyfExplainResultRequest { 362 pub query: String, 363 pub candidate: HyfSemanticCandidate, 364 } 365 366 impl HyfExplainResultRequest { 367 pub fn new(query: impl Into<String>, candidate: HyfSemanticCandidate) -> Self { 368 Self { 369 query: query.into(), 370 candidate, 371 } 372 } 373 } 374 375 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 376 pub struct HyfBuildIdentity { 377 pub protocol_version: u64, 378 } 379 380 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 381 pub struct HyfExecutionModes { 382 pub deterministic: bool, 383 #[serde(default)] 384 pub assisted: Option<bool>, 385 } 386 387 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 388 pub struct HyfStatusOutput { 389 pub build_identity: HyfBuildIdentity, 390 pub enabled_execution_modes: HyfExecutionModes, 391 } 392 393 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 394 pub struct HyfRequestContextContract { 395 pub accepted_features: Vec<String>, 396 pub effective_features: Vec<String>, 397 pub unsupported_field_behavior: String, 398 } 399 400 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 401 pub struct HyfBusinessCapability { 402 pub id: String, 403 pub kind: String, 404 pub deterministic_execution: String, 405 pub implementation_status: String, 406 pub callable: bool, 407 pub implemented: bool, 408 pub assisted_execution: String, 409 pub assisted_backend_available: bool, 410 #[serde(default)] 411 pub disabled_reason: Option<String>, 412 } 413 414 #[derive(Debug, Clone, Deserialize, PartialEq)] 415 pub struct HyfCapabilitiesOutput { 416 pub control_routes: Vec<String>, 417 pub business_capabilities: Vec<HyfBusinessCapability>, 418 pub assisted_backend_capabilities: Vec<Value>, 419 pub request_context_contract: HyfRequestContextContract, 420 } 421 422 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 423 pub struct HyfExtractedFilters { 424 pub local_intent: bool, 425 pub fulfillment: String, 426 pub time_window: String, 427 } 428 429 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 430 pub struct HyfQueryRewriteOutput { 431 pub original_text: String, 432 pub normalized_text: String, 433 pub rewritten_text: String, 434 pub query_terms: Vec<String>, 435 pub normalization_signals: Vec<String>, 436 pub ranking_hints: Vec<String>, 437 pub extracted_filters: HyfExtractedFilters, 438 } 439 440 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 441 pub struct HyfScoredCandidate { 442 pub id: String, 443 pub heuristic_score: i64, 444 pub matched_terms: Vec<String>, 445 pub reasons: Vec<String>, 446 pub delivery_alignment: String, 447 pub distance_band: String, 448 pub freshness_band: String, 449 pub scope_match: bool, 450 } 451 452 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 453 pub struct HyfSemanticRankOutput { 454 pub ranked_ids: Vec<String>, 455 pub reasons: BTreeMap<String, Vec<String>>, 456 pub scored_candidates: Vec<HyfScoredCandidate>, 457 pub ranking_hints: Vec<String>, 458 pub extracted_filters: HyfExtractedFilters, 459 } 460 461 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 462 pub struct HyfSignalAssessment { 463 pub delivery_alignment: String, 464 pub distance_band: String, 465 pub freshness_band: String, 466 pub scope_match: bool, 467 } 468 469 #[derive(Debug, Clone, Deserialize, PartialEq, Eq)] 470 pub struct HyfExplainResultOutput { 471 pub result_id: String, 472 pub explanation_kind: String, 473 pub summary: String, 474 pub score: i64, 475 pub reasons: Vec<String>, 476 pub matched_terms: Vec<String>, 477 pub ranking_hints: Vec<String>, 478 pub extracted_filters: HyfExtractedFilters, 479 pub signal_assessment: HyfSignalAssessment, 480 } 481 482 #[derive(Debug, Clone, Serialize, Default)] 483 struct HyfEmptyInput {} 484 485 #[derive(Debug, Serialize)] 486 struct HyfRequestEnvelope<'a, T> { 487 version: u64, 488 request_id: &'a str, 489 #[serde(skip_serializing_if = "Option::is_none")] 490 trace_id: Option<&'a str>, 491 capability: &'a str, 492 #[serde(skip_serializing_if = "Option::is_none")] 493 context: Option<&'a HyfRequestContext>, 494 input: &'a T, 495 } 496 497 #[derive(Debug, Deserialize)] 498 #[serde(bound(deserialize = "T: Deserialize<'de>"))] 499 struct HyfWireResponse<T> { 500 version: u64, 501 request_id: String, 502 #[serde(default)] 503 trace_id: Option<String>, 504 ok: bool, 505 #[serde(default)] 506 output: Option<T>, 507 #[serde(default)] 508 meta: Option<Value>, 509 #[serde(default)] 510 error: Option<HyfWireError>, 511 } 512 513 #[derive(Debug, Clone, Deserialize)] 514 struct HyfWireError { 515 #[serde(default)] 516 code: Option<String>, 517 #[serde(default)] 518 message: Option<String>, 519 } 520 521 pub fn resolve_runtime_client(config: &RuntimeConfig) -> Result<HyfClient, HyfStatusView> { 522 if !config.hyf.enabled { 523 return Err(disabled_status(config.hyf.executable.display().to_string())); 524 } 525 526 let Some(binding) = config.capability_binding(INFERENCE_HYF_STDIO_CAPABILITY) else { 527 return resolve_client(&config.hyf); 528 }; 529 530 match binding.target_kind { 531 CapabilityBindingTargetKind::ExplicitEndpoint => resolve_client(&HyfConfig { 532 enabled: true, 533 executable: binding.target.clone().into(), 534 }), 535 CapabilityBindingTargetKind::ManagedInstance => Err(unavailable_status( 536 config.hyf.executable.display().to_string(), 537 format!( 538 "configured hyf binding target `{}` uses unsupported target_kind `managed_instance`; use `explicit_endpoint` for `inference.hyf_stdio`", 539 binding.target 540 ), 541 None, 542 None, 543 )), 544 } 545 } 546 547 pub fn resolve_runtime_status(config: &RuntimeConfig) -> HyfStatusView { 548 match resolve_runtime_client(config) { 549 Ok(client) => resolve_status_for_client(&client), 550 Err(view) => view, 551 } 552 } 553 554 pub fn resolve_status(config: &HyfConfig) -> HyfStatusView { 555 match resolve_client(config) { 556 Ok(client) => resolve_status_for_client(&client), 557 Err(view) => view, 558 } 559 } 560 561 fn resolve_client(config: &HyfConfig) -> Result<HyfClient, HyfStatusView> { 562 let executable = config.executable.display().to_string(); 563 if !config.enabled { 564 return Err(disabled_status(executable)); 565 } 566 567 if config.executable.as_os_str().is_empty() { 568 return Err(unavailable_status( 569 executable, 570 "hyf executable path is not configured".to_owned(), 571 None, 572 None, 573 )); 574 } 575 576 Ok(HyfClient::new(config.executable.clone())) 577 } 578 579 fn resolve_status_for_client(client: &HyfClient) -> HyfStatusView { 580 let executable = client.executable().display().to_string(); 581 let response = match client.status() { 582 Ok(response) => response, 583 Err(HyfClientError::NotFound) => { 584 return unavailable_status( 585 executable, 586 format!( 587 "hyf executable was not found at {}", 588 client.executable().display() 589 ), 590 None, 591 None, 592 ); 593 } 594 Err(HyfClientError::Start(error)) => { 595 return unavailable_status( 596 executable, 597 format!( 598 "failed to start hyf control request at {}: {error}", 599 client.executable().display() 600 ), 601 None, 602 None, 603 ); 604 } 605 Err(HyfClientError::Write(error)) => { 606 return unavailable_status( 607 executable, 608 format!("failed to write hyf control request stdin: {error}"), 609 None, 610 None, 611 ); 612 } 613 Err(HyfClientError::Timeout(timeout_ms)) => { 614 return unavailable_status( 615 executable, 616 format!("hyf status control request timed out after {timeout_ms}ms"), 617 None, 618 None, 619 ); 620 } 621 Err(HyfClientError::Wait(error)) | Err(HyfClientError::Read(error)) => { 622 return unavailable_status( 623 executable, 624 format!("failed to capture hyf status control output: {error}"), 625 None, 626 None, 627 ); 628 } 629 Err(HyfClientError::NonZeroExit { status, stderr }) => { 630 return unavailable_status( 631 executable, 632 format_nonzero_exit("hyf status control request", status, stderr.as_str()), 633 None, 634 None, 635 ); 636 } 637 Err(HyfClientError::InvalidUtf8(error)) => { 638 return unavailable_status( 639 executable, 640 format!("hyf status output was not valid UTF-8: {error}"), 641 None, 642 None, 643 ); 644 } 645 Err(HyfClientError::InvalidJson(error)) => { 646 return unavailable_status( 647 executable, 648 format!("hyf status output was not valid JSON: {error}"), 649 None, 650 None, 651 ); 652 } 653 Err(HyfClientError::RemoteError { code, .. }) => { 654 let reason = code 655 .map(|code| format!("hyf status control request returned error code {code}")) 656 .unwrap_or_else(|| { 657 "hyf status control request returned an invalid error response".to_owned() 658 }); 659 return unavailable_status(executable, reason, None, None); 660 } 661 Err(HyfClientError::SerializeRequest(_) | HyfClientError::InvalidResponse(_)) => { 662 return unavailable_status( 663 executable, 664 "hyf status control request returned an invalid error response".to_owned(), 665 None, 666 None, 667 ); 668 } 669 }; 670 671 let protocol_version = Some(response.output.build_identity.protocol_version); 672 let deterministic_available = Some(response.output.enabled_execution_modes.deterministic); 673 674 if response.version != HYF_PROTOCOL_VERSION { 675 return unavailable_status( 676 executable, 677 format!( 678 "hyf status response version {:?} is incompatible with cli expected {}", 679 Some(response.version), 680 HYF_PROTOCOL_VERSION 681 ), 682 protocol_version, 683 deterministic_available, 684 ); 685 } 686 687 if response.request_id != HYF_STATUS_REQUEST_ID { 688 return unavailable_status( 689 executable, 690 "hyf status response did not preserve the control request id".to_owned(), 691 protocol_version, 692 deterministic_available, 693 ); 694 } 695 696 if protocol_version != Some(HYF_PROTOCOL_VERSION) { 697 return unavailable_status( 698 executable, 699 format!( 700 "hyf protocol version {:?} is incompatible with cli expected {}", 701 protocol_version, HYF_PROTOCOL_VERSION 702 ), 703 protocol_version, 704 deterministic_available, 705 ); 706 } 707 708 if deterministic_available != Some(true) { 709 return unavailable_status( 710 executable, 711 "hyf deterministic execution is unavailable".to_owned(), 712 protocol_version, 713 deterministic_available, 714 ); 715 } 716 717 HyfStatusView { 718 executable, 719 state: "ready".to_owned(), 720 source: HYF_SOURCE.to_owned(), 721 reason: Some("healthy · protocol 1 · deterministic available".to_owned()), 722 protocol_version, 723 deterministic_available, 724 } 725 } 726 727 fn collect_output_with_timeout( 728 mut child: Child, 729 timeout: Duration, 730 ) -> Result<Output, HyfClientError> { 731 let started_at = Instant::now(); 732 loop { 733 match child.try_wait() { 734 Ok(Some(status)) => return collect_output(child, status), 735 Ok(None) => { 736 if started_at.elapsed() >= timeout { 737 let _ = child.kill(); 738 let _ = child.wait(); 739 return Err(HyfClientError::Timeout(timeout.as_millis())); 740 } 741 thread::sleep(HYF_TIMEOUT_POLL_INTERVAL); 742 } 743 Err(error) => { 744 let _ = child.kill(); 745 let _ = child.wait(); 746 return Err(HyfClientError::Wait(error)); 747 } 748 } 749 } 750 } 751 752 fn collect_output(mut child: Child, status: ExitStatus) -> Result<Output, HyfClientError> { 753 let mut stdout = Vec::new(); 754 let mut stderr = Vec::new(); 755 756 if let Some(mut pipe) = child.stdout.take() { 757 pipe.read_to_end(&mut stdout) 758 .map_err(HyfClientError::Read)?; 759 } 760 if let Some(mut pipe) = child.stderr.take() { 761 pipe.read_to_end(&mut stderr) 762 .map_err(HyfClientError::Read)?; 763 } 764 765 Ok(Output { 766 status, 767 stdout, 768 stderr, 769 }) 770 } 771 772 fn disabled_status(executable: String) -> HyfStatusView { 773 HyfStatusView { 774 executable, 775 state: "disabled".to_owned(), 776 source: HYF_SOURCE.to_owned(), 777 reason: Some("disabled by config".to_owned()), 778 protocol_version: None, 779 deterministic_available: None, 780 } 781 } 782 783 fn unavailable_status( 784 executable: String, 785 reason: String, 786 protocol_version: Option<u64>, 787 deterministic_available: Option<bool>, 788 ) -> HyfStatusView { 789 HyfStatusView { 790 executable, 791 state: "unavailable".to_owned(), 792 source: HYF_SOURCE.to_owned(), 793 reason: Some(reason), 794 protocol_version, 795 deterministic_available, 796 } 797 } 798 799 fn format_nonzero_exit(request_label: &str, status: Option<i32>, stderr: &str) -> String { 800 match status { 801 Some(code) if stderr.is_empty() => { 802 format!("{request_label} exited with status code {code}") 803 } 804 Some(code) => { 805 format!("{request_label} exited with status code {code}: {stderr}") 806 } 807 None if stderr.is_empty() => format!("{request_label} terminated by signal"), 808 None => format!("{request_label} terminated by signal: {stderr}"), 809 } 810 } 811 812 #[cfg(test)] 813 mod tests { 814 use super::{ 815 HYF_PROTOCOL_VERSION, HyfClient, HyfClientError, HyfEmptyInput, HyfExplainResultRequest, 816 HyfQueryRewriteRequest, HyfRequestContext, HyfSemanticCandidate, HyfSemanticRankRequest, 817 resolve_status, 818 }; 819 use crate::runtime::config::HyfConfig; 820 use serde::Serialize; 821 use serde_json::Value; 822 use std::fs; 823 use std::os::unix::fs::PermissionsExt; 824 use std::path::{Path, PathBuf}; 825 use std::sync::{Mutex, MutexGuard, OnceLock}; 826 use std::time::Duration; 827 use tempfile::tempdir; 828 829 fn hyf_test_lock() -> &'static Mutex<()> { 830 static LOCK: OnceLock<Mutex<()>> = OnceLock::new(); 831 LOCK.get_or_init(|| Mutex::new(())) 832 } 833 834 fn lock_hyf_tests() -> MutexGuard<'static, ()> { 835 hyf_test_lock() 836 .lock() 837 .unwrap_or_else(|poisoned| poisoned.into_inner()) 838 } 839 840 #[test] 841 fn disabled_hyf_reports_disabled_state_without_spawning() { 842 let _guard = lock_hyf_tests(); 843 let view = resolve_status(&HyfConfig { 844 enabled: false, 845 executable: "hyfd".into(), 846 }); 847 assert_eq!(view.state, "disabled"); 848 assert_eq!(view.reason.as_deref(), Some("disabled by config")); 849 } 850 851 #[test] 852 fn healthy_hyf_status_reports_ready() { 853 let _guard = lock_hyf_tests(); 854 let dir = tempdir().expect("tempdir"); 855 let executable = write_response_script( 856 dir.path(), 857 format!( 858 "{{\"version\":{HYF_PROTOCOL_VERSION},\"request_id\":\"cli-doctor-hyf-status\",\"trace_id\":\"cli-doctor-hyf-status\",\"ok\":true,\"output\":{{\"build_identity\":{{\"protocol_version\":{HYF_PROTOCOL_VERSION}}},\"enabled_execution_modes\":{{\"deterministic\":true}}}}}}" 859 ) 860 .as_str(), 861 ); 862 863 let view = resolve_status(&HyfConfig { 864 enabled: true, 865 executable, 866 }); 867 assert_eq!(view.state, "ready", "reason: {:?}", view.reason); 868 assert_eq!(view.protocol_version, Some(HYF_PROTOCOL_VERSION)); 869 assert_eq!(view.deterministic_available, Some(true)); 870 } 871 872 #[test] 873 fn incompatible_hyf_status_reports_unavailable() { 874 let _guard = lock_hyf_tests(); 875 let dir = tempdir().expect("tempdir"); 876 let executable = write_response_script( 877 dir.path(), 878 "{\"version\":1,\"request_id\":\"cli-doctor-hyf-status\",\"trace_id\":\"cli-doctor-hyf-status\",\"ok\":true,\"output\":{\"build_identity\":{\"protocol_version\":2},\"enabled_execution_modes\":{\"deterministic\":true}}}", 879 ); 880 881 let view = resolve_status(&HyfConfig { 882 enabled: true, 883 executable, 884 }); 885 assert_eq!(view.state, "unavailable", "reason: {:?}", view.reason); 886 assert!( 887 view.reason 888 .as_deref() 889 .is_some_and(|reason| reason.contains("incompatible")) 890 ); 891 } 892 893 #[test] 894 fn capabilities_request_uses_typed_client() { 895 let _guard = lock_hyf_tests(); 896 let dir = tempdir().expect("tempdir"); 897 let executable = write_response_script( 898 dir.path(), 899 "{\"version\":1,\"request_id\":\"cli-runtime-hyf-capabilities\",\"ok\":true,\"output\":{\"control_routes\":[\"sys.status\",\"sys.capabilities\"],\"business_capabilities\":[{\"id\":\"query_rewrite\",\"kind\":\"business\",\"deterministic_execution\":\"enabled\",\"implementation_status\":\"implemented\",\"callable\":true,\"implemented\":true,\"assisted_execution\":\"unavailable\",\"assisted_backend_available\":false}],\"assisted_backend_capabilities\":[],\"request_context_contract\":{\"accepted_features\":[\"consumer\",\"execution_mode_preference\"],\"effective_features\":[\"execution_mode_preference\"],\"unsupported_field_behavior\":\"reject\"}}}", 900 ); 901 902 let request = request_json( 903 "cli-runtime-hyf-capabilities", 904 None, 905 "sys.capabilities", 906 None, 907 &HyfEmptyInput::default(), 908 ); 909 let response = 910 HyfClient::with_timeouts(executable, Duration::from_secs(5), Duration::from_secs(5)) 911 .capabilities() 912 .expect("capabilities"); 913 914 assert_eq!(request["capability"], "sys.capabilities"); 915 assert_eq!(request["input"], serde_json::json!({})); 916 assert!(request.get("context").is_none()); 917 assert_eq!( 918 response.output.control_routes, 919 vec!["sys.status", "sys.capabilities"] 920 ); 921 assert_eq!(response.output.business_capabilities[0].id, "query_rewrite"); 922 } 923 924 #[test] 925 fn query_rewrite_request_round_trips_typed_output() { 926 let _guard = lock_hyf_tests(); 927 let dir = tempdir().expect("tempdir"); 928 let executable = write_response_script( 929 dir.path(), 930 "{\"version\":1,\"request_id\":\"rewrite-test-1\",\"trace_id\":\"trace-rewrite-test-1\",\"ok\":true,\"output\":{\"original_text\":\"apples near me with weekend pickup\",\"normalized_text\":\"apples near me with weekend pickup\",\"rewritten_text\":\"apples\",\"query_terms\":[\"apples\"],\"normalization_signals\":[\"local_intent_detected\"],\"ranking_hints\":[\"prefer_local_results\"],\"extracted_filters\":{\"local_intent\":true,\"fulfillment\":\"pickup\",\"time_window\":\"weekend\"}},\"meta\":{\"execution_mode\":\"deterministic\",\"backend\":\"heuristic\"}}", 931 ); 932 let context = HyfRequestContext::deterministic_cli().with_return_provenance(true); 933 let request = request_json( 934 "rewrite-test-1", 935 Some("trace-rewrite-test-1"), 936 "query_rewrite", 937 Some(&context), 938 &HyfQueryRewriteRequest::new("apples near me with weekend pickup"), 939 ); 940 let client = HyfClient::new(executable); 941 let response = client 942 .query_rewrite( 943 "rewrite-test-1", 944 Some("trace-rewrite-test-1"), 945 &context, 946 &HyfQueryRewriteRequest::new("apples near me with weekend pickup"), 947 ) 948 .expect("query rewrite"); 949 assert_eq!(request["capability"], "query_rewrite"); 950 assert_eq!( 951 request["context"]["execution_mode_preference"], 952 "deterministic" 953 ); 954 assert_eq!(request["context"]["consumer"], "radroots-cli"); 955 assert_eq!(request["context"]["return_provenance"], true); 956 assert_eq!( 957 request["input"]["query"], 958 "apples near me with weekend pickup" 959 ); 960 assert_eq!(response.output.rewritten_text, "apples"); 961 assert_eq!(response.output.query_terms, vec!["apples"]); 962 assert_eq!( 963 response.meta, 964 Some(serde_json::json!({"execution_mode":"deterministic","backend":"heuristic"})) 965 ); 966 } 967 968 #[test] 969 fn business_requests_use_a_longer_timeout_than_control_requests() { 970 let _guard = lock_hyf_tests(); 971 let dir = tempdir().expect("tempdir"); 972 let executable = write_script( 973 dir.path(), 974 "#!/bin/sh\nread -r request || exit 64\ncase \"$request\" in\n *'\"capability\":\"sys.status\"'*)\n sleep 1\n cat <<'JSON'\n{\"version\":1,\"request_id\":\"cli-doctor-hyf-status\",\"trace_id\":\"cli-doctor-hyf-status\",\"ok\":true,\"output\":{\"build_identity\":{\"protocol_version\":1},\"enabled_execution_modes\":{\"deterministic\":true}}}\nJSON\n ;;\n *'\"capability\":\"query_rewrite\"'*)\n sleep 1\n cat <<'JSON'\n{\"version\":1,\"request_id\":\"rewrite-timeout-test\",\"trace_id\":\"rewrite-timeout-test\",\"ok\":true,\"output\":{\"original_text\":\"henhouse\",\"normalized_text\":\"henhouse\",\"rewritten_text\":\"eggs\",\"query_terms\":[\"eggs\"],\"normalization_signals\":[\"query_rewrite\"],\"ranking_hints\":[\"local_first\"],\"extracted_filters\":{\"local_intent\":false,\"fulfillment\":\"any\",\"time_window\":\"any\"}}}\nJSON\n ;;\n *)\n exit 65\n ;;\nesac\n", 975 ); 976 let client = HyfClient::with_timeouts( 977 executable, 978 Duration::from_millis(100), 979 Duration::from_secs(5), 980 ); 981 assert!(client.timeouts.business > client.timeouts.control); 982 983 let status = client.status().expect_err("status should time out"); 984 assert!(matches!(status, HyfClientError::Timeout(100))); 985 986 let rewrite = client 987 .query_rewrite( 988 "rewrite-timeout-test", 989 Some("rewrite-timeout-test"), 990 &HyfRequestContext::deterministic_cli(), 991 &HyfQueryRewriteRequest::new("henhouse"), 992 ) 993 .expect("query rewrite should use longer timeout"); 994 assert_eq!(rewrite.output.rewritten_text, "eggs"); 995 } 996 997 #[test] 998 fn semantic_rank_request_round_trips_typed_output() { 999 let _guard = lock_hyf_tests(); 1000 let dir = tempdir().expect("tempdir"); 1001 let executable = write_response_script( 1002 dir.path(), 1003 "{\"version\":1,\"request_id\":\"rank-test-1\",\"ok\":true,\"output\":{\"ranked_ids\":[\"listing_local_1\",\"listing_regional_1\"],\"reasons\":{\"listing_local_1\":[\"apples match\",\"pickup match\"],\"listing_regional_1\":[\"delivery mismatch\"]},\"scored_candidates\":[{\"id\":\"listing_local_1\",\"heuristic_score\":14,\"matched_terms\":[\"apples\"],\"reasons\":[\"apples match\",\"pickup match\"],\"delivery_alignment\":\"match\",\"distance_band\":\"closer\",\"freshness_band\":\"fresher\",\"scope_match\":true}],\"ranking_hints\":[\"prefer_local_results\"],\"extracted_filters\":{\"local_intent\":true,\"fulfillment\":\"pickup\",\"time_window\":\"weekend\"}},\"meta\":{\"execution_mode\":\"deterministic\",\"backend\":\"heuristic\"}}", 1004 ); 1005 let context = HyfRequestContext::deterministic_cli() 1006 .with_listing_scope(vec!["listing_local_1".to_owned()]); 1007 let request = request_json( 1008 "rank-test-1", 1009 None, 1010 "semantic_rank", 1011 Some(&context), 1012 &HyfSemanticRankRequest::new( 1013 "apples near me with weekend pickup", 1014 vec![sample_candidate("listing_local_1")], 1015 ), 1016 ); 1017 let client = HyfClient::new(executable); 1018 let response = client 1019 .semantic_rank( 1020 "rank-test-1", 1021 None, 1022 &context, 1023 &HyfSemanticRankRequest::new( 1024 "apples near me with weekend pickup", 1025 vec![sample_candidate("listing_local_1")], 1026 ), 1027 ) 1028 .expect("semantic rank"); 1029 1030 assert_eq!(request["capability"], "semantic_rank"); 1031 assert_eq!( 1032 request["context"]["scope"]["listing_ids"], 1033 serde_json::json!(["listing_local_1"]) 1034 ); 1035 assert_eq!(request["input"]["candidates"][0]["id"], "listing_local_1"); 1036 assert_eq!(response.output.ranked_ids[0], "listing_local_1"); 1037 assert_eq!(response.output.scored_candidates[0].heuristic_score, 14); 1038 } 1039 1040 #[test] 1041 fn explain_result_request_round_trips_typed_output() { 1042 let _guard = lock_hyf_tests(); 1043 let dir = tempdir().expect("tempdir"); 1044 let executable = write_response_script( 1045 dir.path(), 1046 "{\"version\":1,\"request_id\":\"explain-test-1\",\"trace_id\":\"trace-explain-test-1\",\"ok\":true,\"output\":{\"result_id\":\"listing_local_1\",\"explanation_kind\":\"deterministic\",\"summary\":\"Result listing_local_1 was ranked using deterministic heuristic signals: apples match and pickup match.\",\"score\":14,\"reasons\":[\"apples match\",\"pickup match\"],\"matched_terms\":[\"apples\"],\"ranking_hints\":[\"prefer_local_results\"],\"extracted_filters\":{\"local_intent\":true,\"fulfillment\":\"pickup\",\"time_window\":\"weekend\"},\"signal_assessment\":{\"delivery_alignment\":\"match\",\"distance_band\":\"closer\",\"freshness_band\":\"fresher\",\"scope_match\":true}},\"meta\":{\"execution_mode\":\"deterministic\",\"backend\":\"heuristic\"}}", 1047 ); 1048 let context = HyfRequestContext::deterministic_cli().with_return_provenance(true); 1049 let request = request_json( 1050 "explain-test-1", 1051 Some("trace-explain-test-1"), 1052 "explain_result", 1053 Some(&context), 1054 &HyfExplainResultRequest::new( 1055 "apples near me with weekend pickup", 1056 sample_candidate("listing_local_1"), 1057 ), 1058 ); 1059 let client = HyfClient::new(executable); 1060 let response = client 1061 .explain_result( 1062 "explain-test-1", 1063 Some("trace-explain-test-1"), 1064 &context, 1065 &HyfExplainResultRequest::new( 1066 "apples near me with weekend pickup", 1067 sample_candidate("listing_local_1"), 1068 ), 1069 ) 1070 .expect("explain result"); 1071 1072 assert_eq!(request["capability"], "explain_result"); 1073 assert_eq!(request["context"]["return_provenance"], true); 1074 assert_eq!(request["input"]["candidate"]["id"], "listing_local_1"); 1075 assert_eq!(response.output.result_id, "listing_local_1"); 1076 assert_eq!( 1077 response.output.signal_assessment.delivery_alignment, 1078 "match" 1079 ); 1080 } 1081 1082 fn sample_candidate(id: &str) -> HyfSemanticCandidate { 1083 HyfSemanticCandidate { 1084 id: id.to_owned(), 1085 title: "Organic apples".to_owned(), 1086 farm: "Local Orchard".to_owned(), 1087 delivery: "pickup".to_owned(), 1088 distance_km: 4.1, 1089 freshness_minutes: 3, 1090 } 1091 } 1092 1093 fn request_json<T: Serialize>( 1094 request_id: &str, 1095 trace_id: Option<&str>, 1096 capability: &str, 1097 context: Option<&HyfRequestContext>, 1098 input: &T, 1099 ) -> Value { 1100 let raw = super::serialize_request(request_id, trace_id, capability, context, input) 1101 .expect("serialize request"); 1102 serde_json::from_str(raw.as_str()).expect("request json") 1103 } 1104 1105 fn write_response_script(dir: &Path, response: &str) -> PathBuf { 1106 write_script( 1107 dir, 1108 format!("#!/bin/sh\nread -r _request || exit 64\ncat <<'JSON'\n{response}\nJSON\n") 1109 .as_str(), 1110 ) 1111 } 1112 fn write_script(dir: &Path, script: &str) -> PathBuf { 1113 let path = dir.join("fake-hyfd"); 1114 fs::write(&path, script).expect("write fake hyfd"); 1115 let mut permissions = fs::metadata(&path).expect("metadata").permissions(); 1116 permissions.set_mode(0o755); 1117 fs::set_permissions(&path, permissions).expect("chmod fake hyfd"); 1118 path 1119 } 1120 }