hyf

Context-aware query service for Radroots
git clone https://radroots.dev/git/hyf.git
Log | Files | Refs | README | LICENSE

commit b40a1bdfd183aa25abba875d08835ee9af6407b0
parent 3aede8a6840eec079c3a473c4421340f581ba48d
Author: triesap <tyson@radroots.org>
Date:   Sun, 12 Apr 2026 03:33:45 +0000

provider: port max-local health and request logic to mojo

Diffstat:
M pixi.lock | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
M pixi.toml | 12 +++++++-----
A src/hyf_provider/__init__.mojo | 1 +
A src/hyf_provider/client.mojo | 21 +++++++++++++++++++++
A src/hyf_provider/config.mojo | 40 ++++++++++++++++++++++++++++++++++++++++
A src/hyf_provider/health.mojo | 31 +++++++++++++++++++++++++++++++
A src/hyf_provider/max_local.mojo | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A src/hyf_provider/result.mojo | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A src/hyf_provider/schema.mojo | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A tests/max_local_http_stub.mojo | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
A tests/max_local_process_helper.mojo | 123 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M tests/test_hyf.mojo | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
12 files changed, 862 insertions(+), 18 deletions(-)

diff --git a/pixi.lock b/pixi.lock @@ -40,7 +40,7 @@ environments: - conda: https://conda.modular.com/max-nightly/noarch/mojo-python-0.26.3.0.dev2026040805-release.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/ncurses-6.5-h2d0b736_3.conda - - conda: https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.1-h35e630c_1.conda + - conda: https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.2-h35e630c_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/packaging-26.0-pyhcf101f3_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/pathspec-1.0.4-pyhd8ed1ab_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/platformdirs-4.9.4-pyhcf101f3_0.conda @@ -60,10 +60,14 @@ environments: - conda: https://conda.anaconda.org/conda-forge/linux-64/zeromq-4.3.5-h41580af_10.conda - conda: https://conda.anaconda.org/conda-forge/noarch/zipp-3.23.0-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/linux-64/zstd-1.5.7-hb78ec9c_6.conda + - conda: ../../../../vendor/mojo/envo + build: hb0f4dca_0 - conda: ../../../../vendor/mojo/mojson build: hb0f4dca_0 - conda: ../../../../vendor/mojo/morph build: hb0f4dca_0 + - conda: ../../../../vendor/mojo/tempo + build: hb0f4dca_0 osx-arm64: - conda: https://conda.anaconda.org/conda-forge/noarch/_python_abi3_support-1.0-hd8ed1ab_2.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/bzip2-1.0.8-hd037594_9.conda @@ -90,7 +94,7 @@ environments: - conda: https://conda.modular.com/max-nightly/noarch/mojo-python-0.26.3.0.dev2026040805-release.conda - conda: https://conda.anaconda.org/conda-forge/noarch/mypy_extensions-1.1.0-pyha770c72_0.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/ncurses-6.5-h5e97a16_3.conda - - conda: https://conda.anaconda.org/conda-forge/osx-arm64/openssl-3.6.1-hd24854e_1.conda + - conda: 
https://conda.anaconda.org/conda-forge/osx-arm64/openssl-3.6.2-hd24854e_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/packaging-26.0-pyhcf101f3_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/pathspec-1.0.4-pyhd8ed1ab_0.conda - conda: https://conda.anaconda.org/conda-forge/noarch/platformdirs-4.9.4-pyhcf101f3_0.conda @@ -110,10 +114,14 @@ environments: - conda: https://conda.anaconda.org/conda-forge/osx-arm64/zeromq-4.3.5-h4818236_10.conda - conda: https://conda.anaconda.org/conda-forge/noarch/zipp-3.23.0-pyhcf101f3_1.conda - conda: https://conda.anaconda.org/conda-forge/osx-arm64/zstd-1.5.7-hbf9d68e_6.conda + - conda: ../../../../vendor/mojo/envo + build: h60d57d3_0 - conda: ../../../../vendor/mojo/mojson build: h60d57d3_0 - conda: ../../../../vendor/mojo/morph build: h60d57d3_0 + - conda: ../../../../vendor/mojo/tempo + build: h60d57d3_0 packages: - conda: https://conda.anaconda.org/conda-forge/linux-64/_openmp_mutex-4.5-20_gnu.conda build_number: 20 @@ -186,6 +194,26 @@ packages: license: Python-2.0 size: 50078 timestamp: 1770674447292 +- conda: ../../../../vendor/mojo/envo + name: envo + version: 0.1.0 + build: h60d57d3_0 + subdir: osx-arm64 + variants: + target_platform: osx-arm64 + depends: + - mojo <1.0 + license: MIT +- conda: ../../../../vendor/mojo/envo + name: envo + version: 0.1.0 + build: hb0f4dca_0 + subdir: linux-64 + variants: + target_platform: linux-64 + depends: + - mojo <1.0 + license: MIT - conda: https://conda.anaconda.org/conda-forge/linux-64/icu-78.3-h33c6efd_0.conda sha256: fbf86c4a59c2ed05bbffb2ba25c7ed94f6185ec30ecb691615d42342baa1a16a md5: c80d8a3b84358cb967fa81e7075fbc8a @@ -648,27 +676,27 @@ packages: license: X11 AND BSD-3-Clause size: 797030 timestamp: 1738196177597 -- conda: https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.1-h35e630c_1.conda - sha256: 44c877f8af015332a5d12f5ff0fb20ca32f896526a7d0cdb30c769df1144fb5c - md5: f61eb8cd60ff9057122a3d338b99c00f +- conda: 
https://conda.anaconda.org/conda-forge/linux-64/openssl-3.6.2-h35e630c_0.conda + sha256: c0ef482280e38c71a08ad6d71448194b719630345b0c9c60744a2010e8a8e0cb + md5: da1b85b6a87e141f5140bb9924cecab0 depends: - __glibc >=2.17,<3.0.a0 - ca-certificates - libgcc >=14 license: Apache-2.0 license_family: Apache - size: 3164551 - timestamp: 1769555830639 -- conda: https://conda.anaconda.org/conda-forge/osx-arm64/openssl-3.6.1-hd24854e_1.conda - sha256: 361f5c5e60052abc12bdd1b50d7a1a43e6a6653aab99a2263bf2288d709dcf67 - md5: f4f6ad63f98f64191c3e77c5f5f29d76 + size: 3167099 + timestamp: 1775587756857 +- conda: https://conda.anaconda.org/conda-forge/osx-arm64/openssl-3.6.2-hd24854e_0.conda + sha256: c91bf510c130a1ea1b6ff023e28bac0ccaef869446acd805e2016f69ebdc49ea + md5: 25dcccd4f80f1638428613e0d7c9b4e1 depends: - __osx >=11.0 - ca-certificates license: Apache-2.0 license_family: Apache - size: 3104268 - timestamp: 1769556384749 + size: 3106008 + timestamp: 1775587972483 - conda: https://conda.anaconda.org/conda-forge/noarch/packaging-26.0-pyhcf101f3_0.conda sha256: c1fc0f953048f743385d31c468b4a678b3ad20caffdeaa94bed85ba63049fd58 md5: b76541e68fea4d511b1ac46a28dcd2c6 @@ -862,6 +890,26 @@ packages: license_family: MIT size: 18455 timestamp: 1753199211006 +- conda: ../../../../vendor/mojo/tempo + name: tempo + version: 0.1.0 + build: h60d57d3_0 + subdir: osx-arm64 + variants: + target_platform: osx-arm64 + depends: + - mojo <1.0 + license: MIT +- conda: ../../../../vendor/mojo/tempo + name: tempo + version: 0.1.0 + build: hb0f4dca_0 + subdir: linux-64 + variants: + target_platform: linux-64 + depends: + - mojo <1.0 + license: MIT - conda: https://conda.anaconda.org/conda-forge/linux-64/tk-8.6.13-noxft_h366c992_103.conda sha256: cafeec44494f842ffeca27e9c8b0c27ed714f93ac77ddadc6aaf726b5554ebac md5: cffd3bdd58090148f4cfcd831f4b26ab diff --git a/pixi.toml b/pixi.toml @@ -13,15 +13,17 @@ preview = ["pixi-build"] [dependencies] mojo = "=0.26.3.0.dev2026040805" +envo = { path = 
"../../../../vendor/mojo/envo" } mojson = { path = "../../../../vendor/mojo/mojson" } morph = { path = "../../../../vendor/mojo/morph" } +tempo = { path = "../../../../vendor/mojo/tempo" } [tasks] -run = "mojo run src/main.mojo" -test-unit = "mojo -I src tests/test_hyf.mojo" -test-runtime = "mojo -I src tests/test_runtime_paths.mojo" -test-repo-local-process = "mojo -I src tests/test_repo_local_process_contract.mojo" -test-stdio = "mojo -I src tests/test_stdio_contract.mojo" +run = "FLARE_LIB=../../../../vendor/mojo/flare/build/libflare_tls.so FLARE_ZLIB_LIB=../../../../vendor/mojo/flare/build/libflare_zlib.so mojo run -I src -I ../../../../vendor/mojo/flare src/main.mojo" +test-unit = "FLARE_LIB=../../../../vendor/mojo/flare/build/libflare_tls.so FLARE_ZLIB_LIB=../../../../vendor/mojo/flare/build/libflare_zlib.so mojo -I src -I ../../../../vendor/mojo/flare tests/test_hyf.mojo" +test-runtime = "FLARE_LIB=../../../../vendor/mojo/flare/build/libflare_tls.so FLARE_ZLIB_LIB=../../../../vendor/mojo/flare/build/libflare_zlib.so mojo -I src -I ../../../../vendor/mojo/flare tests/test_runtime_paths.mojo" +test-repo-local-process = "FLARE_LIB=../../../../vendor/mojo/flare/build/libflare_tls.so FLARE_ZLIB_LIB=../../../../vendor/mojo/flare/build/libflare_zlib.so mojo -I src -I ../../../../vendor/mojo/flare tests/test_repo_local_process_contract.mojo" +test-stdio = "FLARE_LIB=../../../../vendor/mojo/flare/build/libflare_tls.so FLARE_ZLIB_LIB=../../../../vendor/mojo/flare/build/libflare_zlib.so mojo -I src -I ../../../../vendor/mojo/flare tests/test_stdio_contract.mojo" test-runtime-contract = { depends-on = [ "test-runtime", "test-repo-local-process", diff --git a/src/hyf_provider/__init__.mojo b/src/hyf_provider/__init__.mojo @@ -0,0 +1 @@ +"""Pure-Mojo provider integration modules for HYF.""" diff --git a/src/hyf_provider/client.mojo b/src/hyf_provider/client.mojo @@ -0,0 +1,21 @@ +from flare.http import HttpClient + +from hyf_provider.config import MaxLocalProviderConfig + 
+ +def _trim_trailing_slash(url: String) -> String: + if url.endswith("/") and url.byte_length() > 1: + return String(url[byte = 0 : url.byte_length() - 1]) + return String(url) + + +def make_max_local_http_client( + config: MaxLocalProviderConfig, +) -> HttpClient: + return HttpClient(timeout_ms=config.request_timeout_ms) + + +def max_local_chat_completions_url( + config: MaxLocalProviderConfig, +) -> String: + return _trim_trailing_slash(config.base_url) + "/chat/completions" diff --git a/src/hyf_provider/config.mojo b/src/hyf_provider/config.mojo @@ -0,0 +1,40 @@ +from envo import getenv_or + + +@fieldwise_init +struct MaxLocalProviderConfig(Defaultable, Copyable, Movable): + var base_url: String + var health_url: String + var model: String + var route: String + var request_timeout_ms: Int + + def __init__(out self): + self.base_url = "http://127.0.0.1:8000/v1" + self.health_url = "http://127.0.0.1:8000/health" + self.model = "max-local-query-rewrite" + self.route = "assist_bridge.query_rewrite.max_local" + self.request_timeout_ms = 15_000 + + +def default_max_local_provider_config() -> MaxLocalProviderConfig: + return MaxLocalProviderConfig() + + +def load_max_local_provider_config() raises -> MaxLocalProviderConfig: + var config = default_max_local_provider_config() + config.base_url = getenv_or("HYF_MAX_LOCAL_BASE_URL", config.base_url) + config.health_url = getenv_or( + "HYF_MAX_LOCAL_HEALTH_URL", config.health_url + ) + config.model = getenv_or("HYF_MAX_LOCAL_MODEL", config.model) + config.route = getenv_or("HYF_MAX_LOCAL_ROUTE", config.route) + + var timeout_value = getenv_or( + "HYF_MAX_LOCAL_REQUEST_TIMEOUT_MS", String(config.request_timeout_ms) + ) + var parsed_timeout = atol(timeout_value) + if parsed_timeout > 0: + config.request_timeout_ms = parsed_timeout + + return config^ diff --git a/src/hyf_provider/health.mojo b/src/hyf_provider/health.mojo @@ -0,0 +1,31 @@ +from hyf_provider.client import make_max_local_http_client +from hyf_provider.config 
import MaxLocalProviderConfig +from hyf_provider.result import MaxLocalProviderStatus + + +def resolve_max_local_provider_status( + config: MaxLocalProviderConfig, +) -> MaxLocalProviderStatus: + try: + with make_max_local_http_client(config) as client: + var response = client.get(config.health_url) + var reachable = response.ok() + return MaxLocalProviderStatus( + backend_kind="max_local", + provider="max_local", + route=String(config.route), + model=String(config.model), + reachable=reachable, + state="ready" if reachable else "unavailable", + ) + except: + pass + + return MaxLocalProviderStatus( + backend_kind="max_local", + provider="max_local", + route=String(config.route), + model=String(config.model), + reachable=False, + state="unavailable", + ) diff --git a/src/hyf_provider/max_local.mojo b/src/hyf_provider/max_local.mojo @@ -0,0 +1,56 @@ +from tempo import Timestamp + +from hyf_assist.contract import AssistQueryRewriteResult +from hyf_core.request_context import RequestContext +from hyf_provider.client import ( + make_max_local_http_client, + max_local_chat_completions_url, +) +from hyf_provider.config import MaxLocalProviderConfig +from hyf_provider.health import resolve_max_local_provider_status +from hyf_provider.result import ( + MaxLocalProviderStatus, + parse_query_analysis_from_chat_completion, +) +from hyf_provider.schema import ( + build_query_rewrite_request_body, + query_rewrite_schema_version, +) + + +def execute_query_rewrite_via_max_local_provider( + config: MaxLocalProviderConfig, text: String, context: RequestContext +) raises -> AssistQueryRewriteResult: + var started_at_ms = Timestamp.now().unix_ms() + with make_max_local_http_client(config) as client: + var response = client.post( + max_local_chat_completions_url(config), + build_query_rewrite_request_body(config, text, context), + ) + if not response.ok(): + raise Error( + "max_local provider returned HTTP " + + String(response.status) + ) + + var analysis = 
parse_query_analysis_from_chat_completion( + response.json() + ) + var latency_ms = Int(Timestamp.now().unix_ms() - started_at_ms) + if latency_ms < 0: + latency_ms = 0 + + return AssistQueryRewriteResult( + analysis=analysis^, + provider="max_local", + route=String(config.route), + model=String(config.model), + latency_ms=latency_ms, + schema_version=query_rewrite_schema_version(), + ) + + +def max_local_provider_status( + config: MaxLocalProviderConfig, +) -> MaxLocalProviderStatus: + return resolve_max_local_provider_status(config) diff --git a/src/hyf_provider/result.mojo b/src/hyf_provider/result.mojo @@ -0,0 +1,120 @@ +from std.collections import List + +from mojson import Value, loads, validate + +from hyf_core.capabilities.query_analysis import ( + ExtractedFilters, + QueryAnalysis, +) +from hyf_provider.schema import query_rewrite_schema + + +@fieldwise_init +struct MaxLocalProviderStatus(Copyable, Movable): + var backend_kind: String + var provider: String + var route: String + var model: String + var reachable: Bool + var state: String + + +def _has_key(value: Value, key: String) -> Bool: + for candidate in value.object_keys(): + if candidate == key: + return True + return False + + +def _string_array(value: Value, context: String) raises -> List[String]: + if not value.is_array(): + raise Error(context + " must be an array") + + var items = List[String]() + for item in value.array_items(): + if not item.is_string(): + raise Error(context + " items must be strings") + items.append(item.string_value()) + return items^ + + +def _first_validation_error(value: Value) raises -> String: + var validation = validate(value, query_rewrite_schema()) + if validation.valid: + return "" + if len(validation.errors) == 0: + return "query_rewrite structured output failed schema validation" + var error = validation.errors[0].copy() + if error.path == "": + return String(error.message) + return String(error.path) + ": " + String(error.message) + + +def 
extract_chat_completion_text(response: Value) raises -> String: + if not response.is_object(): + raise Error("max_local response must be a JSON object") + if not _has_key(response, "choices"): + raise Error("max_local response must contain choices") + if ( + not response["choices"].is_array() + or len(response["choices"].array_items()) == 0 + ): + raise Error("max_local response choices must be a non-empty array") + + var message = response["choices"][0]["message"].clone() + if not message.is_object(): + raise Error("max_local response choice message must be an object") + + var content = message["content"].clone() + if content.is_string(): + return content.string_value() + + if content.is_array(): + var collected = String("") + for part in content.array_items(): + if ( + part.is_object() + and _has_key(part, "type") + and part["type"].is_string() + and part["type"].string_value() == "text" + and _has_key(part, "text") + and part["text"].is_string() + ): + collected += part["text"].string_value() + + if collected != "": + return collected^ + + raise Error("max_local response contained no text content") + + +def parse_query_analysis_json(value: Value) raises -> QueryAnalysis: + if not value.is_object(): + raise Error("query_rewrite structured output must be an object") + + var validation_error = _first_validation_error(value.clone()) + if validation_error != "": + raise Error(validation_error) + + var filters = value["extracted_filters"].clone() + return QueryAnalysis( + original_text=value["original_text"].string_value(), + normalized_text=value["normalized_text"].string_value(), + rewritten_text=value["rewritten_text"].string_value(), + query_terms=_string_array(value["query_terms"], "query_terms"), + normalization_signals=_string_array( + value["normalization_signals"], "normalization_signals" + ), + ranking_hints=_string_array(value["ranking_hints"], "ranking_hints"), + extracted_filters=ExtractedFilters( + local_intent=filters["local_intent"].bool_value(), + 
fulfillment=filters["fulfillment"].string_value(), + time_window=filters["time_window"].string_value(), + ), + ) + + +def parse_query_analysis_from_chat_completion( + response: Value, +) raises -> QueryAnalysis: + return parse_query_analysis_json(loads(extract_chat_completion_text(response))) diff --git a/src/hyf_provider/schema.mojo b/src/hyf_provider/schema.mojo @@ -0,0 +1,129 @@ +from mojson import Value, loads + +from hyf_core.request_context import RequestContext +from hyf_provider.config import MaxLocalProviderConfig + + +def query_rewrite_schema_version() -> Int: + return 1 + + +def query_rewrite_schema() raises -> Value: + var schema = loads("{}") + schema.set("type", Value("object")) + schema.set("additionalProperties", Value(False)) + + var required = loads("[]") + required.append(Value("original_text")) + required.append(Value("normalized_text")) + required.append(Value("rewritten_text")) + required.append(Value("query_terms")) + required.append(Value("normalization_signals")) + required.append(Value("ranking_hints")) + required.append(Value("extracted_filters")) + schema.set("required", required) + + var properties = loads("{}") + properties.set("original_text", loads('{"type":"string"}')) + properties.set("normalized_text", loads('{"type":"string"}')) + properties.set("rewritten_text", loads('{"type":"string"}')) + properties.set( + "query_terms", + loads('{"type":"array","items":{"type":"string"}}'), + ) + properties.set( + "normalization_signals", + loads('{"type":"array","items":{"type":"string"}}'), + ) + properties.set( + "ranking_hints", + loads('{"type":"array","items":{"type":"string"}}'), + ) + properties.set( + "extracted_filters", + loads( + '{"type":"object","additionalProperties":false,"required":["local_intent","fulfillment","time_window"],"properties":{"local_intent":{"type":"boolean"},"fulfillment":{"type":"string"},"time_window":{"type":"string"}}}' + ), + ) + schema.set("properties", properties) + return schema^ + + +def 
query_rewrite_system_prompt() -> String: + return ( + "Return only strict JSON matching the supplied schema. Preserve " + + "original_text, normalized_text, rewritten_text, query_terms, " + + "normalization_signals, ranking_hints, and extracted_filters." + ) + + +def build_query_rewrite_user_prompt( + text: String, context: RequestContext +) -> String: + var prompt = ( + "Rewrite the market search query into normalized search terms and " + + "extracted filters.\nquery: " + + text + + "\n" + ) + + if context.scope and len(context.scope.value().listing_ids) > 0: + var first = True + prompt += "scope_listing_ids: " + for listing_id in context.scope.value().listing_ids: + if not first: + prompt += "," + prompt += String(listing_id) + first = False + prompt += "\n" + + if context.time_range: + prompt += ( + "time_range: " + + context.time_range.value().start + + " -> " + + context.time_range.value().end + + "\n" + ) + + prompt += "consistency: " + context.consistency + "\n" + prompt += "evidence_limit: " + String(context.evidence_limit) + "\n" + prompt += "explain_plan: " + String(context.explain_plan) + "\n" + return prompt^ + + +def build_query_rewrite_request_body( + config: MaxLocalProviderConfig, text: String, context: RequestContext +) raises -> Value: + var body = loads("{}") + body.set("model", Value(String(config.model))) + + var messages = loads("[]") + + var system_message = loads("{}") + system_message.set("role", Value("system")) + system_message.set("content", Value(query_rewrite_system_prompt())) + messages.append(system_message) + + var user_message = loads("{}") + user_message.set("role", Value("user")) + user_message.set( + "content", Value(build_query_rewrite_user_prompt(text, context)) + ) + messages.append(user_message) + + body.set("messages", messages) + body.set("temperature", Value(0.1)) + body.set("max_tokens", Value(256)) + + var response_format = loads("{}") + response_format.set("type", Value("json_schema")) + + var json_schema = loads("{}") 
+ json_schema.set("name", Value("query_rewrite")) + json_schema.set("strict", Value(True)) + json_schema.set("schema", query_rewrite_schema()) + response_format.set("json_schema", json_schema) + + body.set("response_format", response_format) + return body^ diff --git a/tests/max_local_http_stub.mojo b/tests/max_local_http_stub.mojo @@ -0,0 +1,122 @@ +from std.collections import List +from std.sys import argv + +from flare.http import Request, Response, Status +from flare.http.server import _handle_connection +from flare.net import SocketAddr +from flare.tcp import TcpListener +from mojson import Value, dumps, loads + + +def _arg_value(flag: String) raises -> String: + var args = argv() + var index = 0 + while index < len(args): + if args[index] == flag: + index += 1 + if index >= len(args): + raise Error("missing value for " + flag) + return args[index] + index += 1 + raise Error("missing required flag " + flag) + + +def _to_body(text: String) -> List[UInt8]: + var body = List[UInt8](capacity=len(text)) + for byte in text.as_bytes(): + body.append(byte) + return body^ + + +def _query_rewrite_result_json() raises -> String: + var analysis = loads("{}") + analysis.set("original_text", Value("local apples pickup weekend")) + analysis.set("normalized_text", Value("local apples pickup weekend")) + analysis.set("rewritten_text", Value("apples pickup weekend")) + + var query_terms = loads("[]") + query_terms.append(Value("apples")) + query_terms.append(Value("pickup")) + query_terms.append(Value("weekend")) + analysis.set("query_terms", query_terms) + + var normalization_signals = loads("[]") + normalization_signals.append(Value("lowercase")) + analysis.set("normalization_signals", normalization_signals) + + var ranking_hints = loads("[]") + ranking_hints.append(Value("local_intent")) + analysis.set("ranking_hints", ranking_hints) + + var filters = loads("{}") + filters.set("local_intent", Value(True)) + filters.set("fulfillment", Value("pickup")) + 
filters.set("time_window", Value("weekend")) + analysis.set("extracted_filters", filters) + + return dumps(analysis) + + +def _health_response() -> Response: + return Response( + status=Status.OK, + reason="OK", + body=_to_body('{"status":"ok"}'), + ) + + +def _query_rewrite_response() raises -> Response: + var root = loads("{}") + var choices = loads("[]") + var choice = loads("{}") + var message = loads("{}") + message.set("content", Value(_query_rewrite_result_json())) + choice.set("message", message) + choices.append(choice) + root.set("choices", choices) + + return Response( + status=Status.OK, reason="OK", body=_to_body(dumps(root)) + ) + + +def _health_router(request: Request) raises -> Response: + if request.method == "GET" and request.url == "/health": + return _health_response() + return Response( + status=Status.NOT_FOUND, + reason="Not Found", + body=_to_body("not found"), + ) + + +def _query_rewrite_router(request: Request) raises -> Response: + if ( + request.method == "POST" + and request.url == "/v1/chat/completions" + ): + return _query_rewrite_response() + return Response( + status=Status.NOT_FOUND, + reason="Not Found", + body=_to_body("not found"), + ) + + +def main() raises: + var port = UInt16(atol(_arg_value("--port"))) + var mode = _arg_value("--mode") + + var listener = TcpListener.bind(SocketAddr.localhost(port)) + print("ready") + + var stream = listener.accept() + if mode == "health_ok": + _handle_connection(stream^, _health_router, 8192, 1024 * 1024) + elif mode == "query_rewrite_ok": + _handle_connection( + stream^, _query_rewrite_router, 8192, 1024 * 1024 + ) + else: + raise Error("unsupported mode " + mode) + listener.close() diff --git a/tests/max_local_process_helper.mojo b/tests/max_local_process_helper.mojo @@ -0,0 +1,123 @@ +from std.collections import List, Optional +from std.ffi import CStringSlice, c_int, external_call +from std.os import Pipe, Process +from std.sys._libc import close + +from flare.net import SocketAddr +from 
flare.tcp import TcpListener + + +def _dup2(oldfd: c_int, newfd: c_int) -> c_int: + return external_call["dup2", c_int](oldfd, newfd) + + +@always_inline +def _fork() -> c_int: + return external_call["fork", c_int]() + + +@always_inline +def _exit_child(code: c_int): + _ = external_call["_exit", c_int](code) + + +def _read_pipe_line(mut pipe: Pipe) raises -> String: + var buffer = InlineArray[Byte, 1](fill=0) + var output = String("") + while True: + var read = pipe.read_bytes(Span(buffer)) + if read == 0: + break + var chunk = String( + from_utf8=Span(ptr=buffer.unsafe_ptr(), length=Int(read)) + ) + if chunk == "\n": + break + output += chunk + return output^ + + +struct SpawnedMaxLocalStub(Movable): + var pid: Int + + def __init__(out self, pid: Int): + self.pid = pid + + def wait(mut self) raises: + var process = Process(self.pid) + var status = process.wait() + if not status.exit_code or status.exit_code.value() != 0: + raise Error("max_local stub exited unexpectedly") + + +def reserve_loopback_port() raises -> Int: + var listener = TcpListener.bind(SocketAddr.localhost(0)) + var port = Int(listener.local_addr().port) + listener.close() + return port + + +def spawn_max_local_stub( + port: Int, mode: String +) raises -> SpawnedMaxLocalStub: + var stdout_pipe = Pipe() + var command = String("mojo") + var include_flag = String("-I") + var include_path = String("src") + var vendor_include_path = String("../../../../vendor/mojo/flare") + var entrypoint = String("tests/max_local_http_stub.mojo") + var arg_port_flag = String("--port") + var arg_port = String(port) + var arg_mode_flag = String("--mode") + var arg_mode = String(mode) + var argv = List[Optional[CStringSlice[ImmutAnyOrigin]]](length=12, fill={}) + argv[0] = rebind[CStringSlice[ImmutAnyOrigin]](command.as_c_string_slice()) + argv[1] = rebind[CStringSlice[ImmutAnyOrigin]]("run".as_c_string_slice()) + argv[2] = rebind[CStringSlice[ImmutAnyOrigin]]( + include_flag.as_c_string_slice() + ) + argv[3] = 
rebind[CStringSlice[ImmutAnyOrigin]]( + include_path.as_c_string_slice() + ) + argv[4] = rebind[CStringSlice[ImmutAnyOrigin]]( + include_flag.as_c_string_slice() + ) + argv[5] = rebind[CStringSlice[ImmutAnyOrigin]]( + vendor_include_path.as_c_string_slice() + ) + argv[6] = rebind[CStringSlice[ImmutAnyOrigin]](entrypoint.as_c_string_slice()) + argv[7] = rebind[CStringSlice[ImmutAnyOrigin]]( + arg_port_flag.as_c_string_slice() + ) + argv[8] = rebind[CStringSlice[ImmutAnyOrigin]](arg_port.as_c_string_slice()) + argv[9] = rebind[CStringSlice[ImmutAnyOrigin]]( + arg_mode_flag.as_c_string_slice() + ) + argv[10] = rebind[CStringSlice[ImmutAnyOrigin]](arg_mode.as_c_string_slice()) + var stdout_read_fd = c_int(stdout_pipe.fd_in.value().value) + var stdout_write_fd = c_int(stdout_pipe.fd_out.value().value) + var command_ptr = command.as_c_string_slice().unsafe_ptr() + var argv_ptr = argv.unsafe_ptr() + + var pid = _fork() + if pid < 0: + raise Error("failed to spawn max_local stub") + + if pid == 0: + if _dup2(stdout_write_fd, 1) < 0: + _exit_child(c_int(126)) + _ = close(stdout_read_fd) + _ = close(stdout_write_fd) + _ = external_call["execvp", c_int](command_ptr, argv_ptr) + _exit_child(c_int(127)) + + stdout_pipe.set_input_only() + var ready_line = _read_pipe_line(stdout_pipe) + if ready_line != "ready": + stdout_pipe.set_output_only() + var process = Process(Int(pid)) + _ = process.wait() + raise Error("max_local stub failed to report ready") + + stdout_pipe.set_output_only() + return SpawnedMaxLocalStub(Int(pid)) diff --git a/tests/test_hyf.mojo b/tests/test_hyf.mojo @@ -32,7 +32,17 @@ from hyf_core.backends.selector import ( ) from hyf_core.capabilities.registry import canonical_business_capabilities from hyf_core.metadata import current_build_identity, current_package_surface -from hyf_core.request_context import default_request_context +from hyf_core.request_context import ( + RequestScope, + TimeRange, + default_request_context, +) +from hyf_provider.config import 
load_max_local_provider_config +from hyf_provider.max_local import ( + execute_query_rewrite_via_max_local_provider, + max_local_provider_status, +) +from hyf_provider.schema import build_query_rewrite_request_body from hyf_stdio.control.capabilities import build_capabilities_output from hyf_stdio.codec import decode_request, encode_error, encode_success from hyf_stdio.envelope import WireErrorResponse, WireSuccessResponse @@ -42,6 +52,10 @@ from hyf_stdio.server import ( handle_request_line_with_runtime_context, handle_request_line_with_control_builders, ) +from max_local_process_helper import ( + reserve_loopback_port, + spawn_max_local_stub, +) comptime _EXPECTED_INTERNAL_ERROR_MESSAGE = ( @@ -869,6 +883,143 @@ def test_invalid_request_preserves_request_and_trace_correlation() raises: ) +def test_max_local_provider_status_probes_health_without_sidecar() raises: + var port = reserve_loopback_port() + var stub = spawn_max_local_stub(port, "health_ok") + + with ScopedEnvVar( + "HYF_MAX_LOCAL_BASE_URL", + "http://127.0.0.1:" + String(port) + "/v1", + ): + with ScopedEnvVar( + "HYF_MAX_LOCAL_HEALTH_URL", + "http://127.0.0.1:" + String(port) + "/health", + ): + with ScopedEnvVar("HYF_MAX_LOCAL_REQUEST_TIMEOUT_MS", "1000"): + var config = load_max_local_provider_config() + assert_equal( + config.base_url, + "http://127.0.0.1:" + String(port) + "/v1", + ) + assert_equal( + config.health_url, + "http://127.0.0.1:" + String(port) + "/health", + ) + var status = max_local_provider_status(config) + assert_equal(status.backend_kind, "max_local") + assert_equal(status.provider, "max_local") + assert_equal( + status.route, "assist_bridge.query_rewrite.max_local" + ) + assert_equal(status.model, "max-local-query-rewrite") + assert_equal(status.reachable, True) + assert_equal(status.state, "ready") + + stub.wait() + + +def test_max_local_query_rewrite_request_is_mojo_owned() raises: + var port = reserve_loopback_port() + var stub = spawn_max_local_stub(port, "query_rewrite_ok") 
+ + with ScopedEnvVar( + "HYF_MAX_LOCAL_BASE_URL", + "http://127.0.0.1:" + String(port) + "/v1", + ): + with ScopedEnvVar( + "HYF_MAX_LOCAL_HEALTH_URL", + "http://127.0.0.1:" + String(port) + "/health", + ): + with ScopedEnvVar("HYF_MAX_LOCAL_REQUEST_TIMEOUT_MS", "1000"): + var context = default_request_context() + var listing_ids = List[String]() + listing_ids.append("listing-1") + context.scope = RequestScope( + listing_ids=listing_ids^, + farm_ids=List[String](), + account_ids=List[String](), + platform_ids=List[String](), + object_filters=None, + ) + context.time_range = TimeRange( + start="2026-04-12", end="2026-04-13" + ) + context.explain_plan = True + + var config = load_max_local_provider_config() + var request_body = build_query_rewrite_request_body( + config, + "local apples pickup weekend", + context, + ) + assert_equal( + request_body["model"].string_value(), + "max-local-query-rewrite", + ) + assert_equal( + request_body["response_format"]["type"].string_value(), + "json_schema", + ) + assert_equal( + request_body["response_format"]["json_schema"]["strict"] + .bool_value(), + True, + ) + assert_equal( + request_body["response_format"]["json_schema"]["name"] + .string_value(), + "query_rewrite", + ) + assert_equal( + request_body["response_format"]["json_schema"]["schema"][ + "additionalProperties" + ].bool_value(), + False, + ) + assert_true( + request_body["messages"][1]["content"] + .string_value() + .find("scope_listing_ids: listing-1") + >= 0 + ) + assert_true( + request_body["messages"][1]["content"] + .string_value() + .find("time_range: 2026-04-12 -> 2026-04-13") + >= 0 + ) + assert_true( + request_body["messages"][1]["content"] + .string_value() + .find("explain_plan: True") + >= 0 + ) + + var result = execute_query_rewrite_via_max_local_provider( + config, + "local apples pickup weekend", + context, + ) + assert_equal(result.provider, "max_local") + assert_equal( + result.route, "assist_bridge.query_rewrite.max_local" + ) + 
assert_equal(result.model, "max-local-query-rewrite") + assert_equal(result.schema_version, 1) + assert_true(result.latency_ms >= 0) + assert_equal( + result.analysis.rewritten_text, "apples pickup weekend" + ) + assert_equal( + result.analysis.extracted_filters.fulfillment, "pickup" + ) + assert_equal( + result.analysis.extracted_filters.time_window, "weekend" + ) + + stub.wait() + + def test_internal_error_is_bounded_on_wire() raises: with TemporaryDirectory() as temp_dir: var diagnostics_dir = Path(temp_dir) / "hyf-internal-diagnostics"