rstream.c (8264B)
1 #include <assert.h> 2 #include <stdbool.h> 3 #include <string.h> 4 #include <uv.h> 5 6 #include "nvim/event/multiqueue.h" 7 #include "nvim/event/rstream.h" 8 #include "nvim/event/stream.h" 9 #include "nvim/log.h" 10 #include "nvim/macros_defs.h" 11 #include "nvim/memory.h" 12 #include "nvim/memory_defs.h" 13 #include "nvim/os/os_defs.h" 14 #include "nvim/types_defs.h" 15 16 #ifndef MSWIN 17 # include <errno.h> 18 19 # include "nvim/fileio.h" 20 #endif 21 22 #include "event/rstream.c.generated.h" 23 24 void rstream_init_fd(Loop *loop, RStream *stream, int fd) 25 FUNC_ATTR_NONNULL_ARG(1, 2) 26 { 27 stream_init(loop, &stream->s, fd, false, NULL); 28 rstream_init(stream); 29 } 30 31 void rstream_init_stream(RStream *stream, uv_stream_t *uvstream) 32 FUNC_ATTR_NONNULL_ARG(1, 2) 33 { 34 stream_init(NULL, &stream->s, -1, false, uvstream); 35 rstream_init(stream); 36 } 37 38 void rstream_init(RStream *stream) 39 FUNC_ATTR_NONNULL_ARG(1) 40 { 41 stream->read_cb = NULL; 42 stream->num_bytes = 0; 43 stream->buffer = alloc_block(); 44 stream->read_pos = stream->write_pos = stream->buffer; 45 stream->s.close_cb = rstream_close_cb; 46 stream->s.close_cb_data = stream; 47 } 48 49 void rstream_start_inner(RStream *stream) 50 FUNC_ATTR_NONNULL_ARG(1) 51 { 52 if (stream->s.uvstream) { 53 uv_read_start(stream->s.uvstream, alloc_cb, read_cb); 54 #ifndef MSWIN 55 } else if (stream->s.use_poll) { 56 uv_poll_start(&stream->s.uv.poll, UV_READABLE, poll_cb); 57 #endif 58 } else { 59 uv_idle_start(&stream->s.uv.idle, fread_idle_cb); 60 } 61 } 62 63 /// Starts watching for events from a `Stream` instance. 64 /// 65 /// @param stream The `Stream` instance 66 void rstream_start(RStream *stream, stream_read_cb cb, void *data) 67 FUNC_ATTR_NONNULL_ARG(1) 68 { 69 stream->read_cb = cb; 70 stream->s.cb_data = data; 71 stream->want_read = true; 72 if (!stream->paused_full) { 73 rstream_start_inner(stream); 74 } 75 } 76 77 /// Stops watching for events from a `Stream` instance. 78 /// 79 /// @param stream The `Stream` instance 80 void rstream_stop_inner(RStream *stream) 81 FUNC_ATTR_NONNULL_ALL 82 { 83 if (stream->s.uvstream) { 84 uv_read_stop(stream->s.uvstream); 85 #ifndef MSWIN 86 } else if (stream->s.use_poll) { 87 uv_poll_stop(&stream->s.uv.poll); 88 #endif 89 } else { 90 uv_idle_stop(&stream->s.uv.idle); 91 } 92 } 93 94 /// Stops watching for events from a `Stream` instance. 95 /// 96 /// @param stream The `Stream` instance 97 void rstream_stop(RStream *stream) 98 FUNC_ATTR_NONNULL_ALL 99 { 100 rstream_stop_inner(stream); 101 stream->want_read = false; 102 } 103 104 // Callbacks used by libuv 105 106 /// Called by libuv to allocate memory for reading. 107 static void alloc_cb(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) 108 { 109 RStream *stream = handle->data; 110 buf->base = stream->write_pos; 111 // `uv_buf_t.len` happens to have different size on Windows (as a treat) 112 buf->len = UV_BUF_LEN(rstream_space(stream)); 113 } 114 115 /// Callback invoked by libuv after it copies the data into the buffer provided 116 /// by `alloc_cb`. This is also called on EOF or when `alloc_cb` returns a 117 /// 0-length buffer. 118 static void read_cb(uv_stream_t *uvstream, ssize_t cnt, const uv_buf_t *buf) 119 { 120 RStream *stream = uvstream->data; 121 122 if (cnt <= 0) { 123 // cnt == 0 means libuv asked for a buffer and decided it wasn't needed: 124 // http://docs.libuv.org/en/latest/stream.html#c.uv_read_start. 125 // 126 // We don't need to do anything with the buffer because the next call 127 // to `alloc_cb` will return the same unused pointer (`rbuffer_produced` 128 // won't be called) 129 if (cnt == UV_ENOBUFS || cnt == 0) { 130 return; 131 } else if (cnt == UV_EOF && uvstream->type == UV_TTY) { 132 // The TTY driver might signal EOF without closing the stream 133 invoke_read_cb(stream, true); 134 } else { 135 DLOG("closing Stream (%p): %s (%s)", (void *)stream, 136 uv_err_name((int)cnt), os_strerror((int)cnt)); 137 // Read error or EOF, either way stop the stream and invoke the callback 138 // with eof == true 139 uv_read_stop(uvstream); 140 invoke_read_cb(stream, true); 141 } 142 return; 143 } 144 145 // at this point we're sure that cnt is positive, no error occurred 146 size_t nread = (size_t)cnt; 147 stream->num_bytes += nread; 148 stream->write_pos += cnt; 149 invoke_read_cb(stream, false); 150 } 151 152 static size_t rstream_space(RStream *stream) 153 { 154 return (size_t)((stream->buffer + ARENA_BLOCK_SIZE) - stream->write_pos); 155 } 156 157 /// Called by the by the 'idle' handle to emulate a reading event 158 /// 159 /// Idle callbacks are invoked once per event loop: 160 /// - to perform some very low priority activity. 161 /// - to keep the loop "alive" (so there is always an event to process) 162 static void fread_idle_cb(uv_idle_t *handle) 163 { 164 uv_fs_t req; 165 RStream *stream = handle->data; 166 167 stream->uvbuf.base = stream->write_pos; 168 // `uv_buf_t.len` happens to have different size on Windows. 169 stream->uvbuf.len = UV_BUF_LEN(rstream_space(stream)); 170 171 // Synchronous read 172 uv_fs_read(handle->loop, &req, stream->s.fd, &stream->uvbuf, 1, stream->s.fpos, NULL); 173 174 uv_fs_req_cleanup(&req); 175 176 if (req.result <= 0) { 177 uv_idle_stop(&stream->s.uv.idle); 178 invoke_read_cb(stream, true); 179 return; 180 } 181 182 // no errors (req.result (ssize_t) is positive), it's safe to use. 183 stream->write_pos += req.result; 184 stream->s.fpos += req.result; 185 invoke_read_cb(stream, false); 186 } 187 188 #ifndef MSWIN 189 static void poll_cb(uv_poll_t *handle, int status, int events) 190 { 191 RStream *stream = handle->data; 192 193 if (status < 0) { 194 ELOG("poll error on Stream (%p) with descriptor %d: %s (%s)", (void *)stream, 195 stream->s.fd, uv_err_name(status), os_strerror(status)); 196 return; 197 } 198 if (!(events & UV_READABLE)) { 199 return; 200 } 201 202 ssize_t cnt = read_eintr(stream->s.fd, stream->write_pos, rstream_space(stream)); 203 204 if (cnt < 0) { 205 if (errno == EAGAIN || errno == EWOULDBLOCK) { 206 return; 207 } 208 DLOG("closing Stream (%p) with descriptor %d: %s", (void *)stream, 209 stream->s.fd, strerror(errno)); 210 } 211 212 if (cnt <= 0) { 213 // Read error or EOF, either way stop the stream and invoke the callback 214 // with eof == true 215 uv_poll_stop(handle); 216 invoke_read_cb(stream, true); 217 return; 218 } 219 220 // at this point we're sure that cnt is positive, no error occurred 221 size_t nread = (size_t)cnt; 222 stream->num_bytes += nread; 223 stream->write_pos += cnt; 224 invoke_read_cb(stream, false); 225 } 226 #endif 227 228 static void read_event(void **argv) 229 { 230 RStream *stream = argv[0]; 231 stream->pending_read = false; 232 if (stream->read_cb) { 233 size_t available = rstream_available(stream); 234 size_t consumed = stream->read_cb(stream, stream->read_pos, available, stream->s.cb_data, 235 stream->did_eof); 236 assert(consumed <= available); 237 rstream_consume(stream, consumed); 238 } 239 stream->s.pending_reqs--; 240 if (stream->s.closed && !stream->s.pending_reqs) { 241 // Last pending read; free the stream. 242 stream_close_handle(&stream->s); 243 } 244 } 245 246 size_t rstream_available(RStream *stream) 247 { 248 return (size_t)(stream->write_pos - stream->read_pos); 249 } 250 251 void rstream_consume(RStream *stream, size_t consumed) 252 { 253 stream->read_pos += consumed; 254 size_t remaining = (size_t)(stream->write_pos - stream->read_pos); 255 if (remaining > 0 && stream->read_pos > stream->buffer) { 256 memmove(stream->buffer, stream->read_pos, remaining); 257 stream->read_pos = stream->buffer; 258 stream->write_pos = stream->buffer + remaining; 259 } else if (remaining == 0) { 260 stream->read_pos = stream->write_pos = stream->buffer; 261 } 262 263 if (stream->want_read && stream->paused_full && rstream_space(stream)) { 264 assert(stream->read_cb); 265 stream->paused_full = false; 266 rstream_start_inner(stream); 267 } 268 } 269 270 static void invoke_read_cb(RStream *stream, bool eof) 271 { 272 stream->did_eof |= eof; 273 274 if (!rstream_space(stream)) { 275 rstream_stop_inner(stream); 276 stream->paused_full = true; 277 } 278 279 // we cannot use pending_reqs as a socket can have both pending reads and writes 280 if (stream->pending_read) { 281 return; 282 } 283 284 // Don't let the stream be closed before the event is processed. 285 stream->s.pending_reqs++; 286 stream->pending_read = true; 287 CREATE_EVENT(stream->s.events, read_event, stream); 288 } 289 290 static void rstream_close_cb(Stream *s, void *data) 291 { 292 RStream *stream = data; 293 assert(stream && s == &stream->s); 294 if (stream->buffer) { 295 free_block(stream->buffer); 296 } 297 } 298 299 void rstream_may_close(RStream *stream) 300 { 301 stream_may_close(&stream->s); 302 }