commit 81307328f3dcf4814ef0ffe55558124ac7d330b4
parent b2b99bcb5fb3f6250eedac5fbbe61db8ff8be6dd
Author: triesap <tyson@radroots.org>
Date: Wed, 8 Apr 2026 17:54:02 +0000
capabilities: add deterministic query rewrite
- implement request-local mode-a query_rewrite with typed output and optional provenance
- map core result metadata through the stdio response envelope
- update control-plane truthfulness for the first implemented business capability
- validate success, provenance, and invalid input behavior via pixi run run
Diffstat:
8 files changed, 473 insertions(+), 8 deletions(-)
diff --git a/src/hyf_core/capabilities/query_rewrite.mojo b/src/hyf_core/capabilities/query_rewrite.mojo
@@ -0,0 +1,318 @@
+from std.collections import List, Optional
+
+from mojson import Value, loads
+
+from hyf_core.errors import (
+ CapabilityResult,
+ failed_capability,
+ invalid_input_error,
+ successful_capability,
+)
+from hyf_core.provenance import (
+ CoreResponseMeta,
+ ExecutionProvenance,
+ ProvenanceSourceRef,
+)
+from hyf_core.request_context import RequestContext
+
+
+def _has_key(value: Value, key: String) -> Bool:
+ for candidate in value.object_keys():
+ if candidate == key:
+ return True
+ return False
+
+
+def _copy_strings(items: List[String]) -> List[String]:
+ var copied = List[String]()
+ for item in items:
+ copied.append(String(item))
+ return copied^
+
+
+def _string_array(items: List[String]) raises -> Value:
+ var array = loads("[]")
+ for item in items:
+ array.append(Value(String(item)))
+ return array^
+
+
+def _collapse_whitespace(text: String) -> String:
+ var parts = text.split()
+ var collapsed = String()
+ var first = True
+ for part in parts:
+ if not first:
+ collapsed += " "
+ collapsed += String(part)
+ first = False
+ return collapsed^
+
+
+def _join_strings(items: List[String]) -> String:
+ var joined = String()
+ var first = True
+ for item in items:
+ if not first:
+ joined += " "
+ joined += String(item)
+ first = False
+ return joined^
+
+
+def _normalize_text(text: String, mut signals: List[String]) -> String:
+ var normalized = text.lower()
+ if normalized != text:
+ signals.append("lowercase")
+
+ var replaced = normalized
+ replaced = replaced.replace(",", " ")
+ replaced = replaced.replace(".", " ")
+ replaced = replaced.replace("!", " ")
+ replaced = replaced.replace("?", " ")
+ replaced = replaced.replace(":", " ")
+ replaced = replaced.replace(";", " ")
+ replaced = replaced.replace("/", " ")
+ replaced = replaced.replace("\\", " ")
+ replaced = replaced.replace("(", " ")
+ replaced = replaced.replace(")", " ")
+ replaced = replaced.replace("[", " ")
+ replaced = replaced.replace("]", " ")
+ replaced = replaced.replace("{", " ")
+ replaced = replaced.replace("}", " ")
+ replaced = replaced.replace("\"", " ")
+ replaced = replaced.replace("'", " ")
+ replaced = replaced.replace("-", " ")
+ if replaced != normalized:
+ signals.append("punctuation_trimmed")
+
+ var collapsed = _collapse_whitespace(replaced)
+ if collapsed != replaced:
+ signals.append("whitespace_collapsed")
+
+ return collapsed^
+
+
+def _contains_token(items: List[String], token: String) -> Bool:
+ for item in items:
+ if item == token:
+ return True
+ return False
+
+
+def _is_stop_word(token: String) -> Bool:
+ return (
+ token == "a"
+ or token == "an"
+ or token == "and"
+ or token == "for"
+ or token == "from"
+ or token == "in"
+ or token == "me"
+ or token == "near"
+ or token == "of"
+ or token == "on"
+ or token == "the"
+ or token == "to"
+ or token == "with"
+ )
+
+
+def _extract_text_input(input: Value) raises -> String:
+ if not input.is_object():
+ raise Error("query_rewrite input must be a JSON object")
+
+ if _has_key(input, "text"):
+ var text_value = input["text"]
+ if not text_value.is_string():
+ raise Error("query_rewrite input field 'text' must be a string")
+ var collapsed = _collapse_whitespace(text_value.string_value())
+ if collapsed == "":
+ raise Error("query_rewrite input text must not be empty")
+ return collapsed^
+ elif _has_key(input, "query"):
+ var query_value = input["query"]
+ if not query_value.is_string():
+ raise Error("query_rewrite input field 'query' must be a string")
+ var collapsed = _collapse_whitespace(query_value.string_value())
+ if collapsed == "":
+ raise Error("query_rewrite input text must not be empty")
+ return collapsed^
+ else:
+ raise Error("query_rewrite input requires 'text' or 'query'")
+
+
+def _build_output(
+ original_text: String,
+ normalized_text: String,
+ rewritten_text: String,
+ query_terms: List[String],
+ normalization_signals: List[String],
+ ranking_hints: List[String],
+ local_intent: Bool,
+ fulfillment: String,
+ time_window: String,
+) raises -> Value:
+ var output = loads("{}")
+ output.set("original_text", Value(String(original_text)))
+ output.set("normalized_text", Value(String(normalized_text)))
+ output.set("rewritten_text", Value(String(rewritten_text)))
+ output.set("query_terms", _string_array(query_terms))
+ output.set("normalization_signals", _string_array(normalization_signals))
+ output.set("ranking_hints", _string_array(ranking_hints))
+
+ var filters = loads("{}")
+ filters.set("local_intent", Value(local_intent))
+ filters.set("fulfillment", Value(String(fulfillment)))
+ filters.set("time_window", Value(String(time_window)))
+ output.set("extracted_filters", filters)
+ return output^
+
+
+def _build_meta(
+ context: RequestContext,
+ normalization_signals: List[String],
+ ranking_hints: List[String],
+) -> Optional[CoreResponseMeta]:
+ var source_refs = List[ProvenanceSourceRef]()
+ source_refs.append(
+ ProvenanceSourceRef(
+ source_kind="local_input",
+ source_ref="query_rewrite:input",
+ )
+ )
+ if context.scope:
+ source_refs.append(
+ ProvenanceSourceRef(
+ source_kind="request_scope",
+ source_ref="request_context.scope",
+ )
+ )
+
+ var signal_tags = _copy_strings(normalization_signals)
+ for hint in ranking_hints:
+ signal_tags.append(String(hint))
+
+ if context.return_provenance:
+ return CoreResponseMeta(
+ mode="a",
+ backend="heuristic",
+ latency_ms=0,
+ provenance=ExecutionProvenance(
+ kind="deterministic",
+ signal_tags=signal_tags^,
+ source_refs=source_refs^,
+ fallback=None,
+ evidence_set_id=None,
+ ),
+ )
+
+ return CoreResponseMeta(
+ mode="a",
+ backend="heuristic",
+ latency_ms=0,
+ provenance=None,
+ )
+
+
+def execute_query_rewrite(
+ input: Value, context: RequestContext
+) raises -> CapabilityResult:
+ try:
+ var original_text = _extract_text_input(input)
+
+ var normalization_signals = List[String]()
+ var normalized_text = _normalize_text(original_text, normalization_signals)
+ var normalized_tokens = normalized_text.split()
+
+ var query_terms = List[String]()
+ var ranking_hints = List[String]()
+ var local_intent = False
+ var fulfillment = "unspecified"
+ var time_window = "unspecified"
+ var removed_stop_words = False
+ var extracted_filter_tokens = False
+
+ for raw_token in normalized_tokens:
+ var token = String(raw_token)
+ if token == "":
+ continue
+
+ if (
+ token == "near"
+ or token == "me"
+ or token == "nearby"
+ or token == "local"
+ ):
+ local_intent = True
+ extracted_filter_tokens = True
+ continue
+
+ if token == "pickup" or token == "curbside":
+ fulfillment = "pickup"
+ extracted_filter_tokens = True
+ continue
+
+ if token == "delivery" or token == "ship" or token == "shipping":
+ fulfillment = "delivery"
+ extracted_filter_tokens = True
+ continue
+
+ if token == "weekend" or token == "saturday" or token == "sunday":
+ time_window = "weekend"
+ extracted_filter_tokens = True
+ continue
+
+ if _is_stop_word(token):
+ removed_stop_words = True
+ continue
+
+ if not _contains_token(query_terms, token):
+ query_terms.append(token)
+
+ if local_intent:
+ normalization_signals.append("local_intent_detected")
+ ranking_hints.append("prefer_local_results")
+ if fulfillment == "pickup":
+ normalization_signals.append("pickup_filter_detected")
+ ranking_hints.append("prefer_pickup")
+ elif fulfillment == "delivery":
+ normalization_signals.append("delivery_filter_detected")
+ ranking_hints.append("prefer_delivery")
+ if time_window == "weekend":
+ normalization_signals.append("weekend_filter_detected")
+ ranking_hints.append("prefer_weekend_availability")
+ if removed_stop_words:
+ normalization_signals.append("stopwords_removed")
+ if extracted_filter_tokens:
+ normalization_signals.append("filter_tokens_extracted")
+ if context.scope:
+ ranking_hints.append("respect_scope")
+ normalization_signals.append("scope_present")
+
+ if len(query_terms) == 0:
+ query_terms.append(String(normalized_text))
+ normalization_signals.append("fallback_to_normalized_query")
+
+ var rewritten_text = _join_strings(query_terms)
+
+ return successful_capability(
+ _build_output(
+ original_text=original_text,
+ normalized_text=normalized_text,
+ rewritten_text=rewritten_text,
+ query_terms=query_terms,
+ normalization_signals=normalization_signals,
+ ranking_hints=ranking_hints,
+ local_intent=local_intent,
+ fulfillment=fulfillment,
+ time_window=time_window,
+ ),
+ meta=_build_meta(
+ context=context,
+ normalization_signals=normalization_signals,
+ ranking_hints=ranking_hints,
+ ),
+ )
+ except e:
+ return failed_capability(invalid_input_error(String(e)))
diff --git a/src/hyf_core/capabilities/registry.mojo b/src/hyf_core/capabilities/registry.mojo
@@ -17,8 +17,8 @@ def canonical_business_capabilities() -> List[BusinessCapabilityDescriptor]:
BusinessCapabilityDescriptor(
id="query_rewrite",
mode_a_enabled=True,
- implemented=False,
- callable=False,
+ implemented=True,
+ callable=True,
mode_b_available=False,
disabled_reason="",
)
diff --git a/src/hyf_core/errors.mojo b/src/hyf_core/errors.mojo
@@ -23,10 +23,44 @@ struct CapabilityFailure(Copyable, Movable):
var error: CoreError
+@fieldwise_init
+struct CapabilityResult(Copyable, Movable):
+ var success: Optional[CapabilitySuccess]
+ var failure: Optional[CapabilityFailure]
+
+
+def _copy_core_error(error: CoreError) -> CoreError:
+ return CoreError(
+ code=String(error.code),
+ message=String(error.message),
+ retryable=error.retryable,
+ )
+
+
+def successful_capability(
+ output: Value, meta: Optional[CoreResponseMeta] = None
+) -> CapabilityResult:
+ return CapabilityResult(
+ success=CapabilitySuccess(output=output.copy(), meta=meta.copy()),
+ failure=None,
+ )
+
+
+def failed_capability(error: CoreError) -> CapabilityResult:
+ return CapabilityResult(
+ success=None,
+ failure=CapabilityFailure(error=_copy_core_error(error)),
+ )
+
+
def invalid_context_error(message: String) -> CoreError:
return CoreError(code="invalid_context", message=message, retryable=False)
+def invalid_input_error(message: String) -> CoreError:
+ return CoreError(code="invalid_input", message=message, retryable=False)
+
+
def capability_not_implemented_error(capability: String) -> CoreError:
return CoreError(
code="capability_not_implemented",
diff --git a/src/hyf_stdio/control/capabilities.mojo b/src/hyf_stdio/control/capabilities.mojo
@@ -33,9 +33,13 @@ def build_capabilities_output() raises -> Value:
)
value.set(
"implementation_status",
- Value("not_implemented")
- if capability.mode_a_enabled
- else Value("disabled"),
+ Value("implemented")
+ if capability.implemented
+ else (
+ Value("not_implemented")
+ if capability.mode_a_enabled
+ else Value("disabled")
+ ),
)
value.set("callable", Value(capability.callable))
value.set("implemented", Value(capability.implemented))
diff --git a/src/hyf_stdio/control/status.mojo b/src/hyf_stdio/control/status.mojo
@@ -22,7 +22,7 @@ def build_status_output() raises -> Value:
output.set("daemon", Value("hyfd"))
output.set("transport", Value("stdio"))
output.set("request_framing", Value("newline_delimited_json"))
- output.set("implementation_status", Value("bootstrap_control_plane_only"))
+ output.set("implementation_status", Value("bootstrap_partial_mode_a"))
var modes = loads("{}")
modes.set("a", Value(True))
@@ -30,7 +30,7 @@ def build_status_output() raises -> Value:
output.set("enabled_modes", modes)
var backends = loads("{}")
- backends.set("mode_a_deterministic", Value("not_implemented"))
+ backends.set("mode_a_deterministic", Value("partially_available"))
backends.set("mode_b_model_assisted", Value("unavailable"))
output.set("backend_reachability", backends)
diff --git a/src/hyf_stdio/envelope.mojo b/src/hyf_stdio/envelope.mojo
@@ -1,3 +1,5 @@
+from std.collections import Optional
+
from mojson import Value, loads
from mojson.deserialize import Deserializable, get_string
@@ -26,7 +28,7 @@ def _require_request_keys(value: Value) raises:
and key != "input"
):
raise Error(
- "request envelope contains unexpected field '" + key + "'"
+ "request envelope contains unexpected field '" + key + "'"
)
@@ -73,12 +75,15 @@ struct WireRequest(Deserializable, Copyable, Movable):
struct WireSuccessResponse(Copyable, Movable):
var request_id: String
var output: Value
+ var meta: Optional[Value]
def to_json_value(self) raises -> Value:
var value = loads("{}")
value.set("request_id", Value(String(self.request_id)))
value.set("ok", Value(True))
value.set("output", self.output.clone())
+ if self.meta:
+ value.set("meta", self.meta.value().clone())
return value^
diff --git a/src/hyf_stdio/meta.mojo b/src/hyf_stdio/meta.mojo
@@ -0,0 +1,56 @@
+from std.collections import List
+
+from mojson import Value, loads
+
+from hyf_core.provenance import CoreResponseMeta, ExecutionProvenance
+
+
+def _string_array(items: List[String]) raises -> Value:
+ var array = loads("[]")
+ for item in items:
+ array.append(Value(String(item)))
+ return array^
+
+
+def _serialize_provenance(provenance: ExecutionProvenance) raises -> Value:
+ var value = loads("{}")
+ value.set("kind", Value(String(provenance.kind)))
+ value.set("signal_tags", _string_array(provenance.signal_tags))
+
+ var source_refs = loads("[]")
+ for source_ref in provenance.source_refs:
+ var ref_value = loads("{}")
+ ref_value.set("source_kind", Value(String(source_ref.source_kind)))
+ ref_value.set("source_ref", Value(String(source_ref.source_ref)))
+ source_refs.append(ref_value)
+ value.set("source_refs", source_refs)
+
+ if provenance.fallback:
+ var fallback = provenance.fallback.value().copy()
+ var fallback_value = loads("{}")
+ fallback_value.set(
+ "fallback_kind", Value(String(fallback.fallback_kind))
+ )
+ fallback_value.set("reason", Value(String(fallback.reason)))
+ value.set("fallback", fallback_value)
+ else:
+ value.set("fallback", Value(None))
+
+ if provenance.evidence_set_id:
+ value.set(
+ "evidence_set_id", Value(String(provenance.evidence_set_id.value()))
+ )
+ else:
+ value.set("evidence_set_id", Value(None))
+
+ return value^
+
+
+def serialize_core_response_meta(meta: CoreResponseMeta) raises -> Value:
+ var value = loads("{}")
+ value.set("mode", Value(String(meta.mode)))
+ value.set("backend", Value(String(meta.backend)))
+ value.set("latency_ms", Value(meta.latency_ms))
+ if meta.provenance:
+ value.set("provenance", _serialize_provenance(meta.provenance.value()))
+ return value^
diff --git a/src/hyf_stdio/server.mojo b/src/hyf_stdio/server.mojo
@@ -1,10 +1,15 @@
+from std.collections import Optional
from std.io.io import _fdopen
from std.sys import stdin
+from mojson import Value
+
from hyf_core.capabilities.registry import (
is_deferred_capability,
is_known_business_capability,
)
+from hyf_core.capabilities.query_rewrite import execute_query_rewrite
+from hyf_core.errors import CapabilityFailure, CapabilitySuccess
from hyf_stdio.codec import decode_request, encode_error, encode_success
from hyf_stdio.control.capabilities import build_capabilities_output
from hyf_stdio.control.status import build_status_output
@@ -14,12 +19,14 @@ from hyf_stdio.envelope import (
WireSuccessResponse,
)
from hyf_stdio.errors import (
+ WireError,
capability_disabled_error,
capability_unavailable_error,
internal_error,
invalid_request_error,
unsupported_capability_error,
)
+from hyf_stdio.meta import serialize_core_response_meta
def _read_request_line() raises -> String:
@@ -56,6 +63,43 @@ def _write_success(response: WireSuccessResponse) raises:
print(encode_success(response))
+def _wire_error_from_core_failure(
+ request_id: String, failure: CapabilityFailure
+) -> WireErrorResponse:
+ var code = String(failure.error.code)
+ if code == "invalid_input":
+ code = "invalid_request"
+ return WireErrorResponse(
+ request_id=request_id,
+ error=WireError(code=code, message=String(failure.error.message)),
+ )
+
+
+def _wire_success_from_core_success(
+ request_id: String, success: CapabilitySuccess
+) raises -> WireSuccessResponse:
+ var meta: Optional[Value] = None
+ if success.meta:
+ meta = serialize_core_response_meta(success.meta.value())
+ return WireSuccessResponse(
+ request_id=request_id,
+ output=success.output.clone(),
+ meta=meta^,
+ )
+
+
+def _write_query_rewrite(request: WireRequest, request_id: String) raises:
+ var result = execute_query_rewrite(
+ request.input.clone(), request.context.copy()
+ )
+ if result.failure:
+ _write_error(_wire_error_from_core_failure(request_id, result.failure.value()))
+ return
+ _write_success(
+ _wire_success_from_core_success(request_id, result.success.value())
+ )
+
+
def run_stdio_server() raises:
if stdin.isatty():
return
@@ -73,6 +117,7 @@ def run_stdio_server() raises:
WireSuccessResponse(
request_id=request_id,
output=build_status_output(),
+ meta=None,
)
)
elif request.capability == "sys.capabilities":
@@ -80,8 +125,11 @@ def run_stdio_server() raises:
WireSuccessResponse(
request_id=request_id,
output=build_capabilities_output(),
+ meta=None,
)
)
+ elif request.capability == "query_rewrite":
+ _write_query_rewrite(request^, request_id)
elif is_deferred_capability(request.capability):
_write_error(_disabled_response(request))
elif is_known_business_capability(request.capability):