hyf

Context-aware query service for Radroots
git clone https://radroots.dev/git/hyf.git
Log | Files | Refs | README | LICENSE

server.mojo (9415B)


      1 from std.collections import Optional
      2 from std.io.io import _fdopen
      3 from std.sys import stdin
      4 
      5 from json import Value
      6 
      7 from hyf_runtime.diagnostics import (
      8     append_internal_diagnostic as append_internal_diagnostic_to_dir,
      9     effective_diagnostics_dir_for_runtime_paths,
     10 )
     11 from hyf_runtime.startup import (
     12     RuntimeStartupContext,
     13     resolve_startup_context_from_process,
     14 )
     15 from hyf_core.capabilities.registry import (
     16     canonical_business_capability,
     17 )
     18 from hyf_core.errors import (
     19     CapabilityFailure,
     20     CapabilityResult,
     21     CapabilitySuccess,
     22 )
     23 from hyf_core.metadata import hyf_protocol_version
     24 from hyf_stdio.codec import (
     25     decode_request,
     26     encode_error,
     27     encode_success,
     28     extract_request_correlation,
     29 )
     30 from hyf_stdio.control.capabilities import (
     31     build_capabilities_output_with_runtime_context,
     32 )
     33 from hyf_stdio.control.status import (
     34     build_status_output_with_runtime_context,
     35 )
     36 from hyf_stdio.envelope import (
     37     WireErrorResponse,
     38     WireRequest,
     39     WireSuccessResponse,
     40 )
     41 from hyf_stdio.errors import (
     42     WireError,
     43     capability_disabled_error,
     44     capability_unavailable_error,
     45     internal_error,
     46     invalid_request_error,
     47     unsupported_capability_error,
     48 )
     49 from hyf_stdio.meta import serialize_core_response_meta
     50 from hyf_stdio.provider_execution import (
     51     execute_runtime_aware_business_capability,
     52 )
     53 
     54 
     55 def _read_request_line() raises -> String:
     56     with _fdopen["r"](stdin) as input_file:
     57         return input_file.readline()
     58 
     59 
     60 def _unsupported_response(request: WireRequest) -> WireErrorResponse:
     61     return WireErrorResponse(
     62         version=hyf_protocol_version(),
     63         request_id=String(request.request_id),
     64         trace_id=request.trace_id,
     65         error=unsupported_capability_error(String(request.capability)),
     66     )
     67 
     68 
     69 def _disabled_response(request: WireRequest) -> WireErrorResponse:
     70     return WireErrorResponse(
     71         version=hyf_protocol_version(),
     72         request_id=String(request.request_id),
     73         trace_id=request.trace_id,
     74         error=capability_disabled_error(String(request.capability)),
     75     )
     76 
     77 
     78 def _unavailable_response(request: WireRequest) -> WireErrorResponse:
     79     return WireErrorResponse(
     80         version=hyf_protocol_version(),
     81         request_id=String(request.request_id),
     82         trace_id=request.trace_id,
     83         error=capability_unavailable_error(String(request.capability)),
     84     )
     85 
     86 
     87 def _write_error(response: WireErrorResponse) raises:
     88     print(encode_error(response))
     89 
     90 
     91 def _write_success(response: WireSuccessResponse) raises:
     92     print(encode_success(response))
     93 
     94 
     95 def _diagnostic_value(value: String) -> String:
     96     return value.replace("\n", "\\n").replace("\r", "\\r")
     97 
     98 
     99 def _diagnostic_trace_id(trace_id: Optional[String]) -> String:
    100     if trace_id:
    101         return _diagnostic_value(String(trace_id.value()))
    102     return ""
    103 
    104 
    105 def _emit_internal_diagnostic(
    106     request_id: String,
    107     trace_id: Optional[String],
    108     capability: String,
    109     detail: String,
    110     diagnostics_dir: String,
    111 ):
    112     append_internal_diagnostic_to_dir(
    113         'hyf_internal_error request_id="'
    114         + _diagnostic_value(request_id)
    115         + '" trace_id="'
    116         + _diagnostic_trace_id(trace_id)
    117         + '" capability="'
    118         + _diagnostic_value(capability)
    119         + '" detail="'
    120         + _diagnostic_value(detail)
    121         + '"\n',
    122         diagnostics_dir,
    123     )
    124 
    125 
    126 def _wire_error_from_core_failure(
    127     request_id: String,
    128     trace_id: Optional[String],
    129     failure: CapabilityFailure,
    130 ) -> WireErrorResponse:
    131     var code = String(failure.error.code)
    132     if code == "invalid_input":
    133         code = "invalid_request"
    134     return WireErrorResponse(
    135         version=hyf_protocol_version(),
    136         request_id=request_id,
    137         trace_id=trace_id,
    138         error=WireError(code=code, message=String(failure.error.message)),
    139     )
    140 
    141 
    142 def _wire_success_from_core_success(
    143     request_id: String,
    144     trace_id: Optional[String],
    145     success: CapabilitySuccess,
    146 ) raises -> WireSuccessResponse:
    147     var meta: Optional[Value] = None
    148     if success.meta:
    149         meta = serialize_core_response_meta(success.meta.value())
    150     return WireSuccessResponse(
    151         version=hyf_protocol_version(),
    152         request_id=request_id,
    153         trace_id=trace_id,
    154         output=success.output.clone(),
    155         meta=meta^,
    156     )
    157 
    158 
    159 def _dispatch_capability_result(
    160     request_id: String,
    161     trace_id: Optional[String],
    162     result: CapabilityResult,
    163 ) raises -> String:
    164     if result.failure:
    165         return encode_error(
    166             _wire_error_from_core_failure(
    167                 request_id, trace_id, result.failure.value()
    168             )
    169         )
    170     return encode_success(
    171         _wire_success_from_core_success(
    172             request_id, trace_id, result.success.value()
    173         )
    174     )
    175 
    176 
    177 def _dispatch_business_capability(
    178     request: WireRequest,
    179     request_id: String,
    180     runtime_context: RuntimeStartupContext,
    181 ) raises -> String:
    182     var result = execute_runtime_aware_business_capability(
    183         request.capability,
    184         request.input.clone(),
    185         request.context.copy(),
    186         runtime_context,
    187     )
    188     return _dispatch_capability_result(request_id, request.trace_id, result)
    189 
    190 
    191 def _route_business_capability(
    192     request: WireRequest,
    193     request_id: String,
    194     runtime_context: RuntimeStartupContext,
    195 ) raises -> String:
    196     var capability = canonical_business_capability(request.capability)
    197     if not capability:
    198         return encode_error(_unsupported_response(request))
    199 
    200     var descriptor = capability.value().copy()
    201     if not descriptor.deterministic_enabled:
    202         return encode_error(_disabled_response(request))
    203 
    204     if descriptor.implemented and descriptor.callable:
    205         return _dispatch_business_capability(
    206             request, request_id, runtime_context
    207         )
    208 
    209     return encode_error(_unavailable_response(request))
    210 
    211 
    212 def handle_request(request: WireRequest) raises -> String:
    213     return handle_request_with_runtime_context(
    214         request, resolve_startup_context_from_process()
    215     )
    216 
    217 
    218 def handle_request_with_runtime_context(
    219     request: WireRequest, runtime_context: RuntimeStartupContext
    220 ) raises -> String:
    221     var request_id = String(request.request_id)
    222     var trace_id = request.trace_id
    223     var diagnostics_dir = effective_diagnostics_dir_for_runtime_paths(
    224         runtime_context.paths
    225     )
    226     try:
    227         if request.capability == "sys.status":
    228             return encode_success(
    229                 WireSuccessResponse(
    230                     version=hyf_protocol_version(),
    231                     request_id=request_id,
    232                     trace_id=trace_id,
    233                     output=build_status_output_with_runtime_context(
    234                         runtime_context
    235                     ),
    236                     meta=None,
    237                 )
    238             )
    239         if request.capability == "sys.capabilities":
    240             return encode_success(
    241                 WireSuccessResponse(
    242                     version=hyf_protocol_version(),
    243                     request_id=request_id,
    244                     trace_id=trace_id,
    245                     output=build_capabilities_output_with_runtime_context(
    246                         runtime_context
    247                     ),
    248                     meta=None,
    249                 )
    250             )
    251         return _route_business_capability(
    252             request.copy(), request_id, runtime_context
    253         )
    254     except e:
    255         _emit_internal_diagnostic(
    256             request_id,
    257             trace_id,
    258             String(request.capability),
    259             String(e),
    260             diagnostics_dir,
    261         )
    262         return encode_error(
    263             WireErrorResponse(
    264                 version=hyf_protocol_version(),
    265                 request_id=request_id,
    266                 trace_id=trace_id,
    267                 error=internal_error(),
    268             )
    269         )
    270 
    271 
    272 def handle_request_line(line: String) raises -> String:
    273     try:
    274         var request = decode_request(line)
    275         return handle_request(request^)
    276     except e:
    277         var correlation = extract_request_correlation(line)
    278         return encode_error(
    279             WireErrorResponse(
    280                 version=hyf_protocol_version(),
    281                 request_id=correlation.request_id,
    282                 trace_id=correlation.trace_id,
    283                 error=invalid_request_error(String(e)),
    284             )
    285         )
    286 
    287 
    288 def handle_request_line_with_runtime_context(
    289     line: String, runtime_context: RuntimeStartupContext
    290 ) raises -> String:
    291     try:
    292         var request = decode_request(line)
    293         return handle_request_with_runtime_context(request^, runtime_context)
    294     except e:
    295         var correlation = extract_request_correlation(line)
    296         return encode_error(
    297             WireErrorResponse(
    298                 version=hyf_protocol_version(),
    299                 request_id=correlation.request_id,
    300                 trace_id=correlation.trace_id,
    301                 error=invalid_request_error(String(e)),
    302             )
    303         )
    304 
    305 
    306 def run_stdio_server() raises:
    307     run_stdio_server_with_runtime_context(
    308         resolve_startup_context_from_process()
    309     )
    310 
    311 
    312 def run_stdio_server_with_runtime_context(
    313     runtime_context: RuntimeStartupContext,
    314 ) raises:
    315     if stdin.isatty():
    316         return
    317 
    318     try:
    319         var line = _read_request_line()
    320 
    321         print(handle_request_line_with_runtime_context(line, runtime_context))
    322     except e:
    323         if String(e) == "EOF":
    324             return
    325         raise e^