/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "event2/event-config.h"

#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _EVENT_HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef _EVENT_HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif
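
/*
 * This file implements the IOCP-based ("async") bufferevent backend used on
 * Windows.  Reads and writes are issued as overlapped operations against the
 * overlapped evbuffers, and the *_complete() callbacks below run when the
 * I/O completion port reports that an operation has finished.
 */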

/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
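
/* Per-bufferevent state for the IOCP backend.  One overlapped structure is
 * kept for each kind of operation (connect, read, write).  read_in_progress
 * and write_in_progress hold the number of bytes requested by an outstanding
 * overlapped read or write, and are nonzero only while that operation is
 * pending; the ok flag is cleared once the bufferevent hits an error or EOF
 * and must not launch further I/O. */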
struct bufferevent_async {
        struct bufferevent_private bev;
        struct event_overlapped connect_overlapped;
        struct event_overlapped read_overlapped;
        struct event_overlapped write_overlapped;
        size_t read_in_progress;
        size_t write_in_progress;
        unsigned ok : 1;
        unsigned read_added : 1;
        unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
        "socket_async",
        evutil_offsetof(struct bufferevent_async, bev.bev),
        be_async_enable,
        be_async_disable,
        be_async_destruct,
        _bufferevent_generic_adj_timeouts,
        be_async_flush,
        be_async_ctrl,
};

static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
        struct bufferevent_async *bev_a;
        if (bev->be_ops != &bufferevent_ops_async)
                return NULL;
        bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
        return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
        struct bufferevent_async *bev_a;
        bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
        EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
        return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
        struct bufferevent_async *bev_a;
        bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
        EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
        return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
        struct bufferevent_async *bev_a;
        bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
        EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
        return bev_a;
}
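
/* The add/del helpers below register and unregister a "virtual" event with
 * the event_base whenever an overlapped read or write is outstanding, so
 * that the event loop keeps running while asynchronous I/O is still
 * pending. */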
static void
bev_async_del_write(struct bufferevent_async *beva)
{
        struct bufferevent *bev = &beva->bev.bev;

        if (beva->write_added) {
                beva->write_added = 0;
                event_base_del_virtual(bev->ev_base);
        }
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
        struct bufferevent *bev = &beva->bev.bev;

        if (beva->read_added) {
                beva->read_added = 0;
                event_base_del_virtual(bev->ev_base);
        }
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
        struct bufferevent *bev = &beva->bev.bev;

        if (!beva->write_added) {
                beva->write_added = 1;
                event_base_add_virtual(bev->ev_base);
        }
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
        struct bufferevent *bev = &beva->bev.bev;

        if (!beva->read_added) {
                beva->read_added = 1;
                event_base_add_virtual(bev->ev_base);
        }
}
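
/* Launch an overlapped write for as much of the output buffer as the rate
 * limit allows, unless a write is already in flight, a connect is pending,
 * or writing is disabled or suspended. */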
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
        size_t at_most;
        int limit;
        struct bufferevent *bev = &beva->bev.bev;

        /* Don't write if there's a write in progress, or we do not
         * want to write, or when there's nothing left to write. */
        if (beva->write_in_progress || beva->bev.connecting)
                return;
        if (!beva->ok || !(bev->enabled&EV_WRITE) ||
            !evbuffer_get_length(bev->output)) {
                bev_async_del_write(beva);
                return;
        }

        at_most = evbuffer_get_length(bev->output);

        /* This is safe so long as bufferevent_get_write_max never returns
         * more than INT_MAX.  That's true for now. XXXX */
        limit = (int)_bufferevent_get_write_max(&beva->bev);
        if (at_most >= (size_t)limit && limit >= 0)
                at_most = limit;

        if (beva->bev.write_suspended) {
                bev_async_del_write(beva);
                return;
        }

        /* XXXX doesn't respect low-water mark very well. */
        bufferevent_incref(bev);
        if (evbuffer_launch_write(bev->output, at_most,
            &beva->write_overlapped)) {
                bufferevent_decref(bev);
                beva->ok = 0;
                _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
        } else {
                beva->write_in_progress = at_most;
                _bufferevent_decrement_write_buckets(&beva->bev, at_most);
                bev_async_add_write(beva);
        }
}
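
/* Launch an overlapped read for as many bytes as the read watermark and the
 * rate limit allow, unless a read is already in flight, a connect is
 * pending, or reading is disabled or suspended. */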
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
        size_t cur_size;
        size_t read_high;
        size_t at_most;
        int limit;
        struct bufferevent *bev = &beva->bev.bev;

        /* Don't read if there is a read in progress, or we do not
         * want to read. */
        if (beva->read_in_progress || beva->bev.connecting)
                return;
        if (!beva->ok || !(bev->enabled&EV_READ)) {
                bev_async_del_read(beva);
                return;
        }

        /* Don't read if we're full */
        cur_size = evbuffer_get_length(bev->input);
        read_high = bev->wm_read.high;
        if (read_high) {
                if (cur_size >= read_high) {
                        bev_async_del_read(beva);
                        return;
                }
                at_most = read_high - cur_size;
        } else {
                at_most = 16384; /* FIXME totally magic. */
        }

        /* XXXX This over-commits. */
        /* XXXX see also the note above on the cast of
         * _bufferevent_get_write_max() */
        limit = (int)_bufferevent_get_read_max(&beva->bev);
        if (at_most >= (size_t)limit && limit >= 0)
                at_most = limit;

        if (beva->bev.read_suspended) {
                bev_async_del_read(beva);
                return;
        }

        bufferevent_incref(bev);
        if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
                beva->ok = 0;
                _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
                bufferevent_decref(bev);
        } else {
                beva->read_in_progress = at_most;
                _bufferevent_decrement_read_buckets(&beva->bev, at_most);
                bev_async_add_read(beva);
        }

        return;
}

static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
        struct bufferevent *bev = arg;
        struct bufferevent_async *bev_async = upcast(bev);

        /* If we added data to the outbuf and were not writing before,
         * we may want to write now. */

        _bufferevent_incref_and_lock(bev);

        if (cbinfo->n_added)
                bev_async_consider_writing(bev_async);

        _bufferevent_decref_and_unlock(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
        struct bufferevent *bev = arg;
        struct bufferevent_async *bev_async = upcast(bev);

        /* If we drained data from the inbuf and were not reading before,
         * we may want to read now. */

        _bufferevent_incref_and_lock(bev);

        if (cbinfo->n_deleted)
                bev_async_consider_reading(bev_async);

        _bufferevent_decref_and_unlock(bev);
}
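
/* Enable reading and/or writing: reset the generic timeouts and, unless a
 * connect is still in progress, consider launching a new overlapped read or
 * write right away. */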
static int
be_async_enable(struct bufferevent *buf, short what)
{
        struct bufferevent_async *bev_async = upcast(buf);

        if (!bev_async->ok)
                return -1;

        if (bev_async->bev.connecting) {
                /* Don't launch anything during connection attempts. */
                return 0;
        }

        if (what & EV_READ)
                BEV_RESET_GENERIC_READ_TIMEOUT(buf);
        if (what & EV_WRITE)
                BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

        /* If we newly enable reading or writing, and we aren't reading or
           writing already, consider launching a new read or write. */

        if (what & EV_READ)
                bev_async_consider_reading(bev_async);
        if (what & EV_WRITE)
                bev_async_consider_writing(bev_async);
        return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
        struct bufferevent_async *bev_async = upcast(bev);
        /* XXXX If we disable reading or writing, we may want to consider
         * canceling any in-progress read or write operation, though it might
         * not work. */

        if (what & EV_READ) {
                BEV_DEL_GENERIC_READ_TIMEOUT(bev);
                bev_async_del_read(bev_async);
        }
        if (what & EV_WRITE) {
                BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
                bev_async_del_write(bev_async);
        }

        return 0;
}
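
/* Tear down the backend state; the caller guarantees that no overlapped
 * read or write is still outstanding.  Drop any virtual events, close the
 * socket if BEV_OPT_CLOSE_ON_FREE was set, and remove the timeout events
 * installed for a non-blocking connect. */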
static void
be_async_destruct(struct bufferevent *bev)
{
        struct bufferevent_async *bev_async = upcast(bev);
        struct bufferevent_private *bev_p = BEV_UPCAST(bev);
        evutil_socket_t fd;

        EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
            !upcast(bev)->read_in_progress);

        bev_async_del_read(bev_async);
        bev_async_del_write(bev_async);

        fd = _evbuffer_overlapped_get_fd(bev->input);
        if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
                /* XXXX possible double-close */
                evutil_closesocket(fd);
        }
        /* delete this in case non-blocking connect was used */
        if (event_initialized(&bev->ev_write)) {
                event_del(&bev->ev_write);
                _bufferevent_del_generic_timeout_cbs(bev);
        }
}

/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
        DWORD bytes, flags;
        evutil_socket_t fd;

        fd = _evbuffer_overlapped_get_fd(bev->input);
        WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
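
/* Flushing is not implemented for this backend; the call is a no-op. */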
static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
        return 0;
}
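
/* Completion callback for an overlapped ConnectEx(): mark the bufferevent
 * as connected (or record the WSA error on failure) and run the user's
 * event callback. */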
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
        struct bufferevent_async *bev_a = upcast_connect(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
        evutil_socket_t sock;

        BEV_LOCK(bev);

        EVUTIL_ASSERT(bev_a->bev.connecting);
        bev_a->bev.connecting = 0;
        sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
        /* XXXX Handle error? */
        setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

        if (ok)
                bufferevent_async_set_connected(bev);
        else
                bev_async_set_wsa_error(bev, eo);

        _bufferevent_run_eventcb(bev,
            ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);

        event_base_del_virtual(bev->ev_base);

        _bufferevent_decref_and_unlock(bev);
}
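
/* Completion callback for an overlapped read: commit whatever arrived into
 * the input buffer, return any unused rate-limit budget, and then either run
 * the user's read callback, report EOF (zero bytes), or report an error. */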
static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
        struct bufferevent_async *bev_a = upcast_read(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_READING;
        ev_ssize_t amount_unread;

        BEV_LOCK(bev);
        EVUTIL_ASSERT(bev_a->read_in_progress);

        amount_unread = bev_a->read_in_progress - nbytes;
        evbuffer_commit_read(bev->input, nbytes);
        bev_a->read_in_progress = 0;
        if (amount_unread)
                _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);

        if (!ok)
                bev_async_set_wsa_error(bev, eo);

        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_READ_TIMEOUT(bev);
                        if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
                                _bufferevent_run_readcb(bev);
                        bev_async_consider_reading(bev_a);
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
                        bev_a->ok = 0;
                        _bufferevent_run_eventcb(bev, what);
                } else if (!nbytes) {
                        what |= BEV_EVENT_EOF;
                        bev_a->ok = 0;
                        _bufferevent_run_eventcb(bev, what);
                }
        }

        _bufferevent_decref_and_unlock(bev);
}
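
/* Completion callback for an overlapped write: account for the bytes that
 * were actually sent, return any unused rate-limit budget, and then either
 * run the user's write callback, report EOF (zero bytes), or report an
 * error. */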
static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
        struct bufferevent_async *bev_a = upcast_write(eo);
        struct bufferevent *bev = &bev_a->bev.bev;
        short what = BEV_EVENT_WRITING;
        ev_ssize_t amount_unwritten;

        BEV_LOCK(bev);
        EVUTIL_ASSERT(bev_a->write_in_progress);

        amount_unwritten = bev_a->write_in_progress - nbytes;
        evbuffer_commit_write(bev->output, nbytes);
        bev_a->write_in_progress = 0;

        if (amount_unwritten)
                _bufferevent_decrement_write_buckets(&bev_a->bev,
                    -amount_unwritten);

        if (!ok)
                bev_async_set_wsa_error(bev, eo);

        if (bev_a->ok) {
                if (ok && nbytes) {
                        BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
                        if (evbuffer_get_length(bev->output) <=
                            bev->wm_write.low)
                                _bufferevent_run_writecb(bev);
                        bev_async_consider_writing(bev_a);
                } else if (!ok) {
                        what |= BEV_EVENT_ERROR;
                        bev_a->ok = 0;
                        _bufferevent_run_eventcb(bev, what);
                } else if (!nbytes) {
                        what |= BEV_EVENT_EOF;
                        bev_a->ok = 0;
                        _bufferevent_run_eventcb(bev, what);
                }
        }

        _bufferevent_decref_and_unlock(bev);
}
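
/* Create an IOCP-backed bufferevent on fd and associate the socket with the
 * base's I/O completion port.  fd may be invalid (-1) if the socket will be
 * assigned later; in that case the bufferevent starts out not "ok".
 * Callers normally do not invoke this directly: on Windows,
 * bufferevent_socket_new() picks this backend when the event_base has an
 * IOCP.  A minimal usage sketch (illustrative only, assuming an
 * IOCP-enabled base):
 *
 *      evthread_use_windows_threads();
 *      struct event_config *cfg = event_config_new();
 *      event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
 *      struct event_base *base = event_base_new_with_config(cfg);
 *      struct bufferevent *bev =
 *          bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
 */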
struct bufferevent *
bufferevent_async_new(struct event_base *base,
    evutil_socket_t fd, int options)
{
        struct bufferevent_async *bev_a;
        struct bufferevent *bev;
        struct event_iocp_port *iocp;

        options |= BEV_OPT_THREADSAFE;

        if (!(iocp = event_base_get_iocp(base)))
                return NULL;

        if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1) < 0) {
                int err = GetLastError();
                /* We may have already associated this fd with a port.
                 * Let's hope it's this port, and that the error code
                 * for doing this never changes. */
                if (err != ERROR_INVALID_PARAMETER)
                        return NULL;
        }

        if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
                return NULL;

        bev = &bev_a->bev.bev;
        if (!(bev->input = evbuffer_overlapped_new(fd))) {
                mm_free(bev_a);
                return NULL;
        }
        if (!(bev->output = evbuffer_overlapped_new(fd))) {
                evbuffer_free(bev->input);
                mm_free(bev_a);
                return NULL;
        }

        if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
            options) < 0)
                goto err;

        evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
        evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

        event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
        event_overlapped_init(&bev_a->read_overlapped, read_complete);
        event_overlapped_init(&bev_a->write_overlapped, write_complete);

        bev_a->ok = fd >= 0;
        if (bev_a->ok)
                _bufferevent_init_generic_timeout_cbs(bev);

        return bev;
err:
        bufferevent_free(&bev_a->bev.bev);
        return NULL;
}

void
bufferevent_async_set_connected(struct bufferevent *bev)
{
        struct bufferevent_async *bev_async = upcast(bev);
        bev_async->ok = 1;
        _bufferevent_init_generic_timeout_cbs(bev);
        /* Now's a good time to consider reading/writing */
        be_async_enable(bev, bev->enabled);
}
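
/* Return 1 if this bufferevent can use a ConnectEx()-based asynchronous
 * connect: it must be an IOCP bufferevent, its base must have an IOCP, and
 * the ConnectEx extension function must be available. */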
int
bufferevent_async_can_connect(struct bufferevent *bev)
{
        const struct win32_extension_fns *ext =
            event_get_win32_extension_fns();

        if (BEV_IS_ASYNC(bev) &&
            event_base_get_iocp(bev->ev_base) &&
            ext && ext->ConnectEx)
                return 1;

        return 0;
}
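
/* Begin an asynchronous connect of fd to sa with ConnectEx().  Returns 0 if
 * the attempt was launched (completion is later reported through
 * connect_complete()), or -1 on failure. */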
int
bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
        BOOL rc;
        struct bufferevent_async *bev_async = upcast(bev);
        struct sockaddr_storage ss;
        const struct win32_extension_fns *ext =
            event_get_win32_extension_fns();

        EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

        /* ConnectEx() requires that the socket be bound to an address
         * with bind() before use; otherwise it will fail.  We attempt
         * to issue a bind() here, taking into account that the error
         * code is set to WSAEINVAL when the socket is already bound. */
        memset(&ss, 0, sizeof(ss));
        if (sa->sa_family == AF_INET) {
                struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
                sin->sin_family = AF_INET;
                sin->sin_addr.s_addr = INADDR_ANY;
        } else if (sa->sa_family == AF_INET6) {
                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
                sin6->sin6_family = AF_INET6;
                sin6->sin6_addr = in6addr_any;
        } else {
                /* Well, the user will have to bind() */
                return -1;
        }
        if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
            WSAGetLastError() != WSAEINVAL)
                return -1;

        event_base_add_virtual(bev->ev_base);
        bufferevent_incref(bev);
        rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
            &bev_async->connect_overlapped.overlapped);
        if (rc || WSAGetLastError() == ERROR_IO_PENDING)
                return 0;

        event_base_del_virtual(bev->ev_base);
        bufferevent_decref(bev);

        return -1;
}
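
/* Backend-specific control operations: report the underlying socket,
 * replace it (associating the new socket with the base's IOCP), or cancel
 * all pending operations by marking the bufferevent unusable (closing the
 * socket first if BEV_OPT_CLOSE_ON_FREE is set). */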
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
        switch (op) {
        case BEV_CTRL_GET_FD:
                data->fd = _evbuffer_overlapped_get_fd(bev->input);
                return 0;
        case BEV_CTRL_SET_FD: {
                struct event_iocp_port *iocp;

                if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
                        return 0;
                if (!(iocp = event_base_get_iocp(bev->ev_base)))
                        return -1;
                if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
                        return -1;
                _evbuffer_overlapped_set_fd(bev->input, data->fd);
                _evbuffer_overlapped_set_fd(bev->output, data->fd);
                return 0;
        }
        case BEV_CTRL_CANCEL_ALL: {
                struct bufferevent_async *bev_a = upcast(bev);
                evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
                if (fd != (evutil_socket_t)INVALID_SOCKET &&
                    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
                        closesocket(fd);
                }
                bev_a->ok = 0;
                return 0;
        }
        case BEV_CTRL_GET_UNDERLYING:
        default:
                return -1;
        }
}