--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ipc/chromium/src/third_party/libevent/bufferevent_ratelim.c	Wed Dec 31 06:09:35 2014 +0100
@@ -0,0 +1,1011 @@
+/*
+ * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
+ * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ *    derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
+ * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
+ * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
+ * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <sys/types.h>
+#include <limits.h>
+#include <string.h>
+#include <stdlib.h>
+
+#include "event2/event.h"
+#include "event2/event_struct.h"
+#include "event2/util.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_struct.h"
+#include "event2/buffer.h"
+
+#include "ratelim-internal.h"
+
+#include "bufferevent-internal.h"
+#include "mm-internal.h"
+#include "util-internal.h"
+#include "event-internal.h"
+
+int
+ev_token_bucket_init(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick,
+    int reinitialize)
+{
+	if (reinitialize) {
+		/* on reinitialization, we only clip downwards, since we've
+		   already used who-knows-how-much bandwidth this tick.  We
+		   leave "last_updated" as it is; the next update will add the
+		   appropriate amount of bandwidth to the bucket.
+		 */
+		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
+			bucket->read_limit = cfg->read_maximum;
+		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
+			bucket->write_limit = cfg->write_maximum;
+	} else {
+		bucket->read_limit = cfg->read_rate;
+		bucket->write_limit = cfg->write_rate;
+		bucket->last_updated = current_tick;
+	}
+	return 0;
+}
+
+int
+ev_token_bucket_update(struct ev_token_bucket *bucket,
+    const struct ev_token_bucket_cfg *cfg,
+    ev_uint32_t current_tick)
+{
+	/* It's okay if the tick number overflows, since we'll just
+	 * wrap around when we do the unsigned subtraction. */
+	unsigned n_ticks = current_tick - bucket->last_updated;
+
+	/* Make sure some ticks actually happened, and that time didn't
+	 * roll back. */
+	if (n_ticks == 0 || n_ticks > INT_MAX)
+		return 0;
+
+	/* Naively, we would say
+		bucket->limit += n_ticks * cfg->rate;
+
+		if (bucket->limit > cfg->maximum)
+			bucket->limit = cfg->maximum;
+
+	   But we're worried about overflow, so we do it like this:
+	*/
+
+	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
+		bucket->read_limit = cfg->read_maximum;
+	else
+		bucket->read_limit += n_ticks * cfg->read_rate;
+
+
+	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
+		bucket->write_limit = cfg->write_maximum;
+	else
+		bucket->write_limit += n_ticks * cfg->write_rate;
+
+
+	bucket->last_updated = current_tick;
+
+	return 1;
+}
+
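For reference, a standalone sketch (not part of the patch) of the overflow guard above: rewriting the test "limit + n_ticks * rate > maximum" as "(maximum - limit) / n_ticks < rate" answers the same question without ever forming the product n_ticks * rate, which is the term that could overflow. The variable names below are invented for illustration.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	int64_t limit = 100;          /* tokens currently in the bucket */
	const int64_t maximum = 1000; /* burst cap */
	const int64_t rate = 400;     /* tokens added per tick */
	unsigned n_ticks = 3;         /* ticks since the last refill */

	/* Guarded refill, mirroring ev_token_bucket_update(): */
	if ((maximum - limit) / n_ticks < rate)
		limit = maximum;      /* refill would overshoot the cap: clamp */
	else
		limit += (int64_t)n_ticks * rate;

	/* Naively, 100 + 3*400 = 1300 > 1000, so both forms clamp to 1000. */
	printf("limit after refill: %lld\n", (long long)limit);
	return 0;
}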
+static inline void
+bufferevent_update_buckets(struct bufferevent_private *bev)
+{
+	/* Must hold lock on bev. */
+	struct timeval now;
+	unsigned tick;
+	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
+	if (tick != bev->rate_limiting->limit.last_updated)
+		ev_token_bucket_update(&bev->rate_limiting->limit,
+		    bev->rate_limiting->cfg, tick);
+}
+
+ev_uint32_t
+ev_token_bucket_get_tick(const struct timeval *tv,
+    const struct ev_token_bucket_cfg *cfg)
+{
+	/* This computation uses two multiplies and a divide.  We could do
+	 * fewer if we knew that the tick length was an integer number of
+	 * seconds, or if we knew it divided evenly into a second.  We should
+	 * investigate that more.
+	 */
+
+	/* We cast to an ev_uint64_t first, since we don't want to overflow
+	 * before we do the final divide. */
+	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
+	return (unsigned)(msec / cfg->msec_per_tick);
+}
+
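Likewise, a tiny worked example (not part of the patch) of the tick arithmetic above, with the struct fields inlined as plain variables: with a 100 msec tick, a cached timestamp of 2.350 seconds lands in tick 23.

#include <stdint.h>
#include <stdio.h>

int main(void)
{
	int64_t tv_sec = 2, tv_usec = 350000; /* 2.350 seconds */
	unsigned msec_per_tick = 100;

	/* Widen before dividing, as ev_token_bucket_get_tick() does. */
	uint64_t msec = (uint64_t)tv_sec * 1000 + tv_usec / 1000; /* 2350 */
	printf("tick = %u\n", (unsigned)(msec / msec_per_tick));  /* 23 */
	return 0;
}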
+struct ev_token_bucket_cfg *
+ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
+    size_t write_rate, size_t write_burst,
+    const struct timeval *tick_len)
+{
+	struct ev_token_bucket_cfg *r;
+	struct timeval g;
+	if (!tick_len) {
+		g.tv_sec = 1;
+		g.tv_usec = 0;
+		tick_len = &g;
+	}
+	if (read_rate > read_burst || write_rate > write_burst ||
+	    read_rate < 1 || write_rate < 1)
+		return NULL;
+	if (read_rate > EV_RATE_LIMIT_MAX ||
+	    write_rate > EV_RATE_LIMIT_MAX ||
+	    read_burst > EV_RATE_LIMIT_MAX ||
+	    write_burst > EV_RATE_LIMIT_MAX)
+		return NULL;
+	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
+	if (!r)
+		return NULL;
+	r->read_rate = read_rate;
+	r->write_rate = write_rate;
+	r->read_maximum = read_burst;
+	r->write_maximum = write_burst;
+	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
+	r->msec_per_tick = (tick_len->tv_sec * 1000) +
+	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
+	return r;
+}
+
+void
+ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
+{
+	mm_free(cfg);
+}
+
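A minimal usage sketch of the configuration API above (not part of the patch): it builds a config allowing 512 bytes per 100 msec tick in each direction (roughly 5 KB/sec sustained) with 16 KB bursts, then installs it with bufferevent_set_rate_limit(), which is defined later in this file. Note that the per-bufferevent path keeps a pointer to the cfg rather than copying it (see the "XXX reference-count cfg" note below), so the cfg must outlive every bufferevent using it. `bev` is an assumed, already-created bufferevent.

#include "event2/bufferevent.h"
#include "event2/util.h"

static int limit_bufferevent(struct bufferevent *bev)
{
	/* 512 bytes per 100 msec tick, both directions; 16 KB bursts. */
	struct timeval tick = { 0, 100*1000 };
	struct ev_token_bucket_cfg *cfg =
	    ev_token_bucket_cfg_new(512, 16384, 512, 16384, &tick);
	if (!cfg)
		return -1;
	if (bufferevent_set_rate_limit(bev, cfg) < 0) {
		ev_token_bucket_cfg_free(cfg);
		return -1;
	}
	return 0;	/* cfg must stay alive while bev uses it */
}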
+/* No matter how big our bucket gets, don't try to read more than this
+ * much in a single read operation. */
+#define MAX_TO_READ_EVER 16384
+/* No matter how big our bucket gets, don't try to write more than this
+ * much in a single write operation. */
+#define MAX_TO_WRITE_EVER 16384
+
+#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
+#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
+
+static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
+static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
+
+/** Helper: figure out the maximum amount we should write if is_write, or
+    the maximum amount we should read if not.  Return that maximum, or
+    0 if our bucket is wholly exhausted.
+ */
+static inline ev_ssize_t
+_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
+{
+	/* needs lock on bev. */
+	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;
+
+#define LIM(x)						\
+	(is_write ? (x).write_limit : (x).read_limit)
+
+#define GROUP_SUSPENDED(g)				\
+	(is_write ? (g)->write_suspended : (g)->read_suspended)
+
+	/* Sets max_so_far to MIN(x, max_so_far) */
+#define CLAMPTO(x)					\
+	do {						\
+		if (max_so_far > (x))			\
+			max_so_far = (x);		\
+	} while (0)
+
+	if (!bev->rate_limiting)
+		return max_so_far;
+
+	/* If rate-limiting is enabled at all, update the appropriate
+	   bucket, and take the smaller of our rate limit and the group
+	   rate limit.
+	 */
+
+	if (bev->rate_limiting->cfg) {
+		bufferevent_update_buckets(bev);
+		max_so_far = LIM(bev->rate_limiting->limit);
+	}
+	if (bev->rate_limiting->group) {
+		struct bufferevent_rate_limit_group *g =
+		    bev->rate_limiting->group;
+		ev_ssize_t share;
+		LOCK_GROUP(g);
+		if (GROUP_SUSPENDED(g)) {
+			/* We can get here if we failed to lock this
+			 * particular bufferevent while suspending the whole
+			 * group. */
+			if (is_write)
+				bufferevent_suspend_write(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+			else
+				bufferevent_suspend_read(&bev->bev,
+				    BEV_SUSPEND_BW_GROUP);
+			share = 0;
+		} else {
+			/* XXXX probably we should divide among the active
+			 * members, not the total members. */
+			share = LIM(g->rate_limit) / g->n_members;
+			if (share < g->min_share)
+				share = g->min_share;
+		}
+		UNLOCK_GROUP(g);
+		CLAMPTO(share);
+	}
+
+	if (max_so_far < 0)
+		max_so_far = 0;
+	return max_so_far;
+}
+
+ev_ssize_t
+_bufferevent_get_read_max(struct bufferevent_private *bev)
+{
+	return _bufferevent_get_rlim_max(bev, 0);
+}
+
+ev_ssize_t
+_bufferevent_get_write_max(struct bufferevent_private *bev)
+{
+	return _bufferevent_get_rlim_max(bev, 1);
+}
+
+int
+_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
+{
+	/* XXXXX Make sure all users of this function check its return value */
+	int r = 0;
+	/* need to hold lock on bev */
+	if (!bev->rate_limiting)
+		return 0;
+
+	if (bev->rate_limiting->cfg) {
+		bev->rate_limiting->limit.read_limit -= bytes;
+		if (bev->rate_limiting->limit.read_limit <= 0) {
+			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
+			if (event_add(&bev->rate_limiting->refill_bucket_event,
+				&bev->rate_limiting->cfg->tick_timeout) < 0)
+				r = -1;
+		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
+			if (!(bev->write_suspended & BEV_SUSPEND_BW))
+				event_del(&bev->rate_limiting->refill_bucket_event);
+			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
+		}
+	}
+
+	if (bev->rate_limiting->group) {
+		LOCK_GROUP(bev->rate_limiting->group);
+		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
+		bev->rate_limiting->group->total_read += bytes;
+		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
+			_bev_group_suspend_reading(bev->rate_limiting->group);
+		} else if (bev->rate_limiting->group->read_suspended) {
+			_bev_group_unsuspend_reading(bev->rate_limiting->group);
+		}
+		UNLOCK_GROUP(bev->rate_limiting->group);
+	}
+
+	return r;
+}
+
+int
+_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
+{
+	/* XXXXX Make sure all users of this function check its return value */
+	int r = 0;
+	/* need to hold lock */
+	if (!bev->rate_limiting)
+		return 0;
+
+	if (bev->rate_limiting->cfg) {
+		bev->rate_limiting->limit.write_limit -= bytes;
+		if (bev->rate_limiting->limit.write_limit <= 0) {
+			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
+			if (event_add(&bev->rate_limiting->refill_bucket_event,
+				&bev->rate_limiting->cfg->tick_timeout) < 0)
+				r = -1;
+		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
+			if (!(bev->read_suspended & BEV_SUSPEND_BW))
+				event_del(&bev->rate_limiting->refill_bucket_event);
+			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
+		}
+	}
+
+	if (bev->rate_limiting->group) {
+		LOCK_GROUP(bev->rate_limiting->group);
+		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
+		bev->rate_limiting->group->total_written += bytes;
+		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
+			_bev_group_suspend_writing(bev->rate_limiting->group);
+		} else if (bev->rate_limiting->group->write_suspended) {
+			_bev_group_unsuspend_writing(bev->rate_limiting->group);
+		}
+		UNLOCK_GROUP(bev->rate_limiting->group);
+	}
+
+	return r;
+}
+
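For orientation, a sketch (not part of the patch) of the pattern the bufferevent I/O paths are expected to follow with the helpers above: query the current per-operation ceiling, transfer at most that much, then charge the bytes actually moved against the buckets. `do_read()` is a hypothetical stand-in for a backend's real read routine, and the caller is assumed to hold the lock on `bevp`, as the helpers require.

/* Hypothetical backend read routine: reads at most 'at_most' bytes. */
static ev_ssize_t do_read(struct bufferevent *bev, ev_ssize_t at_most);

static void read_with_rate_limit(struct bufferevent_private *bevp)
{
	ev_ssize_t n, ceiling;

	ceiling = _bufferevent_get_read_max(bevp);
	if (ceiling <= 0)
		return;	/* bucket exhausted; reading stays suspended until refill */

	n = do_read(&bevp->bev, ceiling);
	if (n > 0)
		_bufferevent_decrement_read_buckets(bevp, n);
}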
+/** Stop reading on every bufferevent in <b>g</b> */
+static int
+_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
+{
+	/* Needs group lock */
+	struct bufferevent_private *bev;
+	g->read_suspended = 1;
+	g->pending_unsuspend_read = 0;
+
+	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
+	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
+	   the bufferevent locks.)  If we are unable to lock any individual
+	   bufferevent, it will find out later when it looks at its limit
+	   and sees that its group is suspended.
+	*/
+	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_suspend_read(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		}
+	}
+	return 0;
+}
+
+/** Stop writing on every bufferevent in <b>g</b> */
+static int
+_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
+{
+	/* Needs group lock */
+	struct bufferevent_private *bev;
+	g->write_suspended = 1;
+	g->pending_unsuspend_write = 0;
+	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_suspend_write(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		}
+	}
+	return 0;
+}
+
+/** Timer callback invoked on a single bufferevent with one or more exhausted
+    buckets when they are ready to refill. */
+static void
+_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+	unsigned tick;
+	struct timeval now;
+	struct bufferevent_private *bev = arg;
+	int again = 0;
+	BEV_LOCK(&bev->bev);
+	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
+		BEV_UNLOCK(&bev->bev);
+		return;
+	}
+
+	/* First, update the bucket */
+	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
+	tick = ev_token_bucket_get_tick(&now,
+	    bev->rate_limiting->cfg);
+	ev_token_bucket_update(&bev->rate_limiting->limit,
+	    bev->rate_limiting->cfg,
+	    tick);
+
+	/* Now unsuspend any read/write operations as appropriate. */
+	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
+		if (bev->rate_limiting->limit.read_limit > 0)
+			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
+		else
+			again = 1;
+	}
+	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
+		if (bev->rate_limiting->limit.write_limit > 0)
+			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
+		else
+			again = 1;
+	}
+	if (again) {
+		/* One or more of the buckets may need another refill if they
+		   started negative.
+
+		   XXXX if we need to be quiet for more ticks, we should
+		   maybe figure out what timeout we really want.
+		*/
+		/* XXXX Handle event_add failure somehow */
+		event_add(&bev->rate_limiting->refill_bucket_event,
+		    &bev->rate_limiting->cfg->tick_timeout);
+	}
+	BEV_UNLOCK(&bev->bev);
+}
+
+/** Helper: grab a random element from a bufferevent group. */
+static struct bufferevent_private *
+_bev_group_random_element(struct bufferevent_rate_limit_group *group)
+{
+	int which;
+	struct bufferevent_private *bev;
+
+	/* requires group lock */
+
+	if (!group->n_members)
+		return NULL;
+
+	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));
+
+	which = _evutil_weakrand() % group->n_members;
+
+	bev = TAILQ_FIRST(&group->members);
+	while (which--)
+		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);
+
+	return bev;
+}
+
+/** Iterate over the elements of a rate-limiting group 'g' with a random
+    starting point, assigning each to the variable 'bev', and executing the
+    block 'block'.
+
+    We do this in a half-baked effort to get fairness among group members.
+    XXX Round-robin or some kind of priority queue would be even more fair.
+ */
+#define FOREACH_RANDOM_ORDER(block)				\
+	do {							\
+		first = _bev_group_random_element(g);		\
+		for (bev = first; bev != NULL;			\
+		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+			block ;					\
+		}						\
+		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
+		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
+			block ;					\
+		}						\
+	} while (0)
+
+static void
+_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
+{
+	int again = 0;
+	struct bufferevent_private *bev, *first;
+
+	g->read_suspended = 0;
+	FOREACH_RANDOM_ORDER({
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_unsuspend_read(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		} else {
+			again = 1;
+		}
+	});
+	g->pending_unsuspend_read = again;
+}
+
+static void
+_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
+{
+	int again = 0;
+	struct bufferevent_private *bev, *first;
+	g->write_suspended = 0;
+
+	FOREACH_RANDOM_ORDER({
+		if (EVLOCK_TRY_LOCK(bev->lock)) {
+			bufferevent_unsuspend_write(&bev->bev,
+			    BEV_SUSPEND_BW_GROUP);
+			EVLOCK_UNLOCK(bev->lock, 0);
+		} else {
+			again = 1;
+		}
+	});
+	g->pending_unsuspend_write = again;
+}
+
+/** Callback invoked every tick to add more elements to the group bucket
+    and unsuspend group members as needed.
+ */
+static void
+_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
+{
+	struct bufferevent_rate_limit_group *g = arg;
+	unsigned tick;
+	struct timeval now;
+
+	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
+
+	LOCK_GROUP(g);
+
+	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
+	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
+
+	if (g->pending_unsuspend_read ||
+	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
+		_bev_group_unsuspend_reading(g);
+	}
+	if (g->pending_unsuspend_write ||
+	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
+		_bev_group_unsuspend_writing(g);
+	}
+
+	/* XXXX Rather than waiting to the next tick to unsuspend stuff
+	 * with pending_unsuspend_write/read, we should do it on the
+	 * next iteration of the mainloop.
+	 */
+
+	UNLOCK_GROUP(g);
+}
+
+int
+bufferevent_set_rate_limit(struct bufferevent *bev,
+    struct ev_token_bucket_cfg *cfg)
+{
+	struct bufferevent_private *bevp =
+	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+	int r = -1;
+	struct bufferevent_rate_limit *rlim;
+	struct timeval now;
+	ev_uint32_t tick;
+	int reinit = 0, suspended = 0;
+	/* XXX reference-count cfg */
+
+	BEV_LOCK(bev);
+
+	if (cfg == NULL) {
+		if (bevp->rate_limiting) {
+			rlim = bevp->rate_limiting;
+			rlim->cfg = NULL;
+			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+			if (event_initialized(&rlim->refill_bucket_event))
+				event_del(&rlim->refill_bucket_event);
+		}
+		r = 0;
+		goto done;
+	}
+
+	event_base_gettimeofday_cached(bev->ev_base, &now);
+	tick = ev_token_bucket_get_tick(&now, cfg);
+
+	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
+		/* no-op */
+		r = 0;
+		goto done;
+	}
+	if (bevp->rate_limiting == NULL) {
+		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
+		if (!rlim)
+			goto done;
+		bevp->rate_limiting = rlim;
+	} else {
+		rlim = bevp->rate_limiting;
+	}
+	reinit = rlim->cfg != NULL;
+
+	rlim->cfg = cfg;
+	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);
+
+	if (reinit) {
+		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
+		event_del(&rlim->refill_bucket_event);
+	}
+	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
+	    _bev_refill_callback, bevp);
+
+	if (rlim->limit.read_limit > 0) {
+		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+	} else {
+		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+		suspended = 1;
+	}
+	if (rlim->limit.write_limit > 0) {
+		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+	} else {
+		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+		suspended = 1;
+	}
+
+	if (suspended)
+		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);
+
+	r = 0;
+
+done:
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+struct bufferevent_rate_limit_group *
+bufferevent_rate_limit_group_new(struct event_base *base,
+    const struct ev_token_bucket_cfg *cfg)
+{
+	struct bufferevent_rate_limit_group *g;
+	struct timeval now;
+	ev_uint32_t tick;
+
+	event_base_gettimeofday_cached(base, &now);
+	tick = ev_token_bucket_get_tick(&now, cfg);
+
+	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
+	if (!g)
+		return NULL;
+	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
+	TAILQ_INIT(&g->members);
+
+	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
+
+	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
+	    _bev_group_refill_callback, g);
+	/*XXXX handle event_add failure */
+	event_add(&g->master_refill_event, &cfg->tick_timeout);
+
+	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+
+	bufferevent_rate_limit_group_set_min_share(g, 64);
+
+	return g;
+}
+
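A usage sketch for the group API (not part of the patch), combining the constructor above with the membership calls that follow: one shared bucket capping the aggregate throughput of many bufferevents at 64 KB per default one-second tick. Unlike the per-bufferevent path, the group copies the cfg into itself, so the cfg can be freed once the group exists. `base` and `bev` are assumed to exist already, and the numbers are arbitrary.

#include "event2/bufferevent.h"
#include "event2/util.h"

static struct bufferevent_rate_limit_group *
make_shared_limit(struct event_base *base, struct bufferevent *bev)
{
	struct ev_token_bucket_cfg *cfg;
	struct bufferevent_rate_limit_group *g;

	/* A NULL tick length selects the one-second default tick. */
	cfg = ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
	if (!cfg)
		return NULL;
	g = bufferevent_rate_limit_group_new(base, cfg);
	ev_token_bucket_cfg_free(cfg);	/* the group copied it */
	if (!g)
		return NULL;
	if (bufferevent_add_to_rate_limit_group(bev, g) < 0) {
		/* Freeing is only safe while the group is still empty. */
		bufferevent_rate_limit_group_free(g);
		return NULL;
	}
	return g;
}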
+int
+bufferevent_rate_limit_group_set_cfg(
+	struct bufferevent_rate_limit_group *g,
+	const struct ev_token_bucket_cfg *cfg)
+{
+	int same_tick;
+	if (!g || !cfg)
+		return -1;
+
+	LOCK_GROUP(g);
+	same_tick = evutil_timercmp(
+	    &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
+	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
+
+	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
+		g->rate_limit.read_limit = cfg->read_maximum;
+	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
+		g->rate_limit.write_limit = cfg->write_maximum;
+
+	if (!same_tick) {
+		/* This can cause a hiccup in the schedule */
+		event_add(&g->master_refill_event, &cfg->tick_timeout);
+	}
+
+	/* The new limits might force us to adjust min_share differently. */
+	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
+
+	UNLOCK_GROUP(g);
+	return 0;
+}
+
+int
+bufferevent_rate_limit_group_set_min_share(
+	struct bufferevent_rate_limit_group *g,
+	size_t share)
+{
+	if (share > EV_SSIZE_MAX)
+		return -1;
+
+	g->configured_min_share = share;
+
+	/* Can't set share to more than the one-tick rate.  IOW, at steady
+	 * state, at least one connection can go per tick. */
+	if (share > g->rate_limit_cfg.read_rate)
+		share = g->rate_limit_cfg.read_rate;
+	if (share > g->rate_limit_cfg.write_rate)
+		share = g->rate_limit_cfg.write_rate;
+
+	g->min_share = share;
+	return 0;
+}
+
+void
+bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
+{
+	LOCK_GROUP(g);
+	EVUTIL_ASSERT(0 == g->n_members);
+	event_del(&g->master_refill_event);
+	UNLOCK_GROUP(g);
+	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+	mm_free(g);
+}
+
+int
+bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
+    struct bufferevent_rate_limit_group *g)
+{
+	int wsuspend, rsuspend;
+	struct bufferevent_private *bevp =
+	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+	BEV_LOCK(bev);
+
+	if (!bevp->rate_limiting) {
+		struct bufferevent_rate_limit *rlim;
+		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
+		if (!rlim) {
+			BEV_UNLOCK(bev);
+			return -1;
+		}
+		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
+		    _bev_refill_callback, bevp);
+		bevp->rate_limiting = rlim;
+	}
+
+	if (bevp->rate_limiting->group == g) {
+		BEV_UNLOCK(bev);
+		return 0;
+	}
+	if (bevp->rate_limiting->group)
+		bufferevent_remove_from_rate_limit_group(bev);
+
+	LOCK_GROUP(g);
+	bevp->rate_limiting->group = g;
+	++g->n_members;
+	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);
+
+	rsuspend = g->read_suspended;
+	wsuspend = g->write_suspended;
+
+	UNLOCK_GROUP(g);
+
+	if (rsuspend)
+		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
+	if (wsuspend)
+		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);
+
+	BEV_UNLOCK(bev);
+	return 0;
+}
+
+int
+bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
+{
+	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
+}
+
+int
+bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
+    int unsuspend)
+{
+	struct bufferevent_private *bevp =
+	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
+	BEV_LOCK(bev);
+	if (bevp->rate_limiting && bevp->rate_limiting->group) {
+		struct bufferevent_rate_limit_group *g =
+		    bevp->rate_limiting->group;
+		LOCK_GROUP(g);
+		bevp->rate_limiting->group = NULL;
+		--g->n_members;
+		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
+		UNLOCK_GROUP(g);
+	}
+	if (unsuspend) {
+		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
+		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
+	}
+	BEV_UNLOCK(bev);
+	return 0;
+}
+
+/* ===
+ * API functions to expose rate limits.
+ *
+ * Don't use these from inside Libevent; they're meant to be for use by
+ * the program.
+ * === */
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_get_read_limit(struct bufferevent *bev)
+{
+	ev_ssize_t r;
+	struct bufferevent_private *bevp;
+	BEV_LOCK(bev);
+	bevp = BEV_UPCAST(bev);
+	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+		bufferevent_update_buckets(bevp);
+		r = bevp->rate_limiting->limit.read_limit;
+	} else {
+		r = EV_SSIZE_MAX;
+	}
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_get_write_limit(struct bufferevent *bev)
+{
+	ev_ssize_t r;
+	struct bufferevent_private *bevp;
+	BEV_LOCK(bev);
+	bevp = BEV_UPCAST(bev);
+	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
+		bufferevent_update_buckets(bevp);
+		r = bevp->rate_limiting->limit.write_limit;
+	} else {
+		r = EV_SSIZE_MAX;
+	}
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+ev_ssize_t
+bufferevent_get_max_to_read(struct bufferevent *bev)
+{
+	ev_ssize_t r;
+	BEV_LOCK(bev);
+	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+ev_ssize_t
+bufferevent_get_max_to_write(struct bufferevent *bev)
+{
+	ev_ssize_t r;
+	BEV_LOCK(bev);
+	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_read_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_rate_limit_group_get_read_limit(
+	struct bufferevent_rate_limit_group *grp)
+{
+	ev_ssize_t r;
+	LOCK_GROUP(grp);
+	r = grp->rate_limit.read_limit;
+	UNLOCK_GROUP(grp);
+	return r;
+}
+
+/* Mostly you don't want to use this function from inside libevent;
+ * _bufferevent_get_write_max() is more likely what you want. */
+ev_ssize_t
+bufferevent_rate_limit_group_get_write_limit(
+	struct bufferevent_rate_limit_group *grp)
+{
+	ev_ssize_t r;
+	LOCK_GROUP(grp);
+	r = grp->rate_limit.write_limit;
+	UNLOCK_GROUP(grp);
+	return r;
+}
+
+int
+bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+	int r = 0;
+	ev_ssize_t old_limit, new_limit;
+	struct bufferevent_private *bevp;
+	BEV_LOCK(bev);
+	bevp = BEV_UPCAST(bev);
+	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+	old_limit = bevp->rate_limiting->limit.read_limit;
+
+	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
+	if (old_limit > 0 && new_limit <= 0) {
+		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
+		if (event_add(&bevp->rate_limiting->refill_bucket_event,
+			&bevp->rate_limiting->cfg->tick_timeout) < 0)
+			r = -1;
+	} else if (old_limit <= 0 && new_limit > 0) {
+		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
+			event_del(&bevp->rate_limiting->refill_bucket_event);
+		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
+	}
+
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+int
+bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
+{
+	/* XXXX this is mostly copy-and-paste from
+	 * bufferevent_decrement_read_limit */
+	int r = 0;
+	ev_ssize_t old_limit, new_limit;
+	struct bufferevent_private *bevp;
+	BEV_LOCK(bev);
+	bevp = BEV_UPCAST(bev);
+	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
+	old_limit = bevp->rate_limiting->limit.write_limit;
+
+	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
+	if (old_limit > 0 && new_limit <= 0) {
+		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
+		if (event_add(&bevp->rate_limiting->refill_bucket_event,
+			&bevp->rate_limiting->cfg->tick_timeout) < 0)
+			r = -1;
+	} else if (old_limit <= 0 && new_limit > 0) {
+		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
+			event_del(&bevp->rate_limiting->refill_bucket_event);
+		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
+	}
+
+	BEV_UNLOCK(bev);
+	return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_read(
+	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+	int r = 0;
+	ev_ssize_t old_limit, new_limit;
+	LOCK_GROUP(grp);
+	old_limit = grp->rate_limit.read_limit;
+	new_limit = (grp->rate_limit.read_limit -= decr);
+
+	if (old_limit > 0 && new_limit <= 0) {
+		_bev_group_suspend_reading(grp);
+	} else if (old_limit <= 0 && new_limit > 0) {
+		_bev_group_unsuspend_reading(grp);
+	}
+
+	UNLOCK_GROUP(grp);
+	return r;
+}
+
+int
+bufferevent_rate_limit_group_decrement_write(
+	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
+{
+	int r = 0;
+	ev_ssize_t old_limit, new_limit;
+	LOCK_GROUP(grp);
+	old_limit = grp->rate_limit.write_limit;
+	new_limit = (grp->rate_limit.write_limit -= decr);
+
+	if (old_limit > 0 && new_limit <= 0) {
+		_bev_group_suspend_writing(grp);
+	} else if (old_limit <= 0 && new_limit > 0) {
+		_bev_group_unsuspend_writing(grp);
+	}
+
+	UNLOCK_GROUP(grp);
+	return r;
+}
+
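A short sketch (not part of the patch) of the manual accounting entry points above: an application that moves bytes outside libevent's notice can still charge them against a bufferevent's buckets. `n_sent` is a hypothetical byte count, and a rate limit must already have been installed with bufferevent_set_rate_limit(), since the functions assert that rate_limiting->cfg is set.

#include "event2/bufferevent.h"
#include "event2/util.h"

static void charge_out_of_band_write(struct bufferevent *bev, size_t n_sent)
{
	/* Deduct bytes sent elsewhere; may suspend the bufferevent's
	 * writing until its bucket refills. */
	bufferevent_decrement_write_limit(bev, (ev_ssize_t)n_sent);
}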
+void
+bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
+    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
+{
+	EVUTIL_ASSERT(grp != NULL);
+	if (total_read_out)
+		*total_read_out = grp->total_read;
+	if (total_written_out)
+		*total_written_out = grp->total_written;
+}
+
+void
+bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
+{
+	grp->total_read = grp->total_written = 0;
+}
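Finally, a usage sketch (not part of the patch) for the totals API above, as it might be called from a periodic statistics callback; `grp` is assumed to come from bufferevent_rate_limit_group_new().

#include <stdio.h>
#include "event2/bufferevent.h"
#include "event2/util.h"

static void log_group_totals(struct bufferevent_rate_limit_group *grp)
{
	ev_uint64_t total_read, total_written;

	bufferevent_rate_limit_group_get_totals(grp, &total_read, &total_written);
	printf("group: %llu bytes read, %llu bytes written since last reset\n",
	    (unsigned long long)total_read, (unsigned long long)total_written);
	bufferevent_rate_limit_group_reset_totals(grp);
}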