tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

workqueue.c (28409B)


      1 /* copyright (c) 2013-2024, The Tor Project, Inc. */
      2 /* See LICENSE for licensing information */
      3 
      4 /**
      5 * \file workqueue.c
      6 *
      7 * \brief Implements worker threads, queues of work for them, and mechanisms
      8 * for them to send answers back to the main thread.
      9 *
     10 * The main structure here is a threadpool_t : it manages a set of worker
     11 * threads, a queue of pending work, and a reply queue.  Every piece of work
     12 * is a workqueue_entry_t, containing data to process and a function to
     13 * process it with.
     14 *
     15 * The main thread informs the worker threads of pending work by using a
     16 * condition variable.  The workers inform the main process of completed work
     17 * by using an alert_sockets_t object, as implemented in net/alertsock.c.
     18 *
     19 * The main thread can also queue an "update" that will be handled by all the
     20 * workers.  This is useful for updating state that all the workers share.
     21 *
     22 * In Tor today, there is currently only one thread pool, managed
     23 * in cpuworker.c and handling a variety of types of work, from the original
     24 * "onion skin" circuit handshakes, to consensus diff computation, to
     25 * client-side onion service PoW generation.
     26 */
     27 
     28 #include "orconfig.h"
     29 #include "lib/evloop/compat_libevent.h"
     30 #include "lib/evloop/workqueue.h"
     31 
     32 #include "lib/crypt_ops/crypto_rand.h"
     33 #include "lib/intmath/weakrng.h"
     34 #include "lib/log/ratelim.h"
     35 #include "lib/log/log.h"
     36 #include "lib/log/util_bug.h"
     37 #include "lib/net/alertsock.h"
     38 #include "lib/net/socket.h"
     39 #include "lib/thread/threads.h"
     40 #include "lib/time/compat_time.h"
     41 
     42 #include "ext/tor_queue.h"
     43 #include <event2/event.h>
     44 #include <string.h>
     45 
     46 #define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
     47 #define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
     48 #define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)
     49 
     50 TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
     51 typedef struct work_tailq_t work_tailq_t;
     52 
struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool.  Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  /** Optional callback run in the main thread after each batch of replies
   * has been processed (see reply_event_cb()); may be NULL. */
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Number of elements to be created in threads. */
  int n_threads_max;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  /** Argument passed to new_thread_state_fn when creating each thread's
   * state object. */
  void *new_thread_state_arg;

  /** Used for signalling the worker threads to exit. */
  int exit;
  /** Mutex for controlling worker threads' startup and exit. */
  tor_mutex_t control_lock;
};
     99 
    100 /** Used to put a workqueue_priority_t value into a bitfield. */
    101 #define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
    102 /** Number of bits needed to hold all legal values of workqueue_priority_t */
    103 #define WORKQUEUE_PRIORITY_BITS 2
    104 
struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread.  Receives the worker's state
   * object and <b>arg</b>.  A return value other than WQ_RPL_REPLY makes
   * the worker thread exit (see worker_thread_main()). */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};
    124 
/** A reply queue is the channel through which worker threads hand completed
 * work items back to the main thread. */
struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};
    134 
/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this?  In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;
    151 
    152 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
    153 static void workerthread_free_(workerthread_t *thread);
    154 #define workerthread_free(thread) \
    155  FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
    156 static void replyqueue_free_(replyqueue_t *queue);
    157 #define replyqueue_free(queue) \
    158  FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))
    159 
    160 /** Allocate and return a new workqueue_entry_t, set up to run the function
    161 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
    162 * thread. See threadpool_queue_work() for full documentation. */
    163 static workqueue_entry_t *
    164 workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
    165                    void (*reply_fn)(void*),
    166                    void *arg)
    167 {
    168  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
    169  ent->fn = fn;
    170  ent->reply_fn = reply_fn;
    171  ent->arg = arg;
    172  ent->priority = WQ_PRI_HIGH;
    173  return ent;
    174 }
    175 
    176 #define workqueue_entry_free(ent) \
    177  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))
    178 
    179 /**
    180 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
    181 * any queue.
    182 */
    183 static void
    184 workqueue_entry_free_(workqueue_entry_t *ent)
    185 {
    186  if (!ent)
    187    return;
    188  memset(ent, 0xf0, sizeof(*ent));
    189  tor_free(ent);
    190 }
    191 
    192 /**
    193 * Cancel a workqueue_entry_t that has been returned from
    194 * threadpool_queue_work.
    195 *
    196 * You must not call this function on any work whose reply function has been
    197 * executed in the main thread; that will cause undefined behavior (probably,
    198 * a crash).
    199 *
    200 * If the work is cancelled, this function return the argument passed to the
    201 * work function. It is the caller's responsibility to free this storage.
    202 *
    203 * This function will have no effect if the worker thread has already executed
    204 * or begun to execute the work item.  In that case, it will return NULL.
    205 */
    206 void *
    207 workqueue_entry_cancel(workqueue_entry_t *ent)
    208 {
    209  int cancelled = 0;
    210  void *result = NULL;
    211  tor_mutex_acquire(&ent->on_pool->lock);
    212  workqueue_priority_t prio = ent->priority;
    213  if (ent->pending) {
    214    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    215    cancelled = 1;
    216    result = ent->arg;
    217  }
    218  tor_mutex_release(&ent->on_pool->lock);
    219 
    220  if (cancelled) {
    221    workqueue_entry_free(ent);
    222  }
    223  return result;
    224 }
    225 
/** Return true iff <b>thread</b> has something to do: either one of its
 * pool's work queues is nonempty, or the pool's update generation has
 * advanced past this thread's generation (so it still needs to run the
 * pending update function).
 *
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
        return 1;
  }
  return thread->generation != thread->in_pool->generation;
}
    239 
    240 /** Extract the next workqueue_entry_t from the the thread's pool, removing
    241 * it from the relevant queues and marking it as non-pending.
    242 *
    243 * The caller must hold the lock. */
    244 static workqueue_entry_t *
    245 worker_thread_extract_next_work(workerthread_t *thread)
    246 {
    247  threadpool_t *pool = thread->in_pool;
    248  work_tailq_t *queue = NULL, *this_queue;
    249  unsigned i;
    250  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    251    this_queue = &pool->work[i];
    252    if (!TOR_TAILQ_EMPTY(this_queue)) {
    253      queue = this_queue;
    254      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
    255                                     thread->lower_priority_chance)) {
    256        /* Usually we'll just break now, so that we can get out of the loop
    257         * and use the queue where we found work. But with a small
    258         * probability, we'll keep looking for lower priority work, so that
    259         * we don't ignore our low-priority queues entirely. */
    260        break;
    261      }
    262    }
    263  }
    264 
    265  if (queue == NULL)
    266    return NULL;
    267 
    268  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
    269  TOR_TAILQ_REMOVE(queue, work, next_work);
    270  work->pending = 0;
    271  return work;
    272 }
    273 
/**
 * Main function for the worker thread.
 *
 * Lifecycle: announce startup under control_lock (signaling the condition
 * when the last expected thread is up, so threadpool_start_threads() can
 * return); then loop under pool->lock running updates and queued work;
 * finally coordinate an orderly shutdown in which exactly one thread — the
 * long-term owner of control_lock — waits for its siblings before releasing
 * that lock to unblock the main thread.
 */
static void
worker_thread_main(void *thread_)
{
  /* Count of live workers.  Static, so shared by every worker thread;
   * mutated only while holding control_lock (startup) or pool->lock
   * (shutdown).  NOTE(review): with more than one threadpool these statics
   * would be shared across pools; the file comment says only one pool
   * exists in Tor today — confirm before adding another. */
  static int n_worker_threads_running = 0;
  /* Thread ID of the worker that holds pool->control_lock for the pool's
   * whole lifetime; 0 means not yet claimed. */
  static unsigned long control_lock_owner = 0;
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->control_lock);
  log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
            n_worker_threads_running + 1, pool->n_threads_max,
            tor_get_thread_id());

  /* The last thread to start wakes threadpool_start_threads(), which is
   * blocked in tor_cond_wait() on this condition. */
  if (++n_worker_threads_running == pool->n_threads_max)
    tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->control_lock);

  /* Wait until all worker threads have started.
   * pool->lock must be prelocked here. */
  tor_mutex_acquire(&pool->lock);

  if (control_lock_owner == 0) {
    /* pool->control_lock stays locked. This is required for the main thread
     * to wait for the worker threads to exit on shutdown, so the memory
     * clean up won't begin before all threads have exited. */
    tor_mutex_acquire(&pool->control_lock);
    control_lock_owner = tor_get_thread_id();
  }

  log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
            tor_get_thread_id());

  while (1) {
    /* Exit thread when signaled to exit */
    if (pool->exit)
      goto exit;

    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        /* An update is pending: take (and clear) our per-thread argument,
         * record the new generation, then run the update function without
         * holding the lock. */
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
            thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        tor_mutex_acquire(&pool->lock);

        /* We may need to exit the thread. */
        if (r != WQ_RPL_REPLY)
          goto exit;

        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      tor_mutex_acquire(&pool->lock);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY)
        goto exit;
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }

 exit:
  /* At this point pool->lock must be held */

  log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
            pool->n_threads_max - n_worker_threads_running + 1,
            pool->n_threads_max, tor_get_thread_id());

  if (tor_get_thread_id() == control_lock_owner) {
    /* Wait for the other worker threads to exit so we
     * can safely unlock pool->control_lock. */
    while (n_worker_threads_running > 1) {
      tor_mutex_release(&pool->lock);
      tor_sleep_msec(10);
      tor_mutex_acquire(&pool->lock);
    }

    tor_mutex_release(&pool->lock);
    /* Let the main thread know, the last worker thread has exited. */
    tor_mutex_release(&pool->control_lock);
  } else {
    --n_worker_threads_running;
    tor_mutex_release(&pool->lock);
  }
}
    391 
    392 /** Put a reply on the reply queue.  The reply must not currently be on
    393 * any thread's work queue. */
    394 static void
    395 queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
    396 {
    397  int was_empty;
    398  tor_mutex_acquire(&queue->lock);
    399  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
    400  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
    401  tor_mutex_release(&queue->lock);
    402 
    403  if (was_empty) {
    404    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
    405      /* XXXX complain! */
    406    }
    407  }
    408 }
    409 
    410 /** Allocate and start a new worker thread to use state object <b>state</b>,
    411 * and send responses to <b>replyqueue</b>. */
    412 static workerthread_t *
    413 workerthread_new(int32_t lower_priority_chance,
    414                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
    415 {
    416  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
    417  thr->state = state;
    418  thr->reply_queue = replyqueue;
    419  thr->in_pool = pool;
    420  thr->lower_priority_chance = lower_priority_chance;
    421 
    422  if (spawn_func(worker_thread_main, thr) < 0) {
    423    //LCOV_EXCL_START
    424    tor_assert_nonfatal_unreached();
    425    log_err(LD_GENERAL, "Can't launch worker thread.");
    426    workerthread_free(thr);
    427    return NULL;
    428    //LCOV_EXCL_STOP
    429  }
    430 
    431  return thr;
    432 }
    433 
/**
 * Free up the resources allocated by a worker thread.
 *
 * Only releases the workerthread_t struct itself; tor_free() accepts NULL.
 * NOTE(review): thread->state is not freed here — presumably it is released
 * elsewhere via the pool's free_thread_state_fn; confirm against callers.
 */
static void
workerthread_free_(workerthread_t *thread)
{
  tor_free(thread);
}
    442 
    443 /**
    444 * Queue an item of work for a thread in a thread pool.  The function
    445 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
    446 * thread's state object, and the provided object <b>arg</b>. It must return
    447 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
    448 *
    449 * Regardless of its return value, the function <b>reply_fn</b> will later be
    450 * run in the main thread when it invokes replyqueue_process(), and will
    451 * receive as its argument the same <b>arg</b> object.  It's the reply
    452 * function's responsibility to free the work object.
    453 *
    454 * On success, return a workqueue_entry_t object that can be passed to
    455 * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
    456 * currently possible, but callers should check anyway.)
    457 *
    458 * Items are executed in a loose priority order -- each thread will usually
    459 * take from the queued work with the highest prioirity, but will occasionally
    460 * visit lower-priority queues to keep them from starving completely.
    461 *
    462 * Note that because of priorities and thread behavior, work items may not
    463 * be executed strictly in order.
    464 */
    465 workqueue_entry_t *
    466 threadpool_queue_work_priority(threadpool_t *pool,
    467                               workqueue_priority_t prio,
    468                               workqueue_reply_t (*fn)(void *, void *),
    469                               void (*reply_fn)(void *),
    470                               void *arg)
    471 {
    472  tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST &&
    473             ((int)prio) <= WORKQUEUE_PRIORITY_LAST);
    474 
    475  workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg);
    476  ent->on_pool = pool;
    477  ent->pending = 1;
    478  ent->priority = prio;
    479 
    480  tor_mutex_acquire(&pool->lock);
    481 
    482  TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work);
    483 
    484  tor_cond_signal_one(&pool->condition);
    485 
    486  tor_mutex_release(&pool->lock);
    487 
    488  return ent;
    489 }
    490 
/** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH (the
 * default priority that workqueue_entry_new() also assigns). */
workqueue_entry_t *
threadpool_queue_work(threadpool_t *pool,
                      workqueue_reply_t (*fn)(void *, void *),
                      void (*reply_fn)(void *),
                      void *arg)
{
  return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg);
}
    500 
    501 /**
    502 * Queue a copy of a work item for every thread in a pool.  This can be used,
    503 * for example, to tell the threads to update some parameter in their states.
    504 *
    505 * Arguments are as for <b>threadpool_queue_work</b>, except that the
    506 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to
    507 * make a copy of it.
    508 *
    509 * UPDATE FUNCTIONS MUST BE IDEMPOTENT.  We do not guarantee that every update
    510 * will be run.  If a new update is scheduled before the old update finishes
    511 * running, then the new will replace the old in any threads that haven't run
    512 * it yet.
    513 *
    514 * Return 0 on success, -1 on failure.
    515 */
    516 int
    517 threadpool_queue_update(threadpool_t *pool,
    518                         void *(*dup_fn)(void *),
    519                         workqueue_reply_t (*fn)(void *, void *),
    520                         void (*free_fn)(void *),
    521                         void *arg)
    522 {
    523  int i, n_threads;
    524  void (*old_args_free_fn)(void *arg);
    525  void **old_args;
    526  void **new_args;
    527 
    528  tor_mutex_acquire(&pool->lock);
    529  n_threads = pool->n_threads;
    530  old_args = pool->update_args;
    531  old_args_free_fn = pool->free_update_arg_fn;
    532 
    533  new_args = tor_calloc(n_threads, sizeof(void*));
    534  for (i = 0; i < n_threads; ++i) {
    535    if (dup_fn)
    536      new_args[i] = dup_fn(arg);
    537    else
    538      new_args[i] = arg;
    539  }
    540 
    541  pool->update_args = new_args;
    542  pool->free_update_arg_fn = free_fn;
    543  pool->update_fn = fn;
    544  ++pool->generation;
    545 
    546  tor_cond_signal_all(&pool->condition);
    547 
    548  tor_mutex_release(&pool->lock);
    549 
    550  if (old_args) {
    551    for (i = 0; i < n_threads; ++i) {
    552      if (old_args[i] && old_args_free_fn)
    553        old_args_free_fn(old_args[i]);
    554    }
    555    tor_free(old_args);
    556  }
    557 
    558  return 0;
    559 }
    560 
    561 /** Don't have more than this many threads per pool. */
    562 #define MAX_THREADS 1024
    563 
    564 /** For half of our threads, choose lower priority queues with probability
    565 * 1/N for each of these values. Both are chosen somewhat arbitrarily.  If
    566 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
    567 * stalling forever.  If it's too high, we have a risk of low-priority tasks
    568 * grabbing half of the threads. */
    569 #define CHANCE_PERMISSIVE 37
    570 #define CHANCE_STRICT INT32_MAX
    571 
/** Launch threads until we have <b>n</b>.
 *
 * Holds control_lock across startup so the workers' startup announcements
 * are serialized, and waits (up to 30 seconds) on pool->condition for the
 * last worker to confirm it has started.  Return 0 on success, -1 on
 * failure; on failure, signals any already-launched workers to exit. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  tor_mutex_acquire(&pool->control_lock);
  tor_mutex_acquire(&pool->lock);

  /* Grow the thread array if we are adding threads. */
  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  int status = 0;
  pool->n_threads_max = n;
  log_debug(LD_GENERAL, "Starting worker threads...");

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      status = -1;
      goto check_status;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }

  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};

  /* Wait for the last launched thread to confirm us, it has started.
   * Wait max 30 seconds */
  status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);

check_status:
  /* The switch below treats tor_cond_wait()'s result as 0 = success,
   * -1 = error, 1 = timeout; any other value also maps to failure.
   * NOTE(review): confirm the timeout return value against
   * tor_cond_wait()'s own documentation. */
  switch (status) {
  case 0:
    log_debug(LD_GENERAL, "Starting worker threads finished.");
    break;
  case -1:
    log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
    break;
  case 1:
    log_warn(LD_GENERAL, "Failed to confirm worker threads' "
                         "start up after timeout.");
    FALLTHROUGH;
  default:
    status = -1;
  }

  log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");

  /* If we had an error, let the worker threads (if any) exit directly. */
  if (status != 0) {
    pool->exit = 1;
    log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
  }

  /* Let worker threads enter the work loop. */
  tor_mutex_release(&pool->lock);
  /* Let one of the worker threads take the ownership of pool->control_lock.
   * This is required for compliance with POSIX. */
  tor_mutex_release(&pool->control_lock);

  return status;
}
    652 
    653 /** Stop all worker threads */
    654 static void
    655 threadpool_stop_threads(threadpool_t *pool)
    656 {
    657  tor_mutex_acquire(&pool->lock);
    658 
    659  if (pool->exit == 0) {
    660    /* Signal the worker threads to exit */
    661    pool->exit = 1;
    662    /* If worker threads are waiting for work, let them continue to exit */
    663    tor_cond_signal_all(&pool->condition);
    664 
    665    log_debug(LD_GENERAL, "Signaled worker threads to exit. "
    666                          "Waiting for them to exit...");
    667  }
    668 
    669  tor_mutex_release(&pool->lock);
    670 
    671  /* Wait until all worker threads have exited.
    672   * pool->control_lock must be prelocked here. */
    673  tor_mutex_acquire(&pool->control_lock);
    674  /* Unlock required, else main thread hangs on mutex uninit. */
    675  tor_mutex_release(&pool->control_lock);
    676 
    677  /* If this message appears in the log before all threads have confirmed
    678   * their exit, then pool->control_lock wasn't prelocked for some reason. */
    679  log_debug(LD_GENERAL, "All worker threads have exited.");
    680 }
    681 
    682 /**
    683 * Construct a new thread pool with <b>n</b> worker threads, configured to
    684 * send their output to <b>replyqueue</b>.  The threads' states will be
    685 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
    686 * as its argument.  When the threads close, they will call
    687 * <b>free_thread_state_fn</b> on their states.
    688 */
    689 threadpool_t *
    690 threadpool_new(int n_threads,
    691               replyqueue_t *replyqueue,
    692               void *(*new_thread_state_fn)(void*),
    693               void (*free_thread_state_fn)(void*),
    694               void *arg)
    695 {
    696  threadpool_t *pool;
    697  pool = tor_malloc_zero(sizeof(threadpool_t));
    698  tor_mutex_init_nonrecursive(&pool->lock);
    699  tor_cond_init(&pool->condition);
    700  tor_mutex_init_nonrecursive(&pool->control_lock);
    701  pool->exit = 0;
    702 
    703  unsigned i;
    704  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    705    TOR_TAILQ_INIT(&pool->work[i]);
    706  }
    707 
    708  pool->new_thread_state_fn = new_thread_state_fn;
    709  pool->new_thread_state_arg = arg;
    710  pool->free_thread_state_fn = free_thread_state_fn;
    711  pool->reply_queue = replyqueue;
    712 
    713  if (threadpool_start_threads(pool, n_threads) < 0) {
    714    //LCOV_EXCL_START
    715    tor_assert_nonfatal_unreached();
    716    threadpool_free(pool);
    717    return NULL;
    718    //LCOV_EXCL_STOP
    719  }
    720 
    721  return pool;
    722 }
    723 
    724 /**
    725 * Free up the resources allocated by worker threads, worker thread pool, ...
    726 */
    727 void
    728 threadpool_free_(threadpool_t *pool)
    729 {
    730  if (!pool)
    731    return;
    732 
    733  threadpool_stop_threads(pool);
    734 
    735  log_debug(LD_GENERAL, "Beginning to clean up...");
    736 
    737  tor_cond_uninit(&pool->condition);
    738  tor_mutex_uninit(&pool->lock);
    739  tor_mutex_uninit(&pool->control_lock);
    740 
    741  if (pool->threads) {
    742    for (int i = 0; i != pool->n_threads; ++i)
    743      workerthread_free(pool->threads[i]);
    744 
    745    tor_free(pool->threads);
    746  }
    747 
    748  if (pool->update_args) {
    749    if (!pool->free_update_arg_fn)
    750      log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
    751                           "pool->free_update_arg_fn is not set.");
    752    else
    753      pool->free_update_arg_fn(pool->update_args);
    754  }
    755 
    756  if (pool->reply_event) {
    757    if (tor_event_del(pool->reply_event) == -1)
    758      log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
    759    else
    760      tor_event_free(pool->reply_event);
    761  }
    762 
    763  if (pool->reply_queue)
    764    replyqueue_free(pool->reply_queue);
    765 
    766  if (pool->new_thread_state_arg) {
    767    if (!pool->free_thread_state_fn)
    768      log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
    769                           "pool->free_thread_state_fn is not set.");
    770    else
    771      pool->free_thread_state_fn(pool->new_thread_state_arg);
    772  }
    773 
    774  tor_free(pool);
    775 
    776  log_debug(LD_GENERAL, "Cleanup finished.");
    777 }
    778 
    779 /** Return the reply queue associated with a given thread pool. */
    780 replyqueue_t *
    781 threadpool_get_replyqueue(threadpool_t *tp)
    782 {
    783  return tp->reply_queue;
    784 }
    785 
    786 /** Allocate a new reply queue.  Reply queues are used to pass results from
    787 * worker threads to the main thread.  Since the main thread is running an
    788 * IO-centric event loop, it needs to get woken up with means other than a
    789 * condition variable. */
    790 replyqueue_t *
    791 replyqueue_new(uint32_t alertsocks_flags)
    792 {
    793  replyqueue_t *rq;
    794 
    795  rq = tor_malloc_zero(sizeof(replyqueue_t));
    796  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    797    //LCOV_EXCL_START
    798    replyqueue_free(rq);
    799    return NULL;
    800    //LCOV_EXCL_STOP
    801  }
    802 
    803  tor_mutex_init(&rq->lock);
    804  TOR_TAILQ_INIT(&rq->answers);
    805 
    806  return rq;
    807 }
    808 
/**
 * Free up the resources allocated by a reply queue.
 *
 * Also discards (without running their reply functions) any answers still
 * waiting on the queue.
 */
static void
replyqueue_free_(replyqueue_t *queue)
{
  if (!queue)
    return;

  workqueue_entry_t *work;

  /* No lock needed: by the time this runs no worker threads can be
   * appending (also reached from the replyqueue_new() failure path,
   * where the lock was never initialized). */
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    workqueue_entry_free(work);
  }

  /* NOTE(review): queue->lock is never tor_mutex_uninit()ed and the alert
   * sockets are not closed here.  An unconditional uninit would be unsafe
   * on the replyqueue_new() failure path (lock not yet initialized) —
   * confirm the intended teardown. */
  tor_free(queue);
}
    828 
    829 /** Internal: Run from the libevent mainloop when there is work to handle in
    830 * the reply queue handler. */
    831 static void
    832 reply_event_cb(evutil_socket_t sock, short events, void *arg)
    833 {
    834  threadpool_t *tp = arg;
    835  (void) sock;
    836  (void) events;
    837  replyqueue_process(tp->reply_queue);
    838  if (tp->reply_cb)
    839    tp->reply_cb(tp);
    840 }
    841 
/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop. If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue. Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  /* Re-registering replaces any earlier event. */
  if (tp->reply_event) {
    tor_event_free(tp->reply_event);
  }
  /* Fire reply_event_cb whenever the reply queue's alert socket becomes
   * readable, i.e. when a worker queued a reply on an empty queue. */
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  /* event_add() returns 0 on success, -1 on failure. */
  return event_add(tp->reply_event, NULL);
}
    865 
/**
 * Process all pending replies on a reply queue. The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  /* Drain the alert socket first, so the next empty->nonempty transition
   * can signal again. */
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                 "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    /* Release the lock while running the user's reply callback, so worker
     * threads can keep appending new replies in the meantime. */
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}
    900 
/** Return the number of threads configured for the given pool. */
unsigned int
threadpool_get_n_threads(threadpool_t *tp)
{
  tor_assert(tp);
  /* n_threads is a non-negative int, so the implicit conversion to
   * unsigned int is value-preserving. */
  return tp->n_threads;
}