tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

test_workqueue.c (11333B)


      1 /* Copyright (c) 2001-2004, Roger Dingledine.
      2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
      3 * Copyright (c) 2007-2021, The Tor Project, Inc. */
      4 /* See LICENSE for licensing information */
      5 
      6 #include "core/or/or.h"
      7 #include "lib/thread/threads.h"
      8 #include "core/or/onion.h"
      9 #include "lib/evloop/workqueue.h"
     10 #include "lib/crypt_ops/crypto_curve25519.h"
     11 #include "lib/crypt_ops/crypto_rand.h"
     12 #include "lib/net/alertsock.h"
     13 #include "lib/evloop/compat_libevent.h"
     14 #include "lib/intmath/weakrng.h"
     15 #include "lib/crypt_ops/crypto_init.h"
     16 
     17 #include <stdio.h>
     18 
     19 #define MAX_INFLIGHT (1<<16)
     20 
     21 static int opt_verbose = 0;
     22 static int opt_n_threads = 8;
     23 static int opt_n_items = 10000;
     24 static int opt_n_inflight = 1000;
     25 static int opt_n_lowwater = 250;
     26 static int opt_n_cancel = 0;
     27 static int opt_ratio_rsa = 5;
     28 
     29 #ifdef TRACK_RESPONSES
     30 tor_mutex_t bitmap_mutex;
     31 int handled_len;
     32 bitarray_t *handled;
     33 #endif
     34 
     35 typedef struct state_t {
     36  int magic;
     37  int n_handled;
     38  crypto_pk_t *rsa;
     39  curve25519_secret_key_t ecdh;
     40  int is_shutdown;
     41 } state_t;
     42 
     43 typedef struct rsa_work_t {
     44  int serial;
     45  uint8_t msg[128];
     46  uint8_t msglen;
     47 } rsa_work_t;
     48 
     49 typedef struct ecdh_work_t {
     50  int serial;
     51  union {
     52    curve25519_public_key_t pk;
     53    uint8_t msg[32];
     54  } u;
     55 } ecdh_work_t;
     56 
     57 static void
     58 mark_handled(int serial)
     59 {
     60 #ifdef TRACK_RESPONSES
     61  tor_mutex_acquire(&bitmap_mutex);
     62  tor_assert(serial < handled_len);
     63  tor_assert(! bitarray_is_set(handled, serial));
     64  bitarray_set(handled, serial);
     65  tor_mutex_release(&bitmap_mutex);
     66 #else /* !defined(TRACK_RESPONSES) */
     67  (void)serial;
     68 #endif /* defined(TRACK_RESPONSES) */
     69 }
     70 
     71 static workqueue_reply_t
     72 workqueue_do_rsa(void *state, void *work)
     73 {
     74  rsa_work_t *rw = work;
     75  state_t *st = state;
     76  crypto_pk_t *rsa = st->rsa;
     77  uint8_t sig[256];
     78  int len;
     79 
     80  tor_assert(st->magic == 13371337);
     81 
     82  len = crypto_pk_private_sign(rsa, (char*)sig, 256,
     83                               (char*)rw->msg, rw->msglen);
     84  if (len < 0) {
     85    rw->msglen = 0;
     86    return WQ_RPL_ERROR;
     87  }
     88 
     89  memset(rw->msg, 0, sizeof(rw->msg));
     90  rw->msglen = len;
     91  memcpy(rw->msg, sig, len);
     92  ++st->n_handled;
     93 
     94  mark_handled(rw->serial);
     95 
     96  return WQ_RPL_REPLY;
     97 }
     98 
     99 static workqueue_reply_t
    100 workqueue_do_shutdown(void *state, void *work)
    101 {
    102  (void)state;
    103  (void)work;
    104  crypto_pk_free(((state_t*)state)->rsa);
    105  tor_free(state);
    106  return WQ_RPL_SHUTDOWN;
    107 }
    108 
    109 static workqueue_reply_t
    110 workqueue_do_ecdh(void *state, void *work)
    111 {
    112  ecdh_work_t *ew = work;
    113  uint8_t output[CURVE25519_OUTPUT_LEN];
    114  state_t *st = state;
    115 
    116  tor_assert(st->magic == 13371337);
    117 
    118  curve25519_handshake(output, &st->ecdh, &ew->u.pk);
    119  memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
    120  ++st->n_handled;
    121  mark_handled(ew->serial);
    122  return WQ_RPL_REPLY;
    123 }
    124 
    125 static workqueue_reply_t
    126 workqueue_shutdown_error(void *state, void *work)
    127 {
    128  (void)state;
    129  (void)work;
    130  return WQ_RPL_REPLY;
    131 }
    132 
    133 static void *
    134 new_state(void *arg)
    135 {
    136  state_t *st;
    137  (void)arg;
    138 
    139  st = tor_malloc(sizeof(*st));
    140  /* Every thread gets its own keys. not a problem for benchmarking */
    141  st->rsa = crypto_pk_new();
    142  if (crypto_pk_generate_key_with_bits(st->rsa, 1024) < 0) {
    143    crypto_pk_free(st->rsa);
    144    tor_free(st);
    145    return NULL;
    146  }
    147  curve25519_secret_key_generate(&st->ecdh, 0);
    148  st->magic = 13371337;
    149  return st;
    150 }
    151 
    152 static void
    153 free_state(void *arg)
    154 {
    155  state_t *st = arg;
    156  crypto_pk_free(st->rsa);
    157  tor_free(st);
    158 }
    159 
    160 static tor_weak_rng_t weak_rng;
    161 static int n_sent = 0;
    162 static int rsa_sent = 0;
    163 static int ecdh_sent = 0;
    164 static int n_received_previously = 0;
    165 static int n_received = 0;
    166 static int no_shutdown = 0;
    167 
    168 #ifdef TRACK_RESPONSES
    169 bitarray_t *received;
    170 #endif
    171 
    172 static void
    173 handle_reply(void *arg)
    174 {
    175 #ifdef TRACK_RESPONSES
    176  rsa_work_t *rw = arg; /* Naughty cast, but only looking at serial. */
    177  tor_assert(! bitarray_is_set(received, rw->serial));
    178  bitarray_set(received,rw->serial);
    179 #endif
    180 
    181  tor_free(arg);
    182  ++n_received;
    183 }
    184 
    185 /* This should never get called. */
    186 static void
    187 handle_reply_shutdown(void *arg)
    188 {
    189  (void)arg;
    190  no_shutdown = 1;
    191 }
    192 
    193 static workqueue_entry_t *
    194 add_work(threadpool_t *tp)
    195 {
    196  int add_rsa =
    197    opt_ratio_rsa == 0 ||
    198    tor_weak_random_range(&weak_rng, opt_ratio_rsa) == 0;
    199 
    200  if (add_rsa) {
    201    rsa_work_t *w = tor_malloc_zero(sizeof(*w));
    202    w->serial = n_sent++;
    203    crypto_rand((char*)w->msg, 20);
    204    w->msglen = 20;
    205    ++rsa_sent;
    206    return threadpool_queue_work_priority(tp,
    207                                          WQ_PRI_MED,
    208                                          workqueue_do_rsa, handle_reply, w);
    209  } else {
    210    ecdh_work_t *w = tor_malloc_zero(sizeof(*w));
    211    w->serial = n_sent++;
    212    /* Not strictly right, but this is just for benchmarks. */
    213    crypto_rand((char*)w->u.pk.public_key, 32);
    214    ++ecdh_sent;
    215    return threadpool_queue_work(tp, workqueue_do_ecdh, handle_reply, w);
    216  }
    217 }
    218 
    219 static int n_failed_cancel = 0;
    220 static int n_successful_cancel = 0;
    221 
    222 static int
    223 add_n_work_items(threadpool_t *tp, int n)
    224 {
    225  int n_queued = 0;
    226  int n_try_cancel = 0, i;
    227  workqueue_entry_t **to_cancel;
    228  workqueue_entry_t *ent;
    229 
    230  // We'll choose randomly which entries to cancel.
    231  to_cancel = tor_calloc(opt_n_cancel, sizeof(workqueue_entry_t*));
    232 
    233  while (n_queued++ < n) {
    234    ent = add_work(tp);
    235    if (! ent) {
    236      puts("Z");
    237      tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), NULL);
    238      return -1;
    239    }
    240 
    241    if (n_try_cancel < opt_n_cancel) {
    242      to_cancel[n_try_cancel++] = ent;
    243    } else {
    244      int p = tor_weak_random_range(&weak_rng, n_queued);
    245      if (p < n_try_cancel) {
    246        to_cancel[p] = ent;
    247      }
    248    }
    249  }
    250 
    251  for (i = 0; i < n_try_cancel; ++i) {
    252    void *work = workqueue_entry_cancel(to_cancel[i]);
    253    if (! work) {
    254      n_failed_cancel++;
    255    } else {
    256      n_successful_cancel++;
    257      tor_free(work);
    258    }
    259  }
    260 
    261  tor_free(to_cancel);
    262  return 0;
    263 }
    264 
    265 static int shutting_down = 0;
    266 
    267 static void
    268 replysock_readable_cb(threadpool_t *tp)
    269 {
    270  if (n_received_previously == n_received)
    271    return;
    272 
    273  n_received_previously = n_received;
    274 
    275  if (opt_verbose) {
    276    printf("%d / %d", n_received, n_sent);
    277    if (opt_n_cancel)
    278      printf(" (%d cancelled, %d uncancellable)",
    279             n_successful_cancel, n_failed_cancel);
    280    puts("");
    281  }
    282 #ifdef TRACK_RESPONSES
    283  tor_mutex_acquire(&bitmap_mutex);
    284  for (i = 0; i < opt_n_items; ++i) {
    285    if (bitarray_is_set(received, i))
    286      putc('o', stdout);
    287    else if (bitarray_is_set(handled, i))
    288      putc('!', stdout);
    289    else
    290      putc('.', stdout);
    291  }
    292  puts("");
    293  tor_mutex_release(&bitmap_mutex);
    294 #endif /* defined(TRACK_RESPONSES) */
    295 
    296  if (n_sent - (n_received+n_successful_cancel) < opt_n_lowwater) {
    297    int n_to_send = n_received + opt_n_inflight - n_sent;
    298    if (n_to_send > opt_n_items - n_sent)
    299      n_to_send = opt_n_items - n_sent;
    300    add_n_work_items(tp, n_to_send);
    301  }
    302 
    303  if (shutting_down == 0 &&
    304      n_received+n_successful_cancel == n_sent &&
    305      n_sent >= opt_n_items) {
    306    shutting_down = 1;
    307    threadpool_queue_update(tp, NULL,
    308                             workqueue_do_shutdown, NULL, NULL);
    309    // Anything we add after starting the shutdown must not be executed.
    310    threadpool_queue_work(tp, workqueue_shutdown_error,
    311                          handle_reply_shutdown, NULL);
    312    {
    313      struct timeval limit = { 2, 0 };
    314      tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
    315    }
    316  }
    317 }
    318 
    319 static void
    320 help(void)
    321 {
    322  puts(
    323     "Options:\n"
    324     "  -h            Display this information\n"
    325     "  -v            Be verbose\n"
    326     "  -N <items>    Run this many items of work\n"
    327     "  -T <threads>  Use this many threads\n"
    328     "  -I <inflight> Have no more than this many requests queued at once\n"
    329     "  -L <lowwater> Add items whenever fewer than this many are pending\n"
    330     "  -C <cancel>   Try to cancel N items of every batch that we add\n"
    331     "  -R <ratio>    Make one out of this many items be a slow (RSA) one\n"
    332     "  --no-{eventfd2,eventfd,pipe2,pipe,socketpair}\n"
    333     "                Disable one of the alert_socket backends.");
    334 }
    335 
    336 int
    337 main(int argc, char **argv)
    338 {
    339  replyqueue_t *rq;
    340  threadpool_t *tp;
    341  int i;
    342  tor_libevent_cfg_t evcfg;
    343  uint32_t as_flags = 0;
    344 
    345  for (i = 1; i < argc; ++i) {
    346    if (!strcmp(argv[i], "-v")) {
    347      opt_verbose = 1;
    348    } else if (!strcmp(argv[i], "-T") && i+1<argc) {
    349      opt_n_threads = atoi(argv[++i]);
    350    } else if (!strcmp(argv[i], "-N") && i+1<argc) {
    351      opt_n_items = atoi(argv[++i]);
    352    } else if (!strcmp(argv[i], "-I") && i+1<argc) {
    353      opt_n_inflight = atoi(argv[++i]);
    354    } else if (!strcmp(argv[i], "-L") && i+1<argc) {
    355      opt_n_lowwater = atoi(argv[++i]);
    356    } else if (!strcmp(argv[i], "-R") && i+1<argc) {
    357      opt_ratio_rsa = atoi(argv[++i]);
    358    } else if (!strcmp(argv[i], "-C") && i+1<argc) {
    359      opt_n_cancel = atoi(argv[++i]);
    360    } else if (!strcmp(argv[i], "--no-eventfd2")) {
    361      as_flags |= ASOCKS_NOEVENTFD2;
    362    } else if (!strcmp(argv[i], "--no-eventfd")) {
    363      as_flags |= ASOCKS_NOEVENTFD;
    364    } else if (!strcmp(argv[i], "--no-pipe2")) {
    365      as_flags |= ASOCKS_NOPIPE2;
    366    } else if (!strcmp(argv[i], "--no-pipe")) {
    367      as_flags |= ASOCKS_NOPIPE;
    368    } else if (!strcmp(argv[i], "--no-socketpair")) {
    369      as_flags |= ASOCKS_NOSOCKETPAIR;
    370    } else if (!strcmp(argv[i], "-h")) {
    371      help();
    372      return 0;
    373    } else {
    374      help();
    375      return 1;
    376    }
    377  }
    378 
    379  if (opt_n_threads < 1 ||
    380      opt_n_items < 1 || opt_n_inflight < 1 || opt_n_lowwater < 0 ||
    381      opt_n_cancel > opt_n_inflight || opt_n_inflight > MAX_INFLIGHT ||
    382      opt_ratio_rsa < 0) {
    383    help();
    384    return 1;
    385  }
    386 
    387  if (opt_n_inflight > opt_n_items) {
    388      opt_n_inflight = opt_n_items;
    389  }
    390 
    391  init_logging(1);
    392  network_init();
    393  if (crypto_global_init(1, NULL, NULL) < 0) {
    394    printf("Couldn't initialize crypto subsystem; exiting.\n");
    395    return 1;
    396  }
    397  if (crypto_seed_rng() < 0) {
    398    printf("Couldn't seed RNG; exiting.\n");
    399    return 1;
    400  }
    401 
    402  rq = replyqueue_new(as_flags);
    403  if (as_flags && rq == NULL)
    404    return 77; // 77 means "skipped".
    405 
    406  tor_assert(rq);
    407  tp = threadpool_new(opt_n_threads,
    408                      rq, new_state, free_state, NULL);
    409  tor_assert(tp);
    410 
    411  crypto_seed_weak_rng(&weak_rng);
    412 
    413  memset(&evcfg, 0, sizeof(evcfg));
    414  tor_libevent_initialize(&evcfg);
    415 
    416  {
    417    int r = threadpool_register_reply_event(tp,
    418                                            replysock_readable_cb);
    419    tor_assert(r == 0);
    420  }
    421 
    422 #ifdef TRACK_RESPONSES
    423  handled = bitarray_init_zero(opt_n_items);
    424  received = bitarray_init_zero(opt_n_items);
    425  tor_mutex_init(&bitmap_mutex);
    426  handled_len = opt_n_items;
    427 #endif /* defined(TRACK_RESPONSES) */
    428 
    429  for (i = 0; i < opt_n_inflight; ++i) {
    430    if (! add_work(tp)) {
    431      puts("Couldn't add work.");
    432      return 1;
    433    }
    434  }
    435 
    436  {
    437    struct timeval limit = { 180, 0 };
    438    tor_libevent_exit_loop_after_delay(tor_libevent_get_base(), &limit);
    439  }
    440 
    441  tor_libevent_run_event_loop(tor_libevent_get_base(), 0);
    442 
    443  if (n_sent != opt_n_items || n_received+n_successful_cancel != n_sent) {
    444    printf("%d vs %d\n", n_sent, opt_n_items);
    445    printf("%d+%d vs %d\n", n_received, n_successful_cancel, n_sent);
    446    puts("FAIL");
    447    return 1;
    448  } else if (no_shutdown) {
    449    puts("Accepted work after shutdown\n");
    450    puts("FAIL");
    451  } else {
    452    puts("OK");
    453    return 0;
    454  }
    455 }