/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* on reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add the
		   appropriate amount of bandwidth to the bucket.
		*/
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We should
	 * investigate that more.
	 */

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}
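
/*
 * Worked example (for illustration only): the clamped refill in
 * ev_token_bucket_update() matches the naive form whenever no overflow
 * would occur.  Suppose cfg->read_rate = 1000 bytes per tick,
 * cfg->read_maximum = 8000, and the bucket currently holds
 * read_limit = 5000:
 *
 *   n_ticks = 2: (8000 - 5000) / 2 = 1500 >= 1000, so the else branch runs
 *                and read_limit becomes 5000 + 2*1000 = 7000.
 *   n_ticks = 4: (8000 - 5000) / 4 = 750 < 1000, so read_limit is clamped
 *                straight to 8000; the naive form would have computed 9000
 *                first, which for very large rates or tick gaps could
 *                overflow before the clamp.
 */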

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (!tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}
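
/*
 * Usage sketch (for illustration only; assumes a struct bufferevent *bev
 * created elsewhere, e.g. with bufferevent_socket_new()):
 *
 *	struct timeval tick = { 0, 100*1000 };	   (100 msec per tick)
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(10*1024, 20*1024,	(read rate, burst)
 *		10*1024, 20*1024,			(write rate, burst)
 *		&tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 *
 * Rates and bursts are in bytes per tick, so the example above allows
 * roughly 100 KiB per second in each direction with 20 KiB bursts.
 * Passing a NULL cfg to bufferevent_set_rate_limit() removes the limit
 * again.  Since the cfg is neither copied nor reference-counted (see the
 * XXX in bufferevent_set_rate_limit() below), it must outlive every
 * bufferevent that uses it.
 */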

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write, or
    the maximum amount we should read if is_read.  Return that maximum, or
    0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER;

#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)				\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)					\
	do {						\
		if (max_so_far > (x))			\
			max_so_far = (x);		\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}
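
/*
 * Calling-pattern sketch (for illustration only; the socket and OpenSSL
 * backends that actually consume these helpers live in other files): a
 * bufferevent implementation asks for the clamp before doing I/O, then
 * charges the bytes it actually transferred:
 *
 *	ev_ssize_t howmuch = _bufferevent_get_read_max(bev_p);
 *	if (howmuch > 0) {
 *		int n = evbuffer_read(bev_p->bev.input, fd, (int)howmuch);
 *		if (n > 0)
 *			_bufferevent_decrement_read_buckets(bev_p, n);
 *	}
 *
 * Here bev_p is a struct bufferevent_private * and fd is the descriptor
 * being serviced.  The decrement step (below) is what suspends the
 * bufferevent, or its whole group, once a bucket runs dry.
 */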

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in g */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in g */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill.
 */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block)				\
	do {							\
		first = _bev_group_random_element(g);		\
		for (bev = first; bev != NULL;			\
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ;					\
		}						\
	} while (0)

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/*XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
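
/*
 * Usage sketch (for illustration only; base, cfg, bev1 and bev2 are assumed
 * to exist already):
 *
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	if (grp) {
 *		bufferevent_add_to_rate_limit_group(bev1, grp);
 *		bufferevent_add_to_rate_limit_group(bev2, grp);
 *	}
 *
 * All members then share the single group bucket in addition to whatever
 * per-bufferevent limit each one has from bufferevent_set_rate_limit().
 * Unlike the per-bufferevent case, the cfg is copied into the group here,
 * so the caller's copy need not outlive the group.
 */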

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* Can't set share to less than the one-tick maximum.  IOW, at steady
	 * state, at least one connection can go per tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}
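
/*
 * Worked example (for illustration only): with a group read_rate of 4096
 * bytes per tick, 64 members, and the default min_share of 64, the share
 * computed in _bufferevent_get_rlim_max() is 4096/64 = 64 bytes, exactly
 * the floor.  With 256 members the raw share would be 16, so each member
 * is bumped up to min_share = 64; the group bucket simply goes negative
 * sooner and the whole group stays suspended until enough ticks refill it.
 * A larger min_share therefore trades strict fairness within a tick for
 * fewer tiny reads and writes.
 */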

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
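
/*
 * Teardown sketch (for illustration only): bufferevent_rate_limit_group_free()
 * asserts that the group is empty, so detach every member first:
 *
 *	bufferevent_remove_from_rate_limit_group(bev1);
 *	bufferevent_remove_from_rate_limit_group(bev2);
 *	bufferevent_rate_limit_group_free(grp);
 *
 * Removing a bufferevent also lifts any group-imposed suspension, so it
 * keeps running under whatever per-bufferevent limit remains.
 */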

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant to be for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want*/
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want*/
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
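
/*
 * Query sketch (for illustration only): from a callback, an application can
 * ask how much budget is left in the current tick, or how much the next
 * operation may transfer:
 *
 *	ev_ssize_t budget = bufferevent_get_read_limit(bev);
 *	ev_ssize_t next = bufferevent_get_max_to_read(bev);
 *
 * bufferevent_get_read_limit() reports only the per-bufferevent bucket
 * (EV_SSIZE_MAX when no limit is configured), while
 * bufferevent_get_max_to_read() also folds in the group share and the
 * MAX_TO_READ_EVER cap, so it can be smaller.
 */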

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want*/
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}
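
/*
 * Accounting sketch (for illustration only; assumes a per-bufferevent limit
 * and a group are already configured): an application that moves bytes
 * outside of Libevent, e.g. with sendfile() on the same connection, can
 * still charge them against the buckets and watch aggregate group traffic:
 *
 *	bufferevent_decrement_write_limit(bev, n_sent);
 *	bufferevent_rate_limit_group_decrement_write(grp, n_sent);
 *
 *	ev_uint64_t rd, wr;
 *	bufferevent_rate_limit_group_get_totals(grp, &rd, &wr);
 *
 * Note that the totals only count bytes accounted for by the rate-limiting
 * code itself (_bufferevent_decrement_*_buckets); the public decrement
 * functions above adjust the buckets but do not add to total_read or
 * total_written.
 */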