server.mojo (9415B)
1 from std.collections import Optional 2 from std.io.io import _fdopen 3 from std.sys import stdin 4 5 from json import Value 6 7 from hyf_runtime.diagnostics import ( 8 append_internal_diagnostic as append_internal_diagnostic_to_dir, 9 effective_diagnostics_dir_for_runtime_paths, 10 ) 11 from hyf_runtime.startup import ( 12 RuntimeStartupContext, 13 resolve_startup_context_from_process, 14 ) 15 from hyf_core.capabilities.registry import ( 16 canonical_business_capability, 17 ) 18 from hyf_core.errors import ( 19 CapabilityFailure, 20 CapabilityResult, 21 CapabilitySuccess, 22 ) 23 from hyf_core.metadata import hyf_protocol_version 24 from hyf_stdio.codec import ( 25 decode_request, 26 encode_error, 27 encode_success, 28 extract_request_correlation, 29 ) 30 from hyf_stdio.control.capabilities import ( 31 build_capabilities_output_with_runtime_context, 32 ) 33 from hyf_stdio.control.status import ( 34 build_status_output_with_runtime_context, 35 ) 36 from hyf_stdio.envelope import ( 37 WireErrorResponse, 38 WireRequest, 39 WireSuccessResponse, 40 ) 41 from hyf_stdio.errors import ( 42 WireError, 43 capability_disabled_error, 44 capability_unavailable_error, 45 internal_error, 46 invalid_request_error, 47 unsupported_capability_error, 48 ) 49 from hyf_stdio.meta import serialize_core_response_meta 50 from hyf_stdio.provider_execution import ( 51 execute_runtime_aware_business_capability, 52 ) 53 54 55 def _read_request_line() raises -> String: 56 with _fdopen["r"](stdin) as input_file: 57 return input_file.readline() 58 59 60 def _unsupported_response(request: WireRequest) -> WireErrorResponse: 61 return WireErrorResponse( 62 version=hyf_protocol_version(), 63 request_id=String(request.request_id), 64 trace_id=request.trace_id, 65 error=unsupported_capability_error(String(request.capability)), 66 ) 67 68 69 def _disabled_response(request: WireRequest) -> WireErrorResponse: 70 return WireErrorResponse( 71 version=hyf_protocol_version(), 72 request_id=String(request.request_id), 73 trace_id=request.trace_id, 74 error=capability_disabled_error(String(request.capability)), 75 ) 76 77 78 def _unavailable_response(request: WireRequest) -> WireErrorResponse: 79 return WireErrorResponse( 80 version=hyf_protocol_version(), 81 request_id=String(request.request_id), 82 trace_id=request.trace_id, 83 error=capability_unavailable_error(String(request.capability)), 84 ) 85 86 87 def _write_error(response: WireErrorResponse) raises: 88 print(encode_error(response)) 89 90 91 def _write_success(response: WireSuccessResponse) raises: 92 print(encode_success(response)) 93 94 95 def _diagnostic_value(value: String) -> String: 96 return value.replace("\n", "\\n").replace("\r", "\\r") 97 98 99 def _diagnostic_trace_id(trace_id: Optional[String]) -> String: 100 if trace_id: 101 return _diagnostic_value(String(trace_id.value())) 102 return "" 103 104 105 def _emit_internal_diagnostic( 106 request_id: String, 107 trace_id: Optional[String], 108 capability: String, 109 detail: String, 110 diagnostics_dir: String, 111 ): 112 append_internal_diagnostic_to_dir( 113 'hyf_internal_error request_id="' 114 + _diagnostic_value(request_id) 115 + '" trace_id="' 116 + _diagnostic_trace_id(trace_id) 117 + '" capability="' 118 + _diagnostic_value(capability) 119 + '" detail="' 120 + _diagnostic_value(detail) 121 + '"\n', 122 diagnostics_dir, 123 ) 124 125 126 def _wire_error_from_core_failure( 127 request_id: String, 128 trace_id: Optional[String], 129 failure: CapabilityFailure, 130 ) -> WireErrorResponse: 131 var code = String(failure.error.code) 132 if code == "invalid_input": 133 code = "invalid_request" 134 return WireErrorResponse( 135 version=hyf_protocol_version(), 136 request_id=request_id, 137 trace_id=trace_id, 138 error=WireError(code=code, message=String(failure.error.message)), 139 ) 140 141 142 def _wire_success_from_core_success( 143 request_id: String, 144 trace_id: Optional[String], 145 success: CapabilitySuccess, 146 ) raises -> WireSuccessResponse: 147 var meta: Optional[Value] = None 148 if success.meta: 149 meta = serialize_core_response_meta(success.meta.value()) 150 return WireSuccessResponse( 151 version=hyf_protocol_version(), 152 request_id=request_id, 153 trace_id=trace_id, 154 output=success.output.clone(), 155 meta=meta^, 156 ) 157 158 159 def _dispatch_capability_result( 160 request_id: String, 161 trace_id: Optional[String], 162 result: CapabilityResult, 163 ) raises -> String: 164 if result.failure: 165 return encode_error( 166 _wire_error_from_core_failure( 167 request_id, trace_id, result.failure.value() 168 ) 169 ) 170 return encode_success( 171 _wire_success_from_core_success( 172 request_id, trace_id, result.success.value() 173 ) 174 ) 175 176 177 def _dispatch_business_capability( 178 request: WireRequest, 179 request_id: String, 180 runtime_context: RuntimeStartupContext, 181 ) raises -> String: 182 var result = execute_runtime_aware_business_capability( 183 request.capability, 184 request.input.clone(), 185 request.context.copy(), 186 runtime_context, 187 ) 188 return _dispatch_capability_result(request_id, request.trace_id, result) 189 190 191 def _route_business_capability( 192 request: WireRequest, 193 request_id: String, 194 runtime_context: RuntimeStartupContext, 195 ) raises -> String: 196 var capability = canonical_business_capability(request.capability) 197 if not capability: 198 return encode_error(_unsupported_response(request)) 199 200 var descriptor = capability.value().copy() 201 if not descriptor.deterministic_enabled: 202 return encode_error(_disabled_response(request)) 203 204 if descriptor.implemented and descriptor.callable: 205 return _dispatch_business_capability( 206 request, request_id, runtime_context 207 ) 208 209 return encode_error(_unavailable_response(request)) 210 211 212 def handle_request(request: WireRequest) raises -> String: 213 return handle_request_with_runtime_context( 214 request, resolve_startup_context_from_process() 215 ) 216 217 218 def handle_request_with_runtime_context( 219 request: WireRequest, runtime_context: RuntimeStartupContext 220 ) raises -> String: 221 var request_id = String(request.request_id) 222 var trace_id = request.trace_id 223 var diagnostics_dir = effective_diagnostics_dir_for_runtime_paths( 224 runtime_context.paths 225 ) 226 try: 227 if request.capability == "sys.status": 228 return encode_success( 229 WireSuccessResponse( 230 version=hyf_protocol_version(), 231 request_id=request_id, 232 trace_id=trace_id, 233 output=build_status_output_with_runtime_context( 234 runtime_context 235 ), 236 meta=None, 237 ) 238 ) 239 if request.capability == "sys.capabilities": 240 return encode_success( 241 WireSuccessResponse( 242 version=hyf_protocol_version(), 243 request_id=request_id, 244 trace_id=trace_id, 245 output=build_capabilities_output_with_runtime_context( 246 runtime_context 247 ), 248 meta=None, 249 ) 250 ) 251 return _route_business_capability( 252 request.copy(), request_id, runtime_context 253 ) 254 except e: 255 _emit_internal_diagnostic( 256 request_id, 257 trace_id, 258 String(request.capability), 259 String(e), 260 diagnostics_dir, 261 ) 262 return encode_error( 263 WireErrorResponse( 264 version=hyf_protocol_version(), 265 request_id=request_id, 266 trace_id=trace_id, 267 error=internal_error(), 268 ) 269 ) 270 271 272 def handle_request_line(line: String) raises -> String: 273 try: 274 var request = decode_request(line) 275 return handle_request(request^) 276 except e: 277 var correlation = extract_request_correlation(line) 278 return encode_error( 279 WireErrorResponse( 280 version=hyf_protocol_version(), 281 request_id=correlation.request_id, 282 trace_id=correlation.trace_id, 283 error=invalid_request_error(String(e)), 284 ) 285 ) 286 287 288 def handle_request_line_with_runtime_context( 289 line: String, runtime_context: RuntimeStartupContext 290 ) raises -> String: 291 try: 292 var request = decode_request(line) 293 return handle_request_with_runtime_context(request^, runtime_context) 294 except e: 295 var correlation = extract_request_correlation(line) 296 return encode_error( 297 WireErrorResponse( 298 version=hyf_protocol_version(), 299 request_id=correlation.request_id, 300 trace_id=correlation.trace_id, 301 error=invalid_request_error(String(e)), 302 ) 303 ) 304 305 306 def run_stdio_server() raises: 307 run_stdio_server_with_runtime_context( 308 resolve_startup_context_from_process() 309 ) 310 311 312 def run_stdio_server_with_runtime_context( 313 runtime_context: RuntimeStartupContext, 314 ) raises: 315 if stdin.isatty(): 316 return 317 318 try: 319 var line = _read_request_line() 320 321 print(handle_request_line_with_runtime_context(line, runtime_context)) 322 except e: 323 if String(e) == "EOF": 324 return 325 raise e^