ipc/chromium/src/third_party/libevent/bufferevent_ratelim.c

author      Michael Schloh von Bennewitz <michael@schloh.com>
date        Wed, 31 Dec 2014 06:09:35 +0100
changeset   0:6474c204b198
permissions -rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
    if (reinitialize) {
        /* on reinitialization, we only clip downwards, since we've
           already used who-knows-how-much bandwidth this tick.  We
           leave "last_updated" as it is; the next update will add the
           appropriate amount of bandwidth to the bucket.
        */
        if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
            bucket->read_limit = cfg->read_maximum;
        if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
            bucket->write_limit = cfg->write_maximum;
    } else {
        bucket->read_limit = cfg->read_rate;
        bucket->write_limit = cfg->write_rate;
        bucket->last_updated = current_tick;
    }
    return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
    /* It's okay if the tick number overflows, since we'll just
     * wrap around when we do the unsigned subtraction. */
    unsigned n_ticks = current_tick - bucket->last_updated;

    /* Make sure some ticks actually happened, and that time didn't
     * roll back. */
    if (n_ticks == 0 || n_ticks > INT_MAX)
        return 0;

    /* Naively, we would say
        bucket->limit += n_ticks * cfg->rate;

        if (bucket->limit > cfg->maximum)
            bucket->limit = cfg->maximum;

       But we're worried about overflow, so we do it like this:
    */

    if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
        bucket->read_limit = cfg->read_maximum;
    else
        bucket->read_limit += n_ticks * cfg->read_rate;

    if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
        bucket->write_limit = cfg->write_maximum;
    else
        bucket->write_limit += n_ticks * cfg->write_rate;

    bucket->last_updated = current_tick;

    return 1;
}

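/* Worked example (illustrative numbers, not from upstream): suppose
 * cfg->read_rate = 100, cfg->read_maximum = 1000, bucket->read_limit = 950,
 * and n_ticks = 2.  The naive refill would give 950 + 2*100 = 1150 > 1000.
 * The division-based test computes (1000 - 950) / 2 = 25 < 100, so the
 * bucket is clipped to read_maximum = 1000 instead.  Because the test
 * divides rather than multiplies, it cannot overflow even for a very
 * large n_ticks.
 */
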
static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
    /* Must hold lock on bev. */
    struct timeval now;
    unsigned tick;
    event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
    if (tick != bev->rate_limiting->limit.last_updated)
        ev_token_bucket_update(&bev->rate_limiting->limit,
            bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
    /* This computation uses two multiplies and a divide.  We could do
     * fewer if we knew that the tick length was an integer number of
     * seconds, or if we knew it divided evenly into a second.  We should
     * investigate that more.
     */

    /* We cast to an ev_uint64_t first, since we don't want to overflow
     * before we do the final divide. */
    ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
    return (unsigned)(msec / cfg->msec_per_tick);
}

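/* Worked example (illustrative): with a 100 msec tick (msec_per_tick = 100)
 * and tv = { .tv_sec = 2, .tv_usec = 350000 }, we get
 * msec = 2*1000 + 350000/1000 = 2350, so this returns tick 2350/100 = 23.
 */
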
struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
    struct ev_token_bucket_cfg *r;
    struct timeval g;
    if (! tick_len) {
        g.tv_sec = 1;
        g.tv_usec = 0;
        tick_len = &g;
    }
    if (read_rate > read_burst || write_rate > write_burst ||
        read_rate < 1 || write_rate < 1)
        return NULL;
    if (read_rate > EV_RATE_LIMIT_MAX ||
        write_rate > EV_RATE_LIMIT_MAX ||
        read_burst > EV_RATE_LIMIT_MAX ||
        write_burst > EV_RATE_LIMIT_MAX)
        return NULL;
    r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
    if (!r)
        return NULL;
    r->read_rate = read_rate;
    r->write_rate = write_rate;
    r->read_maximum = read_burst;
    r->write_maximum = write_burst;
    memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
    r->msec_per_tick = (tick_len->tv_sec * 1000) +
        (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
    return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
    mm_free(cfg);
}

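/* Usage sketch (hypothetical; assumes an existing event_base "base" and a
 * connected socket "fd"): limit one bufferevent to 1 KiB/sec sustained with
 * 4 KiB bursts, using the default one-second tick:
 *
 *     struct ev_token_bucket_cfg *cfg =
 *         ev_token_bucket_cfg_new(1024, 4096, 1024, 4096, NULL);
 *     struct bufferevent *bev =
 *         bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
 *     if (cfg && bev)
 *         bufferevent_set_rate_limit(bev, cfg);
 *
 * The cfg is not copied (see the reference-counting XXX in
 * bufferevent_set_rate_limit below), so it must outlive every bufferevent
 * that uses it; call ev_token_bucket_cfg_free() only after those
 * bufferevents are freed or detached.
 */
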
/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
    /* needs lock on bev. */
    ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)                                          \
    (is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)                              \
    (is_write ? (g)->write_suspended : (g)->read_suspended)

    /* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)                                      \
    do {                                                \
        if (max_so_far > (x))                           \
            max_so_far = (x);                           \
    } while (0);

    if (!bev->rate_limiting)
        return max_so_far;

    /* If rate-limiting is enabled at all, update the appropriate
       bucket, and take the smaller of our rate limit and the group
       rate limit.
     */

    if (bev->rate_limiting->cfg) {
        bufferevent_update_buckets(bev);
        max_so_far = LIM(bev->rate_limiting->limit);
    }
    if (bev->rate_limiting->group) {
        struct bufferevent_rate_limit_group *g =
            bev->rate_limiting->group;
        ev_ssize_t share;
        LOCK_GROUP(g);
        if (GROUP_SUSPENDED(g)) {
            /* We can get here if we failed to lock this
             * particular bufferevent while suspending the whole
             * group. */
            if (is_write)
                bufferevent_suspend_write(&bev->bev,
                    BEV_SUSPEND_BW_GROUP);
            else
                bufferevent_suspend_read(&bev->bev,
                    BEV_SUSPEND_BW_GROUP);
            share = 0;
        } else {
            /* XXXX probably we should divide among the active
             * members, not the total members. */
            share = LIM(g->rate_limit) / g->n_members;
            if (share < g->min_share)
                share = g->min_share;
        }
        UNLOCK_GROUP(g);
        CLAMPTO(share);
    }

    if (max_so_far < 0)
        max_so_far = 0;
    return max_so_far;
}

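/* Worked example (illustrative): for a read with MAX_TO_READ_EVER = 16384,
 * a per-bufferevent read_limit of 6000, and a group read_limit of 12000
 * shared among 4 members (min_share = 64), the group share is
 * 12000/4 = 3000, so the result is MIN(16384, 6000, 3000) = 3000 bytes.
 */
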
ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
    return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
    return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
    /* XXXXX Make sure all users of this function check its return value */
    int r = 0;
    /* need to hold lock on bev */
    if (!bev->rate_limiting)
        return 0;

    if (bev->rate_limiting->cfg) {
        bev->rate_limiting->limit.read_limit -= bytes;
        if (bev->rate_limiting->limit.read_limit <= 0) {
            bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
            if (event_add(&bev->rate_limiting->refill_bucket_event,
                &bev->rate_limiting->cfg->tick_timeout) < 0)
                r = -1;
        } else if (bev->read_suspended & BEV_SUSPEND_BW) {
            if (!(bev->write_suspended & BEV_SUSPEND_BW))
                event_del(&bev->rate_limiting->refill_bucket_event);
            bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
        }
    }

    if (bev->rate_limiting->group) {
        LOCK_GROUP(bev->rate_limiting->group);
        bev->rate_limiting->group->rate_limit.read_limit -= bytes;
        bev->rate_limiting->group->total_read += bytes;
        if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
            _bev_group_suspend_reading(bev->rate_limiting->group);
        } else if (bev->rate_limiting->group->read_suspended) {
            _bev_group_unsuspend_reading(bev->rate_limiting->group);
        }
        UNLOCK_GROUP(bev->rate_limiting->group);
    }

    return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
    /* XXXXX Make sure all users of this function check its return value */
    int r = 0;
    /* need to hold lock */
    if (!bev->rate_limiting)
        return 0;

    if (bev->rate_limiting->cfg) {
        bev->rate_limiting->limit.write_limit -= bytes;
        if (bev->rate_limiting->limit.write_limit <= 0) {
            bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
            if (event_add(&bev->rate_limiting->refill_bucket_event,
                &bev->rate_limiting->cfg->tick_timeout) < 0)
                r = -1;
        } else if (bev->write_suspended & BEV_SUSPEND_BW) {
            if (!(bev->read_suspended & BEV_SUSPEND_BW))
                event_del(&bev->rate_limiting->refill_bucket_event);
            bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
        }
    }

    if (bev->rate_limiting->group) {
        LOCK_GROUP(bev->rate_limiting->group);
        bev->rate_limiting->group->rate_limit.write_limit -= bytes;
        bev->rate_limiting->group->total_written += bytes;
        if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
            _bev_group_suspend_writing(bev->rate_limiting->group);
        } else if (bev->rate_limiting->group->write_suspended) {
            _bev_group_unsuspend_writing(bev->rate_limiting->group);
        }
        UNLOCK_GROUP(bev->rate_limiting->group);
    }

    return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
    /* Needs group lock */
    struct bufferevent_private *bev;
    g->read_suspended = 1;
    g->pending_unsuspend_read = 0;

    /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
       to prevent a deadlock.  (Ordinarily, the group lock nests inside
       the bufferevent locks.  If we are unable to lock any individual
       bufferevent, it will find out later when it looks at its limit
       and sees that its group is suspended.)
    */
    TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
        if (EVLOCK_TRY_LOCK(bev->lock)) {
            bufferevent_suspend_read(&bev->bev,
                BEV_SUSPEND_BW_GROUP);
            EVLOCK_UNLOCK(bev->lock, 0);
        }
    }
    return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
    /* Needs group lock */
    struct bufferevent_private *bev;
    g->write_suspended = 1;
    g->pending_unsuspend_write = 0;
    TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
        if (EVLOCK_TRY_LOCK(bev->lock)) {
            bufferevent_suspend_write(&bev->bev,
                BEV_SUSPEND_BW_GROUP);
            EVLOCK_UNLOCK(bev->lock, 0);
        }
    }
    return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
    unsigned tick;
    struct timeval now;
    struct bufferevent_private *bev = arg;
    int again = 0;
    BEV_LOCK(&bev->bev);
    if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
        BEV_UNLOCK(&bev->bev);
        return;
    }

    /* First, update the bucket */
    event_base_gettimeofday_cached(bev->bev.ev_base, &now);
    tick = ev_token_bucket_get_tick(&now,
        bev->rate_limiting->cfg);
    ev_token_bucket_update(&bev->rate_limiting->limit,
        bev->rate_limiting->cfg,
        tick);

    /* Now unsuspend any read/write operations as appropriate. */
    if ((bev->read_suspended & BEV_SUSPEND_BW)) {
        if (bev->rate_limiting->limit.read_limit > 0)
            bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
        else
            again = 1;
    }
    if ((bev->write_suspended & BEV_SUSPEND_BW)) {
        if (bev->rate_limiting->limit.write_limit > 0)
            bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
        else
            again = 1;
    }
    if (again) {
        /* One or more of the buckets may need another refill if they
           started negative.

           XXXX if we need to be quiet for more ticks, we should
           maybe figure out what timeout we really want.
        */
        /* XXXX Handle event_add failure somehow */
        event_add(&bev->rate_limiting->refill_bucket_event,
            &bev->rate_limiting->cfg->tick_timeout);
    }
    BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
    int which;
    struct bufferevent_private *bev;

    /* requires group lock */

    if (!group->n_members)
        return NULL;

    EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

    which = _evutil_weakrand() % group->n_members;

    bev = TAILQ_FIRST(&group->members);
    while (which--)
        bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

    return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)                                     \
    do {                                                                \
        first = _bev_group_random_element(g);                           \
        for (bev = first; bev != NULL;                                  \
            bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) {      \
            block ;                                                     \
        }                                                               \
        for (bev = TAILQ_FIRST(&g->members); bev && bev != first;      \
            bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) {      \
            block ;                                                     \
        }                                                               \
    } while (0)

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
    int again = 0;
    struct bufferevent_private *bev, *first;

    g->read_suspended = 0;
    FOREACH_RANDOM_ORDER({
        if (EVLOCK_TRY_LOCK(bev->lock)) {
            bufferevent_unsuspend_read(&bev->bev,
                BEV_SUSPEND_BW_GROUP);
            EVLOCK_UNLOCK(bev->lock, 0);
        } else {
            again = 1;
        }
    });
    g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
    int again = 0;
    struct bufferevent_private *bev, *first;
    g->write_suspended = 0;

    FOREACH_RANDOM_ORDER({
        if (EVLOCK_TRY_LOCK(bev->lock)) {
            bufferevent_unsuspend_write(&bev->bev,
                BEV_SUSPEND_BW_GROUP);
            EVLOCK_UNLOCK(bev->lock, 0);
        } else {
            again = 1;
        }
    });
    g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
    struct bufferevent_rate_limit_group *g = arg;
    unsigned tick;
    struct timeval now;

    event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

    LOCK_GROUP(g);

    tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
    ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

    if (g->pending_unsuspend_read ||
        (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
        _bev_group_unsuspend_reading(g);
    }
    if (g->pending_unsuspend_write ||
        (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
        _bev_group_unsuspend_writing(g);
    }

    /* XXXX Rather than waiting to the next tick to unsuspend stuff
     * with pending_unsuspend_write/read, we should do it on the
     * next iteration of the mainloop.
     */

    UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
    struct bufferevent_private *bevp =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    int r = -1;
    struct bufferevent_rate_limit *rlim;
    struct timeval now;
    ev_uint32_t tick;
    int reinit = 0, suspended = 0;
    /* XXX reference-count cfg */

    BEV_LOCK(bev);

    if (cfg == NULL) {
        if (bevp->rate_limiting) {
            rlim = bevp->rate_limiting;
            rlim->cfg = NULL;
            bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
            bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
            if (event_initialized(&rlim->refill_bucket_event))
                event_del(&rlim->refill_bucket_event);
        }
        r = 0;
        goto done;
    }

    event_base_gettimeofday_cached(bev->ev_base, &now);
    tick = ev_token_bucket_get_tick(&now, cfg);

    if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
        /* no-op */
        r = 0;
        goto done;
    }
    if (bevp->rate_limiting == NULL) {
        rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
        if (!rlim)
            goto done;
        bevp->rate_limiting = rlim;
    } else {
        rlim = bevp->rate_limiting;
    }
    reinit = rlim->cfg != NULL;

    rlim->cfg = cfg;
    ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

    if (reinit) {
        EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
        event_del(&rlim->refill_bucket_event);
    }
    evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
        _bev_refill_callback, bevp);

    if (rlim->limit.read_limit > 0) {
        bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
    } else {
        bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
        suspended = 1;
    }
    if (rlim->limit.write_limit > 0) {
        bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
    } else {
        bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
        suspended = 1;
    }

    if (suspended)
        event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

    r = 0;

done:
    BEV_UNLOCK(bev);
    return r;
}

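/* Usage note (illustrative): as the cfg == NULL branch above shows, an
 * existing limit can be dropped at runtime, which also clears any
 * BEV_SUSPEND_BW suspension that the limiter imposed:
 *
 *     bufferevent_set_rate_limit(bev, NULL);
 */
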
struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
    struct bufferevent_rate_limit_group *g;
    struct timeval now;
    ev_uint32_t tick;

    event_base_gettimeofday_cached(base, &now);
    tick = ev_token_bucket_get_tick(&now, cfg);

    g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
    if (!g)
        return NULL;
    memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
    TAILQ_INIT(&g->members);

    ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

    event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
        _bev_group_refill_callback, g);
    /*XXXX handle event_add failure */
    event_add(&g->master_refill_event, &cfg->tick_timeout);

    EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

    bufferevent_rate_limit_group_set_min_share(g, 64);

    return g;
}

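/* Usage sketch (hypothetical; assumes an existing event_base "base" and
 * bufferevents "bev1" and "bev2"): cap the combined throughput of several
 * connections at 64 KiB/sec in each direction:
 *
 *     struct ev_token_bucket_cfg *gcfg =
 *         ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *     struct bufferevent_rate_limit_group *grp =
 *         bufferevent_rate_limit_group_new(base, gcfg);
 *     bufferevent_add_to_rate_limit_group(bev1, grp);
 *     bufferevent_add_to_rate_limit_group(bev2, grp);
 *
 * Unlike bufferevent_set_rate_limit(), the cfg is copied into the group
 * (see the memcpy above), so gcfg may be freed once the group exists.
 */
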
int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
    int same_tick;
    if (!g || !cfg)
        return -1;

    LOCK_GROUP(g);
    same_tick = evutil_timercmp(
        &g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
    memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

    if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
        g->rate_limit.read_limit = cfg->read_maximum;
    if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
        g->rate_limit.write_limit = cfg->write_maximum;

    if (!same_tick) {
        /* This can cause a hiccup in the schedule */
        event_add(&g->master_refill_event, &cfg->tick_timeout);
    }

    /* The new limits might force us to adjust min_share differently. */
    bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

    UNLOCK_GROUP(g);
    return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
    struct bufferevent_rate_limit_group *g,
    size_t share)
{
    if (share > EV_SSIZE_MAX)
        return -1;

    g->configured_min_share = share;

    /* Can't set share to less than the one-tick maximum.  IOW, at steady
     * state, at least one connection can go per tick. */
    if (share > g->rate_limit_cfg.read_rate)
        share = g->rate_limit_cfg.read_rate;
    if (share > g->rate_limit_cfg.write_rate)
        share = g->rate_limit_cfg.write_rate;

    g->min_share = share;
    return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
    LOCK_GROUP(g);
    EVUTIL_ASSERT(0 == g->n_members);
    event_del(&g->master_refill_event);
    UNLOCK_GROUP(g);
    EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
    mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
    int wsuspend, rsuspend;
    struct bufferevent_private *bevp =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    BEV_LOCK(bev);

    if (!bevp->rate_limiting) {
        struct bufferevent_rate_limit *rlim;
        rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
        if (!rlim) {
            BEV_UNLOCK(bev);
            return -1;
        }
        evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
            _bev_refill_callback, bevp);
        bevp->rate_limiting = rlim;
    }

    if (bevp->rate_limiting->group == g) {
        BEV_UNLOCK(bev);
        return 0;
    }
    if (bevp->rate_limiting->group)
        bufferevent_remove_from_rate_limit_group(bev);

    LOCK_GROUP(g);
    bevp->rate_limiting->group = g;
    ++g->n_members;
    TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

    rsuspend = g->read_suspended;
    wsuspend = g->write_suspended;

    UNLOCK_GROUP(g);

    if (rsuspend)
        bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
    if (wsuspend)
        bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

    BEV_UNLOCK(bev);
    return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
    return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
    struct bufferevent_private *bevp =
        EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
    BEV_LOCK(bev);
    if (bevp->rate_limiting && bevp->rate_limiting->group) {
        struct bufferevent_rate_limit_group *g =
            bevp->rate_limiting->group;
        LOCK_GROUP(g);
        bevp->rate_limiting->group = NULL;
        --g->n_members;
        TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
        UNLOCK_GROUP(g);
    }
    if (unsuspend) {
        bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
        bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
    }
    BEV_UNLOCK(bev);
    return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
    ev_ssize_t r;
    struct bufferevent_private *bevp;
    BEV_LOCK(bev);
    bevp = BEV_UPCAST(bev);
    if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
        bufferevent_update_buckets(bevp);
        r = bevp->rate_limiting->limit.read_limit;
    } else {
        r = EV_SSIZE_MAX;
    }
    BEV_UNLOCK(bev);
    return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
    ev_ssize_t r;
    struct bufferevent_private *bevp;
    BEV_LOCK(bev);
    bevp = BEV_UPCAST(bev);
    if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
        bufferevent_update_buckets(bevp);
        r = bevp->rate_limiting->limit.write_limit;
    } else {
        r = EV_SSIZE_MAX;
    }
    BEV_UNLOCK(bev);
    return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
    ev_ssize_t r;
    BEV_LOCK(bev);
    r = _bufferevent_get_read_max(BEV_UPCAST(bev));
    BEV_UNLOCK(bev);
    return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
    ev_ssize_t r;
    BEV_LOCK(bev);
    r = _bufferevent_get_write_max(BEV_UPCAST(bev));
    BEV_UNLOCK(bev);
    return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
    ev_ssize_t r;
    LOCK_GROUP(grp);
    r = grp->rate_limit.read_limit;
    UNLOCK_GROUP(grp);
    return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
    ev_ssize_t r;
    LOCK_GROUP(grp);
    r = grp->rate_limit.write_limit;
    UNLOCK_GROUP(grp);
    return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
    int r = 0;
    ev_ssize_t old_limit, new_limit;
    struct bufferevent_private *bevp;
    BEV_LOCK(bev);
    bevp = BEV_UPCAST(bev);
    EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    old_limit = bevp->rate_limiting->limit.read_limit;

    new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
    if (old_limit > 0 && new_limit <= 0) {
        bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
        if (event_add(&bevp->rate_limiting->refill_bucket_event,
            &bevp->rate_limiting->cfg->tick_timeout) < 0)
            r = -1;
    } else if (old_limit <= 0 && new_limit > 0) {
        if (!(bevp->write_suspended & BEV_SUSPEND_BW))
            event_del(&bevp->rate_limiting->refill_bucket_event);
        bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
    }

    BEV_UNLOCK(bev);
    return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
    /* XXXX this is mostly copy-and-paste from
     * bufferevent_decrement_read_limit */
    int r = 0;
    ev_ssize_t old_limit, new_limit;
    struct bufferevent_private *bevp;
    BEV_LOCK(bev);
    bevp = BEV_UPCAST(bev);
    EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
    old_limit = bevp->rate_limiting->limit.write_limit;

    new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
    if (old_limit > 0 && new_limit <= 0) {
        bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
        if (event_add(&bevp->rate_limiting->refill_bucket_event,
            &bevp->rate_limiting->cfg->tick_timeout) < 0)
            r = -1;
    } else if (old_limit <= 0 && new_limit > 0) {
        if (!(bevp->read_suspended & BEV_SUSPEND_BW))
            event_del(&bevp->rate_limiting->refill_bucket_event);
        bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
    }

    BEV_UNLOCK(bev);
    return r;
}

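/* Usage sketch (hypothetical): bytes transferred behind libevent's back,
 * for example by calling write() on the underlying fd directly, can be
 * charged against the bucket so the limiter stays accurate.  Here
 * "do_out_of_band_write" is an assumed helper, not a libevent API:
 *
 *     ev_ssize_t n = do_out_of_band_write(fd, buf, len);
 *     if (n > 0)
 *         bufferevent_decrement_write_limit(bev, n);
 */
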
int
bufferevent_rate_limit_group_decrement_read(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
    int r = 0;
    ev_ssize_t old_limit, new_limit;
    LOCK_GROUP(grp);
    old_limit = grp->rate_limit.read_limit;
    new_limit = (grp->rate_limit.read_limit -= decr);

    if (old_limit > 0 && new_limit <= 0) {
        _bev_group_suspend_reading(grp);
    } else if (old_limit <= 0 && new_limit > 0) {
        _bev_group_unsuspend_reading(grp);
    }

    UNLOCK_GROUP(grp);
    return r;
}

int
bufferevent_rate_limit_group_decrement_write(
    struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
    int r = 0;
    ev_ssize_t old_limit, new_limit;
    LOCK_GROUP(grp);
    old_limit = grp->rate_limit.write_limit;
    new_limit = (grp->rate_limit.write_limit -= decr);

    if (old_limit > 0 && new_limit <= 0) {
        _bev_group_suspend_writing(grp);
    } else if (old_limit <= 0 && new_limit > 0) {
        _bev_group_unsuspend_writing(grp);
    }

    UNLOCK_GROUP(grp);
    return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
    EVUTIL_ASSERT(grp != NULL);
    if (total_read_out)
        *total_read_out = grp->total_read;
    if (total_written_out)
        *total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
    grp->total_read = grp->total_written = 0;
}
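
/* Usage sketch (hypothetical): sample and reset group throughput once per
 * reporting interval:
 *
 *     ev_uint64_t nread, nwritten;
 *     bufferevent_rate_limit_group_get_totals(grp, &nread, &nwritten);
 *     printf("read %llu, wrote %llu bytes\n",
 *         (unsigned long long)nread, (unsigned long long)nwritten);
 *     bufferevent_rate_limit_group_reset_totals(grp);
 */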
