ipc/chromium/src/third_party/libevent/bufferevent_async.c

changeset 0:6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/ipc/chromium/src/third_party/libevent/bufferevent_async.c	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,690 @@
     1.4 +/*
     1.5 + * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
     1.6 + *
     1.7 + * All rights reserved.
     1.8 + *
     1.9 + * Redistribution and use in source and binary forms, with or without
    1.10 + * modification, are permitted provided that the following conditions
    1.11 + * are met:
    1.12 + * 1. Redistributions of source code must retain the above copyright
    1.13 + *    notice, this list of conditions and the following disclaimer.
    1.14 + * 2. Redistributions in binary form must reproduce the above copyright
    1.15 + *    notice, this list of conditions and the following disclaimer in the
    1.16 + *    documentation and/or other materials provided with the distribution.
    1.17 + * 3. The name of the author may not be used to endorse or promote products
    1.18 + *    derived from this software without specific prior written permission.
    1.19 + *
    1.20 + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
    1.21 + * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
    1.22 + * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
    1.23 + * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
    1.24 + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
    1.25 + * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    1.26 + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
    1.27 + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    1.28 + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
    1.29 + * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    1.30 + */
    1.31 +
    1.32 +#include "event2/event-config.h"
    1.33 +
    1.34 +#ifdef _EVENT_HAVE_SYS_TIME_H
    1.35 +#include <sys/time.h>
    1.36 +#endif
    1.37 +
    1.38 +#include <errno.h>
    1.39 +#include <stdio.h>
    1.40 +#include <stdlib.h>
    1.41 +#include <string.h>
    1.42 +#ifdef _EVENT_HAVE_STDARG_H
    1.43 +#include <stdarg.h>
    1.44 +#endif
    1.45 +#ifdef _EVENT_HAVE_UNISTD_H
    1.46 +#include <unistd.h>
    1.47 +#endif
    1.48 +
    1.49 +#ifdef WIN32
    1.50 +#include <winsock2.h>
    1.51 +#include <ws2tcpip.h>
    1.52 +#endif
    1.53 +
    1.54 +#include <sys/queue.h>
    1.55 +
    1.56 +#include "event2/util.h"
    1.57 +#include "event2/bufferevent.h"
    1.58 +#include "event2/buffer.h"
    1.59 +#include "event2/bufferevent_struct.h"
    1.60 +#include "event2/event.h"
    1.61 +#include "event2/util.h"
    1.62 +#include "event-internal.h"
    1.63 +#include "log-internal.h"
    1.64 +#include "mm-internal.h"
    1.65 +#include "bufferevent-internal.h"
    1.66 +#include "util-internal.h"
    1.67 +#include "iocp-internal.h"
    1.68 +
    1.69 +#ifndef SO_UPDATE_CONNECT_CONTEXT
     1.70 +/* MinGW is sometimes missing this */
    1.71 +#define SO_UPDATE_CONNECT_CONTEXT 0x7010
    1.72 +#endif
    1.73 +
    1.74 +/* prototypes */
    1.75 +static int be_async_enable(struct bufferevent *, short);
    1.76 +static int be_async_disable(struct bufferevent *, short);
    1.77 +static void be_async_destruct(struct bufferevent *);
    1.78 +static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
    1.79 +static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
    1.80 +
    1.81 +struct bufferevent_async {
    1.82 +	struct bufferevent_private bev;
    1.83 +	struct event_overlapped connect_overlapped;
    1.84 +	struct event_overlapped read_overlapped;
    1.85 +	struct event_overlapped write_overlapped;
    1.86 +	size_t read_in_progress;
    1.87 +	size_t write_in_progress;
    1.88 +	unsigned ok : 1;
    1.89 +	unsigned read_added : 1;
    1.90 +	unsigned write_added : 1;
    1.91 +};
    1.92 +
    1.93 +const struct bufferevent_ops bufferevent_ops_async = {
    1.94 +	"socket_async",
    1.95 +	evutil_offsetof(struct bufferevent_async, bev.bev),
    1.96 +	be_async_enable,
    1.97 +	be_async_disable,
    1.98 +	be_async_destruct,
    1.99 +	_bufferevent_generic_adj_timeouts,
   1.100 +	be_async_flush,
   1.101 +	be_async_ctrl,
   1.102 +};
   1.103 +
   1.104 +static inline struct bufferevent_async *
   1.105 +upcast(struct bufferevent *bev)
   1.106 +{
   1.107 +	struct bufferevent_async *bev_a;
   1.108 +	if (bev->be_ops != &bufferevent_ops_async)
   1.109 +		return NULL;
   1.110 +	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
   1.111 +	return bev_a;
   1.112 +}
   1.113 +
   1.114 +static inline struct bufferevent_async *
   1.115 +upcast_connect(struct event_overlapped *eo)
   1.116 +{
   1.117 +	struct bufferevent_async *bev_a;
   1.118 +	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
   1.119 +	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
   1.120 +	return bev_a;
   1.121 +}
   1.122 +
   1.123 +static inline struct bufferevent_async *
   1.124 +upcast_read(struct event_overlapped *eo)
   1.125 +{
   1.126 +	struct bufferevent_async *bev_a;
   1.127 +	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
   1.128 +	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
   1.129 +	return bev_a;
   1.130 +}
   1.131 +
   1.132 +static inline struct bufferevent_async *
   1.133 +upcast_write(struct event_overlapped *eo)
   1.134 +{
   1.135 +	struct bufferevent_async *bev_a;
   1.136 +	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
   1.137 +	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
   1.138 +	return bev_a;
   1.139 +}
   1.140 +
   1.141 +static void
   1.142 +bev_async_del_write(struct bufferevent_async *beva)
   1.143 +{
   1.144 +	struct bufferevent *bev = &beva->bev.bev;
   1.145 +
   1.146 +	if (beva->write_added) {
   1.147 +		beva->write_added = 0;
   1.148 +		event_base_del_virtual(bev->ev_base);
   1.149 +	}
   1.150 +}
   1.151 +
   1.152 +static void
   1.153 +bev_async_del_read(struct bufferevent_async *beva)
   1.154 +{
   1.155 +	struct bufferevent *bev = &beva->bev.bev;
   1.156 +
   1.157 +	if (beva->read_added) {
   1.158 +		beva->read_added = 0;
   1.159 +		event_base_del_virtual(bev->ev_base);
   1.160 +	}
   1.161 +}
   1.162 +
   1.163 +static void
   1.164 +bev_async_add_write(struct bufferevent_async *beva)
   1.165 +{
   1.166 +	struct bufferevent *bev = &beva->bev.bev;
   1.167 +
   1.168 +	if (!beva->write_added) {
   1.169 +		beva->write_added = 1;
   1.170 +		event_base_add_virtual(bev->ev_base);
   1.171 +	}
   1.172 +}
   1.173 +
   1.174 +static void
   1.175 +bev_async_add_read(struct bufferevent_async *beva)
   1.176 +{
   1.177 +	struct bufferevent *bev = &beva->bev.bev;
   1.178 +
   1.179 +	if (!beva->read_added) {
   1.180 +		beva->read_added = 1;
   1.181 +		event_base_add_virtual(bev->ev_base);
   1.182 +	}
   1.183 +}
   1.184 +
   1.185 +static void
   1.186 +bev_async_consider_writing(struct bufferevent_async *beva)
   1.187 +{
   1.188 +	size_t at_most;
   1.189 +	int limit;
   1.190 +	struct bufferevent *bev = &beva->bev.bev;
   1.191 +
    1.192 +	/* Don't write if there's a write in progress, or we do not
    1.193 +	 * want to write, or there's nothing left to write. */
   1.194 +	if (beva->write_in_progress || beva->bev.connecting)
   1.195 +		return;
   1.196 +	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
   1.197 +	    !evbuffer_get_length(bev->output)) {
   1.198 +		bev_async_del_write(beva);
   1.199 +		return;
   1.200 +	}
   1.201 +
   1.202 +	at_most = evbuffer_get_length(bev->output);
   1.203 +
   1.204 +	/* This is safe so long as bufferevent_get_write_max never returns
   1.205 +	 * more than INT_MAX.  That's true for now. XXXX */
   1.206 +	limit = (int)_bufferevent_get_write_max(&beva->bev);
   1.207 +	if (at_most >= (size_t)limit && limit >= 0)
   1.208 +		at_most = limit;
   1.209 +
   1.210 +	if (beva->bev.write_suspended) {
   1.211 +		bev_async_del_write(beva);
   1.212 +		return;
   1.213 +	}
   1.214 +
   1.215 +	/*  XXXX doesn't respect low-water mark very well. */
   1.216 +	bufferevent_incref(bev);
   1.217 +	if (evbuffer_launch_write(bev->output, at_most,
   1.218 +	    &beva->write_overlapped)) {
   1.219 +		bufferevent_decref(bev);
   1.220 +		beva->ok = 0;
   1.221 +		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
   1.222 +	} else {
   1.223 +		beva->write_in_progress = at_most;
   1.224 +		_bufferevent_decrement_write_buckets(&beva->bev, at_most);
   1.225 +		bev_async_add_write(beva);
   1.226 +	}
   1.227 +}
   1.228 +
   1.229 +static void
   1.230 +bev_async_consider_reading(struct bufferevent_async *beva)
   1.231 +{
   1.232 +	size_t cur_size;
   1.233 +	size_t read_high;
   1.234 +	size_t at_most;
   1.235 +	int limit;
   1.236 +	struct bufferevent *bev = &beva->bev.bev;
   1.237 +
   1.238 +	/* Don't read if there is a read in progress, or we do not
   1.239 +	 * want to read. */
   1.240 +	if (beva->read_in_progress || beva->bev.connecting)
   1.241 +		return;
   1.242 +	if (!beva->ok || !(bev->enabled&EV_READ)) {
   1.243 +		bev_async_del_read(beva);
   1.244 +		return;
   1.245 +	}
   1.246 +
   1.247 +	/* Don't read if we're full */
   1.248 +	cur_size = evbuffer_get_length(bev->input);
   1.249 +	read_high = bev->wm_read.high;
   1.250 +	if (read_high) {
   1.251 +		if (cur_size >= read_high) {
   1.252 +			bev_async_del_read(beva);
   1.253 +			return;
   1.254 +		}
   1.255 +		at_most = read_high - cur_size;
   1.256 +	} else {
   1.257 +		at_most = 16384; /* FIXME totally magic. */
   1.258 +	}
   1.259 +
   1.260 +	/* XXXX This over-commits. */
    1.261 +	/* XXXX see also note above on the cast of _bufferevent_get_write_max() */
   1.262 +	limit = (int)_bufferevent_get_read_max(&beva->bev);
   1.263 +	if (at_most >= (size_t)limit && limit >= 0)
   1.264 +		at_most = limit;
   1.265 +
   1.266 +	if (beva->bev.read_suspended) {
   1.267 +		bev_async_del_read(beva);
   1.268 +		return;
   1.269 +	}
   1.270 +
   1.271 +	bufferevent_incref(bev);
   1.272 +	if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
   1.273 +		beva->ok = 0;
   1.274 +		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
   1.275 +		bufferevent_decref(bev);
   1.276 +	} else {
   1.277 +		beva->read_in_progress = at_most;
   1.278 +		_bufferevent_decrement_read_buckets(&beva->bev, at_most);
   1.279 +		bev_async_add_read(beva);
   1.280 +	}
   1.281 +
   1.282 +	return;
   1.283 +}
   1.284 +
   1.285 +static void
   1.286 +be_async_outbuf_callback(struct evbuffer *buf,
   1.287 +    const struct evbuffer_cb_info *cbinfo,
   1.288 +    void *arg)
   1.289 +{
   1.290 +	struct bufferevent *bev = arg;
   1.291 +	struct bufferevent_async *bev_async = upcast(bev);
   1.292 +
   1.293 +	/* If we added data to the outbuf and were not writing before,
   1.294 +	 * we may want to write now. */
   1.295 +
   1.296 +	_bufferevent_incref_and_lock(bev);
   1.297 +
   1.298 +	if (cbinfo->n_added)
   1.299 +		bev_async_consider_writing(bev_async);
   1.300 +
   1.301 +	_bufferevent_decref_and_unlock(bev);
   1.302 +}
   1.303 +
   1.304 +static void
   1.305 +be_async_inbuf_callback(struct evbuffer *buf,
   1.306 +    const struct evbuffer_cb_info *cbinfo,
   1.307 +    void *arg)
   1.308 +{
   1.309 +	struct bufferevent *bev = arg;
   1.310 +	struct bufferevent_async *bev_async = upcast(bev);
   1.311 +
   1.312 +	/* If we drained data from the inbuf and were not reading before,
   1.313 +	 * we may want to read now */
   1.314 +
   1.315 +	_bufferevent_incref_and_lock(bev);
   1.316 +
   1.317 +	if (cbinfo->n_deleted)
   1.318 +		bev_async_consider_reading(bev_async);
   1.319 +
   1.320 +	_bufferevent_decref_and_unlock(bev);
   1.321 +}
   1.322 +
   1.323 +static int
   1.324 +be_async_enable(struct bufferevent *buf, short what)
   1.325 +{
   1.326 +	struct bufferevent_async *bev_async = upcast(buf);
   1.327 +
   1.328 +	if (!bev_async->ok)
   1.329 +		return -1;
   1.330 +
   1.331 +	if (bev_async->bev.connecting) {
   1.332 +		/* Don't launch anything during connection attempts. */
   1.333 +		return 0;
   1.334 +	}
   1.335 +
   1.336 +	if (what & EV_READ)
   1.337 +		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
   1.338 +	if (what & EV_WRITE)
   1.339 +		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);
   1.340 +
   1.341 +	/* If we newly enable reading or writing, and we aren't reading or
   1.342 +	   writing already, consider launching a new read or write. */
   1.343 +
   1.344 +	if (what & EV_READ)
   1.345 +		bev_async_consider_reading(bev_async);
   1.346 +	if (what & EV_WRITE)
   1.347 +		bev_async_consider_writing(bev_async);
   1.348 +	return 0;
   1.349 +}
   1.350 +
   1.351 +static int
   1.352 +be_async_disable(struct bufferevent *bev, short what)
   1.353 +{
   1.354 +	struct bufferevent_async *bev_async = upcast(bev);
   1.355 +	/* XXXX If we disable reading or writing, we may want to consider
   1.356 +	 * canceling any in-progress read or write operation, though it might
   1.357 +	 * not work. */
   1.358 +
   1.359 +	if (what & EV_READ) {
   1.360 +		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
   1.361 +		bev_async_del_read(bev_async);
   1.362 +	}
   1.363 +	if (what & EV_WRITE) {
   1.364 +		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
   1.365 +		bev_async_del_write(bev_async);
   1.366 +	}
   1.367 +
   1.368 +	return 0;
   1.369 +}
   1.370 +
   1.371 +static void
   1.372 +be_async_destruct(struct bufferevent *bev)
   1.373 +{
   1.374 +	struct bufferevent_async *bev_async = upcast(bev);
   1.375 +	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
   1.376 +	evutil_socket_t fd;
   1.377 +
   1.378 +	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
   1.379 +			!upcast(bev)->read_in_progress);
   1.380 +
   1.381 +	bev_async_del_read(bev_async);
   1.382 +	bev_async_del_write(bev_async);
   1.383 +
   1.384 +	fd = _evbuffer_overlapped_get_fd(bev->input);
   1.385 +	if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
   1.386 +		/* XXXX possible double-close */
   1.387 +		evutil_closesocket(fd);
   1.388 +	}
   1.389 +	/* delete this in case non-blocking connect was used */
   1.390 +	if (event_initialized(&bev->ev_write)) {
   1.391 +		event_del(&bev->ev_write);
   1.392 +		_bufferevent_del_generic_timeout_cbs(bev);
   1.393 +	}
   1.394 +}
   1.395 +
   1.396 +/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
   1.397 + * we use WSAGetOverlappedResult to translate. */
   1.398 +static void
   1.399 +bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
   1.400 +{
   1.401 +	DWORD bytes, flags;
   1.402 +	evutil_socket_t fd;
   1.403 +
   1.404 +	fd = _evbuffer_overlapped_get_fd(bev->input);
   1.405 +	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
   1.406 +}
   1.407 +
   1.408 +static int
   1.409 +be_async_flush(struct bufferevent *bev, short what,
   1.410 +    enum bufferevent_flush_mode mode)
   1.411 +{
   1.412 +	return 0;
   1.413 +}
   1.414 +
   1.415 +static void
   1.416 +connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
   1.417 +    ev_ssize_t nbytes, int ok)
   1.418 +{
   1.419 +	struct bufferevent_async *bev_a = upcast_connect(eo);
   1.420 +	struct bufferevent *bev = &bev_a->bev.bev;
   1.421 +	evutil_socket_t sock;
   1.422 +
   1.423 +	BEV_LOCK(bev);
   1.424 +
   1.425 +	EVUTIL_ASSERT(bev_a->bev.connecting);
   1.426 +	bev_a->bev.connecting = 0;
   1.427 +	sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
   1.428 +	/* XXXX Handle error? */
   1.429 +	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
   1.430 +
   1.431 +	if (ok)
   1.432 +		bufferevent_async_set_connected(bev);
   1.433 +	else
   1.434 +		bev_async_set_wsa_error(bev, eo);
   1.435 +
   1.436 +	_bufferevent_run_eventcb(bev,
   1.437 +			ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
   1.438 +
   1.439 +	event_base_del_virtual(bev->ev_base);
   1.440 +
   1.441 +	_bufferevent_decref_and_unlock(bev);
   1.442 +}
   1.443 +
   1.444 +static void
   1.445 +read_complete(struct event_overlapped *eo, ev_uintptr_t key,
   1.446 +    ev_ssize_t nbytes, int ok)
   1.447 +{
   1.448 +	struct bufferevent_async *bev_a = upcast_read(eo);
   1.449 +	struct bufferevent *bev = &bev_a->bev.bev;
   1.450 +	short what = BEV_EVENT_READING;
   1.451 +	ev_ssize_t amount_unread;
   1.452 +	BEV_LOCK(bev);
   1.453 +	EVUTIL_ASSERT(bev_a->read_in_progress);
   1.454 +
   1.455 +	amount_unread = bev_a->read_in_progress - nbytes;
   1.456 +	evbuffer_commit_read(bev->input, nbytes);
   1.457 +	bev_a->read_in_progress = 0;
   1.458 +	if (amount_unread)
   1.459 +		_bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
   1.460 +
   1.461 +	if (!ok)
   1.462 +		bev_async_set_wsa_error(bev, eo);
   1.463 +
   1.464 +	if (bev_a->ok) {
   1.465 +		if (ok && nbytes) {
   1.466 +			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
   1.467 +			if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
   1.468 +				_bufferevent_run_readcb(bev);
   1.469 +			bev_async_consider_reading(bev_a);
   1.470 +		} else if (!ok) {
   1.471 +			what |= BEV_EVENT_ERROR;
   1.472 +			bev_a->ok = 0;
   1.473 +			_bufferevent_run_eventcb(bev, what);
   1.474 +		} else if (!nbytes) {
   1.475 +			what |= BEV_EVENT_EOF;
   1.476 +			bev_a->ok = 0;
   1.477 +			_bufferevent_run_eventcb(bev, what);
   1.478 +		}
   1.479 +	}
   1.480 +
   1.481 +	_bufferevent_decref_and_unlock(bev);
   1.482 +}
   1.483 +
   1.484 +static void
   1.485 +write_complete(struct event_overlapped *eo, ev_uintptr_t key,
   1.486 +    ev_ssize_t nbytes, int ok)
   1.487 +{
   1.488 +	struct bufferevent_async *bev_a = upcast_write(eo);
   1.489 +	struct bufferevent *bev = &bev_a->bev.bev;
   1.490 +	short what = BEV_EVENT_WRITING;
   1.491 +	ev_ssize_t amount_unwritten;
   1.492 +
   1.493 +	BEV_LOCK(bev);
   1.494 +	EVUTIL_ASSERT(bev_a->write_in_progress);
   1.495 +
   1.496 +	amount_unwritten = bev_a->write_in_progress - nbytes;
   1.497 +	evbuffer_commit_write(bev->output, nbytes);
   1.498 +	bev_a->write_in_progress = 0;
   1.499 +
   1.500 +	if (amount_unwritten)
   1.501 +		_bufferevent_decrement_write_buckets(&bev_a->bev,
   1.502 +		                                     -amount_unwritten);
   1.503 +
   1.504 +
   1.505 +	if (!ok)
   1.506 +		bev_async_set_wsa_error(bev, eo);
   1.507 +
   1.508 +	if (bev_a->ok) {
   1.509 +		if (ok && nbytes) {
   1.510 +			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
   1.511 +			if (evbuffer_get_length(bev->output) <=
   1.512 +			    bev->wm_write.low)
   1.513 +				_bufferevent_run_writecb(bev);
   1.514 +			bev_async_consider_writing(bev_a);
   1.515 +		} else if (!ok) {
   1.516 +			what |= BEV_EVENT_ERROR;
   1.517 +			bev_a->ok = 0;
   1.518 +			_bufferevent_run_eventcb(bev, what);
   1.519 +		} else if (!nbytes) {
   1.520 +			what |= BEV_EVENT_EOF;
   1.521 +			bev_a->ok = 0;
   1.522 +			_bufferevent_run_eventcb(bev, what);
   1.523 +		}
   1.524 +	}
   1.525 +
   1.526 +	_bufferevent_decref_and_unlock(bev);
   1.527 +}
   1.528 +
   1.529 +struct bufferevent *
   1.530 +bufferevent_async_new(struct event_base *base,
   1.531 +    evutil_socket_t fd, int options)
   1.532 +{
   1.533 +	struct bufferevent_async *bev_a;
   1.534 +	struct bufferevent *bev;
   1.535 +	struct event_iocp_port *iocp;
   1.536 +
   1.537 +	options |= BEV_OPT_THREADSAFE;
   1.538 +
   1.539 +	if (!(iocp = event_base_get_iocp(base)))
   1.540 +		return NULL;
   1.541 +
   1.542 +	if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
   1.543 +		int err = GetLastError();
    1.544 +		/* We may have already associated this fd with a port.
    1.545 +		 * Let's hope it's this port, and that the error code
    1.546 +		 * for doing this never changes. */
   1.547 +		if (err != ERROR_INVALID_PARAMETER)
   1.548 +			return NULL;
   1.549 +	}
   1.550 +
   1.551 +	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
   1.552 +		return NULL;
   1.553 +
   1.554 +	bev = &bev_a->bev.bev;
   1.555 +	if (!(bev->input = evbuffer_overlapped_new(fd))) {
   1.556 +		mm_free(bev_a);
   1.557 +		return NULL;
   1.558 +	}
   1.559 +	if (!(bev->output = evbuffer_overlapped_new(fd))) {
   1.560 +		evbuffer_free(bev->input);
   1.561 +		mm_free(bev_a);
   1.562 +		return NULL;
   1.563 +	}
   1.564 +
   1.565 +	if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
   1.566 +		options)<0)
   1.567 +		goto err;
   1.568 +
   1.569 +	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
   1.570 +	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
   1.571 +
   1.572 +	event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
   1.573 +	event_overlapped_init(&bev_a->read_overlapped, read_complete);
   1.574 +	event_overlapped_init(&bev_a->write_overlapped, write_complete);
   1.575 +
   1.576 +	bev_a->ok = fd >= 0;
   1.577 +	if (bev_a->ok)
   1.578 +		_bufferevent_init_generic_timeout_cbs(bev);
   1.579 +
   1.580 +	return bev;
   1.581 +err:
   1.582 +	bufferevent_free(&bev_a->bev.bev);
   1.583 +	return NULL;
   1.584 +}
   1.585 +
   1.586 +void
   1.587 +bufferevent_async_set_connected(struct bufferevent *bev)
   1.588 +{
   1.589 +	struct bufferevent_async *bev_async = upcast(bev);
   1.590 +	bev_async->ok = 1;
   1.591 +	_bufferevent_init_generic_timeout_cbs(bev);
   1.592 +	/* Now's a good time to consider reading/writing */
   1.593 +	be_async_enable(bev, bev->enabled);
   1.594 +}
   1.595 +
   1.596 +int
   1.597 +bufferevent_async_can_connect(struct bufferevent *bev)
   1.598 +{
   1.599 +	const struct win32_extension_fns *ext =
   1.600 +	    event_get_win32_extension_fns();
   1.601 +
   1.602 +	if (BEV_IS_ASYNC(bev) &&
   1.603 +	    event_base_get_iocp(bev->ev_base) &&
   1.604 +	    ext && ext->ConnectEx)
   1.605 +		return 1;
   1.606 +
   1.607 +	return 0;
   1.608 +}
   1.609 +
   1.610 +int
   1.611 +bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
   1.612 +	const struct sockaddr *sa, int socklen)
   1.613 +{
   1.614 +	BOOL rc;
   1.615 +	struct bufferevent_async *bev_async = upcast(bev);
   1.616 +	struct sockaddr_storage ss;
   1.617 +	const struct win32_extension_fns *ext =
   1.618 +	    event_get_win32_extension_fns();
   1.619 +
   1.620 +	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
   1.621 +
   1.622 +	/* ConnectEx() requires that the socket be bound to an address
   1.623 +	 * with bind() before using, otherwise it will fail. We attempt
   1.624 +	 * to issue a bind() here, taking into account that the error
   1.625 +	 * code is set to WSAEINVAL when the socket is already bound. */
   1.626 +	memset(&ss, 0, sizeof(ss));
   1.627 +	if (sa->sa_family == AF_INET) {
   1.628 +		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
   1.629 +		sin->sin_family = AF_INET;
   1.630 +		sin->sin_addr.s_addr = INADDR_ANY;
   1.631 +	} else if (sa->sa_family == AF_INET6) {
   1.632 +		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
   1.633 +		sin6->sin6_family = AF_INET6;
   1.634 +		sin6->sin6_addr = in6addr_any;
   1.635 +	} else {
   1.636 +		/* Well, the user will have to bind() */
   1.637 +		return -1;
   1.638 +	}
   1.639 +	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
   1.640 +	    WSAGetLastError() != WSAEINVAL)
   1.641 +		return -1;
   1.642 +
   1.643 +	event_base_add_virtual(bev->ev_base);
   1.644 +	bufferevent_incref(bev);
   1.645 +	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
   1.646 +			    &bev_async->connect_overlapped.overlapped);
   1.647 +	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
   1.648 +		return 0;
   1.649 +
   1.650 +	event_base_del_virtual(bev->ev_base);
   1.651 +	bufferevent_decref(bev);
   1.652 +
   1.653 +	return -1;
   1.654 +}
   1.655 +
   1.656 +static int
   1.657 +be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
   1.658 +    union bufferevent_ctrl_data *data)
   1.659 +{
   1.660 +	switch (op) {
   1.661 +	case BEV_CTRL_GET_FD:
   1.662 +		data->fd = _evbuffer_overlapped_get_fd(bev->input);
   1.663 +		return 0;
   1.664 +	case BEV_CTRL_SET_FD: {
   1.665 +		struct event_iocp_port *iocp;
   1.666 +
   1.667 +		if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
   1.668 +			return 0;
   1.669 +		if (!(iocp = event_base_get_iocp(bev->ev_base)))
   1.670 +			return -1;
   1.671 +		if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
   1.672 +			return -1;
   1.673 +		_evbuffer_overlapped_set_fd(bev->input, data->fd);
   1.674 +		_evbuffer_overlapped_set_fd(bev->output, data->fd);
   1.675 +		return 0;
   1.676 +	}
   1.677 +	case BEV_CTRL_CANCEL_ALL: {
   1.678 +		struct bufferevent_async *bev_a = upcast(bev);
   1.679 +		evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
   1.680 +		if (fd != (evutil_socket_t)INVALID_SOCKET &&
   1.681 +		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
   1.682 +			closesocket(fd);
   1.683 +		}
   1.684 +		bev_a->ok = 0;
   1.685 +		return 0;
   1.686 +	}
   1.687 +	case BEV_CTRL_GET_UNDERLYING:
   1.688 +	default:
   1.689 +		return -1;
   1.690 +	}
   1.691 +}
   1.692 +
   1.693 +
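A minimal usage sketch (not part of this changeset): applications normally do not call bufferevent_async_new() directly; on Windows builds of libevent 2.0 with thread and IOCP support, bufferevent_socket_new() dispatches to this backend when the event_base has an IOCP port. The helper name make_iocp_base below is hypothetical and error handling is omitted.

#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/thread.h>

struct event_base *
make_iocp_base(void)
{
	struct event_config *cfg = event_config_new();
	struct event_base *base;

	/* IOCP bufferevents need the win32 locking callbacks and an
	 * event base created with the STARTUP_IOCP flag. */
	evthread_use_windows_threads();
	event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
	base = event_base_new_with_config(cfg);
	event_config_free(cfg);
	return base;
}

/* With such a base, bufferevent_socket_new() returns an async (IOCP)
 * bufferevent, and bufferevent_socket_connect() uses
 * bufferevent_async_connect()/ConnectEx() when available. */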
