commit bc3706aab53c4ac838d0b13c0da5c8ad5fe8c13f
parent c89a8187e0046e1aafcd88e43b4738971dfe9dbd
Author: triesap <tyson@radroots.org>
Date: Wed, 8 Apr 2026 22:27:31 +0000
tests: extract hyf stdio process helper
Diffstat:
2 files changed, 133 insertions(+), 123 deletions(-)
diff --git a/tests/stdio_process_helper.mojo b/tests/stdio_process_helper.mojo
@@ -0,0 +1,117 @@
+import std.os
+from std.os import Pipe, Process
+from std.ffi import CStringSlice, c_int, external_call
+from std.sys._libc import close, exit, vfork
+
+from mojson import Value, loads
+
+
+comptime HYF_DIAGNOSTICS_DIR_ENV = "HYF_DIAGNOSTICS_DIR"
+
+
+struct ScopedEnvVar:
+ var name: String
+ var value: String
+ var previous: String
+ var had_previous: Bool
+
+ def __init__(out self, name: String, value: String):
+ self.name = String(name)
+ self.value = String(value)
+ self.previous = std.os.getenv(name)
+ self.had_previous = self.previous != ""
+
+ def __enter__(mut self) raises:
+ _ = std.os.setenv(self.name, self.value, overwrite=True)
+
+ def __exit__(mut self):
+ if self.had_previous:
+ _ = std.os.setenv(self.name, self.previous, overwrite=True)
+ else:
+ _ = std.os.unsetenv(self.name)
+
+
+def _dup2(oldfd: c_int, newfd: c_int) -> c_int:
+ return external_call["dup2", c_int](oldfd, newfd)
+
+
+def _read_pipe_to_string(mut pipe: Pipe) raises -> String:
+ var buffer = InlineArray[Byte, 4096](fill=0)
+ var output = String("")
+ while True:
+ var read = pipe.read_bytes(Span(buffer))
+ if read == 0:
+ break
+ output += String(
+ from_utf8=Span(ptr=buffer.unsafe_ptr(), length=Int(read))
+ )
+ return output^
+
+
+def run_stdio_entrypoint(entrypoint: String, request_json: String) raises -> Value:
+ var stdin_pipe = Pipe()
+ var stdout_pipe = Pipe()
+ var output = String("")
+ var command = String("mojo")
+ var include_flag = String("-I")
+ var include_path = String("src")
+ var entrypoint_path = String(entrypoint)
+ var argv = List[Optional[CStringSlice[ImmutAnyOrigin]]](
+ length=6, fill={}
+ )
+ argv[0] = rebind[CStringSlice[ImmutAnyOrigin]](
+ command.as_c_string_slice()
+ )
+ argv[1] = rebind[CStringSlice[ImmutAnyOrigin]](
+ "run".as_c_string_slice()
+ )
+ argv[2] = rebind[CStringSlice[ImmutAnyOrigin]](
+ include_flag.as_c_string_slice()
+ )
+ argv[3] = rebind[CStringSlice[ImmutAnyOrigin]](
+ include_path.as_c_string_slice()
+ )
+ argv[4] = rebind[CStringSlice[ImmutAnyOrigin]](
+ entrypoint_path.as_c_string_slice()
+ )
+
+ var pid = vfork()
+ if pid < 0:
+ raise Error("failed to spawn hyf process test child")
+
+ if pid == 0:
+ if _dup2(c_int(stdin_pipe.fd_in.value().value), 0) < 0:
+ exit(126)
+ if _dup2(c_int(stdout_pipe.fd_out.value().value), 1) < 0:
+ exit(126)
+ _ = close(c_int(stdin_pipe.fd_in.value().value))
+ _ = close(c_int(stdin_pipe.fd_out.value().value))
+ _ = close(c_int(stdout_pipe.fd_in.value().value))
+ _ = close(c_int(stdout_pipe.fd_out.value().value))
+ _ = external_call["execvp", c_int](
+ command.as_c_string_slice().unsafe_ptr(),
+ argv.unsafe_ptr(),
+ )
+ exit(127)
+
+ stdin_pipe.set_output_only()
+ stdout_pipe.set_input_only()
+
+ stdin_pipe.write_bytes((request_json + "\n").as_bytes())
+ stdin_pipe.set_input_only()
+
+ output = _read_pipe_to_string(stdout_pipe)
+ stdout_pipe.set_output_only()
+
+ var process = Process(Int(pid))
+ var status = process.wait()
+ if not status.exit_code or status.exit_code.value() != 0:
+ raise Error("hyf process exited unexpectedly")
+
+ if output == "":
+ raise Error("hyf process returned no stdout payload")
+ return loads(output)
+
+
+def run_hyf_stdio(request_json: String) raises -> Value:
+ return run_stdio_entrypoint("src/main.mojo", request_json)
diff --git a/tests/test_stdio_contract.mojo b/tests/test_stdio_contract.mojo
@@ -1,128 +1,21 @@
import std.os
-from std.os import Pipe, Process
from std.os.path import exists
from std.pathlib import Path
from std.testing import assert_equal, assert_true, TestSuite
-from std.ffi import CStringSlice, c_int, external_call
-from std.sys._libc import close, exit, vfork
from std.tempfile import TemporaryDirectory
-from mojson import Value, loads
+from mojson import Value
+from stdio_process_helper import (
+ HYF_DIAGNOSTICS_DIR_ENV,
+ ScopedEnvVar,
+ run_hyf_stdio,
+ run_stdio_entrypoint,
+)
comptime _EXPECTED_INTERNAL_ERROR_MESSAGE = (
"internal hyf daemon error; inspect local diagnostics"
)
-comptime _HYF_DIAGNOSTICS_DIR_ENV = "HYF_DIAGNOSTICS_DIR"
-
-
-struct ScopedEnvVar:
- var name: String
- var value: String
- var previous: String
- var had_previous: Bool
-
- def __init__(out self, name: String, value: String):
- self.name = String(name)
- self.value = String(value)
- self.previous = std.os.getenv(name)
- self.had_previous = self.previous != ""
-
- def __enter__(mut self) raises:
- _ = std.os.setenv(self.name, self.value, overwrite=True)
-
- def __exit__(mut self):
- if self.had_previous:
- _ = std.os.setenv(self.name, self.previous, overwrite=True)
- else:
- _ = std.os.unsetenv(self.name)
-
-
-def _dup2(oldfd: c_int, newfd: c_int) -> c_int:
- return external_call["dup2", c_int](oldfd, newfd)
-
-
-def _read_pipe_to_string(mut pipe: Pipe) raises -> String:
- var buffer = InlineArray[Byte, 4096](fill=0)
- var output = String("")
- while True:
- var read = pipe.read_bytes(Span(buffer))
- if read == 0:
- break
- output += String(
- from_utf8=Span(ptr=buffer.unsafe_ptr(), length=Int(read))
- )
- return output^
-
-
-def _run_entrypoint(entrypoint: String, request_json: String) raises -> Value:
- var stdin_pipe = Pipe()
- var stdout_pipe = Pipe()
- var output = String("")
- var command = String("mojo")
- var include_flag = String("-I")
- var include_path = String("src")
- var entrypoint_path = String(entrypoint)
- var argv = List[Optional[CStringSlice[ImmutAnyOrigin]]](
- length=6, fill={}
- )
- argv[0] = rebind[CStringSlice[ImmutAnyOrigin]](
- command.as_c_string_slice()
- )
- argv[1] = rebind[CStringSlice[ImmutAnyOrigin]](
- "run".as_c_string_slice()
- )
- argv[2] = rebind[CStringSlice[ImmutAnyOrigin]](
- include_flag.as_c_string_slice()
- )
- argv[3] = rebind[CStringSlice[ImmutAnyOrigin]](
- include_path.as_c_string_slice()
- )
- argv[4] = rebind[CStringSlice[ImmutAnyOrigin]](
- entrypoint_path.as_c_string_slice()
- )
-
- var pid = vfork()
- if pid < 0:
- raise Error("failed to spawn hyf process test child")
-
- if pid == 0:
- if _dup2(c_int(stdin_pipe.fd_in.value().value), 0) < 0:
- exit(126)
- if _dup2(c_int(stdout_pipe.fd_out.value().value), 1) < 0:
- exit(126)
- _ = close(c_int(stdin_pipe.fd_in.value().value))
- _ = close(c_int(stdin_pipe.fd_out.value().value))
- _ = close(c_int(stdout_pipe.fd_in.value().value))
- _ = close(c_int(stdout_pipe.fd_out.value().value))
- _ = external_call["execvp", c_int](
- command.as_c_string_slice().unsafe_ptr(),
- argv.unsafe_ptr(),
- )
- exit(127)
-
- stdin_pipe.set_output_only()
- stdout_pipe.set_input_only()
-
- stdin_pipe.write_bytes((request_json + "\n").as_bytes())
- stdin_pipe.set_input_only()
-
- output = _read_pipe_to_string(stdout_pipe)
- stdout_pipe.set_output_only()
-
- var process = Process(Int(pid))
- var status = process.wait()
- if not status.exit_code or status.exit_code.value() != 0:
- raise Error("hyf process exited unexpectedly")
-
- if output == "":
- raise Error("hyf process returned no stdout payload")
- return loads(output)
-
-
-def _run_hyf(request_json: String) raises -> Value:
- return _run_entrypoint("src/main.mojo", request_json)
-
def _has_key(value: Value, key: String) -> Bool:
for candidate in value.object_keys():
if candidate == key:
@@ -131,7 +24,7 @@ def _has_key(value: Value, key: String) -> Bool:
def test_status_success() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":1,"request_id":"status-proc-1","trace_id":"trace-status-proc-1","capability":"sys.status","input":{}}'
)
@@ -158,7 +51,7 @@ def test_status_success() raises:
def test_invalid_envelope_preserves_correlation() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":2,"request_id":"bad-envelope-proc-1","trace_id":"trace-bad-envelope-proc-1","capability":"sys.status","input":{}}'
)
@@ -172,7 +65,7 @@ def test_invalid_envelope_preserves_correlation() raises:
def test_assisted_request_fails_explicitly() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":1,"request_id":"assisted-proc-1","capability":"query_rewrite","context":{"execution_mode_preference":"assisted"},"input":{"text":"eggs near me"}}'
)
@@ -181,7 +74,7 @@ def test_assisted_request_fails_explicitly() raises:
def test_semantic_rank_exports_heuristic_score_without_latency() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":1,"request_id":"rank-proc-1","capability":"semantic_rank","input":{"query":"eggs near me with weekend pickup","candidates":[{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2},{"id":"lst_8k1p","title":"Free range eggs","farm":"Santa Elena","delivery":"delivery","distance_km":8.7,"freshness_minutes":18}]}}'
)
@@ -195,7 +88,7 @@ def test_semantic_rank_exports_heuristic_score_without_latency() raises:
def test_strict_query_rewrite_failure() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":1,"request_id":"rewrite-bad-proc-1","capability":"query_rewrite","input":{"text":"eggs near me","tone":"brief"}}'
)
@@ -208,7 +101,7 @@ def test_strict_query_rewrite_failure() raises:
def test_strict_semantic_rank_failure() raises:
- var response = _run_hyf(
+ var response = run_hyf_stdio(
'{"version":1,"request_id":"rank-bad-proc-1","capability":"semantic_rank","input":{"query":"eggs near me","candidates":[{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2,"rating":5}]}}'
)
@@ -221,7 +114,7 @@ def test_strict_semantic_rank_failure() raises:
def test_internal_error_is_bounded_on_wire() raises:
- var response = _run_entrypoint(
+ var response = run_stdio_entrypoint(
"tests/internal_error_stdio_main.mojo",
'{"version":1,"request_id":"status-internal-proc-1","trace_id":"trace-status-internal-proc-1","capability":"sys.status","input":{}}',
)
@@ -253,9 +146,9 @@ def test_internal_error_records_detail_in_explicit_diagnostics_dir() raises:
with TemporaryDirectory() as temp_dir:
var diagnostics_dir = Path(temp_dir) / "hyf-internal-diagnostics"
with ScopedEnvVar(
- _HYF_DIAGNOSTICS_DIR_ENV, diagnostics_dir.__fspath__()
+ HYF_DIAGNOSTICS_DIR_ENV, diagnostics_dir.__fspath__()
):
- var response = _run_entrypoint(
+ var response = run_stdio_entrypoint(
"tests/internal_error_stdio_main.mojo",
'{"version":1,"request_id":"status-internal-proc-diag-1","trace_id":"trace-status-internal-proc-diag-1","capability":"sys.status","input":{}}',
)