congestion_control_flow.c (25633B)
1 /* Copyright (c) 2019-2021, The Tor Project, Inc. */ 2 /* See LICENSE for licensing information */ 3 4 /** 5 * \file congestion_control_flow.c 6 * \brief Code that implements flow control for congestion controlled 7 * circuits. 8 */ 9 10 #define TOR_CONGESTION_CONTROL_FLOW_PRIVATE 11 12 #include "core/or/or.h" 13 14 #include "core/or/relay.h" 15 #include "core/mainloop/connection.h" 16 #include "core/or/connection_edge.h" 17 #include "core/mainloop/mainloop.h" 18 #include "core/or/congestion_control_common.h" 19 #include "core/or/congestion_control_flow.h" 20 #include "core/or/congestion_control_st.h" 21 #include "core/or/circuitlist.h" 22 #include "core/or/trace_probes_cc.h" 23 #include "feature/nodelist/networkstatus.h" 24 #include "trunnel/flow_control_cells.h" 25 #include "feature/control/control_events.h" 26 #include "lib/math/stats.h" 27 28 #include "core/or/connection_st.h" 29 #include "core/or/cell_st.h" 30 #include "app/config/config.h" 31 #include "core/or/conflux_util.h" 32 33 /** Cache consensus parameters */ 34 static uint32_t xoff_client; 35 static uint32_t xoff_exit; 36 37 static uint32_t xon_change_pct; 38 static uint32_t xon_ewma_cnt; 39 static uint32_t xon_rate_bytes; 40 41 /** Metricsport stats */ 42 uint64_t cc_stats_flow_num_xoff_sent; 43 uint64_t cc_stats_flow_num_xon_sent; 44 double cc_stats_flow_xoff_outbuf_ma = 0; 45 double cc_stats_flow_xon_outbuf_ma = 0; 46 47 /* In normal operation, we can get a burst of up to 32 cells before returning 48 * to libevent to flush the outbuf. This is a heuristic from hardcoded values 49 * and strange logic in connection_bucket_get_share(). */ 50 #define MAX_EXPECTED_CELL_BURST 32 51 52 /* This is the grace period that we use to give the edge connection a chance to 53 * reduce its outbuf before we send an XOFF. 54 * 55 * The congestion control spec says: 56 * > If the length of an edge outbuf queue exceeds the size provided in the 57 * > appropriate client or exit XOFF consensus parameter, a 58 * > RELAY_COMMAND_STREAM_XOFF will be sent 59 * 60 * This doesn't directly adapt well to tor, where we process many incoming 61 * messages at once. We may buffer a lot of stream data before giving the 62 * mainloop a chance to flush the the edge connection's outbuf, even if the 63 * edge connection's socket is able to accept more bytes. 64 * 65 * Instead if we detect that we should send an XOFF (as described in the cc 66 * spec), we delay sending an XOFF for `XOFF_GRACE_PERIOD_USEC` microseconds. 67 * This gives the mainloop a chance to flush the buffer to the edge 68 * connection's socket. If this flush causes the outbuf queue to shrink under 69 * our XOFF limit, then we no longer need to send an XOFF. If after 70 * `XOFF_GRACE_PERIOD_USEC` we receive another message and the outbuf queue 71 * still exceeds the XOFF limit, we send an XOFF. 72 * 73 * The value of 5 milliseconds was chosen arbitrarily. In practice it should be 74 * enough time for the edge connection to get a chance to flush, but not too 75 * long to cause excessive buffering. 76 */ 77 #define XOFF_GRACE_PERIOD_USEC (5000) 78 79 /* The following three are for dropmark rate limiting. They define when we 80 * scale down our XON, XOFF, and xmit byte counts. Early scaling is beneficial 81 * because it limits the ability of spurious XON/XOFF to be sent after large 82 * amounts of data without XON/XOFF. At these limits, after 10MB of data (or 83 * more), an adversary can only inject (log2(10MB)-log2(200*500))*100 ~= 1000 84 * cells of fake XOFF/XON before the xmit byte count will be halved enough to 85 * triggering a limit. */ 86 #define XON_COUNT_SCALE_AT 200 87 #define XOFF_COUNT_SCALE_AT 200 88 #define ONE_MEGABYTE (UINT64_C(1) << 20) 89 #define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE) 90 91 /** 92 * Update global congestion control related consensus parameter values, every 93 * consensus update. 94 * 95 * More details for each of the parameters can be found in proposal 324, 96 * section 6.5 including tuning notes. 97 */ 98 void 99 flow_control_new_consensus_params(const networkstatus_t *ns) 100 { 101 #define CC_XOFF_CLIENT_DFLT 500 102 #define CC_XOFF_CLIENT_MIN 1 103 #define CC_XOFF_CLIENT_MAX 10000 104 xoff_client = networkstatus_get_param(ns, "cc_xoff_client", 105 CC_XOFF_CLIENT_DFLT, 106 CC_XOFF_CLIENT_MIN, 107 CC_XOFF_CLIENT_MAX)*RELAY_PAYLOAD_SIZE_MIN; 108 109 #define CC_XOFF_EXIT_DFLT 500 110 #define CC_XOFF_EXIT_MIN 1 111 #define CC_XOFF_EXIT_MAX 10000 112 xoff_exit = networkstatus_get_param(ns, "cc_xoff_exit", 113 CC_XOFF_EXIT_DFLT, 114 CC_XOFF_EXIT_MIN, 115 CC_XOFF_EXIT_MAX)*RELAY_PAYLOAD_SIZE_MIN; 116 117 #define CC_XON_CHANGE_PCT_DFLT 25 118 #define CC_XON_CHANGE_PCT_MIN 1 119 #define CC_XON_CHANGE_PCT_MAX 99 120 xon_change_pct = networkstatus_get_param(ns, "cc_xon_change_pct", 121 CC_XON_CHANGE_PCT_DFLT, 122 CC_XON_CHANGE_PCT_MIN, 123 CC_XON_CHANGE_PCT_MAX); 124 125 #define CC_XON_RATE_BYTES_DFLT (500) 126 #define CC_XON_RATE_BYTES_MIN (1) 127 #define CC_XON_RATE_BYTES_MAX (5000) 128 xon_rate_bytes = networkstatus_get_param(ns, "cc_xon_rate", 129 CC_XON_RATE_BYTES_DFLT, 130 CC_XON_RATE_BYTES_MIN, 131 CC_XON_RATE_BYTES_MAX)*RELAY_PAYLOAD_SIZE_MAX; 132 133 #define CC_XON_EWMA_CNT_DFLT (2) 134 #define CC_XON_EWMA_CNT_MIN (2) 135 #define CC_XON_EWMA_CNT_MAX (100) 136 xon_ewma_cnt = networkstatus_get_param(ns, "cc_xon_ewma_cnt", 137 CC_XON_EWMA_CNT_DFLT, 138 CC_XON_EWMA_CNT_MIN, 139 CC_XON_EWMA_CNT_MAX); 140 } 141 142 /** 143 * Send an XOFF for this stream, and note that we sent one 144 */ 145 static void 146 circuit_send_stream_xoff(edge_connection_t *stream) 147 { 148 xoff_cell_t xoff; 149 uint8_t payload[CELL_PAYLOAD_SIZE]; 150 ssize_t xoff_size; 151 152 memset(&xoff, 0, sizeof(xoff)); 153 memset(payload, 0, sizeof(payload)); 154 155 xoff_cell_set_version(&xoff, 0); 156 157 if ((xoff_size = xoff_cell_encode(payload, CELL_PAYLOAD_SIZE, &xoff)) < 0) { 158 log_warn(LD_BUG, "Failed to encode xon cell"); 159 return; 160 } 161 162 if (connection_edge_send_command(stream, RELAY_COMMAND_XOFF, 163 (char*)payload, (size_t)xoff_size) == 0) { 164 stream->xoff_sent = true; 165 cc_stats_flow_num_xoff_sent++; 166 167 /* If this is an entry conn, notify control port */ 168 if (TO_CONN(stream)->type == CONN_TYPE_AP) { 169 control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)), 170 STREAM_EVENT_XOFF_SENT, 171 0); 172 } 173 } 174 } 175 176 /** 177 * Compute the recent drain rate (write rate) for this edge 178 * connection and return it, in KB/sec (1000 bytes/sec). 179 * 180 * Returns 0 if the monotime clock is busted. 181 */ 182 static inline uint32_t 183 compute_drain_rate(const edge_connection_t *stream) 184 { 185 if (BUG(!is_monotime_clock_reliable())) { 186 log_warn(LD_BUG, "Computing drain rate with stalled monotime clock"); 187 return 0; 188 } 189 190 uint64_t delta = monotime_absolute_usec() - stream->drain_start_usec; 191 192 if (delta == 0) { 193 log_warn(LD_BUG, "Computing stream drain rate with zero time delta"); 194 return 0; 195 } 196 197 /* Overflow checks */ 198 if (stream->prev_drained_bytes > INT32_MAX/1000 || /* Intermediate */ 199 stream->prev_drained_bytes/delta > INT32_MAX/1000) { /* full value */ 200 return INT32_MAX; 201 } 202 203 /* kb/sec = bytes/usec * 1000 usec/msec * 1000 msec/sec * kb/1000bytes */ 204 return MAX(1, (uint32_t)(stream->prev_drained_bytes * 1000)/delta); 205 } 206 207 /** 208 * Send an XON for this stream, with appropriate advisory rate information. 209 * 210 * Reverts the xoff sent status, and stores the rate information we sent, 211 * in case it changes. 212 */ 213 static void 214 circuit_send_stream_xon(edge_connection_t *stream) 215 { 216 xon_cell_t xon; 217 uint8_t payload[CELL_PAYLOAD_SIZE]; 218 ssize_t xon_size; 219 220 memset(&xon, 0, sizeof(xon)); 221 memset(payload, 0, sizeof(payload)); 222 223 xon_cell_set_version(&xon, 0); 224 xon_cell_set_kbps_ewma(&xon, stream->ewma_drain_rate); 225 226 if ((xon_size = xon_cell_encode(payload, CELL_PAYLOAD_SIZE, &xon)) < 0) { 227 log_warn(LD_BUG, "Failed to encode xon cell"); 228 return; 229 } 230 231 /* Store the advisory rate information, to send advisory updates if 232 * it changes */ 233 stream->ewma_rate_last_sent = stream->ewma_drain_rate; 234 235 if (connection_edge_send_command(stream, RELAY_COMMAND_XON, (char*)payload, 236 (size_t)xon_size) == 0) { 237 /* Revert the xoff sent status, so we can send another one if need be */ 238 stream->xoff_sent = false; 239 240 cc_stats_flow_num_xon_sent++; 241 242 /* If it's an entry conn, notify control port */ 243 if (TO_CONN(stream)->type == CONN_TYPE_AP) { 244 control_event_stream_status(TO_ENTRY_CONN(TO_CONN(stream)), 245 STREAM_EVENT_XON_SENT, 246 0); 247 } 248 } 249 } 250 251 /** 252 * Process a stream XOFF, parsing it, and then stopping reading on 253 * the edge connection. 254 * 255 * Record that we have received an xoff, so we know not to resume 256 * reading on this edge conn until we get an XON. 257 * 258 * Returns false if the XOFF did not validate; true if it does. 259 */ 260 bool 261 circuit_process_stream_xoff(edge_connection_t *conn, 262 const crypt_path_t *layer_hint) 263 { 264 bool retval = true; 265 266 if (BUG(!conn)) { 267 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 268 "Got XOFF on invalid stream?"); 269 return false; 270 } 271 272 /* Make sure this XOFF came from the right hop */ 273 if (!edge_uses_cpath(conn, layer_hint)) { 274 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 275 "Got XOFF from wrong hop."); 276 return false; 277 } 278 279 if (!edge_uses_flow_control(conn)) { 280 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 281 "Got XOFF for non-congestion control circuit"); 282 return false; 283 } 284 285 if (conn->xoff_received) { 286 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 287 "Got multiple XOFF on connection"); 288 return false; 289 } 290 291 /* If we are near the max, scale everything down */ 292 if (conn->num_xoff_recv == XOFF_COUNT_SCALE_AT) { 293 log_info(LD_EDGE, "Scaling down for XOFF count: %d %d %d", 294 conn->total_bytes_xmit, 295 conn->num_xoff_recv, 296 conn->num_xon_recv); 297 conn->total_bytes_xmit /= 2; 298 conn->num_xoff_recv /= 2; 299 conn->num_xon_recv /= 2; 300 } 301 302 conn->num_xoff_recv++; 303 304 /* Client-side check to make sure that XOFF is not sent too early, 305 * for dropmark attacks. The main sidechannel risk is early cells, 306 * but we also check to make sure that we have not received more XOFFs 307 * than could have been generated by the bytes we sent. 308 */ 309 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { 310 uint32_t limit = 0; 311 if (conn->hs_ident) 312 limit = xoff_client; 313 else 314 limit = xoff_exit; 315 316 if (conn->total_bytes_xmit < limit*conn->num_xoff_recv) { 317 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 318 "Got extra XOFF for bytes sent. Got %d, expected max %d", 319 conn->num_xoff_recv, conn->total_bytes_xmit/limit); 320 /* We still process this, because the only dropmark defenses 321 * in C tor are via the vanguards addon's use of the read valid 322 * cells. So just signal that we think this is not valid protocol 323 * data and proceed. */ 324 retval = false; 325 } 326 } 327 328 log_info(LD_EDGE, "Got XOFF!"); 329 connection_stop_reading(TO_CONN(conn)); 330 conn->xoff_received = true; 331 332 /* If this is an entry conn, notify control port */ 333 if (TO_CONN(conn)->type == CONN_TYPE_AP) { 334 control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)), 335 STREAM_EVENT_XOFF_RECV, 336 0); 337 } 338 339 return retval; 340 } 341 342 /** 343 * Process a stream XON, and if it validates, clear the xoff 344 * flag and resume reading on this edge connection. 345 * 346 * Also, use provided rate information to rate limit 347 * reading on this edge (or packagaing from it onto 348 * the circuit), to avoid XON/XOFF chatter. 349 * 350 * Returns true if the XON validates, false otherwise. 351 */ 352 bool 353 circuit_process_stream_xon(edge_connection_t *conn, 354 const crypt_path_t *layer_hint, 355 const relay_msg_t *msg) 356 { 357 xon_cell_t *xon; 358 bool retval = true; 359 360 if (BUG(!conn)) { 361 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 362 "Got XON on invalid stream?"); 363 return false; 364 } 365 366 /* Make sure this XON came from the right hop */ 367 if (!edge_uses_cpath(conn, layer_hint)) { 368 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 369 "Got XON from wrong hop."); 370 return false; 371 } 372 373 if (!edge_uses_flow_control(conn)) { 374 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 375 "Got XON for non-congestion control circuit"); 376 return false; 377 } 378 379 if (xon_cell_parse(&xon, msg->body, msg->length) < 0) { 380 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 381 "Received malformed XON cell."); 382 return false; 383 } 384 385 /* If we are near the max, scale everything down */ 386 if (conn->num_xon_recv == XON_COUNT_SCALE_AT) { 387 log_info(LD_EDGE, "Scaling down for XON count: %d %d %d", 388 conn->total_bytes_xmit, 389 conn->num_xoff_recv, 390 conn->num_xon_recv); 391 conn->total_bytes_xmit /= 2; 392 conn->num_xoff_recv /= 2; 393 conn->num_xon_recv /= 2; 394 } 395 396 conn->num_xon_recv++; 397 398 /* Client-side check to make sure that XON is not sent too early, 399 * for dropmark attacks. The main sidechannel risk is early cells, 400 * but we also check to see that we did not get more XONs than make 401 * sense for the number of bytes we sent. 402 */ 403 if (TO_CONN(conn)->type == CONN_TYPE_AP || conn->hs_ident != NULL) { 404 uint32_t limit = 0; 405 406 if (conn->hs_ident) 407 limit = MIN(xoff_client, xon_rate_bytes); 408 else 409 limit = MIN(xoff_exit, xon_rate_bytes); 410 411 if (conn->total_bytes_xmit < limit*conn->num_xon_recv) { 412 log_fn(LOG_PROTOCOL_WARN, LD_EDGE, 413 "Got extra XON for bytes sent. Got %d, expected max %d", 414 conn->num_xon_recv, conn->total_bytes_xmit/limit); 415 416 /* We still process this, because the only dropmark defenses 417 * in C tor are via the vanguards addon's use of the read valid 418 * cells. So just signal that we think this is not valid protocol 419 * data and proceed. */ 420 retval = false; 421 } 422 } 423 424 log_info(LD_EDGE, "Got XON: %d", xon->kbps_ewma); 425 426 /* Adjust the token bucket of this edge connection with the drain rate in 427 * the XON. Rate is in bytes from kilobit (kpbs). */ 428 uint64_t rate = ((uint64_t) xon_cell_get_kbps_ewma(xon) * 1000); 429 if (rate == 0 || INT32_MAX < rate) { 430 /* No rate. */ 431 rate = INT32_MAX; 432 } 433 token_bucket_rw_adjust(&conn->bucket, (uint32_t) rate, (uint32_t) rate); 434 435 if (conn->xoff_received) { 436 /* Clear the fact that we got an XOFF, so that this edge can 437 * start and stop reading normally */ 438 conn->xoff_received = false; 439 connection_start_reading(TO_CONN(conn)); 440 } 441 442 /* If this is an entry conn, notify control port */ 443 if (TO_CONN(conn)->type == CONN_TYPE_AP) { 444 control_event_stream_status(TO_ENTRY_CONN(TO_CONN(conn)), 445 STREAM_EVENT_XON_RECV, 446 0); 447 } 448 449 xon_cell_free(xon); 450 451 return retval; 452 } 453 454 /** 455 * Called from sendme_stream_data_received(), when data arrives 456 * from a circuit to our edge's outbuf, to decide if we need to send 457 * an XOFF. 458 * 459 * Returns the amount of cells remaining until the buffer is full, at 460 * which point it sends an XOFF, and returns 0. 461 * 462 * Returns less than 0 if we have queued more than a congestion window 463 * worth of data and need to close the circuit. 464 */ 465 int 466 flow_control_decide_xoff(edge_connection_t *stream) 467 { 468 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); 469 uint32_t buffer_limit_xoff = 0; 470 471 if (BUG(!edge_uses_flow_control(stream))) { 472 log_err(LD_BUG, "Flow control called for non-congestion control circuit"); 473 return -1; 474 } 475 476 /* Onion services and clients are typically localhost edges, so they 477 * need different buffering limits than exits do */ 478 if (TO_CONN(stream)->type == CONN_TYPE_AP || stream->hs_ident != NULL) { 479 buffer_limit_xoff = xoff_client; 480 } else { 481 buffer_limit_xoff = xoff_exit; 482 } 483 484 if (total_buffered > buffer_limit_xoff) { 485 if (!stream->xoff_sent) { 486 uint64_t now = monotime_absolute_usec(); 487 488 if (stream->xoff_grace_period_start_usec == 0) { 489 /* If unset, we haven't begun the XOFF grace period. We need to start. 490 */ 491 log_debug(LD_EDGE, 492 "Exceeded XOFF limit; Beginning grace period: " 493 "total-buffered=%" TOR_PRIuSZ " xoff-limit=%d", 494 total_buffered, buffer_limit_xoff); 495 496 stream->xoff_grace_period_start_usec = now; 497 } else if (now > stream->xoff_grace_period_start_usec + 498 XOFF_GRACE_PERIOD_USEC) { 499 /* If we've exceeded our XOFF grace period, we need to send an XOFF. */ 500 log_info(LD_EDGE, 501 "Sending XOFF: total-buffered=%" TOR_PRIuSZ 502 " xoff-limit=%d grace-period-dur=%" PRIu64 "usec", 503 total_buffered, buffer_limit_xoff, 504 now - stream->xoff_grace_period_start_usec); 505 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xoff_sending), stream); 506 507 cc_stats_flow_xoff_outbuf_ma = 508 stats_update_running_avg(cc_stats_flow_xoff_outbuf_ma, 509 total_buffered); 510 511 circuit_send_stream_xoff(stream); 512 513 /* Clear the drain rate. It is considered wrong if we 514 * got all the way to XOFF */ 515 stream->ewma_drain_rate = 0; 516 517 /* Unset our grace period. */ 518 stream->xoff_grace_period_start_usec = 0; 519 } else { 520 /* Else we're in the XOFF grace period, so don't do anything. */ 521 } 522 } 523 } else { 524 /* The outbuf length is less than the XOFF limit, so unset our grace 525 * period. */ 526 stream->xoff_grace_period_start_usec = 0; 527 } 528 529 /* If the outbuf has accumulated more than the expected burst limit of 530 * cells, then assume it is not draining, and call decide_xon. We must 531 * do this because writes only happen when the socket unblocks, so 532 * may not otherwise notice accumulation of data in the outbuf for 533 * advisory XONs. */ 534 if (total_buffered > MAX_EXPECTED_CELL_BURST*RELAY_PAYLOAD_SIZE_MIN) { 535 flow_control_decide_xon(stream, 0); 536 } 537 538 /* Flow control always takes more data; we rely on the oomkiller to 539 * handle misbehavior. */ 540 return 0; 541 } 542 543 /** 544 * Returns true if the stream's drain rate has changed significantly. 545 * 546 * Returns false if the monotime clock is stalled, or if we have 547 * no previous drain rate information. 548 */ 549 static bool 550 stream_drain_rate_changed(const edge_connection_t *stream) 551 { 552 if (!is_monotime_clock_reliable()) { 553 return false; 554 } 555 556 if (!stream->ewma_rate_last_sent) { 557 return false; 558 } 559 560 if (stream->ewma_drain_rate > 561 (100+(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { 562 return true; 563 } 564 565 if (stream->ewma_drain_rate < 566 (100-(uint64_t)xon_change_pct)*stream->ewma_rate_last_sent/100) { 567 return true; 568 } 569 570 return false; 571 } 572 573 /** 574 * Called whenever we drain an edge connection outbuf by writing on 575 * its socket, to decide if it is time to send an xon. 576 * 577 * The n_written parameter tells us how many bytes we have written 578 * this time, which is used to compute the advisory drain rate fields. 579 */ 580 void 581 flow_control_decide_xon(edge_connection_t *stream, size_t n_written) 582 { 583 size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream)); 584 585 /* Bounds check the number of drained bytes, and scale */ 586 if (stream->drained_bytes >= UINT32_MAX - n_written) { 587 /* Cut the bytes in half, and move the start time up halfway to now 588 * (if we have one). */ 589 stream->drained_bytes /= 2; 590 591 if (stream->drain_start_usec) { 592 uint64_t now = monotime_absolute_usec(); 593 594 stream->drain_start_usec = now - (now-stream->drain_start_usec)/2; 595 } 596 } 597 598 /* Accumulate drained bytes since last rate computation */ 599 stream->drained_bytes += n_written; 600 601 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon), stream, n_written); 602 603 /* Check for bad monotime clock and bytecount wrap */ 604 if (!is_monotime_clock_reliable()) { 605 /* If the monotime clock ever goes wrong, the safest thing to do 606 * is just clear our short-term rate info and wait for the clock to 607 * become reliable again.. */ 608 stream->drain_start_usec = 0; 609 stream->drained_bytes = 0; 610 } else { 611 /* If we have no drain start timestamp, and we still have 612 * remaining buffer, start the buffering counter */ 613 if (!stream->drain_start_usec && total_buffered > 0) { 614 log_debug(LD_EDGE, "Began edge buffering: %d %d %"TOR_PRIuSZ, 615 stream->ewma_rate_last_sent, 616 stream->ewma_drain_rate, 617 total_buffered); 618 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_start), 619 stream); 620 stream->drain_start_usec = monotime_absolute_usec(); 621 stream->drained_bytes = 0; 622 } 623 } 624 625 if (stream->drain_start_usec) { 626 /* If we have spent enough time in a queued state, update our drain 627 * rate. */ 628 if (stream->drained_bytes > xon_rate_bytes) { 629 /* No previous drained bytes means it is the first time we are computing 630 * it so use the value we just drained onto the socket as a baseline. It 631 * won't be accurate but it will be a start towards the right value. 632 * 633 * We have to do this in order to have a drain rate else we could be 634 * sending a drain rate of 0 in an XON which would be undesirable and 635 * basically like sending an XOFF. */ 636 if (stream->prev_drained_bytes == 0) { 637 stream->prev_drained_bytes = stream->drained_bytes; 638 } 639 uint32_t drain_rate = compute_drain_rate(stream); 640 /* Once the drain rate has been computed, note how many bytes we just 641 * drained so it can be used at the next calculation. We do this here 642 * because it gets reset once the rate is changed. */ 643 stream->prev_drained_bytes = stream->drained_bytes; 644 645 if (drain_rate) { 646 stream->ewma_drain_rate = 647 (uint32_t)n_count_ewma(drain_rate, 648 stream->ewma_drain_rate, 649 xon_ewma_cnt); 650 log_debug(LD_EDGE, "Updating drain rate: %d %d %"TOR_PRIuSZ, 651 drain_rate, 652 stream->ewma_drain_rate, 653 total_buffered); 654 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_update), 655 stream, drain_rate); 656 /* Reset recent byte counts. This prevents us from sending advisory 657 * XONs more frequent than every xon_rate_bytes. */ 658 stream->drained_bytes = 0; 659 stream->drain_start_usec = 0; 660 } 661 } 662 } 663 664 /* If we don't have an XOFF outstanding, consider updating an 665 * old rate */ 666 if (!stream->xoff_sent) { 667 if (stream_drain_rate_changed(stream)) { 668 /* If we are still buffering and the rate changed, update 669 * advisory XON */ 670 log_info(LD_EDGE, "Sending rate-change XON: %d %d %"TOR_PRIuSZ, 671 stream->ewma_rate_last_sent, 672 stream->ewma_drain_rate, 673 total_buffered); 674 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_rate_change), stream); 675 676 cc_stats_flow_xon_outbuf_ma = 677 stats_update_running_avg(cc_stats_flow_xon_outbuf_ma, 678 total_buffered); 679 680 circuit_send_stream_xon(stream); 681 } 682 } else if (total_buffered == 0) { 683 log_info(LD_EDGE, "Sending XON: %d %d %"TOR_PRIuSZ, 684 stream->ewma_rate_last_sent, 685 stream->ewma_drain_rate, 686 total_buffered); 687 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_partial_drain), stream); 688 circuit_send_stream_xon(stream); 689 } 690 691 /* If the buffer has fully emptied, clear the drain timestamp, 692 * so we can total only bytes drained while outbuf is 0. */ 693 if (total_buffered == 0) { 694 stream->drain_start_usec = 0; 695 696 /* After we've spent 'xon_rate_bytes' with the queue fully drained, 697 * double any rate we sent. */ 698 if (stream->drained_bytes >= xon_rate_bytes && 699 stream->ewma_rate_last_sent) { 700 stream->ewma_drain_rate = MIN(INT32_MAX, 2*stream->ewma_drain_rate); 701 702 log_debug(LD_EDGE, 703 "Queue empty for xon_rate_limit bytes: %d %d", 704 stream->ewma_rate_last_sent, 705 stream->ewma_drain_rate); 706 tor_trace(TR_SUBSYS(cc), TR_EV(flow_decide_xon_drain_doubled), stream); 707 /* Resetting the drained bytes count. We need to keep its value as a 708 * previous so the drain rate calculation takes into account what was 709 * actually drain the last time. */ 710 stream->prev_drained_bytes = stream->drained_bytes; 711 stream->drained_bytes = 0; 712 } 713 } 714 715 return; 716 } 717 718 /** 719 * Note that we packaged some data on this stream. Used to enforce 720 * client-side dropmark limits 721 */ 722 void 723 flow_control_note_sent_data(edge_connection_t *stream, size_t len) 724 { 725 /* If we are near the max, scale everything down */ 726 if (stream->total_bytes_xmit >= TOTAL_XMIT_SCALE_AT-len) { 727 log_info(LD_EDGE, "Scaling down for flow control xmit bytes:: %d %d %d", 728 stream->total_bytes_xmit, 729 stream->num_xoff_recv, 730 stream->num_xon_recv); 731 732 stream->total_bytes_xmit /= 2; 733 stream->num_xoff_recv /= 2; 734 stream->num_xon_recv /= 2; 735 } 736 737 stream->total_bytes_xmit += len; 738 } 739 740 /** Returns true if an edge connection uses flow control */ 741 bool 742 edge_uses_flow_control(const edge_connection_t *stream) 743 { 744 bool ret = (stream->on_circuit && stream->on_circuit->ccontrol) || 745 (stream->cpath_layer && stream->cpath_layer->ccontrol); 746 747 /* All circuits with congestion control use flow control */ 748 return ret; 749 } 750 751 /** Returns true if a connection is an edge conn that uses flow control */ 752 bool 753 conn_uses_flow_control(connection_t *conn) 754 { 755 bool ret = false; 756 757 if (CONN_IS_EDGE(conn)) { 758 edge_connection_t *edge = TO_EDGE_CONN(conn); 759 760 if (edge_uses_flow_control(edge)) { 761 ret = true; 762 } 763 } 764 765 return ret; 766 }