1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/ipc/chromium/src/third_party/libevent/bufferevent_async.c Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,690 @@ 1.4 +/* 1.5 + * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson 1.6 + * 1.7 + * All rights reserved. 1.8 + * 1.9 + * Redistribution and use in source and binary forms, with or without 1.10 + * modification, are permitted provided that the following conditions 1.11 + * are met: 1.12 + * 1. Redistributions of source code must retain the above copyright 1.13 + * notice, this list of conditions and the following disclaimer. 1.14 + * 2. Redistributions in binary form must reproduce the above copyright 1.15 + * notice, this list of conditions and the following disclaimer in the 1.16 + * documentation and/or other materials provided with the distribution. 1.17 + * 3. The name of the author may not be used to endorse or promote products 1.18 + * derived from this software without specific prior written permission. 1.19 + * 1.20 + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 1.21 + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 1.22 + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 1.23 + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 1.24 + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 1.25 + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 1.26 + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 1.27 + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 1.28 + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 1.29 + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 1.30 + */ 1.31 + 1.32 +#include "event2/event-config.h" 1.33 + 1.34 +#ifdef _EVENT_HAVE_SYS_TIME_H 1.35 +#include <sys/time.h> 1.36 +#endif 1.37 + 1.38 +#include <errno.h> 1.39 +#include <stdio.h> 1.40 +#include <stdlib.h> 1.41 +#include <string.h> 1.42 +#ifdef _EVENT_HAVE_STDARG_H 1.43 +#include <stdarg.h> 1.44 +#endif 1.45 +#ifdef _EVENT_HAVE_UNISTD_H 1.46 +#include <unistd.h> 1.47 +#endif 1.48 + 1.49 +#ifdef WIN32 1.50 +#include <winsock2.h> 1.51 +#include <ws2tcpip.h> 1.52 +#endif 1.53 + 1.54 +#include <sys/queue.h> 1.55 + 1.56 +#include "event2/util.h" 1.57 +#include "event2/bufferevent.h" 1.58 +#include "event2/buffer.h" 1.59 +#include "event2/bufferevent_struct.h" 1.60 +#include "event2/event.h" 1.61 +#include "event2/util.h" 1.62 +#include "event-internal.h" 1.63 +#include "log-internal.h" 1.64 +#include "mm-internal.h" 1.65 +#include "bufferevent-internal.h" 1.66 +#include "util-internal.h" 1.67 +#include "iocp-internal.h" 1.68 + 1.69 +#ifndef SO_UPDATE_CONNECT_CONTEXT 1.70 +/* Mingw is sometimes missing this */ 1.71 +#define SO_UPDATE_CONNECT_CONTEXT 0x7010 1.72 +#endif 1.73 + 1.74 +/* prototypes */ 1.75 +static int be_async_enable(struct bufferevent *, short); 1.76 +static int be_async_disable(struct bufferevent *, short); 1.77 +static void be_async_destruct(struct bufferevent *); 1.78 +static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 1.79 +static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 1.80 + 1.81 +struct bufferevent_async { 1.82 + struct bufferevent_private bev; 1.83 + struct event_overlapped connect_overlapped; 1.84 + struct event_overlapped read_overlapped; 1.85 + struct event_overlapped write_overlapped; 1.86 + size_t read_in_progress; 1.87 + size_t write_in_progress; 1.88 + unsigned ok : 1; 1.89 + unsigned read_added : 1; 1.90 + unsigned write_added : 1; 1.91 +}; 1.92 + 1.93 +const struct bufferevent_ops bufferevent_ops_async = { 1.94 + "socket_async", 1.95 + evutil_offsetof(struct bufferevent_async, bev.bev), 1.96 + be_async_enable, 1.97 + be_async_disable, 1.98 + be_async_destruct, 1.99 + _bufferevent_generic_adj_timeouts, 1.100 + be_async_flush, 1.101 + be_async_ctrl, 1.102 +}; 1.103 + 1.104 +static inline struct bufferevent_async * 1.105 +upcast(struct bufferevent *bev) 1.106 +{ 1.107 + struct bufferevent_async *bev_a; 1.108 + if (bev->be_ops != &bufferevent_ops_async) 1.109 + return NULL; 1.110 + bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 1.111 + return bev_a; 1.112 +} 1.113 + 1.114 +static inline struct bufferevent_async * 1.115 +upcast_connect(struct event_overlapped *eo) 1.116 +{ 1.117 + struct bufferevent_async *bev_a; 1.118 + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 1.119 + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1.120 + return bev_a; 1.121 +} 1.122 + 1.123 +static inline struct bufferevent_async * 1.124 +upcast_read(struct event_overlapped *eo) 1.125 +{ 1.126 + struct bufferevent_async *bev_a; 1.127 + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 1.128 + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1.129 + return bev_a; 1.130 +} 1.131 + 1.132 +static inline struct bufferevent_async * 1.133 +upcast_write(struct event_overlapped *eo) 1.134 +{ 1.135 + struct bufferevent_async *bev_a; 1.136 + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 1.137 + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 1.138 + return bev_a; 1.139 +} 1.140 + 1.141 +static void 1.142 +bev_async_del_write(struct bufferevent_async *beva) 1.143 +{ 1.144 + struct bufferevent *bev = &beva->bev.bev; 1.145 + 1.146 + if (beva->write_added) { 1.147 + beva->write_added = 0; 1.148 + event_base_del_virtual(bev->ev_base); 1.149 + } 1.150 +} 1.151 + 1.152 +static void 1.153 +bev_async_del_read(struct bufferevent_async *beva) 1.154 +{ 1.155 + struct bufferevent *bev = &beva->bev.bev; 1.156 + 1.157 + if (beva->read_added) { 1.158 + beva->read_added = 0; 1.159 + event_base_del_virtual(bev->ev_base); 1.160 + } 1.161 +} 1.162 + 1.163 +static void 1.164 +bev_async_add_write(struct bufferevent_async *beva) 1.165 +{ 1.166 + struct bufferevent *bev = &beva->bev.bev; 1.167 + 1.168 + if (!beva->write_added) { 1.169 + beva->write_added = 1; 1.170 + event_base_add_virtual(bev->ev_base); 1.171 + } 1.172 +} 1.173 + 1.174 +static void 1.175 +bev_async_add_read(struct bufferevent_async *beva) 1.176 +{ 1.177 + struct bufferevent *bev = &beva->bev.bev; 1.178 + 1.179 + if (!beva->read_added) { 1.180 + beva->read_added = 1; 1.181 + event_base_add_virtual(bev->ev_base); 1.182 + } 1.183 +} 1.184 + 1.185 +static void 1.186 +bev_async_consider_writing(struct bufferevent_async *beva) 1.187 +{ 1.188 + size_t at_most; 1.189 + int limit; 1.190 + struct bufferevent *bev = &beva->bev.bev; 1.191 + 1.192 + /* Don't write if there's a write in progress, or we do not 1.193 + * want to write, or when there's nothing left to write. */ 1.194 + if (beva->write_in_progress || beva->bev.connecting) 1.195 + return; 1.196 + if (!beva->ok || !(bev->enabled&EV_WRITE) || 1.197 + !evbuffer_get_length(bev->output)) { 1.198 + bev_async_del_write(beva); 1.199 + return; 1.200 + } 1.201 + 1.202 + at_most = evbuffer_get_length(bev->output); 1.203 + 1.204 + /* This is safe so long as bufferevent_get_write_max never returns 1.205 + * more than INT_MAX. That's true for now. XXXX */ 1.206 + limit = (int)_bufferevent_get_write_max(&beva->bev); 1.207 + if (at_most >= (size_t)limit && limit >= 0) 1.208 + at_most = limit; 1.209 + 1.210 + if (beva->bev.write_suspended) { 1.211 + bev_async_del_write(beva); 1.212 + return; 1.213 + } 1.214 + 1.215 + /* XXXX doesn't respect low-water mark very well. */ 1.216 + bufferevent_incref(bev); 1.217 + if (evbuffer_launch_write(bev->output, at_most, 1.218 + &beva->write_overlapped)) { 1.219 + bufferevent_decref(bev); 1.220 + beva->ok = 0; 1.221 + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 1.222 + } else { 1.223 + beva->write_in_progress = at_most; 1.224 + _bufferevent_decrement_write_buckets(&beva->bev, at_most); 1.225 + bev_async_add_write(beva); 1.226 + } 1.227 +} 1.228 + 1.229 +static void 1.230 +bev_async_consider_reading(struct bufferevent_async *beva) 1.231 +{ 1.232 + size_t cur_size; 1.233 + size_t read_high; 1.234 + size_t at_most; 1.235 + int limit; 1.236 + struct bufferevent *bev = &beva->bev.bev; 1.237 + 1.238 + /* Don't read if there is a read in progress, or we do not 1.239 + * want to read. */ 1.240 + if (beva->read_in_progress || beva->bev.connecting) 1.241 + return; 1.242 + if (!beva->ok || !(bev->enabled&EV_READ)) { 1.243 + bev_async_del_read(beva); 1.244 + return; 1.245 + } 1.246 + 1.247 + /* Don't read if we're full */ 1.248 + cur_size = evbuffer_get_length(bev->input); 1.249 + read_high = bev->wm_read.high; 1.250 + if (read_high) { 1.251 + if (cur_size >= read_high) { 1.252 + bev_async_del_read(beva); 1.253 + return; 1.254 + } 1.255 + at_most = read_high - cur_size; 1.256 + } else { 1.257 + at_most = 16384; /* FIXME totally magic. */ 1.258 + } 1.259 + 1.260 + /* XXXX This over-commits. */ 1.261 + /* XXXX see also not above on cast on _bufferevent_get_write_max() */ 1.262 + limit = (int)_bufferevent_get_read_max(&beva->bev); 1.263 + if (at_most >= (size_t)limit && limit >= 0) 1.264 + at_most = limit; 1.265 + 1.266 + if (beva->bev.read_suspended) { 1.267 + bev_async_del_read(beva); 1.268 + return; 1.269 + } 1.270 + 1.271 + bufferevent_incref(bev); 1.272 + if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) { 1.273 + beva->ok = 0; 1.274 + _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); 1.275 + bufferevent_decref(bev); 1.276 + } else { 1.277 + beva->read_in_progress = at_most; 1.278 + _bufferevent_decrement_read_buckets(&beva->bev, at_most); 1.279 + bev_async_add_read(beva); 1.280 + } 1.281 + 1.282 + return; 1.283 +} 1.284 + 1.285 +static void 1.286 +be_async_outbuf_callback(struct evbuffer *buf, 1.287 + const struct evbuffer_cb_info *cbinfo, 1.288 + void *arg) 1.289 +{ 1.290 + struct bufferevent *bev = arg; 1.291 + struct bufferevent_async *bev_async = upcast(bev); 1.292 + 1.293 + /* If we added data to the outbuf and were not writing before, 1.294 + * we may want to write now. */ 1.295 + 1.296 + _bufferevent_incref_and_lock(bev); 1.297 + 1.298 + if (cbinfo->n_added) 1.299 + bev_async_consider_writing(bev_async); 1.300 + 1.301 + _bufferevent_decref_and_unlock(bev); 1.302 +} 1.303 + 1.304 +static void 1.305 +be_async_inbuf_callback(struct evbuffer *buf, 1.306 + const struct evbuffer_cb_info *cbinfo, 1.307 + void *arg) 1.308 +{ 1.309 + struct bufferevent *bev = arg; 1.310 + struct bufferevent_async *bev_async = upcast(bev); 1.311 + 1.312 + /* If we drained data from the inbuf and were not reading before, 1.313 + * we may want to read now */ 1.314 + 1.315 + _bufferevent_incref_and_lock(bev); 1.316 + 1.317 + if (cbinfo->n_deleted) 1.318 + bev_async_consider_reading(bev_async); 1.319 + 1.320 + _bufferevent_decref_and_unlock(bev); 1.321 +} 1.322 + 1.323 +static int 1.324 +be_async_enable(struct bufferevent *buf, short what) 1.325 +{ 1.326 + struct bufferevent_async *bev_async = upcast(buf); 1.327 + 1.328 + if (!bev_async->ok) 1.329 + return -1; 1.330 + 1.331 + if (bev_async->bev.connecting) { 1.332 + /* Don't launch anything during connection attempts. */ 1.333 + return 0; 1.334 + } 1.335 + 1.336 + if (what & EV_READ) 1.337 + BEV_RESET_GENERIC_READ_TIMEOUT(buf); 1.338 + if (what & EV_WRITE) 1.339 + BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 1.340 + 1.341 + /* If we newly enable reading or writing, and we aren't reading or 1.342 + writing already, consider launching a new read or write. */ 1.343 + 1.344 + if (what & EV_READ) 1.345 + bev_async_consider_reading(bev_async); 1.346 + if (what & EV_WRITE) 1.347 + bev_async_consider_writing(bev_async); 1.348 + return 0; 1.349 +} 1.350 + 1.351 +static int 1.352 +be_async_disable(struct bufferevent *bev, short what) 1.353 +{ 1.354 + struct bufferevent_async *bev_async = upcast(bev); 1.355 + /* XXXX If we disable reading or writing, we may want to consider 1.356 + * canceling any in-progress read or write operation, though it might 1.357 + * not work. */ 1.358 + 1.359 + if (what & EV_READ) { 1.360 + BEV_DEL_GENERIC_READ_TIMEOUT(bev); 1.361 + bev_async_del_read(bev_async); 1.362 + } 1.363 + if (what & EV_WRITE) { 1.364 + BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 1.365 + bev_async_del_write(bev_async); 1.366 + } 1.367 + 1.368 + return 0; 1.369 +} 1.370 + 1.371 +static void 1.372 +be_async_destruct(struct bufferevent *bev) 1.373 +{ 1.374 + struct bufferevent_async *bev_async = upcast(bev); 1.375 + struct bufferevent_private *bev_p = BEV_UPCAST(bev); 1.376 + evutil_socket_t fd; 1.377 + 1.378 + EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 1.379 + !upcast(bev)->read_in_progress); 1.380 + 1.381 + bev_async_del_read(bev_async); 1.382 + bev_async_del_write(bev_async); 1.383 + 1.384 + fd = _evbuffer_overlapped_get_fd(bev->input); 1.385 + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) { 1.386 + /* XXXX possible double-close */ 1.387 + evutil_closesocket(fd); 1.388 + } 1.389 + /* delete this in case non-blocking connect was used */ 1.390 + if (event_initialized(&bev->ev_write)) { 1.391 + event_del(&bev->ev_write); 1.392 + _bufferevent_del_generic_timeout_cbs(bev); 1.393 + } 1.394 +} 1.395 + 1.396 +/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 1.397 + * we use WSAGetOverlappedResult to translate. */ 1.398 +static void 1.399 +bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 1.400 +{ 1.401 + DWORD bytes, flags; 1.402 + evutil_socket_t fd; 1.403 + 1.404 + fd = _evbuffer_overlapped_get_fd(bev->input); 1.405 + WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 1.406 +} 1.407 + 1.408 +static int 1.409 +be_async_flush(struct bufferevent *bev, short what, 1.410 + enum bufferevent_flush_mode mode) 1.411 +{ 1.412 + return 0; 1.413 +} 1.414 + 1.415 +static void 1.416 +connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 1.417 + ev_ssize_t nbytes, int ok) 1.418 +{ 1.419 + struct bufferevent_async *bev_a = upcast_connect(eo); 1.420 + struct bufferevent *bev = &bev_a->bev.bev; 1.421 + evutil_socket_t sock; 1.422 + 1.423 + BEV_LOCK(bev); 1.424 + 1.425 + EVUTIL_ASSERT(bev_a->bev.connecting); 1.426 + bev_a->bev.connecting = 0; 1.427 + sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input); 1.428 + /* XXXX Handle error? */ 1.429 + setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 1.430 + 1.431 + if (ok) 1.432 + bufferevent_async_set_connected(bev); 1.433 + else 1.434 + bev_async_set_wsa_error(bev, eo); 1.435 + 1.436 + _bufferevent_run_eventcb(bev, 1.437 + ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); 1.438 + 1.439 + event_base_del_virtual(bev->ev_base); 1.440 + 1.441 + _bufferevent_decref_and_unlock(bev); 1.442 +} 1.443 + 1.444 +static void 1.445 +read_complete(struct event_overlapped *eo, ev_uintptr_t key, 1.446 + ev_ssize_t nbytes, int ok) 1.447 +{ 1.448 + struct bufferevent_async *bev_a = upcast_read(eo); 1.449 + struct bufferevent *bev = &bev_a->bev.bev; 1.450 + short what = BEV_EVENT_READING; 1.451 + ev_ssize_t amount_unread; 1.452 + BEV_LOCK(bev); 1.453 + EVUTIL_ASSERT(bev_a->read_in_progress); 1.454 + 1.455 + amount_unread = bev_a->read_in_progress - nbytes; 1.456 + evbuffer_commit_read(bev->input, nbytes); 1.457 + bev_a->read_in_progress = 0; 1.458 + if (amount_unread) 1.459 + _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread); 1.460 + 1.461 + if (!ok) 1.462 + bev_async_set_wsa_error(bev, eo); 1.463 + 1.464 + if (bev_a->ok) { 1.465 + if (ok && nbytes) { 1.466 + BEV_RESET_GENERIC_READ_TIMEOUT(bev); 1.467 + if (evbuffer_get_length(bev->input) >= bev->wm_read.low) 1.468 + _bufferevent_run_readcb(bev); 1.469 + bev_async_consider_reading(bev_a); 1.470 + } else if (!ok) { 1.471 + what |= BEV_EVENT_ERROR; 1.472 + bev_a->ok = 0; 1.473 + _bufferevent_run_eventcb(bev, what); 1.474 + } else if (!nbytes) { 1.475 + what |= BEV_EVENT_EOF; 1.476 + bev_a->ok = 0; 1.477 + _bufferevent_run_eventcb(bev, what); 1.478 + } 1.479 + } 1.480 + 1.481 + _bufferevent_decref_and_unlock(bev); 1.482 +} 1.483 + 1.484 +static void 1.485 +write_complete(struct event_overlapped *eo, ev_uintptr_t key, 1.486 + ev_ssize_t nbytes, int ok) 1.487 +{ 1.488 + struct bufferevent_async *bev_a = upcast_write(eo); 1.489 + struct bufferevent *bev = &bev_a->bev.bev; 1.490 + short what = BEV_EVENT_WRITING; 1.491 + ev_ssize_t amount_unwritten; 1.492 + 1.493 + BEV_LOCK(bev); 1.494 + EVUTIL_ASSERT(bev_a->write_in_progress); 1.495 + 1.496 + amount_unwritten = bev_a->write_in_progress - nbytes; 1.497 + evbuffer_commit_write(bev->output, nbytes); 1.498 + bev_a->write_in_progress = 0; 1.499 + 1.500 + if (amount_unwritten) 1.501 + _bufferevent_decrement_write_buckets(&bev_a->bev, 1.502 + -amount_unwritten); 1.503 + 1.504 + 1.505 + if (!ok) 1.506 + bev_async_set_wsa_error(bev, eo); 1.507 + 1.508 + if (bev_a->ok) { 1.509 + if (ok && nbytes) { 1.510 + BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 1.511 + if (evbuffer_get_length(bev->output) <= 1.512 + bev->wm_write.low) 1.513 + _bufferevent_run_writecb(bev); 1.514 + bev_async_consider_writing(bev_a); 1.515 + } else if (!ok) { 1.516 + what |= BEV_EVENT_ERROR; 1.517 + bev_a->ok = 0; 1.518 + _bufferevent_run_eventcb(bev, what); 1.519 + } else if (!nbytes) { 1.520 + what |= BEV_EVENT_EOF; 1.521 + bev_a->ok = 0; 1.522 + _bufferevent_run_eventcb(bev, what); 1.523 + } 1.524 + } 1.525 + 1.526 + _bufferevent_decref_and_unlock(bev); 1.527 +} 1.528 + 1.529 +struct bufferevent * 1.530 +bufferevent_async_new(struct event_base *base, 1.531 + evutil_socket_t fd, int options) 1.532 +{ 1.533 + struct bufferevent_async *bev_a; 1.534 + struct bufferevent *bev; 1.535 + struct event_iocp_port *iocp; 1.536 + 1.537 + options |= BEV_OPT_THREADSAFE; 1.538 + 1.539 + if (!(iocp = event_base_get_iocp(base))) 1.540 + return NULL; 1.541 + 1.542 + if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) { 1.543 + int err = GetLastError(); 1.544 + /* We may have alrady associated this fd with a port. 1.545 + * Let's hope it's this port, and that the error code 1.546 + * for doing this neer changes. */ 1.547 + if (err != ERROR_INVALID_PARAMETER) 1.548 + return NULL; 1.549 + } 1.550 + 1.551 + if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 1.552 + return NULL; 1.553 + 1.554 + bev = &bev_a->bev.bev; 1.555 + if (!(bev->input = evbuffer_overlapped_new(fd))) { 1.556 + mm_free(bev_a); 1.557 + return NULL; 1.558 + } 1.559 + if (!(bev->output = evbuffer_overlapped_new(fd))) { 1.560 + evbuffer_free(bev->input); 1.561 + mm_free(bev_a); 1.562 + return NULL; 1.563 + } 1.564 + 1.565 + if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async, 1.566 + options)<0) 1.567 + goto err; 1.568 + 1.569 + evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 1.570 + evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 1.571 + 1.572 + event_overlapped_init(&bev_a->connect_overlapped, connect_complete); 1.573 + event_overlapped_init(&bev_a->read_overlapped, read_complete); 1.574 + event_overlapped_init(&bev_a->write_overlapped, write_complete); 1.575 + 1.576 + bev_a->ok = fd >= 0; 1.577 + if (bev_a->ok) 1.578 + _bufferevent_init_generic_timeout_cbs(bev); 1.579 + 1.580 + return bev; 1.581 +err: 1.582 + bufferevent_free(&bev_a->bev.bev); 1.583 + return NULL; 1.584 +} 1.585 + 1.586 +void 1.587 +bufferevent_async_set_connected(struct bufferevent *bev) 1.588 +{ 1.589 + struct bufferevent_async *bev_async = upcast(bev); 1.590 + bev_async->ok = 1; 1.591 + _bufferevent_init_generic_timeout_cbs(bev); 1.592 + /* Now's a good time to consider reading/writing */ 1.593 + be_async_enable(bev, bev->enabled); 1.594 +} 1.595 + 1.596 +int 1.597 +bufferevent_async_can_connect(struct bufferevent *bev) 1.598 +{ 1.599 + const struct win32_extension_fns *ext = 1.600 + event_get_win32_extension_fns(); 1.601 + 1.602 + if (BEV_IS_ASYNC(bev) && 1.603 + event_base_get_iocp(bev->ev_base) && 1.604 + ext && ext->ConnectEx) 1.605 + return 1; 1.606 + 1.607 + return 0; 1.608 +} 1.609 + 1.610 +int 1.611 +bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, 1.612 + const struct sockaddr *sa, int socklen) 1.613 +{ 1.614 + BOOL rc; 1.615 + struct bufferevent_async *bev_async = upcast(bev); 1.616 + struct sockaddr_storage ss; 1.617 + const struct win32_extension_fns *ext = 1.618 + event_get_win32_extension_fns(); 1.619 + 1.620 + EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL); 1.621 + 1.622 + /* ConnectEx() requires that the socket be bound to an address 1.623 + * with bind() before using, otherwise it will fail. We attempt 1.624 + * to issue a bind() here, taking into account that the error 1.625 + * code is set to WSAEINVAL when the socket is already bound. */ 1.626 + memset(&ss, 0, sizeof(ss)); 1.627 + if (sa->sa_family == AF_INET) { 1.628 + struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 1.629 + sin->sin_family = AF_INET; 1.630 + sin->sin_addr.s_addr = INADDR_ANY; 1.631 + } else if (sa->sa_family == AF_INET6) { 1.632 + struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 1.633 + sin6->sin6_family = AF_INET6; 1.634 + sin6->sin6_addr = in6addr_any; 1.635 + } else { 1.636 + /* Well, the user will have to bind() */ 1.637 + return -1; 1.638 + } 1.639 + if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 1.640 + WSAGetLastError() != WSAEINVAL) 1.641 + return -1; 1.642 + 1.643 + event_base_add_virtual(bev->ev_base); 1.644 + bufferevent_incref(bev); 1.645 + rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 1.646 + &bev_async->connect_overlapped.overlapped); 1.647 + if (rc || WSAGetLastError() == ERROR_IO_PENDING) 1.648 + return 0; 1.649 + 1.650 + event_base_del_virtual(bev->ev_base); 1.651 + bufferevent_decref(bev); 1.652 + 1.653 + return -1; 1.654 +} 1.655 + 1.656 +static int 1.657 +be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 1.658 + union bufferevent_ctrl_data *data) 1.659 +{ 1.660 + switch (op) { 1.661 + case BEV_CTRL_GET_FD: 1.662 + data->fd = _evbuffer_overlapped_get_fd(bev->input); 1.663 + return 0; 1.664 + case BEV_CTRL_SET_FD: { 1.665 + struct event_iocp_port *iocp; 1.666 + 1.667 + if (data->fd == _evbuffer_overlapped_get_fd(bev->input)) 1.668 + return 0; 1.669 + if (!(iocp = event_base_get_iocp(bev->ev_base))) 1.670 + return -1; 1.671 + if (event_iocp_port_associate(iocp, data->fd, 1) < 0) 1.672 + return -1; 1.673 + _evbuffer_overlapped_set_fd(bev->input, data->fd); 1.674 + _evbuffer_overlapped_set_fd(bev->output, data->fd); 1.675 + return 0; 1.676 + } 1.677 + case BEV_CTRL_CANCEL_ALL: { 1.678 + struct bufferevent_async *bev_a = upcast(bev); 1.679 + evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input); 1.680 + if (fd != (evutil_socket_t)INVALID_SOCKET && 1.681 + (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 1.682 + closesocket(fd); 1.683 + } 1.684 + bev_a->ok = 0; 1.685 + return 0; 1.686 + } 1.687 + case BEV_CTRL_GET_UNDERLYING: 1.688 + default: 1.689 + return -1; 1.690 + } 1.691 +} 1.692 + 1.693 +