neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

uv_stream.lua (7316B)


      1 ---
      2 --- Basic stream types.
      3 --- See `rpc_stream.lua` for the msgpack layer.
      4 ---
      5 
      6 local uv = vim.uv
      7 
      8 --- @class test.Stream
      9 --- @field write fun(self, data: string|string[])
     10 --- @field read_start fun(self, cb: fun(chunk: string))
     11 --- @field read_stop fun(self)
     12 --- @field close fun(self, signal?: string, noblock?: boolean)
     13 
     14 --- Stream over given pipes.
     15 ---
     16 --- @class vim.StdioStream : test.Stream
     17 --- @field private _in uv.uv_pipe_t
     18 --- @field private _out uv.uv_pipe_t
     19 local StdioStream = {}
     20 StdioStream.__index = StdioStream
     21 
     22 function StdioStream.open()
     23  local self = setmetatable({
     24    _in = assert(uv.new_pipe(false)),
     25    _out = assert(uv.new_pipe(false)),
     26  }, StdioStream)
     27  self._in:open(0)
     28  self._out:open(1)
     29  return self
     30 end
     31 
     32 --- @param data string|string[]
     33 function StdioStream:write(data)
     34  self._out:write(data)
     35 end
     36 
     37 function StdioStream:read_start(cb)
     38  self._in:read_start(function(err, chunk)
     39    if err then
     40      error(err)
     41    end
     42    cb(chunk)
     43  end)
     44 end
     45 
     46 function StdioStream:read_stop()
     47  self._in:read_stop()
     48 end
     49 
     50 function StdioStream:close()
     51  self._in:close()
     52  self._out:close()
     53 end
     54 
     55 --- Stream over a named pipe or TCP socket.
     56 ---
     57 --- @class test.SocketStream : test.Stream
     58 --- @field package _stream_error? string
     59 --- @field package _socket uv.uv_pipe_t
     60 local SocketStream = {}
     61 SocketStream.__index = SocketStream
     62 
     63 function SocketStream.open(file)
     64  local socket = assert(uv.new_pipe(false))
     65  local self = setmetatable({
     66    _socket = socket,
     67    _stream_error = nil,
     68  }, SocketStream)
     69  uv.pipe_connect(socket, file, function(err)
     70    uv.stop()
     71    self._stream_error = self._stream_error or err
     72  end)
     73  -- On Windows, writing to the pipe doesn't work if it's not connected yet,
     74  -- so wait for the connect callback to be called.
     75  uv.run()
     76  if self._stream_error then
     77    error(self._stream_error)
     78  end
     79  return self
     80 end
     81 
     82 function SocketStream.connect(host, port)
     83  local socket = assert(uv.new_tcp())
     84  local self = setmetatable({
     85    _socket = socket,
     86    _stream_error = nil,
     87  }, SocketStream)
     88  uv.tcp_connect(socket, host, port, function(err)
     89    self._stream_error = self._stream_error or err
     90  end)
     91  return self
     92 end
     93 
     94 function SocketStream:write(data)
     95  if self._stream_error then
     96    error(self._stream_error)
     97  end
     98  uv.write(self._socket, data, function(err)
     99    if err then
    100      error(self._stream_error or err)
    101    end
    102  end)
    103 end
    104 
    105 function SocketStream:read_start(cb)
    106  if self._stream_error then
    107    error(self._stream_error)
    108  end
    109  uv.read_start(self._socket, function(err, chunk)
    110    if err then
    111      error(err)
    112    end
    113    cb(chunk)
    114  end)
    115 end
    116 
    117 function SocketStream:read_stop()
    118  if self._stream_error then
    119    error(self._stream_error)
    120  end
    121  uv.read_stop(self._socket)
    122 end
    123 
    124 function SocketStream:close()
    125  uv.close(self._socket)
    126 end
    127 
    128 --- Stream over child process stdio.
    129 ---
    130 --- @class test.ProcStream : test.Stream
    131 --- @field private _proc uv.uv_process_t
    132 --- @field private _pid integer
    133 --- @field private _child_stdin uv.uv_pipe_t
    134 --- @field private _child_stdout uv.uv_pipe_t
    135 --- @field private _child_stderr uv.uv_pipe_t
    136 --- @field package _closed integer
    137 --- @field package _on_exit fun(closed: integer?)
    138 --- Collects stdout (if `collect_text=true`). Treats data as text (CRLF converted to LF).
    139 --- @field stdout string
    140 --- Collects stderr as raw data.
    141 --- @field stderr string
    142 --- Gets stderr+stdout as text (CRLF converted to LF).
    143 --- @field output fun(): string
    144 --- @field stdout_eof boolean
    145 --- @field stderr_eof boolean
    146 --- Collects text into the `stdout` field.
    147 --- @field collect_text boolean
    148 --- Exit code
    149 --- @field status integer
    150 --- @field signal integer
    151 local ProcStream = {}
    152 ProcStream.__index = ProcStream
    153 
    154 --- Starts child process specified by `argv`.
    155 ---
    156 --- @param argv string[]
    157 --- @param env string[]?
    158 --- @param io_extra uv.uv_pipe_t?
    159 --- @param on_exit fun(closed: integer?)? Called after the child process exits.
    160 --- `closed` is the timestamp (uv.now()) when close() was called, or nil if it wasn't.
    161 --- @param forward_stderr boolean? Forward child process stderr, otherwise collect it.
    162 --- @return test.ProcStream
    163 function ProcStream.spawn(argv, env, io_extra, on_exit, forward_stderr)
    164  local self = setmetatable({
    165    collect_text = false,
    166    output = function(self)
    167      if not self.collect_text then
    168        error('set collect_text=true')
    169      end
    170      return (self.stderr .. self.stdout):gsub('\r\n', '\n')
    171    end,
    172    stdout = '',
    173    stderr = '',
    174    stdout_eof = false,
    175    stderr_eof = false,
    176    _child_stdin = assert(uv.new_pipe(false)),
    177    _child_stdout = assert(uv.new_pipe(false)),
    178    _child_stderr = assert(uv.new_pipe(false)),
    179    _exiting = false,
    180    _on_exit = on_exit,
    181  }, ProcStream)
    182  local prog = argv[1]
    183  local args = {} --- @type string[]
    184  for i = 2, #argv do
    185    args[#args + 1] = argv[i]
    186  end
    187  local stderr = forward_stderr and 1 or self._child_stderr
    188  --- @diagnostic disable-next-line:missing-fields
    189  self._proc, self._pid = uv.spawn(prog, {
    190    stdio = { self._child_stdin, self._child_stdout, stderr, io_extra },
    191    args = args,
    192    --- @diagnostic disable-next-line:assign-type-mismatch
    193    env = env,
    194  }, function(status, signal)
    195    self.signal = signal
    196    -- "Abort" exit may not set status; force to nonzero in that case.
    197    self.status = (0 ~= (status or 0) or 0 == (signal or 0)) and status or (128 + (signal or 0))
    198    if self._on_exit then
    199      self._on_exit(self._closed)
    200    end
    201  end)
    202 
    203  if not self._proc then
    204    local err = self._pid
    205    error(err)
    206  end
    207 
    208  return self
    209 end
    210 
    211 function ProcStream:write(data)
    212  self._child_stdin:write(data)
    213 end
    214 
    215 function ProcStream:on_read(stream, cb, err, chunk)
    216  if err then
    217    error(err) -- stream read failed?
    218  elseif chunk then
    219    -- Always collect stderr, in case it gives useful info on failure.
    220    if stream == 'stderr' then
    221      self.stderr = self.stderr .. chunk --[[@as string]]
    222    elseif stream == 'stdout' and self.collect_text then
    223      -- Set `stdout` and convert CRLF => LF.
    224      self.stdout = (self.stdout .. chunk):gsub('\r\n', '\n')
    225    end
    226  else
    227    -- stderr_eof/stdout_eof
    228    self[stream .. '_eof'] = true ---@type boolean
    229  end
    230 
    231  -- Handler provided by the caller.
    232  if cb then
    233    cb(chunk)
    234  end
    235 end
    236 
    237 --- Collects output until the process exits.
    238 function ProcStream:wait()
    239  while not (self.stdout_eof and self.stderr_eof and (self.status or self.signal)) do
    240    uv.run('once')
    241  end
    242 end
    243 
    244 function ProcStream:read_start(on_stdout, on_stderr)
    245  self._child_stdout:read_start(function(err, chunk)
    246    self:on_read('stdout', on_stdout, err, chunk)
    247  end)
    248  self._child_stderr:read_start(function(err, chunk)
    249    self:on_read('stderr', on_stderr, err, chunk)
    250  end)
    251 end
    252 
    253 function ProcStream:read_stop()
    254  self._child_stdout:read_stop()
    255  self._child_stderr:read_stop()
    256 end
    257 
    258 function ProcStream:close(signal, noblock)
    259  if self._closed then
    260    return
    261  end
    262  self._closed = uv.now()
    263  self:read_stop()
    264  self._child_stdin:close()
    265  self._child_stdout:close()
    266  self._child_stderr:close()
    267  if type(signal) == 'string' then
    268    self._proc:kill('sig' .. signal)
    269  end
    270  if not noblock then
    271    while self.status == nil do
    272      uv.run 'once'
    273    end
    274    return self.status, self.signal
    275  end
    276 end
    277 
    278 return {
    279  StdioStream = StdioStream,
    280  ProcStream = ProcStream,
    281  SocketStream = SocketStream,
    282 }