channel.c (21094B)
1 #include <assert.h> 2 #include <inttypes.h> 3 #include <stdbool.h> 4 #include <stdio.h> 5 #include <stdlib.h> 6 7 #include "klib/kvec.h" 8 #include "nvim/api/private/defs.h" 9 #include "nvim/api/private/dispatch.h" 10 #include "nvim/api/private/helpers.h" 11 #include "nvim/api/ui.h" 12 #include "nvim/channel.h" 13 #include "nvim/channel_defs.h" 14 #include "nvim/event/defs.h" 15 #include "nvim/event/loop.h" 16 #include "nvim/event/multiqueue.h" 17 #include "nvim/event/proc.h" 18 #include "nvim/event/rstream.h" 19 #include "nvim/event/wstream.h" 20 #include "nvim/globals.h" 21 #include "nvim/log.h" 22 #include "nvim/main.h" 23 #include "nvim/map_defs.h" 24 #include "nvim/memory.h" 25 #include "nvim/message.h" 26 #include "nvim/msgpack_rpc/channel.h" 27 #include "nvim/msgpack_rpc/channel_defs.h" 28 #include "nvim/msgpack_rpc/packer.h" 29 #include "nvim/msgpack_rpc/packer_defs.h" 30 #include "nvim/msgpack_rpc/unpacker.h" 31 #include "nvim/os/input.h" 32 #include "nvim/types_defs.h" 33 #include "nvim/ui.h" 34 #include "nvim/ui_client.h" 35 36 #include "msgpack_rpc/channel.c.generated.h" 37 38 #ifdef NVIM_LOG_DEBUG 39 # define REQ "[request] " 40 # define RES "[response] " 41 # define NOT "[notify] " 42 # define ERR "[error] " 43 44 # define SEND "->" 45 # define RECV "<-" 46 47 static void log_request(char *dir, uint64_t channel_id, uint32_t req_id, const char *name) 48 { 49 logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s id=%u: %s\n", dir, channel_id, 50 REQ, req_id, name); 51 } 52 53 static void log_response(char *dir, uint64_t channel_id, char *kind, uint32_t req_id) 54 { 55 logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s id=%u\n", dir, channel_id, kind, 56 req_id); 57 } 58 59 static void log_notify(char *dir, uint64_t channel_id, const char *name) 60 { 61 logmsg(LOGLVL_DBG, "RPC: ", NULL, -1, false, "%s %" PRIu64 ": %s %s\n", dir, channel_id, NOT, 62 name); 63 } 64 65 #else 66 # define log_request(...) 67 # define log_response(...) 68 # define log_notify(...) 69 #endif 70 71 void rpc_init(void) 72 { 73 ch_before_blocking_events = multiqueue_new_child(main_loop.events); 74 } 75 76 void rpc_start(Channel *channel) 77 { 78 channel_incref(channel); 79 channel->is_rpc = true; 80 RpcState *rpc = &channel->rpc; 81 rpc->closed = false; 82 rpc->unpacker = xcalloc(1, sizeof *rpc->unpacker); 83 unpacker_init(rpc->unpacker); 84 rpc->next_request_id = 1; 85 rpc->info = (Dict)ARRAY_DICT_INIT; 86 kv_init(rpc->call_stack); 87 88 if (channel->streamtype != kChannelStreamInternal) { 89 RStream *out = channel_outstream(channel); 90 #ifdef NVIM_LOG_DEBUG 91 Stream *in = channel_instream(channel); 92 DLOG("rpc ch %" PRIu64 " in-stream=%p out-stream=%p", channel->id, 93 (void *)in, (void *)out); 94 #endif 95 96 rstream_start(out, receive_msgpack, channel); 97 } 98 } 99 100 static Channel *find_rpc_channel(uint64_t id) 101 { 102 Channel *chan = find_channel(id); 103 if (!chan || !chan->is_rpc || chan->rpc.closed) { 104 return NULL; 105 } 106 return chan; 107 } 108 109 /// Publishes an event to a channel (emits a notification to method `name`). 110 /// 111 /// @param id Channel id, or 0 to broadcast to all RPC channels. 112 /// @param name Event name (application-defined) 113 /// @param args Array of event arguments 114 /// @return True if the event was sent successfully, false otherwise. 115 bool rpc_send_event(uint64_t id, const char *name, Array args) 116 { 117 Channel *channel = NULL; 118 119 if (id && (!(channel = find_rpc_channel(id)))) { 120 return false; 121 } 122 123 log_notify(SEND, channel ? channel->id : 0, name); 124 if (channel) { 125 serialize_request(&channel, 1, 0, name, args); 126 } else { 127 broadcast_event(name, args); 128 } 129 130 return true; 131 } 132 133 /// Sends a method call to a channel 134 /// 135 /// @param id The channel id 136 /// @param method_name The method name, an arbitrary string 137 /// @param args Array with method arguments 138 /// @param[out] error True if the return value is an error 139 /// @return Whatever the remote method returned 140 Object rpc_send_call(uint64_t id, const char *method_name, Array args, ArenaMem *result_mem, 141 Error *err) 142 { 143 Channel *channel = NULL; 144 145 if (!(channel = find_rpc_channel(id))) { 146 api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); 147 return NIL; 148 } 149 150 channel_incref(channel); 151 RpcState *rpc = &channel->rpc; 152 uint32_t request_id = rpc->next_request_id++; 153 // Send the msgpack-rpc request 154 serialize_request(&channel, 1, request_id, method_name, args); 155 156 log_request(SEND, channel->id, request_id, method_name); 157 158 // Push the frame 159 ChannelCallFrame frame = { request_id, false, false, NIL, NULL }; 160 kv_push(rpc->call_stack, &frame); 161 LOOP_PROCESS_EVENTS_UNTIL(&main_loop, channel->events, -1, frame.returned || rpc->closed); 162 (void)kv_pop(rpc->call_stack); 163 164 // !frame.returned implies rpc->closed. 165 // If frame.returned is true, its memory needs to be freed, so don't return here. 166 if (!frame.returned) { 167 api_set_error(err, kErrorTypeException, "Invalid channel: %" PRIu64, id); 168 channel_decref(channel); 169 return NIL; 170 } 171 172 if (frame.errored) { 173 if (frame.result.type == kObjectTypeString) { 174 api_set_error(err, kErrorTypeException, "%s", 175 frame.result.data.string.data); 176 } else if (frame.result.type == kObjectTypeArray) { 177 // Should be an error in the form [type, message] 178 Array array = frame.result.data.array; 179 if (array.size == 2 && array.items[0].type == kObjectTypeInteger 180 && (array.items[0].data.integer == kErrorTypeException 181 || array.items[0].data.integer == kErrorTypeValidation) 182 && array.items[1].type == kObjectTypeString) { 183 api_set_error(err, (ErrorType)array.items[0].data.integer, "%s", 184 array.items[1].data.string.data); 185 } else { 186 api_set_error(err, kErrorTypeException, "%s", "unknown error"); 187 } 188 } else { 189 api_set_error(err, kErrorTypeException, "%s", "unknown error"); 190 } 191 192 // frame.result was allocated in an arena 193 arena_mem_free(frame.result_mem); 194 frame.result_mem = NULL; 195 } 196 197 channel_decref(channel); 198 199 *result_mem = frame.result_mem; 200 201 return frame.errored ? NIL : frame.result; 202 } 203 204 static size_t receive_msgpack(RStream *stream, const char *rbuf, size_t c, void *data, bool eof) 205 { 206 Channel *channel = data; 207 channel_incref(channel); 208 size_t consumed = 0; 209 210 DLOG("ch %" PRIu64 ": parsing %zu bytes from msgpack Stream: %p", 211 channel->id, c, (void *)stream); 212 213 if (c > 0) { 214 Unpacker *p = channel->rpc.unpacker; 215 p->read_ptr = rbuf; 216 p->read_size = c; 217 parse_msgpack(channel); 218 219 if (!unpacker_closed(p)) { 220 consumed = c - p->read_size; 221 } 222 } 223 224 if (eof) { 225 char buf[256]; 226 snprintf(buf, sizeof(buf), "ch %" PRIu64 " was closed by the peer", channel->id); 227 chan_close_on_err(channel, buf, LOGLVL_INF); 228 } 229 230 channel_decref(channel); 231 return consumed; 232 } 233 234 static ChannelCallFrame *find_call_frame(RpcState *rpc, uint32_t request_id) 235 { 236 for (size_t i = 0; i < kv_size(rpc->call_stack); i++) { 237 ChannelCallFrame *frame = kv_Z(rpc->call_stack, i); 238 if (frame->request_id == request_id) { 239 return frame; 240 } 241 } 242 return NULL; 243 } 244 245 static void parse_msgpack(Channel *channel) 246 { 247 Unpacker *p = channel->rpc.unpacker; 248 while (unpacker_advance(p)) { 249 if (p->type == kMessageTypeRedrawEvent) { 250 if (ui_client_attached) { 251 if (p->has_grid_line_event) { 252 ui_client_event_raw_line(&p->grid_line_event); 253 p->has_grid_line_event = false; 254 } else if (p->ui_handler.fn != NULL && p->result.type == kObjectTypeArray) { 255 p->ui_handler.fn(p->result.data.array); 256 } 257 } 258 arena_mem_free(arena_finish(&p->arena)); 259 } else if (p->type == kMessageTypeResponse) { 260 ChannelCallFrame *frame = channel->rpc.client_type == kClientTypeMsgpackRpc 261 ? find_call_frame(&channel->rpc, p->request_id) 262 : kv_last(channel->rpc.call_stack); 263 if (frame == NULL || p->request_id != frame->request_id) { 264 char buf[256]; 265 snprintf(buf, sizeof(buf), 266 "ch %" PRIu64 " (type=%" PRIu32 ") returned a response with an unknown request " 267 "id %" PRIu32 ". Ensure the client is properly synchronized", 268 channel->id, (unsigned)channel->rpc.client_type, p->request_id); 269 chan_close_on_err(channel, buf, LOGLVL_ERR); 270 return; 271 } 272 frame->returned = true; 273 frame->errored = (p->error.type != kObjectTypeNil); 274 275 if (frame->errored) { 276 frame->result = p->error; 277 // TODO(bfredl): p->result should not even be decoded 278 // api_free_object(p->result); 279 } else { 280 frame->result = p->result; 281 } 282 frame->result_mem = arena_finish(&p->arena); 283 log_response(RECV, channel->id, frame->errored ? ERR : RES, p->request_id); 284 } else { 285 if (p->type == kMessageTypeNotification) { 286 log_notify(RECV, channel->id, p->handler.name); 287 } else { 288 log_request(RECV, channel->id, p->request_id, p->handler.name); 289 } 290 291 Object res = p->result; 292 if (p->result.type != kObjectTypeArray) { 293 chan_close_on_err(channel, "msgpack-rpc request args must be an array", LOGLVL_ERR); 294 return; 295 } 296 Array arg = res.data.array; 297 handle_request(channel, p, arg); 298 } 299 } 300 301 if (unpacker_closed(p)) { 302 chan_close_on_err(channel, p->unpack_error.msg, LOGLVL_INF); 303 api_clear_error(&p->unpack_error); 304 } 305 } 306 307 /// Handles requests and notifications received on the channel. 308 static void handle_request(Channel *channel, Unpacker *p, Array args) 309 FUNC_ATTR_NONNULL_ALL 310 { 311 assert(p->type == kMessageTypeRequest || p->type == kMessageTypeNotification); 312 313 if (!p->handler.fn) { 314 send_error(channel, p->handler, p->type, p->request_id, p->unpack_error.msg); 315 api_clear_error(&p->unpack_error); 316 arena_mem_free(arena_finish(&p->arena)); 317 return; 318 } 319 320 RequestEvent *evdata = xmalloc(sizeof(RequestEvent)); 321 evdata->type = p->type; 322 evdata->channel = channel; 323 evdata->handler = p->handler; 324 evdata->args = args; 325 evdata->used_mem = p->arena; 326 p->arena = (Arena)ARENA_EMPTY; 327 evdata->request_id = p->request_id; 328 channel_incref(channel); 329 if (p->handler.fast) { 330 bool is_get_mode = p->handler.fn == handle_nvim_get_mode; 331 332 if (is_get_mode && !input_blocking()) { 333 // Defer the event to a special queue used by os/input.c. #6247 334 multiqueue_put(ch_before_blocking_events, request_event, evdata); 335 } else { 336 // Invoke immediately. 337 request_event((void **)&evdata); 338 } 339 } else { 340 bool is_resize = p->handler.fn == handle_nvim_ui_try_resize; 341 if (is_resize) { 342 Event ev = event_create_oneshot(event_create(request_event, evdata), 2); 343 multiqueue_put_event(channel->events, ev); 344 multiqueue_put_event(resize_events, ev); 345 } else { 346 multiqueue_put(channel->events, request_event, evdata); 347 DLOG("RPC: scheduled %.*s", (int)p->method_name_len, p->handler.name); 348 } 349 } 350 } 351 352 /// Handles a message, depending on the type: 353 /// - Request: invokes method and writes the response (or error). 354 /// - Notification: invokes method (emits `nvim_error_event` on error). 355 static void request_event(void **argv) 356 { 357 RequestEvent *e = argv[0]; 358 Channel *channel = e->channel; 359 MsgpackRpcRequestHandler handler = e->handler; 360 Error error = ERROR_INIT; 361 if (channel->rpc.closed) { 362 // channel was closed, abort any pending requests 363 goto free_ret; 364 } 365 366 Object result = handler.fn(channel->id, e->args, &e->used_mem, &error); 367 if (e->type == kMessageTypeRequest || ERROR_SET(&error)) { 368 // Send the response. 369 serialize_response(channel, e->handler, e->type, e->request_id, &error, &result); 370 } 371 if (handler.ret_alloc) { 372 api_free_object(result); 373 } 374 375 free_ret: 376 // e->args (and possibly result) are allocated in an arena 377 arena_mem_free(arena_finish(&e->used_mem)); 378 channel_decref(channel); 379 xfree(e); 380 api_clear_error(&error); 381 } 382 383 bool rpc_write_raw(uint64_t id, WBuffer *buffer) 384 { 385 Channel *channel = find_rpc_channel(id); 386 if (!channel) { 387 wstream_release_wbuffer(buffer); 388 return false; 389 } 390 391 return channel_write(channel, buffer); 392 } 393 394 static bool channel_write(Channel *channel, WBuffer *buffer) 395 { 396 int err = 0; 397 398 if (channel->rpc.closed) { 399 wstream_release_wbuffer(buffer); 400 return false; 401 } 402 403 if (channel->streamtype == kChannelStreamInternal) { 404 channel_incref(channel); 405 CREATE_EVENT(channel->events, internal_read_event, channel, buffer); 406 } else { 407 Stream *in = channel_instream(channel); 408 err = wstream_write(in, buffer); 409 } 410 411 if (err != 0) { 412 // If the write failed for any reason, close the channel 413 char buf[256]; 414 snprintf(buf, 415 sizeof(buf), 416 "ch %" PRIu64 ": stream write failed: %s. " 417 "RPC canceled; closing channel", 418 channel->id, os_strerror(err)); 419 // UV_EPIPE can happen if pipe is closed by peer and shouldn't be an error. 420 chan_close_on_err(channel, buf, err == UV_EPIPE ? LOGLVL_INF : LOGLVL_ERR); 421 } 422 423 return err == 0; 424 } 425 426 static void internal_read_event(void **argv) 427 { 428 Channel *channel = argv[0]; 429 WBuffer *buffer = argv[1]; 430 Unpacker *p = channel->rpc.unpacker; 431 432 p->read_ptr = buffer->data; 433 p->read_size = buffer->size; 434 parse_msgpack(channel); 435 436 if (p->read_size) { 437 // This should not happen, as WBuffer is one single serialized message. 438 if (!channel->rpc.closed) { 439 chan_close_on_err(channel, "internal channel: internal error", LOGLVL_ERR); 440 } 441 } 442 443 channel_decref(channel); 444 wstream_release_wbuffer(buffer); 445 } 446 447 static void send_error(Channel *chan, MsgpackRpcRequestHandler handler, MessageType type, 448 uint32_t id, char *err) 449 { 450 Error e = ERROR_INIT; 451 api_set_error(&e, kErrorTypeException, "%s", err); 452 serialize_response(chan, handler, type, id, &e, &NIL); 453 api_clear_error(&e); 454 } 455 456 /// Broadcasts a notification to all RPC channels. 457 static void broadcast_event(const char *name, Array args) 458 { 459 kvec_withinit_t(Channel *, 4) chans = KV_INITIAL_VALUE; 460 kvi_init(chans); 461 Channel *channel; 462 463 map_foreach_value(&channels, channel, { 464 if (channel->is_rpc) { 465 kv_push(chans, channel); 466 } 467 }); 468 469 if (kv_size(chans)) { 470 serialize_request(chans.items, kv_size(chans), 0, name, args); 471 } 472 473 kvi_destroy(chans); 474 } 475 476 /// Mark rpc state as closed, and release its reference to the channel. 477 /// Don't call this directly, call channel_close(id, kChannelPartRpc, &error) 478 void rpc_close(Channel *channel) 479 { 480 if (channel->rpc.closed) { 481 return; 482 } 483 484 channel->rpc.closed = true; 485 486 // Scheduled to avoid running UILeave autocommands in a libuv handler. 487 multiqueue_put(main_loop.fast_events, rpc_close_event, channel); 488 } 489 490 static void rpc_close_event(void **argv) 491 { 492 Channel *channel = (Channel *)argv[0]; 493 assert(channel); 494 495 channel_decref(channel); 496 497 bool is_ui_client = ui_client_channel_id && channel->id == ui_client_channel_id; 498 if (is_ui_client || channel->streamtype == kChannelStreamStdio) { 499 if (!is_ui_client) { 500 // Avoid hanging when there are no other UIs and a prompt is triggered on exit. 501 remote_ui_disconnect(channel->id, NULL, false); 502 } else { 503 ui_client_may_restart_server(); 504 if (ui_client_channel_id != channel->id) { 505 // A new server has been started. Don't exit. 506 return; 507 } 508 } 509 if (!channel->detach) { 510 if (channel->streamtype == kChannelStreamProc && ui_client_error_exit < 0) { 511 // Wait for the embedded server to exit instead of exiting immediately, 512 // as it's necessary to get the server's exit code in on_proc_exit(). 513 } else { 514 exit_on_closed_chan(0); 515 } 516 } 517 } 518 } 519 520 void rpc_free(Channel *channel) 521 { 522 remote_ui_disconnect(channel->id, NULL, false); 523 unpacker_teardown(channel->rpc.unpacker); 524 xfree(channel->rpc.unpacker); 525 526 kv_destroy(channel->rpc.call_stack); 527 api_free_dict(channel->rpc.info); 528 } 529 530 /// Closes a channel after receiving fatal error, and logs a message. 531 static void chan_close_on_err(Channel *channel, char *msg, int loglevel) 532 { 533 for (size_t i = 0; i < kv_size(channel->rpc.call_stack); i++) { 534 ChannelCallFrame *frame = kv_A(channel->rpc.call_stack, i); 535 if (frame->returned) { 536 continue; // Don't overwrite an already received result. #24214 537 } 538 frame->returned = true; 539 frame->errored = true; 540 frame->result = CSTR_TO_ARENA_OBJ(&channel->rpc.unpacker->arena, msg); 541 frame->result_mem = arena_finish(&channel->rpc.unpacker->arena); 542 } 543 544 channel_close(channel->id, kChannelPartRpc, NULL); 545 546 LOG(loglevel, "RPC: %s", msg); 547 } 548 549 static void serialize_request(Channel **chans, size_t nchans, uint32_t request_id, 550 const char *method, Array args) 551 { 552 PackerBuffer packer; 553 packer_buffer_init_channels(chans, nchans, &packer); 554 555 mpack_array(&packer.ptr, request_id ? 4 : 3); 556 mpack_w(&packer.ptr, request_id ? 0 : 2); 557 558 if (request_id) { 559 mpack_uint(&packer.ptr, request_id); 560 } 561 562 mpack_str(cstr_as_string(method), &packer); 563 mpack_object_array(args, &packer); 564 565 packer_buffer_finish_channels(&packer); 566 } 567 568 void serialize_response(Channel *channel, MsgpackRpcRequestHandler handler, MessageType type, 569 uint32_t response_id, Error *err, Object *arg) 570 { 571 if (ERROR_SET(err) && type == kMessageTypeNotification) { 572 if (handler.fn == handle_nvim_paste) { 573 // TODO(bfredl): this is pretty much ad-hoc. maybe TUI and UI:s should be 574 // allowed to ask nvim to just scream directly in the users face 575 // instead of sending nvim_error_event, in general. 576 semsg("paste: %s", err->msg); 577 api_clear_error(err); 578 } else { 579 MAXSIZE_TEMP_ARRAY(args, 2); 580 ADD_C(args, INTEGER_OBJ(err->type)); 581 ADD_C(args, CSTR_AS_OBJ(err->msg)); 582 serialize_request(&channel, 1, 0, "nvim_error_event", args); 583 } 584 return; 585 } 586 587 PackerBuffer packer; 588 packer_buffer_init_channels(&channel, 1, &packer); 589 590 mpack_array(&packer.ptr, 4); 591 mpack_w(&packer.ptr, 1); 592 mpack_uint(&packer.ptr, response_id); 593 594 if (ERROR_SET(err)) { 595 // error represented by a [type, message] array 596 mpack_array(&packer.ptr, 2); 597 mpack_integer(&packer.ptr, err->type); 598 mpack_str(cstr_as_string(err->msg), &packer); 599 // Nil result 600 mpack_nil(&packer.ptr); 601 } else { 602 // Nil error 603 mpack_nil(&packer.ptr); 604 // Return value 605 mpack_object(arg, &packer); 606 } 607 608 packer_buffer_finish_channels(&packer); 609 610 log_response(SEND, channel->id, ERROR_SET(err) ? ERR : RES, response_id); 611 } 612 613 static void packer_buffer_init_channels(Channel **chans, size_t nchans, PackerBuffer *packer) 614 { 615 for (size_t i = 0; i < nchans; i++) { 616 Channel *chan = chans[i]; 617 if (chan->rpc.ui && chan->rpc.ui->incomplete_event) { 618 remote_ui_flush_pending_data(chan->rpc.ui); 619 } 620 } 621 packer->startptr = alloc_block(); 622 packer->ptr = packer->startptr; 623 packer->endptr = packer->startptr + ARENA_BLOCK_SIZE; 624 packer->packer_flush = channel_flush_callback; 625 packer->anydata = chans; 626 packer->anyint = (int64_t)nchans; 627 } 628 629 static void packer_buffer_finish_channels(PackerBuffer *packer) 630 { 631 size_t len = (size_t)(packer->ptr - packer->startptr); 632 if (len > 0) { 633 WBuffer *buf = wstream_new_buffer(packer->startptr, len, (size_t)packer->anyint, free_block); 634 Channel **chans = packer->anydata; 635 for (int64_t i = 0; i < packer->anyint; i++) { 636 channel_write(chans[i], buf); 637 } 638 } else { 639 free_block(packer->startptr); 640 } 641 } 642 643 static void channel_flush_callback(PackerBuffer *packer) 644 { 645 packer_buffer_finish_channels(packer); 646 packer_buffer_init_channels(packer->anydata, (size_t)packer->anyint, packer); 647 } 648 649 void rpc_set_client_info(uint64_t id, Dict info) 650 { 651 Channel *chan = find_rpc_channel(id); 652 if (!chan) { 653 abort(); 654 } 655 656 api_free_dict(chan->rpc.info); 657 chan->rpc.info = info; 658 659 // Parse "type" on "info" and set "client_type" 660 const char *type = get_client_info(chan, "type"); 661 if (type == NULL || strequal(type, "remote")) { 662 chan->rpc.client_type = kClientTypeRemote; 663 } else if (strequal(type, "msgpack-rpc")) { 664 chan->rpc.client_type = kClientTypeMsgpackRpc; 665 } else if (strequal(type, "ui")) { 666 chan->rpc.client_type = kClientTypeUi; 667 } else if (strequal(type, "embedder")) { 668 chan->rpc.client_type = kClientTypeEmbedder; 669 } else if (strequal(type, "host")) { 670 chan->rpc.client_type = kClientTypeHost; 671 } else if (strequal(type, "plugin")) { 672 chan->rpc.client_type = kClientTypePlugin; 673 } else { 674 chan->rpc.client_type = kClientTypeUnknown; 675 } 676 677 channel_info_changed(chan, false); 678 } 679 680 Dict rpc_client_info(Channel *chan) 681 { 682 return copy_dict(chan->rpc.info, NULL); 683 } 684 685 const char *get_client_info(Channel *chan, const char *key) 686 FUNC_ATTR_NONNULL_ALL 687 { 688 if (!chan->is_rpc) { 689 return NULL; 690 } 691 Dict info = chan->rpc.info; 692 for (size_t i = 0; i < info.size; i++) { 693 if (strequal(key, info.items[i].key.data) 694 && info.items[i].value.type == kObjectTypeString) { 695 return info.items[i].value.data.string.data; 696 } 697 } 698 699 return NULL; 700 } 701 702 #ifdef EXITFREE 703 void rpc_free_all_mem(void) 704 { 705 multiqueue_free(ch_before_blocking_events); 706 } 707 #endif