michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "primpl.h" michael@0: #include "pprmwait.h" michael@0: michael@0: #define _MW_REHASH_MAX 11 michael@0: michael@0: static PRLock *mw_lock = NULL; michael@0: static _PRGlobalState *mw_state = NULL; michael@0: michael@0: static PRIntervalTime max_polling_interval; michael@0: michael@0: #ifdef WINNT michael@0: michael@0: typedef struct TimerEvent { michael@0: PRIntervalTime absolute; michael@0: void (*func)(void *); michael@0: void *arg; michael@0: LONG ref_count; michael@0: PRCList links; michael@0: } TimerEvent; michael@0: michael@0: #define TIMER_EVENT_PTR(_qp) \ michael@0: ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links))) michael@0: michael@0: struct { michael@0: PRLock *ml; michael@0: PRCondVar *new_timer; michael@0: PRCondVar *cancel_timer; michael@0: PRThread *manager_thread; michael@0: PRCList timer_queue; michael@0: } tm_vars; michael@0: michael@0: static PRStatus TimerInit(void); michael@0: static void TimerManager(void *arg); michael@0: static TimerEvent *CreateTimer(PRIntervalTime timeout, michael@0: void (*func)(void *), void *arg); michael@0: static PRBool CancelTimer(TimerEvent *timer); michael@0: michael@0: static void TimerManager(void *arg) michael@0: { michael@0: PRIntervalTime now; michael@0: PRIntervalTime timeout; michael@0: PRCList *head; michael@0: TimerEvent *timer; michael@0: michael@0: PR_Lock(tm_vars.ml); michael@0: while (1) michael@0: { michael@0: if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue)) michael@0: { michael@0: PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT); michael@0: } michael@0: else michael@0: { michael@0: now = PR_IntervalNow(); michael@0: head = 
PR_LIST_HEAD(&tm_vars.timer_queue); michael@0: timer = TIMER_EVENT_PTR(head); michael@0: if ((PRInt32) (now - timer->absolute) >= 0) michael@0: { michael@0: PR_REMOVE_LINK(head); michael@0: /* michael@0: * make its prev and next point to itself so that michael@0: * it's obvious that it's not on the timer_queue. michael@0: */ michael@0: PR_INIT_CLIST(head); michael@0: PR_ASSERT(2 == timer->ref_count); michael@0: PR_Unlock(tm_vars.ml); michael@0: timer->func(timer->arg); michael@0: PR_Lock(tm_vars.ml); michael@0: timer->ref_count -= 1; michael@0: if (0 == timer->ref_count) michael@0: { michael@0: PR_NotifyAllCondVar(tm_vars.cancel_timer); michael@0: } michael@0: } michael@0: else michael@0: { michael@0: timeout = (PRIntervalTime)(timer->absolute - now); michael@0: PR_WaitCondVar(tm_vars.new_timer, timeout); michael@0: } michael@0: } michael@0: } michael@0: PR_Unlock(tm_vars.ml); michael@0: } michael@0: michael@0: static TimerEvent *CreateTimer( michael@0: PRIntervalTime timeout, michael@0: void (*func)(void *), michael@0: void *arg) michael@0: { michael@0: TimerEvent *timer; michael@0: PRCList *links, *tail; michael@0: TimerEvent *elem; michael@0: michael@0: timer = PR_NEW(TimerEvent); michael@0: if (NULL == timer) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: return timer; michael@0: } michael@0: timer->absolute = PR_IntervalNow() + timeout; michael@0: timer->func = func; michael@0: timer->arg = arg; michael@0: timer->ref_count = 2; michael@0: PR_Lock(tm_vars.ml); michael@0: tail = links = PR_LIST_TAIL(&tm_vars.timer_queue); michael@0: while (links->prev != tail) michael@0: { michael@0: elem = TIMER_EVENT_PTR(links); michael@0: if ((PRInt32)(timer->absolute - elem->absolute) >= 0) michael@0: { michael@0: break; michael@0: } michael@0: links = links->prev; michael@0: } michael@0: PR_INSERT_AFTER(&timer->links, links); michael@0: PR_NotifyCondVar(tm_vars.new_timer); michael@0: PR_Unlock(tm_vars.ml); michael@0: return timer; michael@0: } 
michael@0: michael@0: static PRBool CancelTimer(TimerEvent *timer) michael@0: { michael@0: PRBool canceled = PR_FALSE; michael@0: michael@0: PR_Lock(tm_vars.ml); michael@0: timer->ref_count -= 1; michael@0: if (timer->links.prev == &timer->links) michael@0: { michael@0: while (timer->ref_count == 1) michael@0: { michael@0: PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT); michael@0: } michael@0: } michael@0: else michael@0: { michael@0: PR_REMOVE_LINK(&timer->links); michael@0: canceled = PR_TRUE; michael@0: } michael@0: PR_Unlock(tm_vars.ml); michael@0: PR_DELETE(timer); michael@0: return canceled; michael@0: } michael@0: michael@0: static PRStatus TimerInit(void) michael@0: { michael@0: tm_vars.ml = PR_NewLock(); michael@0: if (NULL == tm_vars.ml) michael@0: { michael@0: goto failed; michael@0: } michael@0: tm_vars.new_timer = PR_NewCondVar(tm_vars.ml); michael@0: if (NULL == tm_vars.new_timer) michael@0: { michael@0: goto failed; michael@0: } michael@0: tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml); michael@0: if (NULL == tm_vars.cancel_timer) michael@0: { michael@0: goto failed; michael@0: } michael@0: PR_INIT_CLIST(&tm_vars.timer_queue); michael@0: tm_vars.manager_thread = PR_CreateThread( michael@0: PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL, michael@0: PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0); michael@0: if (NULL == tm_vars.manager_thread) michael@0: { michael@0: goto failed; michael@0: } michael@0: return PR_SUCCESS; michael@0: michael@0: failed: michael@0: if (NULL != tm_vars.cancel_timer) michael@0: { michael@0: PR_DestroyCondVar(tm_vars.cancel_timer); michael@0: } michael@0: if (NULL != tm_vars.new_timer) michael@0: { michael@0: PR_DestroyCondVar(tm_vars.new_timer); michael@0: } michael@0: if (NULL != tm_vars.ml) michael@0: { michael@0: PR_DestroyLock(tm_vars.ml); michael@0: } michael@0: return PR_FAILURE; michael@0: } michael@0: michael@0: #endif /* WINNT */ michael@0: michael@0: 
/******************************************************************/ michael@0: /******************************************************************/ michael@0: /************************ The private portion *********************/ michael@0: /******************************************************************/ michael@0: /******************************************************************/ michael@0: void _PR_InitMW(void) michael@0: { michael@0: #ifdef WINNT michael@0: /* michael@0: * We use NT 4's InterlockedCompareExchange() to operate michael@0: * on PRMWStatus variables. michael@0: */ michael@0: PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus)); michael@0: TimerInit(); michael@0: #endif michael@0: mw_lock = PR_NewLock(); michael@0: PR_ASSERT(NULL != mw_lock); michael@0: mw_state = PR_NEWZAP(_PRGlobalState); michael@0: PR_ASSERT(NULL != mw_state); michael@0: PR_INIT_CLIST(&mw_state->group_list); michael@0: max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL); michael@0: } /* _PR_InitMW */ michael@0: michael@0: void _PR_CleanupMW(void) michael@0: { michael@0: PR_DestroyLock(mw_lock); michael@0: mw_lock = NULL; michael@0: if (mw_state->group) { michael@0: PR_DestroyWaitGroup(mw_state->group); michael@0: /* mw_state->group is set to NULL as a side effect. 
*/ michael@0: } michael@0: PR_DELETE(mw_state); michael@0: } /* _PR_CleanupMW */ michael@0: michael@0: static PRWaitGroup *MW_Init2(void) michael@0: { michael@0: PRWaitGroup *group = mw_state->group; /* it's the null group */ michael@0: if (NULL == group) /* there is this special case */ michael@0: { michael@0: group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH); michael@0: if (NULL == group) goto failed_alloc; michael@0: PR_Lock(mw_lock); michael@0: if (NULL == mw_state->group) michael@0: { michael@0: mw_state->group = group; michael@0: group = NULL; michael@0: } michael@0: PR_Unlock(mw_lock); michael@0: if (group != NULL) (void)PR_DestroyWaitGroup(group); michael@0: group = mw_state->group; /* somebody beat us to it */ michael@0: } michael@0: failed_alloc: michael@0: return group; /* whatever */ michael@0: } /* MW_Init2 */ michael@0: michael@0: static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash) michael@0: { michael@0: /* michael@0: ** The entries are put in the table using the fd (PRFileDesc*) of michael@0: ** the receive descriptor as the key. This allows us to locate michael@0: ** the appropriate entry aqain when the poll operation finishes. michael@0: ** michael@0: ** The pointer to the file descriptor object is first divided by michael@0: ** the natural alignment of a pointer in the belief that object michael@0: ** will have at least that many zeros in the low order bits. michael@0: ** This may not be a good assuption. michael@0: ** michael@0: ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After michael@0: ** that we declare defeat and force the table to be reconstructed. michael@0: ** Since some fds might be added more than once, won't that cause michael@0: ** collisions even in an empty table? 
michael@0: */ michael@0: PRIntn rehash = _MW_REHASH_MAX; michael@0: PRRecvWait **waiter; michael@0: PRUintn hidx = _MW_HASH(desc->fd, hash->length); michael@0: PRUintn hoffset = 0; michael@0: michael@0: while (rehash-- > 0) michael@0: { michael@0: waiter = &hash->recv_wait; michael@0: if (NULL == waiter[hidx]) michael@0: { michael@0: waiter[hidx] = desc; michael@0: hash->count += 1; michael@0: #if 0 michael@0: printf("Adding 0x%x->0x%x ", desc, desc->fd); michael@0: printf( michael@0: "table[%u:%u:*%u]: 0x%x->0x%x\n", michael@0: hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd); michael@0: #endif michael@0: return _prmw_success; michael@0: } michael@0: if (desc == waiter[hidx]) michael@0: { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */ michael@0: return _prmw_error; michael@0: } michael@0: #if 0 michael@0: printf("Failing 0x%x->0x%x ", desc, desc->fd); michael@0: printf( michael@0: "table[*%u:%u:%u]: 0x%x->0x%x\n", michael@0: hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd); michael@0: #endif michael@0: if (0 == hoffset) michael@0: { michael@0: hoffset = _MW_HASH2(desc->fd, hash->length); michael@0: PR_ASSERT(0 != hoffset); michael@0: } michael@0: hidx = (hidx + hoffset) % (hash->length); michael@0: } michael@0: return _prmw_rehash; michael@0: } /* MW_AddHashInternal */ michael@0: michael@0: static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group) michael@0: { michael@0: PRRecvWait **desc; michael@0: PRUint32 pidx, length; michael@0: _PRWaiterHash *newHash, *oldHash = group->waiter; michael@0: PRBool retry; michael@0: _PR_HashStory hrv; michael@0: michael@0: static const PRInt32 prime_number[] = { michael@0: _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427, michael@0: 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771}; michael@0: PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32)); michael@0: michael@0: /* look up the next size we'd like to use for the hash table */ michael@0: for 
(pidx = 0; pidx < primes; ++pidx) michael@0: { michael@0: if (prime_number[pidx] == oldHash->length) michael@0: { michael@0: break; michael@0: } michael@0: } michael@0: /* table size must be one of the prime numbers */ michael@0: PR_ASSERT(pidx < primes); michael@0: michael@0: /* if pidx == primes - 1, we can't expand the table any more */ michael@0: while (pidx < primes - 1) michael@0: { michael@0: /* next size */ michael@0: ++pidx; michael@0: length = prime_number[pidx]; michael@0: michael@0: /* allocate the new hash table and fill it in with the old */ michael@0: newHash = (_PRWaiterHash*)PR_CALLOC( michael@0: sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*))); michael@0: if (NULL == newHash) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: return _prmw_error; michael@0: } michael@0: michael@0: newHash->length = length; michael@0: retry = PR_FALSE; michael@0: for (desc = &oldHash->recv_wait; michael@0: newHash->count < oldHash->count; ++desc) michael@0: { michael@0: PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length); michael@0: if (NULL != *desc) michael@0: { michael@0: hrv = MW_AddHashInternal(*desc, newHash); michael@0: PR_ASSERT(_prmw_error != hrv); michael@0: if (_prmw_success != hrv) michael@0: { michael@0: PR_DELETE(newHash); michael@0: retry = PR_TRUE; michael@0: break; michael@0: } michael@0: } michael@0: } michael@0: if (retry) continue; michael@0: michael@0: PR_DELETE(group->waiter); michael@0: group->waiter = newHash; michael@0: group->p_timestamp += 1; michael@0: return _prmw_success; michael@0: } michael@0: michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: return _prmw_error; /* we're hosed */ michael@0: } /* MW_ExpandHashInternal */ michael@0: michael@0: #ifndef WINNT michael@0: static void _MW_DoneInternal( michael@0: PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome) michael@0: { michael@0: /* michael@0: ** Add this receive wait object to the list of finished I/O michael@0: ** operations 
for this particular group. If there are other michael@0: ** threads waiting on the group, notify one. If not, arrange michael@0: ** for this thread to return. michael@0: */ michael@0: michael@0: #if 0 michael@0: printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd); michael@0: #endif michael@0: (*waiter)->outcome = outcome; michael@0: PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready); michael@0: PR_NotifyCondVar(group->io_complete); michael@0: PR_ASSERT(0 != group->waiter->count); michael@0: group->waiter->count -= 1; michael@0: *waiter = NULL; michael@0: } /* _MW_DoneInternal */ michael@0: #endif /* WINNT */ michael@0: michael@0: static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd) michael@0: { michael@0: /* michael@0: ** Find the receive wait object corresponding to the file descriptor. michael@0: ** Only search the wait group specified. michael@0: */ michael@0: PRRecvWait **desc; michael@0: PRIntn rehash = _MW_REHASH_MAX; michael@0: _PRWaiterHash *hash = group->waiter; michael@0: PRUintn hidx = _MW_HASH(fd, hash->length); michael@0: PRUintn hoffset = 0; michael@0: michael@0: while (rehash-- > 0) michael@0: { michael@0: desc = (&hash->recv_wait) + hidx; michael@0: if ((*desc != NULL) && ((*desc)->fd == fd)) return desc; michael@0: if (0 == hoffset) michael@0: { michael@0: hoffset = _MW_HASH2(fd, hash->length); michael@0: PR_ASSERT(0 != hoffset); michael@0: } michael@0: hidx = (hidx + hoffset) % (hash->length); michael@0: } michael@0: return NULL; michael@0: } /* _MW_LookupInternal */ michael@0: michael@0: #ifndef WINNT michael@0: static PRStatus _MW_PollInternal(PRWaitGroup *group) michael@0: { michael@0: PRRecvWait **waiter; michael@0: PRStatus rv = PR_FAILURE; michael@0: PRInt32 count, count_ready; michael@0: PRIntervalTime polling_interval; michael@0: michael@0: group->poller = PR_GetCurrentThread(); michael@0: michael@0: while (PR_TRUE) michael@0: { michael@0: PRIntervalTime now, since_last_poll; michael@0: PRPollDesc 
*poll_list; michael@0: michael@0: while (0 == group->waiter->count) michael@0: { michael@0: PRStatus st; michael@0: st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT); michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: goto aborted; michael@0: } michael@0: if (_MW_ABORTED(st)) goto aborted; michael@0: } michael@0: michael@0: /* michael@0: ** There's something to do. See if our existing polling list michael@0: ** is large enough for what we have to do? michael@0: */ michael@0: michael@0: while (group->polling_count < group->waiter->count) michael@0: { michael@0: PRUint32 old_count = group->waiter->count; michael@0: PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE); michael@0: PRSize new_size = sizeof(PRPollDesc) * new_count; michael@0: PRPollDesc *old_polling_list = group->polling_list; michael@0: michael@0: PR_Unlock(group->ml); michael@0: poll_list = (PRPollDesc*)PR_CALLOC(new_size); michael@0: if (NULL == poll_list) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: PR_Lock(group->ml); michael@0: goto failed_alloc; michael@0: } michael@0: if (NULL != old_polling_list) michael@0: PR_DELETE(old_polling_list); michael@0: PR_Lock(group->ml); michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: goto aborted; michael@0: } michael@0: group->polling_list = poll_list; michael@0: group->polling_count = new_count; michael@0: } michael@0: michael@0: now = PR_IntervalNow(); michael@0: polling_interval = max_polling_interval; michael@0: since_last_poll = now - group->last_poll; michael@0: michael@0: waiter = &group->waiter->recv_wait; michael@0: poll_list = group->polling_list; michael@0: for (count = 0; count < group->waiter->count; ++waiter) michael@0: { michael@0: PR_ASSERT(waiter < &group->waiter->recv_wait michael@0: + group->waiter->length); michael@0: if (NULL != *waiter) /* a live 
one! */ michael@0: { michael@0: if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) michael@0: && (since_last_poll >= (*waiter)->timeout)) michael@0: _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT); michael@0: else michael@0: { michael@0: if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout) michael@0: { michael@0: (*waiter)->timeout -= since_last_poll; michael@0: if ((*waiter)->timeout < polling_interval) michael@0: polling_interval = (*waiter)->timeout; michael@0: } michael@0: PR_ASSERT(poll_list < group->polling_list michael@0: + group->polling_count); michael@0: poll_list->fd = (*waiter)->fd; michael@0: poll_list->in_flags = PR_POLL_READ; michael@0: poll_list->out_flags = 0; michael@0: #if 0 michael@0: printf( michael@0: "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n", michael@0: poll_list, count, poll_list->fd, (*waiter)->timeout); michael@0: #endif michael@0: poll_list += 1; michael@0: count += 1; michael@0: } michael@0: } michael@0: } michael@0: michael@0: PR_ASSERT(count == group->waiter->count); michael@0: michael@0: /* michael@0: ** If there are no more threads waiting for completion, michael@0: ** we need to return. 
michael@0: */ michael@0: if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: && (1 == group->waiting_threads)) break; michael@0: michael@0: if (0 == count) continue; /* wait for new business */ michael@0: michael@0: group->last_poll = now; michael@0: michael@0: PR_Unlock(group->ml); michael@0: michael@0: count_ready = PR_Poll(group->polling_list, count, polling_interval); michael@0: michael@0: PR_Lock(group->ml); michael@0: michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: goto aborted; michael@0: } michael@0: if (-1 == count_ready) michael@0: { michael@0: goto failed_poll; /* that's a shame */ michael@0: } michael@0: else if (0 < count_ready) michael@0: { michael@0: for (poll_list = group->polling_list; count > 0; michael@0: poll_list++, count--) michael@0: { michael@0: PR_ASSERT( michael@0: poll_list < group->polling_list + group->polling_count); michael@0: if (poll_list->out_flags != 0) michael@0: { michael@0: waiter = _MW_LookupInternal(group, poll_list->fd); michael@0: /* michael@0: ** If 'waiter' is NULL, that means the wait receive michael@0: ** descriptor has been canceled. michael@0: */ michael@0: if (NULL != waiter) michael@0: _MW_DoneInternal(group, waiter, PR_MW_SUCCESS); michael@0: } michael@0: } michael@0: } michael@0: /* michael@0: ** If there are no more threads waiting for completion, michael@0: ** we need to return. michael@0: ** This thread was "borrowed" to do the polling, but it really michael@0: ** belongs to the client. 
michael@0: */ michael@0: if ((!PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: && (1 == group->waiting_threads)) break; michael@0: } michael@0: michael@0: rv = PR_SUCCESS; michael@0: michael@0: aborted: michael@0: failed_poll: michael@0: failed_alloc: michael@0: group->poller = NULL; /* we were that, not we ain't */ michael@0: if ((_prmw_running == group->state) && (group->waiting_threads > 1)) michael@0: { michael@0: /* Wake up one thread to become the new poller. */ michael@0: PR_NotifyCondVar(group->io_complete); michael@0: } michael@0: return rv; /* we return with the lock held */ michael@0: } /* _MW_PollInternal */ michael@0: #endif /* !WINNT */ michael@0: michael@0: static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group) michael@0: { michael@0: PRMWGroupState rv = group->state; michael@0: /* michael@0: ** Looking at the group's fields is safe because michael@0: ** once the group's state is no longer running, it michael@0: ** cannot revert and there is a safe check on entry michael@0: ** to make sure no more threads are made to wait. 
michael@0: */ michael@0: if ((_prmw_stopping == rv) michael@0: && (0 == group->waiting_threads)) michael@0: { michael@0: rv = group->state = _prmw_stopped; michael@0: PR_NotifyCondVar(group->mw_manage); michael@0: } michael@0: return rv; michael@0: } /* MW_TestForShutdownInternal */ michael@0: michael@0: #ifndef WINNT michael@0: static void _MW_InitialRecv(PRCList *io_ready) michael@0: { michael@0: PRRecvWait *desc = (PRRecvWait*)io_ready; michael@0: if ((NULL == desc->buffer.start) michael@0: || (0 == desc->buffer.length)) michael@0: desc->bytesRecv = 0; michael@0: else michael@0: { michael@0: desc->bytesRecv = (desc->fd->methods->recv)( michael@0: desc->fd, desc->buffer.start, michael@0: desc->buffer.length, 0, desc->timeout); michael@0: if (desc->bytesRecv < 0) /* SetError should already be there */ michael@0: desc->outcome = PR_MW_FAILURE; michael@0: } michael@0: } /* _MW_InitialRecv */ michael@0: #endif michael@0: michael@0: #ifdef WINNT michael@0: static void NT_TimeProc(void *arg) michael@0: { michael@0: _MDOverlapped *overlapped = (_MDOverlapped *)arg; michael@0: PRRecvWait *desc = overlapped->data.mw.desc; michael@0: PRFileDesc *bottom; michael@0: michael@0: if (InterlockedCompareExchange((LONG *)&desc->outcome, michael@0: (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING) michael@0: { michael@0: /* This wait recv descriptor has already completed. */ michael@0: return; michael@0: } michael@0: michael@0: /* close the osfd to abort the outstanding async io request */ michael@0: /* $$$$ michael@0: ** Little late to be checking if NSPR's on the bottom of stack, michael@0: ** but if we don't check, we can't assert that the private data michael@0: ** is what we think it is. michael@0: ** $$$$ michael@0: */ michael@0: bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); michael@0: PR_ASSERT(NULL != bottom); michael@0: if (NULL != bottom) /* now what!?!?! 
*/ michael@0: { michael@0: bottom->secret->state = _PR_FILEDESC_CLOSED; michael@0: if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) michael@0: { michael@0: fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); michael@0: PR_ASSERT(!"What shall I do?"); michael@0: } michael@0: } michael@0: return; michael@0: } /* NT_TimeProc */ michael@0: michael@0: static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd) michael@0: { michael@0: PRRecvWait **waiter; michael@0: michael@0: _PR_MD_LOCK(&group->mdlock); michael@0: waiter = _MW_LookupInternal(group, fd); michael@0: if (NULL != waiter) michael@0: { michael@0: group->waiter->count -= 1; michael@0: *waiter = NULL; michael@0: } michael@0: _PR_MD_UNLOCK(&group->mdlock); michael@0: return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE; michael@0: } michael@0: michael@0: PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd) michael@0: { michael@0: PRRecvWait **waiter; michael@0: michael@0: waiter = _MW_LookupInternal(group, fd); michael@0: if (NULL != waiter) michael@0: { michael@0: group->waiter->count -= 1; michael@0: *waiter = NULL; michael@0: } michael@0: return (NULL != waiter) ? 
PR_SUCCESS : PR_FAILURE; michael@0: } michael@0: #endif /* WINNT */ michael@0: michael@0: /******************************************************************/ michael@0: /******************************************************************/ michael@0: /********************** The public API portion ********************/ michael@0: /******************************************************************/ michael@0: /******************************************************************/ michael@0: PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc( michael@0: PRWaitGroup *group, PRRecvWait *desc) michael@0: { michael@0: _PR_HashStory hrv; michael@0: PRStatus rv = PR_FAILURE; michael@0: #ifdef WINNT michael@0: _MDOverlapped *overlapped; michael@0: HANDLE hFile; michael@0: BOOL bResult; michael@0: DWORD dwError; michael@0: PRFileDesc *bottom; michael@0: #endif michael@0: michael@0: if (!_pr_initialized) _PR_ImplicitInitialization(); michael@0: if ((NULL == group) && (NULL == (group = MW_Init2()))) michael@0: { michael@0: return rv; michael@0: } michael@0: michael@0: PR_ASSERT(NULL != desc->fd); michael@0: michael@0: desc->outcome = PR_MW_PENDING; /* nice, well known value */ michael@0: desc->bytesRecv = 0; /* likewise, though this value is ambiguious */ michael@0: michael@0: PR_Lock(group->ml); michael@0: michael@0: if (_prmw_running != group->state) michael@0: { michael@0: /* Not allowed to add after cancelling the group */ michael@0: desc->outcome = PR_MW_INTERRUPT; michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: PR_Unlock(group->ml); michael@0: return rv; michael@0: } michael@0: michael@0: #ifdef WINNT michael@0: _PR_MD_LOCK(&group->mdlock); michael@0: #endif michael@0: michael@0: /* michael@0: ** If the waiter count is zero at this point, there's no telling michael@0: ** how long we've been idle. Therefore, initialize the beginning michael@0: ** of the timing interval. As long as the list doesn't go empty, michael@0: ** it will maintain itself. 
michael@0: */ michael@0: if (0 == group->waiter->count) michael@0: group->last_poll = PR_IntervalNow(); michael@0: michael@0: do michael@0: { michael@0: hrv = MW_AddHashInternal(desc, group->waiter); michael@0: if (_prmw_rehash != hrv) break; michael@0: hrv = MW_ExpandHashInternal(group); /* gruesome */ michael@0: if (_prmw_success != hrv) break; michael@0: } while (PR_TRUE); michael@0: michael@0: #ifdef WINNT michael@0: _PR_MD_UNLOCK(&group->mdlock); michael@0: #endif michael@0: michael@0: PR_NotifyCondVar(group->new_business); /* tell the world */ michael@0: rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE; michael@0: PR_Unlock(group->ml); michael@0: michael@0: #ifdef WINNT michael@0: overlapped = PR_NEWZAP(_MDOverlapped); michael@0: if (NULL == overlapped) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: NT_HashRemove(group, desc->fd); michael@0: return rv; michael@0: } michael@0: overlapped->ioModel = _MD_MultiWaitIO; michael@0: overlapped->data.mw.desc = desc; michael@0: overlapped->data.mw.group = group; michael@0: if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) michael@0: { michael@0: overlapped->data.mw.timer = CreateTimer( michael@0: desc->timeout, michael@0: NT_TimeProc, michael@0: overlapped); michael@0: if (0 == overlapped->data.mw.timer) michael@0: { michael@0: NT_HashRemove(group, desc->fd); michael@0: PR_DELETE(overlapped); michael@0: /* michael@0: * XXX It appears that a maximum of 16 timer events can michael@0: * be outstanding. GetLastError() returns 0 when I try it. 
michael@0: */ michael@0: PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError()); michael@0: return PR_FAILURE; michael@0: } michael@0: } michael@0: michael@0: /* Reach to the bottom layer to get the OS fd */ michael@0: bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); michael@0: PR_ASSERT(NULL != bottom); michael@0: if (NULL == bottom) michael@0: { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return PR_FAILURE; michael@0: } michael@0: hFile = (HANDLE)bottom->secret->md.osfd; michael@0: if (!bottom->secret->md.io_model_committed) michael@0: { michael@0: PRInt32 st; michael@0: st = _md_Associate(hFile); michael@0: PR_ASSERT(0 != st); michael@0: bottom->secret->md.io_model_committed = PR_TRUE; michael@0: } michael@0: bResult = ReadFile(hFile, michael@0: desc->buffer.start, michael@0: (DWORD)desc->buffer.length, michael@0: NULL, michael@0: &overlapped->overlapped); michael@0: if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING) michael@0: { michael@0: if (desc->timeout != PR_INTERVAL_NO_TIMEOUT) michael@0: { michael@0: if (InterlockedCompareExchange((LONG *)&desc->outcome, michael@0: (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING) michael@0: == (LONG)PR_MW_PENDING) michael@0: { michael@0: CancelTimer(overlapped->data.mw.timer); michael@0: } michael@0: NT_HashRemove(group, desc->fd); michael@0: PR_DELETE(overlapped); michael@0: } michael@0: _PR_MD_MAP_READ_ERROR(dwError); michael@0: rv = PR_FAILURE; michael@0: } michael@0: #endif michael@0: michael@0: return rv; michael@0: } /* PR_AddWaitFileDesc */ michael@0: michael@0: PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group) michael@0: { michael@0: PRCList *io_ready = NULL; michael@0: #ifdef WINNT michael@0: PRThread *me = _PR_MD_CURRENT_THREAD(); michael@0: _MDOverlapped *overlapped; michael@0: #endif michael@0: michael@0: if (!_pr_initialized) _PR_ImplicitInitialization(); michael@0: if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init; 
michael@0: michael@0: PR_Lock(group->ml); michael@0: michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: goto invalid_state; michael@0: } michael@0: michael@0: group->waiting_threads += 1; /* the polling thread is counted */ michael@0: michael@0: #ifdef WINNT michael@0: _PR_MD_LOCK(&group->mdlock); michael@0: while (PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: { michael@0: _PR_THREAD_LOCK(me); michael@0: me->state = _PR_IO_WAIT; michael@0: PR_APPEND_LINK(&me->waitQLinks, &group->wait_list); michael@0: if (!_PR_IS_NATIVE_THREAD(me)) michael@0: { michael@0: _PR_SLEEPQ_LOCK(me->cpu); michael@0: _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT); michael@0: _PR_SLEEPQ_UNLOCK(me->cpu); michael@0: } michael@0: _PR_THREAD_UNLOCK(me); michael@0: _PR_MD_UNLOCK(&group->mdlock); michael@0: PR_Unlock(group->ml); michael@0: _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT); michael@0: me->state = _PR_RUNNING; michael@0: PR_Lock(group->ml); michael@0: _PR_MD_LOCK(&group->mdlock); michael@0: if (_PR_PENDING_INTERRUPT(me)) { michael@0: PR_REMOVE_LINK(&me->waitQLinks); michael@0: _PR_MD_UNLOCK(&group->mdlock); michael@0: me->flags &= ~_PR_INTERRUPT; michael@0: me->io_suspended = PR_FALSE; michael@0: PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); michael@0: goto aborted; michael@0: } michael@0: } michael@0: io_ready = PR_LIST_HEAD(&group->io_ready); michael@0: PR_ASSERT(io_ready != NULL); michael@0: PR_REMOVE_LINK(io_ready); michael@0: _PR_MD_UNLOCK(&group->mdlock); michael@0: overlapped = (_MDOverlapped *) michael@0: ((char *)io_ready - offsetof(_MDOverlapped, data)); michael@0: io_ready = &overlapped->data.mw.desc->internal; michael@0: #else michael@0: do michael@0: { michael@0: /* michael@0: ** If the I/O ready list isn't empty, have this thread michael@0: ** return with the first receive wait object that's available. 
michael@0: */ michael@0: if (PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: { michael@0: /* michael@0: ** Is there a polling thread yet? If not, grab this thread michael@0: ** and use it. michael@0: */ michael@0: if (NULL == group->poller) michael@0: { michael@0: /* michael@0: ** This thread will stay do polling until it becomes the only one michael@0: ** left to service a completion. Then it will return and there will michael@0: ** be none left to actually poll or to run completions. michael@0: ** michael@0: ** The polling function should only return w/ failure or michael@0: ** with some I/O ready. michael@0: */ michael@0: if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll; michael@0: } michael@0: else michael@0: { michael@0: /* michael@0: ** There are four reasons a thread can be awakened from michael@0: ** a wait on the io_complete condition variable. michael@0: ** 1. Some I/O has completed, i.e., the io_ready list michael@0: ** is nonempty. michael@0: ** 2. The wait group is canceled. michael@0: ** 3. The thread is interrupted. michael@0: ** 4. The current polling thread has to leave and needs michael@0: ** a replacement. michael@0: ** The logic to find a new polling thread is made more michael@0: ** complicated by all the other possible events. michael@0: ** I tried my best to write the logic clearly, but michael@0: ** it is still full of if's with continue and goto. michael@0: */ michael@0: PRStatus st; michael@0: do michael@0: { michael@0: st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT); michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: goto aborted; michael@0: } michael@0: if (_MW_ABORTED(st) || (NULL == group->poller)) break; michael@0: } while (PR_CLIST_IS_EMPTY(&group->io_ready)); michael@0: michael@0: /* michael@0: ** The thread is interrupted and has to leave. 
It might michael@0: ** have also been awakened to process ready i/o or be the michael@0: ** new poller. To be safe, if either condition is true, michael@0: ** we awaken another thread to take its place. michael@0: */ michael@0: if (_MW_ABORTED(st)) michael@0: { michael@0: if ((NULL == group->poller michael@0: || !PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: && group->waiting_threads > 1) michael@0: PR_NotifyCondVar(group->io_complete); michael@0: goto aborted; michael@0: } michael@0: michael@0: /* michael@0: ** A new poller is needed, but can I be the new poller? michael@0: ** If there is no i/o ready, sure. But if there is any michael@0: ** i/o ready, it has a higher priority. I want to michael@0: ** process the ready i/o first and wake up another michael@0: ** thread to be the new poller. michael@0: */ michael@0: if (NULL == group->poller) michael@0: { michael@0: if (PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: continue; michael@0: if (group->waiting_threads > 1) michael@0: PR_NotifyCondVar(group->io_complete); michael@0: } michael@0: } michael@0: PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready)); michael@0: } michael@0: io_ready = PR_LIST_HEAD(&group->io_ready); michael@0: PR_NotifyCondVar(group->io_taken); michael@0: PR_ASSERT(io_ready != NULL); michael@0: PR_REMOVE_LINK(io_ready); michael@0: } while (NULL == io_ready); michael@0: michael@0: failed_poll: michael@0: michael@0: #endif michael@0: michael@0: aborted: michael@0: michael@0: group->waiting_threads -= 1; michael@0: invalid_state: michael@0: (void)MW_TestForShutdownInternal(group); michael@0: PR_Unlock(group->ml); michael@0: michael@0: failed_init: michael@0: if (NULL != io_ready) michael@0: { michael@0: /* If the operation failed, record the reason why */ michael@0: switch (((PRRecvWait*)io_ready)->outcome) michael@0: { michael@0: case PR_MW_PENDING: michael@0: PR_ASSERT(0); michael@0: break; michael@0: case PR_MW_SUCCESS: michael@0: #ifndef WINNT michael@0: _MW_InitialRecv(io_ready); michael@0: 
#endif michael@0: break; michael@0: #ifdef WINNT michael@0: case PR_MW_FAILURE: michael@0: _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error); michael@0: break; michael@0: #endif michael@0: case PR_MW_TIMEOUT: michael@0: PR_SetError(PR_IO_TIMEOUT_ERROR, 0); michael@0: break; michael@0: case PR_MW_INTERRUPT: michael@0: PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0); michael@0: break; michael@0: default: break; michael@0: } michael@0: #ifdef WINNT michael@0: if (NULL != overlapped->data.mw.timer) michael@0: { michael@0: PR_ASSERT(PR_INTERVAL_NO_TIMEOUT michael@0: != overlapped->data.mw.desc->timeout); michael@0: CancelTimer(overlapped->data.mw.timer); michael@0: } michael@0: else michael@0: { michael@0: PR_ASSERT(PR_INTERVAL_NO_TIMEOUT michael@0: == overlapped->data.mw.desc->timeout); michael@0: } michael@0: PR_DELETE(overlapped); michael@0: #endif michael@0: } michael@0: return (PRRecvWait*)io_ready; michael@0: } /* PR_WaitRecvReady */ michael@0: michael@0: PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc) michael@0: { michael@0: #if !defined(WINNT) michael@0: PRRecvWait **recv_wait; michael@0: #endif michael@0: PRStatus rv = PR_SUCCESS; michael@0: if (NULL == group) group = mw_state->group; michael@0: PR_ASSERT(NULL != group); michael@0: if (NULL == group) michael@0: { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return PR_FAILURE; michael@0: } michael@0: michael@0: PR_Lock(group->ml); michael@0: michael@0: if (_prmw_running != group->state) michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: rv = PR_FAILURE; michael@0: goto unlock; michael@0: } michael@0: michael@0: #ifdef WINNT michael@0: if (InterlockedCompareExchange((LONG *)&desc->outcome, michael@0: (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING) michael@0: { michael@0: PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER); michael@0: PR_ASSERT(NULL != bottom); michael@0: if (NULL == bottom) michael@0: { 
michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: goto unlock; michael@0: } michael@0: bottom->secret->state = _PR_FILEDESC_CLOSED; michael@0: #if 0 michael@0: fprintf(stderr, "cancel wait recv: closing socket\n"); michael@0: #endif michael@0: if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR) michael@0: { michael@0: fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError()); michael@0: exit(1); michael@0: } michael@0: } michael@0: #else michael@0: if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd))) michael@0: { michael@0: /* it was in the wait table */ michael@0: _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT); michael@0: goto unlock; michael@0: } michael@0: if (!PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: { michael@0: /* is it already complete? */ michael@0: PRCList *head = PR_LIST_HEAD(&group->io_ready); michael@0: do michael@0: { michael@0: PRRecvWait *done = (PRRecvWait*)head; michael@0: if (done == desc) goto unlock; michael@0: head = PR_NEXT_LINK(head); michael@0: } while (head != &group->io_ready); michael@0: } michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: rv = PR_FAILURE; michael@0: michael@0: #endif michael@0: unlock: michael@0: PR_Unlock(group->ml); michael@0: return rv; michael@0: } /* PR_CancelWaitFileDesc */ michael@0: michael@0: PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group) michael@0: { michael@0: PRRecvWait **desc; michael@0: PRRecvWait *recv_wait = NULL; michael@0: #ifdef WINNT michael@0: _MDOverlapped *overlapped; michael@0: PRRecvWait **end; michael@0: PRThread *me = _PR_MD_CURRENT_THREAD(); michael@0: #endif michael@0: michael@0: if (NULL == group) group = mw_state->group; michael@0: PR_ASSERT(NULL != group); michael@0: if (NULL == group) michael@0: { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: michael@0: PR_Lock(group->ml); michael@0: if (_prmw_stopped != group->state) michael@0: { michael@0: if 
/*
** Shut down an entire wait group (or the default group when 'group' is
** NULL): stop new business, interrupt every outstanding descriptor, and
** return the first completed recv-wait (or NULL with PR_GROUP_EMPTY_ERROR
** if nothing had completed).
**
** Shutdown protocol: move the state to _prmw_stopping so nothing new
** comes in, wake all waiters, then block on mw_manage until the last
** waiting thread moves the state to _prmw_stopped.
*/
PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
{
    PRRecvWait **desc;
    PRRecvWait *recv_wait = NULL;
#ifdef WINNT
    _MDOverlapped *overlapped;
    PRRecvWait **end;
    PRThread *me = _PR_MD_CURRENT_THREAD();
#endif

    if (NULL == group) group = mw_state->group;
    PR_ASSERT(NULL != group);
    if (NULL == group)
    {
        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
        return NULL;
    }

    PR_Lock(group->ml);
    if (_prmw_stopped != group->state)
    {
        if (_prmw_running == group->state)
            group->state = _prmw_stopping;  /* so nothing new comes in */
        if (0 == group->waiting_threads)  /* is there anybody else? */
            group->state = _prmw_stopped;  /* we can stop right now */
        else
        {
            /* wake every waiter so they can observe the state change */
            PR_NotifyAllCondVar(group->new_business);
            PR_NotifyAllCondVar(group->io_complete);
        }
        /* wait for the waiting threads to drain and finish the stop */
        while (_prmw_stopped != group->state)
            (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
    }

#ifdef WINNT
    _PR_MD_LOCK(&group->mdlock);
#endif
    /* make all the existing descriptors look done/interrupted */
#ifdef WINNT
    end = &group->waiter->recv_wait + group->waiter->length;
    for (desc = &group->waiter->recv_wait; desc < end; ++desc)
    {
        if (NULL != *desc)
        {
            /*
            ** Only the thread that atomically flips outcome from PENDING
            ** to INTERRUPT closes the socket (forces the overlapped i/o
            ** to complete — NOTE(review): confirm against the NT
            ** completion handling for this file).
            */
            if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
                (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
                == (LONG)PR_MW_PENDING)
            {
                PRFileDesc *bottom = PR_GetIdentitiesLayer(
                    (*desc)->fd, PR_NSPR_IO_LAYER);
                PR_ASSERT(NULL != bottom);
                if (NULL == bottom)
                {
                    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
                    goto invalid_arg;
                }
                bottom->secret->state = _PR_FILEDESC_CLOSED;
#if 0
                fprintf(stderr, "cancel wait group: closing socket\n");
#endif
                if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
                {
                    fprintf(stderr, "closesocket failed: %d\n",
                        WSAGetLastError());
                    exit(1);
                }
            }
        }
    }
    /*
    ** Park this thread on the group's wait_list until every interrupted
    ** descriptor has been retired (waiter->count reaches zero).  Both
    ** locks are dropped across _PR_MD_WAIT and reacquired after — in
    ** the same order (ml, then mdlock) as everywhere else in this file.
    */
    while (group->waiter->count > 0)
    {
        _PR_THREAD_LOCK(me);
        me->state = _PR_IO_WAIT;
        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
        if (!_PR_IS_NATIVE_THREAD(me))
        {
            _PR_SLEEPQ_LOCK(me->cpu);
            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
            _PR_SLEEPQ_UNLOCK(me->cpu);
        }
        _PR_THREAD_UNLOCK(me);
        _PR_MD_UNLOCK(&group->mdlock);
        PR_Unlock(group->ml);
        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
        me->state = _PR_RUNNING;
        PR_Lock(group->ml);
        _PR_MD_LOCK(&group->mdlock);
    }
#else
    /* retire every live entry; _MW_DoneInternal decrements waiter->count */
    for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
    {
        PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
        if (NULL != *desc)
            _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
    }
#endif

    /* take first element of finished list and return it or NULL */
    if (PR_CLIST_IS_EMPTY(&group->io_ready))
        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
    else
    {
        PRCList *head = PR_LIST_HEAD(&group->io_ready);
        PR_REMOVE_AND_INIT_LINK(head);
#ifdef WINNT
        /* recover the overlapped wrapper from its embedded list link */
        overlapped = (_MDOverlapped *)
            ((char *)head - offsetof(_MDOverlapped, data));
        head = &overlapped->data.mw.desc->internal;
        /* a timer exists iff the descriptor had a finite timeout */
        if (NULL != overlapped->data.mw.timer)
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                != overlapped->data.mw.desc->timeout);
            CancelTimer(overlapped->data.mw.timer);
        }
        else
        {
            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
                == overlapped->data.mw.desc->timeout);
        }
        PR_DELETE(overlapped);
#endif
        recv_wait = (PRRecvWait*)head;
    }
#ifdef WINNT
invalid_arg:
    _PR_MD_UNLOCK(&group->mdlock);
#endif
    PR_Unlock(group->ml);

    return recv_wait;
}  /* PR_CancelWaitGroup */
PR_CreateWaitGroup(PRInt32 size /* ignored */) michael@0: { michael@0: PRWaitGroup *wg; michael@0: michael@0: if (NULL == (wg = PR_NEWZAP(PRWaitGroup))) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: goto failed; michael@0: } michael@0: /* the wait group itself */ michael@0: wg->ml = PR_NewLock(); michael@0: if (NULL == wg->ml) goto failed_lock; michael@0: wg->io_taken = PR_NewCondVar(wg->ml); michael@0: if (NULL == wg->io_taken) goto failed_cvar0; michael@0: wg->io_complete = PR_NewCondVar(wg->ml); michael@0: if (NULL == wg->io_complete) goto failed_cvar1; michael@0: wg->new_business = PR_NewCondVar(wg->ml); michael@0: if (NULL == wg->new_business) goto failed_cvar2; michael@0: wg->mw_manage = PR_NewCondVar(wg->ml); michael@0: if (NULL == wg->mw_manage) goto failed_cvar3; michael@0: michael@0: PR_INIT_CLIST(&wg->group_link); michael@0: PR_INIT_CLIST(&wg->io_ready); michael@0: michael@0: /* the waiters sequence */ michael@0: wg->waiter = (_PRWaiterHash*)PR_CALLOC( michael@0: sizeof(_PRWaiterHash) + michael@0: (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*))); michael@0: if (NULL == wg->waiter) michael@0: { michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: goto failed_waiter; michael@0: } michael@0: wg->waiter->count = 0; michael@0: wg->waiter->length = _PR_DEFAULT_HASH_LENGTH; michael@0: michael@0: #ifdef WINNT michael@0: _PR_MD_NEW_LOCK(&wg->mdlock); michael@0: PR_INIT_CLIST(&wg->wait_list); michael@0: #endif /* WINNT */ michael@0: michael@0: PR_Lock(mw_lock); michael@0: PR_APPEND_LINK(&wg->group_link, &mw_state->group_list); michael@0: PR_Unlock(mw_lock); michael@0: return wg; michael@0: michael@0: failed_waiter: michael@0: PR_DestroyCondVar(wg->mw_manage); michael@0: failed_cvar3: michael@0: PR_DestroyCondVar(wg->new_business); michael@0: failed_cvar2: michael@0: PR_DestroyCondVar(wg->io_complete); michael@0: failed_cvar1: michael@0: PR_DestroyCondVar(wg->io_taken); michael@0: failed_cvar0: michael@0: 
PR_DestroyLock(wg->ml); michael@0: failed_lock: michael@0: PR_DELETE(wg); michael@0: wg = NULL; michael@0: michael@0: failed: michael@0: return wg; michael@0: } /* MW_CreateWaitGroup */ michael@0: michael@0: PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group) michael@0: { michael@0: PRStatus rv = PR_SUCCESS; michael@0: if (NULL == group) group = mw_state->group; michael@0: PR_ASSERT(NULL != group); michael@0: if (NULL != group) michael@0: { michael@0: PR_Lock(group->ml); michael@0: if ((group->waiting_threads == 0) michael@0: && (group->waiter->count == 0) michael@0: && PR_CLIST_IS_EMPTY(&group->io_ready)) michael@0: { michael@0: group->state = _prmw_stopped; michael@0: } michael@0: else michael@0: { michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: rv = PR_FAILURE; michael@0: } michael@0: PR_Unlock(group->ml); michael@0: if (PR_FAILURE == rv) return rv; michael@0: michael@0: PR_Lock(mw_lock); michael@0: PR_REMOVE_LINK(&group->group_link); michael@0: PR_Unlock(mw_lock); michael@0: michael@0: #ifdef WINNT michael@0: /* michael@0: * XXX make sure wait_list is empty and waiter is empty. michael@0: * These must be checked while holding mdlock. michael@0: */ michael@0: _PR_MD_FREE_LOCK(&group->mdlock); michael@0: #endif michael@0: michael@0: PR_DELETE(group->waiter); michael@0: PR_DELETE(group->polling_list); michael@0: PR_DestroyCondVar(group->mw_manage); michael@0: PR_DestroyCondVar(group->new_business); michael@0: PR_DestroyCondVar(group->io_complete); michael@0: PR_DestroyCondVar(group->io_taken); michael@0: PR_DestroyLock(group->ml); michael@0: if (group == mw_state->group) mw_state->group = NULL; michael@0: PR_DELETE(group); michael@0: } michael@0: else michael@0: { michael@0: /* The default wait group is not created yet. 
*/ michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: rv = PR_FAILURE; michael@0: } michael@0: return rv; michael@0: } /* PR_DestroyWaitGroup */ michael@0: michael@0: /********************************************************************** michael@0: *********************************************************************** michael@0: ******************** Wait group enumerations ************************** michael@0: *********************************************************************** michael@0: **********************************************************************/ michael@0: michael@0: PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group) michael@0: { michael@0: PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator); michael@0: if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: else michael@0: { michael@0: enumerator->group = group; michael@0: enumerator->seal = _PR_ENUM_SEALED; michael@0: } michael@0: return enumerator; michael@0: } /* PR_CreateMWaitEnumerator */ michael@0: michael@0: PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator) michael@0: { michael@0: PR_ASSERT(NULL != enumerator); michael@0: PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); michael@0: if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal)) michael@0: { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return PR_FAILURE; michael@0: } michael@0: enumerator->seal = _PR_ENUM_UNSEALED; michael@0: PR_Free(enumerator); michael@0: return PR_SUCCESS; michael@0: } /* PR_DestroyMWaitEnumerator */ michael@0: michael@0: PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup( michael@0: PRMWaitEnumerator *enumerator, const PRRecvWait *previous) michael@0: { michael@0: PRRecvWait *result = NULL; michael@0: michael@0: /* entry point sanity checking */ michael@0: PR_ASSERT(NULL != enumerator); michael@0: PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal); michael@0: if ((NULL == enumerator) 
michael@0: || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument; michael@0: michael@0: /* beginning of enumeration */ michael@0: if (NULL == previous) michael@0: { michael@0: if (NULL == enumerator->group) michael@0: { michael@0: enumerator->group = mw_state->group; michael@0: if (NULL == enumerator->group) michael@0: { michael@0: PR_SetError(PR_GROUP_EMPTY_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: } michael@0: enumerator->waiter = &enumerator->group->waiter->recv_wait; michael@0: enumerator->p_timestamp = enumerator->group->p_timestamp; michael@0: enumerator->thread = PR_GetCurrentThread(); michael@0: enumerator->index = 0; michael@0: } michael@0: /* continuing an enumeration */ michael@0: else michael@0: { michael@0: PRThread *me = PR_GetCurrentThread(); michael@0: PR_ASSERT(me == enumerator->thread); michael@0: if (me != enumerator->thread) goto bad_argument; michael@0: michael@0: /* need to restart the enumeration */ michael@0: if (enumerator->p_timestamp != enumerator->group->p_timestamp) michael@0: return PR_EnumerateWaitGroup(enumerator, NULL); michael@0: } michael@0: michael@0: /* actually progress the enumeration */ michael@0: #if defined(WINNT) michael@0: _PR_MD_LOCK(&enumerator->group->mdlock); michael@0: #else michael@0: PR_Lock(enumerator->group->ml); michael@0: #endif michael@0: while (enumerator->index++ < enumerator->group->waiter->length) michael@0: { michael@0: if (NULL != (result = *(enumerator->waiter)++)) break; michael@0: } michael@0: #if defined(WINNT) michael@0: _PR_MD_UNLOCK(&enumerator->group->mdlock); michael@0: #else michael@0: PR_Unlock(enumerator->group->ml); michael@0: #endif michael@0: michael@0: return result; /* what we live for */ michael@0: michael@0: bad_argument: michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return NULL; /* probably ambiguous */ michael@0: } /* PR_EnumerateWaitGroup */ michael@0: michael@0: /* prmwait.c */