rpc.c (8886B)
1 #include <string.h> 2 3 #include "rpc.h" 4 5 enum { 6 MPACK_RPC_RECEIVE_ARRAY = 1, 7 MPACK_RPC_RECEIVE_TYPE, 8 MPACK_RPC_RECEIVE_ID 9 }; 10 11 static mpack_rpc_header_t mpack_rpc_request_hdr(void); 12 static mpack_rpc_header_t mpack_rpc_reply_hdr(void); 13 static mpack_rpc_header_t mpack_rpc_notify_hdr(void); 14 static int mpack_rpc_put(mpack_rpc_session_t *s, mpack_rpc_message_t m); 15 static int mpack_rpc_pop(mpack_rpc_session_t *s, mpack_rpc_message_t *m); 16 static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr); 17 18 MPACK_API void mpack_rpc_session_init(mpack_rpc_session_t *session, 19 mpack_uint32_t capacity) 20 { 21 session->capacity = capacity ? capacity : MPACK_RPC_MAX_REQUESTS; 22 session->request_id = 0; 23 mpack_tokbuf_init(&session->reader); 24 mpack_tokbuf_init(&session->writer); 25 mpack_rpc_reset_hdr(&session->receive); 26 mpack_rpc_reset_hdr(&session->send); 27 memset(session->slots, 0, 28 sizeof(struct mpack_rpc_slot_s) * session->capacity); 29 } 30 31 MPACK_API int mpack_rpc_receive_tok(mpack_rpc_session_t *session, 32 mpack_token_t tok, mpack_rpc_message_t *msg) 33 { 34 int type; 35 36 if (session->receive.index == 0) { 37 if (tok.type != MPACK_TOKEN_ARRAY) 38 /* not an array */ 39 return MPACK_RPC_EARRAY; 40 41 if (tok.length < 3 || tok.length > 4) 42 /* invalid array length */ 43 return MPACK_RPC_EARRAYL; 44 45 session->receive.toks[0] = tok; 46 session->receive.index++; 47 return MPACK_EOF; /* get the type */ 48 } 49 50 if (session->receive.index == 1) { 51 52 if (tok.type != MPACK_TOKEN_UINT || tok.length > 1 || tok.data.value.lo > 2) 53 /* invalid type */ 54 return MPACK_RPC_ETYPE; 55 56 if (tok.data.value.lo < 2 && session->receive.toks[0].length != 4) 57 /* request or response with array length != 4 */ 58 return MPACK_RPC_EARRAYL; 59 60 if (tok.data.value.lo == 2 && session->receive.toks[0].length != 3) 61 /* notification with array length != 3 */ 62 return MPACK_RPC_EARRAYL; 63 64 session->receive.toks[1] = tok; 65 session->receive.index++; 66 67 if (tok.data.value.lo < 2) return MPACK_EOF; 68 69 type = MPACK_RPC_NOTIFICATION; 70 goto end; 71 } 72 73 assert(session->receive.index == 2); 74 75 if (tok.type != MPACK_TOKEN_UINT || tok.length > 4) 76 /* invalid request/response id */ 77 return MPACK_RPC_EMSGID; 78 79 msg->id = tok.data.value.lo; 80 msg->data.p = NULL; 81 type = (int)session->receive.toks[1].data.value.lo + MPACK_RPC_REQUEST; 82 83 if (type == MPACK_RPC_RESPONSE && !mpack_rpc_pop(session, msg)) 84 /* response with invalid id */ 85 return MPACK_RPC_ERESPID; 86 87 end: 88 mpack_rpc_reset_hdr(&session->receive); 89 return type; 90 } 91 92 MPACK_API int mpack_rpc_request_tok(mpack_rpc_session_t *session, 93 mpack_token_t *tok, mpack_data_t data) 94 { 95 if (session->send.index == 0) { 96 int status; 97 mpack_rpc_message_t msg; 98 do { 99 msg.id = session->request_id; 100 msg.data = data; 101 session->send = mpack_rpc_request_hdr(); 102 session->send.toks[2].type = MPACK_TOKEN_UINT; 103 session->send.toks[2].data.value.lo = msg.id; 104 session->send.toks[2].data.value.hi = 0; 105 *tok = session->send.toks[0]; 106 status = mpack_rpc_put(session, msg); 107 if (status == -1) return MPACK_NOMEM; 108 session->request_id = (session->request_id + 1) % 0xffffffff; 109 } while (!status); 110 session->send.index++; 111 return MPACK_EOF; 112 } 113 114 if (session->send.index == 1) { 115 *tok = session->send.toks[1]; 116 session->send.index++; 117 return MPACK_EOF; 118 } 119 120 assert(session->send.index == 2); 121 *tok = session->send.toks[2]; 122 mpack_rpc_reset_hdr(&session->send); 123 return MPACK_OK; 124 } 125 126 MPACK_API int mpack_rpc_reply_tok(mpack_rpc_session_t *session, 127 mpack_token_t *tok, mpack_uint32_t id) 128 { 129 if (session->send.index == 0) { 130 session->send = mpack_rpc_reply_hdr(); 131 session->send.toks[2].type = MPACK_TOKEN_UINT; 132 session->send.toks[2].data.value.lo = id; 133 session->send.toks[2].data.value.hi = 0; 134 *tok = session->send.toks[0]; 135 session->send.index++; 136 return MPACK_EOF; 137 } 138 139 if (session->send.index == 1) { 140 *tok = session->send.toks[1]; 141 session->send.index++; 142 return MPACK_EOF; 143 } 144 145 assert(session->send.index == 2); 146 *tok = session->send.toks[2]; 147 mpack_rpc_reset_hdr(&session->send); 148 return MPACK_OK; 149 } 150 151 MPACK_API int mpack_rpc_notify_tok(mpack_rpc_session_t *session, 152 mpack_token_t *tok) 153 { 154 if (session->send.index == 0) { 155 session->send = mpack_rpc_notify_hdr(); 156 *tok = session->send.toks[0]; 157 session->send.index++; 158 return MPACK_EOF; 159 } 160 161 assert(session->send.index == 1); 162 *tok = session->send.toks[1]; 163 mpack_rpc_reset_hdr(&session->send); 164 return MPACK_OK; 165 } 166 167 MPACK_API int mpack_rpc_receive(mpack_rpc_session_t *session, const char **buf, 168 size_t *buflen, mpack_rpc_message_t *msg) 169 { 170 int status; 171 172 do { 173 mpack_token_t tok; 174 status = mpack_read(&session->reader, buf, buflen, &tok); 175 if (status) break; 176 status = mpack_rpc_receive_tok(session, tok, msg); 177 if (status >= MPACK_RPC_REQUEST) break; 178 } while (*buflen); 179 180 return status; 181 } 182 183 MPACK_API int mpack_rpc_request(mpack_rpc_session_t *session, char **buf, 184 size_t *buflen, mpack_data_t data) 185 { 186 int status = MPACK_EOF; 187 188 while (status && *buflen) { 189 int write_status; 190 mpack_token_t tok; 191 if (!session->writer.plen) { 192 status = mpack_rpc_request_tok(session, &tok, data); 193 } 194 if (status == MPACK_NOMEM) break; 195 write_status = mpack_write(&session->writer, buf, buflen, &tok); 196 status = write_status ? write_status : status; 197 } 198 199 return status; 200 } 201 202 MPACK_API int mpack_rpc_reply(mpack_rpc_session_t *session, char **buf, 203 size_t *buflen, mpack_uint32_t id) 204 { 205 int status = MPACK_EOF; 206 207 while (status && *buflen) { 208 int write_status; 209 mpack_token_t tok; 210 if (!session->writer.plen) { 211 status = mpack_rpc_reply_tok(session, &tok, id); 212 } 213 write_status = mpack_write(&session->writer, buf, buflen, &tok); 214 status = write_status ? write_status : status; 215 } 216 217 return status; 218 } 219 220 MPACK_API int mpack_rpc_notify(mpack_rpc_session_t *session, char **buf, 221 size_t *buflen) 222 { 223 int status = MPACK_EOF; 224 225 while (status && *buflen) { 226 int write_status; 227 mpack_token_t tok; 228 if (!session->writer.plen) { 229 status = mpack_rpc_notify_tok(session, &tok); 230 } 231 write_status = mpack_write(&session->writer, buf, buflen, &tok); 232 status = write_status ? write_status : status; 233 } 234 235 return status; 236 } 237 238 MPACK_API void mpack_rpc_session_copy(mpack_rpc_session_t *dst, 239 mpack_rpc_session_t *src) 240 { 241 mpack_uint32_t i; 242 mpack_uint32_t dst_capacity = dst->capacity; 243 assert(src->capacity <= dst_capacity); 244 /* copy all fields except slots */ 245 memcpy(dst, src, sizeof(mpack_rpc_one_session_t) - 246 sizeof(struct mpack_rpc_slot_s)); 247 /* reset capacity */ 248 dst->capacity = dst_capacity; 249 /* reinsert requests */ 250 memset(dst->slots, 0, sizeof(struct mpack_rpc_slot_s) * dst->capacity); 251 for (i = 0; i < src->capacity; i++) { 252 if (src->slots[i].used) mpack_rpc_put(dst, src->slots[i].msg); 253 } 254 } 255 256 static mpack_rpc_header_t mpack_rpc_request_hdr(void) 257 { 258 mpack_rpc_header_t hdr; 259 hdr.index = 0; 260 hdr.toks[0].type = MPACK_TOKEN_ARRAY; 261 hdr.toks[0].length = 4; 262 hdr.toks[1].type = MPACK_TOKEN_UINT; 263 hdr.toks[1].data.value.lo = 0; 264 hdr.toks[1].data.value.hi = 0; 265 return hdr; 266 } 267 268 static mpack_rpc_header_t mpack_rpc_reply_hdr(void) 269 { 270 mpack_rpc_header_t hdr = mpack_rpc_request_hdr(); 271 hdr.toks[1].data.value.lo = 1; 272 hdr.toks[1].data.value.hi = 0; 273 return hdr; 274 } 275 276 static mpack_rpc_header_t mpack_rpc_notify_hdr(void) 277 { 278 mpack_rpc_header_t hdr = mpack_rpc_request_hdr(); 279 hdr.toks[0].length = 3; 280 hdr.toks[1].data.value.lo = 2; 281 hdr.toks[1].data.value.hi = 0; 282 return hdr; 283 } 284 285 static int mpack_rpc_put(mpack_rpc_session_t *session, mpack_rpc_message_t msg) 286 { 287 struct mpack_rpc_slot_s *slot = NULL; 288 mpack_uint32_t i; 289 mpack_uint32_t hash = msg.id % session->capacity; 290 291 for (i = 0; i < session->capacity; i++) { 292 if (!session->slots[hash].used || session->slots[hash].msg.id == msg.id) { 293 slot = session->slots + hash; 294 break; 295 } 296 hash = hash > 0 ? hash - 1 : session->capacity - 1; 297 } 298 299 if (!slot) return -1; /* no space */ 300 if (slot->msg.id == msg.id && slot->used) return 0; /* duplicate key */ 301 slot->msg = msg; 302 slot->used = 1; 303 return 1; 304 } 305 306 static int mpack_rpc_pop(mpack_rpc_session_t *session, mpack_rpc_message_t *msg) 307 { 308 struct mpack_rpc_slot_s *slot = NULL; 309 mpack_uint32_t i; 310 mpack_uint32_t hash = msg->id % session->capacity; 311 312 for (i = 0; i < session->capacity; i++) { 313 if (session->slots[hash].used && session->slots[hash].msg.id == msg->id) { 314 slot = session->slots + hash; 315 break; 316 } 317 hash = hash > 0 ? hash - 1 : session->capacity - 1; 318 } 319 320 if (!slot) return 0; 321 322 *msg = slot->msg; 323 slot->used = 0; 324 325 return 1; 326 } 327 328 static void mpack_rpc_reset_hdr(mpack_rpc_header_t *hdr) 329 { 330 hdr->index = 0; 331 }