test_workqueue.c (11333B)
1 /* Copyright (c) 2001-2004, Roger Dingledine. 2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 3 * Copyright (c) 2007-2021, The Tor Project, Inc. */ 4 /* See LICENSE for licensing information */ 5 6 #include "core/or/or.h" 7 #include "lib/thread/threads.h" 8 #include "core/or/onion.h" 9 #include "lib/evloop/workqueue.h" 10 #include "lib/crypt_ops/crypto_curve25519.h" 11 #include "lib/crypt_ops/crypto_rand.h" 12 #include "lib/net/alertsock.h" 13 #include "lib/evloop/compat_libevent.h" 14 #include "lib/intmath/weakrng.h" 15 #include "lib/crypt_ops/crypto_init.h" 16 17 #include <stdio.h> 18 19 #define MAX_INFLIGHT (1<<16) 20 21 static int opt_verbose = 0; 22 static int opt_n_threads = 8; 23 static int opt_n_items = 10000; 24 static int opt_n_inflight = 1000; 25 static int opt_n_lowwater = 250; 26 static int opt_n_cancel = 0; 27 static int opt_ratio_rsa = 5; 28 29 #ifdef TRACK_RESPONSES 30 tor_mutex_t bitmap_mutex; 31 int handled_len; 32 bitarray_t *handled; 33 #endif 34 35 typedef struct state_t { 36 int magic; 37 int n_handled; 38 crypto_pk_t *rsa; 39 curve25519_secret_key_t ecdh; 40 int is_shutdown; 41 } state_t; 42 43 typedef struct rsa_work_t { 44 int serial; 45 uint8_t msg[128]; 46 uint8_t msglen; 47 } rsa_work_t; 48 49 typedef struct ecdh_work_t { 50 int serial; 51 union { 52 curve25519_public_key_t pk; 53 uint8_t msg[32]; 54 } u; 55 } ecdh_work_t; 56 57 static void 58 mark_handled(int serial) 59 { 60 #ifdef TRACK_RESPONSES 61 tor_mutex_acquire(&bitmap_mutex); 62 tor_assert(serial < handled_len); 63 tor_assert(! bitarray_is_set(handled, serial)); 64 bitarray_set(handled, serial); 65 tor_mutex_release(&bitmap_mutex); 66 #else /* !defined(TRACK_RESPONSES) */ 67 (void)serial; 68 #endif /* defined(TRACK_RESPONSES) */ 69 } 70 71 static workqueue_reply_t 72 workqueue_do_rsa(void *state, void *work) 73 { 74 rsa_work_t *rw = work; 75 state_t *st = state; 76 crypto_pk_t *rsa = st->rsa; 77 uint8_t sig[256]; 78 int len; 79 80 tor_assert(st->magic == 13371337); 81 82 len = crypto_pk_private_sign(rsa, (char*)sig, 256, 83 (char*)rw->msg, rw->msglen); 84 if (len < 0) { 85 rw->msglen = 0; 86 return WQ_RPL_ERROR; 87 } 88 89 memset(rw->msg, 0, sizeof(rw->msg)); 90 rw->msglen = len; 91 memcpy(rw->msg, sig, len); 92 ++st->n_handled; 93 94 mark_handled(rw->serial); 95 96 return WQ_RPL_REPLY; 97 } 98 99 static workqueue_reply_t 100 workqueue_do_shutdown(void *state, void *work) 101 { 102 (void)state; 103 (void)work; 104 crypto_pk_free(((state_t*)state)->rsa); 105 tor_free(state); 106 return WQ_RPL_SHUTDOWN; 107 } 108 109 static workqueue_reply_t 110 workqueue_do_ecdh(void *state, void *work) 111 { 112 ecdh_work_t *ew = work; 113 uint8_t output[CURVE25519_OUTPUT_LEN]; 114 state_t *st = state; 115 116 tor_assert(st->magic == 13371337); 117 118 curve25519_handshake(output, &st->ecdh, &ew->u.pk); 119 memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN); 120 ++st->n_handled; 121 mark_handled(ew->serial); 122 return WQ_RPL_REPLY; 123 } 124 125 static workqueue_reply_t 126 workqueue_shutdown_error(void *state, void *work) 127 { 128 (void)state; 129 (void)work; 130 return WQ_RPL_REPLY; 131 } 132 133 static void * 134 new_state(void *arg) 135 { 136 state_t *st; 137 (void)arg; 138 139 st = tor_malloc(sizeof(*st)); 140 /* Every thread gets its own keys. not a problem for benchmarking */ 141 st->rsa = crypto_pk_new(); 142 if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) { 143 crypto_pk_free(st->rsa); 144 tor_free(st); 145 return NULL; 146 } 147 curve25519_secret_key_generate(&st->ecdh, 0); 148 st->magic = 13371337; 149 return st; 150 } 151 152 static void 153 free_state(void *arg) 154 { 155 state_t *st = arg; 156 crypto_pk_free(st->rsa); 157 tor_free(st); 158 } 159 160 static tor_weak_rng_t weak_rng; 161 static int n_sent = 0; 162 static int rsa_sent = 0; 163 static int ecdh_sent = 0; 164 static int n_received_previously = 0; 165 static int n_received = 0; 166 static int no_shutdown = 0; 167 168 #ifdef TRACK_RESPONSES 169 bitarray_t *received; 170 #endif 171 172 static void 173 handle_reply(void *arg) 174 { 175 #ifdef TRACK_RESPONSES 176 rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */ 177 tor_assert(! bitarray_is_set(received, rw->serial)); 178 bitarray_set(received,rw->serial); 179 #endif 180 181 tor_free(arg); 182 ++n_received; 183 } 184 185 /* This should never get called. */ 186 static void 187 handle_reply_shutdown(void *arg) 188 { 189 (void)arg; 190 no_shutdown = 1; 191 } 192 193 static workqueue_entry_t * 194 add_work(threadpool_t *tp) 195 { 196 int add_rsa = 197 opt_ratio_rsa == 0 || 198 tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0; 199 200 if (add_rsa) { 201 rsa_work_t *w = tor_malloc_zero(sizeof(*w)); 202 w->serial = n_sent++; 203 crypto_rand((char*)w->msg, 20); 204 w->msglen = 20; 205 ++rsa_sent; 206 return threadpool_queue_work_priority(tp, 207 WQ_PRI_MED, 208 workqueue_do_rsa, handle_reply, w); 209 } else { 210 ecdh_work_t *w = tor_malloc_zero(sizeof(*w)); 211 w->serial = n_sent++; 212 /* Not strictly right, but this is just for benchmarks. */ 213 crypto_rand((char*)w->u.pk.public_key, 32); 214 ++ecdh_sent; 215 return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w); 216 } 217 } 218 219 static int n_failed_cancel = 0; 220 static int n_successful_cancel = 0; 221 222 static int 223 add_n_work_items(threadpool_t *tp, int n) 224 { 225 int n_queued = 0; 226 int n_try_cancel = 0, i; 227 workqueue_entry_t **to_cancel; 228 workqueue_entry_t *ent; 229 230 // We'll choose randomly which entries to cancel. 231 to_cancel = tor_calloc(opt_n_cancel, sizeof(workqueue_entry_t*)); 232 233 while (n_queued++ < n) { 234 ent = add_work(tp); 235 if (! ent) { 236 puts("Z"); 237 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL); 238 return -1; 239 } 240 241 if (n_try_cancel < opt_n_cancel) { 242 to_cancel[n_try_cancel++] = ent; 243 } else { 244 int p = tor_weak_random_range(&weak_rng, n_queued); 245 if (p < n_try_cancel) { 246 to_cancel[p] = ent; 247 } 248 } 249 } 250 251 for (i = 0; i < n_try_cancel; ++i) { 252 void *work = workqueue_entry_cancel(to_cancel[i]); 253 if (! work) { 254 n_failed_cancel++; 255 } else { 256 n_successful_cancel++; 257 tor_free(work); 258 } 259 } 260 261 tor_free(to_cancel); 262 return 0; 263 } 264 265 static int shutting_down = 0; 266 267 static void 268 replysock_readable_cb(threadpool_t *tp) 269 { 270 if (n_received_previously == n_received) 271 return; 272 273 n_received_previously = n_received; 274 275 if (opt_verbose) { 276 printf("%d / %d", n_received, n_sent); 277 if (opt_n_cancel) 278 printf(" (%d cancelled, %d uncancellable)", 279 n_successful_cancel, n_failed_cancel); 280 puts(""); 281 } 282 #ifdef TRACK_RESPONSES 283 tor_mutex_acquire(&bitmap_mutex); 284 for (i = 0; i < opt_n_items; ++i) { 285 if (bitarray_is_set(received, i)) 286 putc('o', stdout); 287 else if (bitarray_is_set(handled, i)) 288 putc('!', stdout); 289 else 290 putc('.', stdout); 291 } 292 puts(""); 293 tor_mutex_release(&bitmap_mutex); 294 #endif /* defined(TRACK_RESPONSES) */ 295 296 if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) { 297 int n_to_send = n_received + opt_n_inflight - n_sent; 298 if (n_to_send > opt_n_items - n_sent) 299 n_to_send = opt_n_items - n_sent; 300 add_n_work_items(tp, n_to_send); 301 } 302 303 if (shutting_down == 0 && 304 n_received+n_successful_cancel == n_sent && 305 n_sent >= opt_n_items) { 306 shutting_down = 1; 307 threadpool_queue_update(tp, NULL, 308 workqueue_do_shutdown, NULL, NULL); 309 // Anything we add after starting the shutdown must not be executed. 310 threadpool_queue_work(tp, workqueue_shutdown_error, 311 handle_reply_shutdown, NULL); 312 { 313 struct timeval limit = { 2, 0 }; 314 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit); 315 } 316 } 317 } 318 319 static void 320 help(void) 321 { 322 puts( 323 "Options:\n" 324 " -h Display this information\n" 325 " -v Be verbose\n" 326 " -N <items> Run this many items of work\n" 327 " -T <threads> Use this many threads\n" 328 " -I <inflight> Have no more than this many requests queued at once\n" 329 " -L <lowwater> Add items whenever fewer than this many are pending\n" 330 " -C <cancel> Try to cancel N items of every batch that we add\n" 331 " -R <ratio> Make one out of this many items be a slow (RSA) one\n" 332 " --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n" 333 " Disable one of the alert_socket backends."); 334 } 335 336 int 337 main(int argc, char **argv) 338 { 339 replyqueue_t *rq; 340 threadpool_t *tp; 341 int i; 342 tor_libevent_cfg_t evcfg; 343 uint32_t as_flags = 0; 344 345 for (i = 1; i < argc; ++i) { 346 if (!strcmp(argv[i], "-v")) { 347 opt_verbose = 1; 348 } else if (!strcmp(argv[i], "-T") && i+1<argc) { 349 opt_n_threads = atoi(argv[++i]); 350 } else if (!strcmp(argv[i], "-N") && i+1<argc) { 351 opt_n_items = atoi(argv[++i]); 352 } else if (!strcmp(argv[i], "-I") && i+1<argc) { 353 opt_n_inflight = atoi(argv[++i]); 354 } else if (!strcmp(argv[i], "-L") && i+1<argc) { 355 opt_n_lowwater = atoi(argv[++i]); 356 } else if (!strcmp(argv[i], "-R") && i+1<argc) { 357 opt_ratio_rsa = atoi(argv[++i]); 358 } else if (!strcmp(argv[i], "-C") && i+1<argc) { 359 opt_n_cancel = atoi(argv[++i]); 360 } else if (!strcmp(argv[i], "--no-eventfd2")) { 361 as_flags |= ASOCKS_NOEVENTFD2; 362 } else if (!strcmp(argv[i], "--no-eventfd")) { 363 as_flags |= ASOCKS_NOEVENTFD; 364 } else if (!strcmp(argv[i], "--no-pipe2")) { 365 as_flags |= ASOCKS_NOPIPE2; 366 } else if (!strcmp(argv[i], "--no-pipe")) { 367 as_flags |= ASOCKS_NOPIPE; 368 } else if (!strcmp(argv[i], "--no-socketpair")) { 369 as_flags |= ASOCKS_NOSOCKETPAIR; 370 } else if (!strcmp(argv[i], "-h")) { 371 help(); 372 return 0; 373 } else { 374 help(); 375 return 1; 376 } 377 } 378 379 if (opt_n_threads < 1 || 380 opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 || 381 opt_n_cancel > opt_n_inflight || opt_n_inflight > MAX_INFLIGHT || 382 opt_ratio_rsa < 0) { 383 help(); 384 return 1; 385 } 386 387 if (opt_n_inflight > opt_n_items) { 388 opt_n_inflight = opt_n_items; 389 } 390 391 init_logging(1); 392 network_init(); 393 if (crypto_global_init(1, NULL, NULL) < 0) { 394 printf("Couldn't initialize crypto subsystem; exiting.\n"); 395 return 1; 396 } 397 if (crypto_seed_rng() < 0) { 398 printf("Couldn't seed RNG; exiting.\n"); 399 return 1; 400 } 401 402 rq = replyqueue_new(as_flags); 403 if (as_flags && rq == NULL) 404 return 77; // 77 means "skipped". 405 406 tor_assert(rq); 407 tp = threadpool_new(opt_n_threads, 408 rq, new_state, free_state, NULL); 409 tor_assert(tp); 410 411 crypto_seed_weak_rng(&weak_rng); 412 413 memset(&evcfg, 0, sizeof(evcfg)); 414 tor_libevent_initialize(&evcfg); 415 416 { 417 int r = threadpool_register_reply_event(tp, 418 replysock_readable_cb); 419 tor_assert(r == 0); 420 } 421 422 #ifdef TRACK_RESPONSES 423 handled = bitarray_init_zero(opt_n_items); 424 received = bitarray_init_zero(opt_n_items); 425 tor_mutex_init(&bitmap_mutex); 426 handled_len = opt_n_items; 427 #endif /* defined(TRACK_RESPONSES) */ 428 429 for (i = 0; i < opt_n_inflight; ++i) { 430 if (! add_work(tp)) { 431 puts("Couldn't add work."); 432 return 1; 433 } 434 } 435 436 { 437 struct timeval limit = { 180, 0 }; 438 tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit); 439 } 440 441 tor_libevent_run_event_loop(tor_libevent_get_base(), 0); 442 443 if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) { 444 printf("%d vs %d\n", n_sent, opt_n_items); 445 printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent); 446 puts("FAIL"); 447 return 1; 448 } else if (no_shutdown) { 449 puts("Accepted work after shutdown\n"); 450 puts("FAIL"); 451 } else { 452 puts("OK"); 453 return 0; 454 } 455 }