neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

stream.c (4655B)


      1 #include <assert.h>
      2 #include <stdbool.h>
      3 #include <stddef.h>
      4 #include <uv.h>
      5 #include <uv/version.h>
      6 
      7 #include "nvim/event/defs.h"
      8 #include "nvim/event/loop.h"
      9 #include "nvim/event/stream.h"
     10 #include "nvim/log.h"
     11 #include "nvim/memory.h"
     12 #include "nvim/types_defs.h"
     13 #ifdef MSWIN
     14 # include "nvim/os/os_win_console.h"
     15 #endif
     16 
     17 #include "event/stream.c.generated.h"
     18 
     19 // For compatibility with libuv < 1.19.0 (tested on 1.18.0)
     20 #if UV_VERSION_MINOR < 19
     21 # define uv_stream_get_write_queue_size(stream) stream->write_queue_size
     22 #endif
     23 
     24 /// Sets the stream associated with `fd` to "blocking" mode.
     25 ///
     26 /// @return `0` on success, or libuv error code on failure.
     27 int stream_set_blocking(int fd, bool blocking)
     28 {
     29  // Private loop to avoid conflict with existing watcher(s):
     30  //    uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed.
     31  uv_loop_t loop;
     32  uv_pipe_t stream;
     33  uv_loop_init(&loop);
     34  uv_pipe_init(&loop, &stream, 0);
     35  uv_pipe_open(&stream, fd);
     36  int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking);
     37  uv_close((uv_handle_t *)&stream, NULL);
     38  uv_run(&loop, UV_RUN_NOWAIT);  // not necessary, but couldn't hurt.
     39  uv_loop_close(&loop);
     40  return retval;
     41 }
     42 
     43 void stream_init(Loop *loop, Stream *stream, int fd, bool poll, uv_stream_t *uvstream)
     44  FUNC_ATTR_NONNULL_ARG(2)
     45 {
     46  // The underlying stream is either a file or an existing uv stream.
     47  assert(uvstream == NULL ? fd >= 0 && loop != NULL : fd < 0 && loop == NULL && !poll);
     48 #ifdef MSWIN
     49  assert(!poll);
     50 #endif
     51  stream->uvstream = uvstream;
     52 
     53  if (fd >= 0) {
     54    uv_handle_type type = uv_guess_handle(fd);
     55    stream->fd = fd;
     56 
     57    if (type == UV_FILE) {
     58      assert(!poll);
     59      // Non-blocking file reads are simulated with an idle handle that reads in
     60      // chunks of the ring buffer size, giving time for other events to be
     61      // processed between reads.
     62      uv_idle_init(&loop->uv, &stream->uv.idle);
     63      stream->uv.idle.data = stream;
     64 #ifdef MSWIN
     65    } else if (type == UV_TTY) {
     66      uv_tty_init(&loop->uv, &stream->uv.tty, fd, 0);
     67      uv_tty_set_mode(&stream->uv.tty, UV_TTY_MODE_RAW);
     68      DWORD dwMode;
     69      if (GetConsoleMode(stream->uv.tty.handle, &dwMode)) {
     70        dwMode |= ENABLE_VIRTUAL_TERMINAL_INPUT;
     71        SetConsoleMode(stream->uv.tty.handle, dwMode);
     72      }
     73      stream->uvstream = (uv_stream_t *)&stream->uv.tty;
     74 #else
     75    } else if (poll) {
     76      uv_poll_init(&loop->uv, &stream->uv.poll, fd);
     77      stream->uv.poll.data = stream;
     78      stream->use_poll = true;
     79 #endif
     80    } else {
     81      assert(type == UV_NAMED_PIPE || type == UV_TTY);
     82      uv_pipe_init(&loop->uv, &stream->uv.pipe, 0);
     83      uv_pipe_open(&stream->uv.pipe, fd);
     84      stream->uvstream = (uv_stream_t *)&stream->uv.pipe;
     85    }
     86  }
     87 
     88  if (stream->uvstream) {
     89    stream->uvstream->data = stream;
     90  }
     91 
     92  stream->fpos = 0;
     93  stream->internal_data = NULL;
     94  stream->curmem = 0;
     95  stream->maxmem = 0;
     96  stream->pending_reqs = 0;
     97  stream->write_cb = NULL;
     98  stream->close_cb = NULL;
     99  stream->internal_close_cb = NULL;
    100  stream->closed = false;
    101  stream->events = NULL;
    102 }
    103 
    104 void stream_may_close(Stream *stream)
    105  FUNC_ATTR_NONNULL_ARG(1)
    106 {
    107  if (stream->closed) {
    108    return;
    109  }
    110  DLOG("closing Stream: %p", (void *)stream);
    111  stream->closed = true;
    112 
    113 #ifdef MSWIN
    114  if (UV_TTY == uv_guess_handle(stream->fd)) {
    115    // Undo UV_TTY_MODE_RAW from stream_init(). #10801
    116    uv_tty_set_mode(&stream->uv.tty, UV_TTY_MODE_NORMAL);
    117  }
    118 #endif
    119 
    120  if (!stream->pending_reqs) {
    121    stream_close_handle(stream);
    122  }  // Else: rstream.c:read_event() or wstream.c:write_cb() will call stream_close_handle().
    123 }
    124 
    125 void stream_close_handle(Stream *stream)
    126  FUNC_ATTR_NONNULL_ALL
    127 {
    128  uv_handle_t *handle = NULL;
    129  if (stream->uvstream) {
    130    if (uv_stream_get_write_queue_size(stream->uvstream) > 0) {
    131      WLOG("closed Stream (%p) with %zu unwritten bytes",
    132           (void *)stream,
    133           uv_stream_get_write_queue_size(stream->uvstream));
    134    }
    135    handle = (uv_handle_t *)stream->uvstream;
    136  } else {
    137    // All members of the stream->uv union share the same address.
    138    handle = (uv_handle_t *)&stream->uv;
    139  }
    140 
    141  assert(handle != NULL);
    142 
    143  if (!uv_is_closing(handle)) {
    144    uv_close(handle, close_cb);
    145  }
    146 }
    147 
    148 static void close_cb(uv_handle_t *handle)
    149 {
    150  Stream *stream = handle->data;
    151  // Need to check if handle->data is NULL here as this callback may be called between
    152  // the handle's initialization and stream_init() (e.g. in socket_connect()).
    153  if (stream && stream->close_cb) {
    154    stream->close_cb(stream, stream->close_cb_data);
    155  }
    156  if (stream && stream->internal_close_cb) {
    157    stream->internal_close_cb(stream, stream->internal_data);
    158  }
    159 }