michael@0: /* michael@0: * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson michael@0: * michael@0: * Redistribution and use in source and binary forms, with or without michael@0: * modification, are permitted provided that the following conditions michael@0: * are met: michael@0: * 1. Redistributions of source code must retain the above copyright michael@0: * notice, this list of conditions and the following disclaimer. michael@0: * 2. Redistributions in binary form must reproduce the above copyright michael@0: * notice, this list of conditions and the following disclaimer in the michael@0: * documentation and/or other materials provided with the distribution. michael@0: * 3. The name of the author may not be used to endorse or promote products michael@0: * derived from this software without specific prior written permission. michael@0: * michael@0: * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR michael@0: * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES michael@0: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. michael@0: * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, michael@0: * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT michael@0: * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, michael@0: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY michael@0: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT michael@0: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF michael@0: * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. michael@0: */ michael@0: michael@0: #ifndef _WIN32_WINNT michael@0: /* Minimum required for InitializeCriticalSectionAndSpinCount */ michael@0: #define _WIN32_WINNT 0x0403 michael@0: #endif michael@0: #include michael@0: #include michael@0: #include michael@0: #include michael@0: #include michael@0: michael@0: #include "event2/util.h" michael@0: #include "util-internal.h" michael@0: #include "iocp-internal.h" michael@0: #include "log-internal.h" michael@0: #include "mm-internal.h" michael@0: #include "event-internal.h" michael@0: #include "evthread-internal.h" michael@0: michael@0: #define NOTIFICATION_KEY ((ULONG_PTR)-1) michael@0: michael@0: void michael@0: event_overlapped_init(struct event_overlapped *o, iocp_callback cb) michael@0: { michael@0: memset(o, 0, sizeof(struct event_overlapped)); michael@0: o->cb = cb; michael@0: } michael@0: michael@0: static void michael@0: handle_entry(OVERLAPPED *o, ULONG_PTR completion_key, DWORD nBytes, int ok) michael@0: { michael@0: struct event_overlapped *eo = michael@0: EVUTIL_UPCAST(o, struct event_overlapped, overlapped); michael@0: eo->cb(eo, completion_key, nBytes, ok); michael@0: } michael@0: michael@0: static void michael@0: loop(void *_port) michael@0: { michael@0: struct event_iocp_port *port = _port; michael@0: long ms = port->ms; michael@0: HANDLE p = port->port; michael@0: michael@0: if (ms <= 0) michael@0: ms = INFINITE; michael@0: michael@0: while (1) { michael@0: OVERLAPPED *overlapped=NULL; michael@0: ULONG_PTR key=0; michael@0: DWORD bytes=0; michael@0: int ok = GetQueuedCompletionStatus(p, &bytes, &key, michael@0: &overlapped, ms); michael@0: EnterCriticalSection(&port->lock); michael@0: if (port->shutdown) { michael@0: if (--port->n_live_threads == 0) michael@0: ReleaseSemaphore(port->shutdownSemaphore, 1, michael@0: NULL); michael@0: LeaveCriticalSection(&port->lock); michael@0: return; michael@0: } michael@0: LeaveCriticalSection(&port->lock); michael@0: michael@0: if (key != NOTIFICATION_KEY && overlapped) michael@0: handle_entry(overlapped, key, bytes, ok); michael@0: else if (!overlapped) michael@0: break; michael@0: } michael@0: event_warnx("GetQueuedCompletionStatus exited with no event."); michael@0: EnterCriticalSection(&port->lock); michael@0: if (--port->n_live_threads == 0) michael@0: ReleaseSemaphore(port->shutdownSemaphore, 1, NULL); michael@0: LeaveCriticalSection(&port->lock); michael@0: } michael@0: michael@0: int michael@0: event_iocp_port_associate(struct event_iocp_port *port, evutil_socket_t fd, michael@0: ev_uintptr_t key) michael@0: { michael@0: HANDLE h; michael@0: h = CreateIoCompletionPort((HANDLE)fd, port->port, key, port->n_threads); michael@0: if (!h) michael@0: return -1; michael@0: return 0; michael@0: } michael@0: michael@0: static void * michael@0: get_extension_function(SOCKET s, const GUID *which_fn) michael@0: { michael@0: void *ptr = NULL; michael@0: DWORD bytes=0; michael@0: WSAIoctl(s, SIO_GET_EXTENSION_FUNCTION_POINTER, michael@0: (GUID*)which_fn, sizeof(*which_fn), michael@0: &ptr, sizeof(ptr), michael@0: &bytes, NULL, NULL); michael@0: michael@0: /* No need to detect errors here: if ptr is set, then we have a good michael@0: function pointer. Otherwise, we should behave as if we had no michael@0: function pointer. michael@0: */ michael@0: return ptr; michael@0: } michael@0: michael@0: /* Mingw doesn't have these in its mswsock.h. The values are copied from michael@0: wine.h. Perhaps if we copy them exactly, the cargo will come again. michael@0: */ michael@0: #ifndef WSAID_ACCEPTEX michael@0: #define WSAID_ACCEPTEX \ michael@0: {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} michael@0: #endif michael@0: #ifndef WSAID_CONNECTEX michael@0: #define WSAID_CONNECTEX \ michael@0: {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}} michael@0: #endif michael@0: #ifndef WSAID_GETACCEPTEXSOCKADDRS michael@0: #define WSAID_GETACCEPTEXSOCKADDRS \ michael@0: {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}} michael@0: #endif michael@0: michael@0: static void michael@0: init_extension_functions(struct win32_extension_fns *ext) michael@0: { michael@0: const GUID acceptex = WSAID_ACCEPTEX; michael@0: const GUID connectex = WSAID_CONNECTEX; michael@0: const GUID getacceptexsockaddrs = WSAID_GETACCEPTEXSOCKADDRS; michael@0: SOCKET s = socket(AF_INET, SOCK_STREAM, 0); michael@0: if (s == INVALID_SOCKET) michael@0: return; michael@0: ext->AcceptEx = get_extension_function(s, &acceptex); michael@0: ext->ConnectEx = get_extension_function(s, &connectex); michael@0: ext->GetAcceptExSockaddrs = get_extension_function(s, michael@0: &getacceptexsockaddrs); michael@0: closesocket(s); michael@0: } michael@0: michael@0: static struct win32_extension_fns the_extension_fns; michael@0: static int extension_fns_initialized = 0; michael@0: michael@0: const struct win32_extension_fns * michael@0: event_get_win32_extension_fns(void) michael@0: { michael@0: return &the_extension_fns; michael@0: } michael@0: michael@0: #define N_CPUS_DEFAULT 2 michael@0: michael@0: struct event_iocp_port * michael@0: event_iocp_port_launch(int n_cpus) michael@0: { michael@0: struct event_iocp_port *port; michael@0: int i; michael@0: michael@0: if (!extension_fns_initialized) michael@0: init_extension_functions(&the_extension_fns); michael@0: michael@0: if (!(port = mm_calloc(1, sizeof(struct event_iocp_port)))) michael@0: return NULL; michael@0: michael@0: if (n_cpus <= 0) michael@0: n_cpus = N_CPUS_DEFAULT; michael@0: port->n_threads = n_cpus * 2; michael@0: port->threads = mm_calloc(port->n_threads, sizeof(HANDLE)); michael@0: if (!port->threads) michael@0: goto err; michael@0: michael@0: port->port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, michael@0: n_cpus); michael@0: port->ms = -1; michael@0: if (!port->port) michael@0: goto err; michael@0: michael@0: port->shutdownSemaphore = CreateSemaphore(NULL, 0, 1, NULL); michael@0: if (!port->shutdownSemaphore) michael@0: goto err; michael@0: michael@0: for (i=0; in_threads; ++i) { michael@0: ev_uintptr_t th = _beginthread(loop, 0, port); michael@0: if (th == (ev_uintptr_t)-1) michael@0: goto err; michael@0: port->threads[i] = (HANDLE)th; michael@0: ++port->n_live_threads; michael@0: } michael@0: michael@0: InitializeCriticalSectionAndSpinCount(&port->lock, 1000); michael@0: michael@0: return port; michael@0: err: michael@0: if (port->port) michael@0: CloseHandle(port->port); michael@0: if (port->threads) michael@0: mm_free(port->threads); michael@0: if (port->shutdownSemaphore) michael@0: CloseHandle(port->shutdownSemaphore); michael@0: mm_free(port); michael@0: return NULL; michael@0: } michael@0: michael@0: static void michael@0: _event_iocp_port_unlock_and_free(struct event_iocp_port *port) michael@0: { michael@0: DeleteCriticalSection(&port->lock); michael@0: CloseHandle(port->port); michael@0: CloseHandle(port->shutdownSemaphore); michael@0: mm_free(port->threads); michael@0: mm_free(port); michael@0: } michael@0: michael@0: static int michael@0: event_iocp_notify_all(struct event_iocp_port *port) michael@0: { michael@0: int i, r, ok=1; michael@0: for (i=0; in_threads; ++i) { michael@0: r = PostQueuedCompletionStatus(port->port, 0, NOTIFICATION_KEY, michael@0: NULL); michael@0: if (!r) michael@0: ok = 0; michael@0: } michael@0: return ok ? 0 : -1; michael@0: } michael@0: michael@0: int michael@0: event_iocp_shutdown(struct event_iocp_port *port, long waitMsec) michael@0: { michael@0: DWORD ms = INFINITE; michael@0: int n; michael@0: michael@0: EnterCriticalSection(&port->lock); michael@0: port->shutdown = 1; michael@0: LeaveCriticalSection(&port->lock); michael@0: event_iocp_notify_all(port); michael@0: michael@0: if (waitMsec >= 0) michael@0: ms = waitMsec; michael@0: michael@0: WaitForSingleObject(port->shutdownSemaphore, ms); michael@0: EnterCriticalSection(&port->lock); michael@0: n = port->n_live_threads; michael@0: LeaveCriticalSection(&port->lock); michael@0: if (n == 0) { michael@0: _event_iocp_port_unlock_and_free(port); michael@0: return 0; michael@0: } else { michael@0: return -1; michael@0: } michael@0: } michael@0: michael@0: int michael@0: event_iocp_activate_overlapped( michael@0: struct event_iocp_port *port, struct event_overlapped *o, michael@0: ev_uintptr_t key, ev_uint32_t n) michael@0: { michael@0: BOOL r; michael@0: michael@0: r = PostQueuedCompletionStatus(port->port, n, key, &o->overlapped); michael@0: return (r==0) ? -1 : 0; michael@0: } michael@0: michael@0: struct event_iocp_port * michael@0: event_base_get_iocp(struct event_base *base) michael@0: { michael@0: #ifdef WIN32 michael@0: return base->iocp; michael@0: #else michael@0: return NULL; michael@0: #endif michael@0: }