tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

cpuworker.c (21765B)


      1 /* Copyright (c) 2003-2004, Roger Dingledine.
      2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
      3 * Copyright (c) 2007-2024, The Tor Project, Inc. */
      4 /* See LICENSE for licensing information */
      5 
      6 /**
      7 * \file cpuworker.c
      8 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
      9 * out to subprocesses.
     10 *
     11 * The multithreading backend for this module is in workqueue.c; this module
     12 * specializes workqueue.c.
     13 *
     14 * Right now, we use this infrastructure
     15 *  <ul><li>for processing onionskins in onion.c
     16 *      <li>for compressing consensuses in consdiffmgr.c,
     17 *      <li>for calculating diffs and compressing them in consdiffmgr.c.
     18 *      <li>and for solving onion service PoW challenges in pow.c.
     19 *  </ul>
     20 **/
     21 #include "core/or/or.h"
     22 #include "core/or/channel.h"
     23 #include "core/or/circuitlist.h"
     24 #include "core/or/connection_or.h"
     25 #include "core/or/congestion_control_common.h"
     26 #include "core/or/congestion_control_flow.h"
     27 #include "app/config/config.h"
     28 #include "core/mainloop/cpuworker.h"
     29 #include "lib/crypt_ops/crypto_rand.h"
     30 #include "lib/crypt_ops/crypto_util.h"
     31 #include "core/or/onion.h"
     32 #include "feature/relay/circuitbuild_relay.h"
     33 #include "feature/relay/onion_queue.h"
     34 #include "feature/stats/rephist.h"
     35 #include "feature/relay/router.h"
     36 #include "feature/nodelist/networkstatus.h"
     37 #include "lib/evloop/workqueue.h"
     38 #include "core/crypto/onion_crypto.h"
     39 
     40 #include "core/or/or_circuit_st.h"
     41 
     42 static void queue_pending_tasks(void);
     43 
/** Per-thread state for a CPU worker: the thread's private copy of the
 * relay's onion keys, plus a generation counter bumped whenever the keys
 * are rotated (see update_state_threadfn). */
typedef struct worker_state_t {
  /** Incremented each time this worker's onion keys are replaced. */
  int generation;
  /** This worker's private copy of the server onion keys. */
  server_onion_keys_t *onion_keys;
} worker_state_t;
     48 
     49 static void *
     50 worker_state_new(void *arg)
     51 {
     52  worker_state_t *ws;
     53  (void)arg;
     54  ws = tor_malloc_zero(sizeof(worker_state_t));
     55  ws->onion_keys = server_onion_keys_new();
     56  return ws;
     57 }
     58 
/** Free <b>ws</b> and set it to NULL.  Prefer this macro over calling
 * worker_state_free_() directly, so the caller's pointer can't dangle. */
#define worker_state_free(ws) \
 FREE_AND_NULL(worker_state_t, worker_state_free_, (ws))

/** Release all storage held by <b>ws</b>, including its onion keys.
 * NULL is a no-op. */
static void
worker_state_free_(worker_state_t *ws)
{
  if (!ws)
    return;
  server_onion_keys_free(ws->onion_keys);
  tor_free(ws);
}
     70 
     71 static void
     72 worker_state_free_void(void *arg)
     73 {
     74  worker_state_free_(arg);
     75 }
     76 
/** The single thread pool used for all CPU-bound work; NULL until
 * cpuworker_init() runs (clients never initialize it). */
static threadpool_t *threadpool = NULL;

/** Number of tasks currently handed to the pool and not yet replied to. */
static uint32_t total_pending_tasks = 0;
/** Cap on total_pending_tasks; recomputed by set_max_pending_tasks(). */
static uint32_t max_pending_tasks = 128;
     81 
/** Return the consensus parameter "max_pending_tasks_per_cpu", or its
 * default if <b>ns</b> is NULL or lacks the parameter. */
static uint32_t
get_max_pending_tasks_per_cpu(const networkstatus_t *ns)
{
/* Total voodoo. Can we make this more sensible? Maybe, that is why we made it
 * a consensus parameter so our future self can figure out this magic. */
#define MAX_PENDING_TASKS_PER_CPU_DEFAULT 64
#define MAX_PENDING_TASKS_PER_CPU_MIN 1
#define MAX_PENDING_TASKS_PER_CPU_MAX INT32_MAX

  return networkstatus_get_param(ns, "max_pending_tasks_per_cpu",
                                 MAX_PENDING_TASKS_PER_CPU_DEFAULT,
                                 MAX_PENDING_TASKS_PER_CPU_MIN,
                                 MAX_PENDING_TASKS_PER_CPU_MAX);
}
     97 
     98 /** Set the max pending tasks per CPU worker. This uses the consensus to check
     99 * for the allowed number per CPU. The ns parameter can be NULL as in that no
    100 * consensus is available at the time of setting this value. */
    101 static void
    102 set_max_pending_tasks(const networkstatus_t *ns)
    103 {
    104  max_pending_tasks =
    105    get_num_cpus(get_options()) * get_max_pending_tasks_per_cpu(ns);
    106 }
    107 
    108 /** Called when the consensus has changed. */
    109 void
    110 cpuworker_consensus_has_changed(const networkstatus_t *ns)
    111 {
    112  tor_assert(ns);
    113  set_max_pending_tasks(ns);
    114 }
    115 
    116 /** Initialize the cpuworker subsystem. */
    117 int
    118 cpuworker_init(void)
    119 {
    120  /*
    121    In our threadpool implementation, half the threads are permissive and
    122    half are strict (when it comes to running lower-priority tasks). So we
    123    always make sure we have at least two threads, so that there will be at
    124    least one thread of each kind.
    125  */
    126  const int n_threads = MAX(get_num_cpus(get_options()), 2);
    127  threadpool = threadpool_new(n_threads,
    128                              replyqueue_new(0),
    129                              worker_state_new,
    130                              worker_state_free_void,
    131                              NULL);
    132 
    133  if (!threadpool) {
    134    log_err(LD_GENERAL, "Can't create worker thread pool");
    135    return -1;
    136  }
    137 
    138  int r = threadpool_register_reply_event(threadpool, NULL);
    139 
    140  tor_assert(r == 0);
    141 
    142  set_max_pending_tasks(NULL);
    143 
    144  return 0;
    145 }
    146 
    147 /** Free all resources allocated by cpuworker. */
    148 void
    149 cpuworker_free_all(void)
    150 {
    151  threadpool_free(threadpool);
    152 }
    153 
    154 /** Return the number of threads configured for our CPU worker. */
    155 unsigned int
    156 cpuworker_get_n_threads(void)
    157 {
    158  if (!threadpool) {
    159    return 0;
    160  }
    161  return threadpool_get_n_threads(threadpool);
    162 }
    163 
/** Magic numbers to make sure our cpuworker_requests don't grow any
 * mis-framing bugs. */
#define CPUWORKER_REQUEST_MAGIC 0xda4afeed
#define CPUWORKER_REPLY_MAGIC 0x5eedf00d

/** A request sent to a cpuworker.  Copied by value into the worker thread
 * (see the memcpy in cpuworker_onion_handshake_threadfn). */
typedef struct cpuworker_request_t {
  /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
  uint32_t magic;

  /** Flag: Are we timing this request? */
  unsigned timed : 1;
  /** If we're timing this request, when was it sent to the cpuworker? */
  struct timeval started_at;

  /** A create cell for the cpuworker to process. */
  create_cell_t create_cell;

  /**
   * A copy of this relay's consensus params that are relevant to
   * the circuit, for use in negotiation. */
  circuit_params_t circ_ns_params;

  /* Turn the above into a tagged union if needed. */
} cpuworker_request_t;
    189 
/** A reply sent by a cpuworker.  Written over the job's request union
 * member on the worker thread, read back on the main thread. */
typedef struct cpuworker_reply_t {
  /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
  uint32_t magic;

  /** True iff we got a successful request. */
  uint8_t success;

  /** Are we timing this request? */
  unsigned int timed : 1;
  /** What handshake type was the request? (Used for timing) */
  uint16_t handshake_type;
  /** When did we send the request to the cpuworker? */
  struct timeval started_at;
  /** Once the cpuworker received the request, how many microseconds did it
   * take? (This shouldn't overflow; 4 billion microseconds is over an hour,
   * and we'll never have an onion handshake that takes so long.) */
  uint32_t n_usec;

  /** Output of processing a create cell
   *
   * @{
   */
  /** The created cell to send back. */
  created_cell_t created_cell;
  /** The keys to use on this circuit. */
  uint8_t keys[MAX_RELAY_KEY_MATERIAL_LEN];
  /** Length of the generated key material. */
  size_t keys_len;
  /** Input to use for authenticating introduce1 cells. */
  uint8_t rend_auth_material[DIGEST_LEN];
  /** Negotiated circuit parameters. */
  circuit_params_t circ_params;
} cpuworker_reply_t;
    224 
/** One unit of work handed to the thread pool: the circuit the onionskin
 * belongs to, plus a union that holds the request on the way to the worker
 * and is overwritten with the reply on the way back. */
typedef struct cpuworker_job_u_t {
  /** Circuit this job is for.  May point at a DEAD_CIRCUIT_MAGIC husk by
   * the time the reply arrives (see cpuworker_onion_handshake_replyfn). */
  or_circuit_t *circ;
  union {
    cpuworker_request_t request;
    cpuworker_reply_t reply;
  } u;
} cpuworker_job_t;
    232 
/** Worker-thread callback for a queued key update.  <b>work_</b> is a
 * worker_state_t built on the main thread holding the new onion keys;
 * steal those keys into our own state, free the old ones, and bump the
 * generation counter. */
static workqueue_reply_t
update_state_threadfn(void *state_, void *work_)
{
  worker_state_t *state = state_;
  worker_state_t *update = work_;
  server_onion_keys_free(state->onion_keys);
  /* Take ownership of the fresh keys; NULL out the source so freeing
   * 'update' below doesn't also free them. */
  state->onion_keys = update->onion_keys;
  update->onion_keys = NULL;
  worker_state_free(update);
  ++state->generation;
  return WQ_RPL_REPLY;
}
    245 
    246 /** Called when the onion key has changed so update all CPU worker(s) with
    247 * new function pointers with which a new state will be generated.
    248 */
    249 void
    250 cpuworkers_rotate_keyinfo(void)
    251 {
    252  if (!threadpool) {
    253    /* If we're a client, then we won't have cpuworkers, and we won't need
    254     * to tell them to rotate their state.
    255     */
    256    return;
    257  }
    258  if (threadpool_queue_update(threadpool,
    259                              worker_state_new,
    260                              update_state_threadfn,
    261                              worker_state_free_void,
    262                              NULL)) {
    263    log_warn(LD_OR, "Failed to queue key update for worker threads.");
    264  }
    265 }
    266 
/** Indexed by handshake type: how many onionskins have we processed and
 * counted of that type? */
static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to the onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent in cpuworkers
 * processing that kind of onionskin? */
static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1];
/** Indexed by handshake type, corresponding to onionskins counted in
 * onionskins_n_processed: how many microseconds have we spent waiting for
 * cpuworkers to give us answers for that kind of onionskin?
 */
static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];

/** If any onionskin takes longer than this, we clip them to this
 * time, so one stuck measurement can't dominate the averages.
 * (microseconds) */
#define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
    283 
    284 /** Return true iff we'd like to measure a handshake of type
    285 * <b>onionskin_type</b>. Call only from the main thread. */
    286 static int
    287 should_time_request(uint16_t onionskin_type)
    288 {
    289  /* If we've never heard of this type, we shouldn't even be here. */
    290  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE)
    291    return 0;
    292  /* Measure the first N handshakes of each type, to ensure we have a
    293   * sample */
    294  if (onionskins_n_processed[onionskin_type] < 4096)
    295    return 1;
    296 
    297  /** Otherwise, measure with P=1/128.  We avoid doing this for every
    298   * handshake, since the measurement itself can take a little time. */
    299  return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128);
    300 }
    301 
    302 /** Return an estimate of how many microseconds we will need for a single
    303 * cpuworker to process <b>n_requests</b> onionskins of type
    304 * <b>onionskin_type</b>. */
    305 uint64_t
    306 estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type)
    307 {
    308  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    309    return 1000 * (uint64_t)n_requests;
    310  if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) {
    311    /* Until we have 100 data points, just assume everything takes 1 msec. */
    312    return 1000 * (uint64_t)n_requests;
    313  } else {
    314    /* This can't overflow: we'll never have more than 500000 onionskins
    315     * measured in onionskin_usec_internal, and they won't take anything near
    316     * 1 sec each, and we won't have anything like 1 million queued
    317     * onionskins.  But that's 5e5 * 1e6 * 1e6, which is still less than
    318     * UINT64_MAX. */
    319    return (onionskins_usec_internal[onionskin_type] * n_requests) /
    320      onionskins_n_processed[onionskin_type];
    321  }
    322 }
    323 
    324 /** Compute the absolute and relative overhead of using the cpuworker
    325 * framework for onionskins of type <b>onionskin_type</b>.*/
    326 static int
    327 get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out,
    328                            uint16_t onionskin_type)
    329 {
    330  uint64_t overhead;
    331 
    332  *usec_out = 0;
    333  *frac_out = 0.0;
    334 
    335  if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */
    336    return -1;
    337  if (onionskins_n_processed[onionskin_type] == 0 ||
    338      onionskins_usec_internal[onionskin_type] == 0 ||
    339      onionskins_usec_roundtrip[onionskin_type] == 0)
    340    return -1;
    341 
    342  overhead = onionskins_usec_roundtrip[onionskin_type] -
    343    onionskins_usec_internal[onionskin_type];
    344 
    345  *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]);
    346  *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type];
    347 
    348  return 0;
    349 }
    350 
    351 /** If we've measured overhead for onionskins of type <b>onionskin_type</b>,
    352 * log it. */
    353 void
    354 cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
    355                                 const char *onionskin_type_name)
    356 {
    357  uint32_t overhead;
    358  double relative_overhead;
    359  int r;
    360 
    361  r = get_overhead_for_onionskins(&overhead,  &relative_overhead,
    362                                  onionskin_type);
    363  if (!overhead || r<0)
    364    return;
    365 
    366  log_fn(severity, LD_OR,
    367         "%s onionskins have averaged %u usec overhead (%.2f%%) in "
    368         "cpuworker code ",
    369         onionskin_type_name, (unsigned)overhead, relative_overhead*100);
    370 }
    371 
/** Main-thread callback run when a worker finishes an onion handshake job.
 *
 * Unpacks the reply out of <b>work_</b> (a cpuworker_job_t), records timing
 * statistics for timed requests, then either completes the handshake via
 * onionskin_answer() or closes the circuit on failure.  Always wipes and
 * frees the job, and finally tries to launch more queued tasks. */
static void
cpuworker_onion_handshake_replyfn(void *work_)
{
  cpuworker_job_t *job = work_;
  cpuworker_reply_t rpl;
  or_circuit_t *circ = NULL;

  tor_assert(total_pending_tasks > 0);
  --total_pending_tasks;

  /* Could avoid this, but doesn't matter. */
  memcpy(&rpl, &job->u.reply, sizeof(rpl));

  tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);

  if (rpl.timed && rpl.success &&
      rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) {
    /* Time how long this request took. The handshake_type check should be
       needless, but let's leave it in to be safe. */
    struct timeval tv_end, tv_diff;
    int64_t usec_roundtrip;
    tor_gettimeofday(&tv_end);
    timersub(&tv_end, &rpl.started_at, &tv_diff);
    usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    /* Discard implausible measurements (negative, or longer than any real
     * onionskin could take) rather than poisoning the running averages. */
    if (usec_roundtrip >= 0 &&
        usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) {
      ++onionskins_n_processed[rpl.handshake_type];
      onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec;
      onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip;
      if (onionskins_n_processed[rpl.handshake_type] >= 500000) {
        /* Scale down every 500000 handshakes.  On a busy server, that's
         * less impressive than it sounds. */
        onionskins_n_processed[rpl.handshake_type] /= 2;
        onionskins_usec_internal[rpl.handshake_type] /= 2;
        onionskins_usec_roundtrip[rpl.handshake_type] /= 2;
      }
    }
  }

  circ = job->circ;

  log_debug(LD_OR,
            "Unpacking cpuworker reply %p, circ=%p, success=%d",
            job, circ, rpl.success);

  if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) {
    /* The circuit was supposed to get freed while the reply was
     * pending. Instead, it got left for us to free so that we wouldn't freak
     * out when the job->circ field wound up pointing to nothing. */
    log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory.");
    circ->base_.magic = 0;
    tor_free(circ);
    goto done_processing;
  }

  circ->workqueue_entry = NULL;

  if (TO_CIRCUIT(circ)->marked_for_close) {
    /* We already marked this circuit; we can't call it open. */
    log_debug(LD_OR,"circuit is already marked.");
    goto done_processing;
  }

  if (rpl.success == 0) {
    log_debug(LD_OR,
              "decoding onionskin failed. "
              "(Old key or bad software.) Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL);
    goto done_processing;
  }

  /* If the client asked for congestion control, if our consensus parameter
   * allowed it to negotiate as enabled, allocate a congestion control obj. */
  if (rpl.circ_params.cc_enabled) {
    if (get_options()->SbwsExit) {
      TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params,
                                                          CC_PATH_SBWS);
    } else {
      TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params,
                                                          CC_PATH_EXIT);
    }
  }

  circ->relay_cell_format = rpl.circ_params.cell_fmt;

  if (onionskin_answer(circ,
                       &rpl.created_cell,
                       rpl.circ_params.crypto_alg,
                       (const char*)rpl.keys, rpl.keys_len,
                       rpl.rend_auth_material) < 0) {
    log_warn(LD_OR,"onionskin_answer failed. Closing.");
    circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL);
    goto done_processing;
  }

  log_debug(LD_OR,"onionskin_answer succeeded. Yay.");

 done_processing:
  /* The reply and job hold circuit key material: wipe before freeing. */
  memwipe(&rpl, 0, sizeof(rpl));
  memwipe(job, 0, sizeof(*job));
  tor_free(job);
  queue_pending_tasks();
}
    476 
    477 /** Implementation function for onion handshake requests. */
    478 static workqueue_reply_t
    479 cpuworker_onion_handshake_threadfn(void *state_, void *work_)
    480 {
    481  worker_state_t *state = state_;
    482  cpuworker_job_t *job = work_;
    483 
    484  /* variables for onion processing */
    485  server_onion_keys_t *onion_keys = state->onion_keys;
    486  cpuworker_request_t req;
    487  cpuworker_reply_t rpl;
    488 
    489  memcpy(&req, &job->u.request, sizeof(req));
    490 
    491  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
    492  memset(&rpl, 0, sizeof(rpl));
    493 
    494  const create_cell_t *cc = &req.create_cell;
    495  created_cell_t *cell_out = &rpl.created_cell;
    496  struct timeval tv_start = {0,0}, tv_end;
    497  int n;
    498  rpl.timed = req.timed;
    499  rpl.started_at = req.started_at;
    500  rpl.handshake_type = cc->handshake_type;
    501  if (req.timed)
    502    tor_gettimeofday(&tv_start);
    503  rpl.keys_len = sizeof(rpl.keys);
    504  n = onion_skin_server_handshake(cc->handshake_type,
    505                                  cc->onionskin, cc->handshake_len,
    506                                  onion_keys,
    507                                  &req.circ_ns_params,
    508                                  cell_out->reply,
    509                                  sizeof(cell_out->reply),
    510                                  rpl.keys, &rpl.keys_len,
    511                                  rpl.rend_auth_material,
    512                                  &rpl.circ_params);
    513 
    514  if (n < 0) {
    515    /* failure */
    516    log_debug(LD_OR,"onion_skin_server_handshake failed.");
    517    memset(&rpl, 0, sizeof(rpl));
    518    rpl.success = 0;
    519  } else {
    520    /* success */
    521    log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
    522    cell_out->handshake_len = n;
    523    switch (cc->cell_type) {
    524    case CELL_CREATE:
    525      cell_out->cell_type = CELL_CREATED; break;
    526    case CELL_CREATE2:
    527      cell_out->cell_type = CELL_CREATED2; break;
    528    case CELL_CREATE_FAST:
    529      cell_out->cell_type = CELL_CREATED_FAST; break;
    530    default:
    531      tor_assert(0);
    532      return WQ_RPL_SHUTDOWN;
    533    }
    534    rpl.success = 1;
    535  }
    536 
    537  rpl.magic = CPUWORKER_REPLY_MAGIC;
    538  if (req.timed) {
    539    struct timeval tv_diff;
    540    int64_t usec;
    541    tor_gettimeofday(&tv_end);
    542    timersub(&tv_end, &tv_start, &tv_diff);
    543    usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec;
    544    if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY)
    545      rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY;
    546    else
    547      rpl.n_usec = (uint32_t) usec;
    548  }
    549 
    550  memcpy(&job->u.reply, &rpl, sizeof(rpl));
    551 
    552  memwipe(&req, 0, sizeof(req));
    553  memwipe(&rpl, 0, sizeof(req));
    554  return WQ_RPL_REPLY;
    555 }
    556 
    557 /** Take pending tasks from the queue and assign them to cpuworkers. */
    558 static void
    559 queue_pending_tasks(void)
    560 {
    561  or_circuit_t *circ;
    562  create_cell_t *onionskin = NULL;
    563 
    564  while (total_pending_tasks < max_pending_tasks) {
    565    circ = onion_next_task(&onionskin);
    566 
    567    if (!circ)
    568      return;
    569 
    570    if (assign_onionskin_to_cpuworker(circ, onionskin) < 0)
    571      log_info(LD_OR,"assign_to_cpuworker failed. Ignoring.");
    572  }
    573 }
    574 
    575 /** DOCDOC */
    576 MOCK_IMPL(workqueue_entry_t *,
    577 cpuworker_queue_work,(workqueue_priority_t priority,
    578                      workqueue_reply_t (*fn)(void *, void *),
    579                      void (*reply_fn)(void *),
    580                      void *arg))
    581 {
    582  tor_assert(threadpool);
    583 
    584  return threadpool_queue_work_priority(threadpool,
    585                                        priority,
    586                                        fn,
    587                                        reply_fn,
    588                                        arg);
    589 }
    590 
/** Try to tell a cpuworker to perform the public key operations necessary to
 * respond to <b>onionskin</b> for the circuit <b>circ</b>.
 *
 * Takes ownership of <b>onionskin</b>: it is either copied into the request
 * and freed here, or handed to the onion queue.  On success, records the
 * work-queue entry in circ->workqueue_entry so the handshake can be
 * cancelled later.
 *
 * Return 0 if we successfully assign the task, or -1 on failure.
 */
int
assign_onionskin_to_cpuworker(or_circuit_t *circ,
                              create_cell_t *onionskin)
{
  workqueue_entry_t *queue_entry;
  cpuworker_job_t *job;
  cpuworker_request_t req;
  int should_time;

  tor_assert(threadpool);

  if (!circ->p_chan) {
    log_info(LD_OR,"circ->p_chan gone. Failing circ.");
    tor_free(onionskin);
    return -1;
  }

  if (total_pending_tasks >= max_pending_tasks) {
    /* We're at the cap: park the onionskin on the onion queue; it will be
     * picked up by queue_pending_tasks() when a reply comes back. */
    log_debug(LD_OR,"No idle cpuworkers. Queuing.");
    if (onion_pending_add(circ, onionskin) < 0) {
      tor_free(onionskin);
      return -1;
    }
    return 0;
  }

  /* Only count handshakes from relays/bridges, not from clients. */
  if (!channel_is_client(circ->p_chan))
    rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);

  should_time = should_time_request(onionskin->handshake_type);
  memset(&req, 0, sizeof(req));
  req.magic = CPUWORKER_REQUEST_MAGIC;
  req.timed = should_time;

  memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));

  /* The request now owns a copy; the original is no longer needed. */
  tor_free(onionskin);

  if (should_time)
    tor_gettimeofday(&req.started_at);

  /* Copy the current cached consensus params relevant to
   * circuit negotiation into the CPU worker context */
  req.circ_ns_params.cc_enabled = congestion_control_enabled();
  req.circ_ns_params.sendme_inc_cells = congestion_control_sendme_inc();

  job = tor_malloc_zero(sizeof(cpuworker_job_t));
  job->circ = circ;
  memcpy(&job->u.request, &req, sizeof(req));
  /* The stack copy holds the onionskin; wipe it now that the job has it. */
  memwipe(&req, 0, sizeof(req));

  ++total_pending_tasks;
  queue_entry = threadpool_queue_work_priority(threadpool,
                                      WQ_PRI_HIGH,
                                      cpuworker_onion_handshake_threadfn,
                                      cpuworker_onion_handshake_replyfn,
                                      job);
  if (!queue_entry) {
    log_warn(LD_BUG, "Couldn't queue work on threadpool");
    tor_free(job);
    return -1;
  }

  log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)",
            job, queue_entry, job->circ);

  /* Remember the entry so cpuworker_cancel_circ_handshake() can cancel. */
  circ->workqueue_entry = queue_entry;

  return 0;
}
    666 
    667 /** If <b>circ</b> has a pending handshake that hasn't been processed yet,
    668 * remove it from the worker queue. */
    669 void
    670 cpuworker_cancel_circ_handshake(or_circuit_t *circ)
    671 {
    672  cpuworker_job_t *job;
    673  if (circ->workqueue_entry == NULL)
    674    return;
    675 
    676  job = workqueue_entry_cancel(circ->workqueue_entry);
    677  if (job) {
    678    /* It successfully cancelled. */
    679    memwipe(job, 0xe0, sizeof(*job));
    680    tor_free(job);
    681    tor_assert(total_pending_tasks > 0);
    682    --total_pending_tasks;
    683    /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */
    684    circ->workqueue_entry = NULL;
    685  }
    686 }