neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

channel.c (27991B)


      1 #include <assert.h>
      2 #include <fcntl.h>
      3 #include <inttypes.h>
      4 #include <lauxlib.h>
      5 #include <stddef.h>
      6 #include <stdio.h>
      7 #include <string.h>
      8 
      9 #include "klib/kvec.h"
     10 #include "nvim/api/private/converter.h"
     11 #include "nvim/api/private/defs.h"
     12 #include "nvim/api/private/helpers.h"
     13 #include "nvim/autocmd.h"
     14 #include "nvim/autocmd_defs.h"
     15 #include "nvim/buffer_defs.h"
     16 #include "nvim/channel.h"
     17 #include "nvim/errors.h"
     18 #include "nvim/eval.h"
     19 #include "nvim/eval/encode.h"
     20 #include "nvim/eval/typval.h"
     21 #include "nvim/event/loop.h"
     22 #include "nvim/event/multiqueue.h"
     23 #include "nvim/event/proc.h"
     24 #include "nvim/event/rstream.h"
     25 #include "nvim/event/socket.h"
     26 #include "nvim/event/stream.h"
     27 #include "nvim/event/wstream.h"
     28 #include "nvim/garray.h"
     29 #include "nvim/gettext_defs.h"
     30 #include "nvim/globals.h"
     31 #include "nvim/log.h"
     32 #include "nvim/lua/executor.h"
     33 #include "nvim/main.h"
     34 #include "nvim/mbyte.h"
     35 #include "nvim/memory.h"
     36 #include "nvim/memory_defs.h"
     37 #include "nvim/message.h"
     38 #include "nvim/msgpack_rpc/channel.h"
     39 #include "nvim/msgpack_rpc/server.h"
     40 #include "nvim/os/fs.h"
     41 #include "nvim/os/os_defs.h"
     42 #include "nvim/os/shell.h"
     43 #include "nvim/terminal.h"
     44 #include "nvim/types_defs.h"
     45 
     46 #ifdef MSWIN
     47 # include "nvim/os/fs.h"
     48 # include "nvim/os/os_win_console.h"
     49 # include "nvim/os/pty_conpty_win.h"
     50 #endif
     51 
     52 static bool did_stdio = false;
     53 
     54 /// next free id for a job or rpc channel
     55 /// 1 is reserved for stdio channel
     56 /// 2 is reserved for stderr channel
     57 static uint64_t next_chan_id = CHAN_STDERR + 1;
     58 
     59 #include "channel.c.generated.h"
     60 
     61 /// Teardown the module
     62 void channel_teardown(void)
     63 {
     64  Channel *chan;
     65  map_foreach_value(&channels, chan, {
     66    channel_close(chan->id, kChannelPartAll, NULL);
     67  });
     68 }
     69 
     70 #ifdef EXITFREE
     71 void channel_free_all_mem(void)
     72 {
     73  Channel *chan;
     74  map_foreach_value(&channels, chan, {
     75    channel_destroy(chan);
     76  });
     77  map_destroy(uint64_t, &channels);
     78 
     79  callback_free(&on_print);
     80 }
     81 #endif
     82 
     83 /// Closes a channel
     84 ///
     85 /// @param id The channel id
     86 /// @return true if successful, false otherwise
     87 bool channel_close(uint64_t id, ChannelPart part, const char **error)
     88 {
     89  Channel *chan;
     90  Proc *proc;
     91 
     92  const char *dummy;
     93  if (!error) {
     94    error = &dummy;
     95  }
     96 
     97  if (!(chan = find_channel(id))) {
     98    if (id < next_chan_id) {
     99      // allow double close, even though we can't say what parts was valid.
    100      return true;
    101    }
    102    *error = e_invchan;
    103    return false;
    104  }
    105 
    106  bool close_main = false;
    107  if (part == kChannelPartRpc || part == kChannelPartAll) {
    108    close_main = true;
    109    if (chan->is_rpc) {
    110      rpc_close(chan);
    111    } else if (part == kChannelPartRpc) {
    112      *error = e_invstream;
    113      return false;
    114    }
    115  } else if ((part == kChannelPartStdin || part == kChannelPartStdout)
    116             && chan->is_rpc) {
    117    *error = e_invstreamrpc;
    118    return false;
    119  }
    120 
    121  switch (chan->streamtype) {
    122  case kChannelStreamSocket:
    123    if (!close_main) {
    124      *error = e_invstream;
    125      return false;
    126    }
    127    rstream_may_close(&chan->stream.socket);
    128    break;
    129 
    130  case kChannelStreamProc:
    131    proc = &chan->stream.proc;
    132    if (part == kChannelPartStdin || close_main) {
    133      stream_may_close(&proc->in);
    134    }
    135    if (part == kChannelPartStdout || close_main) {
    136      rstream_may_close(&proc->out);
    137    }
    138    if (part == kChannelPartStderr || part == kChannelPartAll) {
    139      rstream_may_close(&proc->err);
    140    }
    141    if (proc->type == kProcTypePty && part == kChannelPartAll) {
    142      pty_proc_close_master(&chan->stream.pty);
    143    }
    144 
    145    break;
    146 
    147  case kChannelStreamStdio:
    148    if (part == kChannelPartStdin || close_main) {
    149      rstream_may_close(&chan->stream.stdio.in);
    150    }
    151    if (part == kChannelPartStdout || close_main) {
    152      stream_may_close(&chan->stream.stdio.out);
    153    }
    154    if (part == kChannelPartStderr) {
    155      *error = e_invstream;
    156      return false;
    157    }
    158    break;
    159 
    160  case kChannelStreamStderr:
    161    if (part != kChannelPartAll && part != kChannelPartStderr) {
    162      *error = e_invstream;
    163      return false;
    164    }
    165    if (!chan->stream.err.closed) {
    166      chan->stream.err.closed = true;
    167      // Don't close on exit, in case late error messages
    168      if (!exiting) {
    169        fclose(stderr);
    170      }
    171      channel_decref(chan);
    172    }
    173    break;
    174 
    175  case kChannelStreamInternal:
    176    if (!close_main) {
    177      *error = e_invstream;
    178      return false;
    179    }
    180    if (chan->term) {
    181      api_free_luaref(chan->stream.internal.cb);
    182      chan->stream.internal.cb = LUA_NOREF;
    183      chan->stream.internal.closed = true;
    184      terminal_close(&chan->term, 0);
    185    } else {
    186      channel_decref(chan);
    187    }
    188    break;
    189  }
    190 
    191  return true;
    192 }
    193 
    194 /// Initializes the module
    195 void channel_init(void)
    196 {
    197  channel_alloc(kChannelStreamStderr);
    198  rpc_init();
    199 }
    200 
    201 /// Allocates a channel.
    202 ///
    203 /// Channel is allocated with refcount 1, which should be decreased
    204 /// when the underlying stream closes.
    205 Channel *channel_alloc(ChannelStreamType type)
    206  FUNC_ATTR_NONNULL_RET
    207 {
    208  Channel *chan = xcalloc(1, sizeof(*chan));
    209  if (type == kChannelStreamStdio) {
    210    chan->id = CHAN_STDIO;
    211  } else if (type == kChannelStreamStderr) {
    212    chan->id = CHAN_STDERR;
    213  } else {
    214    chan->id = next_chan_id++;
    215  }
    216  chan->events = multiqueue_new_child(main_loop.events);
    217  chan->refcount = 1;
    218  chan->exit_status = -1;
    219  chan->streamtype = type;
    220  chan->detach = false;
    221  assert(chan->id <= VARNUMBER_MAX);
    222  pmap_put(uint64_t)(&channels, chan->id, chan);
    223  return chan;
    224 }
    225 
    226 void channel_create_event(Channel *chan, const char *ext_source)
    227 {
    228 #ifdef NVIM_LOG_DEBUG
    229  const char *source;
    230 
    231  if (ext_source) {
    232    // TODO(bfredl): in a future improved traceback solution,
    233    // external events should be included.
    234    source = ext_source;
    235  } else {
    236    eval_fmt_source_name_line(IObuff, sizeof(IObuff));
    237    source = IObuff;
    238  }
    239 
    240  assert(chan->id <= VARNUMBER_MAX);
    241  Arena arena = ARENA_EMPTY;
    242  Dict info = channel_info(chan->id, &arena);
    243  typval_T tv = TV_INITIAL_VALUE;
    244  // TODO(bfredl): do the conversion in one step. Also would be nice
    245  // to pretty print top level dict in defined order
    246  object_to_vim(DICT_OBJ(info), &tv, NULL);
    247  assert(tv.v_type == VAR_DICT);
    248  char *str = encode_tv2json(&tv, NULL);
    249  ILOG("new channel %" PRIu64 " (%s) : %s", chan->id, source, str);
    250  xfree(str);
    251  arena_mem_free(arena_finish(&arena));
    252 
    253 #else
    254  (void)ext_source;
    255 #endif
    256 
    257  channel_info_changed(chan, true);
    258 }
    259 
    260 void channel_incref(Channel *chan)
    261 {
    262  chan->refcount++;
    263 }
    264 
    265 void channel_decref(Channel *chan)
    266 {
    267  if (!(--chan->refcount)) {
    268    // delay free, so that libuv is done with the handles
    269    multiqueue_put(main_loop.events, free_channel_event, chan);
    270  }
    271 }
    272 
    273 void callback_reader_free(CallbackReader *reader)
    274 {
    275  callback_free(&reader->cb);
    276  ga_clear(&reader->buffer);
    277 }
    278 
    279 void callback_reader_start(CallbackReader *reader, const char *type)
    280 {
    281  ga_init(&reader->buffer, sizeof(char *), 32);
    282  reader->type = type;
    283 }
    284 
    285 static void channel_destroy(Channel *chan)
    286 {
    287  if (chan->is_rpc) {
    288    rpc_free(chan);
    289  }
    290 
    291  if (chan->streamtype == kChannelStreamProc) {
    292    proc_free(&chan->stream.proc);
    293  }
    294 
    295  callback_reader_free(&chan->on_data);
    296  callback_reader_free(&chan->on_stderr);
    297  callback_free(&chan->on_exit);
    298 
    299  multiqueue_free(chan->events);
    300  xfree(chan);
    301 }
    302 
    303 static void free_channel_event(void **argv)
    304 {
    305  Channel *chan = argv[0];
    306  pmap_del(uint64_t)(&channels, chan->id, NULL);
    307  channel_destroy(chan);
    308 }
    309 
    310 static void channel_destroy_early(Channel *chan)
    311 {
    312  if ((chan->id != --next_chan_id)) {
    313    abort();
    314  }
    315  pmap_del(uint64_t)(&channels, chan->id, NULL);
    316  chan->id = 0;
    317 
    318  if ((--chan->refcount != 0)) {
    319    abort();
    320  }
    321 
    322  // uv will keep a reference to handles until next loop tick, so delay free
    323  multiqueue_put(main_loop.events, free_channel_event, chan);
    324 }
    325 
    326 static void close_cb(Stream *stream, void *data)
    327 {
    328  channel_decref(data);
    329 }
    330 
    331 /// Starts a job and returns the associated channel
    332 ///
    333 /// @param[in]  argv  Arguments vector specifying the command to run,
    334 ///                   NULL-terminated
    335 /// @param[in]  exepath  The path to the executable. If NULL, use `argv[0]`.
    336 /// @param[in]  on_stdout  Callback to read the job's stdout
    337 /// @param[in]  on_stderr  Callback to read the job's stderr
    338 /// @param[in]  on_exit  Callback to receive the job's exit status
    339 /// @param[in]  pty  True if the job should run attached to a pty
    340 /// @param[in]  rpc  True to communicate with the job using msgpack-rpc,
    341 ///                  `on_stdout` is ignored
    342 /// @param[in]  detach  True if the job should not be killed when nvim exits,
    343 ///                     ignored if `pty` is true
    344 /// @param[in]  stdin_mode  Stdin mode. Either kChannelStdinPipe to open a
    345 ///                         channel for stdin or kChannelStdinNull to leave
    346 ///                         stdin disconnected.
    347 /// @param[in]  cwd  Initial working directory for the job.  Nvim's working
    348 ///                  directory if `cwd` is NULL
    349 /// @param[in]  pty_width  Width of the pty, ignored if `pty` is false
    350 /// @param[in]  pty_height  Height of the pty, ignored if `pty` is false
    351 /// @param[in]  env  Nvim's configured environment is used if this is NULL,
    352 ///                  otherwise defines all environment variables
    353 /// @param[out]  status_out  0 for invalid arguments, > 0 for the channel id,
    354 ///                          < 0 if the job can't start
    355 ///
    356 /// @returns [allocated] channel
    357 Channel *channel_job_start(char **argv, const char *exepath, CallbackReader on_stdout,
    358                           CallbackReader on_stderr, Callback on_exit, bool pty, bool rpc,
    359                           bool overlapped, bool detach, ChannelStdinMode stdin_mode,
    360                           const char *cwd, uint16_t pty_width, uint16_t pty_height, dict_T *env,
    361                           varnumber_T *status_out)
    362 {
    363  Channel *chan = channel_alloc(kChannelStreamProc);
    364  chan->on_data = on_stdout;
    365  chan->on_stderr = on_stderr;
    366  chan->on_exit = on_exit;
    367 
    368  if (pty) {
    369    if (detach) {
    370      semsg(_(e_invarg2), "terminal/pty job cannot be detached");
    371      shell_free_argv(argv);
    372      if (env) {
    373        tv_dict_free(env);
    374      }
    375      channel_destroy_early(chan);
    376      *status_out = 0;
    377      return NULL;
    378    }
    379    chan->stream.pty = pty_proc_init(&main_loop, chan);
    380    if (pty_width > 0) {
    381      chan->stream.pty.width = pty_width;
    382    }
    383    if (pty_height > 0) {
    384      chan->stream.pty.height = pty_height;
    385    }
    386  } else {
    387    chan->stream.uv = libuv_proc_init(&main_loop, chan);
    388  }
    389 
    390  Proc *proc = &chan->stream.proc;
    391  proc->argv = argv;
    392  proc->exepath = exepath;
    393  proc->cb = channel_proc_exit_cb;
    394  proc->state_cb = channel_proc_state_cb;
    395  proc->events = chan->events;
    396  proc->detach = detach;
    397  proc->cwd = cwd;
    398  proc->env = env;
    399  proc->overlapped = overlapped;
    400 
    401  char *cmd = xstrdup(proc_get_exepath(proc));
    402  bool has_out, has_err;
    403  if (proc->type == kProcTypePty) {
    404    has_out = true;
    405    has_err = false;
    406  } else {
    407    has_out = rpc || callback_reader_set(chan->on_data);
    408    has_err = chan->on_stderr.fwd_err || callback_reader_set(chan->on_stderr);
    409  }
    410 
    411  bool has_in = stdin_mode == kChannelStdinPipe;
    412 
    413  int status = proc_spawn(proc, has_in, has_out, has_err);
    414  if (status) {
    415    semsg(_(e_jobspawn), os_strerror(status), cmd);
    416    xfree(cmd);
    417    if (proc->env) {
    418      tv_dict_free(proc->env);
    419    }
    420    channel_destroy_early(chan);
    421    *status_out = proc->status;
    422    return NULL;
    423  }
    424  xfree(cmd);
    425  if (proc->env) {
    426    tv_dict_free(proc->env);
    427  }
    428 
    429  if (has_in) {
    430    wstream_init(&proc->in, 0);
    431  }
    432  if (has_out) {
    433    rstream_init(&proc->out);
    434  }
    435 
    436  if (rpc) {
    437    // the rpc takes over the in and out streams
    438    rpc_start(chan);
    439  } else {
    440    if (has_out) {
    441      callback_reader_start(&chan->on_data, "stdout");
    442      rstream_start(&proc->out, on_channel_data, chan);
    443    }
    444  }
    445 
    446  if (has_err) {
    447    callback_reader_start(&chan->on_stderr, "stderr");
    448    rstream_init(&proc->err);
    449    rstream_start(&proc->err, on_job_stderr, chan);
    450  }
    451 
    452  *status_out = (varnumber_T)chan->id;
    453  return chan;
    454 }
    455 
    456 uint64_t channel_connect(bool tcp, const char *address, bool rpc, CallbackReader on_output,
    457                         int timeout, const char **error)
    458 {
    459  Channel *channel;
    460 
    461  if (!tcp && rpc) {
    462    if (server_owns_pipe_address(address)) {
    463      // Create a loopback channel. This avoids deadlock if nvim connects to
    464      // its own named pipe.
    465      channel = channel_alloc(kChannelStreamInternal);
    466      channel->stream.internal.cb = LUA_NOREF;
    467      rpc_start(channel);
    468      goto end;
    469    }
    470  }
    471 
    472  channel = channel_alloc(kChannelStreamSocket);
    473  if (!socket_connect(&main_loop, &channel->stream.socket,
    474                      tcp, address, timeout, error)) {
    475    // Don't use channel_destroy_early() as new channels may have been allocated
    476    // by channel_from_connection() while polling for uv events.
    477    channel_decref(channel);
    478    return 0;
    479  }
    480 
    481  channel->stream.socket.s.internal_close_cb = close_cb;
    482  channel->stream.socket.s.internal_data = channel;
    483  wstream_init(&channel->stream.socket.s, 0);
    484  rstream_init(&channel->stream.socket);
    485 
    486  if (rpc) {
    487    rpc_start(channel);
    488  } else {
    489    channel->on_data = on_output;
    490    callback_reader_start(&channel->on_data, "data");
    491    rstream_start(&channel->stream.socket, on_channel_data, channel);
    492  }
    493 
    494 end:
    495  channel_create_event(channel, address);
    496  return channel->id;
    497 }
    498 
    499 /// Creates an RPC channel from a tcp/pipe socket connection
    500 ///
    501 /// @param watcher The SocketWatcher ready to accept the connection
    502 void channel_from_connection(SocketWatcher *watcher)
    503 {
    504  Channel *channel = channel_alloc(kChannelStreamSocket);
    505  socket_watcher_accept(watcher, &channel->stream.socket);
    506  channel->stream.socket.s.internal_close_cb = close_cb;
    507  channel->stream.socket.s.internal_data = channel;
    508  wstream_init(&channel->stream.socket.s, 0);
    509  rstream_init(&channel->stream.socket);
    510  rpc_start(channel);
    511  channel_create_event(channel, watcher->addr);
    512 }
    513 
    514 /// Creates an API channel from stdin/stdout. Used when embedding Nvim.
    515 uint64_t channel_from_stdio(bool rpc, CallbackReader on_output, const char **error)
    516  FUNC_ATTR_NONNULL_ALL
    517 {
    518  if (!headless_mode && !embedded_mode) {
    519    *error = _("can only be opened in headless mode");
    520    return 0;
    521  }
    522 
    523  if (did_stdio) {
    524    *error = _("channel was already open");
    525    return 0;
    526  }
    527  did_stdio = true;
    528 
    529  Channel *channel = channel_alloc(kChannelStreamStdio);
    530 
    531  int stdin_dup_fd = STDIN_FILENO;
    532  int stdout_dup_fd = STDOUT_FILENO;
    533 #ifdef MSWIN
    534  // Strangely, ConPTY doesn't work if stdin and stdout are pipes. So replace
    535  // stdin and stdout with CONIN$ and CONOUT$, respectively.
    536  if (embedded_mode && os_has_conpty_working()) {
    537    stdin_dup_fd = os_dup(STDIN_FILENO);
    538    os_set_cloexec(stdin_dup_fd);
    539    stdout_dup_fd = os_dup(STDOUT_FILENO);
    540    os_set_cloexec(stdout_dup_fd);
    541 
    542    // The server may have no console (spawned with UV_PROCESS_DETACHED for
    543    // :detach support). Allocate a hidden one so CONIN$/CONOUT$ and ConPTY
    544    // (:terminal) work.
    545    if (!GetConsoleWindow()) {
    546      AllocConsole();
    547      ShowWindow(GetConsoleWindow(), SW_HIDE);
    548    }
    549    os_replace_stdin_to_conin();
    550    os_replace_stdout_and_stderr_to_conout();
    551  }
    552 #else
    553  if (embedded_mode) {
    554    // Redirect stdout/stdin (the UI channel) to stderr. Use fnctl(F_DUPFD_CLOEXEC) instead of dup()
    555    // to prevent child processes from inheriting the file descriptors, which are used by UIs to
    556    // detect when Nvim exits.
    557    stdin_dup_fd = fcntl(STDIN_FILENO, F_DUPFD_CLOEXEC, STDERR_FILENO + 1);
    558    stdout_dup_fd = fcntl(STDOUT_FILENO, F_DUPFD_CLOEXEC, STDERR_FILENO + 1);
    559    dup2(STDERR_FILENO, STDOUT_FILENO);
    560    dup2(STDERR_FILENO, STDIN_FILENO);
    561  }
    562 #endif
    563  rstream_init_fd(&main_loop, &channel->stream.stdio.in, stdin_dup_fd);
    564  wstream_init_fd(&main_loop, &channel->stream.stdio.out, stdout_dup_fd, 0);
    565 
    566  if (rpc) {
    567    rpc_start(channel);
    568  } else {
    569    channel->on_data = on_output;
    570    callback_reader_start(&channel->on_data, "stdin");
    571    rstream_start(&channel->stream.stdio.in, on_channel_data, channel);
    572  }
    573 
    574  return channel->id;
    575 }
    576 
    577 /// @param data will be consumed
    578 size_t channel_send(uint64_t id, char *data, size_t len, bool data_owned, const char **error)
    579  FUNC_ATTR_NONNULL_ALL
    580 {
    581  Channel *chan = find_channel(id);
    582  size_t written = 0;
    583  if (!chan) {
    584    *error = _(e_invchan);
    585    goto retfree;
    586  }
    587 
    588  if (chan->streamtype == kChannelStreamStderr) {
    589    if (chan->stream.err.closed) {
    590      *error = _("Can't send data to closed stream");
    591      goto retfree;
    592    }
    593    // unbuffered write
    594    ptrdiff_t wres = os_write(STDERR_FILENO, data, len, false);
    595    if (wres >= 0) {
    596      written = (size_t)wres;
    597    }
    598    goto retfree;
    599  }
    600 
    601  if (chan->streamtype == kChannelStreamInternal) {
    602    if (chan->is_rpc) {
    603      *error = _("Can't send raw data to rpc channel");
    604      goto retfree;
    605    }
    606    if (!chan->term || chan->stream.internal.closed) {
    607      *error = _("Can't send data to closed stream");
    608      goto retfree;
    609    }
    610    terminal_receive(chan->term, data, len);
    611    written = len;
    612    goto retfree;
    613  }
    614 
    615  Stream *in = channel_instream(chan);
    616  if (in->closed) {
    617    *error = _("Can't send data to closed stream");
    618    goto retfree;
    619  }
    620 
    621  if (chan->is_rpc) {
    622    *error = _("Can't send raw data to rpc channel");
    623    goto retfree;
    624  }
    625 
    626  // write can be delayed indefinitely, so always use an allocated buffer
    627  WBuffer *buf = wstream_new_buffer(data_owned ? data : xmemdup(data, len),
    628                                    len, 1, xfree);
    629  return wstream_write(in, buf) == 0 ? len : 0;
    630 
    631 retfree:
    632  if (data_owned) {
    633    xfree(data);
    634  }
    635  return written;
    636 }
    637 
    638 /// Convert binary byte array to a readfile()-style list
    639 ///
    640 /// @param[in]  buf  Array to convert.
    641 /// @param[in]  len  Array length.
    642 ///
    643 /// @return [allocated] Converted list.
    644 static inline list_T *buffer_to_tv_list(const char *const buf, const size_t len)
    645  FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_ALWAYS_INLINE
    646 {
    647  list_T *const l = tv_list_alloc(kListLenMayKnow);
    648  // Empty buffer should be represented by [''], encode_list_write() thinks
    649  // empty list is fine for the case.
    650  tv_list_append_string(l, "", 0);
    651  if (len > 0) {
    652    encode_list_write(l, buf, len);
    653  }
    654  return l;
    655 }
    656 
    657 size_t on_channel_data(RStream *stream, const char *buf, size_t count, void *data, bool eof)
    658 {
    659  Channel *chan = data;
    660  return on_channel_output(stream, chan, buf, count, eof, &chan->on_data);
    661 }
    662 
    663 size_t on_job_stderr(RStream *stream, const char *buf, size_t count, void *data, bool eof)
    664 {
    665  Channel *chan = data;
    666  return on_channel_output(stream, chan, buf, count, eof, &chan->on_stderr);
    667 }
    668 
    669 static size_t on_channel_output(RStream *stream, Channel *chan, const char *buf, size_t count,
    670                                bool eof, CallbackReader *reader)
    671 {
    672  if (chan->term) {
    673    terminal_receive(chan->term, buf, count);
    674  }
    675 
    676  if (eof) {
    677    reader->eof = true;
    678  }
    679 
    680  if (reader->fwd_err && count > 0) {
    681    ptrdiff_t wres = os_write(STDERR_FILENO, buf, count, false);
    682    return (size_t)MAX(wres, 0);
    683  }
    684 
    685  if (callback_reader_set(*reader)) {
    686    ga_concat_len(&reader->buffer, buf, count);
    687    schedule_channel_event(chan);
    688  }
    689 
    690  return count;
    691 }
    692 
    693 /// schedule the necessary callbacks to be invoked as a deferred event
    694 static void schedule_channel_event(Channel *chan)
    695 {
    696  if (!chan->callback_scheduled) {
    697    if (!chan->callback_busy) {
    698      multiqueue_put(chan->events, on_channel_event, chan);
    699      channel_incref(chan);
    700    }
    701    chan->callback_scheduled = true;
    702  }
    703 }
    704 
    705 static void on_channel_event(void **args)
    706 {
    707  Channel *chan = (Channel *)args[0];
    708 
    709  chan->callback_busy = true;
    710  chan->callback_scheduled = false;
    711 
    712  int exit_status = chan->exit_status;
    713  channel_reader_callbacks(chan, &chan->on_data);
    714  channel_reader_callbacks(chan, &chan->on_stderr);
    715  if (exit_status > -1) {
    716    channel_callback_call(chan, NULL);
    717    chan->exit_status = -1;
    718  }
    719 
    720  chan->callback_busy = false;
    721  if (chan->callback_scheduled) {
    722    // further callback was deferred to avoid recursion.
    723    multiqueue_put(chan->events, on_channel_event, chan);
    724    channel_incref(chan);
    725  }
    726 
    727  channel_decref(chan);
    728 }
    729 
    730 void channel_reader_callbacks(Channel *chan, CallbackReader *reader)
    731 {
    732  if (reader->buffered) {
    733    if (reader->eof) {
    734      if (reader->self) {
    735        if (tv_dict_find(reader->self, reader->type, -1) == NULL) {
    736          list_T *data = buffer_to_tv_list(reader->buffer.ga_data,
    737                                           (size_t)reader->buffer.ga_len);
    738          tv_dict_add_list(reader->self, reader->type, strlen(reader->type),
    739                           data);
    740        } else {
    741          semsg(_(e_streamkey), reader->type, chan->id);
    742        }
    743      } else {
    744        channel_callback_call(chan, reader);
    745      }
    746      reader->eof = false;
    747    }
    748  } else {
    749    bool is_eof = reader->eof;
    750    if (reader->buffer.ga_len > 0) {
    751      channel_callback_call(chan, reader);
    752    }
    753    // if the stream reached eof, invoke extra callback with no data
    754    if (is_eof) {
    755      channel_callback_call(chan, reader);
    756      reader->eof = false;
    757    }
    758  }
    759 }
    760 
    761 static void channel_proc_exit_cb(Proc *proc, int status, void *data)
    762 {
    763  Channel *chan = data;
    764  if (chan->term) {
    765    terminal_close(&chan->term, status);
    766  }
    767 
    768  // If process did not exit, we only closed the handle of a detached process.
    769  bool exited = (status >= 0);
    770  if (exited && chan->on_exit.type != kCallbackNone) {
    771    schedule_channel_event(chan);
    772    chan->exit_status = status;
    773  }
    774 
    775  channel_decref(chan);
    776 }
    777 
    778 static void channel_proc_state_cb(Proc *proc, bool suspended, void *data)
    779 {
    780  Channel *chan = data;
    781  if (chan->term) {
    782    terminal_set_state(chan->term, suspended);
    783  }
    784 }
    785 
    786 static void channel_callback_call(Channel *chan, CallbackReader *reader)
    787 {
    788  Callback *cb;
    789  typval_T argv[4];
    790 
    791  argv[0].v_type = VAR_NUMBER;
    792  argv[0].v_lock = VAR_UNLOCKED;
    793  argv[0].vval.v_number = (varnumber_T)chan->id;
    794 
    795  if (reader) {
    796    argv[1].v_type = VAR_LIST;
    797    argv[1].v_lock = VAR_UNLOCKED;
    798    argv[1].vval.v_list = buffer_to_tv_list(reader->buffer.ga_data,
    799                                            (size_t)reader->buffer.ga_len);
    800    tv_list_ref(argv[1].vval.v_list);
    801    ga_clear(&reader->buffer);
    802    cb = &reader->cb;
    803    argv[2].vval.v_string = (char *)reader->type;
    804  } else {
    805    argv[1].v_type = VAR_NUMBER;
    806    argv[1].v_lock = VAR_UNLOCKED;
    807    argv[1].vval.v_number = chan->exit_status;
    808    cb = &chan->on_exit;
    809    argv[2].vval.v_string = "exit";
    810  }
    811 
    812  argv[2].v_type = VAR_STRING;
    813  argv[2].v_lock = VAR_UNLOCKED;
    814 
    815  typval_T rettv = TV_INITIAL_VALUE;
    816  callback_call(cb, 3, argv, &rettv);
    817  tv_clear(&rettv);
    818 
    819  if (reader) {
    820    tv_list_unref(argv[1].vval.v_list);
    821  }
    822 }
    823 
    824 /// Allocate terminal for channel
    825 ///
    826 /// Channel `chan` is assumed to be an open pty channel,
    827 /// and `buf` is assumed to be a new, unmodified buffer.
    828 void channel_terminal_alloc(buf_T *buf, Channel *chan)
    829 {
    830  TerminalOptions topts = {
    831    .data = chan,
    832    .width = chan->stream.pty.width,
    833    .height = chan->stream.pty.height,
    834    .write_cb = term_write,
    835    .resize_cb = term_resize,
    836    .resume_cb = term_resume,
    837    .close_cb = term_close,
    838    .force_crlf = false,
    839  };
    840  buf->b_p_channel = (OptInt)chan->id;  // 'channel' option
    841  channel_incref(chan);
    842  chan->term = terminal_alloc(buf, topts);
    843 }
    844 
    845 static void term_write(const char *buf, size_t size, void *data)
    846 {
    847  Channel *chan = data;
    848  if (chan->stream.proc.in.closed) {
    849    // If the backing stream was closed abruptly, there may be write events
    850    // ahead of the terminal close event. Just ignore the writes.
    851    ILOG("write failed: stream is closed");
    852    return;
    853  }
    854  WBuffer *wbuf = wstream_new_buffer(xmemdup(buf, size), size, 1, xfree);
    855  wstream_write(&chan->stream.proc.in, wbuf);
    856 }
    857 
    858 static void term_resize(uint16_t width, uint16_t height, void *data)
    859 {
    860  Channel *chan = data;
    861  pty_proc_resize(&chan->stream.pty, width, height);
    862 }
    863 
    864 static void term_resume(void *data)
    865 {
    866  Channel *chan = data;
    867  pty_proc_resume(&chan->stream.pty);
    868 }
    869 
    870 static inline void term_delayed_free(void **argv)
    871 {
    872  Channel *chan = argv[0];
    873  if (chan->stream.proc.in.pending_reqs || chan->stream.proc.out.s.pending_reqs) {
    874    multiqueue_put(chan->events, term_delayed_free, chan);
    875    return;
    876  }
    877 
    878  if (chan->term) {
    879    terminal_destroy(&chan->term);
    880  }
    881  channel_decref(chan);
    882 }
    883 
    884 static void term_close(void *data)
    885 {
    886  Channel *chan = data;
    887  proc_stop(&chan->stream.proc);
    888  multiqueue_put(chan->events, term_delayed_free, data);
    889 }
    890 
    891 void channel_info_changed(Channel *chan, bool new_chan)
    892 {
    893  event_T event = new_chan ? EVENT_CHANOPEN : EVENT_CHANINFO;
    894  if (has_event(event)) {
    895    channel_incref(chan);
    896    multiqueue_put(main_loop.events, set_info_event, chan, (void *)(intptr_t)event);
    897  }
    898 }
    899 
    900 static void set_info_event(void **argv)
    901 {
    902  Channel *chan = argv[0];
    903  event_T event = (event_T)(ptrdiff_t)argv[1];
    904 
    905  save_v_event_T save_v_event;
    906  dict_T *dict = get_v_event(&save_v_event);
    907  Arena arena = ARENA_EMPTY;
    908  Dict info = channel_info(chan->id, &arena);
    909  typval_T retval;
    910  object_to_vim(DICT_OBJ(info), &retval, NULL);
    911  assert(retval.v_type == VAR_DICT);
    912  tv_dict_add_dict(dict, S_LEN("info"), retval.vval.v_dict);
    913  tv_dict_set_keys_readonly(dict);
    914 
    915  apply_autocmds(event, NULL, NULL, true, curbuf);
    916 
    917  restore_v_event(dict, &save_v_event);
    918  arena_mem_free(arena_finish(&arena));
    919  channel_decref(chan);
    920 }
    921 
    922 /// Unlike terminal_running(), this returns false immediately after stopping a job.
    923 /// However, this always returns false for nvim_open_term() terminals.
    924 bool channel_job_running(uint64_t id)
    925 {
    926  Channel *chan = find_channel(id);
    927  return (chan
    928          && chan->streamtype == kChannelStreamProc
    929          && !proc_is_stopped(&chan->stream.proc));
    930 }
    931 
    932 Dict channel_info(uint64_t id, Arena *arena)
    933 {
    934  Channel *chan = find_channel(id);
    935  if (!chan) {
    936    return (Dict)ARRAY_DICT_INIT;
    937  }
    938 
    939  Dict info = arena_dict(arena, 8);
    940  PUT_C(info, "id", INTEGER_OBJ((Integer)chan->id));
    941 
    942  const char *stream_desc, *mode_desc;
    943  switch (chan->streamtype) {
    944  case kChannelStreamProc: {
    945    stream_desc = "job";
    946    if (chan->stream.proc.type == kProcTypePty) {
    947      const char *name = pty_proc_tty_name(&chan->stream.pty);
    948      PUT_C(info, "pty", CSTR_TO_ARENA_OBJ(arena, name));
    949    }
    950 
    951    char **args = chan->stream.proc.argv;
    952    Array argv = ARRAY_DICT_INIT;
    953    if (args != NULL) {
    954      size_t n;
    955      for (n = 0; args[n] != NULL; n++) {}
    956      argv = arena_array(arena, n);
    957      for (size_t i = 0; i < n; i++) {
    958        ADD_C(argv, CSTR_AS_OBJ(args[i]));
    959      }
    960    }
    961    PUT_C(info, "argv", ARRAY_OBJ(argv));
    962    break;
    963  }
    964 
    965  case kChannelStreamStdio:
    966    stream_desc = "stdio";
    967    break;
    968 
    969  case kChannelStreamStderr:
    970    stream_desc = "stderr";
    971    break;
    972 
    973  case kChannelStreamInternal:
    974    PUT_C(info, "internal", BOOLEAN_OBJ(true));
    975    FALLTHROUGH;
    976 
    977  case kChannelStreamSocket:
    978    stream_desc = "socket";
    979    break;
    980  }
    981  PUT_C(info, "stream", CSTR_AS_OBJ(stream_desc));
    982 
    983  if (chan->is_rpc) {
    984    mode_desc = "rpc";
    985    PUT_C(info, "client", DICT_OBJ(chan->rpc.info));
    986  } else if (chan->term) {
    987    mode_desc = "terminal";
    988    PUT_C(info, "buffer", BUFFER_OBJ(terminal_buf(chan->term)));
    989  } else {
    990    mode_desc = "bytes";
    991  }
    992  PUT_C(info, "mode", CSTR_AS_OBJ(mode_desc));
    993 
    994  return info;
    995 }
    996 
    997 /// Simple int64_t comparison function for use with qsort()
    998 static int int64_t_cmp(const void *pa, const void *pb)
    999 {
   1000  const int64_t a = *(const int64_t *)pa;
   1001  const int64_t b = *(const int64_t *)pb;
   1002  return a == b ? 0 : a > b ? 1 : -1;
   1003 }
   1004 
   1005 Array channel_all_info(Arena *arena)
   1006 {
   1007  // order the items in the array by channel number, for Determinismâ„¢
   1008  kvec_t(int64_t) ids = KV_INITIAL_VALUE;
   1009  kv_fixsize_arena(arena, ids, map_size(&channels));
   1010  uint64_t id;
   1011  map_foreach_key(&channels, id, {
   1012    kv_push(ids, (int64_t)id);
   1013  });
   1014  qsort(ids.items, ids.size, sizeof ids.items[0], int64_t_cmp);
   1015 
   1016  Array ret = arena_array(arena, ids.size);
   1017  for (size_t i = 0; i < ids.size; i++) {
   1018    ADD_C(ret, DICT_OBJ(channel_info((uint64_t)ids.items[i], arena)));
   1019  }
   1020  return ret;
   1021 }