cpuworker.c (21765B)
1 /* Copyright (c) 2003-2004, Roger Dingledine. 2 * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson. 3 * Copyright (c) 2007-2024, The Tor Project, Inc. */ 4 /* See LICENSE for licensing information */ 5 6 /** 7 * \file cpuworker.c 8 * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities 9 * out to subprocesses. 10 * 11 * The multithreading backend for this module is in workqueue.c; this module 12 * specializes workqueue.c. 13 * 14 * Right now, we use this infrastructure 15 * <ul><li>for processing onionskins in onion.c 16 * <li>for compressing consensuses in consdiffmgr.c, 17 * <li>for calculating diffs and compressing them in consdiffmgr.c. 18 * <li>and for solving onion service PoW challenges in pow.c. 19 * </ul> 20 **/ 21 #include "core/or/or.h" 22 #include "core/or/channel.h" 23 #include "core/or/circuitlist.h" 24 #include "core/or/connection_or.h" 25 #include "core/or/congestion_control_common.h" 26 #include "core/or/congestion_control_flow.h" 27 #include "app/config/config.h" 28 #include "core/mainloop/cpuworker.h" 29 #include "lib/crypt_ops/crypto_rand.h" 30 #include "lib/crypt_ops/crypto_util.h" 31 #include "core/or/onion.h" 32 #include "feature/relay/circuitbuild_relay.h" 33 #include "feature/relay/onion_queue.h" 34 #include "feature/stats/rephist.h" 35 #include "feature/relay/router.h" 36 #include "feature/nodelist/networkstatus.h" 37 #include "lib/evloop/workqueue.h" 38 #include "core/crypto/onion_crypto.h" 39 40 #include "core/or/or_circuit_st.h" 41 42 static void queue_pending_tasks(void); 43 44 typedef struct worker_state_t { 45 int generation; 46 server_onion_keys_t *onion_keys; 47 } worker_state_t; 48 49 static void * 50 worker_state_new(void *arg) 51 { 52 worker_state_t *ws; 53 (void)arg; 54 ws = tor_malloc_zero(sizeof(worker_state_t)); 55 ws->onion_keys = server_onion_keys_new(); 56 return ws; 57 } 58 59 #define worker_state_free(ws) \ 60 FREE_AND_NULL(worker_state_t, worker_state_free_, (ws)) 61 62 static void 63 worker_state_free_(worker_state_t *ws) 64 { 65 if (!ws) 66 return; 67 server_onion_keys_free(ws->onion_keys); 68 tor_free(ws); 69 } 70 71 static void 72 worker_state_free_void(void *arg) 73 { 74 worker_state_free_(arg); 75 } 76 77 static threadpool_t *threadpool = NULL; 78 79 static uint32_t total_pending_tasks = 0; 80 static uint32_t max_pending_tasks = 128; 81 82 /** Return the consensus parameter max pending tasks per CPU. */ 83 static uint32_t 84 get_max_pending_tasks_per_cpu(const networkstatus_t *ns) 85 { 86 /* Total voodoo. Can we make this more sensible? Maybe, that is why we made it 87 * a consensus parameter so our future self can figure out this magic. */ 88 #define MAX_PENDING_TASKS_PER_CPU_DEFAULT 64 89 #define MAX_PENDING_TASKS_PER_CPU_MIN 1 90 #define MAX_PENDING_TASKS_PER_CPU_MAX INT32_MAX 91 92 return networkstatus_get_param(ns, "max_pending_tasks_per_cpu", 93 MAX_PENDING_TASKS_PER_CPU_DEFAULT, 94 MAX_PENDING_TASKS_PER_CPU_MIN, 95 MAX_PENDING_TASKS_PER_CPU_MAX); 96 } 97 98 /** Set the max pending tasks per CPU worker. This uses the consensus to check 99 * for the allowed number per CPU. The ns parameter can be NULL as in that no 100 * consensus is available at the time of setting this value. */ 101 static void 102 set_max_pending_tasks(const networkstatus_t *ns) 103 { 104 max_pending_tasks = 105 get_num_cpus(get_options()) * get_max_pending_tasks_per_cpu(ns); 106 } 107 108 /** Called when the consensus has changed. */ 109 void 110 cpuworker_consensus_has_changed(const networkstatus_t *ns) 111 { 112 tor_assert(ns); 113 set_max_pending_tasks(ns); 114 } 115 116 /** Initialize the cpuworker subsystem. */ 117 int 118 cpuworker_init(void) 119 { 120 /* 121 In our threadpool implementation, half the threads are permissive and 122 half are strict (when it comes to running lower-priority tasks). So we 123 always make sure we have at least two threads, so that there will be at 124 least one thread of each kind. 125 */ 126 const int n_threads = MAX(get_num_cpus(get_options()), 2); 127 threadpool = threadpool_new(n_threads, 128 replyqueue_new(0), 129 worker_state_new, 130 worker_state_free_void, 131 NULL); 132 133 if (!threadpool) { 134 log_err(LD_GENERAL, "Can't create worker thread pool"); 135 return -1; 136 } 137 138 int r = threadpool_register_reply_event(threadpool, NULL); 139 140 tor_assert(r == 0); 141 142 set_max_pending_tasks(NULL); 143 144 return 0; 145 } 146 147 /** Free all resources allocated by cpuworker. */ 148 void 149 cpuworker_free_all(void) 150 { 151 threadpool_free(threadpool); 152 } 153 154 /** Return the number of threads configured for our CPU worker. */ 155 unsigned int 156 cpuworker_get_n_threads(void) 157 { 158 if (!threadpool) { 159 return 0; 160 } 161 return threadpool_get_n_threads(threadpool); 162 } 163 164 /** Magic numbers to make sure our cpuworker_requests don't grow any 165 * mis-framing bugs. */ 166 #define CPUWORKER_REQUEST_MAGIC 0xda4afeed 167 #define CPUWORKER_REPLY_MAGIC 0x5eedf00d 168 169 /** A request sent to a cpuworker. */ 170 typedef struct cpuworker_request_t { 171 /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */ 172 uint32_t magic; 173 174 /** Flag: Are we timing this request? */ 175 unsigned timed : 1; 176 /** If we're timing this request, when was it sent to the cpuworker? */ 177 struct timeval started_at; 178 179 /** A create cell for the cpuworker to process. */ 180 create_cell_t create_cell; 181 182 /** 183 * A copy of this relay's consensus params that are relevant to 184 * the circuit, for use in negotiation. */ 185 circuit_params_t circ_ns_params; 186 187 /* Turn the above into a tagged union if needed. */ 188 } cpuworker_request_t; 189 190 /** A reply sent by a cpuworker. */ 191 typedef struct cpuworker_reply_t { 192 /** Magic number; must be CPUWORKER_REPLY_MAGIC. */ 193 uint32_t magic; 194 195 /** True iff we got a successful request. */ 196 uint8_t success; 197 198 /** Are we timing this request? */ 199 unsigned int timed : 1; 200 /** What handshake type was the request? (Used for timing) */ 201 uint16_t handshake_type; 202 /** When did we send the request to the cpuworker? */ 203 struct timeval started_at; 204 /** Once the cpuworker received the request, how many microseconds did it 205 * take? (This shouldn't overflow; 4 billion micoseconds is over an hour, 206 * and we'll never have an onion handshake that takes so long.) */ 207 uint32_t n_usec; 208 209 /** Output of processing a create cell 210 * 211 * @{ 212 */ 213 /** The created cell to send back. */ 214 created_cell_t created_cell; 215 /** The keys to use on this circuit. */ 216 uint8_t keys[MAX_RELAY_KEY_MATERIAL_LEN]; 217 /** Length of the generated key material. */ 218 size_t keys_len; 219 /** Input to use for authenticating introduce1 cells. */ 220 uint8_t rend_auth_material[DIGEST_LEN]; 221 /** Negotiated circuit parameters. */ 222 circuit_params_t circ_params; 223 } cpuworker_reply_t; 224 225 typedef struct cpuworker_job_u_t { 226 or_circuit_t *circ; 227 union { 228 cpuworker_request_t request; 229 cpuworker_reply_t reply; 230 } u; 231 } cpuworker_job_t; 232 233 static workqueue_reply_t 234 update_state_threadfn(void *state_, void *work_) 235 { 236 worker_state_t *state = state_; 237 worker_state_t *update = work_; 238 server_onion_keys_free(state->onion_keys); 239 state->onion_keys = update->onion_keys; 240 update->onion_keys = NULL; 241 worker_state_free(update); 242 ++state->generation; 243 return WQ_RPL_REPLY; 244 } 245 246 /** Called when the onion key has changed so update all CPU worker(s) with 247 * new function pointers with which a new state will be generated. 248 */ 249 void 250 cpuworkers_rotate_keyinfo(void) 251 { 252 if (!threadpool) { 253 /* If we're a client, then we won't have cpuworkers, and we won't need 254 * to tell them to rotate their state. 255 */ 256 return; 257 } 258 if (threadpool_queue_update(threadpool, 259 worker_state_new, 260 update_state_threadfn, 261 worker_state_free_void, 262 NULL)) { 263 log_warn(LD_OR, "Failed to queue key update for worker threads."); 264 } 265 } 266 267 /** Indexed by handshake type: how many onionskins have we processed and 268 * counted of that type? */ 269 static uint64_t onionskins_n_processed[MAX_ONION_HANDSHAKE_TYPE+1]; 270 /** Indexed by handshake type, corresponding to the onionskins counted in 271 * onionskins_n_processed: how many microseconds have we spent in cpuworkers 272 * processing that kind of onionskin? */ 273 static uint64_t onionskins_usec_internal[MAX_ONION_HANDSHAKE_TYPE+1]; 274 /** Indexed by handshake type, corresponding to onionskins counted in 275 * onionskins_n_processed: how many microseconds have we spent waiting for 276 * cpuworkers to give us answers for that kind of onionskin? 277 */ 278 static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1]; 279 280 /** If any onionskin takes longer than this, we clip them to this 281 * time. (microseconds) */ 282 #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000) 283 284 /** Return true iff we'd like to measure a handshake of type 285 * <b>onionskin_type</b>. Call only from the main thread. */ 286 static int 287 should_time_request(uint16_t onionskin_type) 288 { 289 /* If we've never heard of this type, we shouldn't even be here. */ 290 if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) 291 return 0; 292 /* Measure the first N handshakes of each type, to ensure we have a 293 * sample */ 294 if (onionskins_n_processed[onionskin_type] < 4096) 295 return 1; 296 297 /** Otherwise, measure with P=1/128. We avoid doing this for every 298 * handshake, since the measurement itself can take a little time. */ 299 return crypto_fast_rng_one_in_n(get_thread_fast_rng(), 128); 300 } 301 302 /** Return an estimate of how many microseconds we will need for a single 303 * cpuworker to process <b>n_requests</b> onionskins of type 304 * <b>onionskin_type</b>. */ 305 uint64_t 306 estimated_usec_for_onionskins(uint32_t n_requests, uint16_t onionskin_type) 307 { 308 if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */ 309 return 1000 * (uint64_t)n_requests; 310 if (PREDICT_UNLIKELY(onionskins_n_processed[onionskin_type] < 100)) { 311 /* Until we have 100 data points, just assume everything takes 1 msec. */ 312 return 1000 * (uint64_t)n_requests; 313 } else { 314 /* This can't overflow: we'll never have more than 500000 onionskins 315 * measured in onionskin_usec_internal, and they won't take anything near 316 * 1 sec each, and we won't have anything like 1 million queued 317 * onionskins. But that's 5e5 * 1e6 * 1e6, which is still less than 318 * UINT64_MAX. */ 319 return (onionskins_usec_internal[onionskin_type] * n_requests) / 320 onionskins_n_processed[onionskin_type]; 321 } 322 } 323 324 /** Compute the absolute and relative overhead of using the cpuworker 325 * framework for onionskins of type <b>onionskin_type</b>.*/ 326 static int 327 get_overhead_for_onionskins(uint32_t *usec_out, double *frac_out, 328 uint16_t onionskin_type) 329 { 330 uint64_t overhead; 331 332 *usec_out = 0; 333 *frac_out = 0.0; 334 335 if (onionskin_type > MAX_ONION_HANDSHAKE_TYPE) /* should be impossible */ 336 return -1; 337 if (onionskins_n_processed[onionskin_type] == 0 || 338 onionskins_usec_internal[onionskin_type] == 0 || 339 onionskins_usec_roundtrip[onionskin_type] == 0) 340 return -1; 341 342 overhead = onionskins_usec_roundtrip[onionskin_type] - 343 onionskins_usec_internal[onionskin_type]; 344 345 *usec_out = (uint32_t)(overhead / onionskins_n_processed[onionskin_type]); 346 *frac_out = ((double)overhead) / onionskins_usec_internal[onionskin_type]; 347 348 return 0; 349 } 350 351 /** If we've measured overhead for onionskins of type <b>onionskin_type</b>, 352 * log it. */ 353 void 354 cpuworker_log_onionskin_overhead(int severity, int onionskin_type, 355 const char *onionskin_type_name) 356 { 357 uint32_t overhead; 358 double relative_overhead; 359 int r; 360 361 r = get_overhead_for_onionskins(&overhead, &relative_overhead, 362 onionskin_type); 363 if (!overhead || r<0) 364 return; 365 366 log_fn(severity, LD_OR, 367 "%s onionskins have averaged %u usec overhead (%.2f%%) in " 368 "cpuworker code ", 369 onionskin_type_name, (unsigned)overhead, relative_overhead*100); 370 } 371 372 /** Handle a reply from the worker threads. */ 373 static void 374 cpuworker_onion_handshake_replyfn(void *work_) 375 { 376 cpuworker_job_t *job = work_; 377 cpuworker_reply_t rpl; 378 or_circuit_t *circ = NULL; 379 380 tor_assert(total_pending_tasks > 0); 381 --total_pending_tasks; 382 383 /* Could avoid this, but doesn't matter. */ 384 memcpy(&rpl, &job->u.reply, sizeof(rpl)); 385 386 tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC); 387 388 if (rpl.timed && rpl.success && 389 rpl.handshake_type <= MAX_ONION_HANDSHAKE_TYPE) { 390 /* Time how long this request took. The handshake_type check should be 391 needless, but let's leave it in to be safe. */ 392 struct timeval tv_end, tv_diff; 393 int64_t usec_roundtrip; 394 tor_gettimeofday(&tv_end); 395 timersub(&tv_end, &rpl.started_at, &tv_diff); 396 usec_roundtrip = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; 397 if (usec_roundtrip >= 0 && 398 usec_roundtrip < MAX_BELIEVABLE_ONIONSKIN_DELAY) { 399 ++onionskins_n_processed[rpl.handshake_type]; 400 onionskins_usec_internal[rpl.handshake_type] += rpl.n_usec; 401 onionskins_usec_roundtrip[rpl.handshake_type] += usec_roundtrip; 402 if (onionskins_n_processed[rpl.handshake_type] >= 500000) { 403 /* Scale down every 500000 handshakes. On a busy server, that's 404 * less impressive than it sounds. */ 405 onionskins_n_processed[rpl.handshake_type] /= 2; 406 onionskins_usec_internal[rpl.handshake_type] /= 2; 407 onionskins_usec_roundtrip[rpl.handshake_type] /= 2; 408 } 409 } 410 } 411 412 circ = job->circ; 413 414 log_debug(LD_OR, 415 "Unpacking cpuworker reply %p, circ=%p, success=%d", 416 job, circ, rpl.success); 417 418 if (circ->base_.magic == DEAD_CIRCUIT_MAGIC) { 419 /* The circuit was supposed to get freed while the reply was 420 * pending. Instead, it got left for us to free so that we wouldn't freak 421 * out when the job->circ field wound up pointing to nothing. */ 422 log_debug(LD_OR, "Circuit died while reply was pending. Freeing memory."); 423 circ->base_.magic = 0; 424 tor_free(circ); 425 goto done_processing; 426 } 427 428 circ->workqueue_entry = NULL; 429 430 if (TO_CIRCUIT(circ)->marked_for_close) { 431 /* We already marked this circuit; we can't call it open. */ 432 log_debug(LD_OR,"circuit is already marked."); 433 goto done_processing; 434 } 435 436 if (rpl.success == 0) { 437 log_debug(LD_OR, 438 "decoding onionskin failed. " 439 "(Old key or bad software.) Closing."); 440 circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_TORPROTOCOL); 441 goto done_processing; 442 } 443 444 /* If the client asked for congestion control, if our consensus parameter 445 * allowed it to negotiate as enabled, allocate a congestion control obj. */ 446 if (rpl.circ_params.cc_enabled) { 447 if (get_options()->SbwsExit) { 448 TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params, 449 CC_PATH_SBWS); 450 } else { 451 TO_CIRCUIT(circ)->ccontrol = congestion_control_new(&rpl.circ_params, 452 CC_PATH_EXIT); 453 } 454 } 455 456 circ->relay_cell_format = rpl.circ_params.cell_fmt; 457 458 if (onionskin_answer(circ, 459 &rpl.created_cell, 460 rpl.circ_params.crypto_alg, 461 (const char*)rpl.keys, rpl.keys_len, 462 rpl.rend_auth_material) < 0) { 463 log_warn(LD_OR,"onionskin_answer failed. Closing."); 464 circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_INTERNAL); 465 goto done_processing; 466 } 467 468 log_debug(LD_OR,"onionskin_answer succeeded. Yay."); 469 470 done_processing: 471 memwipe(&rpl, 0, sizeof(rpl)); 472 memwipe(job, 0, sizeof(*job)); 473 tor_free(job); 474 queue_pending_tasks(); 475 } 476 477 /** Implementation function for onion handshake requests. */ 478 static workqueue_reply_t 479 cpuworker_onion_handshake_threadfn(void *state_, void *work_) 480 { 481 worker_state_t *state = state_; 482 cpuworker_job_t *job = work_; 483 484 /* variables for onion processing */ 485 server_onion_keys_t *onion_keys = state->onion_keys; 486 cpuworker_request_t req; 487 cpuworker_reply_t rpl; 488 489 memcpy(&req, &job->u.request, sizeof(req)); 490 491 tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC); 492 memset(&rpl, 0, sizeof(rpl)); 493 494 const create_cell_t *cc = &req.create_cell; 495 created_cell_t *cell_out = &rpl.created_cell; 496 struct timeval tv_start = {0,0}, tv_end; 497 int n; 498 rpl.timed = req.timed; 499 rpl.started_at = req.started_at; 500 rpl.handshake_type = cc->handshake_type; 501 if (req.timed) 502 tor_gettimeofday(&tv_start); 503 rpl.keys_len = sizeof(rpl.keys); 504 n = onion_skin_server_handshake(cc->handshake_type, 505 cc->onionskin, cc->handshake_len, 506 onion_keys, 507 &req.circ_ns_params, 508 cell_out->reply, 509 sizeof(cell_out->reply), 510 rpl.keys, &rpl.keys_len, 511 rpl.rend_auth_material, 512 &rpl.circ_params); 513 514 if (n < 0) { 515 /* failure */ 516 log_debug(LD_OR,"onion_skin_server_handshake failed."); 517 memset(&rpl, 0, sizeof(rpl)); 518 rpl.success = 0; 519 } else { 520 /* success */ 521 log_debug(LD_OR,"onion_skin_server_handshake succeeded."); 522 cell_out->handshake_len = n; 523 switch (cc->cell_type) { 524 case CELL_CREATE: 525 cell_out->cell_type = CELL_CREATED; break; 526 case CELL_CREATE2: 527 cell_out->cell_type = CELL_CREATED2; break; 528 case CELL_CREATE_FAST: 529 cell_out->cell_type = CELL_CREATED_FAST; break; 530 default: 531 tor_assert(0); 532 return WQ_RPL_SHUTDOWN; 533 } 534 rpl.success = 1; 535 } 536 537 rpl.magic = CPUWORKER_REPLY_MAGIC; 538 if (req.timed) { 539 struct timeval tv_diff; 540 int64_t usec; 541 tor_gettimeofday(&tv_end); 542 timersub(&tv_end, &tv_start, &tv_diff); 543 usec = ((int64_t)tv_diff.tv_sec)*1000000 + tv_diff.tv_usec; 544 if (usec < 0 || usec > MAX_BELIEVABLE_ONIONSKIN_DELAY) 545 rpl.n_usec = MAX_BELIEVABLE_ONIONSKIN_DELAY; 546 else 547 rpl.n_usec = (uint32_t) usec; 548 } 549 550 memcpy(&job->u.reply, &rpl, sizeof(rpl)); 551 552 memwipe(&req, 0, sizeof(req)); 553 memwipe(&rpl, 0, sizeof(req)); 554 return WQ_RPL_REPLY; 555 } 556 557 /** Take pending tasks from the queue and assign them to cpuworkers. */ 558 static void 559 queue_pending_tasks(void) 560 { 561 or_circuit_t *circ; 562 create_cell_t *onionskin = NULL; 563 564 while (total_pending_tasks < max_pending_tasks) { 565 circ = onion_next_task(&onionskin); 566 567 if (!circ) 568 return; 569 570 if (assign_onionskin_to_cpuworker(circ, onionskin) < 0) 571 log_info(LD_OR,"assign_to_cpuworker failed. Ignoring."); 572 } 573 } 574 575 /** DOCDOC */ 576 MOCK_IMPL(workqueue_entry_t *, 577 cpuworker_queue_work,(workqueue_priority_t priority, 578 workqueue_reply_t (*fn)(void *, void *), 579 void (*reply_fn)(void *), 580 void *arg)) 581 { 582 tor_assert(threadpool); 583 584 return threadpool_queue_work_priority(threadpool, 585 priority, 586 fn, 587 reply_fn, 588 arg); 589 } 590 591 /** Try to tell a cpuworker to perform the public key operations necessary to 592 * respond to <b>onionskin</b> for the circuit <b>circ</b>. 593 * 594 * Return 0 if we successfully assign the task, or -1 on failure. 595 */ 596 int 597 assign_onionskin_to_cpuworker(or_circuit_t *circ, 598 create_cell_t *onionskin) 599 { 600 workqueue_entry_t *queue_entry; 601 cpuworker_job_t *job; 602 cpuworker_request_t req; 603 int should_time; 604 605 tor_assert(threadpool); 606 607 if (!circ->p_chan) { 608 log_info(LD_OR,"circ->p_chan gone. Failing circ."); 609 tor_free(onionskin); 610 return -1; 611 } 612 613 if (total_pending_tasks >= max_pending_tasks) { 614 log_debug(LD_OR,"No idle cpuworkers. Queuing."); 615 if (onion_pending_add(circ, onionskin) < 0) { 616 tor_free(onionskin); 617 return -1; 618 } 619 return 0; 620 } 621 622 if (!channel_is_client(circ->p_chan)) 623 rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type); 624 625 should_time = should_time_request(onionskin->handshake_type); 626 memset(&req, 0, sizeof(req)); 627 req.magic = CPUWORKER_REQUEST_MAGIC; 628 req.timed = should_time; 629 630 memcpy(&req.create_cell, onionskin, sizeof(create_cell_t)); 631 632 tor_free(onionskin); 633 634 if (should_time) 635 tor_gettimeofday(&req.started_at); 636 637 /* Copy the current cached consensus params relevant to 638 * circuit negotiation into the CPU worker context */ 639 req.circ_ns_params.cc_enabled = congestion_control_enabled(); 640 req.circ_ns_params.sendme_inc_cells = congestion_control_sendme_inc(); 641 642 job = tor_malloc_zero(sizeof(cpuworker_job_t)); 643 job->circ = circ; 644 memcpy(&job->u.request, &req, sizeof(req)); 645 memwipe(&req, 0, sizeof(req)); 646 647 ++total_pending_tasks; 648 queue_entry = threadpool_queue_work_priority(threadpool, 649 WQ_PRI_HIGH, 650 cpuworker_onion_handshake_threadfn, 651 cpuworker_onion_handshake_replyfn, 652 job); 653 if (!queue_entry) { 654 log_warn(LD_BUG, "Couldn't queue work on threadpool"); 655 tor_free(job); 656 return -1; 657 } 658 659 log_debug(LD_OR, "Queued task %p (qe=%p, circ=%p)", 660 job, queue_entry, job->circ); 661 662 circ->workqueue_entry = queue_entry; 663 664 return 0; 665 } 666 667 /** If <b>circ</b> has a pending handshake that hasn't been processed yet, 668 * remove it from the worker queue. */ 669 void 670 cpuworker_cancel_circ_handshake(or_circuit_t *circ) 671 { 672 cpuworker_job_t *job; 673 if (circ->workqueue_entry == NULL) 674 return; 675 676 job = workqueue_entry_cancel(circ->workqueue_entry); 677 if (job) { 678 /* It successfully cancelled. */ 679 memwipe(job, 0xe0, sizeof(*job)); 680 tor_free(job); 681 tor_assert(total_pending_tasks > 0); 682 --total_pending_tasks; 683 /* if (!job), this is done in cpuworker_onion_handshake_replyfn. */ 684 circ->workqueue_entry = NULL; 685 } 686 }