channels_spec.lua (16566B)
-- Functional tests for channels: sockets (sockconnect), stdio channels
-- (stdioopen), job channels (jobstart) and the loopback RPC channel.

local t = require('test.testutil')
local n = require('test.functional.testnvim')()

local clear, eq, eval, next_msg, ok, source = n.clear, t.eq, n.eval, n.next_msg, t.ok, n.source
local command, fn, api = n.command, n.fn, n.api
local feed = n.feed
local exec_lua = n.exec_lua
local matches = t.matches
local sleep = vim.uv.sleep
local get_session, set_session = n.get_session, n.set_session
local nvim_prog = n.nvim_prog
local is_os = t.is_os
local retry = t.retry
local expect_twostreams = n.expect_twostreams
local assert_alive = n.assert_alive
local pcall_err = t.pcall_err
local skip = t.skip

describe('channels', function()
  -- Vimscript helpers sourced into every fresh session:
  --   Normalize(): strips "\r" from every item when given a list.
  --   OnEvent():   forwards (event, id, data) to channel 1 via rpcnotify(),
  --                which the tests read back with next_msg().
  local init = [[
    function! Normalize(data) abort
      " Windows: remove ^M
      return type([]) == type(a:data)
        \ ? mapnew(a:data, 'substitute(v:val, "\r", "", "g")')
        \ : a:data
    endfunction
    function! OnEvent(id, data, event) dict
      call rpcnotify(1, a:event, a:id, a:data)
    endfunction
  ]]
  before_each(function()
    clear()
    source(init)
  end)

  pending('can connect to socket', function()
    local server = n.new_session(true)
    set_session(server)
    local address = fn.serverlist()[1]
    local client = n.new_session(true)
    set_session(client)
    source(init)

    api.nvim_set_var('address', address)
    command("let g:id = sockconnect('pipe', address, {'on_data':'OnEvent'})")
    local id = eval('g:id')
    ok(id > 0)

    command("call chansend(g:id, msgpackdump([[2,'nvim_set_var',['code',23]]]))")
    set_session(server)
    retry(nil, 1000, function()
      eq(23, api.nvim_get_var('code'))
    end)
    set_session(client)

    command("call chansend(g:id, msgpackdump([[0,0,'nvim_eval',['2+3']]]))")

    local res = eval('msgpackdump([[1,0,v:null,5]])')
    eq({ '\148\001\n\192\005' }, res)
    eq({ 'notification', 'data', { id, res } }, next_msg())
    command("call chansend(g:id, msgpackdump([[2,'nvim_command',['quit']]]))")
    eq({ 'notification', 'data', { id, { '' } } }, next_msg())
  end)

  it('dont crash due to garbage in rpc #23781', function()
    local client = get_session()
    local server = n.new_session(true)
    -- Always release the extra server session, even when an assertion below
    -- fails, and make sure the client is the current session again so the
    -- regular per-test setup operates on it.
    finally(function()
      set_session(client)
      server:close()
    end)
    set_session(server)
    local address = fn.serverlist()[1]
    set_session(client)

    api.nvim_set_var('address', address)
    command("let g:id = sockconnect('pipe', address, {'on_data':'OnEvent'})")
    local id = eval('g:id')
    ok(id > 0)

    -- A single garbage byte: the connection is closed (empty 'data' event),
    -- and the server stays alive.
    command("call chansend(g:id, 'F')")
    eq({ 'notification', 'data', { id, { '' } } }, next_msg())
    set_session(server)
    assert_alive()

    set_session(client)
    command('call chanclose(g:id)')
    command("let g:id = sockconnect('pipe', address, {'on_data':'OnEvent'})")
    id = eval('g:id')
    ok(id > 0)

    -- A truncated msgpack message followed by a garbage byte.
    command("call chansend(g:id, msgpackdump([[2, 'redraw', 'F']], 'B')[:-4])")
    set_session(server)
    assert_alive()
    set_session(client)
    command("call chansend(g:id, 'F')")
    eq({ 'notification', 'data', { id, { '' } } }, next_msg())

    set_session(server)
    assert_alive()
    set_session(client)
    command('call chanclose(g:id)')
  end)

  it('can use stdio channel', function()
    source([[
      let g:job_opts = {
      \ 'on_stdout': function('OnEvent'),
      \ 'on_stderr': function('OnEvent'),
      \ 'on_exit': function('OnEvent'),
      \ }
    ]])
    api.nvim_set_var('nvim_prog', nvim_prog)
    -- Script run by the child nvim: echoes every stdin event back on its
    -- stdio channel and quits once stdin is closed.
    api.nvim_set_var(
      'code',
      [[
      function! OnEvent(id, data, event) dict
        let text = string([a:id, a:data, a:event])
        call chansend(g:x, text)

        if a:data == ['']
          call chansend(v:stderr, "*dies*")
          quit
        endif
      endfunction
      let g:x = stdioopen({'on_stdin':'OnEvent'})
      call chansend(x, "hello")
    ]]
    )
    command(
      "let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)"
    )
    local id = eval('g:id')
    ok(id > 0)

    eq({ 'notification', 'stdout', { id, { 'hello' } } }, next_msg())

    command("call chansend(id, 'howdy')")
    eq({ 'notification', 'stdout', { id, { "[1, ['howdy'], 'stdin']" } } }, next_msg())

    command('call chansend(id, 0z686f6c61)')
    eq({ 'notification', 'stdout', { id, { "[1, ['hola'], 'stdin']" } } }, next_msg())

    command("call chanclose(id, 'stdin')")
    expect_twostreams({
      { 'notification', 'stdout', { id, { "[1, [''], 'stdin']" } } },
      { 'notification', 'stdout', { id, { '' } } },
    }, {
      { 'notification', 'stderr', { id, { '*dies*' } } },
      { 'notification', 'stderr', { id, { '' } } },
    })
    -- Use the job id rather than a hard-coded channel number, like the
    -- assertions above.
    eq({ 'notification', 'exit', { id, 0 } }, next_msg())
  end)

  it('can use stdio channel and on_print callback', function()
    source([[
      let g:job_opts = {
      \ 'on_stdout': function('OnEvent'),
      \ 'on_stderr': function('OnEvent'),
      \ 'on_exit': function('OnEvent'),
      \ }
    ]])
    api.nvim_set_var('nvim_prog', nvim_prog)
    -- Script run by the child nvim: :echo output is routed through OnPrint(),
    -- which sends it back on the stdio channel with an "OnPrint:" prefix.
    api.nvim_set_var(
      'code',
      [[
      function! OnStdin(id, data, event) dict
        echo string([a:id, a:data, a:event])
        if a:data == ['']
          quit
        endif
      endfunction
      function! OnPrint(text) dict
        call chansend(g:x, ['OnPrint:' .. a:text])
      endfunction
      let g:x = stdioopen({'on_stdin': funcref('OnStdin'), 'on_print':'OnPrint'})
      call chansend(x, "hello")
    ]]
    )
    command(
      "let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)"
    )
    local id = eval('g:id')
    ok(id > 0)

    eq({ 'notification', 'stdout', { id, { 'hello' } } }, next_msg())

    command("call chansend(id, 'howdy')")
    eq({ 'notification', 'stdout', { id, { "OnPrint:[1, ['howdy'], 'stdin']" } } }, next_msg())
  end)

  -- Helper to accumulate PTY stdout data until expected string is received.
  -- PTY reads are non-atomic and may deliver data in chunks.
  --
  -- Reads 'stdout' notifications for job `id` until at least #expected bytes
  -- were collected (or next_msg(1000) returns nothing), then asserts that the
  -- collected text equals `expected`.
  local function expect_stdout(id, expected)
    local accumulated = ''
    while #accumulated < #expected do
      local msg = next_msg(1000)
      if not msg then
        -- No more messages: fall through so the eq() below reports the diff.
        break
      end
      eq('notification', msg[1])
      eq('stdout', msg[2])
      eq(id, msg[3][1])
      accumulated = accumulated .. table.concat(msg[3][2], '\n')
    end
    eq(expected, accumulated)
  end

  it('can use stdio channel with pty', function()
    skip(is_os('win'))
    source([[
      let g:job_opts = {
      \ 'on_stdout': function('OnEvent'),
      \ 'on_exit': function('OnEvent'),
      \ 'pty': v:true,
      \ }
    ]])
    api.nvim_set_var('nvim_prog', nvim_prog)
    api.nvim_set_var(
      'code',
      [[
      function! OnEvent(id, data, event) dict
        let text = string([a:id, a:data, a:event])
        call chansend(g:x, text)
      endfunction
      let g:x = stdioopen({'on_stdin':'OnEvent'})
    ]]
    )
    command(
      "let g:id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)"
    )
    local id = eval('g:id')
    ok(id > 0)

    command("call chansend(id, 'TEXT\n')")
    expect_stdout(id, "TEXT\r\n[1, ['TEXT', ''], 'stdin']")

    command('call chansend(id, 0z426c6f6273210a)')
    expect_stdout(id, "Blobs!\r\n[1, ['Blobs!', ''], 'stdin']")

    command("call chansend(id, 'neovan')")
    expect_stdout(id, 'neovan')
    command("call chansend(id, '\127\127im\n')")
    expect_stdout(id, "\b \b\b \bim\r\n[1, ['neovim', ''], 'stdin']")

    command("call chansend(id, 'incomplet\004')")

    local bsdlike = is_os('bsd') or is_os('mac')
    local extra = bsdlike and '^D\008\008' or ''
    expect_stdout(id, 'incomplet' .. extra .. "[1, ['incomplet'], 'stdin']")

    command("call chansend(id, '\004')")
    expect_stdout(id, extra .. "[1, [''], 'stdin']")

    -- channel is still open
    command("call chansend(id, 'hi again!\n')")
    expect_stdout(id, 'hi again!\r\n')
  end)

  it('stdio channel can use rpc and stderr simultaneously', function()
    skip(is_os('win'))
    source([[
      let g:job_opts = {
      \ 'on_stderr': function('OnEvent'),
      \ 'on_exit': function('OnEvent'),
      \ 'rpc': v:true,
      \ }
    ]])
    api.nvim_set_var('nvim_prog', nvim_prog)
    api.nvim_set_var(
      'code',
      [[
      let id = stdioopen({'rpc':v:true})
      call rpcnotify(id,"nvim_call_function", "rpcnotify", [1, "message", "hi there!", id])
      call chansend(v:stderr, "trouble!")
    ]]
    )
    command(
      "let id = jobstart([ g:nvim_prog, '-u', 'NONE', '-i', 'NONE', '--cmd', 'set noswapfile', '--headless', '--cmd', g:code], g:job_opts)"
    )
    -- Compare against the actual job id instead of a hard-coded channel number.
    local id = eval('g:id')
    ok(id > 0)

    eq({ 'notification', 'message', { 'hi there!', 1 } }, next_msg())
    eq({ 'notification', 'stderr', { id, { 'trouble!' } } }, next_msg())

    eq(30, eval("rpcrequest(id, 'nvim_eval', '[chansend(v:stderr, \"math??\"), 5*6][1]')"))
    eq({ 'notification', 'stderr', { id, { 'math??' } } }, next_msg())

    local _, err =
      pcall(command, "call rpcrequest(id, 'nvim_command', 'call chanclose(v:stderr, \"stdin\")')")
    matches('E906: invalid stream for channel', err)

    eq(1, eval("rpcrequest(id, 'nvim_eval', 'chanclose(v:stderr, \"stderr\")')"))
    eq({ 'notification', 'stderr', { id, { '' } } }, next_msg())

    command("call rpcnotify(id, 'nvim_command', 'quit')")
    eq({ 'notification', 'exit', { id, 0 } }, next_msg())
  end)

  it('stdio channel works with stdout redirected to file #30509', function()
    local fd, handle
    -- Register cleanup first so that a failure in any setup step below does
    -- not leak the file descriptor, the process handle or the temp files.
    finally(function()
      if handle then
        handle:close()
      end
      if fd then
        vim.uv.fs_close(fd)
      end
      os.remove('Xstdio_write.vim')
      os.remove('Xstdio_redir')
    end)
    t.write_file(
      'Xstdio_write.vim',
      [[
      let chan = stdioopen({})
      call chansend(chan, 'foo')
      call chansend(chan, 'bar')
      qall!
    ]]
    )
    fd = assert(vim.uv.fs_open('Xstdio_redir', 'w', 420))
    local exit_code, exit_signal
    -- assert(): surface the spawn error instead of failing later on a nil handle.
    handle = assert(vim.uv.spawn(nvim_prog, {
      args = { '-u', 'NONE', '-i', 'NONE', '--headless', '-S', 'Xstdio_write.vim' },
      -- Simulate shell redirection: "nvim ... > Xstdio_redir". #30509
      stdio = { nil, fd, nil },
    }, function(code, signal)
      vim.uv.stop()
      exit_code, exit_signal = code, signal
    end))
    -- Runs until the exit callback above calls vim.uv.stop().
    vim.uv.run('default')
    eq({ 0, 0 }, { exit_code, exit_signal })
    eq('foobar', t.read_file('Xstdio_redir'))
  end)

  it('can use buffered output mode', function()
    skip(fn.executable('grep') == 0, 'missing "grep" command')
    source([[
      let g:job_opts = {
      \ 'on_stdout': function('OnEvent'),
      \ 'on_exit': function('OnEvent'),
      \ 'stdout_buffered': v:true,
      \ }
    ]])
    command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
    local id = eval('g:id')

    command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
    sleep(10)
    command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
    command("call chanclose(id, 'stdin')")

    eq({
      'notification',
      'stdout',
      { id, { '10 PRINT "NVIM"', '20 GOTO 10', '' } },
    }, next_msg())
    eq({ 'notification', 'exit', { id, 0 } }, next_msg())

    command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
    id = eval('g:id')

    command([[call chansend(id, "is no number\nnot at all")]])
    command("call chanclose(id, 'stdin')")

    -- works correctly with no output
    eq({ 'notification', 'stdout', { id, { '' } } }, next_msg())
    eq({ 'notification', 'exit', { id, 1 } }, next_msg())
  end)

  it('can use buffered output mode with no stream callback', function()
    skip(fn.executable('grep') == 0, 'missing "grep" command')
    -- This OnEvent additionally forwards self.stdout, i.e. the buffered
    -- output stored on the job options dict.
    source([[
      function! OnEvent(id, data, event) dict
        call rpcnotify(1, a:event, a:id, a:data, self.stdout)
      endfunction
      let g:job_opts = {
      \ 'on_exit': function('OnEvent'),
      \ 'stdout_buffered': v:true,
      \ }
    ]])
    command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
    local id = eval('g:id')

    command([[call chansend(id, "stuff\n10 PRINT \"NVIM\"\nxx")]])
    sleep(10)
    command([[call chansend(id, "xx\n20 GOTO 10\nzz\n")]])
    command("call chanclose(id, 'stdin')")

    eq({
      'notification',
      'exit',
      { id, 0, { '10 PRINT "NVIM"', '20 GOTO 10', '' } },
    }, next_msg())

    -- if dict is reused the new value is not stored,
    -- but nvim also does not crash
    command("let id = jobstart(['cat'], g:job_opts)")
    id = eval('g:id')

    command([[call chansend(id, "cat text\n")]])
    sleep(10)
    command("call chanclose(id, 'stdin')")

    -- old value was not overwritten
    eq({
      'notification',
      'exit',
      { id, 0, { '10 PRINT "NVIM"', '20 GOTO 10', '' } },
    }, next_msg())

    -- and an error was thrown.
    eq(
      "E5210: dict key 'stdout' already set for buffered stream in channel " .. id,
      eval('v:errmsg')
    )

    -- reset dictionary
    source([[
      let g:job_opts = {
      \ 'on_exit': function('OnEvent'),
      \ 'stdout_buffered': v:true,
      \ }
    ]])
    command("let id = jobstart(['grep', '^[0-9]'], g:job_opts)")
    id = eval('g:id')

    command([[call chansend(id, "is no number\nnot at all")]])
    command("call chanclose(id, 'stdin')")

    -- works correctly with no output
    eq({ 'notification', 'exit', { id, 1, { '' } } }, next_msg())
  end)

  it('ChanOpen works with vim.wait() from another autocommand #32706', function()
    exec_lua([[
      vim.api.nvim_create_autocmd('ChanOpen', {
        callback = function(ev)
          _G.chan = vim.v.event.info.id
        end,
      })
      vim.api.nvim_create_autocmd('InsertEnter', {
        buffer = 0,
        callback = function()
          local chan = vim.fn.jobstart({ 'cat' })
          _G.result = vim.wait(3000, function()
            return _G.chan == chan
          end)
        end,
      })
    ]])
    feed('i')
    retry(nil, 4000, function()
      eq(true, exec_lua('return _G.result'))
    end)
  end)

  it('ChanInfo works with vim.wait() from another autocommand #32706', function()
    exec_lua([[
      vim.api.nvim_create_autocmd('ChanInfo', {
        callback = function(ev)
          _G.foo = vim.v.event.info.client.attributes.foo
        end,
      })
      vim.api.nvim_create_autocmd('InsertEnter', {
        buffer = 0,
        callback = function()
          local chan = vim.fn.jobstart({ 'cat' })
          _G.result = vim.wait(3000, function()
            return _G.foo == 'bar'
          end)
        end,
      })
    ]])
    feed('i')
    api.nvim_set_client_info('test', {}, 'remote', {}, { foo = 'bar' })
    retry(nil, 4000, function()
      eq(true, exec_lua('return _G.result'))
    end)
  end)

  describe('sockconnect() reports error when connection fails', function()
    it('in "pipe" mode', function()
      eq(
        'Vim:connection failed: connection refused',
        pcall_err(fn.sockconnect, 'pipe', n.new_pipename())
      )
    end)

    it('in "tcp" mode', function()
      eq(
        'Vim:connection failed: connection refused',
        pcall_err(fn.sockconnect, 'tcp', '127.0.0.1:0')
      )
    end)

    it('with another connection accepted while polling #37807', function()
      local server = api.nvim_get_vvar('servername')
      local invalid_pipe = n.new_pipename()
      exec_lua(function()
        vim.defer_fn(function()
          vim.uv.sleep(50) -- Block the uv event loop.
          vim.fn.sockconnect('pipe', invalid_pipe)
        end, 10)
      end)
      vim.uv.sleep(20)
      -- The server uv event loop is currently blocked, so the connection will
      -- be accepted when sockconnect() polls.
      local other_session = n.connect(server)
      eq({ true, { 1000 } }, { other_session:request('nvim_list_wins') })
      other_session:close()
      matches(
        '^vim.schedule callback: Vim:connection failed: connection refused\n',
        api.nvim_get_vvar('errmsg')
      )
    end)
  end)
end)

describe('loopback', function()
  -- Each test starts with `chan`: an RPC channel connected to the instance's
  -- own v:servername.
  before_each(function()
    clear()
    command("let chan = sockconnect('pipe', v:servername, {'rpc': v:true})")
  end)

  it('does not crash when sending raw data', function()
    eq(
      "Vim(call):Can't send raw data to rpc channel",
      pcall_err(command, "call chansend(chan, 'test')")
    )
    assert_alive()
  end)

  it('are released when closed', function()
    local chans = eval('len(nvim_list_chans())')
    command('call chanclose(chan)')
    n.poke_eventloop() -- Process rpc_close_event().
    -- Channel has been released after processing free_channel_event().
    eq(chans - 1, eval('len(nvim_list_chans())'))
  end)
end)