neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

wstream.c (4708B)


      1 #include <assert.h>
      2 #include <stdbool.h>
      3 #include <stddef.h>
      4 #include <uv.h>
      5 
      6 #include "nvim/event/defs.h"
      7 #include "nvim/event/stream.h"
      8 #include "nvim/event/wstream.h"
      9 #include "nvim/macros_defs.h"
     10 #include "nvim/memory.h"
     11 #include "nvim/types_defs.h"
     12 
     13 #define DEFAULT_MAXMEM 1024 * 1024 * 2000
     14 
     15 typedef struct {
     16  Stream *stream;
     17  WBuffer *buffer;
     18  uv_write_t uv_req;
     19 } WRequest;
     20 
     21 #include "event/wstream.c.generated.h"
     22 
     23 void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
     24  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
     25 {
     26  stream_init(loop, stream, fd, false, NULL);
     27  wstream_init(stream, maxmem);
     28 }
     29 
     30 void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
     31  FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
     32 {
     33  stream_init(NULL, stream, -1, false, uvstream);
     34  wstream_init(stream, maxmem);
     35 }
     36 
     37 void wstream_init(Stream *stream, size_t maxmem)
     38 {
     39  stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM;
     40 }
     41 
     42 /// Sets a callback that will be called on completion of a write request,
     43 /// indicating failure/success.
     44 ///
     45 /// This affects all requests currently in-flight as well. Overwrites any
     46 /// possible earlier callback.
     47 ///
     48 /// @note This callback will not fire if the write request couldn't even be
     49 ///       queued properly (i.e.: when `wstream_write() returns an error`).
     50 ///
     51 /// @param stream The `Stream` instance
     52 /// @param cb The callback
     53 void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data)
     54  FUNC_ATTR_NONNULL_ARG(1, 2)
     55 {
     56  stream->write_cb = cb;
     57  stream->cb_data = data;
     58 }
     59 
     60 /// Queues data for writing to the backing file descriptor of a `Stream`
     61 /// instance. This will fail if the write would cause the Stream use more
     62 /// memory than specified by `maxmem`.
     63 ///
     64 /// @param stream The `Stream` instance
     65 /// @param buffer The buffer which contains data to be written
     66 /// @return 0 on success, or libuv error code on failure
     67 int wstream_write(Stream *stream, WBuffer *buffer)
     68  FUNC_ATTR_NONNULL_ALL
     69 {
     70  assert(stream->maxmem);
     71  assert(!stream->use_poll);
     72  // This should not be called after a stream was freed
     73  assert(!stream->closed);
     74 
     75  int err = 0;
     76  uv_buf_t uvbuf;
     77  uvbuf.base = buffer->data;
     78  uvbuf.len = UV_BUF_LEN(buffer->size);
     79 
     80  if (!stream->uvstream) {
     81    uv_fs_t req;
     82 
     83    // Synchronous write
     84    err = uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL);
     85 
     86    uv_fs_req_cleanup(&req);
     87 
     88    wstream_release_wbuffer(buffer);
     89 
     90    assert(stream->write_cb == NULL);
     91 
     92    stream->fpos += MAX(req.result, 0);
     93    return req.result > 0 ? 0 : err != 0 ? err : UV_UNKNOWN;
     94  }
     95 
     96  if (stream->curmem > stream->maxmem) {
     97    err = UV_ENOMEM;
     98    goto fail;
     99  }
    100 
    101  stream->curmem += buffer->size;
    102 
    103  WRequest *data = xmalloc(sizeof(WRequest));
    104  data->stream = stream;
    105  data->buffer = buffer;
    106  data->uv_req.data = data;
    107 
    108  if ((err = uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) != 0) {
    109    xfree(data);
    110    goto fail;
    111  }
    112 
    113  stream->pending_reqs++;
    114  assert(err == 0);
    115  return 0;
    116 
    117 fail:
    118  wstream_release_wbuffer(buffer);
    119  assert(err != 0);
    120  return err;
    121 }
    122 
    123 /// Creates a WBuffer object for holding output data. Instances of this
    124 /// object can be reused across Stream instances, and the memory is freed
    125 /// automatically when no longer needed (it tracks the number of references
    126 /// internally)
    127 ///
    128 /// @param data Data stored by the WBuffer
    129 /// @param size The size of the data array
    130 /// @param refcount The number of references for the WBuffer. This will be used
    131 ///        by Stream instances to decide when a WBuffer should be freed.
    132 /// @param cb Pointer to function that will be responsible for freeing
    133 ///        the buffer data (passing `xfree` will work as expected).
    134 /// @return The allocated WBuffer instance
    135 WBuffer *wstream_new_buffer(char *data, size_t size, size_t refcount, wbuffer_data_finalizer cb)
    136  FUNC_ATTR_NONNULL_ARG(1)
    137 {
    138  WBuffer *rv = xmalloc(sizeof(WBuffer));
    139  rv->size = size;
    140  rv->refcount = refcount;
    141  rv->cb = cb;
    142  rv->data = data;
    143 
    144  return rv;
    145 }
    146 
    147 static void write_cb(uv_write_t *req, int status)
    148 {
    149  WRequest *data = req->data;
    150 
    151  data->stream->curmem -= data->buffer->size;
    152 
    153  wstream_release_wbuffer(data->buffer);
    154 
    155  if (data->stream->write_cb) {
    156    data->stream->write_cb(data->stream, data->stream->cb_data, status);
    157  }
    158 
    159  data->stream->pending_reqs--;
    160 
    161  if (data->stream->closed && data->stream->pending_reqs == 0) {
    162    // Last pending write; free the stream.
    163    stream_close_handle(data->stream);
    164  }
    165 
    166  xfree(data);
    167 }
    168 
    169 void wstream_release_wbuffer(WBuffer *buffer)
    170  FUNC_ATTR_NONNULL_ALL
    171 {
    172  if (!--buffer->refcount) {
    173    if (buffer->cb) {
    174      buffer->cb(buffer->data);
    175    }
    176 
    177    xfree(buffer);
    178  }
    179 }