rpc_stream.lua (2855B)
---
--- Reading/writing of msgpack over any of the stream types from `uv_stream.lua`.
--- Does not implement the RPC protocol, see `session.lua` for that.
---

local mpack = vim.mpack

--- Handle passed to the `on_request` callback of `RpcStream:read_start()`;
--- used to answer that one request.
---
--- @class test.RpcStream.Response
--- @field private _rpc_stream test.RpcStream
--- @field private _request_id any
local Response = {}
Response.__index = Response

--- Creates a response handle bound to a stream and a request id.
---
--- @param rpc_stream test.RpcStream stream the reply will be written to
--- @param request_id any id returned by the session for the incoming request
--- @return test.RpcStream.Response
function Response.new(rpc_stream, request_id)
  return setmetatable({
    _rpc_stream = rpc_stream,
    _request_id = request_id,
  }, Response)
end

--- Packs a reply for the request and writes it to the underlying stream.
---
--- The reply header comes from the session; it is followed by two packed
--- values: `value, NIL` when `is_error` is truthy, otherwise `NIL, value`.
---
--- @param value any payload (the error when `is_error` is truthy, else the result)
--- @param is_error any truthy to send `value` in the error position
function Response:send(value, is_error)
  local rpc_stream = self._rpc_stream
  local pack = rpc_stream._pack
  local data = rpc_stream._session:reply(self._request_id)
  if is_error then
    data = data .. pack(value)
    data = data .. pack(mpack.NIL)
  else
    data = data .. pack(mpack.NIL)
    data = data .. pack(value)
  end
  rpc_stream._stream:write(data)
end

--- Decoder shared by the Buffer (0), Window (1) and Tabpage (2) ext types:
--- the ext payload is itself msgpack, so it is decoded as-is.
---
--- @param _c any ext type code (unused)
--- @param s string raw ext payload
local function decode_ext(_c, s)
  return mpack.decode(s)
end

--- Nvim msgpack RPC stream.
---
--- @class test.RpcStream
--- @field private _stream test.Stream
--- @field private _pack fun(obj: any): string
--- @field private _session any mpack.Session instance
local RpcStream = {}
RpcStream.__index = RpcStream

--- Wraps `stream` with a msgpack packer and an mpack session.
---
--- @param stream test.Stream
--- @return test.RpcStream
function RpcStream.new(stream)
  return setmetatable({
    _stream = stream,
    _pack = mpack.Packer(),
    _session = mpack.Session({
      unpack = mpack.Unpacker({
        ext = {
          -- Buffer
          [0] = decode_ext,
          -- Window
          [1] = decode_ext,
          -- Tabpage
          [2] = decode_ext,
        },
      }),
    }),
  }, RpcStream)
end

--- Sends a request (when `response_cb` is given) or a notification.
---
--- @param method any method name, packed after the session header
--- @param args any arguments, packed after `method`
--- @param response_cb? fun(err: any, result: any) registered with the session
---   and invoked from `read_start()` once the matching response arrives
function RpcStream:write(method, args, response_cb)
  local data
  if response_cb then
    assert(type(response_cb) == 'function')
    data = self._session:request(response_cb)
  else
    data = self._session:notify()
  end

  data = data .. self._pack(method) .. self._pack(args)
  self._stream:write(data)
end

--- Starts reading from the stream and dispatches every decoded message.
---
--- @param on_request fun(method: any, args: any, response: test.RpcStream.Response)
--- @param on_notification fun(method: any, args: any)
--- @param on_eof fun() called when the stream delivers no data
function RpcStream:read_start(on_request, on_notification, on_eof)
  self._stream:read_start(function(data)
    if not data then
      return on_eof()
    end

    local pos = 1
    local len = #data
    -- One chunk may contain several messages; `receive` returns the position
    -- to continue from.
    while pos <= len do
      -- Named `msg_type` so the built-in `type()` is not shadowed.
      local msg_type, id_or_cb, method_or_error, args_or_result
      msg_type, id_or_cb, method_or_error, args_or_result, pos = self._session:receive(data, pos)

      if msg_type == 'request' then
        on_request(method_or_error, args_or_result, Response.new(self, id_or_cb))
      elseif msg_type == 'notification' then
        on_notification(method_or_error, args_or_result)
      elseif msg_type == 'response' then
        -- Exactly one of (error, result) is handed to the callback: a NIL
        -- error becomes a Lua nil, otherwise the result is dropped.
        if method_or_error == mpack.NIL then
          method_or_error = nil
        else
          args_or_result = nil
        end
        -- For responses the session hands back the callback given to `request()`.
        id_or_cb(method_or_error, args_or_result)
      end
      -- Any other `msg_type` is ignored; the loop continues from the updated `pos`.
    end
  end)
end

--- Stops reading from the underlying stream.
function RpcStream:read_stop()
  self._stream:read_stop()
end

--- Closes the underlying stream; both arguments are forwarded unchanged.
---
--- @param signal any
--- @param noblock any
function RpcStream:close(signal, noblock)
  self._stream:close(signal, noblock)
end

return RpcStream