wstream.c (4708B)
1 #include <assert.h> 2 #include <stdbool.h> 3 #include <stddef.h> 4 #include <uv.h> 5 6 #include "nvim/event/defs.h" 7 #include "nvim/event/stream.h" 8 #include "nvim/event/wstream.h" 9 #include "nvim/macros_defs.h" 10 #include "nvim/memory.h" 11 #include "nvim/types_defs.h" 12 13 #define DEFAULT_MAXMEM 1024 * 1024 * 2000 14 15 typedef struct { 16 Stream *stream; 17 WBuffer *buffer; 18 uv_write_t uv_req; 19 } WRequest; 20 21 #include "event/wstream.c.generated.h" 22 23 void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem) 24 FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) 25 { 26 stream_init(loop, stream, fd, false, NULL); 27 wstream_init(stream, maxmem); 28 } 29 30 void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem) 31 FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2) 32 { 33 stream_init(NULL, stream, -1, false, uvstream); 34 wstream_init(stream, maxmem); 35 } 36 37 void wstream_init(Stream *stream, size_t maxmem) 38 { 39 stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM; 40 } 41 42 /// Sets a callback that will be called on completion of a write request, 43 /// indicating failure/success. 44 /// 45 /// This affects all requests currently in-flight as well. Overwrites any 46 /// possible earlier callback. 47 /// 48 /// @note This callback will not fire if the write request couldn't even be 49 /// queued properly (i.e.: when `wstream_write() returns an error`). 50 /// 51 /// @param stream The `Stream` instance 52 /// @param cb The callback 53 void wstream_set_write_cb(Stream *stream, stream_write_cb cb, void *data) 54 FUNC_ATTR_NONNULL_ARG(1, 2) 55 { 56 stream->write_cb = cb; 57 stream->cb_data = data; 58 } 59 60 /// Queues data for writing to the backing file descriptor of a `Stream` 61 /// instance. This will fail if the write would cause the Stream use more 62 /// memory than specified by `maxmem`. 63 /// 64 /// @param stream The `Stream` instance 65 /// @param buffer The buffer which contains data to be written 66 /// @return 0 on success, or libuv error code on failure 67 int wstream_write(Stream *stream, WBuffer *buffer) 68 FUNC_ATTR_NONNULL_ALL 69 { 70 assert(stream->maxmem); 71 assert(!stream->use_poll); 72 // This should not be called after a stream was freed 73 assert(!stream->closed); 74 75 int err = 0; 76 uv_buf_t uvbuf; 77 uvbuf.base = buffer->data; 78 uvbuf.len = UV_BUF_LEN(buffer->size); 79 80 if (!stream->uvstream) { 81 uv_fs_t req; 82 83 // Synchronous write 84 err = uv_fs_write(stream->uv.idle.loop, &req, stream->fd, &uvbuf, 1, stream->fpos, NULL); 85 86 uv_fs_req_cleanup(&req); 87 88 wstream_release_wbuffer(buffer); 89 90 assert(stream->write_cb == NULL); 91 92 stream->fpos += MAX(req.result, 0); 93 return req.result > 0 ? 0 : err != 0 ? err : UV_UNKNOWN; 94 } 95 96 if (stream->curmem > stream->maxmem) { 97 err = UV_ENOMEM; 98 goto fail; 99 } 100 101 stream->curmem += buffer->size; 102 103 WRequest *data = xmalloc(sizeof(WRequest)); 104 data->stream = stream; 105 data->buffer = buffer; 106 data->uv_req.data = data; 107 108 if ((err = uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) != 0) { 109 xfree(data); 110 goto fail; 111 } 112 113 stream->pending_reqs++; 114 assert(err == 0); 115 return 0; 116 117 fail: 118 wstream_release_wbuffer(buffer); 119 assert(err != 0); 120 return err; 121 } 122 123 /// Creates a WBuffer object for holding output data. Instances of this 124 /// object can be reused across Stream instances, and the memory is freed 125 /// automatically when no longer needed (it tracks the number of references 126 /// internally) 127 /// 128 /// @param data Data stored by the WBuffer 129 /// @param size The size of the data array 130 /// @param refcount The number of references for the WBuffer. This will be used 131 /// by Stream instances to decide when a WBuffer should be freed. 132 /// @param cb Pointer to function that will be responsible for freeing 133 /// the buffer data (passing `xfree` will work as expected). 134 /// @return The allocated WBuffer instance 135 WBuffer *wstream_new_buffer(char *data, size_t size, size_t refcount, wbuffer_data_finalizer cb) 136 FUNC_ATTR_NONNULL_ARG(1) 137 { 138 WBuffer *rv = xmalloc(sizeof(WBuffer)); 139 rv->size = size; 140 rv->refcount = refcount; 141 rv->cb = cb; 142 rv->data = data; 143 144 return rv; 145 } 146 147 static void write_cb(uv_write_t *req, int status) 148 { 149 WRequest *data = req->data; 150 151 data->stream->curmem -= data->buffer->size; 152 153 wstream_release_wbuffer(data->buffer); 154 155 if (data->stream->write_cb) { 156 data->stream->write_cb(data->stream, data->stream->cb_data, status); 157 } 158 159 data->stream->pending_reqs--; 160 161 if (data->stream->closed && data->stream->pending_reqs == 0) { 162 // Last pending write; free the stream. 163 stream_close_handle(data->stream); 164 } 165 166 xfree(data); 167 } 168 169 void wstream_release_wbuffer(WBuffer *buffer) 170 FUNC_ATTR_NONNULL_ALL 171 { 172 if (!--buffer->refcount) { 173 if (buffer->cb) { 174 buffer->cb(buffer->data); 175 } 176 177 xfree(buffer); 178 } 179 }