neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

channel.c (21094B)


      1 #include <assert.h>
      2 #include <inttypes.h>
      3 #include <stdbool.h>
      4 #include <stdio.h>
      5 #include <stdlib.h>
      6 
      7 #include "klib/kvec.h"
      8 #include "nvim/api/private/defs.h"
      9 #include "nvim/api/private/dispatch.h"
     10 #include "nvim/api/private/helpers.h"
     11 #include "nvim/api/ui.h"
     12 #include "nvim/channel.h"
     13 #include "nvim/channel_defs.h"
     14 #include "nvim/event/defs.h"
     15 #include "nvim/event/loop.h"
     16 #include "nvim/event/multiqueue.h"
     17 #include "nvim/event/proc.h"
     18 #include "nvim/event/rstream.h"
     19 #include "nvim/event/wstream.h"
     20 #include "nvim/globals.h"
     21 #include "nvim/log.h"
     22 #include "nvim/main.h"
     23 #include "nvim/map_defs.h"
     24 #include "nvim/memory.h"
     25 #include "nvim/message.h"
     26 #include "nvim/msgpack_rpc/channel.h"
     27 #include "nvim/msgpack_rpc/channel_defs.h"
     28 #include "nvim/msgpack_rpc/packer.h"
     29 #include "nvim/msgpack_rpc/packer_defs.h"
     30 #include "nvim/msgpack_rpc/unpacker.h"
     31 #include "nvim/os/input.h"
     32 #include "nvim/types_defs.h"
     33 #include "nvim/ui.h"
     34 #include "nvim/ui_client.h"
     35 
     36 #include "msgpack_rpc/channel.c.generated.h"
     37 
     38 #ifdef NVIM_LOG_DEBUG
     39 # define REQ "[request]  "
     40 # define RES "[response] "
     41 # define NOT "[notify]   "
     42 # define ERR "[error]    "
     43 
     44 # define SEND "->"
     45 # define RECV "<-"
     46 
     47 static void log_request(char *dir, uint64_t channel_id, uint32_t req_id, const char *name)
     48 {
     49  logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s id=%u: %s\n", dir, channel_id,
     50         REQ, req_id, name);
     51 }
     52 
     53 static void log_response(char *dir, uint64_t channel_id, char *kind, uint32_t req_id)
     54 {
     55  logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s id=%u\n", dir, channel_id, kind,
     56         req_id);
     57 }
     58 
     59 static void log_notify(char *dir, uint64_t channel_id, const char *name)
     60 {
     61  logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s %s\n", dir, channel_id, NOT,
     62         name);
     63 }
     64 
     65 #else
     66 # define log_request(...)
     67 # define log_response(...)
     68 # define log_notify(...)
     69 #endif
     70 
     71 void rpc_init(void)
     72 {
     73  ch_before_blocking_events = multiqueue_new_child(main_loop.events);
     74 }
     75 
     76 void rpc_start(Channel *channel)
     77 {
     78  channel_incref(channel);
     79  channel->is_rpc = true;
     80  RpcState *rpc = &channel->rpc;
     81  rpc->closed = false;
     82  rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker);
     83  unpacker_init(rpc->unpacker);
     84  rpc->next_request_id = 1;
     85  rpc->info = (Dict)ARRAY_DICT_INIT;
     86  kv_init(rpc->call_stack);
     87 
     88  if (channel->streamtype != kChannelStreamInternal) {
     89    RStream *out = channel_outstream(channel);
     90 #ifdef NVIM_LOG_DEBUG
     91    Stream *in = channel_instream(channel);
     92    DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id,
     93         (void *)in, (void *)out);
     94 #endif
     95 
     96    rstream_start(out, receive_msgpack, channel);
     97  }
     98 }
     99 
    100 static Channel *find_rpc_channel(uint64_t id)
    101 {
    102  Channel *chan = find_channel(id);
    103  if (!chan || !chan->is_rpc || chan->rpc.closed) {
    104    return NULL;
    105  }
    106  return chan;
    107 }
    108 
    109 /// Publishes an event to a channel (emits a notification to method `name`).
    110 ///
    111 /// @param id Channel id, or 0 to broadcast to all RPC channels.
    112 /// @param name Event name (application-defined)
    113 /// @param args Array of event arguments
    114 /// @return True if the event was sent successfully, false otherwise.
    115 bool rpc_send_event(uint64_t id, const char *name, Array args)
    116 {
    117  Channel *channel = NULL;
    118 
    119  if (id && (!(channel = find_rpc_channel(id)))) {
    120    return false;
    121  }
    122 
    123  log_notify(SEND, channel ? channel->id : 0, name);
    124  if (channel) {
    125    serialize_request(&channel, 1, 0, name, args);
    126  } else {
    127    broadcast_event(name, args);
    128  }
    129 
    130  return true;
    131 }
    132 
    133 /// Sends a method call to a channel
    134 ///
    135 /// @param id The channel id
    136 /// @param method_name The method name, an arbitrary string
    137 /// @param args Array with method arguments
    138 /// @param[out] error True if the return value is an error
    139 /// @return Whatever the remote method returned
    140 Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem *result_mem,
    141                     Error *err)
    142 {
    143  Channel *channel = NULL;
    144 
    145  if (!(channel = find_rpc_channel(id))) {
    146    api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
    147    return NIL;
    148  }
    149 
    150  channel_incref(channel);
    151  RpcState *rpc = &channel->rpc;
    152  uint32_t request_id = rpc->next_request_id++;
    153  // Send the msgpack-rpc request
    154  serialize_request(&channel, 1, request_id, method_name, args);
    155 
    156  log_request(SEND, channel->id, request_id, method_name);
    157 
    158  // Push the frame
    159  ChannelCallFrame frame = { request_id, false, false, NIL, NULL };
    160  kv_push(rpc->call_stack, &frame);
    161  LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned || rpc->closed);
    162  (void)kv_pop(rpc->call_stack);
    163 
    164  // !frame.returned implies rpc->closed.
    165  // If frame.returned is true, its memory needs to be freed, so don't return here.
    166  if (!frame.returned) {
    167    api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id);
    168    channel_decref(channel);
    169    return NIL;
    170  }
    171 
    172  if (frame.errored) {
    173    if (frame.result.type == kObjectTypeString) {
    174      api_set_error(err, kErrorTypeException, "%s",
    175                    frame.result.data.string.data);
    176    } else if (frame.result.type == kObjectTypeArray) {
    177      // Should be an error in the form [type, message]
    178      Array array = frame.result.data.array;
    179      if (array.size == 2 && array.items[0].type == kObjectTypeInteger
    180          && (array.items[0].data.integer == kErrorTypeException
    181              || array.items[0].data.integer == kErrorTypeValidation)
    182          && array.items[1].type == kObjectTypeString) {
    183        api_set_error(err, (ErrorType)array.items[0].data.integer, "%s",
    184                      array.items[1].data.string.data);
    185      } else {
    186        api_set_error(err, kErrorTypeException, "%s", "unknown error");
    187      }
    188    } else {
    189      api_set_error(err, kErrorTypeException, "%s", "unknown error");
    190    }
    191 
    192    // frame.result was allocated in an arena
    193    arena_mem_free(frame.result_mem);
    194    frame.result_mem = NULL;
    195  }
    196 
    197  channel_decref(channel);
    198 
    199  *result_mem = frame.result_mem;
    200 
    201  return frame.errored ? NIL : frame.result;
    202 }
    203 
    204 static size_t receive_msgpack(RStream *stream, const char *rbuf, size_t c, void *data, bool eof)
    205 {
    206  Channel *channel = data;
    207  channel_incref(channel);
    208  size_t consumed = 0;
    209 
    210  DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p",
    211       channel->id, c, (void *)stream);
    212 
    213  if (c > 0) {
    214    Unpacker *p = channel->rpc.unpacker;
    215    p->read_ptr = rbuf;
    216    p->read_size = c;
    217    parse_msgpack(channel);
    218 
    219    if (!unpacker_closed(p)) {
    220      consumed = c - p->read_size;
    221    }
    222  }
    223 
    224  if (eof) {
    225    char buf[256];
    226    snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the peer", channel->id);
    227    chan_close_on_err(channel, buf, LOGLVL_INF);
    228  }
    229 
    230  channel_decref(channel);
    231  return consumed;
    232 }
    233 
    234 static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id)
    235 {
    236  for (size_t i = 0; i < kv_size(rpc->call_stack); i++) {
    237    ChannelCallFrame *frame = kv_Z(rpc->call_stack, i);
    238    if (frame->request_id == request_id) {
    239      return frame;
    240    }
    241  }
    242  return NULL;
    243 }
    244 
    245 static void parse_msgpack(Channel *channel)
    246 {
    247  Unpacker *p = channel->rpc.unpacker;
    248  while (unpacker_advance(p)) {
    249    if (p->type == kMessageTypeRedrawEvent) {
    250      if (ui_client_attached) {
    251        if (p->has_grid_line_event) {
    252          ui_client_event_raw_line(&p->grid_line_event);
    253          p->has_grid_line_event = false;
    254        } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) {
    255          p->ui_handler.fn(p->result.data.array);
    256        }
    257      }
    258      arena_mem_free(arena_finish(&p->arena));
    259    } else if (p->type == kMessageTypeResponse) {
    260      ChannelCallFrame *frame = channel->rpc.client_type == kClientTypeMsgpackRpc
    261                                ? find_call_frame(&channel->rpc, p->request_id)
    262                                : kv_last(channel->rpc.call_stack);
    263      if (frame == NULL || p->request_id != frame->request_id) {
    264        char buf[256];
    265        snprintf(buf, sizeof(buf),
    266                 "ch %" PRIu64 " (type=%" PRIu32 ") returned a response with an unknown request "
    267                 "id %" PRIu32 ". Ensure the client is properly synchronized",
    268                 channel->id, (unsigned)channel->rpc.client_type, p->request_id);
    269        chan_close_on_err(channel, buf, LOGLVL_ERR);
    270        return;
    271      }
    272      frame->returned = true;
    273      frame->errored = (p->error.type != kObjectTypeNil);
    274 
    275      if (frame->errored) {
    276        frame->result = p->error;
    277        // TODO(bfredl): p->result should not even be decoded
    278        // api_free_object(p->result);
    279      } else {
    280        frame->result = p->result;
    281      }
    282      frame->result_mem = arena_finish(&p->arena);
    283      log_response(RECV, channel->id, frame->errored ? ERR : RES, p->request_id);
    284    } else {
    285      if (p->type == kMessageTypeNotification) {
    286        log_notify(RECV, channel->id, p->handler.name);
    287      } else {
    288        log_request(RECV, channel->id, p->request_id, p->handler.name);
    289      }
    290 
    291      Object res = p->result;
    292      if (p->result.type != kObjectTypeArray) {
    293        chan_close_on_err(channel, "msgpack-rpc request args must be an array", LOGLVL_ERR);
    294        return;
    295      }
    296      Array arg = res.data.array;
    297      handle_request(channel, p, arg);
    298    }
    299  }
    300 
    301  if (unpacker_closed(p)) {
    302    chan_close_on_err(channel, p->unpack_error.msg, LOGLVL_INF);
    303    api_clear_error(&p->unpack_error);
    304  }
    305 }
    306 
    307 /// Handles requests and notifications received on the channel.
    308 static void handle_request(Channel *channel, Unpacker *p, Array args)
    309  FUNC_ATTR_NONNULL_ALL
    310 {
    311  assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification);
    312 
    313  if (!p->handler.fn) {
    314    send_error(channel, p->handler, p->type, p->request_id, p->unpack_error.msg);
    315    api_clear_error(&p->unpack_error);
    316    arena_mem_free(arena_finish(&p->arena));
    317    return;
    318  }
    319 
    320  RequestEvent *evdata = xmalloc(sizeof(RequestEvent));
    321  evdata->type = p->type;
    322  evdata->channel = channel;
    323  evdata->handler = p->handler;
    324  evdata->args = args;
    325  evdata->used_mem = p->arena;
    326  p->arena = (Arena)ARENA_EMPTY;
    327  evdata->request_id = p->request_id;
    328  channel_incref(channel);
    329  if (p->handler.fast) {
    330    bool is_get_mode = p->handler.fn == handle_nvim_get_mode;
    331 
    332    if (is_get_mode && !input_blocking()) {
    333      // Defer the event to a special queue used by os/input.c. #6247
    334      multiqueue_put(ch_before_blocking_events, request_event, evdata);
    335    } else {
    336      // Invoke immediately.
    337      request_event((void **)&evdata);
    338    }
    339  } else {
    340    bool is_resize = p->handler.fn == handle_nvim_ui_try_resize;
    341    if (is_resize) {
    342      Event ev = event_create_oneshot(event_create(request_event, evdata), 2);
    343      multiqueue_put_event(channel->events, ev);
    344      multiqueue_put_event(resize_events, ev);
    345    } else {
    346      multiqueue_put(channel->events, request_event, evdata);
    347      DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name);
    348    }
    349  }
    350 }
    351 
    352 /// Handles a message, depending on the type:
    353 ///   - Request: invokes method and writes the response (or error).
    354 ///   - Notification: invokes method (emits `nvim_error_event` on error).
    355 static void request_event(void **argv)
    356 {
    357  RequestEvent *e = argv[0];
    358  Channel *channel = e->channel;
    359  MsgpackRpcRequestHandler handler = e->handler;
    360  Error error = ERROR_INIT;
    361  if (channel->rpc.closed) {
    362    // channel was closed, abort any pending requests
    363    goto free_ret;
    364  }
    365 
    366  Object result = handler.fn(channel->id, e->args, &e->used_mem, &error);
    367  if (e->type == kMessageTypeRequest || ERROR_SET(&error)) {
    368    // Send the response.
    369    serialize_response(channel, e->handler, e->type, e->request_id, &error, &result);
    370  }
    371  if (handler.ret_alloc) {
    372    api_free_object(result);
    373  }
    374 
    375 free_ret:
    376  // e->args (and possibly result) are allocated in an arena
    377  arena_mem_free(arena_finish(&e->used_mem));
    378  channel_decref(channel);
    379  xfree(e);
    380  api_clear_error(&error);
    381 }
    382 
    383 bool rpc_write_raw(uint64_t id, WBuffer *buffer)
    384 {
    385  Channel *channel = find_rpc_channel(id);
    386  if (!channel) {
    387    wstream_release_wbuffer(buffer);
    388    return false;
    389  }
    390 
    391  return channel_write(channel, buffer);
    392 }
    393 
    394 static bool channel_write(Channel *channel, WBuffer *buffer)
    395 {
    396  int err = 0;
    397 
    398  if (channel->rpc.closed) {
    399    wstream_release_wbuffer(buffer);
    400    return false;
    401  }
    402 
    403  if (channel->streamtype == kChannelStreamInternal) {
    404    channel_incref(channel);
    405    CREATE_EVENT(channel->events, internal_read_event, channel, buffer);
    406  } else {
    407    Stream *in = channel_instream(channel);
    408    err = wstream_write(in, buffer);
    409  }
    410 
    411  if (err != 0) {
    412    // If the write failed for any reason, close the channel
    413    char buf[256];
    414    snprintf(buf,
    415             sizeof(buf),
    416             "ch %" PRIu64 ": stream write failed: %s. "
    417             "RPC canceled; closing channel",
    418             channel->id, os_strerror(err));
    419    // UV_EPIPE can happen if pipe is closed by peer and shouldn't be an error.
    420    chan_close_on_err(channel, buf, err == UV_EPIPE ? LOGLVL_INF : LOGLVL_ERR);
    421  }
    422 
    423  return err == 0;
    424 }
    425 
    426 static void internal_read_event(void **argv)
    427 {
    428  Channel *channel = argv[0];
    429  WBuffer *buffer = argv[1];
    430  Unpacker *p = channel->rpc.unpacker;
    431 
    432  p->read_ptr = buffer->data;
    433  p->read_size = buffer->size;
    434  parse_msgpack(channel);
    435 
    436  if (p->read_size) {
    437    // This should not happen, as WBuffer is one single serialized message.
    438    if (!channel->rpc.closed) {
    439      chan_close_on_err(channel, "internal channel: internal error", LOGLVL_ERR);
    440    }
    441  }
    442 
    443  channel_decref(channel);
    444  wstream_release_wbuffer(buffer);
    445 }
    446 
    447 static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageType type,
    448                       uint32_t id, char *err)
    449 {
    450  Error e = ERROR_INIT;
    451  api_set_error(&e, kErrorTypeException, "%s", err);
    452  serialize_response(chan, handler, type, id, &e, &NIL);
    453  api_clear_error(&e);
    454 }
    455 
    456 /// Broadcasts a notification to all RPC channels.
    457 static void broadcast_event(const char *name, Array args)
    458 {
    459  kvec_withinit_t(Channel *, 4) chans = KV_INITIAL_VALUE;
    460  kvi_init(chans);
    461  Channel *channel;
    462 
    463  map_foreach_value(&channels, channel, {
    464    if (channel->is_rpc) {
    465      kv_push(chans, channel);
    466    }
    467  });
    468 
    469  if (kv_size(chans)) {
    470    serialize_request(chans.items, kv_size(chans), 0, name, args);
    471  }
    472 
    473  kvi_destroy(chans);
    474 }
    475 
    476 /// Mark rpc state as closed, and release its reference to the channel.
    477 /// Don't call this directly, call channel_close(id, kChannelPartRpc, &error)
    478 void rpc_close(Channel *channel)
    479 {
    480  if (channel->rpc.closed) {
    481    return;
    482  }
    483 
    484  channel->rpc.closed = true;
    485 
    486  // Scheduled to avoid running UILeave autocommands in a libuv handler.
    487  multiqueue_put(main_loop.fast_events, rpc_close_event, channel);
    488 }
    489 
    490 static void rpc_close_event(void **argv)
    491 {
    492  Channel *channel = (Channel *)argv[0];
    493  assert(channel);
    494 
    495  channel_decref(channel);
    496 
    497  bool is_ui_client = ui_client_channel_id && channel->id == ui_client_channel_id;
    498  if (is_ui_client || channel->streamtype == kChannelStreamStdio) {
    499    if (!is_ui_client) {
    500      // Avoid hanging when there are no other UIs and a prompt is triggered on exit.
    501      remote_ui_disconnect(channel->id, NULL, false);
    502    } else {
    503      ui_client_may_restart_server();
    504      if (ui_client_channel_id != channel->id) {
    505        // A new server has been started. Don't exit.
    506        return;
    507      }
    508    }
    509    if (!channel->detach) {
    510      if (channel->streamtype == kChannelStreamProc && ui_client_error_exit < 0) {
    511        // Wait for the embedded server to exit instead of exiting immediately,
    512        // as it's necessary to get the server's exit code in on_proc_exit().
    513      } else {
    514        exit_on_closed_chan(0);
    515      }
    516    }
    517  }
    518 }
    519 
    520 void rpc_free(Channel *channel)
    521 {
    522  remote_ui_disconnect(channel->id, NULL, false);
    523  unpacker_teardown(channel->rpc.unpacker);
    524  xfree(channel->rpc.unpacker);
    525 
    526  kv_destroy(channel->rpc.call_stack);
    527  api_free_dict(channel->rpc.info);
    528 }
    529 
    530 /// Closes a channel after receiving fatal error, and logs a message.
    531 static void chan_close_on_err(Channel *channel, char *msg, int loglevel)
    532 {
    533  for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) {
    534    ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i);
    535    if (frame->returned) {
    536      continue;  // Don't overwrite an already received result. #24214
    537    }
    538    frame->returned = true;
    539    frame->errored = true;
    540    frame->result = CSTR_TO_ARENA_OBJ(&channel->rpc.unpacker->arena, msg);
    541    frame->result_mem = arena_finish(&channel->rpc.unpacker->arena);
    542  }
    543 
    544  channel_close(channel->id, kChannelPartRpc, NULL);
    545 
    546  LOG(loglevel, "RPC: %s", msg);
    547 }
    548 
    549 static void serialize_request(Channel **chans, size_t nchans, uint32_t request_id,
    550                              const char *method, Array args)
    551 {
    552  PackerBuffer packer;
    553  packer_buffer_init_channels(chans, nchans, &packer);
    554 
    555  mpack_array(&packer.ptr, request_id ? 4 : 3);
    556  mpack_w(&packer.ptr, request_id ? 0 : 2);
    557 
    558  if (request_id) {
    559    mpack_uint(&packer.ptr, request_id);
    560  }
    561 
    562  mpack_str(cstr_as_string(method), &packer);
    563  mpack_object_array(args, &packer);
    564 
    565  packer_buffer_finish_channels(&packer);
    566 }
    567 
    568 void serialize_response(Channel *channel, MsgpackRpcRequestHandler handler, MessageType type,
    569                        uint32_t response_id, Error *err, Object *arg)
    570 {
    571  if (ERROR_SET(err) && type == kMessageTypeNotification) {
    572    if (handler.fn == handle_nvim_paste) {
    573      // TODO(bfredl): this is pretty much ad-hoc. maybe TUI and UI:s should be
    574      // allowed to ask nvim to just scream directly in the users face
    575      // instead of sending nvim_error_event, in general.
    576      semsg("paste: %s", err->msg);
    577      api_clear_error(err);
    578    } else {
    579      MAXSIZE_TEMP_ARRAY(args, 2);
    580      ADD_C(args, INTEGER_OBJ(err->type));
    581      ADD_C(args, CSTR_AS_OBJ(err->msg));
    582      serialize_request(&channel, 1, 0, "nvim_error_event", args);
    583    }
    584    return;
    585  }
    586 
    587  PackerBuffer packer;
    588  packer_buffer_init_channels(&channel, 1, &packer);
    589 
    590  mpack_array(&packer.ptr, 4);
    591  mpack_w(&packer.ptr, 1);
    592  mpack_uint(&packer.ptr, response_id);
    593 
    594  if (ERROR_SET(err)) {
    595    // error represented by a [type, message] array
    596    mpack_array(&packer.ptr, 2);
    597    mpack_integer(&packer.ptr, err->type);
    598    mpack_str(cstr_as_string(err->msg), &packer);
    599    // Nil result
    600    mpack_nil(&packer.ptr);
    601  } else {
    602    // Nil error
    603    mpack_nil(&packer.ptr);
    604    // Return value
    605    mpack_object(arg, &packer);
    606  }
    607 
    608  packer_buffer_finish_channels(&packer);
    609 
    610  log_response(SEND, channel->id, ERROR_SET(err) ? ERR : RES, response_id);
    611 }
    612 
    613 static void packer_buffer_init_channels(Channel **chans, size_t nchans, PackerBuffer *packer)
    614 {
    615  for (size_t i = 0; i < nchans; i++) {
    616    Channel *chan = chans[i];
    617    if (chan->rpc.ui && chan->rpc.ui->incomplete_event) {
    618      remote_ui_flush_pending_data(chan->rpc.ui);
    619    }
    620  }
    621  packer->startptr = alloc_block();
    622  packer->ptr = packer->startptr;
    623  packer->endptr = packer->startptr + ARENA_BLOCK_SIZE;
    624  packer->packer_flush = channel_flush_callback;
    625  packer->anydata = chans;
    626  packer->anyint = (int64_t)nchans;
    627 }
    628 
    629 static void packer_buffer_finish_channels(PackerBuffer *packer)
    630 {
    631  size_t len = (size_t)(packer->ptr - packer->startptr);
    632  if (len > 0) {
    633    WBuffer *buf = wstream_new_buffer(packer->startptr, len, (size_t)packer->anyint, free_block);
    634    Channel **chans = packer->anydata;
    635    for (int64_t i = 0; i < packer->anyint; i++) {
    636      channel_write(chans[i], buf);
    637    }
    638  } else {
    639    free_block(packer->startptr);
    640  }
    641 }
    642 
    643 static void channel_flush_callback(PackerBuffer *packer)
    644 {
    645  packer_buffer_finish_channels(packer);
    646  packer_buffer_init_channels(packer->anydata, (size_t)packer->anyint, packer);
    647 }
    648 
    649 void rpc_set_client_info(uint64_t id, Dict info)
    650 {
    651  Channel *chan = find_rpc_channel(id);
    652  if (!chan) {
    653    abort();
    654  }
    655 
    656  api_free_dict(chan->rpc.info);
    657  chan->rpc.info = info;
    658 
    659  // Parse "type" on "info" and set "client_type"
    660  const char *type = get_client_info(chan, "type");
    661  if (type == NULL || strequal(type, "remote")) {
    662    chan->rpc.client_type = kClientTypeRemote;
    663  } else if (strequal(type, "msgpack-rpc")) {
    664    chan->rpc.client_type = kClientTypeMsgpackRpc;
    665  } else if (strequal(type, "ui")) {
    666    chan->rpc.client_type = kClientTypeUi;
    667  } else if (strequal(type, "embedder")) {
    668    chan->rpc.client_type = kClientTypeEmbedder;
    669  } else if (strequal(type, "host")) {
    670    chan->rpc.client_type = kClientTypeHost;
    671  } else if (strequal(type, "plugin")) {
    672    chan->rpc.client_type = kClientTypePlugin;
    673  } else {
    674    chan->rpc.client_type = kClientTypeUnknown;
    675  }
    676 
    677  channel_info_changed(chan, false);
    678 }
    679 
    680 Dict rpc_client_info(Channel *chan)
    681 {
    682  return copy_dict(chan->rpc.info, NULL);
    683 }
    684 
    685 const char *get_client_info(Channel *chan, const char *key)
    686  FUNC_ATTR_NONNULL_ALL
    687 {
    688  if (!chan->is_rpc) {
    689    return NULL;
    690  }
    691  Dict info = chan->rpc.info;
    692  for (size_t i = 0; i < info.size; i++) {
    693    if (strequal(key, info.items[i].key.data)
    694        && info.items[i].value.type == kObjectTypeString) {
    695      return info.items[i].value.data.string.data;
    696    }
    697  }
    698 
    699  return NULL;
    700 }
    701 
    702 #ifdef EXITFREE
    703 void rpc_free_all_mem(void)
    704 {
    705  multiqueue_free(ch_before_blocking_events);
    706 }
    707 #endif