/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "event2/event-config.h"

#ifdef _EVENT_HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef _EVENT_HAVE_STDARG_H
#include <stdarg.h>
#endif
#ifdef _EVENT_HAVE_UNISTD_H
#include <unistd.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#endif

#include <sys/queue.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
#include "event2/util.h"
#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
#include "util-internal.h"
#include "iocp-internal.h"

#ifndef SO_UPDATE_CONNECT_CONTEXT
/* Mingw is sometimes missing this */
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
#endif

/* prototypes */
static int be_async_enable(struct bufferevent *, short);
static int be_async_disable(struct bufferevent *, short);
static void be_async_destruct(struct bufferevent *);
static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode);
static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
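/* Private state for an IOCP-based ("async") bufferevent.  Each kind of
 * overlapped operation (connect, read, write) has its own event_overlapped
 * so that a completion can be routed back to the right handler.  The
 * read_in_progress/write_in_progress fields hold the number of bytes the
 * pending overlapped operation was asked to transfer, and the
 * read_added/write_added bits track whether we currently hold a virtual
 * event on the base for that direction. */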
struct bufferevent_async {
	struct bufferevent_private bev;
	struct event_overlapped connect_overlapped;
	struct event_overlapped read_overlapped;
	struct event_overlapped write_overlapped;
	size_t read_in_progress;
	size_t write_in_progress;
	unsigned ok : 1;
	unsigned read_added : 1;
	unsigned write_added : 1;
};

const struct bufferevent_ops bufferevent_ops_async = {
	"socket_async",
	evutil_offsetof(struct bufferevent_async, bev.bev),
	be_async_enable,
	be_async_disable,
	be_async_destruct,
	_bufferevent_generic_adj_timeouts,
	be_async_flush,
	be_async_ctrl,
};

static inline struct bufferevent_async *
upcast(struct bufferevent *bev)
{
	struct bufferevent_async *bev_a;
	if (bev->be_ops != &bufferevent_ops_async)
		return NULL;
	bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
	return bev_a;
}

static inline struct bufferevent_async *
upcast_connect(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_read(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static inline struct bufferevent_async *
upcast_write(struct event_overlapped *eo)
{
	struct bufferevent_async *bev_a;
	bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
	EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
	return bev_a;
}

static void
bev_async_del_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->write_added) {
		beva->write_added = 0;
		event_base_del_virtual(bev->ev_base);
	}
}

static void
bev_async_del_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (beva->read_added) {
		beva->read_added = 0;
		event_base_del_virtual(bev->ev_base);
	}
}

static void
bev_async_add_write(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->write_added) {
		beva->write_added = 1;
		event_base_add_virtual(bev->ev_base);
	}
}

static void
bev_async_add_read(struct bufferevent_async *beva)
{
	struct bufferevent *bev = &beva->bev.bev;

	if (!beva->read_added) {
		beva->read_added = 1;
		event_base_add_virtual(bev->ev_base);
	}
}
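/* Launch an overlapped write on the output buffer if appropriate: that is,
 * if no write is already in progress, writing is enabled and not suspended,
 * and there is data queued in bev->output.  While the operation is pending
 * we hold a reference on the bufferevent and a virtual event on the base. */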
static void
bev_async_consider_writing(struct bufferevent_async *beva)
{
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't write if there's a write in progress, or we do not
	 * want to write, or when there's nothing left to write. */
	if (beva->write_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_WRITE) ||
	    !evbuffer_get_length(bev->output)) {
		bev_async_del_write(beva);
		return;
	}

	at_most = evbuffer_get_length(bev->output);

	/* This is safe so long as bufferevent_get_write_max never returns
	 * more than INT_MAX.  That's true for now. XXXX */
	limit = (int)_bufferevent_get_write_max(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.write_suspended) {
		bev_async_del_write(beva);
		return;
	}

	/* XXXX doesn't respect low-water mark very well. */
	bufferevent_incref(bev);
	if (evbuffer_launch_write(bev->output, at_most,
	    &beva->write_overlapped)) {
		bufferevent_decref(bev);
		beva->ok = 0;
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
	} else {
		beva->write_in_progress = at_most;
		_bufferevent_decrement_write_buckets(&beva->bev, at_most);
		bev_async_add_write(beva);
	}
}
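/* Launch an overlapped read into the input buffer if appropriate: that is,
 * if no read is already in progress, reading is enabled and not suspended,
 * and the input buffer is still below its high watermark. */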
static void
bev_async_consider_reading(struct bufferevent_async *beva)
{
	size_t cur_size;
	size_t read_high;
	size_t at_most;
	int limit;
	struct bufferevent *bev = &beva->bev.bev;

	/* Don't read if there is a read in progress, or we do not
	 * want to read. */
	if (beva->read_in_progress || beva->bev.connecting)
		return;
	if (!beva->ok || !(bev->enabled&EV_READ)) {
		bev_async_del_read(beva);
		return;
	}

	/* Don't read if we're full */
	cur_size = evbuffer_get_length(bev->input);
	read_high = bev->wm_read.high;
	if (read_high) {
		if (cur_size >= read_high) {
			bev_async_del_read(beva);
			return;
		}
		at_most = read_high - cur_size;
	} else {
		at_most = 16384; /* FIXME totally magic. */
	}

	/* XXXX This over-commits. */
	/* XXXX see also note above on the cast on _bufferevent_get_write_max() */
	limit = (int)_bufferevent_get_read_max(&beva->bev);
	if (at_most >= (size_t)limit && limit >= 0)
		at_most = limit;

	if (beva->bev.read_suspended) {
		bev_async_del_read(beva);
		return;
	}

	bufferevent_incref(bev);
	if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
		beva->ok = 0;
		_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
		bufferevent_decref(bev);
	} else {
		beva->read_in_progress = at_most;
		_bufferevent_decrement_read_buckets(&beva->bev, at_most);
		bev_async_add_read(beva);
	}

	return;
}

static void
be_async_outbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we added data to the outbuf and were not writing before,
	 * we may want to write now. */

	_bufferevent_incref_and_lock(bev);

	if (cbinfo->n_added)
		bev_async_consider_writing(bev_async);

	_bufferevent_decref_and_unlock(bev);
}

static void
be_async_inbuf_callback(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo,
    void *arg)
{
	struct bufferevent *bev = arg;
	struct bufferevent_async *bev_async = upcast(bev);

	/* If we drained data from the inbuf and were not reading before,
	 * we may want to read now. */

	_bufferevent_incref_and_lock(bev);

	if (cbinfo->n_deleted)
		bev_async_consider_reading(bev_async);

	_bufferevent_decref_and_unlock(bev);
}

static int
be_async_enable(struct bufferevent *buf, short what)
{
	struct bufferevent_async *bev_async = upcast(buf);

	if (!bev_async->ok)
		return -1;

	if (bev_async->bev.connecting) {
		/* Don't launch anything during connection attempts. */
		return 0;
	}

	if (what & EV_READ)
		BEV_RESET_GENERIC_READ_TIMEOUT(buf);
	if (what & EV_WRITE)
		BEV_RESET_GENERIC_WRITE_TIMEOUT(buf);

	/* If we newly enable reading or writing, and we aren't reading or
	   writing already, consider launching a new read or write. */

	if (what & EV_READ)
		bev_async_consider_reading(bev_async);
	if (what & EV_WRITE)
		bev_async_consider_writing(bev_async);
	return 0;
}

static int
be_async_disable(struct bufferevent *bev, short what)
{
	struct bufferevent_async *bev_async = upcast(bev);
	/* XXXX If we disable reading or writing, we may want to consider
	 * canceling any in-progress read or write operation, though it might
	 * not work. */

	if (what & EV_READ) {
		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
		bev_async_del_read(bev_async);
	}
	if (what & EV_WRITE) {
		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
		bev_async_del_write(bev_async);
	}

	return 0;
}
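/* Free the per-bufferevent state.  Any overlapped read or write holds a
 * reference on the bufferevent, so by the time the destructor runs there
 * can be no operation still in progress; the assert below checks this. */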
static void
be_async_destruct(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	struct bufferevent_private *bev_p = BEV_UPCAST(bev);
	evutil_socket_t fd;

	EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
	    !upcast(bev)->read_in_progress);

	bev_async_del_read(bev_async);
	bev_async_del_write(bev_async);

	fd = _evbuffer_overlapped_get_fd(bev->input);
	if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
		/* XXXX possible double-close */
		evutil_closesocket(fd);
	}
	/* delete this in case non-blocking connect was used */
	if (event_initialized(&bev->ev_write)) {
		event_del(&bev->ev_write);
		_bufferevent_del_generic_timeout_cbs(bev);
	}
}

/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
 * we use WSAGetOverlappedResult to translate. */
static void
bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
{
	DWORD bytes, flags;
	evutil_socket_t fd;

	fd = _evbuffer_overlapped_get_fd(bev->input);
	WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}

static int
be_async_flush(struct bufferevent *bev, short what,
    enum bufferevent_flush_mode mode)
{
	return 0;
}
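/* The three completion callbacks below are run by the IOCP machinery when an
 * overlapped connect, read, or write finishes.  'ok' tells whether the
 * operation succeeded and 'nbytes' how many bytes were transferred.  Each
 * callback runs the appropriate user callbacks and releases the reference
 * taken when the operation was launched; the read and write handlers also
 * commit the transferred data to the evbuffer and return any unused
 * rate-limiting budget. */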
static void
connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_connect(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	evutil_socket_t sock;

	BEV_LOCK(bev);

	EVUTIL_ASSERT(bev_a->bev.connecting);
	bev_a->bev.connecting = 0;
	sock = _evbuffer_overlapped_get_fd(bev_a->bev.bev.input);
	/* XXXX Handle error? */
	setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);

	if (ok)
		bufferevent_async_set_connected(bev);
	else
		bev_async_set_wsa_error(bev, eo);

	_bufferevent_run_eventcb(bev,
	    ok ? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);

	event_base_del_virtual(bev->ev_base);

	_bufferevent_decref_and_unlock(bev);
}

static void
read_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_read(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_READING;
	ev_ssize_t amount_unread;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->read_in_progress);

	amount_unread = bev_a->read_in_progress - nbytes;
	evbuffer_commit_read(bev->input, nbytes);
	bev_a->read_in_progress = 0;
	if (amount_unread)
		_bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_READ_TIMEOUT(bev);
			if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
				_bufferevent_run_readcb(bev);
			bev_async_consider_reading(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev, what);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev, what);
		}
	}

	_bufferevent_decref_and_unlock(bev);
}

static void
write_complete(struct event_overlapped *eo, ev_uintptr_t key,
    ev_ssize_t nbytes, int ok)
{
	struct bufferevent_async *bev_a = upcast_write(eo);
	struct bufferevent *bev = &bev_a->bev.bev;
	short what = BEV_EVENT_WRITING;
	ev_ssize_t amount_unwritten;

	BEV_LOCK(bev);
	EVUTIL_ASSERT(bev_a->write_in_progress);

	amount_unwritten = bev_a->write_in_progress - nbytes;
	evbuffer_commit_write(bev->output, nbytes);
	bev_a->write_in_progress = 0;

	if (amount_unwritten)
		_bufferevent_decrement_write_buckets(&bev_a->bev,
		    -amount_unwritten);

	if (!ok)
		bev_async_set_wsa_error(bev, eo);

	if (bev_a->ok) {
		if (ok && nbytes) {
			BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
			if (evbuffer_get_length(bev->output) <=
			    bev->wm_write.low)
				_bufferevent_run_writecb(bev);
			bev_async_consider_writing(bev_a);
		} else if (!ok) {
			what |= BEV_EVENT_ERROR;
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev, what);
		} else if (!nbytes) {
			what |= BEV_EVENT_EOF;
			bev_a->ok = 0;
			_bufferevent_run_eventcb(bev, what);
		}
	}

	_bufferevent_decref_and_unlock(bev);
}
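/* Create a new IOCP-backed bufferevent on 'fd', associating the socket with
 * the base's IOCP port.  BEV_OPT_THREADSAFE is forced on, since completion
 * callbacks may run on IOCP loop threads.  Returns NULL if the base has no
 * IOCP port or if allocation fails. */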
struct bufferevent *
bufferevent_async_new(struct event_base *base,
    evutil_socket_t fd, int options)
{
	struct bufferevent_async *bev_a;
	struct bufferevent *bev;
	struct event_iocp_port *iocp;

	options |= BEV_OPT_THREADSAFE;

	if (!(iocp = event_base_get_iocp(base)))
		return NULL;

	if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1) < 0) {
		int err = GetLastError();
		/* We may have already associated this fd with a port.
		 * Let's hope it's this port, and that the error code
		 * for doing this never changes. */
		if (err != ERROR_INVALID_PARAMETER)
			return NULL;
	}

	if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
		return NULL;

	bev = &bev_a->bev.bev;
	if (!(bev->input = evbuffer_overlapped_new(fd))) {
		mm_free(bev_a);
		return NULL;
	}
	if (!(bev->output = evbuffer_overlapped_new(fd))) {
		evbuffer_free(bev->input);
		mm_free(bev_a);
		return NULL;
	}

	if (bufferevent_init_common(&bev_a->bev, base, &bufferevent_ops_async,
	    options) < 0)
		goto err;

	evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
	evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);

	event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
	event_overlapped_init(&bev_a->read_overlapped, read_complete);
	event_overlapped_init(&bev_a->write_overlapped, write_complete);

	bev_a->ok = fd >= 0;
	if (bev_a->ok)
		_bufferevent_init_generic_timeout_cbs(bev);

	return bev;
err:
	bufferevent_free(&bev_a->bev.bev);
	return NULL;
}

void
bufferevent_async_set_connected(struct bufferevent *bev)
{
	struct bufferevent_async *bev_async = upcast(bev);
	bev_async->ok = 1;
	_bufferevent_init_generic_timeout_cbs(bev);
	/* Now's a good time to consider reading/writing */
	be_async_enable(bev, bev->enabled);
}

int
bufferevent_async_can_connect(struct bufferevent *bev)
{
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns();

	if (BEV_IS_ASYNC(bev) &&
	    event_base_get_iocp(bev->ev_base) &&
	    ext && ext->ConnectEx)
		return 1;

	return 0;
}
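/* Begin a ConnectEx()-based asynchronous connect of 'fd' to 'sa'.  Returns 0
 * if the connect succeeds immediately or is pending (connect_complete() will
 * run when it finishes), and -1 on failure. */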
int
bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
    const struct sockaddr *sa, int socklen)
{
	BOOL rc;
	struct bufferevent_async *bev_async = upcast(bev);
	struct sockaddr_storage ss;
	const struct win32_extension_fns *ext =
	    event_get_win32_extension_fns();

	EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);

	/* ConnectEx() requires that the socket be bound to an address
	 * with bind() before using, otherwise it will fail. We attempt
	 * to issue a bind() here, taking into account that the error
	 * code is set to WSAEINVAL when the socket is already bound. */
	memset(&ss, 0, sizeof(ss));
	if (sa->sa_family == AF_INET) {
		struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
		sin->sin_family = AF_INET;
		sin->sin_addr.s_addr = INADDR_ANY;
	} else if (sa->sa_family == AF_INET6) {
		struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
		sin6->sin6_family = AF_INET6;
		sin6->sin6_addr = in6addr_any;
	} else {
		/* Well, the user will have to bind() */
		return -1;
	}
	if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
	    WSAGetLastError() != WSAEINVAL)
		return -1;

	event_base_add_virtual(bev->ev_base);
	bufferevent_incref(bev);
	rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
	    &bev_async->connect_overlapped.overlapped);
	if (rc || WSAGetLastError() == ERROR_IO_PENDING)
		return 0;

	event_base_del_virtual(bev->ev_base);
	bufferevent_decref(bev);

	return -1;
}

static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
    union bufferevent_ctrl_data *data)
{
	switch (op) {
	case BEV_CTRL_GET_FD:
		data->fd = _evbuffer_overlapped_get_fd(bev->input);
		return 0;
	case BEV_CTRL_SET_FD: {
		struct event_iocp_port *iocp;

		if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
			return 0;
		if (!(iocp = event_base_get_iocp(bev->ev_base)))
			return -1;
		if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
			return -1;
		_evbuffer_overlapped_set_fd(bev->input, data->fd);
		_evbuffer_overlapped_set_fd(bev->output, data->fd);
		return 0;
	}
	case BEV_CTRL_CANCEL_ALL: {
		struct bufferevent_async *bev_a = upcast(bev);
		evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
		if (fd != (evutil_socket_t)INVALID_SOCKET &&
		    (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
			closesocket(fd);
		}
		bev_a->ok = 0;
		return 0;
	}
	case BEV_CTRL_GET_UNDERLYING:
	default:
		return -1;
	}
}
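/* Usage sketch: applications normally do not call bufferevent_async_new()
 * directly.  When the event_base has an IOCP port, bufferevent_socket_new()
 * picks this backend itself, so a typical caller looks roughly like this
 * (error handling omitted; 'sin' is a sockaddr_in filled in by the caller):
 *
 *	evthread_use_windows_threads();
 *	struct event_config *cfg = event_config_new();
 *	event_config_set_flag(cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
 *	struct event_base *base = event_base_new_with_config(cfg);
 *	struct bufferevent *bev =
 *	    bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
 *	bufferevent_socket_connect(bev, (struct sockaddr *)&sin, sizeof(sin));
 */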