conflux.c (31917B)
1 /* Copyright (c) 2021, The Tor Project, Inc. */ 2 /* See LICENSE for licensing information */ 3 4 /** 5 * \file conflux.c 6 * \brief Conflux multipath core algorithms 7 */ 8 9 #include "core/or/relay_msg.h" 10 #define TOR_CONFLUX_PRIVATE 11 12 #include "core/or/or.h" 13 14 #include "core/or/circuit_st.h" 15 #include "core/or/sendme.h" 16 #include "core/or/relay.h" 17 #include "core/or/congestion_control_common.h" 18 #include "core/or/congestion_control_st.h" 19 #include "core/or/origin_circuit_st.h" 20 #include "core/or/circuitlist.h" 21 #include "core/or/circuituse.h" 22 #include "core/or/conflux.h" 23 #include "core/or/conflux_params.h" 24 #include "core/or/conflux_util.h" 25 #include "core/or/conflux_pool.h" 26 #include "core/or/conflux_st.h" 27 #include "core/or/conflux_cell.h" 28 #include "lib/time/compat_time.h" 29 #include "app/config/config.h" 30 31 /** One million microseconds in a second */ 32 #define USEC_PER_SEC 1000000 33 34 static inline uint64_t cwnd_sendable(const circuit_t *on_circ, 35 uint64_t in_usec, uint64_t our_usec); 36 37 /* Track the total number of bytes used by all ooo_q so it can be used by the 38 * OOM handler to assess. 39 * 40 * When adding or subtracting to this value, use conflux_msg_alloc_cost(). */ 41 static uint64_t total_ooo_q_bytes = 0; 42 43 /** 44 * Determine if we should multiplex a specific relay command or not. 45 * 46 * TODO: Version of this that is the set of forbidden commands 47 * on linked circuits 48 */ 49 bool 50 conflux_should_multiplex(int relay_command) 51 { 52 switch (relay_command) { 53 /* These are all fine to multiplex, and must be 54 * so that ordering is preserved */ 55 case RELAY_COMMAND_BEGIN: 56 case RELAY_COMMAND_DATA: 57 case RELAY_COMMAND_END: 58 case RELAY_COMMAND_CONNECTED: 59 return true; 60 61 /* We can't multiplex these because they are 62 * circuit-specific */ 63 case RELAY_COMMAND_SENDME: 64 case RELAY_COMMAND_EXTEND: 65 case RELAY_COMMAND_EXTENDED: 66 case RELAY_COMMAND_TRUNCATE: 67 case RELAY_COMMAND_TRUNCATED: 68 case RELAY_COMMAND_DROP: 69 return false; 70 71 /* We must multiplex RESOLVEs because their ordering 72 * impacts begin/end. */ 73 case RELAY_COMMAND_RESOLVE: 74 case RELAY_COMMAND_RESOLVED: 75 return true; 76 77 /* These are all circuit-specific */ 78 case RELAY_COMMAND_BEGIN_DIR: 79 case RELAY_COMMAND_EXTEND2: 80 case RELAY_COMMAND_EXTENDED2: 81 case RELAY_COMMAND_ESTABLISH_INTRO: 82 case RELAY_COMMAND_ESTABLISH_RENDEZVOUS: 83 case RELAY_COMMAND_INTRODUCE1: 84 case RELAY_COMMAND_INTRODUCE2: 85 case RELAY_COMMAND_RENDEZVOUS1: 86 case RELAY_COMMAND_RENDEZVOUS2: 87 case RELAY_COMMAND_INTRO_ESTABLISHED: 88 case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED: 89 case RELAY_COMMAND_INTRODUCE_ACK: 90 case RELAY_COMMAND_PADDING_NEGOTIATE: 91 case RELAY_COMMAND_PADDING_NEGOTIATED: 92 return false; 93 94 /* These must be multiplexed because their ordering 95 * relative to BEGIN/END must be preserved */ 96 case RELAY_COMMAND_XOFF: 97 case RELAY_COMMAND_XON: 98 return true; 99 100 /* These two are not multiplexed, because they must 101 * be processed immediately to update sequence numbers 102 * before any other cells are processed on the circuit */ 103 case RELAY_COMMAND_CONFLUX_SWITCH: 104 case RELAY_COMMAND_CONFLUX_LINK: 105 case RELAY_COMMAND_CONFLUX_LINKED: 106 case RELAY_COMMAND_CONFLUX_LINKED_ACK: 107 return false; 108 109 default: 110 log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d", 111 relay_command); 112 return false; 113 } 114 } 115 116 /** Return the leg for a circuit in a conflux set. Return NULL if not found. */ 117 conflux_leg_t * 118 conflux_get_leg(conflux_t *cfx, const circuit_t *circ) 119 { 120 conflux_leg_t *leg_found = NULL; 121 tor_assert(cfx); 122 tor_assert(cfx->legs); 123 124 // Find the leg that the cell is written on 125 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 126 if (leg->circ == circ) { 127 leg_found = leg; 128 break; 129 } 130 } CONFLUX_FOR_EACH_LEG_END(leg); 131 132 return leg_found; 133 } 134 135 /** 136 * Gets the maximum last_seq_sent from all legs. 137 */ 138 uint64_t 139 conflux_get_max_seq_sent(const conflux_t *cfx) 140 { 141 uint64_t max_seq_sent = 0; 142 143 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 144 if (leg->last_seq_sent > max_seq_sent) { 145 max_seq_sent = leg->last_seq_sent; 146 } 147 } CONFLUX_FOR_EACH_LEG_END(leg); 148 149 return max_seq_sent; 150 } 151 152 /** 153 * Gets the maximum last_seq_recv from all legs. 154 */ 155 uint64_t 156 conflux_get_max_seq_recv(const conflux_t *cfx) 157 { 158 uint64_t max_seq_recv = 0; 159 160 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 161 if (leg->last_seq_recv > max_seq_recv) { 162 max_seq_recv = leg->last_seq_recv; 163 } 164 } CONFLUX_FOR_EACH_LEG_END(leg); 165 166 return max_seq_recv; 167 } 168 169 /** Return the total memory allocation the circuit is using by conflux. If this 170 * circuit is not a Conflux circuit, 0 is returned. */ 171 uint64_t 172 conflux_get_circ_bytes_allocation(const circuit_t *circ) 173 { 174 if (circ->conflux) { 175 return smartlist_len(circ->conflux->ooo_q) * sizeof(void*) 176 + circ->conflux->ooo_q_alloc_cost; 177 } 178 return 0; 179 } 180 181 /** Return the total memory allocation in bytes by the subsystem. 182 * 183 * At the moment, only out of order queues are consiered. */ 184 uint64_t 185 conflux_get_total_bytes_allocation(void) 186 { 187 return total_ooo_q_bytes; 188 } 189 190 /** The OOM handler is asking us to try to free at least bytes_to_remove. */ 191 size_t 192 conflux_handle_oom(size_t bytes_to_remove) 193 { 194 (void) bytes_to_remove; 195 196 /* We are not doing anything on the sets, the OOM handler will trigger a 197 * circuit clean up which will affect conflux sets, by pruning oldest 198 * circuits. */ 199 200 log_info(LD_CIRC, "OOM handler triggered. OOO queus allocation: %" PRIu64, 201 total_ooo_q_bytes); 202 return 0; 203 } 204 205 /** 206 * Returns true if a circuit has package window space to send, and is 207 * not blocked locally. 208 */ 209 static inline bool 210 circuit_ready_to_send(const circuit_t *circ) 211 { 212 const congestion_control_t *cc = circuit_ccontrol(circ); 213 bool cc_sendable = true; 214 215 /* We consider ourselves blocked if we're within 1 sendme of the 216 * cwnd, because inflight is decremented before this check */ 217 // TODO-329-TUNING: This subtraction not be right.. It depends 218 // on call order wrt decisions and sendme arrival 219 if (cc->inflight >= cc->cwnd) { 220 cc_sendable = false; 221 } 222 223 /* Origin circuits use the package window of the last hop, and 224 * have an outbound cell direction (towards exit). Otherwise, 225 * there is no cpath and direction is inbound. */ 226 if (CIRCUIT_IS_ORIGIN(circ)) { 227 return cc_sendable && !circ->circuit_blocked_on_n_chan; 228 } else { 229 return cc_sendable && !circ->circuit_blocked_on_p_chan; 230 } 231 } 232 233 /** 234 * Return the circuit with the minimum RTT. Do not use any 235 * other circuit. 236 * 237 * This algorithm will minimize RTT always, and will not provide 238 * any throughput benefit. We expect it to be useful for VoIP/UDP 239 * use cases. Because it only uses one circuit on a leg at a time, 240 * it can have more than one circuit per guard (ie: to find 241 * lower-latency middles for the path). 242 */ 243 static const circuit_t * 244 conflux_decide_circ_minrtt(const conflux_t *cfx) 245 { 246 uint64_t min_rtt = UINT64_MAX; 247 const circuit_t *circ = NULL; 248 249 /* Can't get here without any legs. */ 250 tor_assert(CONFLUX_NUM_LEGS(cfx)); 251 252 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 253 254 /* Ignore circuits with no RTT measurement */ 255 if (leg->circ_rtts_usec && leg->circ_rtts_usec < min_rtt) { 256 circ = leg->circ; 257 min_rtt = leg->circ_rtts_usec; 258 } 259 } CONFLUX_FOR_EACH_LEG_END(leg); 260 261 /* If the minRTT circuit can't send, dont send on any circuit. */ 262 if (!circ || !circuit_ready_to_send(circ)) { 263 return NULL; 264 } 265 return circ; 266 } 267 268 /** 269 * Favor the circuit with the lowest RTT that still has space in the 270 * congestion window. 271 * 272 * This algorithm will maximize total throughput at the expense of 273 * bloating out-of-order queues. 274 */ 275 static const circuit_t * 276 conflux_decide_circ_lowrtt(const conflux_t *cfx) 277 { 278 uint64_t low_rtt = UINT64_MAX; 279 const circuit_t *circ = NULL; 280 281 /* Can't get here without any legs. */ 282 tor_assert(CONFLUX_NUM_LEGS(cfx)); 283 284 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 285 /* If the package window is full, skip it */ 286 if (!circuit_ready_to_send(leg->circ)) { 287 continue; 288 } 289 290 /* Ignore circuits with no RTT */ 291 if (leg->circ_rtts_usec && leg->circ_rtts_usec < low_rtt) { 292 low_rtt = leg->circ_rtts_usec; 293 circ = leg->circ; 294 } 295 } CONFLUX_FOR_EACH_LEG_END(leg); 296 297 /* At this point, if we found a circuit, we've already validated that its 298 * congestion window has room. */ 299 return circ; 300 } 301 302 /** 303 * Returns the amount of room in a cwnd on a circuit. 304 */ 305 static inline uint64_t 306 cwnd_available(const circuit_t *on_circ) 307 { 308 const congestion_control_t *cc = circuit_ccontrol(on_circ); 309 tor_assert(cc); 310 311 if (cc->cwnd < cc->inflight) 312 return 0; 313 314 return cc->cwnd - cc->inflight; 315 } 316 317 /** 318 * Return the amount of congestion window we can send on 319 * on_circ during in_usec. However, if we're still in 320 * slow-start, send the whole window to establish the true 321 * cwnd. 322 */ 323 static inline uint64_t 324 cwnd_sendable(const circuit_t *on_circ, uint64_t in_usec, 325 uint64_t our_usec) 326 { 327 const congestion_control_t *cc = circuit_ccontrol(on_circ); 328 tor_assert(cc); 329 uint64_t cwnd_adjusted = cwnd_available(on_circ); 330 331 if (our_usec == 0 || in_usec == 0) { 332 log_fn(LOG_PROTOCOL_WARN, LD_CIRC, 333 "cwnd_sendable: Missing RTT data. in_usec: %" PRIu64 334 " our_usec: %" PRIu64, in_usec, our_usec); 335 return cwnd_adjusted; 336 } 337 338 if (cc->in_slow_start) { 339 return cwnd_adjusted; 340 } else { 341 /* For any given leg, it has min_rtt/2 time before the 'primary' 342 * leg's acks start arriving. So, the amount of data this 343 * 'secondary' leg can send while the min_rtt leg transmits these 344 * acks is: 345 * (cwnd_leg/(leg_rtt/2))*min_rtt/2 = cwnd_leg*min_rtt/leg_rtt. 346 */ 347 uint64_t sendable = cwnd_adjusted*in_usec/our_usec; 348 return MIN(cc->cwnd, sendable); 349 } 350 } 351 352 /** 353 * Returns true if we can switch to a new circuit, false otherwise. 354 * 355 * This function assumes we're primarily switching between two circuits, 356 * the current and the prev. If we're using more than two circuits, we 357 * need to set cfx_drain_pct to 100. 358 */ 359 static inline bool 360 conflux_can_switch(const conflux_t *cfx) 361 { 362 /* If we still expected to send more cells on this circuit, 363 * we're only allowed to switch if the previous circuit emptied. */ 364 if (cfx->cells_until_switch > 0) { 365 /* If there is no prev leg, skip the inflight check. */ 366 if (!cfx->prev_leg) { 367 return false; 368 } 369 const congestion_control_t *ccontrol = 370 circuit_ccontrol(cfx->prev_leg->circ); 371 372 /* If the inflight count has drained to below cfx_drain_pct 373 * of the congestion window, then we can switch. 374 * We check the sendme_inc because there may be un-ackable 375 * data in inflight as well, and we can still switch then. */ 376 // TODO-329-TUNING: Should we try to switch if the prev_leg is 377 // ready to send, instead of this? 378 if (ccontrol->inflight < ccontrol->sendme_inc || 379 100*ccontrol->inflight <= 380 conflux_params_get_drain_pct()*ccontrol->cwnd) { 381 return true; 382 } 383 384 return false; 385 } 386 387 return true; 388 } 389 390 /** 391 * Favor the circuit with the lowest RTT that still has space in the 392 * congestion window up to the ratio of RTTs. 393 * 394 * This algorithm should only use auxillary legs up to the point 395 * where their data arrives roughly the same time as the lowest 396 * RTT leg. It will not utilize the full cwnd of auxillary legs, 397 * except in slow start. Therefore, out-of-order queue bloat should 398 * be minimized to just the slow-start phase. 399 */ 400 static const circuit_t * 401 conflux_decide_circ_cwndrtt(const conflux_t *cfx) 402 { 403 uint64_t min_rtt = UINT64_MAX; 404 const conflux_leg_t *leg = NULL; 405 406 /* Can't get here without any legs. */ 407 tor_assert(!CONFLUX_NUM_LEGS(cfx)); 408 409 /* Find the leg with the minimum RTT.*/ 410 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, l) { 411 /* Ignore circuits with invalid RTT */ 412 if (l->circ_rtts_usec && l->circ_rtts_usec < min_rtt) { 413 min_rtt = l->circ_rtts_usec; 414 leg = l; 415 } 416 } CONFLUX_FOR_EACH_LEG_END(l); 417 418 /* If the package window is has room, use it */ 419 if (leg && circuit_ready_to_send(leg->circ)) { 420 return leg->circ; 421 } 422 423 leg = NULL; 424 425 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, l) { 426 if (!circuit_ready_to_send(l->circ)) { 427 continue; 428 } 429 430 /* Pick a 'min_leg' with the lowest RTT that still has 431 * room in the congestion window. Note that this works for 432 * min_leg itself, up to inflight. */ 433 if (l->circ_rtts_usec && 434 cwnd_sendable(l->circ, min_rtt, l->circ_rtts_usec) > 0) { 435 leg = l; 436 } 437 } CONFLUX_FOR_EACH_LEG_END(l); 438 439 /* If the circuit can't send, don't send on any circuit. */ 440 if (!leg || !circuit_ready_to_send(leg->circ)) { 441 return NULL; 442 } 443 return leg->circ; 444 } 445 446 /** 447 * This function is called when we want to send a relay cell on a 448 * conflux, as well as when we want to compute available space in 449 * to package from streams. 450 * 451 * It determines the circuit that relay command should be sent on, 452 * and sends a SWITCH cell if necessary. 453 * 454 * It returns the circuit we should send on. If no circuits are ready 455 * to send, it returns NULL. 456 */ 457 circuit_t * 458 conflux_decide_circ_for_send(conflux_t *cfx, 459 circuit_t *orig_circ, 460 uint8_t relay_command) 461 { 462 /* If this command should not be multiplexed, send it on the original 463 * circuit */ 464 if (!conflux_should_multiplex(relay_command)) { 465 return orig_circ; 466 } 467 468 circuit_t *new_circ = conflux_decide_next_circ(cfx); 469 470 /* Because our congestion window only cover relay data command, we can end up 471 * in a situation where we need to send non data command when all circuits 472 * are at capacity. For those cases, keep using the *current* leg, 473 * so these commands arrive in-order. */ 474 if (!new_circ && relay_command != RELAY_COMMAND_DATA) { 475 /* Curr leg should be set, because conflux_decide_next_circ() should 476 * have set it earlier. No BUG() here because the only caller BUG()s. */ 477 if (!cfx->curr_leg) { 478 log_warn(LD_BUG, "No current leg for conflux with relay command %d", 479 relay_command); 480 return NULL; 481 } 482 return cfx->curr_leg->circ; 483 } 484 485 /* 486 * If we are switching to a new circuit, we need to send a SWITCH command. 487 * We also need to compute an estimate of how much data we can send on 488 * the new circuit before we are allowed to switch again, to rate 489 * limit the frequency of switching. 490 */ 491 if (new_circ) { 492 conflux_leg_t *new_leg = conflux_get_leg(cfx, new_circ); 493 tor_assert(cfx->curr_leg); 494 495 if (new_circ != cfx->curr_leg->circ) { 496 // TODO-329-TUNING: This is one mechanism to rate limit switching, 497 // which should reduce the OOQ mem. However, we're not going to do that 498 // until we get some data on if the memory usage is high 499 cfx->cells_until_switch = 0; 500 //cwnd_sendable(new_circ,cfx->curr_leg->circ_rtts_usec, 501 // new_leg->circ_rtts_usec); 502 503 conflux_validate_stream_lists(cfx); 504 505 cfx->prev_leg = cfx->curr_leg; 506 cfx->curr_leg = new_leg; 507 508 tor_assert(cfx->prev_leg); 509 tor_assert(cfx->curr_leg); 510 511 uint64_t relative_seq = cfx->prev_leg->last_seq_sent - 512 cfx->curr_leg->last_seq_sent; 513 514 if (cfx->curr_leg->last_seq_sent > cfx->prev_leg->last_seq_sent) { 515 /* Having incoherent sequence numbers, log warn about it but rate limit 516 * it to every hour so we avoid redundent report. */ 517 static ratelim_t rlimit = RATELIM_INIT(60 * 60); 518 log_fn_ratelim(&rlimit, LOG_WARN, LD_BUG, 519 "Current conflux leg last_seq_sent=%"PRIu64 520 " is above previous leg at %" PRIu64 ". Closing set.", 521 cfx->curr_leg->last_seq_sent, 522 cfx->prev_leg->last_seq_sent); 523 conflux_mark_all_for_close(cfx->nonce, CIRCUIT_IS_ORIGIN(new_circ), 524 END_CIRC_REASON_TORPROTOCOL); 525 return NULL; 526 } 527 528 /* On failure to send the SWITCH, we close everything. This means we have 529 * a protocol error or the sending failed and the circuit is closed. */ 530 if (!conflux_send_switch_command(cfx->curr_leg->circ, relative_seq)) { 531 conflux_mark_all_for_close(cfx->nonce, CIRCUIT_IS_ORIGIN(new_circ), 532 END_CIRC_REASON_TORPROTOCOL); 533 return NULL; 534 } 535 cfx->curr_leg->last_seq_sent = cfx->prev_leg->last_seq_sent; 536 } 537 } 538 539 return new_circ; 540 } 541 542 /** Called after conflux actually sent a cell on a circuit. 543 * This function updates sequence number counters, and 544 * switch counters. 545 */ 546 void 547 conflux_note_cell_sent(conflux_t *cfx, circuit_t *circ, uint8_t relay_command) 548 { 549 conflux_leg_t *leg = NULL; 550 551 if (!conflux_should_multiplex(relay_command)) { 552 return; 553 } 554 555 leg = conflux_get_leg(cfx, circ); 556 if (leg == NULL) { 557 log_fn(LOG_PROTOCOL_WARN, LD_BUG, "No Conflux leg after sending a cell"); 558 return; 559 } 560 561 leg->last_seq_sent++; 562 563 if (cfx->cells_until_switch > 0) { 564 cfx->cells_until_switch--; 565 } 566 } 567 568 /** Find the leg with lowest non-zero curr_rtt_usec, and 569 * pick it for our current leg. */ 570 static inline bool 571 conflux_pick_first_leg(conflux_t *cfx) 572 { 573 conflux_leg_t *min_leg = NULL; 574 575 CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) { 576 /* We need to skip 0-RTT legs, since this can happen at the exit 577 * when there is a race between BEGIN and LINKED_ACK, and BEGIN 578 * wins the race. The good news is that because BEGIN won, 579 * we don't need to consider those other legs, since they are 580 * slower. */ 581 if (leg->circ_rtts_usec > 0) { 582 if (!min_leg || leg->circ_rtts_usec < min_leg->circ_rtts_usec) { 583 min_leg = leg; 584 } 585 } 586 } CONFLUX_FOR_EACH_LEG_END(leg); 587 588 if (!min_leg) { 589 // Get the 0th leg; if it does not exist, log the set. 590 // Bug 40827 managed to hit this, so let's dump the sets 591 // in case it happens again. 592 if (BUG(smartlist_len(cfx->legs) <= 0)) { 593 // Since we have no legs, we have no idea if this is really a client 594 // or server set. Try to find any that match: 595 log_warn(LD_BUG, "Matching client sets:"); 596 conflux_log_set(LOG_WARN, cfx, true); 597 log_warn(LD_BUG, "Matching server sets:"); 598 conflux_log_set(LOG_WARN, cfx, false); 599 log_warn(LD_BUG, "End conflux set dump"); 600 return false; 601 } 602 603 min_leg = smartlist_get(cfx->legs, 0); 604 tor_assert(min_leg); 605 if (BUG(min_leg->linked_sent_usec == 0)) { 606 log_warn(LD_BUG, "Conflux has no legs with non-zero RTT. " 607 "Using first leg."); 608 conflux_log_set(LOG_WARN, cfx, CIRCUIT_IS_ORIGIN(min_leg->circ)); 609 } 610 } 611 612 // TODO-329-TUNING: We may want to initialize this to a cwnd, to 613 // minimize early switching? 614 //cfx->cells_until_switch = circuit_ccontrol(min_leg->circ)->cwnd; 615 cfx->cells_until_switch = 0; 616 617 cfx->curr_leg = min_leg; 618 619 return true; 620 } 621 622 /** 623 * Returns the circuit that conflux would send on next, if 624 * conflux_decide_circ_for_send were called. This is used to compute 625 * available space in the package window. 626 */ 627 circuit_t * 628 conflux_decide_next_circ(conflux_t *cfx) 629 { 630 // TODO-329-TUNING: Temporarily validate legs here. We can remove 631 // this once tuning is complete. 632 conflux_validate_legs(cfx); 633 634 /* If the conflux set is tearing down and has no current leg, 635 * bail and give up */ 636 if (cfx->in_full_teardown) { 637 return NULL; 638 } 639 640 /* If we don't have a current leg yet, pick one. 641 * (This is the only non-const operation in this function). */ 642 if (!cfx->curr_leg) { 643 if (!conflux_pick_first_leg(cfx)) 644 return NULL; 645 } 646 647 /* First, check if we can switch. */ 648 if (!conflux_can_switch(cfx)) { 649 tor_assert(cfx->curr_leg); 650 circuit_t *curr_circ = cfx->curr_leg->circ; 651 652 /* If we can't switch, and the current circuit can't send, 653 * then return null. */ 654 if (circuit_ready_to_send(curr_circ)) { 655 return curr_circ; 656 } 657 log_info(LD_CIRC, "Conflux can't switch; no circuit to send on."); 658 return NULL; 659 } 660 661 switch (cfx->params.alg) { 662 case CONFLUX_ALG_MINRTT: // latency (no ooq) 663 return (circuit_t*)conflux_decide_circ_minrtt(cfx); 664 case CONFLUX_ALG_LOWRTT: // high throughput (high oooq) 665 return (circuit_t*)conflux_decide_circ_lowrtt(cfx); 666 case CONFLUX_ALG_CWNDRTT: // throughput (low oooq) 667 return (circuit_t*)conflux_decide_circ_cwndrtt(cfx); 668 default: 669 return NULL; 670 } 671 } 672 673 /** 674 * Called when we have a new RTT estimate for a circuit. 675 */ 676 void 677 conflux_update_rtt(conflux_t *cfx, circuit_t *circ, uint64_t rtt_usec) 678 { 679 conflux_leg_t *leg = conflux_get_leg(cfx, circ); 680 681 if (!leg) { 682 log_warn(LD_BUG, "Got RTT update for circuit not in conflux"); 683 return; 684 } 685 686 // Update RTT 687 leg->circ_rtts_usec = rtt_usec; 688 689 // TODO-329-ARTI: For UDP latency targeting, arti could decide to launch 690 // new a test leg to potentially replace this one, if a latency target 691 // was requested and we now exceed it. Since C-Tor client likely 692 // will not have UDP support, we aren't doing this here. 693 } 694 695 /** 696 * Comparison function for ooo_q pqueue. 697 * 698 * Ensures that lower sequence numbers are at the head of the pqueue. 699 */ 700 static int 701 conflux_queue_cmp(const void *a, const void *b) 702 { 703 // Compare a and b as conflux_cell_t using the seq field, and return a 704 // comparison result such that the lowest seq is at the head of the pqueue. 705 const conflux_msg_t *cell_a = a; 706 const conflux_msg_t *cell_b = b; 707 708 tor_assert(cell_a); 709 tor_assert(cell_b); 710 711 if (cell_a->seq < cell_b->seq) { 712 return -1; 713 } else if (cell_a->seq > cell_b->seq) { 714 return 1; 715 } else { 716 return 0; 717 } 718 } 719 720 /** 721 * Get the congestion control object for a conflux circuit. 722 * 723 * Because conflux can only be negotiated with the last hop, we 724 * can use the last hop of the cpath to obtain the congestion 725 * control object for origin circuits. For non-origin circuits, 726 * we can use the circuit itself. 727 */ 728 const congestion_control_t * 729 circuit_ccontrol(const circuit_t *circ) 730 { 731 const congestion_control_t *ccontrol = NULL; 732 tor_assert(circ); 733 734 if (CIRCUIT_IS_ORIGIN(circ)) { 735 tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath); 736 tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev); 737 ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol; 738 } else { 739 ccontrol = circ->ccontrol; 740 } 741 742 /* Conflux circuits always have congestion control*/ 743 tor_assert(ccontrol); 744 return ccontrol; 745 } 746 747 // TODO-329-TUNING: For LowRTT, we can at most switch every SENDME, 748 // but for BLEST, we should switch at most every cwnd.. But 749 // we do not know the other side's CWND here.. We can at best 750 // asssume it is above the cwnd_min 751 #define CONFLUX_MIN_LINK_INCREMENT 31 752 /** 753 * Validate and handle RELAY_COMMAND_CONFLUX_SWITCH. 754 */ 755 int 756 conflux_process_switch_command(circuit_t *in_circ, 757 crypt_path_t *layer_hint, 758 const relay_msg_t *msg) 759 { 760 tor_assert(in_circ); 761 tor_assert(msg); 762 763 conflux_t *cfx = in_circ->conflux; 764 uint32_t relative_seq; 765 conflux_leg_t *leg; 766 767 if (!conflux_is_enabled(in_circ)) { 768 circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL); 769 return -1; 770 } 771 772 /* If there is no conflux object negotiated, this is invalid. 773 * log and close circ */ 774 if (!cfx) { 775 log_warn(LD_BUG, "Got a conflux switch command on a circuit without " 776 "conflux negotiated. Closing circuit."); 777 778 circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL); 779 return -1; 780 } 781 782 // TODO-329-TUNING: Temporarily validate that we have all legs. 783 // After tuning is complete, we can remove this. 784 conflux_validate_legs(cfx); 785 786 leg = conflux_get_leg(cfx, in_circ); 787 788 /* If we can't find the conflux leg, we got big problems.. 789 * Close the circuit. */ 790 if (!leg) { 791 log_warn(LD_BUG, "Got a conflux switch command on a circuit without " 792 "conflux leg. Closing circuit."); 793 circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL); 794 return -1; 795 } 796 797 // Check source hop via layer_hint 798 if (!conflux_validate_source_hop(in_circ, layer_hint)) { 799 log_warn(LD_BUG, "Got a conflux switch command on a circuit with " 800 "invalid source hop. Closing circuit."); 801 circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL); 802 return -1; 803 } 804 805 relative_seq = conflux_cell_parse_switch(msg); 806 807 /* 808 * We have to make sure that the switch command is truely 809 * incrementing the sequence number, or else it becomes 810 * a side channel that can be spammed for traffic analysis. 811 */ 812 // TODO-329-TUNING: This can happen. Disabling for now.. 813 //if (relative_seq < CONFLUX_MIN_LINK_INCREMENT) { 814 // log_warn(LD_CIRC, "Got a conflux switch command with a relative " 815 // "sequence number less than the minimum increment. Closing " 816 // "circuit."); 817 // circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL); 818 // return -1; 819 //} 820 821 // TODO-329-UDP: When Prop#340 exits and was negotiated, ensure we're 822 // in a packed cell, with another cell following, otherwise 823 // this is a spammed side-channel. 824 // - We definitely should never get switches back-to-back. 825 // - We should not get switches across all legs with no data 826 // But before Prop#340, it doesn't make much sense to do this. 827 // C-Tor is riddled with side-channels like this anyway, unless 828 // vanguards is in use. And this feature is not supported by 829 // onion servicees in C-Tor, so we're good there. 830 831 /* Update the absolute sequence number on this leg by the delta. 832 * Since this cell is not multiplexed, we do not count it towards 833 * absolute sequence numbers. We only increment the sequence 834 * numbers for multiplexed cells. Hence there is no +1 here. */ 835 leg->last_seq_recv += relative_seq; 836 837 /* Mark this data as validated for controlport and vanguards 838 * dropped cell handling */ 839 if (CIRCUIT_IS_ORIGIN(in_circ)) { 840 circuit_read_valid_data(TO_ORIGIN_CIRCUIT(in_circ), msg->length); 841 } 842 843 return 0; 844 } 845 846 /** 847 * Return the total number of required allocated to store `msg`. 848 */ 849 static inline size_t 850 conflux_msg_alloc_cost(conflux_msg_t *msg) 851 { 852 return msg->msg->length + sizeof(conflux_msg_t) + sizeof(relay_msg_t); 853 } 854 855 /** 856 * Process an incoming relay cell for conflux. Called from 857 * connection_edge_process_relay_cell(). 858 * 859 * Returns true if the conflux system now has well-ordered cells to deliver 860 * to streams, false otherwise. 861 */ 862 bool 863 conflux_process_relay_msg(conflux_t *cfx, circuit_t *in_circ, 864 crypt_path_t *layer_hint, const relay_msg_t *msg) 865 { 866 // TODO-329-TUNING: Temporarily validate legs here. We can remove 867 // this after tuning is complete. 868 conflux_validate_legs(cfx); 869 870 conflux_leg_t *leg = conflux_get_leg(cfx, in_circ); 871 if (!leg) { 872 log_warn(LD_BUG, "Got a conflux cell on a circuit without " 873 "conflux leg. Closing circuit."); 874 circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL); 875 return false; 876 } 877 878 /* We need to make sure this cell came from the expected hop, or 879 * else it could be a data corruption attack from a middle node. */ 880 if (!conflux_validate_source_hop(in_circ, layer_hint)) { 881 circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL); 882 return false; 883 } 884 885 /* Update the running absolute sequence number */ 886 leg->last_seq_recv++; 887 888 /* If this cell is next, fast-path it by processing the cell in-place */ 889 if (leg->last_seq_recv == cfx->last_seq_delivered + 1) { 890 /* The cell is now ready to be processed, and rest of the queue should 891 * now be checked for remaining elements */ 892 cfx->last_seq_delivered++; 893 return true; 894 } else if (leg->last_seq_recv <= cfx->last_seq_delivered) { 895 /* Anyone can mangle these sequence number. */ 896 log_fn(LOG_PROTOCOL_WARN, LD_BUG, 897 "Got a conflux cell with a sequence number " 898 "less than the last delivered. Closing circuit."); 899 circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL); 900 return false; 901 } else { 902 /* Both cost and param are in bytes. */ 903 if (cfx->ooo_q_alloc_cost >= conflux_params_get_max_oooq()) { 904 /* Log rate limit every hour. In heavy DDoS scenario, this could be 905 * triggered many times so avoid the spam. */ 906 static ratelim_t rlimit = RATELIM_INIT(60 * 60); 907 log_fn_ratelim(&rlimit, LOG_WARN, LD_CIRC, 908 "Conflux OOO queue is at maximum. Currently at " 909 "%"TOR_PRIuSZ " bytes, maximum allowed is %u bytes. " 910 "Closing.", 911 cfx->ooo_q_alloc_cost, conflux_params_get_max_oooq()); 912 circuit_mark_for_close(in_circ, END_CIRC_REASON_RESOURCELIMIT); 913 return false; 914 } 915 conflux_msg_t *c_msg = tor_malloc_zero(sizeof(conflux_msg_t)); 916 c_msg->seq = leg->last_seq_recv; 917 /* Notice the copy here. Reason is that we don't have ownership of the 918 * message. If we wanted to pull that off, we would need to change the 919 * whole calling stack and unit tests on either not touching it after this 920 * function indicates that it has taken it or never allocate it from the 921 * stack. This is simpler and less error prone but might show up in our 922 * profile (maybe?). The Maze is serious. It needs to be respected. */ 923 c_msg->msg = relay_msg_copy(msg); 924 size_t cost = conflux_msg_alloc_cost(c_msg); 925 926 smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp, 927 offsetof(conflux_msg_t, heap_idx), c_msg); 928 929 total_ooo_q_bytes += cost; 930 cfx->ooo_q_alloc_cost += cost; 931 932 /* This cell should not be processed yet, and the queue is not ready 933 * to process because the next absolute seqnum has not yet arrived */ 934 return false; 935 } 936 } 937 938 /** 939 * Dequeue the top cell from our queue. 940 * 941 * Returns the cell as a conflux_cell_t, or NULL if the queue is empty 942 * or has a hole. 943 */ 944 conflux_msg_t * 945 conflux_dequeue_relay_msg(circuit_t *circ) 946 { 947 conflux_msg_t *top = NULL; 948 /* Related to #41162. This is really a consequence of the C-tor maze. 949 * The function above can close a circuit without returning an error 950 * due to several return code ignored. Auditting all of the cell code 951 * path and fixing them to not ignore errors could bring many more 952 * issues as this behavior has been in tor forever. So do the bandaid 953 * fix of bailing if the circuit is closed. */ 954 if (circ->marked_for_close) { 955 static ratelim_t rlim = RATELIM_INIT(60 * 60); 956 log_fn_ratelim(&rlim, (circ->conflux == NULL) ? LOG_WARN : LOG_NOTICE, 957 LD_CIRC, 958 "Circuit was closed at %s:%u when dequeuing from OOO", 959 circ->marked_for_close_file, circ->marked_for_close); 960 return NULL; 961 } 962 conflux_t *cfx = circ->conflux; 963 if (cfx == NULL) { 964 static ratelim_t rlim = RATELIM_INIT(60 * 60); 965 log_fn_ratelim(&rlim, LOG_WARN, LD_CIRC, 966 "Bug: Non marked for close circuit with NULL conflux"); 967 return NULL; 968 } 969 if (cfx->ooo_q == NULL) { 970 static ratelim_t rlim = RATELIM_INIT(60 * 60); 971 log_fn_ratelim(&rlim, LOG_WARN, LD_CIRC, 972 "Bug: Non marked for close circuit with NULL OOO queue"); 973 return NULL; 974 } 975 976 if (smartlist_len(cfx->ooo_q) == 0) 977 return NULL; 978 979 top = smartlist_get(cfx->ooo_q, 0); 980 981 /* If the top cell is the next sequence number we need, then 982 * pop and return it. */ 983 if (top->seq == cfx->last_seq_delivered+1) { 984 smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp, 985 offsetof(conflux_msg_t, heap_idx)); 986 987 size_t cost = conflux_msg_alloc_cost(top); 988 total_ooo_q_bytes -= cost; 989 cfx->ooo_q_alloc_cost -= cost; 990 991 cfx->last_seq_delivered++; 992 return top; 993 } else { 994 return NULL; 995 } 996 } 997 998 /** Free a given conflux msg object. */ 999 void 1000 conflux_relay_msg_free_(conflux_msg_t *msg) 1001 { 1002 if (msg) { 1003 relay_msg_free(msg->msg); 1004 tor_free(msg); 1005 } 1006 }