tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

bufferevent_ratelim.c (30092B)


      1 /*
      2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
      3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
      4 * All rights reserved.
      5 *
      6 * Redistribution and use in source and binary forms, with or without
      7 * modification, are permitted provided that the following conditions
      8 * are met:
      9 * 1. Redistributions of source code must retain the above copyright
     10 *    notice, this list of conditions and the following disclaimer.
     11 * 2. Redistributions in binary form must reproduce the above copyright
     12 *    notice, this list of conditions and the following disclaimer in the
     13 *    documentation and/or other materials provided with the distribution.
     14 * 3. The name of the author may not be used to endorse or promote products
     15 *    derived from this software without specific prior written permission.
     16 *
     17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
     18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
     19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
     20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
     21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
     22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
     26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     27 */
     28 #include "evconfig-private.h"
     29 
     30 #include <sys/types.h>
     31 #include <limits.h>
     32 #include <string.h>
     33 #include <stdlib.h>
     34 
     35 #include "event2/event.h"
     36 #include "event2/event_struct.h"
     37 #include "event2/util.h"
     38 #include "event2/bufferevent.h"
     39 #include "event2/bufferevent_struct.h"
     40 #include "event2/buffer.h"
     41 
     42 #include "ratelim-internal.h"
     43 
     44 #include "bufferevent-internal.h"
     45 #include "mm-internal.h"
     46 #include "util-internal.h"
     47 #include "event-internal.h"
     48 
     49 int
     50 ev_token_bucket_init_(struct ev_token_bucket *bucket,
     51    const struct ev_token_bucket_cfg *cfg,
     52    ev_uint32_t current_tick,
     53    int reinitialize)
     54 {
     55 if (reinitialize) {
     56 	/* on reinitialization, we only clip downwards, since we've
     57 	   already used who-knows-how-much bandwidth this tick.  We
     58 	   leave "last_updated" as it is; the next update will add the
     59 	   appropriate amount of bandwidth to the bucket.
     60 	*/
     61 	if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
     62 		bucket->read_limit = cfg->read_maximum;
     63 	if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
     64 		bucket->write_limit = cfg->write_maximum;
     65 } else {
     66 	bucket->read_limit = cfg->read_rate;
     67 	bucket->write_limit = cfg->write_rate;
     68 	bucket->last_updated = current_tick;
     69 }
     70 return 0;
     71 }
     72 
/** Add the bandwidth accumulated since the last update to 'bucket',
 * according to the rates and maxima in 'cfg', and advance the bucket's
 * clock to 'current_tick'.
 *
 * Returns 1 if the bucket was updated, 0 if no whole tick has elapsed
 * (or the tick counter appears to have jumped implausibly far).
 */
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* If adding n_ticks * rate would reach or pass the maximum,
	 * saturate at the maximum rather than risking overflow. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
    112 
    113 static inline void
    114 bufferevent_update_buckets(struct bufferevent_private *bev)
    115 {
    116 /* Must hold lock on bev. */
    117 struct timeval now;
    118 unsigned tick;
    119 event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    120 tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
    121 if (tick != bev->rate_limiting->limit.last_updated)
    122 	ev_token_bucket_update_(&bev->rate_limiting->limit,
    123 	    bev->rate_limiting->cfg, tick);
    124 }
    125 
    126 ev_uint32_t
    127 ev_token_bucket_get_tick_(const struct timeval *tv,
    128    const struct ev_token_bucket_cfg *cfg)
    129 {
    130 /* This computation uses two multiplies and a divide.  We could do
    131  * fewer if we knew that the tick length was an integer number of
    132  * seconds, or if we knew it divided evenly into a second.  We should
    133  * investigate that more.
    134  */
    135 
    136 /* We cast to an ev_uint64_t first, since we don't want to overflow
    137  * before we do the final divide. */
    138 ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    139 return (unsigned)(msec / cfg->msec_per_tick);
    140 }
    141 
    142 struct ev_token_bucket_cfg *
    143 ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    144    size_t write_rate, size_t write_burst,
    145    const struct timeval *tick_len)
    146 {
    147 struct ev_token_bucket_cfg *r;
    148 struct timeval g;
    149 if (! tick_len) {
    150 	g.tv_sec = 1;
    151 	g.tv_usec = 0;
    152 	tick_len = &g;
    153 }
    154 if (read_rate > read_burst || write_rate > write_burst ||
    155     read_rate < 1 || write_rate < 1)
    156 	return NULL;
    157 if (read_rate > EV_RATE_LIMIT_MAX ||
    158     write_rate > EV_RATE_LIMIT_MAX ||
    159     read_burst > EV_RATE_LIMIT_MAX ||
    160     write_burst > EV_RATE_LIMIT_MAX)
    161 	return NULL;
    162 r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    163 if (!r)
    164 	return NULL;
    165 r->read_rate = read_rate;
    166 r->write_rate = write_rate;
    167 r->read_maximum = read_burst;
    168 r->write_maximum = write_burst;
    169 memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    170 r->msec_per_tick = (tick_len->tv_sec * 1000) +
    171     (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    172 return r;
    173 }
    174 
/** Release a configuration allocated by ev_token_bucket_cfg_new().
 * Safe to call with NULL (mm_free forwards to free semantics). */
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
    180 
/* Default values for max_single_read & max_single_write variables. */
#define MAX_SINGLE_READ_DEFAULT 16384
#define MAX_SINGLE_WRITE_DEFAULT 16384

/* Convenience wrappers for acquiring/releasing a rate-limit group's lock. */
#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

/* Forward declarations: suspend/unsuspend I/O on every member of a group. */
static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
    192 
/** Helper: figure out the maximum amount we should write if is_write, or
   the maximum amount we should read if is_read.  Return that maximum, or
   0 if our bucket is wholly exhausted.
*/
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Pick the read or write limit field according to is_write. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

	/* Pick the read or write suspension flag of a group. */
#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A negative bucket means "exhausted": report 0 so the caller
	 * does no I/O at all. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
    259 
    260 ev_ssize_t
    261 bufferevent_get_read_max_(struct bufferevent_private *bev)
    262 {
    263 return bufferevent_get_rlim_max_(bev, 0);
    264 }
    265 
    266 ev_ssize_t
    267 bufferevent_get_write_max_(struct bufferevent_private *bev)
    268 {
    269 return bufferevent_get_rlim_max_(bev, 1);
    270 }
    271 
/** Deduct 'bytes' just read from the per-bufferevent and group read
 * buckets of 'bev', suspending reads when a bucket is exhausted and
 * unsuspending when it recovers.  NOTE(review): the unsuspend branch
 * suggests callers may pass a negative 'bytes' to refund bandwidth —
 * confirm against callers.
 *
 * Needs the lock on 'bev'.  Returns 0 on success, -1 if the refill
 * timer could not be scheduled.
 */
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket exhausted: stop reading and schedule a
			 * refill at the next tick boundary. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Bucket back above zero: drop the refill timer
			 * (unless writes still need it) and resume reads. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the accounting in the shared group bucket. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
    309 
/** Deduct 'bytes' just written from the per-bufferevent and group write
 * buckets of 'bev', suspending writes when a bucket is exhausted and
 * unsuspending when it recovers.  Write-side counterpart of
 * bufferevent_decrement_read_buckets_().
 *
 * Needs the lock on 'bev'.  Returns 0 on success, -1 if the refill
 * timer could not be scheduled.
 */
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket exhausted: stop writing and schedule a
			 * refill at the next tick boundary. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Bucket back above zero: drop the refill timer
			 * (unless reads still need it) and resume writes. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		/* Mirror the accounting in the shared group bucket. */
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
    347 
/** Stop reading on every bufferevent in <b>g</b>.
 * Needs the group lock; always returns 0. */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
    372 
/** Stop writing on every bufferevent in <b>g</b>.
 * Needs the group lock; always returns 0. */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	/* EVLOCK_TRY_LOCK_ rather than BEV_LOCK to avoid a lock-order
	 * deadlock; see the note in bev_group_suspend_reading_(). */
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
    390 
/** Timer callback invoked on a single bufferevent with one or more exhausted
   buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down while this timer was
	 * pending; if so, there is nothing to refill. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
    440 
    441 /** Helper: grab a random element from a bufferevent group.
    442 *
    443 * Requires that we hold the lock on the group.
    444 */
    445 static struct bufferevent_private *
    446 bev_group_random_element_(struct bufferevent_rate_limit_group *group)
    447 {
    448 int which;
    449 struct bufferevent_private *bev;
    450 
    451 /* requires group lock */
    452 
    453 if (!group->n_members)
    454 	return NULL;
    455 
    456 EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
    457 
    458 which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
    459 
    460 bev = LIST_FIRST(&group->members);
    461 while (which--)
    462 	bev = LIST_NEXT(bev, rate_limiting->next_in_group);
    463 
    464 return bev;
    465 }
    466 
/** Iterate over the elements of a rate-limiting group 'g' with a random
   starting point, assigning each to the variable 'bev', and executing the
   block 'block'.

   We do this in a half-baked effort to get fairness among group members.
   XXX Round-robin or some kind of priority queue would be even more fair.
*/
/* NOTE: expects 'g', 'bev', and 'first' to be declared in the enclosing
 * scope.  Walks from a random member to the end of the list, then from
 * the head back up to (but not including) the starting member. */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = bev_group_random_element_(g);	\
		for (bev = first; bev != LIST_END(&g->members); \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = LIST_FIRST(&g->members); bev && bev != first; \
		    bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
    486 
/** Resume reading on every member of 'g' that we can lock; any member we
 * fail to lock is deferred to the next refill tick via
 * pending_unsuspend_read.  Needs the group lock. */
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
    505 
/** Resume writing on every member of 'g' that we can lock; any member we
 * fail to lock is deferred to the next refill tick via
 * pending_unsuspend_write.  Needs the group lock. */
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			/* Couldn't lock this member; retry next tick. */
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
    524 
/** Callback invoked every tick to add more elements to the group bucket
   and unsuspend group members as needed.
*/
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	/* Refill the shared bucket up to the current tick. */
	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Unsuspend if there are members we failed to unsuspend last time,
	 * or if the bucket has recovered at least one minimum share. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
    558 
/** Set (or, with cfg == NULL, clear) the individual rate limit on 'bev'.
 * The caller retains ownership of 'cfg' and must keep it alive while it
 * is in use.  Returns 0 on success, -1 on allocation failure. */
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Clearing: detach the config, lift any bandwidth
		 * suspensions, and cancel the pending refill timer.  The
		 * rate_limiting struct itself is kept (it may still hold
		 * group membership). */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	/* Allocate the rate_limiting struct lazily on first use. */
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		/* Replacing an old config: drop its refill timer before
		 * re-assigning the event below. */
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Apply the initial bucket state: positive buckets run freely;
	 * non-positive ones are suspended until the next refill. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
    636 
/** Allocate a new rate-limit group on 'base' using the rates and burst
 * sizes in 'cfg' (copied into the group).  Returns NULL on allocation
 * failure; free with bufferevent_rate_limit_group_free(). */
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	/* The group keeps its own copy of the configuration. */
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	LIST_INIT(&g->members);

	ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);

	/* Persistent timer refills the shared bucket every tick. */
	event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
	    bev_group_refill_callback_, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Default min share; callers may adjust it afterwards. */
	bufferevent_rate_limit_group_set_min_share(g, 64);

	/* Seed the weak RNG used for random-start member iteration. */
	evutil_weakrand_seed_(&g->weakrand_seed,
	    (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));

	return g;
}
    670 
/** Replace the configuration of group 'g' with a copy of 'cfg'.
 * Clips the current buckets down to the new maxima and reschedules the
 * refill timer if the tick length changed.  Returns 0 on success, -1 on
 * NULL arguments. */
int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	/* Clip the live buckets down to the new maxima. */
	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
    701 
    702 int
    703 bufferevent_rate_limit_group_set_min_share(
    704 struct bufferevent_rate_limit_group *g,
    705 size_t share)
    706 {
    707 if (share > EV_SSIZE_MAX)
    708 	return -1;
    709 
    710 g->configured_min_share = share;
    711 
    712 /* Can't set share to less than the one-tick maximum.  IOW, at steady
    713  * state, at least one connection can go per tick. */
    714 if (share > g->rate_limit_cfg.read_rate)
    715 	share = g->rate_limit_cfg.read_rate;
    716 if (share > g->rate_limit_cfg.write_rate)
    717 	share = g->rate_limit_cfg.write_rate;
    718 
    719 g->min_share = share;
    720 return 0;
    721 }
    722 
/** Free the rate-limit group 'g'.  The group must already be empty
 * (asserted); the refill timer is removed before the lock and the group
 * itself are released. */
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
    733 
/** Add 'bev' to the rate-limit group 'g', creating its rate_limiting
 * struct on demand and moving it out of any previous group.  If the
 * group is currently suspended, the new member is suspended too.
 * Returns 0 on success, -1 on allocation failure. */
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		/* First rate-limit use on this bufferevent: allocate the
		 * bookkeeping struct and set up its refill timer. */
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		/* Already a member: nothing to do. */
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the suspension flags while holding the group lock;
	 * apply them after releasing it (group lock nests inside the
	 * bufferevent lock, which we already hold). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
    779 
    780 int
    781 bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
    782 {
    783 return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
    784 }
    785 
/** Detach 'bev' from its rate-limit group, if it has one.  If
 * 'unsuspend' is true, also clear any group-imposed read/write
 * suspensions (callers tearing the bufferevent down pass 0 to skip
 * that).  Always returns 0. */
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp = BEV_UPCAST(bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
    808 
    809 /* ===
    810 * API functions to expose rate limits.
    811 *
    812 * Don't use these from inside Libevent; they're meant to be for use by
    813 * the program.
    814 * === */
    815 
    816 /* Mostly you don't want to use this function from inside libevent;
    817 * bufferevent_get_read_max_() is more likely what you want*/
    818 ev_ssize_t
    819 bufferevent_get_read_limit(struct bufferevent *bev)
    820 {
    821 ev_ssize_t r;
    822 struct bufferevent_private *bevp;
    823 BEV_LOCK(bev);
    824 bevp = BEV_UPCAST(bev);
    825 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    826 	bufferevent_update_buckets(bevp);
    827 	r = bevp->rate_limiting->limit.read_limit;
    828 } else {
    829 	r = EV_SSIZE_MAX;
    830 }
    831 BEV_UNLOCK(bev);
    832 return r;
    833 }
    834 
    835 /* Mostly you don't want to use this function from inside libevent;
    836 * bufferevent_get_write_max_() is more likely what you want*/
    837 ev_ssize_t
    838 bufferevent_get_write_limit(struct bufferevent *bev)
    839 {
    840 ev_ssize_t r;
    841 struct bufferevent_private *bevp;
    842 BEV_LOCK(bev);
    843 bevp = BEV_UPCAST(bev);
    844 if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
    845 	bufferevent_update_buckets(bevp);
    846 	r = bevp->rate_limiting->limit.write_limit;
    847 } else {
    848 	r = EV_SSIZE_MAX;
    849 }
    850 BEV_UNLOCK(bev);
    851 return r;
    852 }
    853 
    854 int
    855 bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
    856 {
    857 struct bufferevent_private *bevp;
    858 BEV_LOCK(bev);
    859 bevp = BEV_UPCAST(bev);
    860 if (size == 0 || size > EV_SSIZE_MAX)
    861 	bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
    862 else
    863 	bevp->max_single_read = size;
    864 BEV_UNLOCK(bev);
    865 return 0;
    866 }
    867 
    868 int
    869 bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
    870 {
    871 struct bufferevent_private *bevp;
    872 BEV_LOCK(bev);
    873 bevp = BEV_UPCAST(bev);
    874 if (size == 0 || size > EV_SSIZE_MAX)
    875 	bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
    876 else
    877 	bevp->max_single_write = size;
    878 BEV_UNLOCK(bev);
    879 return 0;
    880 }
    881 
    882 ev_ssize_t
    883 bufferevent_get_max_single_read(struct bufferevent *bev)
    884 {
    885 ev_ssize_t r;
    886 
    887 BEV_LOCK(bev);
    888 r = BEV_UPCAST(bev)->max_single_read;
    889 BEV_UNLOCK(bev);
    890 return r;
    891 }
    892 
    893 ev_ssize_t
    894 bufferevent_get_max_single_write(struct bufferevent *bev)
    895 {
    896 ev_ssize_t r;
    897 
    898 BEV_LOCK(bev);
    899 r = BEV_UPCAST(bev)->max_single_write;
    900 BEV_UNLOCK(bev);
    901 return r;
    902 }
    903 
    904 ev_ssize_t
    905 bufferevent_get_max_to_read(struct bufferevent *bev)
    906 {
    907 ev_ssize_t r;
    908 BEV_LOCK(bev);
    909 r = bufferevent_get_read_max_(BEV_UPCAST(bev));
    910 BEV_UNLOCK(bev);
    911 return r;
    912 }
    913 
    914 ev_ssize_t
    915 bufferevent_get_max_to_write(struct bufferevent *bev)
    916 {
    917 ev_ssize_t r;
    918 BEV_LOCK(bev);
    919 r = bufferevent_get_write_max_(BEV_UPCAST(bev));
    920 BEV_UNLOCK(bev);
    921 return r;
    922 }
    923 
    924 const struct ev_token_bucket_cfg *
    925 bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
    926 struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
    927 struct ev_token_bucket_cfg *cfg;
    928 
    929 BEV_LOCK(bev);
    930 
    931 if (bufev_private->rate_limiting) {
    932 	cfg = bufev_private->rate_limiting->cfg;
    933 } else {
    934 	cfg = NULL;
    935 }
    936 
    937 BEV_UNLOCK(bev);
    938 
    939 return cfg;
    940 }
    941 
    942 /* Mostly you don't want to use this function from inside libevent;
    943 * bufferevent_get_read_max_() is more likely what you want*/
    944 ev_ssize_t
    945 bufferevent_rate_limit_group_get_read_limit(
    946 struct bufferevent_rate_limit_group *grp)
    947 {
    948 ev_ssize_t r;
    949 LOCK_GROUP(grp);
    950 r = grp->rate_limit.read_limit;
    951 UNLOCK_GROUP(grp);
    952 return r;
    953 }
    954 
    955 /* Mostly you don't want to use this function from inside libevent;
    956 * bufferevent_get_write_max_() is more likely what you want. */
    957 ev_ssize_t
    958 bufferevent_rate_limit_group_get_write_limit(
    959 struct bufferevent_rate_limit_group *grp)
    960 {
    961 ev_ssize_t r;
    962 LOCK_GROUP(grp);
    963 r = grp->rate_limit.write_limit;
    964 UNLOCK_GROUP(grp);
    965 return r;
    966 }
    967 
/* Externally deduct 'decr' bytes (may be negative, i.e. a refund) from
 * bev's read bucket, suspending or resuming reads if the bucket level
 * crosses zero.  Returns 0 on success, -1 if scheduling the refill
 * event failed.  Requires that a rate-limit cfg be installed. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
int r = 0;
ev_ssize_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
old_limit = bevp->rate_limiting->limit.read_limit;

new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
	/* Bucket just ran dry: stop reading and schedule a refill
	 * at the next tick. */
	bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
	if (event_add(&bevp->rate_limiting->refill_bucket_event,
		&bevp->rate_limiting->cfg->tick_timeout) < 0)
		r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
	/* Bucket refilled above zero.  The refill event is shared by
	 * the read and write directions, so only cancel it if the
	 * write side is not still waiting on it. */
	if (!(bevp->write_suspended & BEV_SUSPEND_BW))
		event_del(&bevp->rate_limiting->refill_bucket_event);
	bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
}

BEV_UNLOCK(bev);
return r;
}
    994 
/* Externally deduct 'decr' bytes (may be negative, i.e. a refund) from
 * bev's write bucket, suspending or resuming writes if the bucket level
 * crosses zero.  Returns 0 on success, -1 if scheduling the refill
 * event failed.  Requires that a rate-limit cfg be installed. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
/* XXXX this is mostly copy-and-paste from
 * bufferevent_decrement_read_limit */
int r = 0;
ev_ssize_t old_limit, new_limit;
struct bufferevent_private *bevp;
BEV_LOCK(bev);
bevp = BEV_UPCAST(bev);
EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
old_limit = bevp->rate_limiting->limit.write_limit;

new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
if (old_limit > 0 && new_limit <= 0) {
	/* Bucket just ran dry: stop writing and schedule a refill
	 * at the next tick. */
	bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
	if (event_add(&bevp->rate_limiting->refill_bucket_event,
		&bevp->rate_limiting->cfg->tick_timeout) < 0)
		r = -1;
} else if (old_limit <= 0 && new_limit > 0) {
	/* Bucket refilled above zero.  The refill event is shared by
	 * the read and write directions, so only cancel it if the
	 * read side is not still waiting on it. */
	if (!(bevp->read_suspended & BEV_SUSPEND_BW))
		event_del(&bevp->rate_limiting->refill_bucket_event);
	bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
}

BEV_UNLOCK(bev);
return r;
}
   1023 
   1024 int
   1025 bufferevent_rate_limit_group_decrement_read(
   1026 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1027 {
   1028 int r = 0;
   1029 ev_ssize_t old_limit, new_limit;
   1030 LOCK_GROUP(grp);
   1031 old_limit = grp->rate_limit.read_limit;
   1032 new_limit = (grp->rate_limit.read_limit -= decr);
   1033 
   1034 if (old_limit > 0 && new_limit <= 0) {
   1035 	bev_group_suspend_reading_(grp);
   1036 } else if (old_limit <= 0 && new_limit > 0) {
   1037 	bev_group_unsuspend_reading_(grp);
   1038 }
   1039 
   1040 UNLOCK_GROUP(grp);
   1041 return r;
   1042 }
   1043 
   1044 int
   1045 bufferevent_rate_limit_group_decrement_write(
   1046 struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
   1047 {
   1048 int r = 0;
   1049 ev_ssize_t old_limit, new_limit;
   1050 LOCK_GROUP(grp);
   1051 old_limit = grp->rate_limit.write_limit;
   1052 new_limit = (grp->rate_limit.write_limit -= decr);
   1053 
   1054 if (old_limit > 0 && new_limit <= 0) {
   1055 	bev_group_suspend_writing_(grp);
   1056 } else if (old_limit <= 0 && new_limit > 0) {
   1057 	bev_group_unsuspend_writing_(grp);
   1058 }
   1059 
   1060 UNLOCK_GROUP(grp);
   1061 return r;
   1062 }
   1063 
   1064 void
   1065 bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
   1066    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
   1067 {
   1068 EVUTIL_ASSERT(grp != NULL);
   1069 if (total_read_out)
   1070 	*total_read_out = grp->total_read;
   1071 if (total_written_out)
   1072 	*total_written_out = grp->total_written;
   1073 }
   1074 
   1075 void
   1076 bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
   1077 {
   1078 grp->total_read = grp->total_written = 0;
   1079 }
   1080 
   1081 int
   1082 bufferevent_ratelim_init_(struct bufferevent_private *bev)
   1083 {
   1084 bev->rate_limiting = NULL;
   1085 bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
   1086 bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
   1087 
   1088 return 0;
   1089 }