/*
 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>
#include <limits.h>
#include <string.h>
#include <stdlib.h>

#include "event2/event.h"
#include "event2/event_struct.h"
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/buffer.h"

#include "ratelim-internal.h"

#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
#include "event-internal.h"

int
ev_token_bucket_init(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick,
    int reinitialize)
{
	if (reinitialize) {
		/* On reinitialization, we only clip downwards, since we've
		   already used who-knows-how-much bandwidth this tick.  We
		   leave "last_updated" as it is; the next update will add
		   the appropriate amount of bandwidth to the bucket.
		 */
		if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
			bucket->read_limit = cfg->read_maximum;
		if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
			bucket->write_limit = cfg->write_maximum;
	} else {
		bucket->read_limit = cfg->read_rate;
		bucket->write_limit = cfg->write_rate;
		bucket->last_updated = current_tick;
	}
	return 0;
}

int
ev_token_bucket_update(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned subtraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/
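
	/* (Each test below is just "limit + n_ticks * rate > maximum"
	 * rearranged so that the multiplication happens only in the branch
	 * where its result is known to fit.) */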

	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;

	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;

	bucket->last_updated = current_tick;

	return 1;
}

static inline void
bufferevent_update_buckets(struct bufferevent_private *bev)
{
	/* Must hold lock on bev. */
	struct timeval now;
	unsigned tick;
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg);
	if (tick != bev->rate_limiting->limit.last_updated)
		ev_token_bucket_update(&bev->rate_limiting->limit,
		    bev->rate_limiting->cfg, tick);
}

ev_uint32_t
ev_token_bucket_get_tick(const struct timeval *tv,
    const struct ev_token_bucket_cfg *cfg)
{
	/* This computation uses two multiplies and a divide.  We could do
	 * fewer if we knew that the tick length was an integer number of
	 * seconds, or if we knew it divided evenly into a second.  We
	 * should investigate that more.
	 */
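
	/* Example: with msec_per_tick == 500, a cached time of 3.25 sec is
	 * 3250 msec, which falls in tick 3250 / 500 == 6. */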

	/* We cast to an ev_uint64_t first, since we don't want to overflow
	 * before we do the final divide. */
	ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
	return (unsigned)(msec / cfg->msec_per_tick);
}

struct ev_token_bucket_cfg *
ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
    size_t write_rate, size_t write_burst,
    const struct timeval *tick_len)
{
	struct ev_token_bucket_cfg *r;
	struct timeval g;
	if (! tick_len) {
		g.tv_sec = 1;
		g.tv_usec = 0;
		tick_len = &g;
	}
	if (read_rate > read_burst || write_rate > write_burst ||
	    read_rate < 1 || write_rate < 1)
		return NULL;
	if (read_rate > EV_RATE_LIMIT_MAX ||
	    write_rate > EV_RATE_LIMIT_MAX ||
	    read_burst > EV_RATE_LIMIT_MAX ||
	    write_burst > EV_RATE_LIMIT_MAX)
		return NULL;
	r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
	if (!r)
		return NULL;
	r->read_rate = read_rate;
	r->write_rate = write_rate;
	r->read_maximum = read_burst;
	r->write_maximum = write_burst;
	memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
	r->msec_per_tick = (tick_len->tv_sec * 1000) +
	    (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
	return r;
}
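
/*
 * Illustrative usage (hypothetical variables): cap a bufferevent at about
 * 4096 bytes/sec in each direction by refilling 2048 bytes every 500-msec
 * tick, with a 4096-byte burst.
 *
 *	struct timeval tick = { 0, 500*1000 };
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(2048, 4096, 2048, 4096, &tick);
 *	if (cfg)
 *		bufferevent_set_rate_limit(bev, cfg);
 */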

void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	mm_free(cfg);
}

/* No matter how big our bucket gets, don't try to read more than this
 * much in a single read operation. */
#define MAX_TO_READ_EVER 16384
/* No matter how big our bucket gets, don't try to write more than this
 * much in a single write operation. */
#define MAX_TO_WRITE_EVER 16384
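/* (Whatever burst the bucket allows, these caps bound the amount of work
 * any single read or write operation will attempt.) */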

#define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
#define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)

static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);

/** Helper: figure out the maximum amount we should write if is_write is
    set, or the maximum amount we should read otherwise.  Return that
    maximum, or 0 if our bucket is wholly exhausted.
 */
static inline ev_ssize_t
_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	ev_ssize_t max_so_far = is_write ? MAX_TO_WRITE_EVER : MAX_TO_READ_EVER;

#define LIM(x) \
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g) \
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x) \
	do { \
		if (max_so_far > (x)) \
			max_so_far = (x); \
	} while (0)

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}

ev_ssize_t
_bufferevent_get_read_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 0);
}

ev_ssize_t
_bufferevent_get_write_max(struct bufferevent_private *bev)
{
	return _bufferevent_get_rlim_max(bev, 1);
}

int
_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			_bev_group_suspend_reading(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			_bev_group_unsuspend_reading(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

int
_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			_bev_group_suspend_writing(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			_bev_group_unsuspend_writing(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}

/** Stop reading on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.)  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.
	*/
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Stop writing on every bufferevent in <b>g</b> */
static int
_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_suspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}

/** Timer callback invoked on a single bufferevent with one or more exhausted
    buckets when they are ready to refill. */
static void
_bev_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}

/** Helper: grab a random element from a bufferevent group. */
static struct bufferevent_private *
_bev_group_random_element(struct bufferevent_rate_limit_group *group)
{
	int which;
	struct bufferevent_private *bev;

	/* requires group lock */

	if (!group->n_members)
		return NULL;

	EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members));

	which = _evutil_weakrand() % group->n_members;

	bev = TAILQ_FIRST(&group->members);
	while (which--)
		bev = TAILQ_NEXT(bev, rate_limiting->next_in_group);

	return bev;
}

/** Iterate over the elements of a rate-limiting group 'g' with a random
    starting point, assigning each to the variable 'bev', and executing the
    block 'block'.

    We do this in a half-baked effort to get fairness among group members.
    XXX Round-robin or some kind of priority queue would be even more fair.
 */
#define FOREACH_RANDOM_ORDER(block) \
	do { \
		first = _bev_group_random_element(g); \
		for (bev = first; bev != NULL; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
		for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \
		    bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \
			block ; \
		} \
	} while (0)
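
/* (The first loop covers members from the random starting point to the end
 * of the list; the second wraps around from the list head back to the
 * starting point, so each member is visited exactly once.) */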

static void
_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_read(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}

static void
_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g)
{
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK(bev->lock)) {
			bufferevent_unsuspend_write(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}

/** Callback invoked every tick to add more elements to the group bucket
    and unsuspend group members as needed.
 */
static void
_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg)
{
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
	ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);

	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		_bev_group_unsuspend_reading(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))) {
		_bev_group_unsuspend_writing(g);
	}

	/* XXXX Rather than waiting until the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}

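/** Set <b>bev</b>'s individual rate limit to <b>cfg</b>.  Passing a NULL
    <b>cfg</b> removes any per-bufferevent limit and unsuspends reading and
    writing that had been suspended for bandwidth. */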
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
	    _bev_refill_callback, bevp);

	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}

struct bufferevent_rate_limit_group *
bufferevent_rate_limit_group_new(struct event_base *base,
    const struct ev_token_bucket_cfg *cfg)
{
	struct bufferevent_rate_limit_group *g;
	struct timeval now;
	ev_uint32_t tick;

	event_base_gettimeofday_cached(base, &now);
	tick = ev_token_bucket_get_tick(&now, cfg);

	g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
	if (!g)
		return NULL;
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
	TAILQ_INIT(&g->members);

	ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);

	event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
	    _bev_group_refill_callback, g);
	/* XXXX handle event_add failure */
	event_add(&g->master_refill_event, &cfg->tick_timeout);

	EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);

	bufferevent_rate_limit_group_set_min_share(g, 64);

	return g;
}
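
/*
 * Illustrative usage (hypothetical variables): share about 64 KiB/sec of
 * bandwidth in each direction among several bufferevents, using the
 * default one-second tick.
 *
 *	struct ev_token_bucket_cfg *cfg =
 *	    ev_token_bucket_cfg_new(65536, 65536, 65536, 65536, NULL);
 *	struct bufferevent_rate_limit_group *grp =
 *	    bufferevent_rate_limit_group_new(base, cfg);
 *	bufferevent_add_to_rate_limit_group(bev, grp);
 */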

int
bufferevent_rate_limit_group_set_cfg(
	struct bufferevent_rate_limit_group *g,
	const struct ev_token_bucket_cfg *cfg)
{
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}

int
bufferevent_rate_limit_group_set_min_share(
	struct bufferevent_rate_limit_group *g,
	size_t share)
{
	if (share > EV_SSIZE_MAX)
		return -1;

	g->configured_min_share = share;

	/* The effective share can't exceed the one-tick refill amount; that
	 * way, at steady state, at least one connection can make progress
	 * every tick. */
	if (share > g->rate_limit_cfg.read_rate)
		share = g->rate_limit_cfg.read_rate;
	if (share > g->rate_limit_cfg.write_rate)
		share = g->rate_limit_cfg.write_rate;

	g->min_share = share;
	return 0;
}

void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}

int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		evtimer_assign(&rlim->refill_bucket_event, bev->ev_base,
		    _bev_refill_callback, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group);

	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}

int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	return bufferevent_remove_from_rate_limit_group_internal(bev, 1);
}

int
bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev,
    int unsuspend)
{
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}

/* ===
 * API functions to expose rate limits.
 *
 * Don't use these from inside Libevent; they're meant for use by
 * the program.
 * === */

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_read_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = _bufferevent_get_write_max(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_read_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}

/* Mostly you don't want to use this function from inside libevent;
 * _bufferevent_get_write_max() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
	struct bufferevent_rate_limit_group *grp)
{
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}

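/** Manually charge <b>decr</b> bytes against <b>bev</b>'s read bucket,
    suspending or unsuspending reads as the new level requires.  (A
    hypothetical use: an application that wants protocol overhead to count
    against the limit could call this with the overhead size after each
    read.) */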
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		bufferevent_suspend_write(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}

int
bufferevent_rate_limit_group_decrement_read(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.read_limit;
	new_limit = (grp->rate_limit.read_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_reading(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_reading(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

int
bufferevent_rate_limit_group_decrement_write(
	struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	LOCK_GROUP(grp);
	old_limit = grp->rate_limit.write_limit;
	new_limit = (grp->rate_limit.write_limit -= decr);

	if (old_limit > 0 && new_limit <= 0) {
		_bev_group_suspend_writing(grp);
	} else if (old_limit <= 0 && new_limit > 0) {
		_bev_group_unsuspend_writing(grp);
	}

	UNLOCK_GROUP(grp);
	return r;
}

void
bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
    ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
{
	EVUTIL_ASSERT(grp != NULL);
	if (total_read_out)
		*total_read_out = grp->total_read;
	if (total_written_out)
		*total_written_out = grp->total_written;
}

void
bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
{
	grp->total_read = grp->total_written = 0;
}