stdio_process_helper.mojo (5013B)
import std.os
from std.os import Pipe, Process
from std.ffi import CStringSlice, c_int, external_call
from std.sys._libc import close
from std.tempfile import TemporaryDirectory

from json import Value, loads


comptime HYF_PATHS_PROFILE_ENV = "HYF_PATHS_PROFILE"
comptime HYF_PATHS_REPO_LOCAL_ROOT_ENV = "HYF_PATHS_REPO_LOCAL_ROOT"


struct ScopedEnvVar:
    """Context manager that sets an environment variable for a `with` block.

    The value visible at construction time is captured and put back on exit.
    An empty captured value is treated as "no previous value", so a variable
    that was set to the empty string is unset on exit rather than restored.
    """

    var name: String
    var value: String
    var previous: String
    var had_previous: Bool

    def __init__(out self, name: String, value: String):
        """Record the variable name, the value to apply, and the current value.

        Args:
            name: Environment variable to override.
            value: Value applied while the `with` block is active.
        """
        self.name = String(name)
        self.value = String(value)
        self.previous = std.os.getenv(name)
        self.had_previous = self.previous != ""

    def __enter__(mut self) raises:
        """Apply the override, replacing any existing value."""
        _ = std.os.setenv(self.name, self.value, overwrite=True)

    def __exit__(mut self):
        """Restore the captured value, or unset the variable if none was captured."""
        if self.had_previous:
            _ = std.os.setenv(self.name, self.previous, overwrite=True)
        else:
            _ = std.os.unsetenv(self.name)


def _dup2(oldfd: c_int, newfd: c_int) -> c_int:
    """Call libc `dup2(oldfd, newfd)` and return its result unchanged."""
    return external_call["dup2", c_int](oldfd, newfd)


@always_inline
def _fork() -> c_int:
    """Call libc `fork()` and return its result unchanged."""
    return external_call["fork", c_int]()


@always_inline
def _exit_child(code: c_int):
    """Call libc `_exit(code)`; used in the forked child only."""
    _ = external_call["_exit", c_int](code)


def _read_pipe_to_string(mut pipe: Pipe) raises -> String:
    """Read `pipe` until a zero-length read and return the bytes as a String.

    The raw bytes are collected first and decoded as UTF-8 once at the end.
    Decoding each 4096-byte chunk separately would break whenever a
    multi-byte code point straddles a chunk boundary.

    Args:
        pipe: Pipe whose read side is drained.

    Returns:
        Everything read from the pipe, decoded as UTF-8 (empty if nothing
        was read).

    Raises:
        Whatever `pipe.read_bytes` or the UTF-8 decoding raise.
    """
    var buffer = InlineArray[Byte, 4096](fill=0)
    var collected = List[Byte]()
    while True:
        var read = pipe.read_bytes(Span(buffer))
        if read == 0:
            break
        for i in range(Int(read)):
            collected.append(buffer[i])
    return String(from_utf8=Span(collected))


def run_stdio_entrypoint(
    entrypoint: String, request_json: String
) raises -> Value:
    """Run `entrypoint` with no extra arguments and return its parsed stdout.

    Delegates to `run_stdio_entrypoint_with_2_args` with two empty arguments.
    """
    return run_stdio_entrypoint_with_2_args(entrypoint, request_json, "", "")


def run_stdio_entrypoint(
    entrypoint: String, request_json: String, arg0: String, arg1: String
) raises -> Value:
    """Run `entrypoint` with up to two extra arguments and return its parsed stdout.

    Delegates to `run_stdio_entrypoint_with_2_args`.
    """
    return run_stdio_entrypoint_with_2_args(
        entrypoint, request_json, arg0, arg1
    )


def run_stdio_entrypoint_with_2_args(
    entrypoint: String, request_json: String, arg0: String, arg1: String
) raises -> Value:
    """Fork and exec `mojo run -I src <entrypoint> [arg0] [arg1]`.

    `request_json` plus a trailing newline is written to the child's stdin,
    the child's stdout is read to the end, and the result is parsed as JSON.

    Args:
        entrypoint: Path of the Mojo file passed to `mojo run`.
        request_json: Payload written to the child's stdin.
        arg0: First optional argument; skipped when empty.
        arg1: Second optional argument; skipped when empty.

    Returns:
        The child's stdout parsed with `loads`.

    Raises:
        Error: If `fork` fails, the child does not exit with code 0, or the
            child writes nothing to stdout. Errors from pipe I/O and JSON
            parsing propagate as well.
    """
    var stdin_pipe = Pipe()
    var stdout_pipe = Pipe()
    var command = String("mojo")
    var include_flag = String("-I")
    var include_path = String("src")
    var entrypoint_path = String(entrypoint)
    var process_arg0 = String(arg0)
    var process_arg1 = String(arg1)
    # Eight empty slots: five fixed arguments, at most two optional ones, and
    # at least one trailing empty slot that terminates the vector for execvp.
    var argv = List[Optional[CStringSlice[ImmutAnyOrigin]]](length=8, fill={})
    argv[0] = rebind[CStringSlice[ImmutAnyOrigin]](command.as_c_string_slice())
    argv[1] = rebind[CStringSlice[ImmutAnyOrigin]]("run".as_c_string_slice())
    argv[2] = rebind[CStringSlice[ImmutAnyOrigin]](
        include_flag.as_c_string_slice()
    )
    argv[3] = rebind[CStringSlice[ImmutAnyOrigin]](
        include_path.as_c_string_slice()
    )
    argv[4] = rebind[CStringSlice[ImmutAnyOrigin]](
        entrypoint_path.as_c_string_slice()
    )
    # Fill optional arguments into consecutive slots. With fixed slots an
    # empty arg0 left a hole at argv[5]; execvp stops at the first empty
    # entry, so a non-empty arg1 was silently dropped.
    var next_slot = 5
    if process_arg0 != "":
        argv[next_slot] = rebind[CStringSlice[ImmutAnyOrigin]](
            process_arg0.as_c_string_slice()
        )
        next_slot += 1
    if process_arg1 != "":
        argv[next_slot] = rebind[CStringSlice[ImmutAnyOrigin]](
            process_arg1.as_c_string_slice()
        )

    var stdin_read_fd = c_int(stdin_pipe.fd_in.value().value)
    var stdin_write_fd = c_int(stdin_pipe.fd_out.value().value)
    var stdout_read_fd = c_int(stdout_pipe.fd_in.value().value)
    var stdout_write_fd = c_int(stdout_pipe.fd_out.value().value)
    var command_ptr = command.as_c_string_slice().unsafe_ptr()
    var argv_ptr = argv.unsafe_ptr()

    var pid = _fork()
    if pid < 0:
        raise Error("failed to spawn hyf process test child")

    if pid == 0:
        # Child: route the pipes onto fds 0 and 1, drop the original
        # descriptors, then replace the process image.
        if _dup2(stdin_read_fd, 0) < 0:
            _exit_child(c_int(126))
        if _dup2(stdout_write_fd, 1) < 0:
            _exit_child(c_int(126))
        _ = close(stdin_read_fd)
        _ = close(stdin_write_fd)
        _ = close(stdout_read_fd)
        _ = close(stdout_write_fd)
        _ = external_call["execvp", c_int](command_ptr, argv_ptr)
        # Only reached if execvp failed.
        _exit_child(c_int(127))

    # Parent: write only on the stdin pipe and read only on the stdout pipe.
    stdin_pipe.set_output_only()
    stdout_pipe.set_input_only()

    # NOTE(review): the whole request is written before any output is read.
    # A child that emits a lot of output before draining stdin could stall
    # both sides once the pipe buffers fill -- confirm payload sizes stay small.
    stdin_pipe.write_bytes((request_json + "\n").as_bytes())
    # Done writing; presumably this releases the parent's write end so the
    # child sees end-of-input.
    stdin_pipe.set_input_only()

    var output = _read_pipe_to_string(stdout_pipe)
    stdout_pipe.set_output_only()

    var process = Process(Int(pid))
    var status = process.wait()
    if not status.exit_code or status.exit_code.value() != 0:
        raise Error("hyf process exited unexpectedly")

    if output == "":
        raise Error("hyf process returned no stdout payload")
    return loads(output)


def run_hyf_stdio(request_json: String) raises -> Value:
    """Run `src/main.mojo` over stdio with path settings scoped to a temp dir.

    For the duration of the call, `HYF_PATHS_PROFILE` is set to `repo_local`
    and `HYF_PATHS_REPO_LOCAL_ROOT` to a fresh temporary directory.

    Args:
        request_json: Payload written to the child's stdin.

    Returns:
        The child's stdout parsed as JSON.
    """
    var response = Value(None)
    with TemporaryDirectory() as temp_dir:
        with ScopedEnvVar(HYF_PATHS_PROFILE_ENV, "repo_local"):
            with ScopedEnvVar(HYF_PATHS_REPO_LOCAL_ROOT_ENV, temp_dir):
                response = run_stdio_entrypoint("src/main.mojo", request_json)
    return response^