commit becd9e6cc6bcab00c1cf5d66409a298897fd522f
parent 420f0dbbd7e83982a9c6c94d778d60e9cc2523ac
Author: triesap <tyson@radroots.org>
Date: Wed, 8 Apr 2026 19:43:05 +0000
stdio: reconcile hyf wire envelope contract
Diffstat:
5 files changed, 201 insertions(+), 24 deletions(-)
diff --git a/src/hyf_core/metadata.mojo b/src/hyf_core/metadata.mojo
@@ -1,3 +1,7 @@
+def hyf_protocol_version() -> Int:
+ return 1
+
+
@fieldwise_init
struct HyfBuildIdentity(Copyable, Movable):
var service_name: String
@@ -18,7 +22,7 @@ def current_build_identity() -> HyfBuildIdentity:
package_version="0.1.0",
daemon_name="hyfd",
transport="stdio",
- protocol_version=1,
+ protocol_version=hyf_protocol_version(),
default_execution_mode="deterministic",
deterministic_execution_available=True,
assisted_execution_available=False,
diff --git a/src/hyf_stdio/codec.mojo b/src/hyf_stdio/codec.mojo
@@ -1,5 +1,6 @@
-from mojson import dumps
-from mojson.deserialize import deserialize
+from std.collections import Optional
+
+from mojson import Value, dumps, loads
from hyf_stdio.envelope import (
WireErrorResponse,
@@ -8,10 +9,54 @@ from hyf_stdio.envelope import (
)
+@fieldwise_init
+struct RequestCorrelation(Copyable, Movable):
+ var request_id: String
+ var trace_id: Optional[String]
+
+
+def _extract_optional_string(value: Value, key: String) -> Optional[String]:
+ if not value.is_object():
+ return None
+
+ for candidate in value.object_keys():
+ if candidate == key:
+ try:
+ var field_value = value[key]
+ if field_value.is_string():
+ return String(field_value.string_value())
+ except e:
+ pass
+ return None
+ return None
+
+
def decode_request(line: String) raises -> WireRequest:
if line == "":
raise Error("request line must not be empty")
- return deserialize[WireRequest](line)
+
+ var json = loads(line)
+ return WireRequest.from_json(json)
+
+
+def extract_request_correlation(line: String) -> RequestCorrelation:
+ var request_id = String()
+ var trace_id: Optional[String] = None
+
+ if line == "":
+ return RequestCorrelation(request_id=request_id, trace_id=trace_id^)
+
+ try:
+ var json = loads(line)
+ var extracted_request_id = _extract_optional_string(json, "request_id")
+ if extracted_request_id:
+ request_id = extracted_request_id.value()
+
+ trace_id = _extract_optional_string(json, "trace_id")
+ except e:
+ pass
+
+ return RequestCorrelation(request_id=request_id, trace_id=trace_id^)
def encode_success(response: WireSuccessResponse) raises -> String:
diff --git a/src/hyf_stdio/envelope.mojo b/src/hyf_stdio/envelope.mojo
@@ -3,6 +3,7 @@ from std.collections import Optional
from mojson import Value, loads
from mojson.deserialize import Deserializable, get_string
+from hyf_core.metadata import hyf_protocol_version
from hyf_core.request_context import RequestContext, parse_request_context
from hyf_stdio.errors import WireError
@@ -22,7 +23,9 @@ def _require_non_empty(value: String, field_name: String) raises:
def _require_request_keys(value: Value) raises:
for key in value.object_keys():
if (
- key != "request_id"
+ key != "version"
+ and key != "request_id"
+ and key != "trace_id"
and key != "capability"
and key != "context"
and key != "input"
@@ -39,9 +42,39 @@ def _has_key(value: Value, key: String) -> Bool:
return False
+def _require_protocol_version(json: Value) raises -> Int:
+ if not _has_key(json, "version"):
+ raise Error("request envelope field 'version' is required")
+
+ var version = json["version"]
+ if not version.is_int():
+ raise Error("request envelope field 'version' must be an integer")
+
+ var version_value = Int(version.int_value())
+ if version_value != hyf_protocol_version():
+ raise Error(
+ "request envelope version "
+ + String(version_value)
+ + " is unsupported; expected "
+ + String(hyf_protocol_version())
+ )
+ return version_value
+
+
+def _parse_optional_trace_id(json: Value) raises -> Optional[String]:
+ if not _has_key(json, "trace_id"):
+ return None
+
+ var trace_id = get_string(json, "trace_id")
+ _require_non_empty(trace_id, "trace_id")
+ return String(trace_id)
+
+
@fieldwise_init
struct WireRequest(Deserializable, Copyable, Movable):
+ var version: Int
var request_id: String
+ var trace_id: Optional[String]
var capability: String
var context: RequestContext
var input: Value
@@ -50,10 +83,13 @@ struct WireRequest(Deserializable, Copyable, Movable):
def from_json(json: Value) raises -> Self:
_require_object(json, "request envelope")
_require_request_keys(json)
+ var version = _require_protocol_version(json)
var request_id = get_string(json, "request_id")
_require_non_empty(request_id, "request_id")
+ var trace_id = _parse_optional_trace_id(json)
+
var capability = get_string(json, "capability")
_require_non_empty(capability, "capability")
@@ -64,7 +100,9 @@ struct WireRequest(Deserializable, Copyable, Movable):
var context = parse_request_context(context_json)
return Self(
+ version=version,
request_id=request_id,
+ trace_id=trace_id^,
capability=capability,
context=context^,
input=json["input"].clone(),
@@ -73,13 +111,18 @@ struct WireRequest(Deserializable, Copyable, Movable):
@fieldwise_init
struct WireSuccessResponse(Copyable, Movable):
+ var version: Int
var request_id: String
+ var trace_id: Optional[String]
var output: Value
var meta: Optional[Value]
def to_json_value(self) raises -> Value:
var value = loads("{}")
+ value.set("version", Value(self.version))
value.set("request_id", Value(String(self.request_id)))
+ if self.trace_id:
+ value.set("trace_id", Value(String(self.trace_id.value())))
value.set("ok", Value(True))
value.set("output", self.output.clone())
if self.meta:
@@ -89,12 +132,17 @@ struct WireSuccessResponse(Copyable, Movable):
@fieldwise_init
struct WireErrorResponse(Copyable, Movable):
+ var version: Int
var request_id: String
+ var trace_id: Optional[String]
var error: WireError
def to_json_value(self) raises -> Value:
var value = loads("{}")
+ value.set("version", Value(self.version))
value.set("request_id", Value(String(self.request_id)))
+ if self.trace_id:
+ value.set("trace_id", Value(String(self.trace_id.value())))
value.set("ok", Value(False))
value.set("error", self.error.to_json_value())
return value^
diff --git a/src/hyf_stdio/server.mojo b/src/hyf_stdio/server.mojo
@@ -12,7 +12,13 @@ from hyf_core.capabilities.registry import (
from hyf_core.capabilities.query_rewrite import execute_query_rewrite
from hyf_core.capabilities.semantic_rank import execute_semantic_rank
from hyf_core.errors import CapabilityFailure, CapabilityResult, CapabilitySuccess
-from hyf_stdio.codec import decode_request, encode_error, encode_success
+from hyf_core.metadata import hyf_protocol_version
+from hyf_stdio.codec import (
+ decode_request,
+ encode_error,
+ encode_success,
+ extract_request_correlation,
+)
from hyf_stdio.control.capabilities import build_capabilities_output
from hyf_stdio.control.status import build_status_output
from hyf_stdio.envelope import (
@@ -38,21 +44,27 @@ def _read_request_line() raises -> String:
def _unsupported_response(request: WireRequest) -> WireErrorResponse:
return WireErrorResponse(
+ version=hyf_protocol_version(),
request_id=String(request.request_id),
+ trace_id=request.trace_id,
error=unsupported_capability_error(String(request.capability)),
)
def _disabled_response(request: WireRequest) -> WireErrorResponse:
return WireErrorResponse(
+ version=hyf_protocol_version(),
request_id=String(request.request_id),
+ trace_id=request.trace_id,
error=capability_disabled_error(String(request.capability)),
)
def _unavailable_response(request: WireRequest) -> WireErrorResponse:
return WireErrorResponse(
+ version=hyf_protocol_version(),
request_id=String(request.request_id),
+ trace_id=request.trace_id,
error=capability_unavailable_error(String(request.capability)),
)
@@ -66,39 +78,53 @@ def _write_success(response: WireSuccessResponse) raises:
def _wire_error_from_core_failure(
- request_id: String, failure: CapabilityFailure
+ request_id: String,
+ trace_id: Optional[String],
+ failure: CapabilityFailure,
) -> WireErrorResponse:
var code = String(failure.error.code)
if code == "invalid_input":
code = "invalid_request"
return WireErrorResponse(
+ version=hyf_protocol_version(),
request_id=request_id,
+ trace_id=trace_id,
error=WireError(code=code, message=String(failure.error.message)),
)
def _wire_success_from_core_success(
- request_id: String, success: CapabilitySuccess
+ request_id: String,
+ trace_id: Optional[String],
+ success: CapabilitySuccess,
) raises -> WireSuccessResponse:
var meta: Optional[Value] = None
if success.meta:
meta = serialize_core_response_meta(success.meta.value())
return WireSuccessResponse(
+ version=hyf_protocol_version(),
request_id=request_id,
+ trace_id=trace_id,
output=success.output.clone(),
meta=meta^,
)
def _dispatch_capability_result(
- request_id: String, result: CapabilityResult
+ request_id: String,
+ trace_id: Optional[String],
+ result: CapabilityResult,
) raises -> String:
if result.failure:
return encode_error(
- _wire_error_from_core_failure(request_id, result.failure.value())
+ _wire_error_from_core_failure(
+ request_id, trace_id, result.failure.value()
+ )
)
return encode_success(
- _wire_success_from_core_success(request_id, result.success.value())
+ _wire_success_from_core_success(
+ request_id, trace_id, result.success.value()
+ )
)
@@ -106,30 +132,33 @@ def _dispatch_query_rewrite(request: WireRequest, request_id: String) raises ->
var result = execute_query_rewrite(
request.input.clone(), request.context.copy()
)
- return _dispatch_capability_result(request_id, result)
+ return _dispatch_capability_result(request_id, request.trace_id, result)
def _dispatch_semantic_rank(request: WireRequest, request_id: String) raises -> String:
var result = execute_semantic_rank(
request.input.clone(), request.context.copy()
)
- return _dispatch_capability_result(request_id, result)
+ return _dispatch_capability_result(request_id, request.trace_id, result)
def _dispatch_explain_result(request: WireRequest, request_id: String) raises -> String:
var result = execute_explain_result(
request.input.clone(), request.context.copy()
)
- return _dispatch_capability_result(request_id, result)
+ return _dispatch_capability_result(request_id, request.trace_id, result)
def handle_request(request: WireRequest) raises -> String:
var request_id = String(request.request_id)
+ var trace_id = request.trace_id
try:
if request.capability == "sys.status":
return encode_success(
WireSuccessResponse(
+ version=hyf_protocol_version(),
request_id=request_id,
+ trace_id=trace_id,
output=build_status_output(),
meta=None,
)
@@ -137,7 +166,9 @@ def handle_request(request: WireRequest) raises -> String:
elif request.capability == "sys.capabilities":
return encode_success(
WireSuccessResponse(
+ version=hyf_protocol_version(),
request_id=request_id,
+ trace_id=trace_id,
output=build_capabilities_output(),
meta=None,
)
@@ -156,7 +187,9 @@ def handle_request(request: WireRequest) raises -> String:
except e:
return encode_error(
WireErrorResponse(
+ version=hyf_protocol_version(),
request_id=request_id,
+ trace_id=trace_id,
error=internal_error(String(e)),
)
)
@@ -167,9 +200,12 @@ def handle_request_line(line: String) raises -> String:
var request = decode_request(line)
return handle_request(request^)
except e:
+ var correlation = extract_request_correlation(line)
return encode_error(
WireErrorResponse(
- request_id="",
+ version=hyf_protocol_version(),
+ request_id=correlation.request_id,
+ trace_id=correlation.trace_id,
error=invalid_request_error(String(e)),
)
)
diff --git a/tests/test_hyf.mojo b/tests/test_hyf.mojo
@@ -12,6 +12,13 @@ def _dispatch(line: String) raises -> Value:
return loads(handle_request_line(line))
+def _has_key(value: Value, key: String) -> Bool:
+ for candidate in value.object_keys():
+ if candidate == key:
+ return True
+ return False
+
+
def _business_capability(result: Value, capability_id: String) raises -> Value:
for capability in result["output"]["business_capabilities"].array_items():
if capability["id"].string_value() == capability_id:
@@ -21,10 +28,12 @@ def _business_capability(result: Value, capability_id: String) raises -> Value:
def test_decode_request_parses_context_and_input() raises:
var request = decode_request(
- '{"request_id":"req-1","capability":"query_rewrite","context":{"consumer":"radroots-cli","execution_mode_preference":"deterministic","return_provenance":true},"input":{"query":"eggs near me"}}'
+ '{"version":1,"request_id":"req-1","trace_id":"trace-1","capability":"query_rewrite","context":{"consumer":"radroots-cli","execution_mode_preference":"deterministic","return_provenance":true},"input":{"query":"eggs near me"}}'
)
+ assert_equal(request.version, 1)
assert_equal(request.request_id, "req-1")
+ assert_equal(request.trace_id.value(), "trace-1")
assert_equal(request.capability, "query_rewrite")
assert_equal(request.context.consumer, "radroots-cli")
assert_equal(
@@ -37,7 +46,7 @@ def test_decode_request_parses_context_and_input() raises:
def test_decode_request_rejects_unexpected_field() raises:
with assert_raises():
_ = decode_request(
- '{"request_id":"req-1","capability":"query_rewrite","input":{"query":"eggs"},"unexpected":true}'
+ '{"version":1,"request_id":"req-1","capability":"query_rewrite","input":{"query":"eggs"},"unexpected":true}'
)
@@ -51,13 +60,17 @@ def test_encode_success_and_error_shapes() raises:
var success = loads(
encode_success(
WireSuccessResponse(
+ version=1,
request_id="req-success",
+ trace_id=String("trace-success"),
output=output.copy(),
meta=meta.copy(),
)
)
)
+ assert_equal(Int(success["version"].int_value()), 1)
assert_equal(success["request_id"].string_value(), "req-success")
+ assert_equal(success["trace_id"].string_value(), "trace-success")
assert_equal(success["ok"].bool_value(), True)
assert_equal(success["output"]["kind"].string_value(), "ok")
assert_equal(
@@ -68,12 +81,16 @@ def test_encode_success_and_error_shapes() raises:
var failure = loads(
encode_error(
WireErrorResponse(
+ version=1,
request_id="req-error",
+ trace_id=String("trace-error"),
error=WireError(code="invalid_request", message="bad request"),
)
)
)
+ assert_equal(Int(failure["version"].int_value()), 1)
assert_equal(failure["request_id"].string_value(), "req-error")
+ assert_equal(failure["trace_id"].string_value(), "trace-error")
assert_equal(failure["ok"].bool_value(), False)
assert_equal(failure["error"]["code"].string_value(), "invalid_request")
assert_equal(failure["error"]["message"].string_value(), "bad request")
@@ -81,16 +98,20 @@ def test_encode_success_and_error_shapes() raises:
def test_handle_request_line_returns_invalid_request_for_bad_line() raises:
var result = _dispatch("")
+ assert_equal(Int(result["version"].int_value()), 1)
assert_equal(result["request_id"].string_value(), "")
+ assert_equal(_has_key(result, "trace_id"), False)
assert_equal(result["ok"].bool_value(), False)
assert_equal(result["error"]["code"].string_value(), "invalid_request")
def test_status_reports_registered_deterministic_ready() raises:
var result = _dispatch(
- '{"request_id":"status-1","capability":"sys.status","input":{}}'
+ '{"version":1,"request_id":"status-1","trace_id":"trace-status-1","capability":"sys.status","input":{}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
+ assert_equal(result["trace_id"].string_value(), "trace-status-1")
assert_equal(result["ok"].bool_value(), True)
assert_equal(
result["output"]["build_identity"]["service_name"].string_value(),
@@ -144,9 +165,10 @@ def test_status_reports_registered_deterministic_ready() raises:
def test_capabilities_report_implemented_and_disabled_states() raises:
var result = _dispatch(
- '{"request_id":"caps-1","capability":"sys.capabilities","input":{}}'
+ '{"version":1,"request_id":"caps-1","capability":"sys.capabilities","input":{}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
var query_rewrite = _business_capability(result, "query_rewrite")
var semantic_rank = _business_capability(result, "semantic_rank")
var explain_result = _business_capability(result, "explain_result")
@@ -172,9 +194,10 @@ def test_capabilities_report_implemented_and_disabled_states() raises:
def test_disabled_capability_returns_capability_disabled() raises:
var result = _dispatch(
- '{"request_id":"disabled-1","capability":"filter_extraction","input":{}}'
+ '{"version":1,"request_id":"disabled-1","capability":"filter_extraction","input":{}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
assert_equal(result["ok"].bool_value(), False)
assert_equal(result["request_id"].string_value(), "disabled-1")
assert_equal(result["error"]["code"].string_value(), "capability_disabled")
@@ -182,9 +205,11 @@ def test_disabled_capability_returns_capability_disabled() raises:
def test_query_rewrite_returns_deterministic_output() raises:
var result = _dispatch(
- '{"request_id":"rewrite-1","capability":"query_rewrite","input":{"text":"eggs near me with weekend pickup"}}'
+ '{"version":1,"request_id":"rewrite-1","trace_id":"trace-rewrite-1","capability":"query_rewrite","input":{"text":"eggs near me with weekend pickup"}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
+ assert_equal(result["trace_id"].string_value(), "trace-rewrite-1")
assert_equal(result["ok"].bool_value(), True)
assert_equal(
result["output"]["rewritten_text"].string_value(),
@@ -199,9 +224,10 @@ def test_query_rewrite_returns_deterministic_output() raises:
def test_semantic_rank_returns_ranked_ids_and_reasons() raises:
var result = _dispatch(
- '{"request_id":"rank-1","capability":"semantic_rank","input":{"query":"eggs near me with weekend pickup","candidates":[{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2},{"id":"lst_8k1p","title":"Free range eggs","farm":"Santa Elena","delivery":"delivery","distance_km":8.7,"freshness_minutes":18}]}}'
+ '{"version":1,"request_id":"rank-1","capability":"semantic_rank","input":{"query":"eggs near me with weekend pickup","candidates":[{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2},{"id":"lst_8k1p","title":"Free range eggs","farm":"Santa Elena","delivery":"delivery","distance_km":8.7,"freshness_minutes":18}]}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
assert_equal(result["ok"].bool_value(), True)
assert_equal(
result["output"]["ranked_ids"][0].string_value(),
@@ -223,9 +249,10 @@ def test_semantic_rank_returns_ranked_ids_and_reasons() raises:
def test_explain_result_returns_deterministic_summary_and_provenance() raises:
var result = _dispatch(
- '{"request_id":"explain-1","capability":"explain_result","context":{"consumer":"radroots-cli","return_provenance":true},"input":{"query":"eggs near me with weekend pickup","candidate":{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2}}}'
+ '{"version":1,"request_id":"explain-1","capability":"explain_result","context":{"consumer":"radroots-cli","return_provenance":true},"input":{"query":"eggs near me with weekend pickup","candidate":{"id":"lst_7ak2","title":"Pasture eggs","farm":"La Huerta del Sur","delivery":"pickup","distance_km":3.2,"freshness_minutes":2}}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
assert_equal(result["ok"].bool_value(), True)
assert_equal(
result["output"]["explanation_kind"].string_value(),
@@ -246,16 +273,33 @@ def test_explain_result_returns_deterministic_summary_and_provenance() raises:
def test_semantic_rank_invalid_input_returns_invalid_request() raises:
var result = _dispatch(
- '{"request_id":"rank-bad-1","capability":"semantic_rank","input":{"query":"eggs near me with weekend pickup","candidates":[]}}'
+ '{"version":1,"request_id":"rank-bad-1","trace_id":"trace-rank-bad-1","capability":"semantic_rank","input":{"query":"eggs near me with weekend pickup","candidates":[]}}'
)
+ assert_equal(Int(result["version"].int_value()), 1)
assert_equal(result["ok"].bool_value(), False)
assert_equal(result["request_id"].string_value(), "rank-bad-1")
+ assert_equal(result["trace_id"].string_value(), "trace-rank-bad-1")
assert_equal(result["error"]["code"].string_value(), "invalid_request")
assert_true(
result["error"]["message"].string_value().find("must not be empty") >= 0
)
+def test_invalid_request_preserves_request_and_trace_correlation() raises:
+ var result = _dispatch(
+ '{"version":2,"request_id":"bad-version-1","trace_id":"trace-bad-version-1","capability":"sys.status","input":{}}'
+ )
+
+ assert_equal(Int(result["version"].int_value()), 1)
+ assert_equal(result["request_id"].string_value(), "bad-version-1")
+ assert_equal(result["trace_id"].string_value(), "trace-bad-version-1")
+ assert_equal(result["ok"].bool_value(), False)
+ assert_equal(result["error"]["code"].string_value(), "invalid_request")
+ assert_true(
+ result["error"]["message"].string_value().find("unsupported") >= 0
+ )
+
+
def main() raises:
TestSuite.discover_tests[__functions_in_module()]().run()