workqueue.c (28409B)
1 /* copyright (c) 2013-2024, The Tor Project, Inc. */ 2 /* See LICENSE for licensing information */ 3 4 /** 5 * \file workqueue.c 6 * 7 * \brief Implements worker threads, queues of work for them, and mechanisms 8 * for them to send answers back to the main thread. 9 * 10 * The main structure here is a threadpool_t : it manages a set of worker 11 * threads, a queue of pending work, and a reply queue. Every piece of work 12 * is a workqueue_entry_t, containing data to process and a function to 13 * process it with. 14 * 15 * The main thread informs the worker threads of pending work by using a 16 * condition variable. The workers inform the main process of completed work 17 * by using an alert_sockets_t object, as implemented in net/alertsock.c. 18 * 19 * The main thread can also queue an "update" that will be handled by all the 20 * workers. This is useful for updating state that all the workers share. 21 * 22 * In Tor today, there is currently only one thread pool, managed 23 * in cpuworker.c and handling a variety of types of work, from the original 24 * "onion skin" circuit handshakes, to consensus diff computation, to 25 * client-side onion service PoW generation. 
 */

#include "orconfig.h"
#include "lib/evloop/compat_libevent.h"
#include "lib/evloop/workqueue.h"

#include "lib/crypt_ops/crypto_rand.h"
#include "lib/intmath/weakrng.h"
#include "lib/log/ratelim.h"
#include "lib/log/log.h"
#include "lib/log/util_bug.h"
#include "lib/net/alertsock.h"
#include "lib/net/socket.h"
#include "lib/thread/threads.h"
#include "lib/time/compat_time.h"

#include "ext/tor_queue.h"
#include <event2/event.h>
#include <string.h>

/* Priorities are iterated from FIRST (highest) to LAST (lowest); there is
 * one queue per priority, so the queue count is LAST's value plus one. */
#define WORKQUEUE_PRIORITY_FIRST WQ_PRI_HIGH
#define WORKQUEUE_PRIORITY_LAST WQ_PRI_LOW
#define WORKQUEUE_N_PRIORITIES (((int) WORKQUEUE_PRIORITY_LAST)+1)

/** A FIFO queue of workqueue_entry_t, one per priority level. */
TOR_TAILQ_HEAD(work_tailq_t, workqueue_entry_t);
typedef struct work_tailq_t work_tailq_t;

struct threadpool_t {
  /** An array of pointers to workerthread_t: one for each running worker
   * thread. */
  struct workerthread_t **threads;

  /** Condition variable that we wait on when we have no work, and which
   * gets signaled when our queue becomes nonempty. */
  tor_cond_t condition;
  /** Queues of pending work that we have to do. The queue with priority
   * <b>p</b> is work[p]. */
  work_tailq_t work[WORKQUEUE_N_PRIORITIES];

  /** The current 'update generation' of the threadpool. Any thread that is
   * at an earlier generation needs to run the update function. */
  unsigned generation;

  /** Function that should be run for updates on each thread. */
  workqueue_reply_t (*update_fn)(void *, void *);
  /** Function to free update arguments if they can't be run. */
  void (*free_update_arg_fn)(void *);
  /** Array of n_threads update arguments. */
  void **update_args;
  /** Event to notice when another thread has sent a reply. */
  struct event *reply_event;
  /** Optional callback run in the main thread after each batch of replies
   * has been processed.  May be NULL. */
  void (*reply_cb)(threadpool_t *);

  /** Number of elements in threads. */
  int n_threads;
  /** Number of elements to be created in threads. */
  int n_threads_max;
  /** Mutex to protect all the above fields. */
  tor_mutex_t lock;

  /** A reply queue to use when constructing new threads. */
  replyqueue_t *reply_queue;

  /** Functions used to allocate and free thread state. */
  void *(*new_thread_state_fn)(void*);
  void (*free_thread_state_fn)(void*);
  /** Argument passed to new_thread_state_fn each time a thread is created. */
  void *new_thread_state_arg;

  /** Used for signalling the worker threads to exit. */
  int exit;
  /** Mutex for controlling worker threads' startup and exit. */
  tor_mutex_t control_lock;
};

/** Used to put a workqueue_priority_t value into a bitfield. */
#define workqueue_priority_bitfield_t ENUM_BF(workqueue_priority_t)
/** Number of bits needed to hold all legal values of workqueue_priority_t */
#define WORKQUEUE_PRIORITY_BITS 2

struct workqueue_entry_t {
  /** The next workqueue_entry_t that's pending on the same thread or
   * reply queue. */
  TOR_TAILQ_ENTRY(workqueue_entry_t) next_work;
  /** The threadpool to which this workqueue_entry_t was assigned. This field
   * is set when the workqueue_entry_t is created, and won't be cleared until
   * after it's handled in the main thread. */
  struct threadpool_t *on_pool;
  /** True iff this entry is waiting for a worker to start processing it. */
  uint8_t pending;
  /** Priority of this entry. */
  workqueue_priority_bitfield_t priority : WORKQUEUE_PRIORITY_BITS;
  /** Function to run in the worker thread. */
  workqueue_reply_t (*fn)(void *state, void *arg);
  /** Function to run while processing the reply queue. */
  void (*reply_fn)(void *arg);
  /** Argument for the above functions. */
  void *arg;
};

struct replyqueue_t {
  /** Mutex to protect the answers field */
  tor_mutex_t lock;
  /** Doubly-linked list of answers that the reply queue needs to handle. */
  TOR_TAILQ_HEAD(, workqueue_entry_t) answers;

  /** Mechanism to wake up the main thread when it is receiving answers. */
  alert_sockets_t alert;
};

/** A worker thread represents a single thread in a thread pool. */
typedef struct workerthread_t {
  /** Which thread is this? In range 0..in_pool->n_threads-1 */
  int index;
  /** The pool this thread is a part of. */
  struct threadpool_t *in_pool;
  /** User-supplied state field that we pass to the worker functions of each
   * work item. */
  void *state;
  /** Reply queue to which we pass our results. */
  replyqueue_t *reply_queue;
  /** The current update generation of this thread */
  unsigned generation;
  /** One over the probability of taking work from a lower-priority queue. */
  int32_t lower_priority_chance;
} workerthread_t;

static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
static void workerthread_free_(workerthread_t *thread);
#define workerthread_free(thread) \
  FREE_AND_NULL(workerthread_t, workerthread_free_, (thread))
static void replyqueue_free_(replyqueue_t *queue);
#define replyqueue_free(queue) \
  FREE_AND_NULL(replyqueue_t, replyqueue_free_, (queue))

/** Allocate and return a new workqueue_entry_t, set up to run the function
 * <b>fn</b> in the worker thread, and <b>reply_fn</b> in the main
 * thread. See threadpool_queue_work() for full documentation. */
static workqueue_entry_t *
workqueue_entry_new(workqueue_reply_t (*fn)(void*, void*),
                    void (*reply_fn)(void*),
                    void *arg)
{
  workqueue_entry_t *ent = tor_malloc_zero(sizeof(workqueue_entry_t));
  ent->fn = fn;
  ent->reply_fn = reply_fn;
  ent->arg = arg;
  /* Default priority; threadpool_queue_work_priority() may override it. */
  ent->priority = WQ_PRI_HIGH;
  return ent;
}

#define workqueue_entry_free(ent) \
  FREE_AND_NULL(workqueue_entry_t, workqueue_entry_free_, (ent))

/**
 * Release all storage held in <b>ent</b>. Call only when <b>ent</b> is not on
 * any queue.
 */
static void
workqueue_entry_free_(workqueue_entry_t *ent)
{
  if (!ent)
    return;
  /* Poison the memory so use-after-free bugs fail loudly. */
  memset(ent, 0xf0, sizeof(*ent));
  tor_free(ent);
}

/**
 * Cancel a workqueue_entry_t that has been returned from
 * threadpool_queue_work.
 *
 * You must not call this function on any work whose reply function has been
 * executed in the main thread; that will cause undefined behavior (probably,
 * a crash).
 *
 * If the work is cancelled, this function returns the argument passed to the
 * work function. It is the caller's responsibility to free this storage.
 *
 * This function will have no effect if the worker thread has already executed
 * or begun to execute the work item. In that case, it will return NULL.
 */
void *
workqueue_entry_cancel(workqueue_entry_t *ent)
{
  int cancelled = 0;
  void *result = NULL;
  tor_mutex_acquire(&ent->on_pool->lock);
  workqueue_priority_t prio = ent->priority;
  if (ent->pending) {
    /* Still queued: unlink it so no worker will ever see it. */
    TOR_TAILQ_REMOVE(&ent->on_pool->work[prio], ent, next_work);
    cancelled = 1;
    result = ent->arg;
  }
  tor_mutex_release(&ent->on_pool->lock);

  if (cancelled) {
    workqueue_entry_free(ent);
  }
  return result;
}

/** Return true iff <b>thread</b> has something to do: pending work in any of
 * its pool's priority queues, or a pool update it has not yet run.
 *
 * The caller must hold the pool's lock. */
static int
worker_thread_has_work(workerthread_t *thread)
{
  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    if (!TOR_TAILQ_EMPTY(&thread->in_pool->work[i]))
      return 1;
  }
  return thread->generation != thread->in_pool->generation;
}

/** Extract the next workqueue_entry_t from the thread's pool, removing
 * it from the relevant queues and marking it as non-pending.
 *
 * The caller must hold the lock.
 */
static workqueue_entry_t *
worker_thread_extract_next_work(workerthread_t *thread)
{
  threadpool_t *pool = thread->in_pool;
  work_tailq_t *queue = NULL, *this_queue;
  unsigned i;
  /* Scan queues from highest to lowest priority. */
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    this_queue = &pool->work[i];
    if (!TOR_TAILQ_EMPTY(this_queue)) {
      queue = this_queue;
      if (! crypto_fast_rng_one_in_n(get_thread_fast_rng(),
                                     thread->lower_priority_chance)) {
        /* Usually we'll just break now, so that we can get out of the loop
         * and use the queue where we found work. But with a small
         * probability, we'll keep looking for lower priority work, so that
         * we don't ignore our low-priority queues entirely. */
        break;
      }
    }
  }

  if (queue == NULL)
    return NULL;

  workqueue_entry_t *work = TOR_TAILQ_FIRST(queue);
  TOR_TAILQ_REMOVE(queue, work, next_work);
  work->pending = 0;
  return work;
}

/**
 * Main function for the worker thread.
 */
static void
worker_thread_main(void *thread_)
{
  /* NOTE(review): these statics are shared by ALL worker threads in the
   * process, not per-pool.  The increment is done under pool->control_lock
   * and the decrement under pool->lock, which appears to assume a single
   * thread pool exists (true in Tor today) -- TODO confirm. */
  static int n_worker_threads_running = 0;
  static unsigned long control_lock_owner = 0;
  workerthread_t *thread = thread_;
  threadpool_t *pool = thread->in_pool;
  workqueue_entry_t *work;
  workqueue_reply_t result;

  tor_mutex_acquire(&pool->control_lock);
  log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
            n_worker_threads_running + 1, pool->n_threads_max,
            tor_get_thread_id());

  /* The last thread to start signals threadpool_start_threads(), which is
   * waiting on the condition with control_lock held. */
  if (++n_worker_threads_running == pool->n_threads_max)
    tor_cond_signal_one(&pool->condition);

  tor_mutex_release(&pool->control_lock);

  /* Wait until all worker threads have started.
   * pool->lock must be prelocked here. */
  tor_mutex_acquire(&pool->lock);

  if (control_lock_owner == 0) {
    /* pool->control_lock stays locked. This is required for the main thread
     * to wait for the worker threads to exit on shutdown, so the memory
     * clean up won't begin before all threads have exited. */
    tor_mutex_acquire(&pool->control_lock);
    control_lock_owner = tor_get_thread_id();
  }

  log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
            tor_get_thread_id());

  while (1) {
    /* Exit thread when signaled to exit */
    if (pool->exit)
      goto exit;

    /* lock must be held at this point. */
    while (worker_thread_has_work(thread)) {
      /* lock must be held at this point. */
      if (thread->in_pool->generation != thread->generation) {
        /* An update is pending: consume this thread's argument and run the
         * update function outside the lock. */
        void *arg = thread->in_pool->update_args[thread->index];
        thread->in_pool->update_args[thread->index] = NULL;
        workqueue_reply_t (*update_fn)(void*,void*) =
          thread->in_pool->update_fn;
        thread->generation = thread->in_pool->generation;
        tor_mutex_release(&pool->lock);

        workqueue_reply_t r = update_fn(thread->state, arg);

        tor_mutex_acquire(&pool->lock);

        /* We may need to exit the thread. */
        if (r != WQ_RPL_REPLY)
          goto exit;

        continue;
      }
      work = worker_thread_extract_next_work(thread);
      if (BUG(work == NULL))
        break;
      tor_mutex_release(&pool->lock);

      /* We run the work function without holding the thread lock. This
       * is the main thread's first opportunity to give us more work. */
      result = work->fn(thread->state, work->arg);

      /* Queue the reply for the main thread. */
      queue_reply(thread->reply_queue, work);

      tor_mutex_acquire(&pool->lock);

      /* We may need to exit the thread. */
      if (result != WQ_RPL_REPLY)
        goto exit;
    }
    /* At this point the lock is held, and there is no work in this thread's
     * queue. */

    /* TODO: support an idle-function */

    /* Okay. Now, wait till somebody has work for us. */
    if (tor_cond_wait(&pool->condition, &pool->lock, NULL) < 0) {
      log_warn(LD_GENERAL, "Fail tor_cond_wait.");
    }
  }

 exit:
  /* At this point pool->lock must be held */

  log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
            pool->n_threads_max - n_worker_threads_running + 1,
            pool->n_threads_max, tor_get_thread_id());

  if (tor_get_thread_id() == control_lock_owner) {
    /* Wait for the other worker threads to exit so we
     * can safely unlock pool->control_lock. */
    while (n_worker_threads_running > 1) {
      tor_mutex_release(&pool->lock);
      tor_sleep_msec(10);
      tor_mutex_acquire(&pool->lock);
    }

    tor_mutex_release(&pool->lock);
    /* Let the main thread know, the last worker thread has exited. */
    tor_mutex_release(&pool->control_lock);
  } else {
    --n_worker_threads_running;
    tor_mutex_release(&pool->lock);
  }
}

/** Put a reply on the reply queue. The reply must not currently be on
 * any thread's work queue. */
static void
queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
{
  int was_empty;
  tor_mutex_acquire(&queue->lock);
  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
  tor_mutex_release(&queue->lock);

  /* Only alert on the empty->nonempty transition: one wakeup is enough for
   * the main thread to drain the whole queue. */
  if (was_empty) {
    if (queue->alert.alert_fn(queue->alert.write_fd) < 0) {
      /* XXXX complain! */
    }
  }
}

/** Allocate and start a new worker thread to use state object <b>state</b>,
 * and send responses to <b>replyqueue</b>.
 */
static workerthread_t *
workerthread_new(int32_t lower_priority_chance,
                 void *state, threadpool_t *pool, replyqueue_t *replyqueue)
{
  workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
  thr->state = state;
  thr->reply_queue = replyqueue;
  thr->in_pool = pool;
  thr->lower_priority_chance = lower_priority_chance;

  if (spawn_func(worker_thread_main, thr) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    log_err(LD_GENERAL, "Can't launch worker thread.");
    workerthread_free(thr);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return thr;
}

/**
 * Free up the resources allocated by a worker thread.
 */
static void
workerthread_free_(workerthread_t *thread)
{
  tor_free(thread);
}

/**
 * Queue an item of work for a thread in a thread pool.  The function
 * <b>fn</b> will be run in a worker thread, and will receive as arguments the
 * thread's state object, and the provided object <b>arg</b>. It must return
 * one of WQ_RPL_REPLY, WQ_RPL_ERROR, or WQ_RPL_SHUTDOWN.
 *
 * Regardless of its return value, the function <b>reply_fn</b> will later be
 * run in the main thread when it invokes replyqueue_process(), and will
 * receive as its argument the same <b>arg</b> object.  It's the reply
 * function's responsibility to free the work object.
 *
 * On success, return a workqueue_entry_t object that can be passed to
 * workqueue_entry_cancel(). On failure, return NULL.  (Failure is not
 * currently possible, but callers should check anyway.)
 *
 * Items are executed in a loose priority order -- each thread will usually
 * take from the queued work with the highest priority, but will occasionally
 * visit lower-priority queues to keep them from starving completely.
 *
 * Note that because of priorities and thread behavior, work items may not
 * be executed strictly in order.
464 */ 465 workqueue_entry_t * 466 threadpool_queue_work_priority(threadpool_t *pool, 467 workqueue_priority_t prio, 468 workqueue_reply_t (*fn)(void *, void *), 469 void (*reply_fn)(void *), 470 void *arg) 471 { 472 tor_assert(((int)prio) >= WORKQUEUE_PRIORITY_FIRST && 473 ((int)prio) <= WORKQUEUE_PRIORITY_LAST); 474 475 workqueue_entry_t *ent = workqueue_entry_new(fn, reply_fn, arg); 476 ent->on_pool = pool; 477 ent->pending = 1; 478 ent->priority = prio; 479 480 tor_mutex_acquire(&pool->lock); 481 482 TOR_TAILQ_INSERT_TAIL(&pool->work[prio], ent, next_work); 483 484 tor_cond_signal_one(&pool->condition); 485 486 tor_mutex_release(&pool->lock); 487 488 return ent; 489 } 490 491 /** As threadpool_queue_work_priority(), but assumes WQ_PRI_HIGH */ 492 workqueue_entry_t * 493 threadpool_queue_work(threadpool_t *pool, 494 workqueue_reply_t (*fn)(void *, void *), 495 void (*reply_fn)(void *), 496 void *arg) 497 { 498 return threadpool_queue_work_priority(pool, WQ_PRI_HIGH, fn, reply_fn, arg); 499 } 500 501 /** 502 * Queue a copy of a work item for every thread in a pool. This can be used, 503 * for example, to tell the threads to update some parameter in their states. 504 * 505 * Arguments are as for <b>threadpool_queue_work</b>, except that the 506 * <b>arg</b> value is passed to <b>dup_fn</b> once per each thread to 507 * make a copy of it. 508 * 509 * UPDATE FUNCTIONS MUST BE IDEMPOTENT. We do not guarantee that every update 510 * will be run. If a new update is scheduled before the old update finishes 511 * running, then the new will replace the old in any threads that haven't run 512 * it yet. 513 * 514 * Return 0 on success, -1 on failure. 
 */
int
threadpool_queue_update(threadpool_t *pool,
                        void *(*dup_fn)(void *),
                        workqueue_reply_t (*fn)(void *, void *),
                        void (*free_fn)(void *),
                        void *arg)
{
  int i, n_threads;
  void (*old_args_free_fn)(void *arg);
  void **old_args;
  void **new_args;

  tor_mutex_acquire(&pool->lock);
  /* Snapshot the previous update's arguments so we can free any that were
   * never consumed, after we drop the lock. */
  n_threads = pool->n_threads;
  old_args = pool->update_args;
  old_args_free_fn = pool->free_update_arg_fn;

  /* One argument (or one copy of it) per thread. */
  new_args = tor_calloc(n_threads, sizeof(void*));
  for (i = 0; i < n_threads; ++i) {
    if (dup_fn)
      new_args[i] = dup_fn(arg);
    else
      new_args[i] = arg;
  }

  /* Swap in the new update; bumping the generation makes every thread run
   * the update function the next time it looks for work. */
  pool->update_args = new_args;
  pool->free_update_arg_fn = free_fn;
  pool->update_fn = fn;
  ++pool->generation;

  tor_cond_signal_all(&pool->condition);

  tor_mutex_release(&pool->lock);

  /* Free unconsumed arguments of the superseded update, outside the lock
   * since the free function may be arbitrarily slow. */
  if (old_args) {
    for (i = 0; i < n_threads; ++i) {
      if (old_args[i] && old_args_free_fn)
        old_args_free_fn(old_args[i]);
    }
    tor_free(old_args);
  }

  return 0;
}

/** Don't have more than this many threads per pool. */
#define MAX_THREADS 1024

/** For half of our threads, choose lower priority queues with probability
 * 1/N for each of these values. Both are chosen somewhat arbitrarily. If
 * CHANCE_PERMISSIVE is too low, then we have a risk of low-priority tasks
 * stalling forever.  If it's too high, we have a risk of low-priority tasks
 * grabbing half of the threads. */
#define CHANCE_PERMISSIVE 37
#define CHANCE_STRICT INT32_MAX

/** Launch threads until we have <b>n</b>. */
static int
threadpool_start_threads(threadpool_t *pool, int n)
{
  if (BUG(n < 0))
    return -1; // LCOV_EXCL_LINE
  if (n > MAX_THREADS)
    n = MAX_THREADS;

  /* Lock order: control_lock before lock.  control_lock is later handed off
   * to the first worker thread (see worker_thread_main). */
  tor_mutex_acquire(&pool->control_lock);
  tor_mutex_acquire(&pool->lock);

  if (pool->n_threads < n)
    pool->threads = tor_reallocarray(pool->threads,
                                     sizeof(workerthread_t*), n);

  int status = 0;
  pool->n_threads_max = n;
  log_debug(LD_GENERAL, "Starting worker threads...");

  while (pool->n_threads < n) {
    /* For half of our threads, we'll choose lower priorities permissively;
     * for the other half, we'll stick more strictly to higher priorities.
     * This keeps slow low-priority tasks from taking over completely. */
    int32_t chance = (pool->n_threads & 1) ? CHANCE_STRICT : CHANCE_PERMISSIVE;

    void *state = pool->new_thread_state_fn(pool->new_thread_state_arg);
    workerthread_t *thr = workerthread_new(chance,
                                           state, pool, pool->reply_queue);

    if (!thr) {
      //LCOV_EXCL_START
      tor_assert_nonfatal_unreached();
      pool->free_thread_state_fn(state);
      status = -1;
      goto check_status;
      //LCOV_EXCL_STOP
    }
    thr->index = pool->n_threads;
    pool->threads[pool->n_threads++] = thr;
  }

  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};

  /* Wait for the last launched thread to confirm us, it has started.
   * Wait max 30 seconds */
  status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);

 check_status:
  switch (status) {
    case 0:
      log_debug(LD_GENERAL, "Starting worker threads finished.");
      break;
    case -1:
      log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
      break;
    case 1:
      /* tor_cond_wait() timed out. */
      log_warn(LD_GENERAL, "Failed to confirm worker threads' "
               "start up after timeout.");
      FALLTHROUGH;
    default:
      status = -1;
  }

  log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");

  /* If we had an error, let the worker threads (if any) exit directly. */
  if (status != 0) {
    pool->exit = 1;
    log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
  }

  /* Let worker threads enter the work loop. */
  tor_mutex_release(&pool->lock);
  /* Let one of the worker threads take the ownership of pool->control_lock.
   * This is required for compliance with POSIX. */
  tor_mutex_release(&pool->control_lock);

  return status;
}

/** Stop all worker threads */
static void
threadpool_stop_threads(threadpool_t *pool)
{
  tor_mutex_acquire(&pool->lock);

  if (pool->exit == 0) {
    /* Signal the worker threads to exit */
    pool->exit = 1;
    /* If worker threads are waiting for work, let them continue to exit */
    tor_cond_signal_all(&pool->condition);

    log_debug(LD_GENERAL, "Signaled worker threads to exit. "
              "Waiting for them to exit...");
  }

  tor_mutex_release(&pool->lock);

  /* Wait until all worker threads have exited.
   * pool->control_lock must be prelocked here. */
  tor_mutex_acquire(&pool->control_lock);
  /* Unlock required, else main thread hangs on mutex uninit. */
  tor_mutex_release(&pool->control_lock);

  /* If this message appears in the log before all threads have confirmed
   * their exit, then pool->control_lock wasn't prelocked for some reason. */
  log_debug(LD_GENERAL, "All worker threads have exited.");
}

/**
 * Construct a new thread pool with <b>n</b> worker threads, configured to
 * send their output to <b>replyqueue</b>.  The threads' states will be
 * constructed with the <b>new_thread_state_fn</b> call, receiving <b>arg</b>
 * as its argument.  When the threads close, they will call
 * <b>free_thread_state_fn</b> on their states.
 */
threadpool_t *
threadpool_new(int n_threads,
               replyqueue_t *replyqueue,
               void *(*new_thread_state_fn)(void*),
               void (*free_thread_state_fn)(void*),
               void *arg)
{
  threadpool_t *pool;
  pool = tor_malloc_zero(sizeof(threadpool_t));
  tor_mutex_init_nonrecursive(&pool->lock);
  tor_cond_init(&pool->condition);
  tor_mutex_init_nonrecursive(&pool->control_lock);
  pool->exit = 0;

  unsigned i;
  for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
    TOR_TAILQ_INIT(&pool->work[i]);
  }

  pool->new_thread_state_fn = new_thread_state_fn;
  pool->new_thread_state_arg = arg;
  pool->free_thread_state_fn = free_thread_state_fn;
  pool->reply_queue = replyqueue;

  if (threadpool_start_threads(pool, n_threads) < 0) {
    //LCOV_EXCL_START
    tor_assert_nonfatal_unreached();
    threadpool_free(pool);
    return NULL;
    //LCOV_EXCL_STOP
  }

  return pool;
}

/**
 * Free up the resources allocated by worker threads, worker thread pool, ...
726 */ 727 void 728 threadpool_free_(threadpool_t *pool) 729 { 730 if (!pool) 731 return; 732 733 threadpool_stop_threads(pool); 734 735 log_debug(LD_GENERAL, "Beginning to clean up..."); 736 737 tor_cond_uninit(&pool->condition); 738 tor_mutex_uninit(&pool->lock); 739 tor_mutex_uninit(&pool->control_lock); 740 741 if (pool->threads) { 742 for (int i = 0; i != pool->n_threads; ++i) 743 workerthread_free(pool->threads[i]); 744 745 tor_free(pool->threads); 746 } 747 748 if (pool->update_args) { 749 if (!pool->free_update_arg_fn) 750 log_warn(LD_GENERAL, "Freeing pool->update_args not possible. " 751 "pool->free_update_arg_fn is not set."); 752 else 753 pool->free_update_arg_fn(pool->update_args); 754 } 755 756 if (pool->reply_event) { 757 if (tor_event_del(pool->reply_event) == -1) 758 log_warn(LD_GENERAL, "libevent error: deleting reply event failed."); 759 else 760 tor_event_free(pool->reply_event); 761 } 762 763 if (pool->reply_queue) 764 replyqueue_free(pool->reply_queue); 765 766 if (pool->new_thread_state_arg) { 767 if (!pool->free_thread_state_fn) 768 log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. " 769 "pool->free_thread_state_fn is not set."); 770 else 771 pool->free_thread_state_fn(pool->new_thread_state_arg); 772 } 773 774 tor_free(pool); 775 776 log_debug(LD_GENERAL, "Cleanup finished."); 777 } 778 779 /** Return the reply queue associated with a given thread pool. */ 780 replyqueue_t * 781 threadpool_get_replyqueue(threadpool_t *tp) 782 { 783 return tp->reply_queue; 784 } 785 786 /** Allocate a new reply queue. Reply queues are used to pass results from 787 * worker threads to the main thread. Since the main thread is running an 788 * IO-centric event loop, it needs to get woken up with means other than a 789 * condition variable. 
 */
replyqueue_t *
replyqueue_new(uint32_t alertsocks_flags)
{
  replyqueue_t *rq;

  rq = tor_malloc_zero(sizeof(replyqueue_t));
  if (alert_sockets_create(&rq->alert, alertsocks_flags) < 0) {
    //LCOV_EXCL_START
    /* NOTE(review): rq->lock is not yet initialized on this path; this is
     * only safe because replyqueue_free_() never touches the lock. */
    replyqueue_free(rq);
    return NULL;
    //LCOV_EXCL_STOP
  }

  tor_mutex_init(&rq->lock);
  TOR_TAILQ_INIT(&rq->answers);

  return rq;
}

/**
 * Free up the resources allocated by a reply queue.
 *
 * Any answers still pending on the queue are freed without running their
 * reply functions.
 */
static void
replyqueue_free_(replyqueue_t *queue)
{
  if (!queue)
    return;

  workqueue_entry_t *work;

  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    workqueue_entry_free(work);
  }

  /* NOTE(review): queue->lock is never tor_mutex_uninit()ed here.  It cannot
   * be done unconditionally, since replyqueue_new() frees the queue before
   * initializing the lock on its error path -- TODO confirm whether this
   * leaks mutex resources on any supported platform. */
  tor_free(queue);
}

/** Internal: Run from the libevent mainloop when there is work to handle in
 * the reply queue handler. */
static void
reply_event_cb(evutil_socket_t sock, short events, void *arg)
{
  threadpool_t *tp = arg;
  (void) sock;
  (void) events;
  replyqueue_process(tp->reply_queue);
  /* Run the user-supplied callback (if any) after the replies are done. */
  if (tp->reply_cb)
    tp->reply_cb(tp);
}

/** Register the threadpool <b>tp</b>'s reply queue with Tor's global
 * libevent mainloop.  If <b>cb</b> is provided, it is run after
 * each time there is work to process from the reply queue.  Return 0 on
 * success, -1 on failure.
 */
int
threadpool_register_reply_event(threadpool_t *tp,
                                void (*cb)(threadpool_t *tp))
{
  struct event_base *base = tor_libevent_get_base();

  if (tp->reply_event) {
    /* Replace any previously registered event. */
    tor_event_free(tp->reply_event);
  }
  tp->reply_event = tor_event_new(base,
                                  tp->reply_queue->alert.read_fd,
                                  EV_READ|EV_PERSIST,
                                  reply_event_cb,
                                  tp);
  tor_assert(tp->reply_event);
  tp->reply_cb = cb;
  return event_add(tp->reply_event, NULL);
}

/**
 * Process all pending replies on a reply queue.
 * The main thread should call
 * this function every time the socket returned by replyqueue_get_socket() is
 * readable.
 */
void
replyqueue_process(replyqueue_t *queue)
{
  /* Drain the alert socket first, so replies queued from now on trigger a
   * fresh alert (see the empty->nonempty logic in queue_reply()). */
  int r = queue->alert.drain_fn(queue->alert.read_fd);
  if (r < 0) {
    //LCOV_EXCL_START
    static ratelim_t warn_limit = RATELIM_INIT(7200);
    log_fn_ratelim(&warn_limit, LOG_WARN, LD_GENERAL,
                 "Failure from drain_fd: %s",
                   tor_socket_strerror(-r));
    //LCOV_EXCL_STOP
  }

  tor_mutex_acquire(&queue->lock);
  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
    /* lock must be held at this point.*/
    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
    /* Release the lock while running the reply function, so workers can
     * keep appending answers in the meantime. */
    tor_mutex_release(&queue->lock);
    work->on_pool = NULL;

    work->reply_fn(work->arg);
    workqueue_entry_free(work);

    tor_mutex_acquire(&queue->lock);
  }

  tor_mutex_release(&queue->lock);
}

/** Return the number of threads configured for the given pool. */
unsigned int
threadpool_get_n_threads(threadpool_t *tp)
{
  tor_assert(tp);
  return tp->n_threads;
}