tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

conflux.c (31917B)


      1 /* Copyright (c) 2021, The Tor Project, Inc. */
      2 /* See LICENSE for licensing information */
      3 
      4 /**
      5 * \file conflux.c
      6 * \brief Conflux multipath core algorithms
      7 */
      8 
      9 #include "core/or/relay_msg.h"
     10 #define TOR_CONFLUX_PRIVATE
     11 
     12 #include "core/or/or.h"
     13 
     14 #include "core/or/circuit_st.h"
     15 #include "core/or/sendme.h"
     16 #include "core/or/relay.h"
     17 #include "core/or/congestion_control_common.h"
     18 #include "core/or/congestion_control_st.h"
     19 #include "core/or/origin_circuit_st.h"
     20 #include "core/or/circuitlist.h"
     21 #include "core/or/circuituse.h"
     22 #include "core/or/conflux.h"
     23 #include "core/or/conflux_params.h"
     24 #include "core/or/conflux_util.h"
     25 #include "core/or/conflux_pool.h"
     26 #include "core/or/conflux_st.h"
     27 #include "core/or/conflux_cell.h"
     28 #include "lib/time/compat_time.h"
     29 #include "app/config/config.h"
     30 
/** Number of microseconds in one second (one million). */
#define USEC_PER_SEC 1000000

/* Forward declaration of cwnd_sendable(); its definition appears further
 * down in this file. */
static inline uint64_t cwnd_sendable(const circuit_t *on_circ,
                                     uint64_t in_usec, uint64_t our_usec);

/* Running total of the bytes used by all out-of-order queues (ooo_q), kept
 * so that the OOM handler can assess conflux memory usage.
 *
 * When adding to or subtracting from this value, compute the amount with
 * conflux_msg_alloc_cost(). */
static uint64_t total_ooo_q_bytes = 0;
     42 
     43 /**
     44 * Determine if we should multiplex a specific relay command or not.
     45 *
     46 * TODO: Version of this that is the set of forbidden commands
     47 * on linked circuits
     48 */
     49 bool
     50 conflux_should_multiplex(int relay_command)
     51 {
     52  switch (relay_command) {
     53    /* These are all fine to multiplex, and must be
     54     * so that ordering is preserved */
     55    case RELAY_COMMAND_BEGIN:
     56    case RELAY_COMMAND_DATA:
     57    case RELAY_COMMAND_END:
     58    case RELAY_COMMAND_CONNECTED:
     59      return true;
     60 
     61    /* We can't multiplex these because they are
     62     * circuit-specific */
     63    case RELAY_COMMAND_SENDME:
     64    case RELAY_COMMAND_EXTEND:
     65    case RELAY_COMMAND_EXTENDED:
     66    case RELAY_COMMAND_TRUNCATE:
     67    case RELAY_COMMAND_TRUNCATED:
     68    case RELAY_COMMAND_DROP:
     69      return false;
     70 
     71    /* We must multiplex RESOLVEs because their ordering
     72     * impacts begin/end. */
     73    case RELAY_COMMAND_RESOLVE:
     74    case RELAY_COMMAND_RESOLVED:
     75      return true;
     76 
     77    /* These are all circuit-specific */
     78    case RELAY_COMMAND_BEGIN_DIR:
     79    case RELAY_COMMAND_EXTEND2:
     80    case RELAY_COMMAND_EXTENDED2:
     81    case RELAY_COMMAND_ESTABLISH_INTRO:
     82    case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
     83    case RELAY_COMMAND_INTRODUCE1:
     84    case RELAY_COMMAND_INTRODUCE2:
     85    case RELAY_COMMAND_RENDEZVOUS1:
     86    case RELAY_COMMAND_RENDEZVOUS2:
     87    case RELAY_COMMAND_INTRO_ESTABLISHED:
     88    case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
     89    case RELAY_COMMAND_INTRODUCE_ACK:
     90    case RELAY_COMMAND_PADDING_NEGOTIATE:
     91    case RELAY_COMMAND_PADDING_NEGOTIATED:
     92      return false;
     93 
     94    /* These must be multiplexed because their ordering
     95     * relative to BEGIN/END must be preserved */
     96    case RELAY_COMMAND_XOFF:
     97    case RELAY_COMMAND_XON:
     98      return true;
     99 
    100    /* These two are not multiplexed, because they must
    101     * be processed immediately to update sequence numbers
    102     * before any other cells are processed on the circuit */
    103    case RELAY_COMMAND_CONFLUX_SWITCH:
    104    case RELAY_COMMAND_CONFLUX_LINK:
    105    case RELAY_COMMAND_CONFLUX_LINKED:
    106    case RELAY_COMMAND_CONFLUX_LINKED_ACK:
    107      return false;
    108 
    109    default:
    110      log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d",
    111               relay_command);
    112      return false;
    113  }
    114 }
    115 
    116 /** Return the leg for a circuit in a conflux set. Return NULL if not found. */
    117 conflux_leg_t *
    118 conflux_get_leg(conflux_t *cfx, const circuit_t *circ)
    119 {
    120  conflux_leg_t *leg_found = NULL;
    121  tor_assert(cfx);
    122  tor_assert(cfx->legs);
    123 
    124  // Find the leg that the cell is written on
    125  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    126    if (leg->circ == circ) {
    127      leg_found = leg;
    128      break;
    129    }
    130  } CONFLUX_FOR_EACH_LEG_END(leg);
    131 
    132  return leg_found;
    133 }
    134 
    135 /**
    136 * Gets the maximum last_seq_sent from all legs.
    137 */
    138 uint64_t
    139 conflux_get_max_seq_sent(const conflux_t *cfx)
    140 {
    141  uint64_t max_seq_sent = 0;
    142 
    143  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    144    if (leg->last_seq_sent > max_seq_sent) {
    145      max_seq_sent = leg->last_seq_sent;
    146    }
    147  } CONFLUX_FOR_EACH_LEG_END(leg);
    148 
    149  return max_seq_sent;
    150 }
    151 
    152 /**
    153 * Gets the maximum last_seq_recv from all legs.
    154 */
    155 uint64_t
    156 conflux_get_max_seq_recv(const conflux_t *cfx)
    157 {
    158  uint64_t max_seq_recv = 0;
    159 
    160  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    161    if (leg->last_seq_recv > max_seq_recv) {
    162      max_seq_recv = leg->last_seq_recv;
    163    }
    164  } CONFLUX_FOR_EACH_LEG_END(leg);
    165 
    166  return max_seq_recv;
    167 }
    168 
    169 /** Return the total memory allocation the circuit is using by conflux. If this
    170 * circuit is not a Conflux circuit, 0 is returned. */
    171 uint64_t
    172 conflux_get_circ_bytes_allocation(const circuit_t *circ)
    173 {
    174  if (circ->conflux) {
    175    return smartlist_len(circ->conflux->ooo_q) * sizeof(void*)
    176      + circ->conflux->ooo_q_alloc_cost;
    177  }
    178  return 0;
    179 }
    180 
    181 /** Return the total memory allocation in bytes by the subsystem.
    182 *
    183 * At the moment, only out of order queues are consiered. */
    184 uint64_t
    185 conflux_get_total_bytes_allocation(void)
    186 {
    187  return total_ooo_q_bytes;
    188 }
    189 
    190 /** The OOM handler is asking us to try to free at least bytes_to_remove. */
    191 size_t
    192 conflux_handle_oom(size_t bytes_to_remove)
    193 {
    194  (void) bytes_to_remove;
    195 
    196  /* We are not doing anything on the sets, the OOM handler will trigger a
    197   * circuit clean up which will affect conflux sets, by pruning oldest
    198   * circuits. */
    199 
    200  log_info(LD_CIRC, "OOM handler triggered. OOO queus allocation: %" PRIu64,
    201           total_ooo_q_bytes);
    202  return 0;
    203 }
    204 
    205 /**
    206 * Returns true if a circuit has package window space to send, and is
    207 * not blocked locally.
    208 */
    209 static inline bool
    210 circuit_ready_to_send(const circuit_t *circ)
    211 {
    212  const congestion_control_t *cc = circuit_ccontrol(circ);
    213  bool cc_sendable = true;
    214 
    215  /* We consider ourselves blocked if we're within 1 sendme of the
    216   * cwnd, because inflight is decremented before this check */
    217  // TODO-329-TUNING: This subtraction not be right.. It depends
    218  // on call order wrt decisions and sendme arrival
    219  if (cc->inflight >= cc->cwnd) {
    220    cc_sendable = false;
    221  }
    222 
    223  /* Origin circuits use the package window of the last hop, and
    224   * have an outbound cell direction (towards exit). Otherwise,
    225   * there is no cpath and direction is inbound. */
    226  if (CIRCUIT_IS_ORIGIN(circ)) {
    227    return cc_sendable && !circ->circuit_blocked_on_n_chan;
    228  } else {
    229    return cc_sendable && !circ->circuit_blocked_on_p_chan;
    230  }
    231 }
    232 
    233 /**
    234 * Return the circuit with the minimum RTT. Do not use any
    235 * other circuit.
    236 *
    237 * This algorithm will minimize RTT always, and will not provide
    238 * any throughput benefit. We expect it to be useful for VoIP/UDP
    239 * use cases. Because it only uses one circuit on a leg at a time,
    240 * it can have more than one circuit per guard (ie: to find
    241 * lower-latency middles for the path).
    242 */
    243 static const circuit_t *
    244 conflux_decide_circ_minrtt(const conflux_t *cfx)
    245 {
    246  uint64_t min_rtt = UINT64_MAX;
    247  const circuit_t *circ = NULL;
    248 
    249  /* Can't get here without any legs. */
    250  tor_assert(CONFLUX_NUM_LEGS(cfx));
    251 
    252  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    253 
    254    /* Ignore circuits with no RTT measurement */
    255    if (leg->circ_rtts_usec && leg->circ_rtts_usec < min_rtt) {
    256      circ = leg->circ;
    257      min_rtt = leg->circ_rtts_usec;
    258    }
    259  } CONFLUX_FOR_EACH_LEG_END(leg);
    260 
    261  /* If the minRTT circuit can't send, dont send on any circuit. */
    262  if (!circ || !circuit_ready_to_send(circ)) {
    263    return NULL;
    264  }
    265  return circ;
    266 }
    267 
    268 /**
    269 * Favor the circuit with the lowest RTT that still has space in the
    270 * congestion window.
    271 *
    272 * This algorithm will maximize total throughput at the expense of
    273 * bloating out-of-order queues.
    274 */
    275 static const circuit_t *
    276 conflux_decide_circ_lowrtt(const conflux_t *cfx)
    277 {
    278  uint64_t low_rtt = UINT64_MAX;
    279  const circuit_t *circ = NULL;
    280 
    281  /* Can't get here without any legs. */
    282  tor_assert(CONFLUX_NUM_LEGS(cfx));
    283 
    284  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    285    /* If the package window is full, skip it */
    286    if (!circuit_ready_to_send(leg->circ)) {
    287      continue;
    288    }
    289 
    290    /* Ignore circuits with no RTT */
    291    if (leg->circ_rtts_usec && leg->circ_rtts_usec < low_rtt) {
    292      low_rtt = leg->circ_rtts_usec;
    293      circ = leg->circ;
    294    }
    295  } CONFLUX_FOR_EACH_LEG_END(leg);
    296 
    297  /* At this point, if we found a circuit, we've already validated that its
    298   * congestion window has room. */
    299  return circ;
    300 }
    301 
    302 /**
    303 * Returns the amount of room in a cwnd on a circuit.
    304 */
    305 static inline uint64_t
    306 cwnd_available(const circuit_t *on_circ)
    307 {
    308  const congestion_control_t *cc = circuit_ccontrol(on_circ);
    309  tor_assert(cc);
    310 
    311  if (cc->cwnd < cc->inflight)
    312    return 0;
    313 
    314  return cc->cwnd - cc->inflight;
    315 }
    316 
    317 /**
    318 * Return the amount of congestion window we can send on
    319 * on_circ during in_usec. However, if we're still in
    320 * slow-start, send the whole window to establish the true
    321 * cwnd.
    322 */
    323 static inline uint64_t
    324 cwnd_sendable(const circuit_t *on_circ, uint64_t in_usec,
    325              uint64_t our_usec)
    326 {
    327  const congestion_control_t *cc = circuit_ccontrol(on_circ);
    328  tor_assert(cc);
    329  uint64_t cwnd_adjusted = cwnd_available(on_circ);
    330 
    331  if (our_usec == 0 || in_usec == 0) {
    332    log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
    333       "cwnd_sendable: Missing RTT data. in_usec: %" PRIu64
    334       " our_usec: %" PRIu64, in_usec, our_usec);
    335    return cwnd_adjusted;
    336  }
    337 
    338  if (cc->in_slow_start) {
    339    return cwnd_adjusted;
    340  } else {
    341    /* For any given leg, it has min_rtt/2 time before the 'primary'
    342     * leg's acks start arriving. So, the amount of data this
    343     * 'secondary' leg can send while the min_rtt leg transmits these
    344     * acks is:
    345     *   (cwnd_leg/(leg_rtt/2))*min_rtt/2 = cwnd_leg*min_rtt/leg_rtt.
    346     */
    347    uint64_t sendable = cwnd_adjusted*in_usec/our_usec;
    348    return MIN(cc->cwnd, sendable);
    349  }
    350 }
    351 
    352 /**
    353 * Returns true if we can switch to a new circuit, false otherwise.
    354 *
    355 * This function assumes we're primarily switching between two circuits,
    356 * the current and the prev. If we're using more than two circuits, we
    357 * need to set cfx_drain_pct to 100.
    358 */
    359 static inline bool
    360 conflux_can_switch(const conflux_t *cfx)
    361 {
    362  /* If we still expected to send more cells on this circuit,
    363   * we're only allowed to switch if the previous circuit emptied. */
    364  if (cfx->cells_until_switch > 0) {
    365    /* If there is no prev leg, skip the inflight check. */
    366    if (!cfx->prev_leg) {
    367      return false;
    368    }
    369    const congestion_control_t *ccontrol =
    370      circuit_ccontrol(cfx->prev_leg->circ);
    371 
    372    /* If the inflight count has drained to below cfx_drain_pct
    373     * of the congestion window, then we can switch.
    374     * We check the sendme_inc because there may be un-ackable
    375     * data in inflight as well, and we can still switch then. */
    376    // TODO-329-TUNING: Should we try to switch if the prev_leg is
    377    // ready to send, instead of this?
    378    if (ccontrol->inflight < ccontrol->sendme_inc ||
    379        100*ccontrol->inflight <=
    380        conflux_params_get_drain_pct()*ccontrol->cwnd) {
    381      return true;
    382    }
    383 
    384    return false;
    385  }
    386 
    387  return true;
    388 }
    389 
    390 /**
    391 * Favor the circuit with the lowest RTT that still has space in the
    392 * congestion window up to the ratio of RTTs.
    393 *
    394 * This algorithm should only use auxillary legs up to the point
    395 * where their data arrives roughly the same time as the lowest
    396 * RTT leg. It will not utilize the full cwnd of auxillary legs,
    397 * except in slow start. Therefore, out-of-order queue bloat should
    398 * be minimized to just the slow-start phase.
    399 */
    400 static const circuit_t *
    401 conflux_decide_circ_cwndrtt(const conflux_t *cfx)
    402 {
    403  uint64_t min_rtt = UINT64_MAX;
    404  const conflux_leg_t *leg = NULL;
    405 
    406  /* Can't get here without any legs. */
    407  tor_assert(!CONFLUX_NUM_LEGS(cfx));
    408 
    409  /* Find the leg with the minimum RTT.*/
    410  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, l) {
    411    /* Ignore circuits with invalid RTT */
    412    if (l->circ_rtts_usec && l->circ_rtts_usec < min_rtt) {
    413      min_rtt = l->circ_rtts_usec;
    414      leg = l;
    415    }
    416  } CONFLUX_FOR_EACH_LEG_END(l);
    417 
    418  /* If the package window is has room, use it */
    419  if (leg && circuit_ready_to_send(leg->circ)) {
    420    return leg->circ;
    421  }
    422 
    423  leg = NULL;
    424 
    425  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, l) {
    426    if (!circuit_ready_to_send(l->circ)) {
    427      continue;
    428    }
    429 
    430    /* Pick a 'min_leg' with the lowest RTT that still has
    431     * room in the congestion window. Note that this works for
    432     * min_leg itself, up to inflight. */
    433    if (l->circ_rtts_usec &&
    434        cwnd_sendable(l->circ, min_rtt, l->circ_rtts_usec) > 0) {
    435      leg = l;
    436    }
    437  } CONFLUX_FOR_EACH_LEG_END(l);
    438 
    439  /* If the circuit can't send, don't send on any circuit. */
    440  if (!leg || !circuit_ready_to_send(leg->circ)) {
    441    return NULL;
    442  }
    443  return leg->circ;
    444 }
    445 
    446 /**
    447 * This function is called when we want to send a relay cell on a
    448 * conflux, as well as when we want to compute available space in
    449 * to package from streams.
    450 *
    451 * It determines the circuit that relay command should be sent on,
    452 * and sends a SWITCH cell if necessary.
    453 *
    454 * It returns the circuit we should send on. If no circuits are ready
    455 * to send, it returns NULL.
    456 */
    457 circuit_t *
    458 conflux_decide_circ_for_send(conflux_t *cfx,
    459                             circuit_t *orig_circ,
    460                             uint8_t relay_command)
    461 {
    462  /* If this command should not be multiplexed, send it on the original
    463   * circuit */
    464  if (!conflux_should_multiplex(relay_command)) {
    465    return orig_circ;
    466  }
    467 
    468  circuit_t *new_circ = conflux_decide_next_circ(cfx);
    469 
    470  /* Because our congestion window only cover relay data command, we can end up
    471   * in a situation where we need to send non data command when all circuits
    472   * are at capacity. For those cases, keep using the *current* leg,
    473   * so these commands arrive in-order. */
    474  if (!new_circ && relay_command != RELAY_COMMAND_DATA) {
    475    /* Curr leg should be set, because conflux_decide_next_circ() should
    476     * have set it earlier. No BUG() here because the only caller BUG()s. */
    477    if (!cfx->curr_leg) {
    478      log_warn(LD_BUG, "No current leg for conflux with relay command %d",
    479               relay_command);
    480      return NULL;
    481    }
    482    return cfx->curr_leg->circ;
    483  }
    484 
    485  /*
    486   * If we are switching to a new circuit, we need to send a SWITCH command.
    487   * We also need to compute an estimate of how much data we can send on
    488   * the new circuit before we are allowed to switch again, to rate
    489   * limit the frequency of switching.
    490   */
    491  if (new_circ) {
    492    conflux_leg_t *new_leg = conflux_get_leg(cfx, new_circ);
    493    tor_assert(cfx->curr_leg);
    494 
    495    if (new_circ != cfx->curr_leg->circ) {
    496      // TODO-329-TUNING: This is one mechanism to rate limit switching,
    497      // which should reduce the OOQ mem. However, we're not going to do that
    498      // until we get some data on if the memory usage is high
    499      cfx->cells_until_switch = 0;
    500        //cwnd_sendable(new_circ,cfx->curr_leg->circ_rtts_usec,
    501        //                         new_leg->circ_rtts_usec);
    502 
    503      conflux_validate_stream_lists(cfx);
    504 
    505      cfx->prev_leg = cfx->curr_leg;
    506      cfx->curr_leg = new_leg;
    507 
    508      tor_assert(cfx->prev_leg);
    509      tor_assert(cfx->curr_leg);
    510 
    511      uint64_t relative_seq = cfx->prev_leg->last_seq_sent -
    512                              cfx->curr_leg->last_seq_sent;
    513 
    514      if (cfx->curr_leg->last_seq_sent > cfx->prev_leg->last_seq_sent) {
    515        /* Having incoherent sequence numbers, log warn about it but rate limit
    516         * it to every hour so we avoid redundent report. */
    517        static ratelim_t rlimit = RATELIM_INIT(60 * 60);
    518        log_fn_ratelim(&rlimit, LOG_WARN, LD_BUG,
    519                       "Current conflux leg last_seq_sent=%"PRIu64
    520                       " is above previous leg at %" PRIu64 ". Closing set.",
    521                       cfx->curr_leg->last_seq_sent,
    522                       cfx->prev_leg->last_seq_sent);
    523        conflux_mark_all_for_close(cfx->nonce, CIRCUIT_IS_ORIGIN(new_circ),
    524                                   END_CIRC_REASON_TORPROTOCOL);
    525        return NULL;
    526      }
    527 
    528      /* On failure to send the SWITCH, we close everything. This means we have
    529       * a protocol error or the sending failed and the circuit is closed. */
    530      if (!conflux_send_switch_command(cfx->curr_leg->circ, relative_seq)) {
    531        conflux_mark_all_for_close(cfx->nonce, CIRCUIT_IS_ORIGIN(new_circ),
    532                                   END_CIRC_REASON_TORPROTOCOL);
    533        return NULL;
    534      }
    535      cfx->curr_leg->last_seq_sent = cfx->prev_leg->last_seq_sent;
    536    }
    537  }
    538 
    539  return new_circ;
    540 }
    541 
    542 /** Called after conflux actually sent a cell on a circuit.
    543 * This function updates sequence number counters, and
    544 * switch counters.
    545 */
    546 void
    547 conflux_note_cell_sent(conflux_t *cfx, circuit_t *circ, uint8_t relay_command)
    548 {
    549  conflux_leg_t *leg = NULL;
    550 
    551  if (!conflux_should_multiplex(relay_command)) {
    552    return;
    553  }
    554 
    555  leg = conflux_get_leg(cfx, circ);
    556  if (leg == NULL) {
    557    log_fn(LOG_PROTOCOL_WARN, LD_BUG, "No Conflux leg after sending a cell");
    558    return;
    559  }
    560 
    561  leg->last_seq_sent++;
    562 
    563  if (cfx->cells_until_switch > 0) {
    564    cfx->cells_until_switch--;
    565  }
    566 }
    567 
    568 /** Find the leg with lowest non-zero curr_rtt_usec, and
    569 * pick it for our current leg. */
    570 static inline bool
    571 conflux_pick_first_leg(conflux_t *cfx)
    572 {
    573  conflux_leg_t *min_leg = NULL;
    574 
    575  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
    576    /* We need to skip 0-RTT legs, since this can happen at the exit
    577     * when there is a race between BEGIN and LINKED_ACK, and BEGIN
    578     * wins the race. The good news is that because BEGIN won,
    579     * we don't need to consider those other legs, since they are
    580     * slower. */
    581    if (leg->circ_rtts_usec > 0) {
    582      if (!min_leg || leg->circ_rtts_usec < min_leg->circ_rtts_usec) {
    583        min_leg = leg;
    584      }
    585    }
    586  } CONFLUX_FOR_EACH_LEG_END(leg);
    587 
    588  if (!min_leg) {
    589    // Get the 0th leg; if it does not exist, log the set.
    590    // Bug 40827 managed to hit this, so let's dump the sets
    591    // in case it happens again.
    592    if (BUG(smartlist_len(cfx->legs) <= 0)) {
    593      // Since we have no legs, we have no idea if this is really a client
    594      // or server set. Try to find any that match:
    595      log_warn(LD_BUG, "Matching client sets:");
    596      conflux_log_set(LOG_WARN, cfx, true);
    597      log_warn(LD_BUG, "Matching server sets:");
    598      conflux_log_set(LOG_WARN, cfx, false);
    599      log_warn(LD_BUG, "End conflux set dump");
    600      return false;
    601    }
    602 
    603    min_leg = smartlist_get(cfx->legs, 0);
    604    tor_assert(min_leg);
    605    if (BUG(min_leg->linked_sent_usec == 0)) {
    606      log_warn(LD_BUG, "Conflux has no legs with non-zero RTT. "
    607               "Using first leg.");
    608      conflux_log_set(LOG_WARN, cfx, CIRCUIT_IS_ORIGIN(min_leg->circ));
    609    }
    610  }
    611 
    612  // TODO-329-TUNING: We may want to initialize this to a cwnd, to
    613  // minimize early switching?
    614  //cfx->cells_until_switch = circuit_ccontrol(min_leg->circ)->cwnd;
    615  cfx->cells_until_switch = 0;
    616 
    617  cfx->curr_leg = min_leg;
    618 
    619  return true;
    620 }
    621 
    622 /**
    623 * Returns the circuit that conflux would send on next, if
    624 * conflux_decide_circ_for_send were called. This is used to compute
    625 * available space in the package window.
    626 */
    627 circuit_t *
    628 conflux_decide_next_circ(conflux_t *cfx)
    629 {
    630  // TODO-329-TUNING: Temporarily validate legs here. We can remove
    631  // this once tuning is complete.
    632  conflux_validate_legs(cfx);
    633 
    634  /* If the conflux set is tearing down and has no current leg,
    635   * bail and give up */
    636  if (cfx->in_full_teardown) {
    637    return NULL;
    638  }
    639 
    640  /* If we don't have a current leg yet, pick one.
    641   * (This is the only non-const operation in this function). */
    642  if (!cfx->curr_leg) {
    643    if (!conflux_pick_first_leg(cfx))
    644      return NULL;
    645  }
    646 
    647  /* First, check if we can switch. */
    648  if (!conflux_can_switch(cfx)) {
    649    tor_assert(cfx->curr_leg);
    650    circuit_t *curr_circ = cfx->curr_leg->circ;
    651 
    652    /* If we can't switch, and the current circuit can't send,
    653     * then return null. */
    654    if (circuit_ready_to_send(curr_circ)) {
    655      return curr_circ;
    656    }
    657    log_info(LD_CIRC, "Conflux can't switch; no circuit to send on.");
    658    return NULL;
    659  }
    660 
    661  switch (cfx->params.alg) {
    662    case CONFLUX_ALG_MINRTT: // latency (no ooq)
    663      return (circuit_t*)conflux_decide_circ_minrtt(cfx);
    664    case CONFLUX_ALG_LOWRTT: // high throughput (high oooq)
    665      return (circuit_t*)conflux_decide_circ_lowrtt(cfx);
    666    case CONFLUX_ALG_CWNDRTT: // throughput (low oooq)
    667      return (circuit_t*)conflux_decide_circ_cwndrtt(cfx);
    668    default:
    669      return NULL;
    670  }
    671 }
    672 
    673 /**
    674 * Called when we have a new RTT estimate for a circuit.
    675 */
    676 void
    677 conflux_update_rtt(conflux_t *cfx, circuit_t *circ, uint64_t rtt_usec)
    678 {
    679  conflux_leg_t *leg = conflux_get_leg(cfx, circ);
    680 
    681  if (!leg) {
    682    log_warn(LD_BUG, "Got RTT update for circuit not in conflux");
    683    return;
    684  }
    685 
    686  // Update RTT
    687  leg->circ_rtts_usec = rtt_usec;
    688 
    689  // TODO-329-ARTI: For UDP latency targeting, arti could decide to launch
    690  // new a test leg to potentially replace this one, if a latency target
    691  // was requested and we now exceed it. Since C-Tor client likely
    692  // will not have UDP support, we aren't doing this here.
    693 }
    694 
    695 /**
    696 * Comparison function for ooo_q pqueue.
    697 *
    698 * Ensures that lower sequence numbers are at the head of the pqueue.
    699 */
    700 static int
    701 conflux_queue_cmp(const void *a, const void *b)
    702 {
    703  // Compare a and b as conflux_cell_t using the seq field, and return a
    704  // comparison result such that the lowest seq is at the head of the pqueue.
    705  const conflux_msg_t *cell_a = a;
    706  const conflux_msg_t *cell_b = b;
    707 
    708  tor_assert(cell_a);
    709  tor_assert(cell_b);
    710 
    711  if (cell_a->seq < cell_b->seq) {
    712    return -1;
    713  } else if (cell_a->seq > cell_b->seq) {
    714    return 1;
    715  } else {
    716    return 0;
    717  }
    718 }
    719 
    720 /**
    721 * Get the congestion control object for a conflux circuit.
    722 *
    723 * Because conflux can only be negotiated with the last hop, we
    724 * can use the last hop of the cpath to obtain the congestion
    725 * control object for origin circuits. For non-origin circuits,
    726 * we can use the circuit itself.
    727 */
    728 const congestion_control_t *
    729 circuit_ccontrol(const circuit_t *circ)
    730 {
    731  const congestion_control_t *ccontrol = NULL;
    732  tor_assert(circ);
    733 
    734  if (CIRCUIT_IS_ORIGIN(circ)) {
    735    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath);
    736    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
    737    ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol;
    738  } else {
    739    ccontrol = circ->ccontrol;
    740  }
    741 
    742  /* Conflux circuits always have congestion control*/
    743  tor_assert(ccontrol);
    744  return ccontrol;
    745 }
    746 
// TODO-329-TUNING: For LowRTT, we can at most switch every SENDME,
// but for BLEST, we should switch at most every cwnd. But
// we do not know the other side's CWND here. We can at best
// assume it is above the cwnd_min.
/** Minimum relative sequence-number increment expected in a SWITCH cell.
 * NOTE(review): in the visible code it is only referenced by the
 * commented-out check in conflux_process_switch_command(); confirm there
 * are no other users before changing it. */
#define CONFLUX_MIN_LINK_INCREMENT 31
    752 /**
    753 * Validate and handle RELAY_COMMAND_CONFLUX_SWITCH.
    754 */
    755 int
    756 conflux_process_switch_command(circuit_t *in_circ,
    757                               crypt_path_t *layer_hint,
    758                               const relay_msg_t *msg)
    759 {
    760  tor_assert(in_circ);
    761  tor_assert(msg);
    762 
    763  conflux_t *cfx = in_circ->conflux;
    764  uint32_t relative_seq;
    765  conflux_leg_t *leg;
    766 
    767  if (!conflux_is_enabled(in_circ)) {
    768    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
    769    return -1;
    770  }
    771 
    772  /* If there is no conflux object negotiated, this is invalid.
    773   * log and close circ */
    774  if (!cfx) {
    775    log_warn(LD_BUG, "Got a conflux switch command on a circuit without "
    776             "conflux negotiated. Closing circuit.");
    777 
    778    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
    779    return -1;
    780  }
    781 
    782  // TODO-329-TUNING: Temporarily validate that we have all legs.
    783  // After tuning is complete, we can remove this.
    784  conflux_validate_legs(cfx);
    785 
    786  leg = conflux_get_leg(cfx, in_circ);
    787 
    788  /* If we can't find the conflux leg, we got big problems..
    789   * Close the circuit. */
    790  if (!leg) {
    791    log_warn(LD_BUG, "Got a conflux switch command on a circuit without "
    792             "conflux leg. Closing circuit.");
    793    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
    794    return -1;
    795  }
    796 
    797  // Check source hop via layer_hint
    798  if (!conflux_validate_source_hop(in_circ, layer_hint)) {
    799    log_warn(LD_BUG, "Got a conflux switch command on a circuit with "
    800             "invalid source hop. Closing circuit.");
    801    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
    802    return -1;
    803  }
    804 
    805  relative_seq = conflux_cell_parse_switch(msg);
    806 
    807  /*
    808   * We have to make sure that the switch command is truely
    809   * incrementing the sequence number, or else it becomes
    810   * a side channel that can be spammed for traffic analysis.
    811   */
    812  // TODO-329-TUNING: This can happen. Disabling for now..
    813  //if (relative_seq < CONFLUX_MIN_LINK_INCREMENT) {
    814  // log_warn(LD_CIRC, "Got a conflux switch command with a relative "
    815  //          "sequence number less than the minimum increment. Closing "
    816  //          "circuit.");
    817  // circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
    818  // return -1;
    819  //}
    820 
    821  // TODO-329-UDP: When Prop#340 exits and was negotiated, ensure we're
    822  // in a packed cell, with another cell following, otherwise
    823  // this is a spammed side-channel.
    824  //   - We definitely should never get switches back-to-back.
    825  //   - We should not get switches across all legs with no data
    826  // But before Prop#340, it doesn't make much sense to do this.
    827  // C-Tor is riddled with side-channels like this anyway, unless
    828  // vanguards is in use. And this feature is not supported by
    829  // onion servicees in C-Tor, so we're good there.
    830 
    831  /* Update the absolute sequence number on this leg by the delta.
    832   * Since this cell is not multiplexed, we do not count it towards
    833   * absolute sequence numbers. We only increment the sequence
    834   * numbers for multiplexed cells. Hence there is no +1 here. */
    835  leg->last_seq_recv += relative_seq;
    836 
    837  /* Mark this data as validated for controlport and vanguards
    838   * dropped cell handling */
    839  if (CIRCUIT_IS_ORIGIN(in_circ)) {
    840    circuit_read_valid_data(TO_ORIGIN_CIRCUIT(in_circ), msg->length);
    841  }
    842 
    843  return 0;
    844 }
    845 
    846 /**
    847 * Return the total number of required allocated to store `msg`.
    848 */
    849 static inline size_t
    850 conflux_msg_alloc_cost(conflux_msg_t *msg)
    851 {
    852  return msg->msg->length + sizeof(conflux_msg_t) + sizeof(relay_msg_t);
    853 }
    854 
    855 /**
    856 * Process an incoming relay cell for conflux. Called from
    857 * connection_edge_process_relay_cell().
    858 *
    859 * Returns true if the conflux system now has well-ordered cells to deliver
    860 * to streams, false otherwise.
    861 */
    862 bool
    863 conflux_process_relay_msg(conflux_t *cfx, circuit_t *in_circ,
    864                          crypt_path_t *layer_hint, const relay_msg_t *msg)
    865 {
    866  // TODO-329-TUNING: Temporarily validate legs here. We can remove
    867  // this after tuning is complete.
    868  conflux_validate_legs(cfx);
    869 
    870  conflux_leg_t *leg = conflux_get_leg(cfx, in_circ);
    871  if (!leg) {
    872    log_warn(LD_BUG, "Got a conflux cell on a circuit without "
    873             "conflux leg. Closing circuit.");
    874    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
    875    return false;
    876  }
    877 
    878  /* We need to make sure this cell came from the expected hop, or
    879   * else it could be a data corruption attack from a middle node. */
    880  if (!conflux_validate_source_hop(in_circ, layer_hint)) {
    881    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
    882    return false;
    883  }
    884 
    885  /* Update the running absolute sequence number */
    886  leg->last_seq_recv++;
    887 
    888  /* If this cell is next, fast-path it by processing the cell in-place */
    889  if (leg->last_seq_recv == cfx->last_seq_delivered + 1) {
    890    /* The cell is now ready to be processed, and rest of the queue should
    891     * now be checked for remaining elements */
    892    cfx->last_seq_delivered++;
    893    return true;
    894  } else if (leg->last_seq_recv <= cfx->last_seq_delivered) {
    895    /* Anyone can mangle these sequence number. */
    896    log_fn(LOG_PROTOCOL_WARN, LD_BUG,
    897           "Got a conflux cell with a sequence number "
    898           "less than the last delivered. Closing circuit.");
    899    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
    900    return false;
    901  } else {
    902    /* Both cost and param are in bytes. */
    903    if (cfx->ooo_q_alloc_cost >= conflux_params_get_max_oooq()) {
    904      /* Log rate limit every hour. In heavy DDoS scenario, this could be
    905       * triggered many times so avoid the spam. */
    906      static ratelim_t rlimit = RATELIM_INIT(60 * 60);
    907      log_fn_ratelim(&rlimit, LOG_WARN, LD_CIRC,
    908                     "Conflux OOO queue is at maximum. Currently at "
    909                     "%"TOR_PRIuSZ " bytes, maximum allowed is %u bytes. "
    910                     "Closing.",
    911                     cfx->ooo_q_alloc_cost, conflux_params_get_max_oooq());
    912      circuit_mark_for_close(in_circ, END_CIRC_REASON_RESOURCELIMIT);
    913      return false;
    914    }
    915    conflux_msg_t *c_msg = tor_malloc_zero(sizeof(conflux_msg_t));
    916    c_msg->seq = leg->last_seq_recv;
    917    /* Notice the copy here. Reason is that we don't have ownership of the
    918     * message. If we wanted to pull that off, we would need to change the
    919     * whole calling stack and unit tests on either not touching it after this
    920     * function indicates that it has taken it or never allocate it from the
    921     * stack. This is simpler and less error prone but might show up in our
    922     * profile (maybe?). The Maze is serious. It needs to be respected. */
    923    c_msg->msg = relay_msg_copy(msg);
    924    size_t cost = conflux_msg_alloc_cost(c_msg);
    925 
    926    smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp,
    927                         offsetof(conflux_msg_t, heap_idx), c_msg);
    928 
    929    total_ooo_q_bytes += cost;
    930    cfx->ooo_q_alloc_cost += cost;
    931 
    932    /* This cell should not be processed yet, and the queue is not ready
    933     * to process because the next absolute seqnum has not yet arrived */
    934    return false;
    935  }
    936 }
    937 
    938 /**
    939 * Dequeue the top cell from our queue.
    940 *
    941 * Returns the cell as a conflux_cell_t, or NULL if the queue is empty
    942 * or has a hole.
    943 */
    944 conflux_msg_t *
    945 conflux_dequeue_relay_msg(circuit_t *circ)
    946 {
    947  conflux_msg_t *top = NULL;
    948  /* Related to #41162. This is really a consequence of the C-tor maze.
    949   * The function above can close a circuit without returning an error
    950   * due to several return code ignored. Auditting all of the cell code
    951   * path and fixing them to not ignore errors could bring many more
    952   * issues as this behavior has been in tor forever. So do the bandaid
    953   * fix of bailing if the circuit is closed. */
    954  if (circ->marked_for_close) {
    955    static ratelim_t rlim = RATELIM_INIT(60 * 60);
    956    log_fn_ratelim(&rlim, (circ->conflux == NULL) ? LOG_WARN : LOG_NOTICE,
    957                   LD_CIRC,
    958                   "Circuit was closed at %s:%u when dequeuing from OOO",
    959                   circ->marked_for_close_file, circ->marked_for_close);
    960    return NULL;
    961  }
    962  conflux_t *cfx = circ->conflux;
    963  if (cfx == NULL) {
    964    static ratelim_t rlim = RATELIM_INIT(60 * 60);
    965    log_fn_ratelim(&rlim, LOG_WARN, LD_CIRC,
    966                   "Bug: Non marked for close circuit with NULL conflux");
    967    return NULL;
    968  }
    969  if (cfx->ooo_q == NULL) {
    970    static ratelim_t rlim = RATELIM_INIT(60 * 60);
    971    log_fn_ratelim(&rlim, LOG_WARN, LD_CIRC,
    972                   "Bug: Non marked for close circuit with NULL OOO queue");
    973    return NULL;
    974  }
    975 
    976  if (smartlist_len(cfx->ooo_q) == 0)
    977    return NULL;
    978 
    979  top = smartlist_get(cfx->ooo_q, 0);
    980 
    981  /* If the top cell is the next sequence number we need, then
    982   * pop and return it. */
    983  if (top->seq == cfx->last_seq_delivered+1) {
    984    smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp,
    985                         offsetof(conflux_msg_t, heap_idx));
    986 
    987    size_t cost = conflux_msg_alloc_cost(top);
    988    total_ooo_q_bytes -= cost;
    989    cfx->ooo_q_alloc_cost -= cost;
    990 
    991    cfx->last_seq_delivered++;
    992    return top;
    993  } else {
    994    return NULL;
    995  }
    996 }
    997 
    998 /** Free a given conflux msg object. */
    999 void
   1000 conflux_relay_msg_free_(conflux_msg_t *msg)
   1001 {
   1002  if (msg) {
   1003    relay_msg_free(msg->msg);
   1004    tor_free(msg);
   1005  }
   1006 }