neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

rstream.c (8264B)


      1 #include <assert.h>
      2 #include <stdbool.h>
      3 #include <string.h>
      4 #include <uv.h>
      5 
      6 #include "nvim/event/multiqueue.h"
      7 #include "nvim/event/rstream.h"
      8 #include "nvim/event/stream.h"
      9 #include "nvim/log.h"
     10 #include "nvim/macros_defs.h"
     11 #include "nvim/memory.h"
     12 #include "nvim/memory_defs.h"
     13 #include "nvim/os/os_defs.h"
     14 #include "nvim/types_defs.h"
     15 
     16 #ifndef MSWIN
     17 # include <errno.h>
     18 
     19 # include "nvim/fileio.h"
     20 #endif
     21 
     22 #include "event/rstream.c.generated.h"
     23 
     24 void rstream_init_fd(Loop *loop, RStream *stream, int fd)
     25  FUNC_ATTR_NONNULL_ARG(1, 2)
     26 {
     27  stream_init(loop, &stream->s, fd, false, NULL);
     28  rstream_init(stream);
     29 }
     30 
     31 void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
     32  FUNC_ATTR_NONNULL_ARG(1, 2)
     33 {
     34  stream_init(NULL, &stream->s, -1, false, uvstream);
     35  rstream_init(stream);
     36 }
     37 
     38 void rstream_init(RStream *stream)
     39  FUNC_ATTR_NONNULL_ARG(1)
     40 {
     41  stream->read_cb = NULL;
     42  stream->num_bytes = 0;
     43  stream->buffer = alloc_block();
     44  stream->read_pos = stream->write_pos = stream->buffer;
     45  stream->s.close_cb = rstream_close_cb;
     46  stream->s.close_cb_data = stream;
     47 }
     48 
     49 void rstream_start_inner(RStream *stream)
     50  FUNC_ATTR_NONNULL_ARG(1)
     51 {
     52  if (stream->s.uvstream) {
     53    uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
     54 #ifndef MSWIN
     55  } else if (stream->s.use_poll) {
     56    uv_poll_start(&stream->s.uv.poll, UV_READABLE, poll_cb);
     57 #endif
     58  } else {
     59    uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
     60  }
     61 }
     62 
     63 /// Starts watching for events from a `Stream` instance.
     64 ///
     65 /// @param stream The `Stream` instance
     66 void rstream_start(RStream *stream, stream_read_cb cb, void *data)
     67  FUNC_ATTR_NONNULL_ARG(1)
     68 {
     69  stream->read_cb = cb;
     70  stream->s.cb_data = data;
     71  stream->want_read = true;
     72  if (!stream->paused_full) {
     73    rstream_start_inner(stream);
     74  }
     75 }
     76 
     77 /// Stops watching for events from a `Stream` instance.
     78 ///
     79 /// @param stream The `Stream` instance
     80 void rstream_stop_inner(RStream *stream)
     81  FUNC_ATTR_NONNULL_ALL
     82 {
     83  if (stream->s.uvstream) {
     84    uv_read_stop(stream->s.uvstream);
     85 #ifndef MSWIN
     86  } else if (stream->s.use_poll) {
     87    uv_poll_stop(&stream->s.uv.poll);
     88 #endif
     89  } else {
     90    uv_idle_stop(&stream->s.uv.idle);
     91  }
     92 }
     93 
     94 /// Stops watching for events from a `Stream` instance.
     95 ///
     96 /// @param stream The `Stream` instance
     97 void rstream_stop(RStream *stream)
     98  FUNC_ATTR_NONNULL_ALL
     99 {
    100  rstream_stop_inner(stream);
    101  stream->want_read = false;
    102 }
    103 
    104 // Callbacks used by libuv
    105 
    106 /// Called by libuv to allocate memory for reading.
    107 static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf)
    108 {
    109  RStream *stream = handle->data;
    110  buf->base = stream->write_pos;
    111  // `uv_buf_t.len` happens to have different size on Windows (as a treat)
    112  buf->len = UV_BUF_LEN(rstream_space(stream));
    113 }
    114 
    115 /// Callback invoked by libuv after it copies the data into the buffer provided
    116 /// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a
    117 /// 0-length buffer.
    118 static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf)
    119 {
    120  RStream *stream = uvstream->data;
    121 
    122  if (cnt <= 0) {
    123    // cnt == 0 means libuv asked for a buffer and decided it wasn't needed:
    124    // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start.
    125    //
    126    // We don't need to do anything with the buffer because the next call
    127    // to `alloc_cb` will return the same unused pointer (`rbuffer_produced`
    128    // won't be called)
    129    if (cnt == UV_ENOBUFS || cnt == 0) {
    130      return;
    131    } else if (cnt == UV_EOF && uvstream->type == UV_TTY) {
    132      // The TTY driver might signal EOF without closing the stream
    133      invoke_read_cb(stream, true);
    134    } else {
    135      DLOG("closing Stream (%p): %s (%s)", (void *)stream,
    136           uv_err_name((int)cnt), os_strerror((int)cnt));
    137      // Read error or EOF, either way stop the stream and invoke the callback
    138      // with eof == true
    139      uv_read_stop(uvstream);
    140      invoke_read_cb(stream, true);
    141    }
    142    return;
    143  }
    144 
    145  // at this point we're sure that cnt is positive, no error occurred
    146  size_t nread = (size_t)cnt;
    147  stream->num_bytes += nread;
    148  stream->write_pos += cnt;
    149  invoke_read_cb(stream, false);
    150 }
    151 
    152 static size_t rstream_space(RStream *stream)
    153 {
    154  return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos);
    155 }
    156 
    157 /// Called by the by the 'idle' handle to emulate a reading event
    158 ///
    159 /// Idle callbacks are invoked once per event loop:
    160 ///  - to perform some very low priority activity.
    161 ///  - to keep the loop "alive" (so there is always an event to process)
    162 static void fread_idle_cb(uv_idle_t *handle)
    163 {
    164  uv_fs_t req;
    165  RStream *stream = handle->data;
    166 
    167  stream->uvbuf.base = stream->write_pos;
    168  // `uv_buf_t.len` happens to have different size on Windows.
    169  stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream));
    170 
    171  // Synchronous read
    172  uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->s.fpos, NULL);
    173 
    174  uv_fs_req_cleanup(&req);
    175 
    176  if (req.result <= 0) {
    177    uv_idle_stop(&stream->s.uv.idle);
    178    invoke_read_cb(stream, true);
    179    return;
    180  }
    181 
    182  // no errors (req.result (ssize_t) is positive), it's safe to use.
    183  stream->write_pos += req.result;
    184  stream->s.fpos += req.result;
    185  invoke_read_cb(stream, false);
    186 }
    187 
    188 #ifndef MSWIN
    189 static void poll_cb(uv_poll_t *handle, int status, int events)
    190 {
    191  RStream *stream = handle->data;
    192 
    193  if (status < 0) {
    194    ELOG("poll error on Stream (%p) with descriptor %d: %s (%s)", (void *)stream,
    195         stream->s.fd, uv_err_name(status), os_strerror(status));
    196    return;
    197  }
    198  if (!(events & UV_READABLE)) {
    199    return;
    200  }
    201 
    202  ssize_t cnt = read_eintr(stream->s.fd, stream->write_pos, rstream_space(stream));
    203 
    204  if (cnt < 0) {
    205    if (errno == EAGAIN || errno == EWOULDBLOCK) {
    206      return;
    207    }
    208    DLOG("closing Stream (%p) with descriptor %d: %s", (void *)stream,
    209         stream->s.fd, strerror(errno));
    210  }
    211 
    212  if (cnt <= 0) {
    213    // Read error or EOF, either way stop the stream and invoke the callback
    214    // with eof == true
    215    uv_poll_stop(handle);
    216    invoke_read_cb(stream, true);
    217    return;
    218  }
    219 
    220  // at this point we're sure that cnt is positive, no error occurred
    221  size_t nread = (size_t)cnt;
    222  stream->num_bytes += nread;
    223  stream->write_pos += cnt;
    224  invoke_read_cb(stream, false);
    225 }
    226 #endif
    227 
    228 static void read_event(void **argv)
    229 {
    230  RStream *stream = argv[0];
    231  stream->pending_read = false;
    232  if (stream->read_cb) {
    233    size_t available = rstream_available(stream);
    234    size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data,
    235                                      stream->did_eof);
    236    assert(consumed <= available);
    237    rstream_consume(stream, consumed);
    238  }
    239  stream->s.pending_reqs--;
    240  if (stream->s.closed && !stream->s.pending_reqs) {
    241    // Last pending read; free the stream.
    242    stream_close_handle(&stream->s);
    243  }
    244 }
    245 
    246 size_t rstream_available(RStream *stream)
    247 {
    248  return (size_t)(stream->write_pos - stream->read_pos);
    249 }
    250 
    251 void rstream_consume(RStream *stream, size_t consumed)
    252 {
    253  stream->read_pos += consumed;
    254  size_t remaining = (size_t)(stream->write_pos - stream->read_pos);
    255  if (remaining > 0 && stream->read_pos > stream->buffer) {
    256    memmove(stream->buffer, stream->read_pos, remaining);
    257    stream->read_pos = stream->buffer;
    258    stream->write_pos = stream->buffer + remaining;
    259  } else if (remaining == 0) {
    260    stream->read_pos = stream->write_pos = stream->buffer;
    261  }
    262 
    263  if (stream->want_read && stream->paused_full && rstream_space(stream)) {
    264    assert(stream->read_cb);
    265    stream->paused_full = false;
    266    rstream_start_inner(stream);
    267  }
    268 }
    269 
    270 static void invoke_read_cb(RStream *stream, bool eof)
    271 {
    272  stream->did_eof |= eof;
    273 
    274  if (!rstream_space(stream)) {
    275    rstream_stop_inner(stream);
    276    stream->paused_full = true;
    277  }
    278 
    279  // we cannot use pending_reqs as a socket can have both pending reads and writes
    280  if (stream->pending_read) {
    281    return;
    282  }
    283 
    284  // Don't let the stream be closed before the event is processed.
    285  stream->s.pending_reqs++;
    286  stream->pending_read = true;
    287  CREATE_EVENT(stream->s.events, read_event, stream);
    288 }
    289 
    290 static void rstream_close_cb(Stream *s, void *data)
    291 {
    292  RStream *stream = data;
    293  assert(stream && s == &stream->s);
    294  if (stream->buffer) {
    295    free_block(stream->buffer);
    296  }
    297 }
    298 
    299 void rstream_may_close(RStream *stream)
    300 {
    301  stream_may_close(&stream->s);
    302 }