uv_stream.lua (7316B)
1 --- 2 --- Basic stream types. 3 --- See `rpc_stream.lua` for the msgpack layer. 4 --- 5 6 local uv = vim.uv 7 8 --- @class test.Stream 9 --- @field write fun(self, data: string|string[]) 10 --- @field read_start fun(self, cb: fun(chunk: string)) 11 --- @field read_stop fun(self) 12 --- @field close fun(self, signal?: string, noblock?: boolean) 13 14 --- Stream over given pipes. 15 --- 16 --- @class vim.StdioStream : test.Stream 17 --- @field private _in uv.uv_pipe_t 18 --- @field private _out uv.uv_pipe_t 19 local StdioStream = {} 20 StdioStream.__index = StdioStream 21 22 function StdioStream.open() 23 local self = setmetatable({ 24 _in = assert(uv.new_pipe(false)), 25 _out = assert(uv.new_pipe(false)), 26 }, StdioStream) 27 self._in:open(0) 28 self._out:open(1) 29 return self 30 end 31 32 --- @param data string|string[] 33 function StdioStream:write(data) 34 self._out:write(data) 35 end 36 37 function StdioStream:read_start(cb) 38 self._in:read_start(function(err, chunk) 39 if err then 40 error(err) 41 end 42 cb(chunk) 43 end) 44 end 45 46 function StdioStream:read_stop() 47 self._in:read_stop() 48 end 49 50 function StdioStream:close() 51 self._in:close() 52 self._out:close() 53 end 54 55 --- Stream over a named pipe or TCP socket. 56 --- 57 --- @class test.SocketStream : test.Stream 58 --- @field package _stream_error? string 59 --- @field package _socket uv.uv_pipe_t 60 local SocketStream = {} 61 SocketStream.__index = SocketStream 62 63 function SocketStream.open(file) 64 local socket = assert(uv.new_pipe(false)) 65 local self = setmetatable({ 66 _socket = socket, 67 _stream_error = nil, 68 }, SocketStream) 69 uv.pipe_connect(socket, file, function(err) 70 uv.stop() 71 self._stream_error = self._stream_error or err 72 end) 73 -- On Windows, writing to the pipe doesn't work if it's not connected yet, 74 -- so wait for the connect callback to be called. 75 uv.run() 76 if self._stream_error then 77 error(self._stream_error) 78 end 79 return self 80 end 81 82 function SocketStream.connect(host, port) 83 local socket = assert(uv.new_tcp()) 84 local self = setmetatable({ 85 _socket = socket, 86 _stream_error = nil, 87 }, SocketStream) 88 uv.tcp_connect(socket, host, port, function(err) 89 self._stream_error = self._stream_error or err 90 end) 91 return self 92 end 93 94 function SocketStream:write(data) 95 if self._stream_error then 96 error(self._stream_error) 97 end 98 uv.write(self._socket, data, function(err) 99 if err then 100 error(self._stream_error or err) 101 end 102 end) 103 end 104 105 function SocketStream:read_start(cb) 106 if self._stream_error then 107 error(self._stream_error) 108 end 109 uv.read_start(self._socket, function(err, chunk) 110 if err then 111 error(err) 112 end 113 cb(chunk) 114 end) 115 end 116 117 function SocketStream:read_stop() 118 if self._stream_error then 119 error(self._stream_error) 120 end 121 uv.read_stop(self._socket) 122 end 123 124 function SocketStream:close() 125 uv.close(self._socket) 126 end 127 128 --- Stream over child process stdio. 129 --- 130 --- @class test.ProcStream : test.Stream 131 --- @field private _proc uv.uv_process_t 132 --- @field private _pid integer 133 --- @field private _child_stdin uv.uv_pipe_t 134 --- @field private _child_stdout uv.uv_pipe_t 135 --- @field private _child_stderr uv.uv_pipe_t 136 --- @field package _closed integer 137 --- @field package _on_exit fun(closed: integer?) 138 --- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF). 139 --- @field stdout string 140 --- Collects stderr as raw data. 141 --- @field stderr string 142 --- Gets stderr+stdout as text (CRLF converted to LF). 143 --- @field output fun(): string 144 --- @field stdout_eof boolean 145 --- @field stderr_eof boolean 146 --- Collects text into the `stdout` field. 147 --- @field collect_text boolean 148 --- Exit code 149 --- @field status integer 150 --- @field signal integer 151 local ProcStream = {} 152 ProcStream.__index = ProcStream 153 154 --- Starts child process specified by `argv`. 155 --- 156 --- @param argv string[] 157 --- @param env string[]? 158 --- @param io_extra uv.uv_pipe_t? 159 --- @param on_exit fun(closed: integer?)? Called after the child process exits. 160 --- `closed` is the timestamp (uv.now()) when close() was called, or nil if it wasn't. 161 --- @param forward_stderr boolean? Forward child process stderr, otherwise collect it. 162 --- @return test.ProcStream 163 function ProcStream.spawn(argv, env, io_extra, on_exit, forward_stderr) 164 local self = setmetatable({ 165 collect_text = false, 166 output = function(self) 167 if not self.collect_text then 168 error('set collect_text=true') 169 end 170 return (self.stderr .. self.stdout):gsub('\r\n', '\n') 171 end, 172 stdout = '', 173 stderr = '', 174 stdout_eof = false, 175 stderr_eof = false, 176 _child_stdin = assert(uv.new_pipe(false)), 177 _child_stdout = assert(uv.new_pipe(false)), 178 _child_stderr = assert(uv.new_pipe(false)), 179 _exiting = false, 180 _on_exit = on_exit, 181 }, ProcStream) 182 local prog = argv[1] 183 local args = {} --- @type string[] 184 for i = 2, #argv do 185 args[#args + 1] = argv[i] 186 end 187 local stderr = forward_stderr and 1 or self._child_stderr 188 --- @diagnostic disable-next-line:missing-fields 189 self._proc, self._pid = uv.spawn(prog, { 190 stdio = { self._child_stdin, self._child_stdout, stderr, io_extra }, 191 args = args, 192 --- @diagnostic disable-next-line:assign-type-mismatch 193 env = env, 194 }, function(status, signal) 195 self.signal = signal 196 -- "Abort" exit may not set status; force to nonzero in that case. 197 self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0)) 198 if self._on_exit then 199 self._on_exit(self._closed) 200 end 201 end) 202 203 if not self._proc then 204 local err = self._pid 205 error(err) 206 end 207 208 return self 209 end 210 211 function ProcStream:write(data) 212 self._child_stdin:write(data) 213 end 214 215 function ProcStream:on_read(stream, cb, err, chunk) 216 if err then 217 error(err) -- stream read failed? 218 elseif chunk then 219 -- Always collect stderr, in case it gives useful info on failure. 220 if stream == 'stderr' then 221 self.stderr = self.stderr .. chunk --[[@as string]] 222 elseif stream == 'stdout' and self.collect_text then 223 -- Set `stdout` and convert CRLF => LF. 224 self.stdout = (self.stdout .. chunk):gsub('\r\n', '\n') 225 end 226 else 227 -- stderr_eof/stdout_eof 228 self[stream .. '_eof'] = true ---@type boolean 229 end 230 231 -- Handler provided by the caller. 232 if cb then 233 cb(chunk) 234 end 235 end 236 237 --- Collects output until the process exits. 238 function ProcStream:wait() 239 while not (self.stdout_eof and self.stderr_eof and (self.status or self.signal)) do 240 uv.run('once') 241 end 242 end 243 244 function ProcStream:read_start(on_stdout, on_stderr) 245 self._child_stdout:read_start(function(err, chunk) 246 self:on_read('stdout', on_stdout, err, chunk) 247 end) 248 self._child_stderr:read_start(function(err, chunk) 249 self:on_read('stderr', on_stderr, err, chunk) 250 end) 251 end 252 253 function ProcStream:read_stop() 254 self._child_stdout:read_stop() 255 self._child_stderr:read_stop() 256 end 257 258 function ProcStream:close(signal, noblock) 259 if self._closed then 260 return 261 end 262 self._closed = uv.now() 263 self:read_stop() 264 self._child_stdin:close() 265 self._child_stdout:close() 266 self._child_stderr:close() 267 if type(signal) == 'string' then 268 self._proc:kill('sig' .. signal) 269 end 270 if not noblock then 271 while self.status == nil do 272 uv.run 'once' 273 end 274 return self.status, self.signal 275 end 276 end 277 278 return { 279 StdioStream = StdioStream, 280 ProcStream = ProcStream, 281 SocketStream = SocketStream, 282 }