neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

rpc_stream.lua (2855B)


      1 ---
      2 --- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`.
      3 --- Does not implement the RPC protocol, see `session.lua` for that.
      4 ---
      5 
      6 local mpack = vim.mpack
      7 
      8 local Response = {}
      9 Response.__index = Response
     10 
     11 function Response.new(rpc_stream, request_id)
     12  return setmetatable({
     13    _rpc_stream = rpc_stream,
     14    _request_id = request_id,
     15  }, Response)
     16 end
     17 
     18 function Response:send(value, is_error)
     19  local data = self._rpc_stream._session:reply(self._request_id)
     20  if is_error then
     21    data = data .. self._rpc_stream._pack(value)
     22    data = data .. self._rpc_stream._pack(mpack.NIL)
     23  else
     24    data = data .. self._rpc_stream._pack(mpack.NIL)
     25    data = data .. self._rpc_stream._pack(value)
     26  end
     27  self._rpc_stream._stream:write(data)
     28 end
     29 
     30 --- Nvim msgpack RPC stream.
     31 ---
     32 --- @class test.RpcStream
     33 --- @field private _stream test.Stream
     34 --- @field private __pack table
     35 local RpcStream = {}
     36 RpcStream.__index = RpcStream
     37 
     38 function RpcStream.new(stream)
     39  return setmetatable({
     40    _stream = stream,
     41    _pack = mpack.Packer(),
     42    _session = mpack.Session({
     43      unpack = mpack.Unpacker({
     44        ext = {
     45          -- Buffer
     46          [0] = function(_c, s)
     47            return mpack.decode(s)
     48          end,
     49          -- Window
     50          [1] = function(_c, s)
     51            return mpack.decode(s)
     52          end,
     53          -- Tabpage
     54          [2] = function(_c, s)
     55            return mpack.decode(s)
     56          end,
     57        },
     58      }),
     59    }),
     60  }, RpcStream)
     61 end
     62 
     63 function RpcStream:write(method, args, response_cb)
     64  local data
     65  if response_cb then
     66    assert(type(response_cb) == 'function')
     67    data = self._session:request(response_cb)
     68  else
     69    data = self._session:notify()
     70  end
     71 
     72  data = data .. self._pack(method) .. self._pack(args)
     73  self._stream:write(data)
     74 end
     75 
     76 function RpcStream:read_start(on_request, on_notification, on_eof)
     77  self._stream:read_start(function(data)
     78    if not data then
     79      return on_eof()
     80    end
     81    local type, id_or_cb, method_or_error, args_or_result
     82    local pos = 1
     83    local len = #data
     84    while pos <= len do
     85      type, id_or_cb, method_or_error, args_or_result, pos = self._session:receive(data, pos)
     86      if type == 'request' or type == 'notification' then
     87        if type == 'request' then
     88          on_request(method_or_error, args_or_result, Response.new(self, id_or_cb))
     89        else
     90          on_notification(method_or_error, args_or_result)
     91        end
     92      elseif type == 'response' then
     93        if method_or_error == mpack.NIL then
     94          method_or_error = nil
     95        else
     96          args_or_result = nil
     97        end
     98        id_or_cb(method_or_error, args_or_result)
     99      end
    100    end
    101  end)
    102 end
    103 
    104 function RpcStream:read_stop()
    105  self._stream:read_stop()
    106 end
    107 
    108 function RpcStream:close(signal, noblock)
    109  self._stream:close(signal, noblock)
    110 end
    111 
    112 return RpcStream