commit 4451adbf84c311905f91fad8b88e9636dae860d2
parent 896968cad1ea853521ea71a9fef854337a21843c
Author: zeertzjq <zeertzjq@outlook.com>
Date: Thu, 15 Jan 2026 09:57:12 +0800
Merge pull request #37325 from zeertzjq/term-poll
fix(process): don't limit PTY master remaining data size
Diffstat:
9 files changed, 148 insertions(+), 34 deletions(-)
diff --git a/src/nvim/event/defs.h b/src/nvim/event/defs.h
@@ -78,12 +78,15 @@ typedef void (*stream_close_cb)(Stream *stream, void *data);
struct stream {
bool closed;
+ bool use_poll;
union {
uv_pipe_t pipe;
uv_tcp_t tcp;
uv_idle_t idle;
#ifdef MSWIN
uv_tty_t tty;
+#else
+ uv_poll_t poll;
#endif
} uv;
uv_stream_t *uvstream; ///< NULL when the stream is a file
diff --git a/src/nvim/event/proc.c b/src/nvim/event/proc.c
@@ -46,6 +46,16 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err)
// forwarding stderr contradicts with processing it internally
assert(!(err && proc->fwd_err));
+#ifdef MSWIN
+ const bool out_use_poll = false;
+#else
+ // Using uv_pipe_t to read from PTY master may drop data if the PTY process exits
+ // immediately after output, as libuv treats a partial read after POLLHUP as EOF,
+ // which isn't true for PTY master on Linux. Therefore use uv_poll_t instead. #3030
+ // Ref: https://github.com/libuv/libuv/issues/4992
+ const bool out_use_poll = proc->type == kProcTypePty;
+#endif
+
if (in) {
uv_pipe_init(&proc->loop->uv, &proc->in.uv.pipe, 0);
} else {
@@ -53,7 +63,9 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err)
}
if (out) {
- uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0);
+ if (!out_use_poll) {
+ uv_pipe_init(&proc->loop->uv, &proc->out.s.uv.pipe, 0);
+ }
} else {
proc->out.s.closed = true;
}
@@ -83,7 +95,7 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err)
if (in) {
uv_close((uv_handle_t *)&proc->in.uv.pipe, NULL);
}
- if (out) {
+ if (out && !out_use_poll) {
uv_close((uv_handle_t *)&proc->out.s.uv.pipe, NULL);
}
if (err) {
@@ -101,21 +113,29 @@ int proc_spawn(Proc *proc, bool in, bool out, bool err)
}
if (in) {
- stream_init(NULL, &proc->in, -1, (uv_stream_t *)&proc->in.uv.pipe);
+ stream_init(NULL, &proc->in, -1, false, (uv_stream_t *)&proc->in.uv.pipe);
proc->in.internal_data = proc;
proc->in.internal_close_cb = on_proc_stream_close;
proc->refcount++;
}
if (out) {
- stream_init(NULL, &proc->out.s, -1, (uv_stream_t *)&proc->out.s.uv.pipe);
+ if (out_use_poll) {
+#ifdef MSWIN
+ abort();
+#else
+ stream_init(proc->loop, &proc->out.s, ((PtyProc *)proc)->tty_fd, true, NULL);
+#endif
+ } else {
+ stream_init(NULL, &proc->out.s, -1, false, (uv_stream_t *)&proc->out.s.uv.pipe);
+ }
proc->out.s.internal_data = proc;
proc->out.s.internal_close_cb = on_proc_stream_close;
proc->refcount++;
}
if (err) {
- stream_init(NULL, &proc->err.s, -1, (uv_stream_t *)&proc->err.s.uv.pipe);
+ stream_init(NULL, &proc->err.s, -1, false, (uv_stream_t *)&proc->err.s.uv.pipe);
proc->err.s.internal_data = proc;
proc->err.s.internal_close_cb = on_proc_stream_close;
proc->refcount++;
@@ -348,20 +368,27 @@ static void flush_stream(Proc *proc, RStream *stream)
return;
}
- // Maximal remaining data size of terminated process is system
- // buffer size.
- // Also helps with a child process that keeps the output streams open. If it
- // keeps sending data, we only accept as much data as the system buffer size.
- // Otherwise this would block cleanup/teardown.
- int system_buffer_size = 0;
- int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv.pipe,
- &system_buffer_size);
- if (err) {
- system_buffer_size = ARENA_BLOCK_SIZE;
+ size_t max_bytes = SIZE_MAX;
+#ifdef MSWIN
+ if (true) {
+#else
+ // Don't limit remaining data size of PTY master unless when tearing down, as it may
+ // have more remaining data than system buffer size (at least on Linux). #3030
+ if (proc->type != kProcTypePty || proc_is_tearing_down) {
+#endif
+ // Maximal remaining data size of terminated process is system buffer size.
+ // Also helps with a child process that keeps the output streams open. If it
+ // keeps sending data, we only accept as much data as the system buffer size.
+ // Otherwise this would block cleanup/teardown.
+ int system_buffer_size = 0;
+ // All members of the stream->s.uv union share the same address.
+ int err = uv_recv_buffer_size((uv_handle_t *)&stream->s.uv, &system_buffer_size);
+ if (err != 0) {
+ system_buffer_size = ARENA_BLOCK_SIZE;
+ }
+ max_bytes = stream->num_bytes + (size_t)system_buffer_size;
}
- size_t max_bytes = stream->num_bytes + (size_t)system_buffer_size;
-
// Read remaining data.
while (!stream->s.closed && stream->num_bytes < max_bytes) {
// Remember number of bytes before polling
diff --git a/src/nvim/event/rstream.c b/src/nvim/event/rstream.c
@@ -13,19 +13,25 @@
#include "nvim/os/os_defs.h"
#include "nvim/types_defs.h"
+#ifndef MSWIN
+# include <errno.h>
+
+# include "nvim/fileio.h"
+#endif
+
#include "event/rstream.c.generated.h"
void rstream_init_fd(Loop *loop, RStream *stream, int fd)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
- stream_init(loop, &stream->s, fd, NULL);
+ stream_init(loop, &stream->s, fd, false, NULL);
rstream_init(stream);
}
void rstream_init_stream(RStream *stream, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(1, 2)
{
- stream_init(NULL, &stream->s, -1, uvstream);
+ stream_init(NULL, &stream->s, -1, false, uvstream);
rstream_init(stream);
}
@@ -45,6 +51,10 @@ void rstream_start_inner(RStream *stream)
{
if (stream->s.uvstream) {
uv_read_start(stream->s.uvstream, alloc_cb, read_cb);
+#ifndef MSWIN
+ } else if (stream->s.use_poll) {
+ uv_poll_start(&stream->s.uv.poll, UV_READABLE, poll_cb);
+#endif
} else {
uv_idle_start(&stream->s.uv.idle, fread_idle_cb);
}
@@ -72,6 +82,10 @@ void rstream_stop_inner(RStream *stream)
{
if (stream->s.uvstream) {
uv_read_stop(stream->s.uvstream);
+#ifndef MSWIN
+ } else if (stream->s.use_poll) {
+ uv_poll_stop(&stream->s.uv.poll);
+#endif
} else {
uv_idle_stop(&stream->s.uv.idle);
}
@@ -171,6 +185,46 @@ static void fread_idle_cb(uv_idle_t *handle)
invoke_read_cb(stream, false);
}
+#ifndef MSWIN
+static void poll_cb(uv_poll_t *handle, int status, int events)
+{
+ RStream *stream = handle->data;
+
+ if (status < 0) {
+ ELOG("poll error on Stream (%p) with descriptor %d: %s (%s)", (void *)stream,
+ stream->s.fd, uv_err_name(status), os_strerror(status));
+ return;
+ }
+ if (!(events & UV_READABLE)) {
+ return;
+ }
+
+ ssize_t cnt = read_eintr(stream->s.fd, stream->write_pos, rstream_space(stream));
+
+ if (cnt < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return;
+ }
+ DLOG("closing Stream (%p) with descriptor %d: %s", (void *)stream,
+ stream->s.fd, strerror(errno));
+ }
+
+ if (cnt <= 0) {
+ // Read error or EOF, either way stop the stream and invoke the callback
+ // with eof == true
+ uv_poll_stop(handle);
+ invoke_read_cb(stream, true);
+ return;
+ }
+
+ // at this point we're sure that cnt is positive, no error occurred
+ size_t nread = (size_t)cnt;
+ stream->num_bytes += nread;
+ stream->write_pos += cnt;
+ invoke_read_cb(stream, false);
+}
+#endif
+
static void read_event(void **argv)
{
RStream *stream = argv[0];
diff --git a/src/nvim/event/socket.c b/src/nvim/event/socket.c
@@ -154,7 +154,7 @@ int socket_watcher_accept(SocketWatcher *watcher, RStream *stream)
return result;
}
- stream_init(NULL, &stream->s, -1, client);
+ stream_init(NULL, &stream->s, -1, false, client);
return 0;
}
@@ -244,7 +244,7 @@ tcp_retry:
status = 1;
LOOP_PROCESS_EVENTS_UNTIL(&main_loop, NULL, timeout, status != 1);
if (status == 0) {
- stream_init(NULL, &stream->s, -1, uv_stream);
+ stream_init(NULL, &stream->s, -1, false, uv_stream);
success = true;
} else {
if (!uv_is_closing((uv_handle_t *)uv_stream)) {
diff --git a/src/nvim/event/stream.c b/src/nvim/event/stream.c
@@ -40,11 +40,14 @@ int stream_set_blocking(int fd, bool blocking)
return retval;
}
-void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
+void stream_init(Loop *loop, Stream *stream, int fd, bool poll, uv_stream_t *uvstream)
FUNC_ATTR_NONNULL_ARG(2)
{
// The underlying stream is either a file or an existing uv stream.
- assert(uvstream == NULL ? fd >= 0 && loop != NULL : fd < 0 && loop == NULL);
+ assert(uvstream == NULL ? fd >= 0 && loop != NULL : fd < 0 && loop == NULL && !poll);
+#ifdef MSWIN
+ assert(!poll);
+#endif
stream->uvstream = uvstream;
if (fd >= 0) {
@@ -52,6 +55,7 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
stream->fd = fd;
if (type == UV_FILE) {
+ assert(!poll);
// Non-blocking file reads are simulated with an idle handle that reads in
// chunks of the ring buffer size, giving time for other events to be
// processed between reads.
@@ -67,6 +71,11 @@ void stream_init(Loop *loop, Stream *stream, int fd, uv_stream_t *uvstream)
SetConsoleMode(stream->uv.tty.handle, dwMode);
}
stream->uvstream = (uv_stream_t *)&stream->uv.tty;
+#else
+ } else if (poll) {
+ uv_poll_init(&loop->uv, &stream->uv.poll, fd);
+ stream->uv.poll.data = stream;
+ stream->use_poll = true;
#endif
} else {
assert(type == UV_NAMED_PIPE || type == UV_TTY);
@@ -125,7 +134,8 @@ void stream_close_handle(Stream *stream)
}
handle = (uv_handle_t *)stream->uvstream;
} else {
- handle = (uv_handle_t *)&stream->uv.idle;
+ // All members of the stream->uv union share the same address.
+ handle = (uv_handle_t *)&stream->uv;
}
assert(handle != NULL);
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
@@ -23,14 +23,14 @@ typedef struct {
void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(loop, stream, fd, NULL);
+ stream_init(loop, stream, fd, false, NULL);
wstream_init(stream, maxmem);
}
void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem)
FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
{
- stream_init(NULL, stream, -1, uvstream);
+ stream_init(NULL, stream, -1, false, uvstream);
wstream_init(stream, maxmem);
}
@@ -68,6 +68,7 @@ bool wstream_write(Stream *stream, WBuffer *buffer)
FUNC_ATTR_NONNULL_ALL
{
assert(stream->maxmem);
+ assert(!stream->use_poll);
// This should not be called after a stream was freed
assert(!stream->closed);
diff --git a/src/nvim/os/pty_proc_unix.c b/src/nvim/os/pty_proc_unix.c
@@ -214,10 +214,7 @@ int pty_proc_spawn(PtyProc *ptyproc)
&& (status = set_duplicating_descriptor(master, &proc->in.uv.pipe))) {
goto error;
}
- if (!proc->out.s.closed
- && (status = set_duplicating_descriptor(master, &proc->out.s.uv.pipe))) {
- goto error;
- }
+ // The stream_init() call in proc_spawn() will initialize proc->out.s.uv.poll.
ptyproc->tty_fd = master;
proc->pid = pid;
diff --git a/test/functional/fixtures/shell-test.c b/test/functional/fixtures/shell-test.c
@@ -32,12 +32,14 @@ static void help(void)
puts(" shell-test -t {prompt text} EXE \"prog args...\"");
puts(" Prints \"{prompt text} $ progs args...\" to stderr.");
puts(" shell-test REP N {text}");
- puts(" Prints \"{lnr}: {text}\\n\" to stdout N times, taking N milliseconds.");
+ puts(" Prints \"{lnr}: {text}\\n\" to stdout N times, pausing every 100 lines.");
puts(" Example:");
puts(" shell-test REP 97 \"foo bar\"");
puts(" 0: foo bar");
puts(" ...");
puts(" 96: foo bar");
+ puts(" shell-test REPFAST N {text}");
+ puts(" Like REP, but print as fast as possible and then exit immediately.");
puts(" shell-test INTERACT");
puts(" Prints \"interact $ \" to stderr, and waits for \"exit\" input.");
puts(" shell-test EXIT {code}");
@@ -71,9 +73,10 @@ int main(int argc, char **argv)
} else {
fprintf(stderr, "ready $ ");
}
- } else if (strcmp(argv[1], "REP") == 0) {
+ } else if (strcmp(argv[1], "REP") == 0 || strcmp(argv[1], "REPFAST") == 0) {
+ bool fast = strcmp(argv[1], "REPFAST") == 0;
if (argc != 4) {
- fprintf(stderr, "REP expects exactly 3 arguments\n");
+ fprintf(stderr, "REP/REPFAST expects exactly 3 arguments\n");
return 4;
}
int count = 0;
@@ -83,7 +86,7 @@ int main(int argc, char **argv)
}
for (int i = 0; i < count; i++) {
printf("%d: %s\n", i, argv[3]);
- if (i % 100 == 0) {
+ if (!fast && i % 100 == 0) {
usleep(1000); // Wait 1 ms (simulate typical output).
}
fflush(NULL);
diff --git a/test/functional/terminal/buffer_spec.lua b/test/functional/terminal/buffer_spec.lua
@@ -758,6 +758,25 @@ describe(':terminal buffer', function()
]])
end)
+ it('does not drop data when job exits immediately after output #3030', function()
+ local screen = Screen.new(50, 7)
+ api.nvim_create_autocmd('TermClose', { command = 'let g:did_termclose = 1' })
+ fn.jobstart({ testprg('shell-test'), 'REPFAST', '20000', 'TEST' }, { term = true })
+ retry(nil, nil, function()
+ eq(1, api.nvim_get_var('did_termclose'))
+ end)
+ feed('i')
+ screen:expect([[
+ 19996: TEST |
+ 19997: TEST |
+ 19998: TEST |
+ 19999: TEST |
+ |
+ [Process exited 0]^ |
+ {5:-- TERMINAL --} |
+ ]])
+ end)
+
it('handles unprintable chars', function()
local screen = Screen.new(50, 7)
feed 'i'