channel.c (27991B)
#include <assert.h>
#include <fcntl.h>
#include <inttypes.h>
#include <lauxlib.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#include "klib/kvec.h"
#include "nvim/api/private/converter.h"
#include "nvim/api/private/defs.h"
#include "nvim/api/private/helpers.h"
#include "nvim/autocmd.h"
#include "nvim/autocmd_defs.h"
#include "nvim/buffer_defs.h"
#include "nvim/channel.h"
#include "nvim/errors.h"
#include "nvim/eval.h"
#include "nvim/eval/encode.h"
#include "nvim/eval/typval.h"
#include "nvim/event/loop.h"
#include "nvim/event/multiqueue.h"
#include "nvim/event/proc.h"
#include "nvim/event/rstream.h"
#include "nvim/event/socket.h"
#include "nvim/event/stream.h"
#include "nvim/event/wstream.h"
#include "nvim/garray.h"
#include "nvim/gettext_defs.h"
#include "nvim/globals.h"
#include "nvim/log.h"
#include "nvim/lua/executor.h"
#include "nvim/main.h"
#include "nvim/mbyte.h"
#include "nvim/memory.h"
#include "nvim/memory_defs.h"
#include "nvim/message.h"
#include "nvim/msgpack_rpc/channel.h"
#include "nvim/msgpack_rpc/server.h"
#include "nvim/os/fs.h"
#include "nvim/os/os_defs.h"
#include "nvim/os/shell.h"
#include "nvim/terminal.h"
#include "nvim/types_defs.h"

#ifdef MSWIN
# include "nvim/os/fs.h"
# include "nvim/os/os_win_console.h"
# include "nvim/os/pty_conpty_win.h"
#endif

/// Set once channel_from_stdio() has been asked to open the stdio channel;
/// later calls are rejected with "channel was already open".
static bool did_stdio = false;

/// next free id for a job or rpc channel
/// 1 is reserved for stdio channel
/// 2 is reserved for stderr channel
static uint64_t next_chan_id = CHAN_STDERR + 1;

#include "channel.c.generated.h"

/// Teardown the module
///
/// Requests a full close (kChannelPartAll) of every channel currently in the
/// `channels` map. Errors from individual closes are ignored.
void channel_teardown(void)
{
  Channel *chan;
  map_foreach_value(&channels, chan, {
    channel_close(chan->id, kChannelPartAll, NULL);
  });
}

#ifdef EXITFREE
/// Destroys every channel still in the `channels` map, then the map itself
/// and the `on_print` callback. Only compiled when EXITFREE is defined.
void channel_free_all_mem(void)
{
  Channel *chan;
  map_foreach_value(&channels, chan, {
    channel_destroy(chan);
  });
  map_destroy(uint64_t, &channels);

  callback_free(&on_print);
}
#endif

/// Closes a channel
///
/// @param id The channel id
/// @param part Which part of the channel to close (rpc, stdin, stdout, stderr
///             or all). Which parts are valid depends on the stream type.
/// @param[out] error Set to an error message on failure. May be NULL, in
///                   which case the message is discarded.
/// @return true if successful, false otherwise
bool channel_close(uint64_t id, ChannelPart part, const char **error)
{
  Channel *chan;
  Proc *proc;

  // Let the rest of the function write to `*error` unconditionally.
  const char *dummy;
  if (!error) {
    error = &dummy;
  }

  if (!(chan = find_channel(id))) {
    if (id < next_chan_id) {
      // allow double close, even though we can't say which parts were valid.
      return true;
    }
    *error = e_invchan;
    return false;
  }

  // `close_main` means "close the primary in/out streams of the channel".
  bool close_main = false;
  if (part == kChannelPartRpc || part == kChannelPartAll) {
    close_main = true;
    if (chan->is_rpc) {
      rpc_close(chan);
    } else if (part == kChannelPartRpc) {
      // Explicitly asked for the rpc part of a non-rpc channel.
      *error = e_invstream;
      return false;
    }
  } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
             && chan->is_rpc) {
    // stdin/stdout of an rpc channel cannot be closed individually.
    *error = e_invstreamrpc;
    return false;
  }

  switch (chan->streamtype) {
  case kChannelStreamSocket:
    if (!close_main) {
      *error = e_invstream;
      return false;
    }
    rstream_may_close(&chan->stream.socket);
    break;

  case kChannelStreamProc:
    proc = &chan->stream.proc;
    if (part == kChannelPartStdin || close_main) {
      stream_may_close(&proc->in);
    }
    if (part == kChannelPartStdout || close_main) {
      rstream_may_close(&proc->out);
    }
    // Note: stderr is closed for kChannelPartAll but not for kChannelPartRpc.
    if (part == kChannelPartStderr || part == kChannelPartAll) {
      rstream_may_close(&proc->err);
    }
    if (proc->type == kProcTypePty && part == kChannelPartAll) {
      pty_proc_close_master(&chan->stream.pty);
    }

    break;

  case kChannelStreamStdio:
    if (part == kChannelPartStdin || close_main) {
      rstream_may_close(&chan->stream.stdio.in);
    }
    if (part == kChannelPartStdout || close_main) {
      stream_may_close(&chan->stream.stdio.out);
    }
    if (part == kChannelPartStderr) {
      *error = e_invstream;
      return false;
    }
    break;

  case kChannelStreamStderr:
    if (part != kChannelPartAll && part != kChannelPartStderr) {
      *error = e_invstream;
      return false;
    }
    if (!chan->stream.err.closed) {
      chan->stream.err.closed = true;
      // Don't close on exit, in case late error messages
      if (!exiting) {
        fclose(stderr);
      }
      channel_decref(chan);
    }
    break;

  case kChannelStreamInternal:
    if (!close_main) {
      *error = e_invstream;
      return false;
    }
    if (chan->term) {
      // Release the Lua callback and mark the stream closed before asking the
      // terminal to close.
      api_free_luaref(chan->stream.internal.cb);
      chan->stream.internal.cb = LUA_NOREF;
      chan->stream.internal.closed = true;
      terminal_close(&chan->term, 0);
    } else {
      channel_decref(chan);
    }
    break;
  }

  return true;
}

/// Initializes the module
///
/// Allocates the stderr channel and initializes the rpc layer.
void channel_init(void)
{
  channel_alloc(kChannelStreamStderr);
  rpc_init();
}

/// Allocates a channel.
///
/// Channel is allocated with refcount 1, which should be decreased
/// when the underlying stream closes.
///
/// Stdio and stderr channels get the fixed ids CHAN_STDIO and CHAN_STDERR;
/// every other type takes the next value of `next_chan_id`. The new channel
/// is registered in the `channels` map before it is returned.
///
/// @param type Stream type of the new channel.
/// @return The newly allocated channel.
Channel *channel_alloc(ChannelStreamType type)
  FUNC_ATTR_NONNULL_RET
{
  Channel *chan = xcalloc(1, sizeof(*chan));
  if (type == kChannelStreamStdio) {
    chan->id = CHAN_STDIO;
  } else if (type == kChannelStreamStderr) {
    chan->id = CHAN_STDERR;
  } else {
    chan->id = next_chan_id++;
  }
  chan->events = multiqueue_new_child(main_loop.events);
  chan->refcount = 1;
  // -1 is the "no exit status pending" sentinel, see on_channel_event().
  chan->exit_status = -1;
  chan->streamtype = type;
  chan->detach = false;
  assert(chan->id <= VARNUMBER_MAX);
  pmap_put(uint64_t)(&channels, chan->id, chan);
  return chan;
}

/// Announces a newly created channel.
///
/// When NVIM_LOG_DEBUG is defined, logs the channel info (as JSON) together
/// with the source that created it. In all builds, notifies listeners via
/// channel_info_changed(chan, true).
///
/// @param chan The new channel.
/// @param ext_source Description of an external source of the channel, or
///                   NULL to use the current script source name/line. Only
///                   used for the debug log.
void channel_create_event(Channel *chan, const char *ext_source)
{
#ifdef NVIM_LOG_DEBUG
  const char *source;

  if (ext_source) {
    // TODO(bfredl): in a future improved traceback solution,
    // external events should be included.
    source = ext_source;
  } else {
    eval_fmt_source_name_line(IObuff, sizeof(IObuff));
    source = IObuff;
  }

  assert(chan->id <= VARNUMBER_MAX);
  Arena arena = ARENA_EMPTY;
  Dict info = channel_info(chan->id, &arena);
  typval_T tv = TV_INITIAL_VALUE;
  // TODO(bfredl): do the conversion in one step. Also would be nice
  // to pretty print top level dict in defined order
  object_to_vim(DICT_OBJ(info), &tv, NULL);
  assert(tv.v_type == VAR_DICT);
  char *str = encode_tv2json(&tv, NULL);
  ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str);
  xfree(str);
  arena_mem_free(arena_finish(&arena));

#else
  (void)ext_source;
#endif

  channel_info_changed(chan, true);
}

/// Takes an additional reference on `chan`.
void channel_incref(Channel *chan)
{
  chan->refcount++;
}

/// Drops one reference on `chan`. When the count reaches zero, the actual
/// free is queued on the main loop instead of being done immediately.
void channel_decref(Channel *chan)
{
  if (!(--chan->refcount)) {
    // delay free, so that libuv is done with the handles
    multiqueue_put(main_loop.events, free_channel_event, chan);
  }
}

/// Releases the callback and the accumulated buffer owned by `reader`.
void callback_reader_free(CallbackReader *reader)
{
  callback_free(&reader->cb);
  ga_clear(&reader->buffer);
}

/// Prepares `reader` for receiving data.
///
/// @param reader Reader whose buffer is (re)initialized.
/// @param type Name of the stream. It is stored by pointer (not copied) and
///             later used as the event type / dict key for this reader.
void callback_reader_start(CallbackReader *reader, const char *type)
{
  ga_init(&reader->buffer, sizeof(char *), 32);
  reader->type = type;
}

/// Frees all resources owned by `chan` and the channel itself.
/// Does not remove the channel from the `channels` map.
static void channel_destroy(Channel *chan)
{
  if (chan->is_rpc) {
    rpc_free(chan);
  }

  if (chan->streamtype == kChannelStreamProc) {
    proc_free(&chan->stream.proc);
  }

  callback_reader_free(&chan->on_data);
  callback_reader_free(&chan->on_stderr);
  callback_free(&chan->on_exit);

  multiqueue_free(chan->events);
  xfree(chan);
}

/// Deferred-event handler: unregisters the channel (argv[0]) from the
/// `channels` map and destroys it.
static void free_channel_event(void **argv)
{
  Channel *chan = argv[0];
  pmap_del(uint64_t)(&channels, chan->id, NULL);
  channel_destroy(chan);
}

/// Discards a channel that was just allocated but could not be started.
///
/// Gives the channel id back by decrementing `next_chan_id`, so `chan` must
/// be the most recently allocated channel and hold exactly one reference;
/// the process aborts otherwise.
static void channel_destroy_early(Channel *chan)
{
  if ((chan->id != --next_chan_id)) {
    abort();
  }
  pmap_del(uint64_t)(&channels, chan->id, NULL);
  chan->id = 0;

  if ((--chan->refcount != 0)) {
    abort();
  }

  // uv will keep a reference to handles until next loop tick, so delay free
  multiqueue_put(main_loop.events, free_channel_event, chan);
}

/// Stream close callback: drops the channel reference passed as `data`.
static void close_cb(Stream *stream, void *data)
{
  channel_decref(data);
}

/// Starts a job and returns the associated channel
///
/// @param[in]  argv  Arguments vector specifying the command to run,
///                   NULL-terminated
/// @param[in]  exepath  The path to the executable. If NULL, use `argv[0]`.
/// @param[in]  on_stdout  Callback to read the job's stdout
/// @param[in]  on_stderr  Callback to read the job's stderr
/// @param[in]  on_exit  Callback to receive the job's exit status
/// @param[in]  pty  True if the job should run attached to a pty
/// @param[in]  rpc  True to communicate with the job using msgpack-rpc,
///                  `on_stdout` is ignored
/// @param[in]  overlapped  Copied to `proc->overlapped` before spawning.
///                         NOTE(review): its effect is defined by the proc
///                         layer and is not visible in this file.
/// @param[in]  detach  True if the job should not be killed when nvim exits,
///                     ignored if `pty` is true
/// @param[in]  stdin_mode  Stdin mode. Either kChannelStdinPipe to open a
///                         channel for stdin or kChannelStdinNull to leave
///                         stdin disconnected.
/// @param[in]  cwd  Initial working directory for the job.  Nvim's working
///                  directory if `cwd` is NULL
/// @param[in]  pty_width  Width of the pty, ignored if `pty` is false
/// @param[in]  pty_height  Height of the pty, ignored if `pty` is false
/// @param[in]  env  Nvim's configured environment is used if this is NULL,
///                  otherwise defines all environment variables
/// @param[out]  status_out  0 for invalid arguments, > 0 for the channel id,
///                          < 0 if the job can't start
///
/// @returns [allocated] channel
Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_stdout,
                           CallbackReader on_stderr, Callback on_exit, bool pty, bool rpc,
                           bool overlapped, bool detach, ChannelStdinMode stdin_mode,
                           const char *cwd, uint16_t pty_width, uint16_t pty_height, dict_T *env,
                           varnumber_T *status_out)
{
  Channel *chan = channel_alloc(kChannelStreamProc);
  chan->on_data = on_stdout;
  chan->on_stderr = on_stderr;
  chan->on_exit = on_exit;

  if (pty) {
    if (detach) {
      // Invalid combination: release everything handed to us and bail out.
      semsg(_(e_invarg2), "terminal/pty job cannot be detached");
      shell_free_argv(argv);
      if (env) {
        tv_dict_free(env);
      }
      channel_destroy_early(chan);
      *status_out = 0;
      return NULL;
    }
    chan->stream.pty = pty_proc_init(&main_loop, chan);
    if (pty_width > 0) {
      chan->stream.pty.width = pty_width;
    }
    if (pty_height > 0) {
      chan->stream.pty.height = pty_height;
    }
  } else {
    chan->stream.uv = libuv_proc_init(&main_loop, chan);
  }

  Proc *proc = &chan->stream.proc;
  proc->argv = argv;
  proc->exepath = exepath;
  proc->cb = channel_proc_exit_cb;
  proc->state_cb = channel_proc_state_cb;
  proc->events = chan->events;
  proc->detach = detach;
  proc->cwd = cwd;
  proc->env = env;
  proc->overlapped = overlapped;

  // Copy of the executable path, used only for the spawn error message.
  char *cmd = xstrdup(proc_get_exepath(proc));
  bool has_out, has_err;
  if (proc->type == kProcTypePty) {
    has_out = true;
    has_err = false;
  } else {
    has_out = rpc || callback_reader_set(chan->on_data);
    has_err = chan->on_stderr.fwd_err || callback_reader_set(chan->on_stderr);
  }

  bool has_in = stdin_mode == kChannelStdinPipe;

  int status = proc_spawn(proc, has_in, has_out, has_err);
  if (status) {
    semsg(_(e_jobspawn), os_strerror(status), cmd);
    xfree(cmd);
    if (proc->env) {
      tv_dict_free(proc->env);
    }
    // The channel memory is freed by a deferred event, so `proc` is still
    // readable on the next line.
    channel_destroy_early(chan);
    *status_out = proc->status;
    return NULL;
  }
  xfree(cmd);
  if (proc->env) {
    tv_dict_free(proc->env);
  }

  if (has_in) {
    wstream_init(&proc->in, 0);
  }
  if (has_out) {
    rstream_init(&proc->out);
  }

  if (rpc) {
    // the rpc takes over the in and out streams
    rpc_start(chan);
  } else {
    if (has_out) {
      callback_reader_start(&chan->on_data, "stdout");
      rstream_start(&proc->out, on_channel_data, chan);
    }
  }

  if (has_err) {
    callback_reader_start(&chan->on_stderr, "stderr");
    rstream_init(&proc->err);
    rstream_start(&proc->err, on_job_stderr, chan);
  }

  *status_out = (varnumber_T)chan->id;
  return chan;
}

/// Connects to a tcp or pipe address and creates a channel for it.
///
/// @param tcp True for a tcp address, false for a pipe address.
/// @param address Address to connect to.
/// @param rpc True to speak msgpack-rpc on the channel; otherwise incoming
///            bytes are delivered to `on_output`.
/// @param on_output Reader for incoming data, only used when `rpc` is false.
/// @param timeout Passed through to socket_connect().
/// @param[out] error Passed through to socket_connect(), which reports
///                   connection failures through it.
/// @return The id of the new channel, or 0 if the connection failed.
uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader on_output,
                         int timeout, const char **error)
{
  Channel *channel;

  if (!tcp && rpc) {
    if (server_owns_pipe_address(address)) {
      // Create a loopback channel. This avoids deadlock if nvim connects to
      // its own named pipe.
      channel = channel_alloc(kChannelStreamInternal);
      channel->stream.internal.cb = LUA_NOREF;
      rpc_start(channel);
      goto end;
    }
  }

  channel = channel_alloc(kChannelStreamSocket);
  if (!socket_connect(&main_loop, &channel->stream.socket,
                      tcp, address, timeout, error)) {
    // Don't use channel_destroy_early() as new channels may have been allocated
    // by channel_from_connection() while polling for uv events.
    channel_decref(channel);
    return 0;
  }

  // The channel's initial reference is released by close_cb() when the
  // socket stream closes.
  channel->stream.socket.s.internal_close_cb = close_cb;
  channel->stream.socket.s.internal_data = channel;
  wstream_init(&channel->stream.socket.s, 0);
  rstream_init(&channel->stream.socket);

  if (rpc) {
    rpc_start(channel);
  } else {
    channel->on_data = on_output;
    callback_reader_start(&channel->on_data, "data");
    rstream_start(&channel->stream.socket, on_channel_data, channel);
  }

end:
  channel_create_event(channel, address);
  return channel->id;
}

/// Creates an RPC channel from a tcp/pipe socket connection
///
/// @param watcher The SocketWatcher ready to accept the connection
void channel_from_connection(SocketWatcher *watcher)
{
  Channel *channel = channel_alloc(kChannelStreamSocket);
  socket_watcher_accept(watcher, &channel->stream.socket);
  channel->stream.socket.s.internal_close_cb = close_cb;
  channel->stream.socket.s.internal_data = channel;
  wstream_init(&channel->stream.socket.s, 0);
  rstream_init(&channel->stream.socket);
  rpc_start(channel);
  channel_create_event(channel, watcher->addr);
}

/// Creates an API channel from stdin/stdout. Used when embedding Nvim.
///
/// @param rpc True to speak msgpack-rpc on the channel; otherwise stdin data
///            is delivered to `on_output`.
/// @param on_output Reader for stdin data, only used when `rpc` is false.
/// @param[out] error Set to an error message when 0 is returned.
/// @return The channel id, or 0 if Nvim is neither headless nor embedded or
///         if the stdio channel was already opened.
uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **error)
  FUNC_ATTR_NONNULL_ALL
{
  if (!headless_mode && !embedded_mode) {
    *error = _("can only be opened in headless mode");
    return 0;
  }

  if (did_stdio) {
    *error = _("channel was already open");
    return 0;
  }
  did_stdio = true;

  Channel *channel = channel_alloc(kChannelStreamStdio);

  int stdin_dup_fd = STDIN_FILENO;
  int stdout_dup_fd = STDOUT_FILENO;
#ifdef MSWIN
  // Strangely, ConPTY doesn't work if stdin and stdout are pipes. So replace
  // stdin and stdout with CONIN$ and CONOUT$, respectively.
  if (embedded_mode && os_has_conpty_working()) {
    stdin_dup_fd = os_dup(STDIN_FILENO);
    os_set_cloexec(stdin_dup_fd);
    stdout_dup_fd = os_dup(STDOUT_FILENO);
    os_set_cloexec(stdout_dup_fd);

    // The server may have no console (spawned with UV_PROCESS_DETACHED for
    // :detach support). Allocate a hidden one so CONIN$/CONOUT$ and ConPTY
    // (:terminal) work.
    if (!GetConsoleWindow()) {
      AllocConsole();
      ShowWindow(GetConsoleWindow(), SW_HIDE);
    }
    os_replace_stdin_to_conin();
    os_replace_stdout_and_stderr_to_conout();
  }
#else
  if (embedded_mode) {
    // Redirect stdout/stdin (the UI channel) to stderr. Use fcntl(F_DUPFD_CLOEXEC) instead of dup()
    // to prevent child processes from inheriting the file descriptors, which are used by UIs to
    // detect when Nvim exits.
    stdin_dup_fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, STDERR_FILENO + 1);
    stdout_dup_fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, STDERR_FILENO + 1);
    dup2(STDERR_FILENO, STDOUT_FILENO);
    dup2(STDERR_FILENO, STDIN_FILENO);
  }
#endif
  rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd);
  wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0);

  if (rpc) {
    rpc_start(channel);
  } else {
    channel->on_data = on_output;
    callback_reader_start(&channel->on_data, "stdin");
    rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
  }

  return channel->id;
}

/// Sends raw bytes to a channel.
///
/// @param id The channel id.
/// @param data Bytes to send. When `data_owned` is true, ownership passes to
///             this function: the buffer is either freed here or handed to
///             the write buffer (with xfree as its free callback). When
///             false, the bytes are copied if the write has to be queued.
/// @param len Number of bytes in `data`.
/// @param data_owned Whether this function takes ownership of `data`.
/// @param[out] error Set to an error message for an invalid channel, a closed
///                   stream, or an rpc channel.
/// @return Number of bytes written or queued. 0 when `*error` was set, and
///         also when the underlying write reported failure.
size_t channel_send(uint64_t id, char *data, size_t len, bool data_owned, const char **error)
  FUNC_ATTR_NONNULL_ALL
{
  Channel *chan = find_channel(id);
  size_t written = 0;
  if (!chan) {
    *error = _(e_invchan);
    goto retfree;
  }

  if (chan->streamtype == kChannelStreamStderr) {
    if (chan->stream.err.closed) {
      *error = _("Can't send data to closed stream");
      goto retfree;
    }
    // unbuffered write
    ptrdiff_t wres = os_write(STDERR_FILENO, data, len, false);
    if (wres >= 0) {
      written = (size_t)wres;
    }
    goto retfree;
  }

  if (chan->streamtype == kChannelStreamInternal) {
    if (chan->is_rpc) {
      *error = _("Can't send raw data to rpc channel");
      goto retfree;
    }
    if (!chan->term || chan->stream.internal.closed) {
      *error = _("Can't send data to closed stream");
      goto retfree;
    }
    // Internal terminal channel: feed the data directly to the terminal.
    terminal_receive(chan->term, data, len);
    written = len;
    goto retfree;
  }

  Stream *in = channel_instream(chan);
  if (in->closed) {
    *error = _("Can't send data to closed stream");
    goto retfree;
  }

  if (chan->is_rpc) {
    *error = _("Can't send raw data to rpc channel");
    goto retfree;
  }

  // write can be delayed indefinitely, so always use an allocated buffer
  WBuffer *buf = wstream_new_buffer(data_owned ? data : xmemdup(data, len),
                                    len, 1, xfree);
  return wstream_write(in, buf) == 0 ? len : 0;

retfree:
  // Reached on every path that did not hand `data` to a write buffer.
  if (data_owned) {
    xfree(data);
  }
  return written;
}

/// Convert binary byte array to a readfile()-style list
///
/// @param[in]  buf  Array to convert.
/// @param[in]  len  Array length.
///
/// @return [allocated] Converted list.
static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
  FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
{
  list_T *const l = tv_list_alloc(kListLenMayKnow);
  // Empty buffer should be represented by [''], encode_list_write() thinks
  // empty list is fine for the case.
  tv_list_append_string(l, "", 0);
  if (len > 0) {
    encode_list_write(l, buf, len);
  }
  return l;
}

/// Read callback for a channel's main output stream.
/// `data` is the Channel; the bytes are routed to its `on_data` reader.
size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
  Channel *chan = data;
  return on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
}

/// Read callback for a job's stderr stream.
/// `data` is the Channel; the bytes are routed to its `on_stderr` reader.
size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof)
{
  Channel *chan = data;
  return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
}

/// Common handler for data arriving on a channel stream.
///
/// Passes the bytes to the channel's terminal (if any), records eof on
/// `reader`, and then either forwards the bytes to Nvim's stderr
/// (`reader->fwd_err`) or appends them to the reader's buffer and schedules
/// the callback event.
///
/// @return The value handed back to the stream layer: `count`, or, when
///         forwarding to stderr, the number of bytes os_write() reported
///         (0 on error).
static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count,
                                bool eof, CallbackReader *reader)
{
  if (chan->term) {
    terminal_receive(chan->term, buf, count);
  }

  if (eof) {
    reader->eof = true;
  }

  if (reader->fwd_err && count > 0) {
    ptrdiff_t wres = os_write(STDERR_FILENO, buf, count, false);
    return (size_t)MAX(wres, 0);
  }

  if (callback_reader_set(*reader)) {
    ga_concat_len(&reader->buffer, buf, count);
    schedule_channel_event(chan);
  }

  return count;
}

/// schedule the necessary callbacks to be invoked as a deferred event
static void schedule_channel_event(Channel *chan)
{
  if (!chan->callback_scheduled) {
    // While a callback is running, only set the flag; on_channel_event()
    // re-queues itself when it sees `callback_scheduled` set.
    if (!chan->callback_busy) {
      multiqueue_put(chan->events, on_channel_event, chan);
      // Reference held by the queued event; released in on_channel_event().
      channel_incref(chan);
    }
    chan->callback_scheduled = true;
  }
}

/// Deferred-event handler that invokes the pending data, stderr and exit
/// callbacks of the channel passed in args[0].
static void on_channel_event(void **args)
{
  Channel *chan = (Channel *)args[0];

  chan->callback_busy = true;
  chan->callback_scheduled = false;

  // Snapshot taken before the reader callbacks run, so an exit status that
  // is set while they run is not reported in this pass.
  int exit_status = chan->exit_status;
  channel_reader_callbacks(chan, &chan->on_data);
  channel_reader_callbacks(chan, &chan->on_stderr);
  if (exit_status > -1) {
    channel_callback_call(chan, NULL);
    chan->exit_status = -1;
  }

  chan->callback_busy = false;
  if (chan->callback_scheduled) {
    // further callback was deferred to avoid recursion.
    multiqueue_put(chan->events, on_channel_event, chan);
    channel_incref(chan);
  }

  // Release the reference taken when this event was queued.
  channel_decref(chan);
}

/// Invokes the callback of `reader` for the data accumulated so far.
///
/// Buffered readers only act once eof was seen: the whole buffer is either
/// stored in the `reader->self` dict under the key `reader->type` (an error
/// is reported if that key already exists) or passed to the callback.
/// Unbuffered readers invoke the callback whenever data is pending, plus one
/// extra time at eof.
void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
{
  if (reader->buffered) {
    if (reader->eof) {
      if (reader->self) {
        if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
          list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
                                           (size_t)reader->buffer.ga_len);
          tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
                           data);
        } else {
          semsg(_(e_streamkey), reader->type, chan->id);
        }
      } else {
        channel_callback_call(chan, reader);
      }
      reader->eof = false;
    }
  } else {
    bool is_eof = reader->eof;
    if (reader->buffer.ga_len > 0) {
      channel_callback_call(chan, reader);
    }
    // if the stream reached eof, invoke extra callback with no data
    if (is_eof) {
      channel_callback_call(chan, reader);
      reader->eof = false;
    }
  }
}

/// Process exit callback (installed as `proc->cb` by channel_job_start()).
///
/// Closes the attached terminal, if any, schedules the on_exit callback when
/// one is set and the process really exited, and drops one channel reference.
static void channel_proc_exit_cb(Proc *proc, int status, void *data)
{
  Channel *chan = data;
  if (chan->term) {
    terminal_close(&chan->term, status);
  }

  // If process did not exit, we only closed the handle of a detached process.
  bool exited = (status >= 0);
  if (exited && chan->on_exit.type != kCallbackNone) {
    schedule_channel_event(chan);
    chan->exit_status = status;
  }

  channel_decref(chan);
}

/// Process state callback (installed as `proc->state_cb`): forwards the
/// suspended state to the attached terminal, if any.
static void channel_proc_state_cb(Proc *proc, bool suspended, void *data)
{
  Channel *chan = data;
  if (chan->term) {
    terminal_set_state(chan->term, suspended);
  }
}

/// Calls a channel callback with the arguments (channel id, payload, name).
///
/// @param chan The channel.
/// @param reader If non-NULL, its callback is called with the buffered data
///               as a list and `reader->type` as the name; the buffer is
///               cleared. If NULL, the on_exit callback is called with the
///               exit status and the name "exit".
static void channel_callback_call(Channel *chan, CallbackReader *reader)
{
  Callback *cb;
  typval_T argv[4];

  argv[0].v_type = VAR_NUMBER;
  argv[0].v_lock = VAR_UNLOCKED;
  argv[0].vval.v_number = (varnumber_T)chan->id;

  if (reader) {
    argv[1].v_type = VAR_LIST;
    argv[1].v_lock = VAR_UNLOCKED;
    argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
                                            (size_t)reader->buffer.ga_len);
    // Hold a reference for the duration of the call; released below.
    tv_list_ref(argv[1].vval.v_list);
    ga_clear(&reader->buffer);
    cb = &reader->cb;
    argv[2].vval.v_string = (char *)reader->type;
  } else {
    argv[1].v_type = VAR_NUMBER;
    argv[1].v_lock = VAR_UNLOCKED;
    argv[1].vval.v_number = chan->exit_status;
    cb = &chan->on_exit;
    argv[2].vval.v_string = "exit";
  }

  argv[2].v_type = VAR_STRING;
  argv[2].v_lock = VAR_UNLOCKED;

  typval_T rettv = TV_INITIAL_VALUE;
  callback_call(cb, 3, argv, &rettv);
  tv_clear(&rettv);

  if (reader) {
    tv_list_unref(argv[1].vval.v_list);
  }
}

/// Allocate terminal for channel
///
/// Channel `chan` is assumed to be an open pty channel,
/// and `buf` is assumed to be a new, unmodified buffer.
///
/// Takes an extra reference on `chan`, which is released by
/// term_delayed_free().
void channel_terminal_alloc(buf_T *buf, Channel *chan)
{
  TerminalOptions topts = {
    .data = chan,
    .width = chan->stream.pty.width,
    .height = chan->stream.pty.height,
    .write_cb = term_write,
    .resize_cb = term_resize,
    .resume_cb = term_resume,
    .close_cb = term_close,
    .force_crlf = false,
  };
  buf->b_p_channel = (OptInt)chan->id;  // 'channel' option
  channel_incref(chan);
  chan->term = terminal_alloc(buf, topts);
}

/// Terminal write callback: sends a copy of `buf` to the job's stdin.
static void term_write(const char *buf, size_t size, void *data)
{
  Channel *chan = data;
  if (chan->stream.proc.in.closed) {
    // If the backing stream was closed abruptly, there may be write events
    // ahead of the terminal close event. Just ignore the writes.
    ILOG("write failed: stream is closed");
    return;
  }
  WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
  wstream_write(&chan->stream.proc.in, wbuf);
}

/// Terminal resize callback: resizes the job's pty.
static void term_resize(uint16_t width, uint16_t height, void *data)
{
  Channel *chan = data;
  pty_proc_resize(&chan->stream.pty, width, height);
}

/// Terminal resume callback: resumes the job's pty process.
static void term_resume(void *data)
{
  Channel *chan = data;
  pty_proc_resume(&chan->stream.pty);
}

/// Deferred-event handler that destroys the channel's terminal and drops one
/// channel reference. Re-queues itself for as long as the job's in/out
/// streams still have pending requests.
static inline void term_delayed_free(void **argv)
{
  Channel *chan = argv[0];
  if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) {
    multiqueue_put(chan->events, term_delayed_free, chan);
    return;
  }

  if (chan->term) {
    terminal_destroy(&chan->term);
  }
  channel_decref(chan);
}

/// Terminal close callback: stops the job and queues the deferred free of
/// the terminal.
static void term_close(void *data)
{
  Channel *chan = data;
  proc_stop(&chan->stream.proc);
  multiqueue_put(chan->events, term_delayed_free, data);
}

/// Queues a ChanOpen (`new_chan` true) or ChanInfo autocommand event for
/// `chan`. Does nothing when no autocommand is defined for that event.
void channel_info_changed(Channel *chan, bool new_chan)
{
  event_T event = new_chan ? EVENT_CHANOPEN : EVENT_CHANINFO;
  if (has_event(event)) {
    // Reference held by the queued event; released in set_info_event().
    channel_incref(chan);
    multiqueue_put(main_loop.events, set_info_event, chan, (void *)(intptr_t)event);
  }
}

/// Deferred-event handler for channel_info_changed(): stores the channel info
/// in v:event under the key "info" and triggers the autocommands for the
/// event given in argv[1].
static void set_info_event(void **argv)
{
  Channel *chan = argv[0];
  event_T event = (event_T)(ptrdiff_t)argv[1];

  save_v_event_T save_v_event;
  dict_T *dict = get_v_event(&save_v_event);
  Arena arena = ARENA_EMPTY;
  Dict info = channel_info(chan->id, &arena);
  typval_T retval;
  object_to_vim(DICT_OBJ(info), &retval, NULL);
  assert(retval.v_type == VAR_DICT);
  tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict);
  tv_dict_set_keys_readonly(dict);

  apply_autocmds(event, NULL, NULL, true, curbuf);

  restore_v_event(dict, &save_v_event);
  arena_mem_free(arena_finish(&arena));
  channel_decref(chan);
}

/// Unlike terminal_running(), this returns false immediately after stopping a job.
/// However, this always returns false for nvim_open_term() terminals.
bool channel_job_running(uint64_t id)
{
  Channel *chan = find_channel(id);
  return (chan
          && chan->streamtype == kChannelStreamProc
          && !proc_is_stopped(&chan->stream.proc));
}

/// Builds a dict describing a channel.
///
/// Keys written: "id", "stream" and "mode" always; "pty" and "argv" for jobs
/// ("pty" only for pty jobs); "internal" for internal channels; "client" for
/// rpc channels; "buffer" for terminal channels.
///
/// @param id The channel id.
/// @param arena Arena the dict is allocated from. Some values (the argv
///              strings, `chan->rpc.info`) reference channel-owned data
///              rather than copies.
/// @return The info dict, or an empty dict if no channel has this id.
Dict channel_info(uint64_t id, Arena *arena)
{
  Channel *chan = find_channel(id);
  if (!chan) {
    return (Dict)ARRAY_DICT_INIT;
  }

  Dict info = arena_dict(arena, 8);
  PUT_C(info, "id", INTEGER_OBJ((Integer)chan->id));

  const char *stream_desc, *mode_desc;
  switch (chan->streamtype) {
  case kChannelStreamProc: {
    stream_desc = "job";
    if (chan->stream.proc.type == kProcTypePty) {
      const char *name = pty_proc_tty_name(&chan->stream.pty);
      PUT_C(info, "pty", CSTR_TO_ARENA_OBJ(arena, name));
    }

    char **args = chan->stream.proc.argv;
    Array argv = ARRAY_DICT_INIT;
    if (args != NULL) {
      // Count the NULL-terminated argument vector before sizing the array.
      size_t n;
      for (n = 0; args[n] != NULL; n++) {}
      argv = arena_array(arena, n);
      for (size_t i = 0; i < n; i++) {
        ADD_C(argv, CSTR_AS_OBJ(args[i]));
      }
    }
    PUT_C(info, "argv", ARRAY_OBJ(argv));
    break;
  }

  case kChannelStreamStdio:
    stream_desc = "stdio";
    break;

  case kChannelStreamStderr:
    stream_desc = "stderr";
    break;

  case kChannelStreamInternal:
    // Internal channels are reported as sockets with "internal" set.
    PUT_C(info, "internal", BOOLEAN_OBJ(true));
    FALLTHROUGH;

  case kChannelStreamSocket:
    stream_desc = "socket";
    break;
  }
  PUT_C(info, "stream", CSTR_AS_OBJ(stream_desc));

  if (chan->is_rpc) {
    mode_desc = "rpc";
    PUT_C(info, "client", DICT_OBJ(chan->rpc.info));
  } else if (chan->term) {
    mode_desc = "terminal";
    PUT_C(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term)));
  } else {
    mode_desc = "bytes";
  }
  PUT_C(info, "mode", CSTR_AS_OBJ(mode_desc));

  return info;
}

/// Simple int64_t comparison function for use with qsort()
static int int64_t_cmp(const void *pa, const void *pb)
{
  const int64_t a = *(const int64_t *)pa;
  const int64_t b = *(const int64_t *)pb;
  return a == b ? 0 : a > b ? 1 : -1;
}

/// Builds an array with the channel_info() dict of every channel, ordered by
/// ascending channel id.
///
/// @param arena Arena used for the id list, the result array and the dicts.
Array channel_all_info(Arena *arena)
{
  // order the items in the array by channel number, for Determinism™
  kvec_t(int64_t) ids = KV_INITIAL_VALUE;
  kv_fixsize_arena(arena, ids, map_size(&channels));
  uint64_t id;
  map_foreach_key(&channels, id, {
    kv_push(ids, (int64_t)id);
  });
  qsort(ids.items, ids.size, sizeof ids.items[0], int64_t_cmp);

  Array ret = arena_array(arena, ids.size);
  for (size_t i = 0; i < ids.size; i++) {
    ADD_C(ret, DICT_OBJ(channel_info((uint64_t)ids.items[i], arena)));
  }
  return ret;
}