|
1 /* |
|
2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson |
|
3 * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu> |
|
4 * All rights reserved. |
|
5 * |
|
6 * Redistribution and use in source and binary forms, with or without |
|
7 * modification, are permitted provided that the following conditions |
|
8 * are met: |
|
9 * 1. Redistributions of source code must retain the above copyright |
|
10 * notice, this list of conditions and the following disclaimer. |
|
11 * 2. Redistributions in binary form must reproduce the above copyright |
|
12 * notice, this list of conditions and the following disclaimer in the |
|
13 * documentation and/or other materials provided with the distribution. |
|
14 * 3. The name of the author may not be used to endorse or promote products |
|
15 * derived from this software without specific prior written permission. |
|
16 * |
|
17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR |
|
18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
|
19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. |
|
20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, |
|
21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT |
|
22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|
23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|
24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF |
|
26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
27 */ |
|
28 |
|
29 #include <sys/types.h> |
|
30 |
|
31 #include "event2/event-config.h" |
|
32 |
|
33 #ifdef _EVENT_HAVE_SYS_TIME_H |
|
34 #include <sys/time.h> |
|
35 #endif |
|
36 |
|
37 #include <errno.h> |
|
38 #include <stdio.h> |
|
39 #include <stdlib.h> |
|
40 #include <string.h> |
|
41 #ifdef _EVENT_HAVE_STDARG_H |
|
42 #include <stdarg.h> |
|
43 #endif |
|
44 #ifdef _EVENT_HAVE_UNISTD_H |
|
45 #include <unistd.h> |
|
46 #endif |
|
47 |
|
48 #ifdef WIN32 |
|
49 #include <winsock2.h> |
|
50 #include <ws2tcpip.h> |
|
51 #endif |
|
52 |
|
53 #ifdef _EVENT_HAVE_SYS_SOCKET_H |
|
54 #include <sys/socket.h> |
|
55 #endif |
|
56 #ifdef _EVENT_HAVE_NETINET_IN_H |
|
57 #include <netinet/in.h> |
|
58 #endif |
|
59 #ifdef _EVENT_HAVE_NETINET_IN6_H |
|
60 #include <netinet/in6.h> |
|
61 #endif |
|
62 |
|
63 #include "event2/util.h" |
|
64 #include "event2/bufferevent.h" |
|
65 #include "event2/buffer.h" |
|
66 #include "event2/bufferevent_struct.h" |
|
67 #include "event2/bufferevent_compat.h" |
|
68 #include "event2/event.h" |
|
69 #include "log-internal.h" |
|
70 #include "mm-internal.h" |
|
71 #include "bufferevent-internal.h" |
|
72 #include "util-internal.h" |
|
73 #ifdef WIN32 |
|
74 #include "iocp-internal.h" |
|
75 #endif |
|
76 |
|
77 /* prototypes */ |
|
78 static int be_socket_enable(struct bufferevent *, short); |
|
79 static int be_socket_disable(struct bufferevent *, short); |
|
80 static void be_socket_destruct(struct bufferevent *); |
|
81 static int be_socket_adj_timeouts(struct bufferevent *); |
|
82 static int be_socket_flush(struct bufferevent *, short, enum bufferevent_flush_mode); |
|
83 static int be_socket_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); |
|
84 |
|
85 static void be_socket_setfd(struct bufferevent *, evutil_socket_t); |
|
86 |
|
87 const struct bufferevent_ops bufferevent_ops_socket = { |
|
88 "socket", |
|
89 evutil_offsetof(struct bufferevent_private, bev), |
|
90 be_socket_enable, |
|
91 be_socket_disable, |
|
92 be_socket_destruct, |
|
93 be_socket_adj_timeouts, |
|
94 be_socket_flush, |
|
95 be_socket_ctrl, |
|
96 }; |
|
97 |
|
/* Add event `ev' with timeout `t'; thin alias for _bufferevent_add_event(). */
#define be_socket_add(ev, t) _bufferevent_add_event((ev), (t))
|
100 |
|
101 static void |
|
102 bufferevent_socket_outbuf_cb(struct evbuffer *buf, |
|
103 const struct evbuffer_cb_info *cbinfo, |
|
104 void *arg) |
|
105 { |
|
106 struct bufferevent *bufev = arg; |
|
107 struct bufferevent_private *bufev_p = |
|
108 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); |
|
109 |
|
110 if (cbinfo->n_added && |
|
111 (bufev->enabled & EV_WRITE) && |
|
112 !event_pending(&bufev->ev_write, EV_WRITE, NULL) && |
|
113 !bufev_p->write_suspended) { |
|
114 /* Somebody added data to the buffer, and we would like to |
|
115 * write, and we were not writing. So, start writing. */ |
|
116 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) { |
|
117 /* Should we log this? */ |
|
118 } |
|
119 } |
|
120 } |
|
121 |
|
122 static void |
|
123 bufferevent_readcb(evutil_socket_t fd, short event, void *arg) |
|
124 { |
|
125 struct bufferevent *bufev = arg; |
|
126 struct bufferevent_private *bufev_p = |
|
127 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); |
|
128 struct evbuffer *input; |
|
129 int res = 0; |
|
130 short what = BEV_EVENT_READING; |
|
131 ev_ssize_t howmuch = -1, readmax=-1; |
|
132 |
|
133 _bufferevent_incref_and_lock(bufev); |
|
134 |
|
135 if (event == EV_TIMEOUT) { |
|
136 /* Note that we only check for event==EV_TIMEOUT. If |
|
137 * event==EV_TIMEOUT|EV_READ, we can safely ignore the |
|
138 * timeout, since a read has occurred */ |
|
139 what |= BEV_EVENT_TIMEOUT; |
|
140 goto error; |
|
141 } |
|
142 |
|
143 input = bufev->input; |
|
144 |
|
145 /* |
|
146 * If we have a high watermark configured then we don't want to |
|
147 * read more data than would make us reach the watermark. |
|
148 */ |
|
149 if (bufev->wm_read.high != 0) { |
|
150 howmuch = bufev->wm_read.high - evbuffer_get_length(input); |
|
151 /* we somehow lowered the watermark, stop reading */ |
|
152 if (howmuch <= 0) { |
|
153 bufferevent_wm_suspend_read(bufev); |
|
154 goto done; |
|
155 } |
|
156 } |
|
157 readmax = _bufferevent_get_read_max(bufev_p); |
|
158 if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited" |
|
159 * uglifies this code. XXXX */ |
|
160 howmuch = readmax; |
|
161 if (bufev_p->read_suspended) |
|
162 goto done; |
|
163 |
|
164 evbuffer_unfreeze(input, 0); |
|
165 res = evbuffer_read(input, fd, (int)howmuch); /* XXXX evbuffer_read would do better to take and return ev_ssize_t */ |
|
166 evbuffer_freeze(input, 0); |
|
167 |
|
168 if (res == -1) { |
|
169 int err = evutil_socket_geterror(fd); |
|
170 if (EVUTIL_ERR_RW_RETRIABLE(err)) |
|
171 goto reschedule; |
|
172 /* error case */ |
|
173 what |= BEV_EVENT_ERROR; |
|
174 } else if (res == 0) { |
|
175 /* eof case */ |
|
176 what |= BEV_EVENT_EOF; |
|
177 } |
|
178 |
|
179 if (res <= 0) |
|
180 goto error; |
|
181 |
|
182 _bufferevent_decrement_read_buckets(bufev_p, res); |
|
183 |
|
184 /* Invoke the user callback - must always be called last */ |
|
185 if (evbuffer_get_length(input) >= bufev->wm_read.low) |
|
186 _bufferevent_run_readcb(bufev); |
|
187 |
|
188 goto done; |
|
189 |
|
190 reschedule: |
|
191 goto done; |
|
192 |
|
193 error: |
|
194 bufferevent_disable(bufev, EV_READ); |
|
195 _bufferevent_run_eventcb(bufev, what); |
|
196 |
|
197 done: |
|
198 _bufferevent_decref_and_unlock(bufev); |
|
199 } |
|
200 |
|
201 static void |
|
202 bufferevent_writecb(evutil_socket_t fd, short event, void *arg) |
|
203 { |
|
204 struct bufferevent *bufev = arg; |
|
205 struct bufferevent_private *bufev_p = |
|
206 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); |
|
207 int res = 0; |
|
208 short what = BEV_EVENT_WRITING; |
|
209 int connected = 0; |
|
210 ev_ssize_t atmost = -1; |
|
211 |
|
212 _bufferevent_incref_and_lock(bufev); |
|
213 |
|
214 if (event == EV_TIMEOUT) { |
|
215 /* Note that we only check for event==EV_TIMEOUT. If |
|
216 * event==EV_TIMEOUT|EV_WRITE, we can safely ignore the |
|
217 * timeout, since a read has occurred */ |
|
218 what |= BEV_EVENT_TIMEOUT; |
|
219 goto error; |
|
220 } |
|
221 if (bufev_p->connecting) { |
|
222 int c = evutil_socket_finished_connecting(fd); |
|
223 /* we need to fake the error if the connection was refused |
|
224 * immediately - usually connection to localhost on BSD */ |
|
225 if (bufev_p->connection_refused) { |
|
226 bufev_p->connection_refused = 0; |
|
227 c = -1; |
|
228 } |
|
229 |
|
230 if (c == 0) |
|
231 goto done; |
|
232 |
|
233 bufev_p->connecting = 0; |
|
234 if (c < 0) { |
|
235 event_del(&bufev->ev_write); |
|
236 event_del(&bufev->ev_read); |
|
237 _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); |
|
238 goto done; |
|
239 } else { |
|
240 connected = 1; |
|
241 #ifdef WIN32 |
|
242 if (BEV_IS_ASYNC(bufev)) { |
|
243 event_del(&bufev->ev_write); |
|
244 bufferevent_async_set_connected(bufev); |
|
245 _bufferevent_run_eventcb(bufev, |
|
246 BEV_EVENT_CONNECTED); |
|
247 goto done; |
|
248 } |
|
249 #endif |
|
250 _bufferevent_run_eventcb(bufev, |
|
251 BEV_EVENT_CONNECTED); |
|
252 if (!(bufev->enabled & EV_WRITE) || |
|
253 bufev_p->write_suspended) { |
|
254 event_del(&bufev->ev_write); |
|
255 goto done; |
|
256 } |
|
257 } |
|
258 } |
|
259 |
|
260 atmost = _bufferevent_get_write_max(bufev_p); |
|
261 |
|
262 if (bufev_p->write_suspended) |
|
263 goto done; |
|
264 |
|
265 if (evbuffer_get_length(bufev->output)) { |
|
266 evbuffer_unfreeze(bufev->output, 1); |
|
267 res = evbuffer_write_atmost(bufev->output, fd, atmost); |
|
268 evbuffer_freeze(bufev->output, 1); |
|
269 if (res == -1) { |
|
270 int err = evutil_socket_geterror(fd); |
|
271 if (EVUTIL_ERR_RW_RETRIABLE(err)) |
|
272 goto reschedule; |
|
273 what |= BEV_EVENT_ERROR; |
|
274 } else if (res == 0) { |
|
275 /* eof case |
|
276 XXXX Actually, a 0 on write doesn't indicate |
|
277 an EOF. An ECONNRESET might be more typical. |
|
278 */ |
|
279 what |= BEV_EVENT_EOF; |
|
280 } |
|
281 if (res <= 0) |
|
282 goto error; |
|
283 |
|
284 _bufferevent_decrement_write_buckets(bufev_p, res); |
|
285 } |
|
286 |
|
287 if (evbuffer_get_length(bufev->output) == 0) { |
|
288 event_del(&bufev->ev_write); |
|
289 } |
|
290 |
|
291 /* |
|
292 * Invoke the user callback if our buffer is drained or below the |
|
293 * low watermark. |
|
294 */ |
|
295 if ((res || !connected) && |
|
296 evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { |
|
297 _bufferevent_run_writecb(bufev); |
|
298 } |
|
299 |
|
300 goto done; |
|
301 |
|
302 reschedule: |
|
303 if (evbuffer_get_length(bufev->output) == 0) { |
|
304 event_del(&bufev->ev_write); |
|
305 } |
|
306 goto done; |
|
307 |
|
308 error: |
|
309 bufferevent_disable(bufev, EV_WRITE); |
|
310 _bufferevent_run_eventcb(bufev, what); |
|
311 |
|
312 done: |
|
313 _bufferevent_decref_and_unlock(bufev); |
|
314 } |
|
315 |
|
316 struct bufferevent * |
|
317 bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, |
|
318 int options) |
|
319 { |
|
320 struct bufferevent_private *bufev_p; |
|
321 struct bufferevent *bufev; |
|
322 |
|
323 #ifdef WIN32 |
|
324 if (base && event_base_get_iocp(base)) |
|
325 return bufferevent_async_new(base, fd, options); |
|
326 #endif |
|
327 |
|
328 if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) |
|
329 return NULL; |
|
330 |
|
331 if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, |
|
332 options) < 0) { |
|
333 mm_free(bufev_p); |
|
334 return NULL; |
|
335 } |
|
336 bufev = &bufev_p->bev; |
|
337 evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); |
|
338 |
|
339 event_assign(&bufev->ev_read, bufev->ev_base, fd, |
|
340 EV_READ|EV_PERSIST, bufferevent_readcb, bufev); |
|
341 event_assign(&bufev->ev_write, bufev->ev_base, fd, |
|
342 EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); |
|
343 |
|
344 evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); |
|
345 |
|
346 evbuffer_freeze(bufev->input, 0); |
|
347 evbuffer_freeze(bufev->output, 1); |
|
348 |
|
349 return bufev; |
|
350 } |
|
351 |
|
352 int |
|
353 bufferevent_socket_connect(struct bufferevent *bev, |
|
354 struct sockaddr *sa, int socklen) |
|
355 { |
|
356 struct bufferevent_private *bufev_p = |
|
357 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); |
|
358 |
|
359 evutil_socket_t fd; |
|
360 int r = 0; |
|
361 int result=-1; |
|
362 int ownfd = 0; |
|
363 |
|
364 _bufferevent_incref_and_lock(bev); |
|
365 |
|
366 if (!bufev_p) |
|
367 goto done; |
|
368 |
|
369 fd = bufferevent_getfd(bev); |
|
370 if (fd < 0) { |
|
371 if (!sa) |
|
372 goto done; |
|
373 fd = socket(sa->sa_family, SOCK_STREAM, 0); |
|
374 if (fd < 0) |
|
375 goto done; |
|
376 if (evutil_make_socket_nonblocking(fd)<0) |
|
377 goto done; |
|
378 ownfd = 1; |
|
379 } |
|
380 if (sa) { |
|
381 #ifdef WIN32 |
|
382 if (bufferevent_async_can_connect(bev)) { |
|
383 bufferevent_setfd(bev, fd); |
|
384 r = bufferevent_async_connect(bev, fd, sa, socklen); |
|
385 if (r < 0) |
|
386 goto freesock; |
|
387 bufev_p->connecting = 1; |
|
388 result = 0; |
|
389 goto done; |
|
390 } else |
|
391 #endif |
|
392 r = evutil_socket_connect(&fd, sa, socklen); |
|
393 if (r < 0) |
|
394 goto freesock; |
|
395 } |
|
396 #ifdef WIN32 |
|
397 /* ConnectEx() isn't always around, even when IOCP is enabled. |
|
398 * Here, we borrow the socket object's write handler to fall back |
|
399 * on a non-blocking connect() when ConnectEx() is unavailable. */ |
|
400 if (BEV_IS_ASYNC(bev)) { |
|
401 event_assign(&bev->ev_write, bev->ev_base, fd, |
|
402 EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); |
|
403 } |
|
404 #endif |
|
405 bufferevent_setfd(bev, fd); |
|
406 if (r == 0) { |
|
407 if (! be_socket_enable(bev, EV_WRITE)) { |
|
408 bufev_p->connecting = 1; |
|
409 result = 0; |
|
410 goto done; |
|
411 } |
|
412 } else if (r == 1) { |
|
413 /* The connect succeeded already. How very BSD of it. */ |
|
414 result = 0; |
|
415 bufev_p->connecting = 1; |
|
416 event_active(&bev->ev_write, EV_WRITE, 1); |
|
417 } else { |
|
418 /* The connect failed already. How very BSD of it. */ |
|
419 bufev_p->connection_refused = 1; |
|
420 bufev_p->connecting = 1; |
|
421 result = 0; |
|
422 event_active(&bev->ev_write, EV_WRITE, 1); |
|
423 } |
|
424 |
|
425 goto done; |
|
426 |
|
427 freesock: |
|
428 _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); |
|
429 if (ownfd) |
|
430 evutil_closesocket(fd); |
|
431 /* do something about the error? */ |
|
432 done: |
|
433 _bufferevent_decref_and_unlock(bev); |
|
434 return result; |
|
435 } |
|
436 |
|
/*
 * Completion callback for the hostname lookup started by
 * bufferevent_socket_connect_hostname().
 *
 * result - 0 on success, otherwise an error code that is stored in
 *          dns_error
 * ai     - the resolved addresses; freed here when non-NULL
 * arg    - the struct bufferevent
 *
 * Lifts the BEV_SUSPEND_LOOKUP suspension, then either reports
 * BEV_EVENT_ERROR through the event callback or starts connecting to
 * the first returned address.  Finally unlocks the bufferevent and
 * drops a reference (connect_hostname takes one before the lookup).
 */
static void
bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);

	BEV_LOCK(bev);

	bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
	bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);

	if (result != 0) {
		priv->dns_error = result;
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	} else {
		/* XXX use the other addrinfos? */
		/* XXX use this return value */
		(void)bufferevent_socket_connect(bev, ai->ai_addr,
		    (int)ai->ai_addrlen);
	}

	_bufferevent_decref_and_unlock(bev);

	/* On the success path ai has already been dereferenced above. */
	if (ai)
		evutil_freeaddrinfo(ai);
}
|
466 |
|
/*
 * Resolve a hostname and connect the bufferevent to it.
 *
 * bev        - the bufferevent to connect
 * evdns_base - passed through to evutil_getaddrinfo_async()
 * family     - AF_INET, AF_INET6 or AF_UNSPEC; anything else is rejected
 * hostname   - name to resolve
 * port       - TCP port, 1..65535
 *
 * Reading and writing are suspended (BEV_SUSPEND_LOOKUP) while the
 * lookup runs; bufferevent_connect_getaddrinfo_cb() lifts the
 * suspension and starts the actual connect.
 *
 * Returns 0 if the lookup was started, -1 on invalid arguments or if
 * evutil_getaddrinfo_async() reports failure.
 */
int
bufferevent_socket_connect_hostname(struct bufferevent *bev,
    struct evdns_base *evdns_base, int family, const char *hostname, int port)
{
	struct bufferevent_private *priv =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	struct evutil_addrinfo hints;
	char service[10];	/* decimal port number plus NUL */
	int rc;

	if (!(family == AF_INET || family == AF_INET6 || family == AF_UNSPEC))
		return -1;
	if (port < 1 || port > 65535)
		return -1;

	/* Clear any error left over from an earlier lookup. */
	BEV_LOCK(bev);
	priv->dns_error = 0;
	BEV_UNLOCK(bev);

	evutil_snprintf(service, sizeof(service), "%d", port);

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_protocol = IPPROTO_TCP;
	hints.ai_socktype = SOCK_STREAM;

	bufferevent_suspend_write(bev, BEV_SUSPEND_LOOKUP);
	bufferevent_suspend_read(bev, BEV_SUSPEND_LOOKUP);

	/* Hold a reference for the lookup; the callback releases it. */
	bufferevent_incref(bev);
	rc = evutil_getaddrinfo_async(evdns_base, hostname, service,
	    &hints, bufferevent_connect_getaddrinfo_cb, bev);

	if (rc != 0) {
		/* NOTE(review): the reference taken above is not released
		 * on this path; confirm whether the callback can still
		 * run when evutil_getaddrinfo_async() returns nonzero. */
		bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP);
		bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP);
		return -1;
	}

	return 0;
}
|
508 |
|
509 int |
|
510 bufferevent_socket_get_dns_error(struct bufferevent *bev) |
|
511 { |
|
512 int rv; |
|
513 struct bufferevent_private *bev_p = |
|
514 EVUTIL_UPCAST(bev, struct bufferevent_private, bev); |
|
515 |
|
516 BEV_LOCK(bev); |
|
517 rv = bev_p->dns_error; |
|
518 BEV_LOCK(bev); |
|
519 |
|
520 return rv; |
|
521 } |
|
522 |
|
523 /* |
|
524 * Create a new buffered event object. |
|
525 * |
|
526 * The read callback is invoked whenever we read new data. |
|
527 * The write callback is invoked whenever the output buffer is drained. |
|
528 * The error callback is invoked on a write/read error or on EOF. |
|
529 * |
|
530 * Both read and write callbacks maybe NULL. The error callback is not |
|
531 * allowed to be NULL and have to be provided always. |
|
532 */ |
|
533 |
|
534 struct bufferevent * |
|
535 bufferevent_new(evutil_socket_t fd, |
|
536 bufferevent_data_cb readcb, bufferevent_data_cb writecb, |
|
537 bufferevent_event_cb eventcb, void *cbarg) |
|
538 { |
|
539 struct bufferevent *bufev; |
|
540 |
|
541 if (!(bufev = bufferevent_socket_new(NULL, fd, 0))) |
|
542 return NULL; |
|
543 |
|
544 bufferevent_setcb(bufev, readcb, writecb, eventcb, cbarg); |
|
545 |
|
546 return bufev; |
|
547 } |
|
548 |
|
549 |
|
550 static int |
|
551 be_socket_enable(struct bufferevent *bufev, short event) |
|
552 { |
|
553 if (event & EV_READ) { |
|
554 if (be_socket_add(&bufev->ev_read,&bufev->timeout_read) == -1) |
|
555 return -1; |
|
556 } |
|
557 if (event & EV_WRITE) { |
|
558 if (be_socket_add(&bufev->ev_write,&bufev->timeout_write) == -1) |
|
559 return -1; |
|
560 } |
|
561 return 0; |
|
562 } |
|
563 |
|
564 static int |
|
565 be_socket_disable(struct bufferevent *bufev, short event) |
|
566 { |
|
567 struct bufferevent_private *bufev_p = |
|
568 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); |
|
569 if (event & EV_READ) { |
|
570 if (event_del(&bufev->ev_read) == -1) |
|
571 return -1; |
|
572 } |
|
573 /* Don't actually disable the write if we are trying to connect. */ |
|
574 if ((event & EV_WRITE) && ! bufev_p->connecting) { |
|
575 if (event_del(&bufev->ev_write) == -1) |
|
576 return -1; |
|
577 } |
|
578 return 0; |
|
579 } |
|
580 |
|
581 static void |
|
582 be_socket_destruct(struct bufferevent *bufev) |
|
583 { |
|
584 struct bufferevent_private *bufev_p = |
|
585 EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); |
|
586 evutil_socket_t fd; |
|
587 EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); |
|
588 |
|
589 fd = event_get_fd(&bufev->ev_read); |
|
590 |
|
591 event_del(&bufev->ev_read); |
|
592 event_del(&bufev->ev_write); |
|
593 |
|
594 if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) |
|
595 EVUTIL_CLOSESOCKET(fd); |
|
596 } |
|
597 |
|
598 static int |
|
599 be_socket_adj_timeouts(struct bufferevent *bufev) |
|
600 { |
|
601 int r = 0; |
|
602 if (event_pending(&bufev->ev_read, EV_READ, NULL)) |
|
603 if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0) |
|
604 r = -1; |
|
605 if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) { |
|
606 if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0) |
|
607 r = -1; |
|
608 } |
|
609 return r; |
|
610 } |
|
611 |
|
612 static int |
|
613 be_socket_flush(struct bufferevent *bev, short iotype, |
|
614 enum bufferevent_flush_mode mode) |
|
615 { |
|
616 return 0; |
|
617 } |
|
618 |
|
619 |
|
620 static void |
|
621 be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) |
|
622 { |
|
623 BEV_LOCK(bufev); |
|
624 EVUTIL_ASSERT(bufev->be_ops == &bufferevent_ops_socket); |
|
625 |
|
626 event_del(&bufev->ev_read); |
|
627 event_del(&bufev->ev_write); |
|
628 |
|
629 event_assign(&bufev->ev_read, bufev->ev_base, fd, |
|
630 EV_READ|EV_PERSIST, bufferevent_readcb, bufev); |
|
631 event_assign(&bufev->ev_write, bufev->ev_base, fd, |
|
632 EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); |
|
633 |
|
634 if (fd >= 0) |
|
635 bufferevent_enable(bufev, bufev->enabled); |
|
636 |
|
637 BEV_UNLOCK(bufev); |
|
638 } |
|
639 |
|
640 /* XXXX Should non-socket bufferevents support this? */ |
|
641 int |
|
642 bufferevent_priority_set(struct bufferevent *bufev, int priority) |
|
643 { |
|
644 int r = -1; |
|
645 |
|
646 BEV_LOCK(bufev); |
|
647 if (bufev->be_ops != &bufferevent_ops_socket) |
|
648 goto done; |
|
649 |
|
650 if (event_priority_set(&bufev->ev_read, priority) == -1) |
|
651 goto done; |
|
652 if (event_priority_set(&bufev->ev_write, priority) == -1) |
|
653 goto done; |
|
654 |
|
655 r = 0; |
|
656 done: |
|
657 BEV_UNLOCK(bufev); |
|
658 return r; |
|
659 } |
|
660 |
|
661 /* XXXX Should non-socket bufferevents support this? */ |
|
662 int |
|
663 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) |
|
664 { |
|
665 int res = -1; |
|
666 |
|
667 BEV_LOCK(bufev); |
|
668 if (bufev->be_ops != &bufferevent_ops_socket) |
|
669 goto done; |
|
670 |
|
671 bufev->ev_base = base; |
|
672 |
|
673 res = event_base_set(base, &bufev->ev_read); |
|
674 if (res == -1) |
|
675 goto done; |
|
676 |
|
677 res = event_base_set(base, &bufev->ev_write); |
|
678 done: |
|
679 BEV_UNLOCK(bufev); |
|
680 return res; |
|
681 } |
|
682 |
|
683 static int |
|
684 be_socket_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, |
|
685 union bufferevent_ctrl_data *data) |
|
686 { |
|
687 switch (op) { |
|
688 case BEV_CTRL_SET_FD: |
|
689 be_socket_setfd(bev, data->fd); |
|
690 return 0; |
|
691 case BEV_CTRL_GET_FD: |
|
692 data->fd = event_get_fd(&bev->ev_read); |
|
693 return 0; |
|
694 case BEV_CTRL_GET_UNDERLYING: |
|
695 case BEV_CTRL_CANCEL_ALL: |
|
696 default: |
|
697 return -1; |
|
698 } |
|
699 } |