/*
 * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
 * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include <sys/types.h>

#include "event2/event-config.h"

#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _EVENT_HAVE_STDARG_H
#include <stdarg.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#endif

#include "event2/util.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "evbuffer-internal.h"
#include "util-internal.h"

static void _bufferevent_cancel_all(struct bufferevent *bev);

void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	if (!bufev_private->read_suspended)
		bufev->be_ops->disable(bufev, EV_READ);
	bufev_private->read_suspended |= what;
	BEV_UNLOCK(bufev);
}

void
bufferevent_unsuspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	bufev_private->read_suspended &= ~what;
	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
		bufev->be_ops->enable(bufev, EV_READ);
	BEV_UNLOCK(bufev);
}

void
bufferevent_suspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	if (!bufev_private->write_suspended)
		bufev->be_ops->disable(bufev, EV_WRITE);
	bufev_private->write_suspended |= what;
	BEV_UNLOCK(bufev);
}

void
bufferevent_unsuspend_write(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	BEV_LOCK(bufev);
	bufev_private->write_suspended &= ~what;
	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
		bufev->be_ops->enable(bufev, EV_WRITE);
	BEV_UNLOCK(bufev);
}
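
/* Note on the suspend flags: each bit in 'what' names one independent
 * reason for pausing I/O, and I/O resumes only once every reason has been
 * cleared.  A minimal sketch, assuming the internal flag names
 * BEV_SUSPEND_WM (watermark) and BEV_SUSPEND_BW (rate limit):
 *
 *	bufferevent_suspend_read(bev, BEV_SUSPEND_WM);
 *	bufferevent_suspend_read(bev, BEV_SUSPEND_BW);
 *	bufferevent_unsuspend_read(bev, BEV_SUSPEND_WM);
 *	// still suspended: BEV_SUSPEND_BW remains set
 *	bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW);
 *	// reading resumes now, provided EV_READ is enabled
 */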

/* Callback to implement watermarks on the input buffer.  Only enabled
 * if the watermark is set. */
static void
bufferevent_inbuf_wm_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bufev = arg;
	size_t size;

	size = evbuffer_get_length(buf);

	if (size >= bufev->wm_read.high)
		bufferevent_wm_suspend_read(bufev);
	else
		bufferevent_wm_unsuspend_read(bufev);
}

static void
bufferevent_run_deferred_callbacks_locked(struct deferred_cb *_, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufev_private->readcb_pending = 0;
		bufev->readcb(bufev, bufev->cbarg);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufev_private->writecb_pending = 0;
		bufev->writecb(bufev, bufev->cbarg);
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		EVUTIL_SET_SOCKET_ERROR(err);
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
	_bufferevent_decref_and_unlock(bufev);
}

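/* Variant of the above used with BEV_OPT_UNLOCK_CALLBACKS: the
 * bufferevent lock is released for the duration of each user callback,
 * so other threads can use the bufferevent while a callback runs. */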
static void
bufferevent_run_deferred_callbacks_unlocked(struct deferred_cb *_, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
#define UNLOCKED(stmt) \
	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while (0)

	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufferevent_data_cb readcb = bufev->readcb;
		void *cbarg = bufev->cbarg;
		bufev_private->readcb_pending = 0;
		UNLOCKED(readcb(bufev, cbarg));
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufferevent_data_cb writecb = bufev->writecb;
		void *cbarg = bufev->cbarg;
		bufev_private->writecb_pending = 0;
		UNLOCKED(writecb(bufev, cbarg));
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		EVUTIL_SET_SOCKET_ERROR(err);
		UNLOCKED(errorcb(bufev, what, cbarg));
	}
	_bufferevent_decref_and_unlock(bufev);
#undef UNLOCKED
}

#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		bufferevent_incref(&(bevp)->bev);			\
		event_deferred_cb_schedule(				\
			event_base_get_deferred_cb_queue((bevp)->bev.ev_base), \
			&(bevp)->deferred);				\
	} while (0)
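
/* Illustrative only: with BEV_OPT_DEFER_CALLBACKS, user callbacks do not
 * run inline from the I/O path; the "pending" bits above are recorded and
 * one deferred-callback entry is queued on the event_base, which later
 * invokes bufferevent_run_deferred_callbacks_{locked,unlocked}().  A
 * sketch of how a caller opts in, using the public socket constructor:
 *
 *	struct bufferevent *bev = bufferevent_socket_new(base, fd,
 *	    BEV_OPT_CLOSE_ON_FREE | BEV_OPT_DEFER_CALLBACKS);
 */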

void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	if (bufev->readcb == NULL)
		return;
	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
		p->readcb_pending = 1;
		if (!p->deferred.queued)
			SCHEDULE_DEFERRED(p);
	} else {
		bufev->readcb(bufev, bufev->cbarg);
	}
}

void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	if (bufev->writecb == NULL)
		return;
	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
		p->writecb_pending = 1;
		if (!p->deferred.queued)
			SCHEDULE_DEFERRED(p);
	} else {
		bufev->writecb(bufev, bufev->cbarg);
	}
}

void
_bufferevent_run_eventcb(struct bufferevent *bufev, short what)
{
	/* Requires that we hold the lock and a reference */
	struct bufferevent_private *p =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	if (bufev->errorcb == NULL)
		return;
	if (p->options & BEV_OPT_DEFER_CALLBACKS) {
		p->eventcb_pending |= what;
		p->errno_pending = EVUTIL_SOCKET_ERROR();
		if (!p->deferred.queued)
			SCHEDULE_DEFERRED(p);
	} else {
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
}

int
bufferevent_init_common(struct bufferevent_private *bufev_private,
    struct event_base *base,
    const struct bufferevent_ops *ops,
    enum bufferevent_options options)
{
	struct bufferevent *bufev = &bufev_private->bev;

	/* Catch inconsistent options before allocating anything. */
	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
	    == BEV_OPT_UNLOCK_CALLBACKS) {
		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
		return -1;
	}

	if (!bufev->input) {
		if ((bufev->input = evbuffer_new()) == NULL)
			return -1;
	}

	if (!bufev->output) {
		if ((bufev->output = evbuffer_new()) == NULL) {
			evbuffer_free(bufev->input);
			return -1;
		}
	}

	bufev_private->refcnt = 1;
	bufev->ev_base = base;

	/* Disable timeouts. */
	evutil_timerclear(&bufev->timeout_read);
	evutil_timerclear(&bufev->timeout_write);

	bufev->be_ops = ops;

	/*
	 * Set to EV_WRITE so that using bufferevent_write() will trigger
	 * a write callback.  Reading needs to be explicitly enabled,
	 * because otherwise no data will be available.
	 */
	bufev->enabled = EV_WRITE;

#ifndef _EVENT_DISABLE_THREAD_SUPPORT
	if (options & BEV_OPT_THREADSAFE) {
		if (bufferevent_enable_locking(bufev, NULL) < 0) {
			/* cleanup */
			evbuffer_free(bufev->input);
			evbuffer_free(bufev->output);
			bufev->input = NULL;
			bufev->output = NULL;
			return -1;
		}
	}
#endif
	if (options & BEV_OPT_DEFER_CALLBACKS) {
		if (options & BEV_OPT_UNLOCK_CALLBACKS)
			event_deferred_cb_init(&bufev_private->deferred,
			    bufferevent_run_deferred_callbacks_unlocked,
			    bufev_private);
		else
			event_deferred_cb_init(&bufev_private->deferred,
			    bufferevent_run_deferred_callbacks_locked,
			    bufev_private);
	}

	bufev_private->options = options;

	evbuffer_set_parent(bufev->input, bufev);
	evbuffer_set_parent(bufev->output, bufev);

	return 0;
}

void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	BEV_LOCK(bufev);

	bufev->readcb = readcb;
	bufev->writecb = writecb;
	bufev->errorcb = eventcb;

	bufev->cbarg = cbarg;
	BEV_UNLOCK(bufev);
}
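
/* Illustrative sketch of typical callback wiring from user code; the
 * callback names here are hypothetical, but bufferevent_setcb() and
 * bufferevent_enable() are used as declared in event2/bufferevent.h:
 *
 *	static void on_read(struct bufferevent *bev, void *ctx);
 *	static void on_event(struct bufferevent *bev, short what, void *ctx);
 *
 *	bufferevent_setcb(bev, on_read, NULL, on_event, my_ctx);
 *	bufferevent_enable(bev, EV_READ | EV_WRITE);
 */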

struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{
	return bufev->input;
}

struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{
	return bufev->output;
}

struct event_base *
bufferevent_get_base(struct bufferevent *bufev)
{
	return bufev->ev_base;
}

int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
{
	if (evbuffer_add(bufev->output, data, size) == -1)
		return -1;

	return 0;
}

int
bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
	if (evbuffer_add_buffer(bufev->output, buf) == -1)
		return -1;

	return 0;
}

size_t
bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
{
	return evbuffer_remove(bufev->input, data, size);
}

int
bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
	return evbuffer_add_buffer(buf, bufev->input);
}
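
/* Illustrative only: a minimal echo pattern built on the calls above.
 * Inside a read callback, draining the input straight into the output
 * avoids copying through a user buffer:
 *
 *	static void
 *	echo_read_cb(struct bufferevent *bev, void *ctx)
 *	{
 *		struct evbuffer *in = bufferevent_get_input(bev);
 *		struct evbuffer *out = bufferevent_get_output(bev);
 *		evbuffer_add_buffer(out, in);
 *	}
 */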

int
bufferevent_enable(struct bufferevent *bufev, short event)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	short impl_events = event;
	int r = 0;

	_bufferevent_incref_and_lock(bufev);
	if (bufev_private->read_suspended)
		impl_events &= ~EV_READ;
	if (bufev_private->write_suspended)
		impl_events &= ~EV_WRITE;

	bufev->enabled |= event;

	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
		r = -1;

	_bufferevent_decref_and_unlock(bufev);
	return r;
}

int
bufferevent_set_timeouts(struct bufferevent *bufev,
    const struct timeval *tv_read,
    const struct timeval *tv_write)
{
	int r = 0;
	BEV_LOCK(bufev);
	if (tv_read) {
		bufev->timeout_read = *tv_read;
	} else {
		evutil_timerclear(&bufev->timeout_read);
	}
	if (tv_write) {
		bufev->timeout_write = *tv_write;
	} else {
		evutil_timerclear(&bufev->timeout_write);
	}

	if (bufev->be_ops->adj_timeouts)
		r = bufev->be_ops->adj_timeouts(bufev);
	BEV_UNLOCK(bufev);

	return r;
}
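
/* Illustrative only: a NULL timeval clears the corresponding timeout.
 * For example, to time out reads after 30 seconds while leaving writes
 * unbounded:
 *
 *	struct timeval tv = { 30, 0 };
 *	bufferevent_set_timeouts(bev, &tv, NULL);
 */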

/* Obsolete; use bufferevent_set_timeouts() */
void
bufferevent_settimeout(struct bufferevent *bufev,
    int timeout_read, int timeout_write)
{
	struct timeval tv_read, tv_write;
	struct timeval *ptv_read = NULL, *ptv_write = NULL;

	memset(&tv_read, 0, sizeof(tv_read));
	memset(&tv_write, 0, sizeof(tv_write));

	if (timeout_read) {
		tv_read.tv_sec = timeout_read;
		ptv_read = &tv_read;
	}
	if (timeout_write) {
		tv_write.tv_sec = timeout_write;
		ptv_write = &tv_write;
	}

	bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
}

int
bufferevent_disable_hard(struct bufferevent *bufev, short event)
{
	int r = 0;
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;

	bufev_private->connecting = 0;
	if (bufev->be_ops->disable(bufev, event) < 0)
		r = -1;

	BEV_UNLOCK(bufev);
	return r;
}

int
bufferevent_disable(struct bufferevent *bufev, short event)
{
	int r = 0;

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;

	if (bufev->be_ops->disable(bufev, event) < 0)
		r = -1;

	BEV_UNLOCK(bufev);
	return r;
}

/*
 * Sets the watermarks.
 */
void
bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	if (events & EV_WRITE) {
		bufev->wm_write.low = lowmark;
		bufev->wm_write.high = highmark;
	}

	if (events & EV_READ) {
		bufev->wm_read.low = lowmark;
		bufev->wm_read.high = highmark;

		if (highmark) {
			/* There is now a new high-water mark for read.
			   Enable the callback if needed, and see if we
			   should suspend or unsuspend reading. */

			if (bufev_private->read_watermarks_cb == NULL) {
				bufev_private->read_watermarks_cb =
				    evbuffer_add_cb(bufev->input,
				    bufferevent_inbuf_wm_cb,
				    bufev);
			}
			evbuffer_cb_set_flags(bufev->input,
			    bufev_private->read_watermarks_cb,
			    EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);

			if (evbuffer_get_length(bufev->input) > highmark)
				bufferevent_wm_suspend_read(bufev);
			else if (evbuffer_get_length(bufev->input) < highmark)
				bufferevent_wm_unsuspend_read(bufev);
		} else {
			/* There is now no high-water mark for read. */
			if (bufev_private->read_watermarks_cb)
				evbuffer_cb_clear_flags(bufev->input,
				    bufev_private->read_watermarks_cb,
				    EVBUFFER_CB_ENABLED);
			bufferevent_wm_unsuspend_read(bufev);
		}
	}
	BEV_UNLOCK(bufev);
}
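
/* Illustrative only: with a read high-water mark, the bufferevent stops
 * reading from the transport once the input buffer holds that many bytes,
 * and resumes once the user drains it back below the mark.  A sketch that
 * caps input buffering at 4096 bytes and defers the read callback until
 * at least 128 bytes are available:
 *
 *	bufferevent_setwatermark(bev, EV_READ, 128, 4096);
 *
 * A highmark of 0 (the default) means "unlimited".
 */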

int
bufferevent_flush(struct bufferevent *bufev,
    short iotype,
    enum bufferevent_flush_mode mode)
{
	int r = -1;
	BEV_LOCK(bufev);
	if (bufev->be_ops->flush)
		r = bufev->be_ops->flush(bufev, iotype, mode);
	BEV_UNLOCK(bufev);
	return r;
}

void
_bufferevent_incref_and_lock(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	++bufev_private->refcnt;
}

#if 0
static void
_bufferevent_transfer_lock_ownership(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *d = BEV_UPCAST(donor);
	struct bufferevent_private *r = BEV_UPCAST(recipient);
	if (d->lock != r->lock)
		return;
	if (r->own_lock)
		return;
	if (d->own_lock) {
		d->own_lock = 0;
		r->own_lock = 1;
	}
}
#endif

int
_bufferevent_decref_and_unlock(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	struct bufferevent *underlying;

	EVUTIL_ASSERT(bufev_private->refcnt > 0);

	if (--bufev_private->refcnt) {
		BEV_UNLOCK(bufev);
		return 0;
	}

	underlying = bufferevent_get_underlying(bufev);

	/* Clean up the shared info */
	if (bufev->be_ops->destruct)
		bufev->be_ops->destruct(bufev);

	/* XXX what happens if refcnt for these buffers is > 1?
	 * The buffers can share a lock with this bufferevent object,
	 * but the lock might be destroyed below. */
	/* evbuffer will free the callbacks */
	evbuffer_free(bufev->input);
	evbuffer_free(bufev->output);

	if (bufev_private->rate_limiting) {
		if (bufev_private->rate_limiting->group)
			bufferevent_remove_from_rate_limit_group_internal(bufev, 0);
		if (event_initialized(&bufev_private->rate_limiting->refill_bucket_event))
			event_del(&bufev_private->rate_limiting->refill_bucket_event);
		event_debug_unassign(&bufev_private->rate_limiting->refill_bucket_event);
		mm_free(bufev_private->rate_limiting);
		bufev_private->rate_limiting = NULL;
	}

	event_debug_unassign(&bufev->ev_read);
	event_debug_unassign(&bufev->ev_write);

	BEV_UNLOCK(bufev);
	if (bufev_private->own_lock)
		EVTHREAD_FREE_LOCK(bufev_private->lock,
		    EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Free the actual allocated memory. */
	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

	/* Release the reference to underlying now that we no longer need the
	 * reference to it.  We wait this long mainly in case our lock is
	 * shared with underlying.
	 *
	 * The 'destruct' function will also drop a reference to underlying
	 * if BEV_OPT_CLOSE_ON_FREE is set.
	 *
	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
	 * It would probably save us some headaches.
	 */
	if (underlying)
		bufferevent_decref(underlying);

	return 1;
}

int
bufferevent_decref(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	return _bufferevent_decref_and_unlock(bufev);
}

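/* Note: freeing is reference-counted.  bufferevent_free() detaches the
 * user callbacks and cancels pending operations, but the structure is only
 * torn down once the last reference (for example, one held by a deferred
 * callback still in flight) is released in
 * _bufferevent_decref_and_unlock() above. */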
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	_bufferevent_cancel_all(bufev);
	_bufferevent_decref_and_unlock(bufev);
}

void
bufferevent_incref(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	++bufev_private->refcnt;
	BEV_UNLOCK(bufev);
}

int
bufferevent_enable_locking(struct bufferevent *bufev, void *lock)
{
#ifdef _EVENT_DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;

	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
		lock = BEV_UPCAST(underlying)->lock;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	} else if (!lock) {
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 1;
	} else {
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	}
	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking(underlying, lock);

	return 0;
#endif
}

int
bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
{
	union bufferevent_ctrl_data d;
	int res = -1;
	d.fd = fd;
	BEV_LOCK(bev);
	if (bev->be_ops->ctrl)
		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
	BEV_UNLOCK(bev);
	return res;
}

evutil_socket_t
bufferevent_getfd(struct bufferevent *bev)
{
	union bufferevent_ctrl_data d;
	int res = -1;
	d.fd = -1;
	BEV_LOCK(bev);
	if (bev->be_ops->ctrl)
		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
	BEV_UNLOCK(bev);
	return (res < 0) ? -1 : d.fd;
}

static void
_bufferevent_cancel_all(struct bufferevent *bev)
{
	union bufferevent_ctrl_data d;
	memset(&d, 0, sizeof(d));
	BEV_LOCK(bev);
	if (bev->be_ops->ctrl)
		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
	BEV_UNLOCK(bev);
}

short
bufferevent_get_enabled(struct bufferevent *bufev)
{
	short r;
	BEV_LOCK(bufev);
	r = bufev->enabled;
	BEV_UNLOCK(bufev);
	return r;
}

struct bufferevent *
bufferevent_get_underlying(struct bufferevent *bev)
{
	union bufferevent_ctrl_data d;
	int res = -1;
	d.ptr = NULL;
	BEV_LOCK(bev);
	if (bev->be_ops->ctrl)
		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
	BEV_UNLOCK(bev);
	return (res < 0) ? NULL : d.ptr;
}

static void
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	_bufferevent_incref_and_lock(bev);
	bufferevent_disable(bev, EV_READ);
	_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING);
	_bufferevent_decref_and_unlock(bev);
}

static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	_bufferevent_incref_and_lock(bev);
	bufferevent_disable(bev, EV_WRITE);
	_bufferevent_run_eventcb(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING);
	_bufferevent_decref_and_unlock(bev);
}

void
_bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
{
	evtimer_assign(&bev->ev_read, bev->ev_base,
	    bufferevent_generic_read_timeout_cb, bev);
	evtimer_assign(&bev->ev_write, bev->ev_base,
	    bufferevent_generic_write_timeout_cb, bev);
}

int
_bufferevent_del_generic_timeout_cbs(struct bufferevent *bev)
{
	int r1, r2;
	r1 = event_del(&bev->ev_read);
	r2 = event_del(&bev->ev_write);
	if (r1 < 0 || r2 < 0)
		return -1;
	return 0;
}

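/* Note: the write timeout below is only kept armed while the output
 * buffer actually holds pending data; an idle writer with nothing queued
 * is not an error, so it should not time out. */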
int
_bufferevent_generic_adj_timeouts(struct bufferevent *bev)
{
	const short enabled = bev->enabled;
	struct bufferevent_private *bev_p =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r1 = 0, r2 = 0;
	if ((enabled & EV_READ) && !bev_p->read_suspended &&
	    evutil_timerisset(&bev->timeout_read))
		r1 = event_add(&bev->ev_read, &bev->timeout_read);
	else
		r1 = event_del(&bev->ev_read);

	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
	    evutil_timerisset(&bev->timeout_write) &&
	    evbuffer_get_length(bev->output))
		r2 = event_add(&bev->ev_write, &bev->timeout_write);
	else
		r2 = event_del(&bev->ev_write);
	if (r1 < 0 || r2 < 0)
		return -1;
	return 0;
}

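/* Helper: add an event, treating a zeroed timeval as "no timeout" rather
 * than "time out immediately". */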
int
_bufferevent_add_event(struct event *ev, const struct timeval *tv)
{
	if (tv->tv_sec == 0 && tv->tv_usec == 0)
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}

/* For use by user programs only; internally, we should be calling
   either _bufferevent_incref_and_lock(), or BEV_LOCK. */
void
bufferevent_lock(struct bufferevent *bev)
{
	_bufferevent_incref_and_lock(bev);
}

void
bufferevent_unlock(struct bufferevent *bev)
{
	_bufferevent_decref_and_unlock(bev);
}
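
/* Illustrative only: bufferevent_lock()/bufferevent_unlock() let user
 * code make a sequence of operations atomic on a bufferevent created
 * with BEV_OPT_THREADSAFE; as the functions above show, the lock also
 * pins a reference for its duration.  A sketch, where 'buf' and 'len'
 * stand in for caller-supplied data:
 *
 *	bufferevent_lock(bev);
 *	if (evbuffer_get_length(bufferevent_get_output(bev)) < 65536)
 *		bufferevent_write(bev, buf, len);
 *	bufferevent_unlock(bev);
 */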