ipc/chromium/src/third_party/libevent/bufferevent_ratelim.c

author:      Michael Schloh von Bennewitz <michael@schloh.com>
date:        Wed, 31 Dec 2014 06:09:35 +0100
changeset:   0:6474c204b198
permissions: -rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}
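
/* A worked example of the overflow guard above (made-up numbers, not part
 * of the upstream source): suppose read_maximum = 4096, read_limit = 1000,
 * and read_rate = 1024.
 *
 *   n_ticks = 3: (4096 - 1000) / 3 = 1032 >= 1024, so we take the else
 *                branch: read_limit becomes 1000 + 3 * 1024 = 4072 <= 4096.
 *   n_ticks = 4: (4096 - 1000) / 4 = 774 < 1024, so n_ticks * read_rate
 *                would overshoot the maximum; we clamp read_limit to 4096.
 *
 * Rearranging "limit + n * rate > maximum" into "(maximum - limit) / n <
 * rate" keeps every intermediate value small, so the test itself cannot
 * overflow no matter how large n_ticks gets.
 */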

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
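
/* For instance (made-up numbers, not part of the upstream source): with a
 * 100 ms tick (msec_per_tick = 100) and tv = { 2, 500000 }, we get
 * msec = 2 * 1000 + 500000 / 1000 = 2500, so the tick number is
 * 2500 / 100 = 25.  Two timevals land on the same tick exactly when they
 * fall within the same 100 ms window.
 */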

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
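
/* Illustrative usage (a sketch; the rates chosen here are arbitrary): a
 * configuration allowing roughly 10 KB/s sustained in each direction, with
 * 40 KB bursts, using a 100 ms tick so the bucket refills ten times per
 * second:
 *
 *	struct timeval tick = { 0, 100 * 1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 40960,   // read: 1 KB/tick, 40 KB burst
 *	        1024, 40960,                       // write: same
 *	        &tick);
 *	// cfg is NULL if a rate exceeds its burst, a rate is < 1, or
 *	// allocation failed.
 */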

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read otherwise.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)			\
	do {						\
		first = _bev_group_random_element(g);	\
		for (bev = first; bev != NULL; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					 \
		}						 \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;						\
		}							\
	} while (0)
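
/* For example (an illustration, not part of the upstream source): with
 * members A, B, C, D in list order and a random starting element C, the
 * first loop above visits C and D (from 'first' to the tail), and the
 * second visits A and B (from the head up to, but excluding, 'first'), so
 * each member is visited exactly once per expansion.
 */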

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
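
/* Illustrative usage (a sketch; 'bev' and 'tick' are assumed to exist, as
 * in the ev_token_bucket_cfg_new() example above): attach a per-connection
 * limit, then detach it by passing NULL.  Note the XXX above: cfg is not
 * reference-counted, so it must stay alive while attached and should only
 * be freed after detaching.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(1024, 40960, 1024, 40960, &tick);
 *	bufferevent_set_rate_limit(bev, cfg);	// 0 on success, -1 on error
 *	...
 *	bufferevent_set_rate_limit(bev, NULL);	// drop the limit
 *	ev_token_bucket_cfg_free(cfg);
 */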

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
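
/* Illustrative usage (a sketch; 'base' and the bufferevents are assumed to
 * exist): share one 32 KB/s budget across many connections.  Unlike the
 * per-bufferevent case, the group copies the config into rate_limit_cfg,
 * so the caller's copy may be freed right away.
 *
 *	struct timeval sec = { 1, 0 };
 *	struct ev_token_bucket_cfg *gcfg =
 *	    ev_token_bucket_cfg_new(32768, 32768, 32768, 32768, &sec);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, gcfg);
 *	ev_token_bucket_cfg_free(gcfg);
 *	bufferevent_add_to_rate_limit_group(bev1, grp);
 *	bufferevent_add_to_rate_limit_group(bev2, grp);
 */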

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set the effective share to more than the one-tick refill.
	 * IOW, at steady state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
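
/* For instance (made-up numbers, not part of the upstream source): with
 * read_rate = 1024 and write_rate = 2048 per tick, requesting a min share
 * of 4096 records configured_min_share = 4096 but clamps the effective
 * min_share to 1024, so each member's guaranteed floor never exceeds one
 * tick's refill.
 */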

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}
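
/* Illustrative usage (a sketch; 'grp' is assumed to exist): poll the
 * aggregate counters, then reset them for the next accounting interval.
 *
 *	ev_uint64_t nread, nwritten;
 *	bufferevent_rate_limit_group_get_totals(grp, &nread, &nwritten);
 *	printf("group moved %llu bytes in, %llu bytes out\n",
 *	    (unsigned long long)nread, (unsigned long long)nwritten);
 *	bufferevent_rate_limit_group_reset_totals(grp);
 */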
