stream.c (4655B)
1 #include <assert.h> 2 #include <stdbool.h> 3 #include <stddef.h> 4 #include <uv.h> 5 #include <uv/version.h> 6 7 #include "nvim/event/defs.h" 8 #include "nvim/event/loop.h" 9 #include "nvim/event/stream.h" 10 #include "nvim/log.h" 11 #include "nvim/memory.h" 12 #include "nvim/types_defs.h" 13 #ifdef MSWIN 14 # include "nvim/os/os_win_console.h" 15 #endif 16 17 #include "event/stream.c.generated.h" 18 19 // For compatibility with libuv < 1.19.0 (tested on 1.18.0) 20 #if UV_VERSION_MINOR < 19 21 # define uv_stream_get_write_queue_size(stream) stream->write_queue_size 22 #endif 23 24 /// Sets the stream associated with `fd` to "blocking" mode. 25 /// 26 /// @return `0` on success, or libuv error code on failure. 27 int stream_set_blocking(int fd, bool blocking) 28 { 29 // Private loop to avoid conflict with existing watcher(s): 30 // uv__io_stop: Assertion `loop->watchers[w->fd] == w' failed. 31 uv_loop_t loop; 32 uv_pipe_t stream; 33 uv_loop_init(&loop); 34 uv_pipe_init(&loop, &stream, 0); 35 uv_pipe_open(&stream, fd); 36 int retval = uv_stream_set_blocking((uv_stream_t *)&stream, blocking); 37 uv_close((uv_handle_t *)&stream, NULL); 38 uv_run(&loop, UV_RUN_NOWAIT); // not necessary, but couldn't hurt. 39 uv_loop_close(&loop); 40 return retval; 41 } 42 43 void stream_init(Loop *loop, Stream *stream, int fd, bool poll, uv_stream_t *uvstream) 44 FUNC_ATTR_NONNULL_ARG(2) 45 { 46 // The underlying stream is either a file or an existing uv stream. 47 assert(uvstream == NULL ? fd >= 0 && loop != NULL : fd < 0 && loop == NULL && !poll); 48 #ifdef MSWIN 49 assert(!poll); 50 #endif 51 stream->uvstream = uvstream; 52 53 if (fd >= 0) { 54 uv_handle_type type = uv_guess_handle(fd); 55 stream->fd = fd; 56 57 if (type == UV_FILE) { 58 assert(!poll); 59 // Non-blocking file reads are simulated with an idle handle that reads in 60 // chunks of the ring buffer size, giving time for other events to be 61 // processed between reads. 62 uv_idle_init(&loop->uv, &stream->uv.idle); 63 stream->uv.idle.data = stream; 64 #ifdef MSWIN 65 } else if (type == UV_TTY) { 66 uv_tty_init(&loop->uv, &stream->uv.tty, fd, 0); 67 uv_tty_set_mode(&stream->uv.tty, UV_TTY_MODE_RAW); 68 DWORD dwMode; 69 if (GetConsoleMode(stream->uv.tty.handle, &dwMode)) { 70 dwMode |= ENABLE_VIRTUAL_TERMINAL_INPUT; 71 SetConsoleMode(stream->uv.tty.handle, dwMode); 72 } 73 stream->uvstream = (uv_stream_t *)&stream->uv.tty; 74 #else 75 } else if (poll) { 76 uv_poll_init(&loop->uv, &stream->uv.poll, fd); 77 stream->uv.poll.data = stream; 78 stream->use_poll = true; 79 #endif 80 } else { 81 assert(type == UV_NAMED_PIPE || type == UV_TTY); 82 uv_pipe_init(&loop->uv, &stream->uv.pipe, 0); 83 uv_pipe_open(&stream->uv.pipe, fd); 84 stream->uvstream = (uv_stream_t *)&stream->uv.pipe; 85 } 86 } 87 88 if (stream->uvstream) { 89 stream->uvstream->data = stream; 90 } 91 92 stream->fpos = 0; 93 stream->internal_data = NULL; 94 stream->curmem = 0; 95 stream->maxmem = 0; 96 stream->pending_reqs = 0; 97 stream->write_cb = NULL; 98 stream->close_cb = NULL; 99 stream->internal_close_cb = NULL; 100 stream->closed = false; 101 stream->events = NULL; 102 } 103 104 void stream_may_close(Stream *stream) 105 FUNC_ATTR_NONNULL_ARG(1) 106 { 107 if (stream->closed) { 108 return; 109 } 110 DLOG("closing Stream: %p", (void *)stream); 111 stream->closed = true; 112 113 #ifdef MSWIN 114 if (UV_TTY == uv_guess_handle(stream->fd)) { 115 // Undo UV_TTY_MODE_RAW from stream_init(). #10801 116 uv_tty_set_mode(&stream->uv.tty, UV_TTY_MODE_NORMAL); 117 } 118 #endif 119 120 if (!stream->pending_reqs) { 121 stream_close_handle(stream); 122 } // Else: rstream.c:read_event() or wstream.c:write_cb() will call stream_close_handle(). 123 } 124 125 void stream_close_handle(Stream *stream) 126 FUNC_ATTR_NONNULL_ALL 127 { 128 uv_handle_t *handle = NULL; 129 if (stream->uvstream) { 130 if (uv_stream_get_write_queue_size(stream->uvstream) > 0) { 131 WLOG("closed Stream (%p) with %zu unwritten bytes", 132 (void *)stream, 133 uv_stream_get_write_queue_size(stream->uvstream)); 134 } 135 handle = (uv_handle_t *)stream->uvstream; 136 } else { 137 // All members of the stream->uv union share the same address. 138 handle = (uv_handle_t *)&stream->uv; 139 } 140 141 assert(handle != NULL); 142 143 if (!uv_is_closing(handle)) { 144 uv_close(handle, close_cb); 145 } 146 } 147 148 static void close_cb(uv_handle_t *handle) 149 { 150 Stream *stream = handle->data; 151 // Need to check if handle->data is NULL here as this callback may be called between 152 // the handle's initialization and stream_init() (e.g. in socket_connect()). 153 if (stream && stream->close_cb) { 154 stream->close_cb(stream, stream->close_cb_data); 155 } 156 if (stream && stream->internal_close_cb) { 157 stream->internal_close_cb(stream, stream->internal_data); 158 } 159 }