neovim

Neovim text editor
git clone https://git.dasho.dev/neovim.git
Log | Files | Refs | README

rpc.c (8886B)


      1 #include <string.h>
      2 
      3 #include "rpc.h"
      4 
      5 enum {
      6  MPACK_RPC_RECEIVE_ARRAY = 1,
      7  MPACK_RPC_RECEIVE_TYPE,
      8  MPACK_RPC_RECEIVE_ID
      9 };
     10 
     11 static mpack_rpc_header_t mpack_rpc_request_hdr(void);
     12 static mpack_rpc_header_t mpack_rpc_reply_hdr(void);
     13 static mpack_rpc_header_t mpack_rpc_notify_hdr(void);
     14 static int mpack_rpc_put(mpack_rpc_session_t *s, mpack_rpc_message_t m);
     15 static int mpack_rpc_pop(mpack_rpc_session_t *s, mpack_rpc_message_t *m);
     16 static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr);
     17 
     18 MPACK_API void mpack_rpc_session_init(mpack_rpc_session_t *session,
     19    mpack_uint32_t capacity)
     20 {
     21  session->capacity = capacity ? capacity : MPACK_RPC_MAX_REQUESTS;
     22  session->request_id = 0;
     23  mpack_tokbuf_init(&session->reader);
     24  mpack_tokbuf_init(&session->writer);
     25  mpack_rpc_reset_hdr(&session->receive);
     26  mpack_rpc_reset_hdr(&session->send);
     27  memset(session->slots, 0,
     28      sizeof(struct mpack_rpc_slot_s) * session->capacity);
     29 }
     30 
     31 MPACK_API int mpack_rpc_receive_tok(mpack_rpc_session_t *session,
     32    mpack_token_t tok, mpack_rpc_message_t *msg)
     33 {
     34  int type;
     35 
     36  if (session->receive.index == 0) {
     37    if (tok.type != MPACK_TOKEN_ARRAY)
     38      /* not an array */
     39      return MPACK_RPC_EARRAY;
     40 
     41    if (tok.length < 3 || tok.length > 4)
     42      /* invalid array length */
     43      return MPACK_RPC_EARRAYL;
     44 
     45    session->receive.toks[0] = tok;
     46    session->receive.index++;
     47    return MPACK_EOF;  /* get the type */
     48  }
     49 
     50  if (session->receive.index == 1) {
     51 
     52    if (tok.type != MPACK_TOKEN_UINT || tok.length > 1 || tok.data.value.lo > 2)
     53      /* invalid type */
     54      return MPACK_RPC_ETYPE;
     55 
     56    if (tok.data.value.lo < 2 && session->receive.toks[0].length != 4)
     57      /* request or response with array length != 4 */
     58      return MPACK_RPC_EARRAYL;
     59 
     60    if (tok.data.value.lo == 2 && session->receive.toks[0].length != 3)
     61      /* notification with array length != 3 */
     62      return MPACK_RPC_EARRAYL;
     63 
     64    session->receive.toks[1] = tok;
     65    session->receive.index++;
     66 
     67    if (tok.data.value.lo < 2) return MPACK_EOF;
     68 
     69    type = MPACK_RPC_NOTIFICATION;
     70    goto end;
     71  }
     72 
     73  assert(session->receive.index == 2);
     74  
     75  if (tok.type != MPACK_TOKEN_UINT || tok.length > 4)
     76    /* invalid request/response id */
     77    return MPACK_RPC_EMSGID;
     78    
     79  msg->id = tok.data.value.lo;
     80  msg->data.p = NULL;
     81  type = (int)session->receive.toks[1].data.value.lo + MPACK_RPC_REQUEST;
     82 
     83  if (type == MPACK_RPC_RESPONSE && !mpack_rpc_pop(session, msg))
     84    /* response with invalid id */
     85    return MPACK_RPC_ERESPID;
     86 
     87 end:
     88  mpack_rpc_reset_hdr(&session->receive);
     89  return type;
     90 }
     91 
     92 MPACK_API int mpack_rpc_request_tok(mpack_rpc_session_t *session, 
     93    mpack_token_t *tok, mpack_data_t data)
     94 {
     95  if (session->send.index == 0) {
     96    int status;
     97    mpack_rpc_message_t msg;
     98    do {
     99      msg.id = session->request_id;
    100      msg.data = data;
    101      session->send = mpack_rpc_request_hdr();
    102      session->send.toks[2].type = MPACK_TOKEN_UINT;
    103      session->send.toks[2].data.value.lo = msg.id;
    104      session->send.toks[2].data.value.hi = 0;
    105      *tok = session->send.toks[0];
    106      status = mpack_rpc_put(session, msg);
    107      if (status == -1) return MPACK_NOMEM;
    108      session->request_id = (session->request_id + 1) % 0xffffffff;
    109    } while (!status);
    110    session->send.index++;
    111    return MPACK_EOF;
    112  }
    113  
    114  if (session->send.index == 1) {
    115    *tok = session->send.toks[1];
    116    session->send.index++;
    117    return MPACK_EOF;
    118  }
    119 
    120  assert(session->send.index == 2);
    121  *tok = session->send.toks[2];
    122  mpack_rpc_reset_hdr(&session->send);
    123  return MPACK_OK;
    124 }
    125 
    126 MPACK_API int mpack_rpc_reply_tok(mpack_rpc_session_t *session,
    127    mpack_token_t *tok, mpack_uint32_t id)
    128 {
    129  if (session->send.index == 0) {
    130    session->send = mpack_rpc_reply_hdr();
    131    session->send.toks[2].type = MPACK_TOKEN_UINT;
    132    session->send.toks[2].data.value.lo = id;
    133    session->send.toks[2].data.value.hi = 0;
    134    *tok = session->send.toks[0];
    135    session->send.index++;
    136    return MPACK_EOF;
    137  }
    138 
    139  if (session->send.index == 1) {
    140    *tok = session->send.toks[1];
    141    session->send.index++;
    142    return MPACK_EOF;
    143  }
    144 
    145  assert(session->send.index == 2);
    146  *tok = session->send.toks[2];
    147  mpack_rpc_reset_hdr(&session->send);
    148  return MPACK_OK;
    149 }
    150 
    151 MPACK_API int mpack_rpc_notify_tok(mpack_rpc_session_t *session,
    152    mpack_token_t *tok)
    153 {
    154  if (session->send.index == 0) {
    155    session->send = mpack_rpc_notify_hdr();
    156    *tok = session->send.toks[0];
    157    session->send.index++;
    158    return MPACK_EOF;
    159  }
    160 
    161  assert(session->send.index == 1);
    162  *tok = session->send.toks[1];
    163  mpack_rpc_reset_hdr(&session->send);
    164  return MPACK_OK;
    165 }
    166 
    167 MPACK_API int mpack_rpc_receive(mpack_rpc_session_t *session, const char **buf,
    168    size_t *buflen, mpack_rpc_message_t *msg)
    169 {
    170  int status;
    171 
    172  do {
    173    mpack_token_t tok;
    174    status = mpack_read(&session->reader, buf, buflen, &tok);
    175    if (status) break;
    176    status = mpack_rpc_receive_tok(session, tok, msg);
    177    if (status >= MPACK_RPC_REQUEST) break;
    178  } while (*buflen);
    179 
    180  return status;
    181 }
    182 
    183 MPACK_API int mpack_rpc_request(mpack_rpc_session_t *session, char **buf,
    184    size_t *buflen, mpack_data_t data)
    185 {
    186  int status = MPACK_EOF;
    187 
    188  while (status && *buflen) {
    189    int write_status;
    190    mpack_token_t tok;
    191    if (!session->writer.plen) {
    192      status = mpack_rpc_request_tok(session, &tok, data);
    193    }
    194    if (status == MPACK_NOMEM) break;
    195    write_status = mpack_write(&session->writer, buf, buflen, &tok);
    196    status = write_status ? write_status : status;
    197  }
    198 
    199  return status;
    200 }
    201 
    202 MPACK_API int mpack_rpc_reply(mpack_rpc_session_t *session, char **buf,
    203    size_t *buflen, mpack_uint32_t id)
    204 {
    205  int status = MPACK_EOF;
    206 
    207  while (status && *buflen) {
    208    int write_status;
    209    mpack_token_t tok;
    210    if (!session->writer.plen) {
    211      status = mpack_rpc_reply_tok(session, &tok, id);
    212    }
    213    write_status = mpack_write(&session->writer, buf, buflen, &tok);
    214    status = write_status ? write_status : status;
    215  }
    216 
    217  return status;
    218 }
    219 
    220 MPACK_API int mpack_rpc_notify(mpack_rpc_session_t *session, char **buf,
    221    size_t *buflen)
    222 {
    223  int status = MPACK_EOF;
    224 
    225  while (status && *buflen) {
    226    int write_status;
    227    mpack_token_t tok;
    228    if (!session->writer.plen) {
    229      status = mpack_rpc_notify_tok(session, &tok);
    230    }
    231    write_status = mpack_write(&session->writer, buf, buflen, &tok);
    232    status = write_status ? write_status : status;
    233  }
    234 
    235  return status;
    236 }
    237 
    238 MPACK_API void mpack_rpc_session_copy(mpack_rpc_session_t *dst,
    239    mpack_rpc_session_t *src)
    240 {
    241  mpack_uint32_t i;
    242  mpack_uint32_t dst_capacity = dst->capacity; 
    243  assert(src->capacity <= dst_capacity);
    244  /* copy all fields except slots */
    245  memcpy(dst, src, sizeof(mpack_rpc_one_session_t) -
    246      sizeof(struct mpack_rpc_slot_s));
    247  /* reset capacity */
    248  dst->capacity = dst_capacity;
    249  /* reinsert requests  */
    250  memset(dst->slots, 0, sizeof(struct mpack_rpc_slot_s) * dst->capacity);
    251  for (i = 0; i < src->capacity; i++) {
    252    if (src->slots[i].used) mpack_rpc_put(dst, src->slots[i].msg);
    253  }
    254 }
    255 
    256 static mpack_rpc_header_t mpack_rpc_request_hdr(void)
    257 {
    258  mpack_rpc_header_t hdr;
    259  hdr.index = 0;
    260  hdr.toks[0].type = MPACK_TOKEN_ARRAY;
    261  hdr.toks[0].length = 4;
    262  hdr.toks[1].type = MPACK_TOKEN_UINT;
    263  hdr.toks[1].data.value.lo = 0;
    264  hdr.toks[1].data.value.hi = 0;
    265  return hdr;
    266 }
    267 
    268 static mpack_rpc_header_t mpack_rpc_reply_hdr(void)
    269 {
    270  mpack_rpc_header_t hdr = mpack_rpc_request_hdr();
    271  hdr.toks[1].data.value.lo = 1;
    272  hdr.toks[1].data.value.hi = 0;
    273  return hdr;
    274 }
    275 
    276 static mpack_rpc_header_t mpack_rpc_notify_hdr(void)
    277 {
    278  mpack_rpc_header_t hdr = mpack_rpc_request_hdr();
    279  hdr.toks[0].length = 3;
    280  hdr.toks[1].data.value.lo = 2;
    281  hdr.toks[1].data.value.hi = 0;
    282  return hdr;
    283 }
    284 
    285 static int mpack_rpc_put(mpack_rpc_session_t *session, mpack_rpc_message_t msg)
    286 {
    287  struct mpack_rpc_slot_s *slot = NULL;
    288  mpack_uint32_t i;
    289  mpack_uint32_t hash = msg.id % session->capacity;
    290 
    291  for (i = 0; i < session->capacity; i++) {
    292    if (!session->slots[hash].used || session->slots[hash].msg.id == msg.id) {
    293      slot = session->slots + hash;
    294      break;
    295    }
    296    hash = hash > 0 ? hash - 1 : session->capacity - 1;
    297  }
    298 
    299  if (!slot) return -1; /* no space */
    300  if (slot->msg.id == msg.id && slot->used) return 0;  /* duplicate key */
    301  slot->msg = msg;
    302  slot->used = 1;
    303  return 1;
    304 }
    305 
    306 static int mpack_rpc_pop(mpack_rpc_session_t *session, mpack_rpc_message_t *msg)
    307 {
    308  struct mpack_rpc_slot_s *slot = NULL;
    309  mpack_uint32_t i;
    310  mpack_uint32_t hash = msg->id % session->capacity;
    311 
    312  for (i = 0; i < session->capacity; i++) {
    313    if (session->slots[hash].used && session->slots[hash].msg.id == msg->id) {
    314      slot = session->slots + hash;
    315      break;
    316    }
    317    hash = hash > 0 ? hash - 1 : session->capacity - 1;
    318  }
    319  
    320  if (!slot) return 0;
    321 
    322  *msg = slot->msg;
    323  slot->used = 0;
    324 
    325  return 1;
    326 }
    327 
    328 static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr)
    329 {
    330  hdr->index = 0;
    331 }