tor

The Tor anonymity network
git clone https://git.dasho.dev/tor.git
Log | Files | Refs | README | LICENSE

congestion_control_flow.c (25633B)


      1 /* Copyright (c) 2019-2021, The Tor Project, Inc. */
      2 /* See LICENSE for licensing information */
      3 
      4 /**
      5 * \file congestion_control_flow.c
      6 * \brief Code that implements flow control for congestion controlled
      7 *        circuits.
      8 */
      9 
     10 #define TOR_CONGESTION_CONTROL_FLOW_PRIVATE
     11 
     12 #include "core/or/or.h"
     13 
     14 #include "core/or/relay.h"
     15 #include "core/mainloop/connection.h"
     16 #include "core/or/connection_edge.h"
     17 #include "core/mainloop/mainloop.h"
     18 #include "core/or/congestion_control_common.h"
     19 #include "core/or/congestion_control_flow.h"
     20 #include "core/or/congestion_control_st.h"
     21 #include "core/or/circuitlist.h"
     22 #include "core/or/trace_probes_cc.h"
     23 #include "feature/nodelist/networkstatus.h"
     24 #include "trunnel/flow_control_cells.h"
     25 #include "feature/control/control_events.h"
     26 #include "lib/math/stats.h"
     27 
     28 #include "core/or/connection_st.h"
     29 #include "core/or/cell_st.h"
     30 #include "app/config/config.h"
     31 #include "core/or/conflux_util.h"
     32 
     33 /** Cache consensus parameters */
     34 static uint32_t xoff_client;
     35 static uint32_t xoff_exit;
     36 
     37 static uint32_t xon_change_pct;
     38 static uint32_t xon_ewma_cnt;
     39 static uint32_t xon_rate_bytes;
     40 
     41 /** Metricsport stats */
     42 uint64_t cc_stats_flow_num_xoff_sent;
     43 uint64_t cc_stats_flow_num_xon_sent;
     44 double cc_stats_flow_xoff_outbuf_ma = 0;
     45 double cc_stats_flow_xon_outbuf_ma = 0;
     46 
     47 /* In normal operation, we can get a burst of up to 32 cells before returning
     48 * to libevent to flush the outbuf. This is a heuristic from hardcoded values
     49 * and strange logic in connection_bucket_get_share(). */
     50 #define MAX_EXPECTED_CELL_BURST 32
     51 
     52 /* This is the grace period that we use to give the edge connection a chance to
     53 * reduce its outbuf before we send an XOFF.
     54 *
     55 * The congestion control spec says:
     56 * > If the length of an edge outbuf queue exceeds the size provided in the
     57 * > appropriate client or exit XOFF consensus parameter, a
     58 * > RELAY_COMMAND_STREAM_XOFF will be sent
     59 *
     60 * This doesn't directly adapt well to tor, where we process many incoming
     61 * messages at once. We may buffer a lot of stream data before giving the
     62 * mainloop a chance to flush the the edge connection's outbuf, even if the
     63 * edge connection's socket is able to accept more bytes.
     64 *
     65 * Instead if we detect that we should send an XOFF (as described in the cc
     66 * spec), we delay sending an XOFF for `XOFF_GRACE_PERIOD_USEC` microseconds.
     67 * This gives the mainloop a chance to flush the buffer to the edge
     68 * connection's socket. If this flush causes the outbuf queue to shrink under
     69 * our XOFF limit, then we no longer need to send an XOFF. If after
     70 * `XOFF_GRACE_PERIOD_USEC` we receive another message and the outbuf queue
     71 * still exceeds the XOFF limit, we send an XOFF.
     72 *
     73 * The value of 5 milliseconds was chosen arbitrarily. In practice it should be
     74 * enough time for the edge connection to get a chance to flush, but not too
     75 * long to cause excessive buffering.
     76 */
     77 #define XOFF_GRACE_PERIOD_USEC (5000)
     78 
     79 /* The following three are for dropmark rate limiting. They define when we
     80 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial
     81 * because it limits the ability of spurious XON/XOFF to be sent after large
     82 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or
     83 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000
     84 * cells of fake XOFF/XON before the xmit byte count will be halved enough to
     85 * triggering a limit. */
     86 #define XON_COUNT_SCALE_AT 200
     87 #define XOFF_COUNT_SCALE_AT 200
     88 #define ONE_MEGABYTE (UINT64_C(1) << 20)
     89 #define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
     90 
     91 /**
     92 * Update global congestion control related consensus parameter values, every
     93 * consensus update.
     94 *
     95 * More details for each of the parameters can be found in proposal 324,
     96 * section 6.5 including tuning notes.
     97 */
     98 void
     99 flow_control_new_consensus_params(const networkstatus_t *ns)
    100 {
    101 #define CC_XOFF_CLIENT_DFLT 500
    102 #define CC_XOFF_CLIENT_MIN 1
    103 #define CC_XOFF_CLIENT_MAX 10000
    104  xoff_client = networkstatus_get_param(ns, "cc_xoff_client",
    105      CC_XOFF_CLIENT_DFLT,
    106      CC_XOFF_CLIENT_MIN,
    107      CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
    108 
    109 #define CC_XOFF_EXIT_DFLT 500
    110 #define CC_XOFF_EXIT_MIN 1
    111 #define CC_XOFF_EXIT_MAX 10000
    112  xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit",
    113      CC_XOFF_EXIT_DFLT,
    114      CC_XOFF_EXIT_MIN,
    115      CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE_MIN;
    116 
    117 #define CC_XON_CHANGE_PCT_DFLT 25
    118 #define CC_XON_CHANGE_PCT_MIN 1
    119 #define CC_XON_CHANGE_PCT_MAX 99
    120  xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct",
    121      CC_XON_CHANGE_PCT_DFLT,
    122      CC_XON_CHANGE_PCT_MIN,
    123      CC_XON_CHANGE_PCT_MAX);
    124 
    125 #define CC_XON_RATE_BYTES_DFLT (500)
    126 #define CC_XON_RATE_BYTES_MIN (1)
    127 #define CC_XON_RATE_BYTES_MAX (5000)
    128  xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate",
    129      CC_XON_RATE_BYTES_DFLT,
    130      CC_XON_RATE_BYTES_MIN,
    131      CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE_MAX;
    132 
    133 #define CC_XON_EWMA_CNT_DFLT (2)
    134 #define CC_XON_EWMA_CNT_MIN (2)
    135 #define CC_XON_EWMA_CNT_MAX (100)
    136  xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt",
    137      CC_XON_EWMA_CNT_DFLT,
    138      CC_XON_EWMA_CNT_MIN,
    139      CC_XON_EWMA_CNT_MAX);
    140 }
    141 
    142 /**
    143 * Send an XOFF for this stream, and note that we sent one
    144 */
    145 static void
    146 circuit_send_stream_xoff(edge_connection_t *stream)
    147 {
    148  xoff_cell_t xoff;
    149  uint8_t payload[CELL_PAYLOAD_SIZE];
    150  ssize_t xoff_size;
    151 
    152  memset(&xoff, 0, sizeof(xoff));
    153  memset(payload, 0, sizeof(payload));
    154 
    155  xoff_cell_set_version(&xoff, 0);
    156 
    157  if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) {
    158    log_warn(LD_BUG, "Failed to encode xon cell");
    159    return;
    160  }
    161 
    162  if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF,
    163                               (char*)payload, (size_t)xoff_size) == 0) {
    164    stream->xoff_sent = true;
    165    cc_stats_flow_num_xoff_sent++;
    166 
    167    /* If this is an entry conn, notify control port */
    168    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
    169      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
    170                                  STREAM_EVENT_XOFF_SENT,
    171                                  0);
    172    }
    173  }
    174 }
    175 
    176 /**
    177 * Compute the recent drain rate (write rate) for this edge
    178 * connection and return it, in KB/sec (1000 bytes/sec).
    179 *
    180 * Returns 0 if the monotime clock is busted.
    181 */
    182 static inline uint32_t
    183 compute_drain_rate(const edge_connection_t *stream)
    184 {
    185  if (BUG(!is_monotime_clock_reliable())) {
    186    log_warn(LD_BUG, "Computing drain rate with stalled monotime clock");
    187    return 0;
    188  }
    189 
    190  uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec;
    191 
    192  if (delta == 0) {
    193    log_warn(LD_BUG, "Computing stream drain rate with zero time delta");
    194    return 0;
    195  }
    196 
    197  /* Overflow checks */
    198  if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */
    199      stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */
    200    return INT32_MAX;
    201  }
    202 
    203  /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */
    204  return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta);
    205 }
    206 
    207 /**
    208 * Send an XON for this stream, with appropriate advisory rate information.
    209 *
    210 * Reverts the xoff sent status, and stores the rate information we sent,
    211 * in case it changes.
    212 */
    213 static void
    214 circuit_send_stream_xon(edge_connection_t *stream)
    215 {
    216  xon_cell_t xon;
    217  uint8_t payload[CELL_PAYLOAD_SIZE];
    218  ssize_t xon_size;
    219 
    220  memset(&xon, 0, sizeof(xon));
    221  memset(payload, 0, sizeof(payload));
    222 
    223  xon_cell_set_version(&xon, 0);
    224  xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate);
    225 
    226  if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) {
    227    log_warn(LD_BUG, "Failed to encode xon cell");
    228    return;
    229  }
    230 
    231  /* Store the advisory rate information, to send advisory updates if
    232   * it changes */
    233  stream->ewma_rate_last_sent = stream->ewma_drain_rate;
    234 
    235  if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload,
    236                                   (size_t)xon_size) == 0) {
    237    /* Revert the xoff sent status, so we can send another one if need be */
    238    stream->xoff_sent = false;
    239 
    240    cc_stats_flow_num_xon_sent++;
    241 
    242    /* If it's an entry conn, notify control port */
    243    if (TO_CONN(stream)->type == CONN_TYPE_AP) {
    244      control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)),
    245                                  STREAM_EVENT_XON_SENT,
    246                                  0);
    247    }
    248  }
    249 }
    250 
    251 /**
    252 * Process a stream XOFF, parsing it, and then stopping reading on
    253 * the edge connection.
    254 *
    255 * Record that we have received an xoff, so we know not to resume
    256 * reading on this edge conn until we get an XON.
    257 *
    258 * Returns false if the XOFF did not validate; true if it does.
    259 */
    260 bool
    261 circuit_process_stream_xoff(edge_connection_t *conn,
    262                            const crypt_path_t *layer_hint)
    263 {
    264  bool retval = true;
    265 
    266  if (BUG(!conn)) {
    267    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    268           "Got XOFF on invalid stream?");
    269    return false;
    270  }
    271 
    272  /* Make sure this XOFF came from the right hop */
    273  if (!edge_uses_cpath(conn, layer_hint)) {
    274    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    275            "Got XOFF from wrong hop.");
    276    return false;
    277  }
    278 
    279  if (!edge_uses_flow_control(conn)) {
    280    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    281           "Got XOFF for non-congestion control circuit");
    282    return false;
    283  }
    284 
    285  if (conn->xoff_received) {
    286    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    287           "Got multiple XOFF on connection");
    288    return false;
    289  }
    290 
    291  /* If we are near the max, scale everything down */
    292  if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) {
    293    log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d",
    294             conn->total_bytes_xmit,
    295             conn->num_xoff_recv,
    296             conn->num_xon_recv);
    297    conn->total_bytes_xmit /= 2;
    298    conn->num_xoff_recv /= 2;
    299    conn->num_xon_recv /= 2;
    300  }
    301 
    302  conn->num_xoff_recv++;
    303 
    304  /* Client-side check to make sure that XOFF is not sent too early,
    305   * for dropmark attacks. The main sidechannel risk is early cells,
    306   * but we also check to make sure that we have not received more XOFFs
    307   * than could have been generated by the bytes we sent.
    308   */
    309  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    310    uint32_t limit = 0;
    311    if (conn->hs_ident)
    312      limit = xoff_client;
    313    else
    314      limit = xoff_exit;
    315 
    316    if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) {
    317      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    318             "Got extra XOFF for bytes sent. Got %d, expected max %d",
    319             conn->num_xoff_recv, conn->total_bytes_xmit/limit);
    320      /* We still process this, because the only dropmark defenses
    321       * in C tor are via the vanguards addon's use of the read valid
    322       * cells. So just signal that we think this is not valid protocol
    323       * data and proceed. */
    324      retval = false;
    325    }
    326  }
    327 
    328  log_info(LD_EDGE, "Got XOFF!");
    329  connection_stop_reading(TO_CONN(conn));
    330  conn->xoff_received = true;
    331 
    332  /* If this is an entry conn, notify control port */
    333  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    334    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
    335                                STREAM_EVENT_XOFF_RECV,
    336                                0);
    337  }
    338 
    339  return retval;
    340 }
    341 
    342 /**
    343 * Process a stream XON, and if it validates, clear the xoff
    344 * flag and resume reading on this edge connection.
    345 *
    346 * Also, use provided rate information to rate limit
    347 * reading on this edge (or packagaing from it onto
    348 * the circuit), to avoid XON/XOFF chatter.
    349 *
    350 * Returns true if the XON validates, false otherwise.
    351 */
    352 bool
    353 circuit_process_stream_xon(edge_connection_t *conn,
    354                           const crypt_path_t *layer_hint,
    355                           const relay_msg_t *msg)
    356 {
    357  xon_cell_t *xon;
    358  bool retval = true;
    359 
    360  if (BUG(!conn)) {
    361    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    362           "Got XON on invalid stream?");
    363    return false;
    364  }
    365 
    366  /* Make sure this XON came from the right hop */
    367  if (!edge_uses_cpath(conn, layer_hint)) {
    368    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    369           "Got XON from wrong hop.");
    370    return false;
    371  }
    372 
    373  if (!edge_uses_flow_control(conn)) {
    374    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    375           "Got XON for non-congestion control circuit");
    376    return false;
    377  }
    378 
    379  if (xon_cell_parse(&xon, msg->body, msg->length) < 0) {
    380    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    381          "Received malformed XON cell.");
    382    return false;
    383  }
    384 
    385  /* If we are near the max, scale everything down */
    386  if (conn->num_xon_recv == XON_COUNT_SCALE_AT) {
    387    log_info(LD_EDGE, "Scaling down for XON count: %d %d %d",
    388             conn->total_bytes_xmit,
    389             conn->num_xoff_recv,
    390             conn->num_xon_recv);
    391    conn->total_bytes_xmit /= 2;
    392    conn->num_xoff_recv /= 2;
    393    conn->num_xon_recv /= 2;
    394  }
    395 
    396  conn->num_xon_recv++;
    397 
    398  /* Client-side check to make sure that XON is not sent too early,
    399   * for dropmark attacks. The main sidechannel risk is early cells,
    400   * but we also check to see that we did not get more XONs than make
    401   * sense for the number of bytes we sent.
    402   */
    403  if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) {
    404    uint32_t limit = 0;
    405 
    406    if (conn->hs_ident)
    407      limit = MIN(xoff_client, xon_rate_bytes);
    408    else
    409      limit = MIN(xoff_exit, xon_rate_bytes);
    410 
    411    if (conn->total_bytes_xmit < limit*conn->num_xon_recv) {
    412      log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
    413             "Got extra XON for bytes sent. Got %d, expected max %d",
    414             conn->num_xon_recv, conn->total_bytes_xmit/limit);
    415 
    416      /* We still process this, because the only dropmark defenses
    417       * in C tor are via the vanguards addon's use of the read valid
    418       * cells. So just signal that we think this is not valid protocol
    419       * data and proceed. */
    420      retval = false;
    421    }
    422  }
    423 
    424  log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma);
    425 
    426  /* Adjust the token bucket of this edge connection with the drain rate in
    427   * the XON. Rate is in bytes from kilobit (kpbs). */
    428  uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000);
    429  if (rate == 0 || INT32_MAX < rate) {
    430    /* No rate. */
    431    rate = INT32_MAX;
    432  }
    433  token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate);
    434 
    435  if (conn->xoff_received) {
    436    /* Clear the fact that we got an XOFF, so that this edge can
    437     * start and stop reading normally */
    438    conn->xoff_received = false;
    439    connection_start_reading(TO_CONN(conn));
    440  }
    441 
    442  /* If this is an entry conn, notify control port */
    443  if (TO_CONN(conn)->type == CONN_TYPE_AP) {
    444    control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)),
    445                                STREAM_EVENT_XON_RECV,
    446                                0);
    447  }
    448 
    449  xon_cell_free(xon);
    450 
    451  return retval;
    452 }
    453 
    454 /**
    455 * Called from sendme_stream_data_received(), when data arrives
    456 * from a circuit to our edge's outbuf, to decide if we need to send
    457 * an XOFF.
    458 *
    459 * Returns the amount of cells remaining until the buffer is full, at
    460 * which point it sends an XOFF, and returns 0.
    461 *
    462 * Returns less than 0 if we have queued more than a congestion window
    463 * worth of data and need to close the circuit.
    464 */
    465 int
    466 flow_control_decide_xoff(edge_connection_t *stream)
    467 {
    468  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
    469  uint32_t buffer_limit_xoff = 0;
    470 
    471  if (BUG(!edge_uses_flow_control(stream))) {
    472    log_err(LD_BUG, "Flow control called for non-congestion control circuit");
    473    return -1;
    474  }
    475 
    476  /* Onion services and clients are typically localhost edges, so they
    477   * need different buffering limits than exits do */
    478  if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) {
    479    buffer_limit_xoff = xoff_client;
    480  } else {
    481    buffer_limit_xoff = xoff_exit;
    482  }
    483 
    484  if (total_buffered > buffer_limit_xoff) {
    485    if (!stream->xoff_sent) {
    486      uint64_t now = monotime_absolute_usec();
    487 
    488      if (stream->xoff_grace_period_start_usec == 0) {
    489        /* If unset, we haven't begun the XOFF grace period. We need to start.
    490         */
    491        log_debug(LD_EDGE,
    492                  "Exceeded XOFF limit; Beginning grace period: "
    493                  "total-buffered=%" TOR_PRIuSZ " xoff-limit=%d",
    494                  total_buffered, buffer_limit_xoff);
    495 
    496        stream->xoff_grace_period_start_usec = now;
    497      } else if (now > stream->xoff_grace_period_start_usec +
    498                           XOFF_GRACE_PERIOD_USEC) {
    499        /* If we've exceeded our XOFF grace period, we need to send an XOFF. */
    500        log_info(LD_EDGE,
    501                 "Sending XOFF: total-buffered=%" TOR_PRIuSZ
    502                 " xoff-limit=%d grace-period-dur=%" PRIu64 "usec",
    503                 total_buffered, buffer_limit_xoff,
    504                 now - stream->xoff_grace_period_start_usec);
    505        tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream);
    506 
    507        cc_stats_flow_xoff_outbuf_ma =
    508          stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma,
    509                                   total_buffered);
    510 
    511        circuit_send_stream_xoff(stream);
    512 
    513        /* Clear the drain rate. It is considered wrong if we
    514         * got all the way to XOFF */
    515        stream->ewma_drain_rate = 0;
    516 
    517        /* Unset our grace period. */
    518        stream->xoff_grace_period_start_usec = 0;
    519      } else {
    520        /* Else we're in the XOFF grace period, so don't do anything. */
    521      }
    522    }
    523  } else {
    524    /* The outbuf length is less than the XOFF limit, so unset our grace
    525     * period. */
    526    stream->xoff_grace_period_start_usec = 0;
    527  }
    528 
    529  /* If the outbuf has accumulated more than the expected burst limit of
    530   * cells, then assume it is not draining, and call decide_xon. We must
    531   * do this because writes only happen when the socket unblocks, so
    532   * may not otherwise notice accumulation of data in the outbuf for
    533   * advisory XONs. */
    534   if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE_MIN) {
    535     flow_control_decide_xon(stream, 0);
    536   }
    537 
    538  /* Flow control always takes more data; we rely on the oomkiller to
    539   * handle misbehavior. */
    540  return 0;
    541 }
    542 
    543 /**
    544 * Returns true if the stream's drain rate has changed significantly.
    545 *
    546 * Returns false if the monotime clock is stalled, or if we have
    547 * no previous drain rate information.
    548 */
    549 static bool
    550 stream_drain_rate_changed(const edge_connection_t *stream)
    551 {
    552  if (!is_monotime_clock_reliable()) {
    553    return false;
    554  }
    555 
    556  if (!stream->ewma_rate_last_sent) {
    557    return false;
    558  }
    559 
    560  if (stream->ewma_drain_rate >
    561      (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    562    return true;
    563  }
    564 
    565  if (stream->ewma_drain_rate <
    566      (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) {
    567    return true;
    568  }
    569 
    570  return false;
    571 }
    572 
/**
 * Called whenever we drain an edge connection outbuf by writing on
 * its socket, to decide if it is time to send an xon.
 *
 * The n_written parameter tells us how many bytes we have written
 * this time, which is used to compute the advisory drain rate fields.
 * flow_control_decide_xoff() also calls this with n_written == 0 once the
 * outbuf holds more than MAX_EXPECTED_CELL_BURST cells, because writes (and
 * so calls from the write path) only happen when the socket unblocks.
 *
 * An XON is sent when either:
 *  - no XOFF is outstanding and stream_drain_rate_changed() reports that the
 *    EWMA drain rate moved more than xon_change_pct percent away from the
 *    last advertised rate (an advisory "rate-change" XON), or
 *  - an XOFF is outstanding and the outbuf has fully drained.
 */
void
flow_control_decide_xon(edge_connection_t *stream, size_t n_written)
{
  size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));

  /* Bounds check the number of drained bytes, and scale: if adding
   * n_written could push drained_bytes to UINT32_MAX or beyond, halve the
   * count first. */
  if (stream->drained_bytes >= UINT32_MAX - n_written) {
    /* Cut the bytes in half, and move the start time up halfway to now
     * (if we have one). Halving both the bytes and the elapsed time keeps
     * the bytes/time ratio roughly unchanged. */
    stream->drained_bytes /= 2;

    if (stream->drain_start_usec) {
      uint64_t now = monotime_absolute_usec();

      stream->drain_start_usec = now - (now-stream->drain_start_usec)/2;
    }
  }

  /* Accumulate drained bytes since last rate computation */
  stream->drained_bytes += n_written;

  tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written);

  /* Check for a bad monotime clock (byte-count wrap is handled above) */
  if (!is_monotime_clock_reliable()) {
    /* If the monotime clock ever goes wrong, the safest thing to do
     * is just clear our short-term rate info and wait for the clock to
     * become reliable again. */
    stream->drain_start_usec = 0;
    stream->drained_bytes = 0;
  } else {
    /* If we have no drain start timestamp, and we still have
     * remaining buffer, start the buffering counter. Bytes drained before
     * this point (including this call's n_written) are discarded. */
    if (!stream->drain_start_usec && total_buffered > 0) {
      log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ,
                 stream->ewma_rate_last_sent,
                 stream->ewma_drain_rate,
                 total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start),
                stream);
      stream->drain_start_usec = monotime_absolute_usec();
      stream->drained_bytes = 0;
    }
  }

  if (stream->drain_start_usec) {
    /* Once more than xon_rate_bytes have drained while queued, update our
     * drain rate. */
    if (stream->drained_bytes > xon_rate_bytes) {
      /* No previous drained bytes means it is the first time we are computing
       * it so use the value we just drained onto the socket as a baseline. It
       * won't be accurate but it will be a start towards the right value.
       *
       * We have to do this in order to have a drain rate else we could be
       * sending a drain rate of 0 in an XON which would be undesirable and
       * basically like sending an XOFF. */
      if (stream->prev_drained_bytes == 0) {
        stream->prev_drained_bytes = stream->drained_bytes;
      }
      /* compute_drain_rate() divides prev_drained_bytes (not the current
       * drained_bytes) by the time since drain_start_usec; it returns 0 if
       * the clock is unreliable or no time has elapsed. */
      uint32_t drain_rate = compute_drain_rate(stream);
      /* Once the drain rate has been computed, note how many bytes we just
       * drained so it can be used at the next calculation. We do this here
       * because it gets reset once the rate is changed. */
      stream->prev_drained_bytes = stream->drained_bytes;

      /* A zero rate is not folded into the EWMA, and the counters are kept,
       * so the next call tries again with more data. */
      if (drain_rate) {
        stream->ewma_drain_rate =
            (uint32_t)n_count_ewma(drain_rate,
                                   stream->ewma_drain_rate,
                                   xon_ewma_cnt);
        log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ,
                   drain_rate,
                   stream->ewma_drain_rate,
                   total_buffered);
        tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update),
                  stream, drain_rate);
        /* Reset recent byte counts. This prevents us from sending advisory
         * XONs more frequent than every xon_rate_bytes. */
        stream->drained_bytes = 0;
        stream->drain_start_usec = 0;
      }
    }
  }

  /* If we don't have an XOFF outstanding, consider updating an
   * old rate */
  if (!stream->xoff_sent) {
    if (stream_drain_rate_changed(stream)) {
      /* If we are still buffering and the rate changed, update
       * advisory XON */
      log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ,
                 stream->ewma_rate_last_sent,
                 stream->ewma_drain_rate,
                 total_buffered);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream);

      cc_stats_flow_xon_outbuf_ma =
        stats_update_running_avg(cc_stats_flow_xon_outbuf_ma,
                                 total_buffered);

      circuit_send_stream_xon(stream);
    }
  } else if (total_buffered == 0) {
    /* An XOFF is outstanding and the outbuf is now empty: release the
     * sender. */
    log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ,
               stream->ewma_rate_last_sent,
               stream->ewma_drain_rate,
               total_buffered);
    tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream);
    circuit_send_stream_xon(stream);
  }

  /* If the buffer has fully emptied, clear the drain timestamp, so that
   * drained_bytes only totals bytes drained while the outbuf stays empty. */
  if (total_buffered == 0) {
    stream->drain_start_usec = 0;

    /* After 'xon_rate_bytes' have drained with the queue fully empty,
     * double the drain rate (capped at INT32_MAX), provided we have
     * advertised a rate before. */
    if (stream->drained_bytes >= xon_rate_bytes &&
        stream->ewma_rate_last_sent) {
      stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate);

      log_debug(LD_EDGE,
                 "Queue empty for xon_rate_limit bytes: %d %d",
                 stream->ewma_rate_last_sent,
                 stream->ewma_drain_rate);
      tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream);
      /* Resetting the drained bytes count. We need to keep its value as a
       * previous so the drain rate calculation takes into account what was
       * actually drained the last time. */
      stream->prev_drained_bytes = stream->drained_bytes;
      stream->drained_bytes = 0;
    }
  }

  return;
}
    717 
    718 /**
    719 * Note that we packaged some data on this stream. Used to enforce
    720 * client-side dropmark limits
    721 */
    722 void
    723 flow_control_note_sent_data(edge_connection_t *stream, size_t len)
    724 {
    725  /* If we are near the max, scale everything down */
    726  if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) {
    727    log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d",
    728             stream->total_bytes_xmit,
    729             stream->num_xoff_recv,
    730             stream->num_xon_recv);
    731 
    732    stream->total_bytes_xmit /= 2;
    733    stream->num_xoff_recv /= 2;
    734    stream->num_xon_recv /= 2;
    735  }
    736 
    737  stream->total_bytes_xmit += len;
    738 }
    739 
    740 /** Returns true if an edge connection uses flow control */
    741 bool
    742 edge_uses_flow_control(const edge_connection_t *stream)
    743 {
    744  bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) ||
    745             (stream->cpath_layer && stream->cpath_layer->ccontrol);
    746 
    747  /* All circuits with congestion control use flow control */
    748  return ret;
    749 }
    750 
    751 /** Returns true if a connection is an edge conn that uses flow control */
    752 bool
    753 conn_uses_flow_control(connection_t *conn)
    754 {
    755  bool ret = false;
    756 
    757  if (CONN_IS_EDGE(conn)) {
    758    edge_connection_t *edge = TO_EDGE_CONN(conn);
    759 
    760    if (edge_uses_flow_control(edge)) {
    761      ret = true;
    762    }
    763  }
    764 
    765  return ret;
    766 }