nsprpub/pr/src/io/prmwait.c

Wed, 31 Dec 2014 06:55:46 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:55:46 +0100
changeset 1
ca08bd8f51b2
permissions
-rw-r--r--

Added tag TORBROWSER_REPLICA for changeset 6474c204b198

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* This Source Code Form is subject to the terms of the Mozilla Public
     3  * License, v. 2.0. If a copy of the MPL was not distributed with this
     4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     6 #include "primpl.h"
     7 #include "pprmwait.h"
     9 #define _MW_REHASH_MAX 11
    11 static PRLock *mw_lock = NULL;
    12 static _PRGlobalState *mw_state = NULL;
    14 static PRIntervalTime max_polling_interval;
    16 #ifdef WINNT
    18 typedef struct TimerEvent {
    19     PRIntervalTime absolute;
    20     void (*func)(void *);
    21     void *arg;
    22     LONG ref_count;
    23     PRCList links;
    24 } TimerEvent;
    26 #define TIMER_EVENT_PTR(_qp) \
    27     ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
    29 struct {
    30     PRLock *ml;
    31     PRCondVar *new_timer;
    32     PRCondVar *cancel_timer;
    33     PRThread *manager_thread;
    34     PRCList timer_queue;
    35 } tm_vars;
    37 static PRStatus TimerInit(void);
    38 static void TimerManager(void *arg);
    39 static TimerEvent *CreateTimer(PRIntervalTime timeout,
    40     void (*func)(void *), void *arg);
    41 static PRBool CancelTimer(TimerEvent *timer);
    43 static void TimerManager(void *arg)
    44 {
    45     PRIntervalTime now;
    46     PRIntervalTime timeout;
    47     PRCList *head;
    48     TimerEvent *timer;
    50     PR_Lock(tm_vars.ml);
    51     while (1)
    52     {
    53         if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
    54         {
    55             PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
    56         }
    57         else
    58         {
    59             now = PR_IntervalNow();
    60             head = PR_LIST_HEAD(&tm_vars.timer_queue);
    61             timer = TIMER_EVENT_PTR(head);
    62             if ((PRInt32) (now - timer->absolute) >= 0)
    63             {
    64                 PR_REMOVE_LINK(head);
    65                 /*
    66                  * make its prev and next point to itself so that
    67                  * it's obvious that it's not on the timer_queue.
    68                  */
    69                 PR_INIT_CLIST(head);
    70                 PR_ASSERT(2 == timer->ref_count);
    71                 PR_Unlock(tm_vars.ml);
    72                 timer->func(timer->arg);
    73                 PR_Lock(tm_vars.ml);
    74                 timer->ref_count -= 1;
    75                 if (0 == timer->ref_count)
    76                 {
    77                     PR_NotifyAllCondVar(tm_vars.cancel_timer);
    78                 }
    79             }
    80             else
    81             {
    82                 timeout = (PRIntervalTime)(timer->absolute - now);
    83                 PR_WaitCondVar(tm_vars.new_timer, timeout);
    84             } 
    85         }
    86     }
    87     PR_Unlock(tm_vars.ml);
    88 }
    90 static TimerEvent *CreateTimer(
    91     PRIntervalTime timeout,
    92     void (*func)(void *),
    93     void *arg)
    94 {
    95     TimerEvent *timer;
    96     PRCList *links, *tail;
    97     TimerEvent *elem;
    99     timer = PR_NEW(TimerEvent);
   100     if (NULL == timer)
   101     {
   102         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   103         return timer;
   104     }
   105     timer->absolute = PR_IntervalNow() + timeout;
   106     timer->func = func;
   107     timer->arg = arg;
   108     timer->ref_count = 2;
   109     PR_Lock(tm_vars.ml);
   110     tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
   111     while (links->prev != tail)
   112     {
   113         elem = TIMER_EVENT_PTR(links);
   114         if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
   115         {
   116             break;
   117         }
   118         links = links->prev;
   119     }
   120     PR_INSERT_AFTER(&timer->links, links);
   121     PR_NotifyCondVar(tm_vars.new_timer);
   122     PR_Unlock(tm_vars.ml);
   123     return timer;
   124 }
   126 static PRBool CancelTimer(TimerEvent *timer)
   127 {
   128     PRBool canceled = PR_FALSE;
   130     PR_Lock(tm_vars.ml);
   131     timer->ref_count -= 1;
   132     if (timer->links.prev == &timer->links)
   133     {
   134         while (timer->ref_count == 1)
   135         {
   136             PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
   137         }
   138     }
   139     else
   140     {
   141         PR_REMOVE_LINK(&timer->links);
   142         canceled = PR_TRUE;
   143     }
   144     PR_Unlock(tm_vars.ml);
   145     PR_DELETE(timer);
   146     return canceled; 
   147 }
   149 static PRStatus TimerInit(void)
   150 {
   151     tm_vars.ml = PR_NewLock();
   152     if (NULL == tm_vars.ml)
   153     {
   154         goto failed;
   155     }
   156     tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
   157     if (NULL == tm_vars.new_timer)
   158     {
   159         goto failed;
   160     }
   161     tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
   162     if (NULL == tm_vars.cancel_timer)
   163     {
   164         goto failed;
   165     }
   166     PR_INIT_CLIST(&tm_vars.timer_queue);
   167     tm_vars.manager_thread = PR_CreateThread(
   168         PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
   169         PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
   170     if (NULL == tm_vars.manager_thread)
   171     {
   172         goto failed;
   173     }
   174     return PR_SUCCESS;
   176 failed:
   177     if (NULL != tm_vars.cancel_timer)
   178     {
   179         PR_DestroyCondVar(tm_vars.cancel_timer);
   180     }
   181     if (NULL != tm_vars.new_timer)
   182     {
   183         PR_DestroyCondVar(tm_vars.new_timer);
   184     }
   185     if (NULL != tm_vars.ml)
   186     {
   187         PR_DestroyLock(tm_vars.ml);
   188     }
   189     return PR_FAILURE;
   190 }
   192 #endif /* WINNT */
   194 /******************************************************************/
   195 /******************************************************************/
   196 /************************ The private portion *********************/
   197 /******************************************************************/
   198 /******************************************************************/
   199 void _PR_InitMW(void)
   200 {
   201 #ifdef WINNT
   202     /*
   203      * We use NT 4's InterlockedCompareExchange() to operate
   204      * on PRMWStatus variables.
   205      */
   206     PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
   207     TimerInit();
   208 #endif
   209     mw_lock = PR_NewLock();
   210     PR_ASSERT(NULL != mw_lock);
   211     mw_state = PR_NEWZAP(_PRGlobalState);
   212     PR_ASSERT(NULL != mw_state);
   213     PR_INIT_CLIST(&mw_state->group_list);
   214     max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
   215 }  /* _PR_InitMW */
   217 void _PR_CleanupMW(void)
   218 {
   219     PR_DestroyLock(mw_lock);
   220     mw_lock = NULL;
   221     if (mw_state->group) {
   222         PR_DestroyWaitGroup(mw_state->group);
   223         /* mw_state->group is set to NULL as a side effect. */
   224     }
   225     PR_DELETE(mw_state);
   226 }  /* _PR_CleanupMW */
   228 static PRWaitGroup *MW_Init2(void)
   229 {
   230     PRWaitGroup *group = mw_state->group;  /* it's the null group */
   231     if (NULL == group)  /* there is this special case */
   232     {
   233         group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
   234         if (NULL == group) goto failed_alloc;
   235         PR_Lock(mw_lock);
   236         if (NULL == mw_state->group)
   237         {
   238             mw_state->group = group;
   239             group = NULL;
   240         }
   241         PR_Unlock(mw_lock);
   242         if (group != NULL) (void)PR_DestroyWaitGroup(group);
   243         group = mw_state->group;  /* somebody beat us to it */
   244     }
   245 failed_alloc:
   246     return group;  /* whatever */
   247 }  /* MW_Init2 */
   249 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
   250 {
   251     /*
   252     ** The entries are put in the table using the fd (PRFileDesc*) of
   253     ** the receive descriptor as the key. This allows us to locate
   254     ** the appropriate entry aqain when the poll operation finishes.
   255     **
   256     ** The pointer to the file descriptor object is first divided by
   257     ** the natural alignment of a pointer in the belief that object
   258     ** will have at least that many zeros in the low order bits.
   259     ** This may not be a good assuption.
   260     **
   261     ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
   262     ** that we declare defeat and force the table to be reconstructed.
   263     ** Since some fds might be added more than once, won't that cause
   264     ** collisions even in an empty table?
   265     */
   266     PRIntn rehash = _MW_REHASH_MAX;
   267     PRRecvWait **waiter;
   268     PRUintn hidx = _MW_HASH(desc->fd, hash->length);
   269     PRUintn hoffset = 0;
   271     while (rehash-- > 0)
   272     {
   273         waiter = &hash->recv_wait;
   274         if (NULL == waiter[hidx])
   275         {
   276             waiter[hidx] = desc;
   277             hash->count += 1;
   278 #if 0
   279             printf("Adding 0x%x->0x%x ", desc, desc->fd);
   280             printf(
   281                 "table[%u:%u:*%u]: 0x%x->0x%x\n",
   282                 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
   283 #endif
   284             return _prmw_success;
   285         }
   286         if (desc == waiter[hidx])
   287         {
   288             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
   289             return _prmw_error;
   290         }
   291 #if 0
   292         printf("Failing 0x%x->0x%x ", desc, desc->fd);
   293         printf(
   294             "table[*%u:%u:%u]: 0x%x->0x%x\n",
   295             hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
   296 #endif
   297         if (0 == hoffset)
   298         {
   299             hoffset = _MW_HASH2(desc->fd, hash->length);
   300             PR_ASSERT(0 != hoffset);
   301         }
   302         hidx = (hidx + hoffset) % (hash->length);
   303     }
   304     return _prmw_rehash;    
   305 }  /* MW_AddHashInternal */
   307 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
   308 {
   309     PRRecvWait **desc;
   310     PRUint32 pidx, length;
   311     _PRWaiterHash *newHash, *oldHash = group->waiter;
   312     PRBool retry;
   313     _PR_HashStory hrv;
   315     static const PRInt32 prime_number[] = {
   316         _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
   317         2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
   318     PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
   320     /* look up the next size we'd like to use for the hash table */
   321     for (pidx = 0; pidx < primes; ++pidx)
   322     {
   323         if (prime_number[pidx] == oldHash->length)
   324         {
   325             break;
   326         }
   327     }
   328     /* table size must be one of the prime numbers */
   329     PR_ASSERT(pidx < primes);
   331     /* if pidx == primes - 1, we can't expand the table any more */
   332     while (pidx < primes - 1)
   333     {
   334         /* next size */
   335         ++pidx;
   336         length = prime_number[pidx];
   338         /* allocate the new hash table and fill it in with the old */
   339         newHash = (_PRWaiterHash*)PR_CALLOC(
   340             sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
   341         if (NULL == newHash)
   342         {
   343             PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   344             return _prmw_error;
   345         }
   347         newHash->length = length;
   348         retry = PR_FALSE;
   349         for (desc = &oldHash->recv_wait;
   350             newHash->count < oldHash->count; ++desc)
   351         {
   352             PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
   353             if (NULL != *desc)
   354             {
   355                 hrv = MW_AddHashInternal(*desc, newHash);
   356                 PR_ASSERT(_prmw_error != hrv);
   357                 if (_prmw_success != hrv)
   358                 {
   359                     PR_DELETE(newHash);
   360                     retry = PR_TRUE;
   361                     break;
   362                 }
   363             }
   364         }
   365         if (retry) continue;
   367         PR_DELETE(group->waiter);
   368         group->waiter = newHash;
   369         group->p_timestamp += 1;
   370         return _prmw_success;
   371     }
   373     PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   374     return _prmw_error;  /* we're hosed */
   375 }  /* MW_ExpandHashInternal */
   377 #ifndef WINNT
   378 static void _MW_DoneInternal(
   379     PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
   380 {
   381     /*
   382     ** Add this receive wait object to the list of finished I/O
   383     ** operations for this particular group. If there are other
   384     ** threads waiting on the group, notify one. If not, arrange
   385     ** for this thread to return.
   386     */
   388 #if 0
   389     printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
   390 #endif
   391     (*waiter)->outcome = outcome;
   392     PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
   393     PR_NotifyCondVar(group->io_complete);
   394     PR_ASSERT(0 != group->waiter->count);
   395     group->waiter->count -= 1;
   396     *waiter = NULL;
   397 }  /* _MW_DoneInternal */
   398 #endif /* WINNT */
   400 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
   401 {
   402     /*
   403     ** Find the receive wait object corresponding to the file descriptor.
   404     ** Only search the wait group specified.
   405     */
   406     PRRecvWait **desc;
   407     PRIntn rehash = _MW_REHASH_MAX;
   408     _PRWaiterHash *hash = group->waiter;
   409     PRUintn hidx = _MW_HASH(fd, hash->length);
   410     PRUintn hoffset = 0;
   412     while (rehash-- > 0)
   413     {
   414         desc = (&hash->recv_wait) + hidx;
   415         if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
   416         if (0 == hoffset)
   417         {
   418             hoffset = _MW_HASH2(fd, hash->length);
   419             PR_ASSERT(0 != hoffset);
   420         }
   421         hidx = (hidx + hoffset) % (hash->length);
   422     }
   423     return NULL;
   424 }  /* _MW_LookupInternal */
   426 #ifndef WINNT
   427 static PRStatus _MW_PollInternal(PRWaitGroup *group)
   428 {
   429     PRRecvWait **waiter;
   430     PRStatus rv = PR_FAILURE;
   431     PRInt32 count, count_ready;
   432     PRIntervalTime polling_interval;
   434     group->poller = PR_GetCurrentThread();
   436     while (PR_TRUE)
   437     {
   438         PRIntervalTime now, since_last_poll;
   439         PRPollDesc *poll_list;
   441         while (0 == group->waiter->count)
   442         {
   443             PRStatus st;
   444             st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
   445             if (_prmw_running != group->state)
   446             {
   447                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
   448                 goto aborted;
   449             }
   450             if (_MW_ABORTED(st)) goto aborted;
   451         }
   453         /*
   454         ** There's something to do. See if our existing polling list
   455         ** is large enough for what we have to do?
   456         */
   458         while (group->polling_count < group->waiter->count)
   459         {
   460             PRUint32 old_count = group->waiter->count;
   461             PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
   462             PRSize new_size = sizeof(PRPollDesc) * new_count;
   463             PRPollDesc *old_polling_list = group->polling_list;
   465             PR_Unlock(group->ml);
   466             poll_list = (PRPollDesc*)PR_CALLOC(new_size);
   467             if (NULL == poll_list)
   468             {
   469                 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   470                 PR_Lock(group->ml);
   471                 goto failed_alloc;
   472             }
   473             if (NULL != old_polling_list)
   474                 PR_DELETE(old_polling_list);
   475             PR_Lock(group->ml);
   476             if (_prmw_running != group->state)
   477             {
   478                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
   479                 goto aborted;
   480             }
   481             group->polling_list = poll_list;
   482             group->polling_count = new_count;
   483         }
   485         now = PR_IntervalNow();
   486         polling_interval = max_polling_interval;
   487         since_last_poll = now - group->last_poll;
   489         waiter = &group->waiter->recv_wait;
   490         poll_list = group->polling_list;
   491         for (count = 0; count < group->waiter->count; ++waiter)
   492         {
   493             PR_ASSERT(waiter < &group->waiter->recv_wait
   494                 + group->waiter->length);
   495             if (NULL != *waiter)  /* a live one! */
   496             {
   497                 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
   498                 && (since_last_poll >= (*waiter)->timeout))
   499                     _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
   500                 else
   501                 {
   502                     if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
   503                     {
   504                         (*waiter)->timeout -= since_last_poll;
   505                         if ((*waiter)->timeout < polling_interval)
   506                             polling_interval = (*waiter)->timeout;
   507                     }
   508                     PR_ASSERT(poll_list < group->polling_list
   509                         + group->polling_count);
   510                     poll_list->fd = (*waiter)->fd;
   511                     poll_list->in_flags = PR_POLL_READ;
   512                     poll_list->out_flags = 0;
   513 #if 0
   514                     printf(
   515                         "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
   516                         poll_list, count, poll_list->fd, (*waiter)->timeout);
   517 #endif
   518                     poll_list += 1;
   519                     count += 1;
   520                 }
   521             }
   522         } 
   524         PR_ASSERT(count == group->waiter->count);
   526         /*
   527         ** If there are no more threads waiting for completion,
   528         ** we need to return.
   529         */
   530         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
   531         && (1 == group->waiting_threads)) break;
   533         if (0 == count) continue;  /* wait for new business */
   535         group->last_poll = now;
   537         PR_Unlock(group->ml);
   539         count_ready = PR_Poll(group->polling_list, count, polling_interval);
   541         PR_Lock(group->ml);
   543         if (_prmw_running != group->state)
   544         {
   545             PR_SetError(PR_INVALID_STATE_ERROR, 0);
   546             goto aborted;
   547         }
   548         if (-1 == count_ready)
   549         {
   550             goto failed_poll;  /* that's a shame */
   551         }
   552         else if (0 < count_ready)
   553         {
   554             for (poll_list = group->polling_list; count > 0;
   555             poll_list++, count--)
   556             {
   557                 PR_ASSERT(
   558                     poll_list < group->polling_list + group->polling_count);
   559                 if (poll_list->out_flags != 0)
   560                 {
   561                     waiter = _MW_LookupInternal(group, poll_list->fd);
   562                     /*
   563                     ** If 'waiter' is NULL, that means the wait receive
   564                     ** descriptor has been canceled.
   565                     */
   566                     if (NULL != waiter)
   567                         _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
   568                 }
   569             }
   570         }
   571         /*
   572         ** If there are no more threads waiting for completion,
   573         ** we need to return.
   574         ** This thread was "borrowed" to do the polling, but it really
   575         ** belongs to the client.
   576         */
   577         if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
   578         && (1 == group->waiting_threads)) break;
   579     }
   581     rv = PR_SUCCESS;
   583 aborted:
   584 failed_poll:
   585 failed_alloc:
   586     group->poller = NULL;  /* we were that, not we ain't */
   587     if ((_prmw_running == group->state) && (group->waiting_threads > 1))
   588     {
   589         /* Wake up one thread to become the new poller. */
   590         PR_NotifyCondVar(group->io_complete);
   591     }
   592     return rv;  /* we return with the lock held */
   593 }  /* _MW_PollInternal */
   594 #endif /* !WINNT */
   596 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
   597 {
   598     PRMWGroupState rv = group->state;
   599     /*
   600     ** Looking at the group's fields is safe because
   601     ** once the group's state is no longer running, it
   602     ** cannot revert and there is a safe check on entry
   603     ** to make sure no more threads are made to wait.
   604     */
   605     if ((_prmw_stopping == rv)
   606     && (0 == group->waiting_threads))
   607     {
   608         rv = group->state = _prmw_stopped;
   609         PR_NotifyCondVar(group->mw_manage);
   610     }
   611     return rv;
   612 }  /* MW_TestForShutdownInternal */
   614 #ifndef WINNT
   615 static void _MW_InitialRecv(PRCList *io_ready)
   616 {
   617     PRRecvWait *desc = (PRRecvWait*)io_ready;
   618     if ((NULL == desc->buffer.start)
   619     || (0 == desc->buffer.length))
   620         desc->bytesRecv = 0;
   621     else
   622     {
   623         desc->bytesRecv = (desc->fd->methods->recv)(
   624             desc->fd, desc->buffer.start,
   625             desc->buffer.length, 0, desc->timeout);
   626         if (desc->bytesRecv < 0)  /* SetError should already be there */
   627             desc->outcome = PR_MW_FAILURE;
   628     }
   629 }  /* _MW_InitialRecv */
   630 #endif
   632 #ifdef WINNT
   633 static void NT_TimeProc(void *arg)
   634 {
   635     _MDOverlapped *overlapped = (_MDOverlapped *)arg;
   636     PRRecvWait *desc =  overlapped->data.mw.desc;
   637     PRFileDesc *bottom;
   639     if (InterlockedCompareExchange((LONG *)&desc->outcome,
   640         (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
   641     {
   642         /* This wait recv descriptor has already completed. */
   643         return;
   644     }
   646     /* close the osfd to abort the outstanding async io request */
   647     /* $$$$
   648     ** Little late to be checking if NSPR's on the bottom of stack,
   649     ** but if we don't check, we can't assert that the private data
   650     ** is what we think it is.
   651     ** $$$$
   652     */
   653     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
   654     PR_ASSERT(NULL != bottom);
   655     if (NULL != bottom)  /* now what!?!?! */
   656     {
   657         bottom->secret->state = _PR_FILEDESC_CLOSED;
   658         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
   659         {
   660             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
   661             PR_ASSERT(!"What shall I do?");
   662         }
   663     }
   664     return;
   665 }  /* NT_TimeProc */
   667 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
   668 {
   669     PRRecvWait **waiter;
   671     _PR_MD_LOCK(&group->mdlock);
   672     waiter = _MW_LookupInternal(group, fd);
   673     if (NULL != waiter)
   674     {
   675         group->waiter->count -= 1;
   676         *waiter = NULL;
   677     }
   678     _PR_MD_UNLOCK(&group->mdlock);
   679     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
   680 }
   682 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
   683 {
   684     PRRecvWait **waiter;
   686     waiter = _MW_LookupInternal(group, fd);
   687     if (NULL != waiter)
   688     {
   689         group->waiter->count -= 1;
   690         *waiter = NULL;
   691     }
   692     return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
   693 }
   694 #endif /* WINNT */
   696 /******************************************************************/
   697 /******************************************************************/
   698 /********************** The public API portion ********************/
   699 /******************************************************************/
   700 /******************************************************************/
   701 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
   702     PRWaitGroup *group, PRRecvWait *desc)
   703 {
   704     _PR_HashStory hrv;
   705     PRStatus rv = PR_FAILURE;
   706 #ifdef WINNT
   707     _MDOverlapped *overlapped;
   708     HANDLE hFile;
   709     BOOL bResult;
   710     DWORD dwError;
   711     PRFileDesc *bottom;
   712 #endif
   714     if (!_pr_initialized) _PR_ImplicitInitialization();
   715     if ((NULL == group) && (NULL == (group = MW_Init2())))
   716     {
   717         return rv;
   718     }
   720     PR_ASSERT(NULL != desc->fd);
   722     desc->outcome = PR_MW_PENDING;  /* nice, well known value */
   723     desc->bytesRecv = 0;  /* likewise, though this value is ambiguious */
   725     PR_Lock(group->ml);
   727     if (_prmw_running != group->state)
   728     {
   729         /* Not allowed to add after cancelling the group */
   730         desc->outcome = PR_MW_INTERRUPT;
   731         PR_SetError(PR_INVALID_STATE_ERROR, 0);
   732         PR_Unlock(group->ml);
   733         return rv;
   734     }
   736 #ifdef WINNT
   737     _PR_MD_LOCK(&group->mdlock);
   738 #endif
   740     /*
   741     ** If the waiter count is zero at this point, there's no telling
   742     ** how long we've been idle. Therefore, initialize the beginning
   743     ** of the timing interval. As long as the list doesn't go empty,
   744     ** it will maintain itself.
   745     */
   746     if (0 == group->waiter->count)
   747         group->last_poll = PR_IntervalNow();
   749     do
   750     {
   751         hrv = MW_AddHashInternal(desc, group->waiter);
   752         if (_prmw_rehash != hrv) break;
   753         hrv = MW_ExpandHashInternal(group);  /* gruesome */
   754         if (_prmw_success != hrv) break;
   755     } while (PR_TRUE);
   757 #ifdef WINNT
   758     _PR_MD_UNLOCK(&group->mdlock);
   759 #endif
   761     PR_NotifyCondVar(group->new_business);  /* tell the world */
   762     rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
   763     PR_Unlock(group->ml);
   765 #ifdef WINNT
   766     overlapped = PR_NEWZAP(_MDOverlapped);
   767     if (NULL == overlapped)
   768     {
   769         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   770         NT_HashRemove(group, desc->fd);
   771         return rv;
   772     }
   773     overlapped->ioModel = _MD_MultiWaitIO;
   774     overlapped->data.mw.desc = desc;
   775     overlapped->data.mw.group = group;
   776     if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
   777     {
   778         overlapped->data.mw.timer = CreateTimer(
   779             desc->timeout,
   780             NT_TimeProc,
   781             overlapped);
   782         if (0 == overlapped->data.mw.timer)
   783         {
   784             NT_HashRemove(group, desc->fd);
   785             PR_DELETE(overlapped);
   786             /*
   787              * XXX It appears that a maximum of 16 timer events can
   788              * be outstanding. GetLastError() returns 0 when I try it.
   789              */
   790             PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
   791             return PR_FAILURE;
   792         }
   793     }
   795     /* Reach to the bottom layer to get the OS fd */
   796     bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
   797     PR_ASSERT(NULL != bottom);
   798     if (NULL == bottom)
   799     {
   800         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   801         return PR_FAILURE;
   802     }
   803     hFile = (HANDLE)bottom->secret->md.osfd; 
   804     if (!bottom->secret->md.io_model_committed)
   805     {
   806         PRInt32 st;
   807         st = _md_Associate(hFile);
   808         PR_ASSERT(0 != st);
   809         bottom->secret->md.io_model_committed = PR_TRUE;
   810     }
   811     bResult = ReadFile(hFile,
   812         desc->buffer.start,
   813         (DWORD)desc->buffer.length,
   814         NULL,
   815         &overlapped->overlapped);
   816     if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
   817     {
   818         if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
   819         {
   820             if (InterlockedCompareExchange((LONG *)&desc->outcome,
   821                 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
   822                 == (LONG)PR_MW_PENDING)
   823             {
   824                 CancelTimer(overlapped->data.mw.timer);
   825             }
   826             NT_HashRemove(group, desc->fd);
   827             PR_DELETE(overlapped);
   828         }
   829         _PR_MD_MAP_READ_ERROR(dwError);
   830         rv = PR_FAILURE;
   831     }
   832 #endif
   834     return rv;
   835 }  /* PR_AddWaitFileDesc */
   837 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
   838 {
   839     PRCList *io_ready = NULL;
   840 #ifdef WINNT
   841     PRThread *me = _PR_MD_CURRENT_THREAD();
   842     _MDOverlapped *overlapped;    
   843 #endif
   845     if (!_pr_initialized) _PR_ImplicitInitialization();
   846     if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
   848     PR_Lock(group->ml);
   850     if (_prmw_running != group->state)
   851     {
   852         PR_SetError(PR_INVALID_STATE_ERROR, 0);
   853         goto invalid_state;
   854     }
   856     group->waiting_threads += 1;  /* the polling thread is counted */
   858 #ifdef WINNT
   859     _PR_MD_LOCK(&group->mdlock);
   860     while (PR_CLIST_IS_EMPTY(&group->io_ready))
   861     {
   862         _PR_THREAD_LOCK(me);
   863         me->state = _PR_IO_WAIT;
   864         PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
   865         if (!_PR_IS_NATIVE_THREAD(me))
   866         {
   867             _PR_SLEEPQ_LOCK(me->cpu);
   868             _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
   869             _PR_SLEEPQ_UNLOCK(me->cpu);
   870         }
   871         _PR_THREAD_UNLOCK(me);
   872         _PR_MD_UNLOCK(&group->mdlock);
   873         PR_Unlock(group->ml);
   874         _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
   875         me->state = _PR_RUNNING;
   876         PR_Lock(group->ml);
   877         _PR_MD_LOCK(&group->mdlock);
   878         if (_PR_PENDING_INTERRUPT(me)) {
   879             PR_REMOVE_LINK(&me->waitQLinks);
   880             _PR_MD_UNLOCK(&group->mdlock);
   881             me->flags &= ~_PR_INTERRUPT;
   882             me->io_suspended = PR_FALSE;
   883             PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
   884             goto aborted;
   885         }
   886     }
   887     io_ready = PR_LIST_HEAD(&group->io_ready);
   888     PR_ASSERT(io_ready != NULL);
   889     PR_REMOVE_LINK(io_ready);
   890     _PR_MD_UNLOCK(&group->mdlock);
   891     overlapped = (_MDOverlapped *)
   892         ((char *)io_ready - offsetof(_MDOverlapped, data));
   893     io_ready = &overlapped->data.mw.desc->internal;
   894 #else
   895     do
   896     {
   897         /*
   898         ** If the I/O ready list isn't empty, have this thread
   899         ** return with the first receive wait object that's available.
   900         */
   901         if (PR_CLIST_IS_EMPTY(&group->io_ready))
   902         {
   903             /*
   904             ** Is there a polling thread yet? If not, grab this thread
   905             ** and use it.
   906             */
   907             if (NULL == group->poller)
   908             {
   909                 /*
   910                 ** This thread will stay do polling until it becomes the only one
   911                 ** left to service a completion. Then it will return and there will
   912                 ** be none left to actually poll or to run completions.
   913                 **
   914                 ** The polling function should only return w/ failure or
   915                 ** with some I/O ready.
   916                 */
   917                 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
   918             }
   919             else
   920             {
   921                 /*
   922                 ** There are four reasons a thread can be awakened from
   923                 ** a wait on the io_complete condition variable.
   924                 ** 1. Some I/O has completed, i.e., the io_ready list
   925                 **    is nonempty.
   926                 ** 2. The wait group is canceled.
   927                 ** 3. The thread is interrupted.
   928                 ** 4. The current polling thread has to leave and needs
   929                 **    a replacement.
   930                 ** The logic to find a new polling thread is made more
   931                 ** complicated by all the other possible events.
   932                 ** I tried my best to write the logic clearly, but
   933                 ** it is still full of if's with continue and goto.
   934                 */
   935                 PRStatus st;
   936                 do 
   937                 {
   938                     st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
   939                     if (_prmw_running != group->state)
   940                     {
   941                         PR_SetError(PR_INVALID_STATE_ERROR, 0);
   942                         goto aborted;
   943                     }
   944                     if (_MW_ABORTED(st) || (NULL == group->poller)) break;
   945                 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
   947                 /*
   948                 ** The thread is interrupted and has to leave.  It might
   949                 ** have also been awakened to process ready i/o or be the
   950                 ** new poller.  To be safe, if either condition is true,
   951                 ** we awaken another thread to take its place.
   952                 */
   953                 if (_MW_ABORTED(st))
   954                 {
   955                     if ((NULL == group->poller
   956                     || !PR_CLIST_IS_EMPTY(&group->io_ready))
   957                     && group->waiting_threads > 1)
   958                         PR_NotifyCondVar(group->io_complete);
   959                     goto aborted;
   960                 }
   962                 /*
   963                 ** A new poller is needed, but can I be the new poller?
   964                 ** If there is no i/o ready, sure.  But if there is any
   965                 ** i/o ready, it has a higher priority.  I want to
   966                 ** process the ready i/o first and wake up another
   967                 ** thread to be the new poller.
   968                 */ 
   969                 if (NULL == group->poller)
   970                 {
   971                     if (PR_CLIST_IS_EMPTY(&group->io_ready))
   972                         continue;
   973                     if (group->waiting_threads > 1)
   974                         PR_NotifyCondVar(group->io_complete);
   975                 }
   976             }
   977             PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
   978         }
   979         io_ready = PR_LIST_HEAD(&group->io_ready);
   980         PR_NotifyCondVar(group->io_taken);
   981         PR_ASSERT(io_ready != NULL);
   982         PR_REMOVE_LINK(io_ready);
   983     } while (NULL == io_ready);
   985 failed_poll:
   987 #endif
   989 aborted:
   991     group->waiting_threads -= 1;
   992 invalid_state:
   993     (void)MW_TestForShutdownInternal(group);
   994     PR_Unlock(group->ml);
   996 failed_init:
   997     if (NULL != io_ready)
   998     {
   999         /* If the operation failed, record the reason why */
  1000         switch (((PRRecvWait*)io_ready)->outcome)
  1002             case PR_MW_PENDING:
  1003                 PR_ASSERT(0);
  1004                 break;
  1005             case PR_MW_SUCCESS:
  1006 #ifndef WINNT
  1007                 _MW_InitialRecv(io_ready);
  1008 #endif
  1009                 break;
  1010 #ifdef WINNT
  1011             case PR_MW_FAILURE:
  1012                 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
  1013                 break;
  1014 #endif
  1015             case PR_MW_TIMEOUT:
  1016                 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
  1017                 break;
  1018             case PR_MW_INTERRUPT:
  1019                 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
  1020                 break;
  1021             default: break;
  1023 #ifdef WINNT
  1024         if (NULL != overlapped->data.mw.timer)
  1026             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
  1027                 != overlapped->data.mw.desc->timeout);
  1028             CancelTimer(overlapped->data.mw.timer);
  1030         else
  1032             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
  1033                 == overlapped->data.mw.desc->timeout);
  1035         PR_DELETE(overlapped);
  1036 #endif
  1038     return (PRRecvWait*)io_ready;
  1039 }  /* PR_WaitRecvReady */
  1041 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
  1043 #if !defined(WINNT)
  1044     PRRecvWait **recv_wait;
  1045 #endif
  1046     PRStatus rv = PR_SUCCESS;
  1047     if (NULL == group) group = mw_state->group;
  1048     PR_ASSERT(NULL != group);
  1049     if (NULL == group)
  1051         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1052         return PR_FAILURE;
  1055     PR_Lock(group->ml);
  1057     if (_prmw_running != group->state)
  1059         PR_SetError(PR_INVALID_STATE_ERROR, 0);
  1060         rv = PR_FAILURE;
  1061         goto unlock;
  1064 #ifdef WINNT
  1065     if (InterlockedCompareExchange((LONG *)&desc->outcome,
  1066         (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
  1068         PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
  1069         PR_ASSERT(NULL != bottom);
  1070         if (NULL == bottom)
  1072             PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1073             goto unlock;
  1075         bottom->secret->state = _PR_FILEDESC_CLOSED;
  1076 #if 0
  1077         fprintf(stderr, "cancel wait recv: closing socket\n");
  1078 #endif
  1079         if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
  1081             fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
  1082             exit(1);
  1085 #else
  1086     if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
  1088         /* it was in the wait table */
  1089         _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
  1090         goto unlock;
  1092     if (!PR_CLIST_IS_EMPTY(&group->io_ready))
  1094         /* is it already complete? */
  1095         PRCList *head = PR_LIST_HEAD(&group->io_ready);
  1096         do
  1098             PRRecvWait *done = (PRRecvWait*)head;
  1099             if (done == desc) goto unlock;
  1100             head = PR_NEXT_LINK(head);
  1101         } while (head != &group->io_ready);
  1103     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1104     rv = PR_FAILURE;
  1106 #endif
  1107 unlock:
  1108     PR_Unlock(group->ml);
  1109     return rv;
  1110 }  /* PR_CancelWaitFileDesc */
  1112 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
  1114     PRRecvWait **desc;
  1115     PRRecvWait *recv_wait = NULL;
  1116 #ifdef WINNT
  1117     _MDOverlapped *overlapped;
  1118     PRRecvWait **end;
  1119     PRThread *me = _PR_MD_CURRENT_THREAD();
  1120 #endif
  1122     if (NULL == group) group = mw_state->group;
  1123     PR_ASSERT(NULL != group);
  1124     if (NULL == group)
  1126         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1127         return NULL;
  1130     PR_Lock(group->ml);
  1131     if (_prmw_stopped != group->state)
  1133         if (_prmw_running == group->state)
  1134             group->state = _prmw_stopping;  /* so nothing new comes in */
  1135         if (0 == group->waiting_threads)  /* is there anybody else? */
  1136             group->state = _prmw_stopped;  /* we can stop right now */
  1137         else
  1139             PR_NotifyAllCondVar(group->new_business);
  1140             PR_NotifyAllCondVar(group->io_complete);
  1142         while (_prmw_stopped != group->state)
  1143             (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
  1146 #ifdef WINNT
  1147     _PR_MD_LOCK(&group->mdlock);
  1148 #endif
  1149     /* make all the existing descriptors look done/interrupted */
  1150 #ifdef WINNT
  1151     end = &group->waiter->recv_wait + group->waiter->length;
  1152     for (desc = &group->waiter->recv_wait; desc < end; ++desc)
  1154         if (NULL != *desc)
  1156             if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
  1157                 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
  1158                 == (LONG)PR_MW_PENDING)
  1160                 PRFileDesc *bottom = PR_GetIdentitiesLayer(
  1161                     (*desc)->fd, PR_NSPR_IO_LAYER);
  1162                 PR_ASSERT(NULL != bottom);
  1163                 if (NULL == bottom)
  1165                     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1166                     goto invalid_arg;
  1168                 bottom->secret->state = _PR_FILEDESC_CLOSED;
  1169 #if 0
  1170                 fprintf(stderr, "cancel wait group: closing socket\n");
  1171 #endif
  1172                 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
  1174                     fprintf(stderr, "closesocket failed: %d\n",
  1175                         WSAGetLastError());
  1176                     exit(1);
  1181     while (group->waiter->count > 0)
  1183         _PR_THREAD_LOCK(me);
  1184         me->state = _PR_IO_WAIT;
  1185         PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
  1186         if (!_PR_IS_NATIVE_THREAD(me))
  1188             _PR_SLEEPQ_LOCK(me->cpu);
  1189             _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
  1190             _PR_SLEEPQ_UNLOCK(me->cpu);
  1192         _PR_THREAD_UNLOCK(me);
  1193         _PR_MD_UNLOCK(&group->mdlock);
  1194         PR_Unlock(group->ml);
  1195         _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
  1196         me->state = _PR_RUNNING;
  1197         PR_Lock(group->ml);
  1198         _PR_MD_LOCK(&group->mdlock);
  1200 #else
  1201     for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
  1203         PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
  1204         if (NULL != *desc)
  1205             _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
  1207 #endif
  1209     /* take first element of finished list and return it or NULL */
  1210     if (PR_CLIST_IS_EMPTY(&group->io_ready))
  1211         PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
  1212     else
  1214         PRCList *head = PR_LIST_HEAD(&group->io_ready);
  1215         PR_REMOVE_AND_INIT_LINK(head);
  1216 #ifdef WINNT
  1217         overlapped = (_MDOverlapped *)
  1218             ((char *)head - offsetof(_MDOverlapped, data));
  1219         head = &overlapped->data.mw.desc->internal;
  1220         if (NULL != overlapped->data.mw.timer)
  1222             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
  1223                 != overlapped->data.mw.desc->timeout);
  1224             CancelTimer(overlapped->data.mw.timer);
  1226         else
  1228             PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
  1229                 == overlapped->data.mw.desc->timeout);
  1231         PR_DELETE(overlapped);
  1232 #endif
  1233         recv_wait = (PRRecvWait*)head;
  1235 #ifdef WINNT
  1236 invalid_arg:
  1237     _PR_MD_UNLOCK(&group->mdlock);
  1238 #endif
  1239     PR_Unlock(group->ml);
  1241     return recv_wait;
  1242 }  /* PR_CancelWaitGroup */
  1244 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
  1246     PRWaitGroup *wg;
  1248     if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
  1250         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  1251         goto failed;
  1253     /* the wait group itself */
  1254     wg->ml = PR_NewLock();
  1255     if (NULL == wg->ml) goto failed_lock;
  1256     wg->io_taken = PR_NewCondVar(wg->ml);
  1257     if (NULL == wg->io_taken) goto failed_cvar0;
  1258     wg->io_complete = PR_NewCondVar(wg->ml);
  1259     if (NULL == wg->io_complete) goto failed_cvar1;
  1260     wg->new_business = PR_NewCondVar(wg->ml);
  1261     if (NULL == wg->new_business) goto failed_cvar2;
  1262     wg->mw_manage = PR_NewCondVar(wg->ml);
  1263     if (NULL == wg->mw_manage) goto failed_cvar3;
  1265     PR_INIT_CLIST(&wg->group_link);
  1266     PR_INIT_CLIST(&wg->io_ready);
  1268     /* the waiters sequence */
  1269     wg->waiter = (_PRWaiterHash*)PR_CALLOC(
  1270         sizeof(_PRWaiterHash) +
  1271         (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
  1272     if (NULL == wg->waiter)
  1274         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  1275         goto failed_waiter;
  1277     wg->waiter->count = 0;
  1278     wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
  1280 #ifdef WINNT
  1281     _PR_MD_NEW_LOCK(&wg->mdlock);
  1282     PR_INIT_CLIST(&wg->wait_list);
  1283 #endif /* WINNT */
  1285     PR_Lock(mw_lock);
  1286     PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
  1287     PR_Unlock(mw_lock);
  1288     return wg;
  1290 failed_waiter:
  1291     PR_DestroyCondVar(wg->mw_manage);
  1292 failed_cvar3:
  1293     PR_DestroyCondVar(wg->new_business);
  1294 failed_cvar2:
  1295     PR_DestroyCondVar(wg->io_complete);
  1296 failed_cvar1:
  1297     PR_DestroyCondVar(wg->io_taken);
  1298 failed_cvar0:
  1299     PR_DestroyLock(wg->ml);
  1300 failed_lock:
  1301     PR_DELETE(wg);
  1302     wg = NULL;
  1304 failed:
  1305     return wg;
  1306 }  /* MW_CreateWaitGroup */
  1308 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
  1310     PRStatus rv = PR_SUCCESS;
  1311     if (NULL == group) group = mw_state->group;
  1312     PR_ASSERT(NULL != group);
  1313     if (NULL != group)
  1315         PR_Lock(group->ml);
  1316         if ((group->waiting_threads == 0)
  1317         && (group->waiter->count == 0)
  1318         && PR_CLIST_IS_EMPTY(&group->io_ready))
  1320             group->state = _prmw_stopped;
  1322         else
  1324             PR_SetError(PR_INVALID_STATE_ERROR, 0);
  1325             rv = PR_FAILURE;
  1327         PR_Unlock(group->ml);
  1328         if (PR_FAILURE == rv) return rv;
  1330         PR_Lock(mw_lock);
  1331         PR_REMOVE_LINK(&group->group_link);
  1332         PR_Unlock(mw_lock);
  1334 #ifdef WINNT
  1335         /*
  1336          * XXX make sure wait_list is empty and waiter is empty.
  1337          * These must be checked while holding mdlock.
  1338          */
  1339         _PR_MD_FREE_LOCK(&group->mdlock);
  1340 #endif
  1342         PR_DELETE(group->waiter);
  1343         PR_DELETE(group->polling_list);
  1344         PR_DestroyCondVar(group->mw_manage);
  1345         PR_DestroyCondVar(group->new_business);
  1346         PR_DestroyCondVar(group->io_complete);
  1347         PR_DestroyCondVar(group->io_taken);
  1348         PR_DestroyLock(group->ml);
  1349         if (group == mw_state->group) mw_state->group = NULL;
  1350         PR_DELETE(group);
  1352     else
  1354         /* The default wait group is not created yet. */
  1355         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1356         rv = PR_FAILURE;
  1358     return rv;
  1359 }  /* PR_DestroyWaitGroup */
  1361 /**********************************************************************
  1362 ***********************************************************************
  1363 ******************** Wait group enumerations **************************
  1364 ***********************************************************************
  1365 **********************************************************************/
  1367 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
  1369     PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
  1370     if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
  1371     else
  1373         enumerator->group = group;
  1374         enumerator->seal = _PR_ENUM_SEALED;
  1376     return enumerator;
  1377 }  /* PR_CreateMWaitEnumerator */
  1379 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
  1381     PR_ASSERT(NULL != enumerator);
  1382     PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
  1383     if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
  1385         PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1386         return PR_FAILURE;
  1388     enumerator->seal = _PR_ENUM_UNSEALED;
  1389     PR_Free(enumerator);
  1390     return PR_SUCCESS;
  1391 }  /* PR_DestroyMWaitEnumerator */
  1393 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
  1394     PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
  1396     PRRecvWait *result = NULL;
  1398     /* entry point sanity checking */
  1399     PR_ASSERT(NULL != enumerator);
  1400     PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
  1401     if ((NULL == enumerator)
  1402     || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
  1404     /* beginning of enumeration */
  1405     if (NULL == previous)
  1407         if (NULL == enumerator->group)
  1409             enumerator->group = mw_state->group;
  1410             if (NULL == enumerator->group)
  1412                 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
  1413                 return NULL;
  1416         enumerator->waiter = &enumerator->group->waiter->recv_wait;
  1417         enumerator->p_timestamp = enumerator->group->p_timestamp;
  1418         enumerator->thread = PR_GetCurrentThread();
  1419         enumerator->index = 0;
  1421     /* continuing an enumeration */
  1422     else
  1424         PRThread *me = PR_GetCurrentThread();
  1425         PR_ASSERT(me == enumerator->thread);
  1426         if (me != enumerator->thread) goto bad_argument;
  1428         /* need to restart the enumeration */
  1429         if (enumerator->p_timestamp != enumerator->group->p_timestamp)
  1430             return PR_EnumerateWaitGroup(enumerator, NULL);
  1433     /* actually progress the enumeration */
  1434 #if defined(WINNT)
  1435     _PR_MD_LOCK(&enumerator->group->mdlock);
  1436 #else
  1437     PR_Lock(enumerator->group->ml);
  1438 #endif
  1439     while (enumerator->index++ < enumerator->group->waiter->length)
  1441         if (NULL != (result = *(enumerator->waiter)++)) break;
  1443 #if defined(WINNT)
  1444     _PR_MD_UNLOCK(&enumerator->group->mdlock);
  1445 #else
  1446     PR_Unlock(enumerator->group->ml);
  1447 #endif
  1449     return result;  /* what we live for */
  1451 bad_argument:
  1452     PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1453     return NULL;  /* probably ambiguous */
  1454 }  /* PR_EnumerateWaitGroup */
  1456 /* prmwait.c */

mercurial