--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/nsprpub/pr/src/io/prmwait.c Wed Dec 31 06:09:35 2014 +0100
@@ -0,0 +1,1456 @@
+/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/* This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+
+#include "primpl.h"
+#include "pprmwait.h"
+
+#define _MW_REHASH_MAX 11
+
+static PRLock *mw_lock = NULL;
+static _PRGlobalState *mw_state = NULL;
+
+static PRIntervalTime max_polling_interval;
+
+#ifdef WINNT
+
+typedef struct TimerEvent {
+    PRIntervalTime absolute;
+    void (*func)(void *);
+    void *arg;
+    LONG ref_count;
+    PRCList links;
+} TimerEvent;
+
+#define TIMER_EVENT_PTR(_qp) \
+    ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
+
+struct {
+    PRLock *ml;
+    PRCondVar *new_timer;
+    PRCondVar *cancel_timer;
+    PRThread *manager_thread;
+    PRCList timer_queue;
+} tm_vars;
+
+static PRStatus TimerInit(void);
+static void TimerManager(void *arg);
+static TimerEvent *CreateTimer(PRIntervalTime timeout,
+    void (*func)(void *), void *arg);
+static PRBool CancelTimer(TimerEvent *timer);
+
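+/*
+** Timer queue protocol (summary of the code below): the queue is kept
+** ordered by absolute expiration time (CreateTimer inserts scanning from
+** the tail), and TimerManager sleeps until the earliest entry is due.
+** Each TimerEvent starts with a ref_count of 2: one reference belongs to
+** the queue/manager thread and one to the caller of CreateTimer.  The
+** manager drops its reference after running the callback; CancelTimer
+** drops the caller's reference, and if the entry has already been taken
+** off the queue (its links point to itself), it waits on cancel_timer
+** until the manager's reference is gone before freeing the event.
+*/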
+static void TimerManager(void *arg)
+{
+    PRIntervalTime now;
+    PRIntervalTime timeout;
+    PRCList *head;
+    TimerEvent *timer;
+
+    PR_Lock(tm_vars.ml);
+    while (1)
+    {
+        if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
+        {
+            PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
+        }
+        else
+        {
+            now = PR_IntervalNow();
+            head = PR_LIST_HEAD(&tm_vars.timer_queue);
+            timer = TIMER_EVENT_PTR(head);
+            if ((PRInt32) (now - timer->absolute) >= 0)
+            {
+                PR_REMOVE_LINK(head);
+                /*
+                 * make its prev and next point to itself so that
+                 * it's obvious that it's not on the timer_queue.
+                 */
+                PR_INIT_CLIST(head);
+                PR_ASSERT(2 == timer->ref_count);
+                PR_Unlock(tm_vars.ml);
+                timer->func(timer->arg);
+                PR_Lock(tm_vars.ml);
+                timer->ref_count -= 1;
+                if (0 == timer->ref_count)
+                {
+                    PR_NotifyAllCondVar(tm_vars.cancel_timer);
+                }
+            }
+            else
+            {
+                timeout = (PRIntervalTime)(timer->absolute - now);
+                PR_WaitCondVar(tm_vars.new_timer, timeout);
+            }
+        }
+    }
+    PR_Unlock(tm_vars.ml);
+}
+
+static TimerEvent *CreateTimer(
+    PRIntervalTime timeout,
+    void (*func)(void *),
+    void *arg)
+{
+    TimerEvent *timer;
+    PRCList *links, *tail;
+    TimerEvent *elem;
+
+    timer = PR_NEW(TimerEvent);
+    if (NULL == timer)
+    {
+        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+        return timer;
+    }
+    timer->absolute = PR_IntervalNow() + timeout;
+    timer->func = func;
+    timer->arg = arg;
+    timer->ref_count = 2;
+    PR_Lock(tm_vars.ml);
+    tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
+    while (links->prev != tail)
+    {
+        elem = TIMER_EVENT_PTR(links);
+        if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
+        {
+            break;
+        }
+        links = links->prev;
+    }
+    PR_INSERT_AFTER(&timer->links, links);
+    PR_NotifyCondVar(tm_vars.new_timer);
+    PR_Unlock(tm_vars.ml);
+    return timer;
+}
+
+static PRBool CancelTimer(TimerEvent *timer)
+{
+    PRBool canceled = PR_FALSE;
+
+    PR_Lock(tm_vars.ml);
+    timer->ref_count -= 1;
+    if (timer->links.prev == &timer->links)
+    {
+        while (timer->ref_count == 1)
+        {
+            PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
+        }
+    }
+    else
+    {
+        PR_REMOVE_LINK(&timer->links);
+        canceled = PR_TRUE;
+    }
+    PR_Unlock(tm_vars.ml);
+    PR_DELETE(timer);
+    return canceled;
+}
+
+static PRStatus TimerInit(void)
+{
+    tm_vars.ml = PR_NewLock();
+    if (NULL == tm_vars.ml)
+    {
+        goto failed;
+    }
+    tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
+    if (NULL == tm_vars.new_timer)
+    {
+        goto failed;
+    }
+    tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
+    if (NULL == tm_vars.cancel_timer)
+    {
+        goto failed;
+    }
+    PR_INIT_CLIST(&tm_vars.timer_queue);
+    tm_vars.manager_thread = PR_CreateThread(
+        PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
+        PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
+    if (NULL == tm_vars.manager_thread)
+    {
+        goto failed;
+    }
+    return PR_SUCCESS;
+
+failed:
+    if (NULL != tm_vars.cancel_timer)
+    {
+        PR_DestroyCondVar(tm_vars.cancel_timer);
+    }
+    if (NULL != tm_vars.new_timer)
+    {
+        PR_DestroyCondVar(tm_vars.new_timer);
+    }
+    if (NULL != tm_vars.ml)
+    {
+        PR_DestroyLock(tm_vars.ml);
+    }
+    return PR_FAILURE;
+}
+
+#endif /* WINNT */
+
+/******************************************************************/
+/******************************************************************/
+/************************ The private portion *********************/
+/******************************************************************/
+/******************************************************************/
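+/*
+** Module-level state (see the statics at the top of the file): mw_lock
+** guards mw_state, which holds the list of all wait groups plus an
+** optional "default" group.  The default group is created lazily by
+** MW_Init2() whenever a caller passes a NULL group to one of the public
+** entry points below.
+*/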
+void _PR_InitMW(void)
+{
+#ifdef WINNT
+    /*
+     * We use NT 4's InterlockedCompareExchange() to operate
+     * on PRMWStatus variables.
+     */
+    PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
+    TimerInit();
+#endif
+    mw_lock = PR_NewLock();
+    PR_ASSERT(NULL != mw_lock);
+    mw_state = PR_NEWZAP(_PRGlobalState);
+    PR_ASSERT(NULL != mw_state);
+    PR_INIT_CLIST(&mw_state->group_list);
+    max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
+}  /* _PR_InitMW */
+
+void _PR_CleanupMW(void)
+{
+    PR_DestroyLock(mw_lock);
+    mw_lock = NULL;
+    if (mw_state->group) {
+        PR_DestroyWaitGroup(mw_state->group);
+        /* mw_state->group is set to NULL as a side effect. */
+    }
+    PR_DELETE(mw_state);
+}  /* _PR_CleanupMW */
+
+static PRWaitGroup *MW_Init2(void)
+{
+    PRWaitGroup *group = mw_state->group;  /* it's the null group */
+    if (NULL == group)  /* there is this special case */
+    {
+        group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
+        if (NULL == group) goto failed_alloc;
+        PR_Lock(mw_lock);
+        if (NULL == mw_state->group)
+        {
+            mw_state->group = group;
+            group = NULL;
+        }
+        PR_Unlock(mw_lock);
+        if (group != NULL) (void)PR_DestroyWaitGroup(group);
+        group = mw_state->group;  /* somebody beat us to it */
+    }
+failed_alloc:
+    return group;  /* whatever */
+}  /* MW_Init2 */
+
+static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
+{
+    /*
+    ** The entries are put in the table using the fd (PRFileDesc*) of
+    ** the receive descriptor as the key. This allows us to locate
+    ** the appropriate entry again when the poll operation finishes.
+    **
+    ** The pointer to the file descriptor object is first divided by
+    ** the natural alignment of a pointer in the belief that the object
+    ** will have at least that many zeros in the low order bits.
+    ** This may not be a good assumption.
+    **
+    ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
+    ** that we declare defeat and force the table to be reconstructed.
+    ** Since some fds might be added more than once, won't that cause
+    ** collisions even in an empty table?
+    */
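+    /*
+    ** Probe sequence, concretely: with h = _MW_HASH(fd, length) and a
+    ** nonzero step = _MW_HASH2(fd, length), the slots tried are
+    **     h, (h + step) % length, (h + 2*step) % length, ...
+    ** for at most _MW_REHASH_MAX attempts before reporting _prmw_rehash
+    ** so the caller can grow the table and try again.
+    */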
+    PRIntn rehash = _MW_REHASH_MAX;
+    PRRecvWait **waiter;
+    PRUintn hidx = _MW_HASH(desc->fd, hash->length);
+    PRUintn hoffset = 0;
+
+    while (rehash-- > 0)
+    {
+        waiter = &hash->recv_wait;
+        if (NULL == waiter[hidx])
+        {
+            waiter[hidx] = desc;
+            hash->count += 1;
+#if 0
+            printf("Adding 0x%x->0x%x ", desc, desc->fd);
+            printf(
+                "table[%u:%u:*%u]: 0x%x->0x%x\n",
+                hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
+#endif
+            return _prmw_success;
+        }
+        if (desc == waiter[hidx])
+        {
+            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);  /* desc already in table */
+            return _prmw_error;
+        }
+#if 0
+        printf("Failing 0x%x->0x%x ", desc, desc->fd);
+        printf(
+            "table[*%u:%u:%u]: 0x%x->0x%x\n",
+            hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
+#endif
+        if (0 == hoffset)
+        {
+            hoffset = _MW_HASH2(desc->fd, hash->length);
+            PR_ASSERT(0 != hoffset);
+        }
+        hidx = (hidx + hoffset) % (hash->length);
+    }
+    return _prmw_rehash;
+}  /* MW_AddHashInternal */
+
+static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
+{
+    PRRecvWait **desc;
+    PRUint32 pidx, length;
+    _PRWaiterHash *newHash, *oldHash = group->waiter;
+    PRBool retry;
+    _PR_HashStory hrv;
+
+    static const PRInt32 prime_number[] = {
+        _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
+        2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
+    PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
+
+    /* look up the next size we'd like to use for the hash table */
+    for (pidx = 0; pidx < primes; ++pidx)
+    {
+        if (prime_number[pidx] == oldHash->length)
+        {
+            break;
+        }
+    }
+    /* table size must be one of the prime numbers */
+    PR_ASSERT(pidx < primes);
+
+    /* if pidx == primes - 1, we can't expand the table any more */
+    while (pidx < primes - 1)
+    {
+        /* next size */
+        ++pidx;
+        length = prime_number[pidx];
+
+        /* allocate the new hash table and fill it in with the old */
+        newHash = (_PRWaiterHash*)PR_CALLOC(
+            sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
+        if (NULL == newHash)
+        {
+            PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+            return _prmw_error;
+        }
+
+        newHash->length = length;
+        retry = PR_FALSE;
+        for (desc = &oldHash->recv_wait;
+            newHash->count < oldHash->count; ++desc)
+        {
+            PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
+            if (NULL != *desc)
+            {
+                hrv = MW_AddHashInternal(*desc, newHash);
+                PR_ASSERT(_prmw_error != hrv);
+                if (_prmw_success != hrv)
+                {
+                    PR_DELETE(newHash);
+                    retry = PR_TRUE;
+                    break;
+                }
+            }
+        }
+        if (retry) continue;
+
+        PR_DELETE(group->waiter);
+        group->waiter = newHash;
+        group->p_timestamp += 1;
+        return _prmw_success;
+    }
+
+    PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+    return _prmw_error;  /* we're hosed */
+}  /* MW_ExpandHashInternal */
+
+#ifndef WINNT
+static void _MW_DoneInternal(
+    PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
+{
+    /*
+    ** Add this receive wait object to the list of finished I/O
+    ** operations for this particular group. If there are other
+    ** threads waiting on the group, notify one. If not, arrange
+    ** for this thread to return.
+    */
+
+#if 0
+    printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
+#endif
+    (*waiter)->outcome = outcome;
+    PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
+    PR_NotifyCondVar(group->io_complete);
+    PR_ASSERT(0 != group->waiter->count);
+    group->waiter->count -= 1;
+    *waiter = NULL;
+}  /* _MW_DoneInternal */
+#endif /* WINNT */
+
+static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
+{
+    /*
+    ** Find the receive wait object corresponding to the file descriptor.
+    ** Only search the wait group specified.
+    */
+    PRRecvWait **desc;
+    PRIntn rehash = _MW_REHASH_MAX;
+    _PRWaiterHash *hash = group->waiter;
+    PRUintn hidx = _MW_HASH(fd, hash->length);
+    PRUintn hoffset = 0;
+
+    while (rehash-- > 0)
+    {
+        desc = (&hash->recv_wait) + hidx;
+        if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
+        if (0 == hoffset)
+        {
+            hoffset = _MW_HASH2(fd, hash->length);
+            PR_ASSERT(0 != hoffset);
+        }
+        hidx = (hidx + hoffset) % (hash->length);
+    }
+    return NULL;
+}  /* _MW_LookupInternal */
+
+#ifndef WINNT
+static PRStatus _MW_PollInternal(PRWaitGroup *group)
+{
+    PRRecvWait **waiter;
+    PRStatus rv = PR_FAILURE;
+    PRInt32 count, count_ready;
+    PRIntervalTime polling_interval;
+
+    group->poller = PR_GetCurrentThread();
+
+    while (PR_TRUE)
+    {
+        PRIntervalTime now, since_last_poll;
+        PRPollDesc *poll_list;
+
+        while (0 == group->waiter->count)
+        {
+            PRStatus st;
+            st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
+            if (_prmw_running != group->state)
+            {
+                PR_SetError(PR_INVALID_STATE_ERROR, 0);
+                goto aborted;
+            }
+            if (_MW_ABORTED(st)) goto aborted;
+        }
+
+        /*
+        ** There's something to do. See if our existing polling list
+        ** is large enough for what we have to do.
+        */
+
+        while (group->polling_count < group->waiter->count)
+        {
+            PRUint32 old_count = group->waiter->count;
+            PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
+            PRSize new_size = sizeof(PRPollDesc) * new_count;
+            PRPollDesc *old_polling_list = group->polling_list;
+
+            PR_Unlock(group->ml);
+            poll_list = (PRPollDesc*)PR_CALLOC(new_size);
+            if (NULL == poll_list)
+            {
+                PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+                PR_Lock(group->ml);
+                goto failed_alloc;
+            }
+            if (NULL != old_polling_list)
+                PR_DELETE(old_polling_list);
+            PR_Lock(group->ml);
+            if (_prmw_running != group->state)
+            {
+                PR_SetError(PR_INVALID_STATE_ERROR, 0);
+                goto aborted;
+            }
+            group->polling_list = poll_list;
+            group->polling_count = new_count;
+        }
+
+        now = PR_IntervalNow();
+        polling_interval = max_polling_interval;
+        since_last_poll = now - group->last_poll;
+
+        waiter = &group->waiter->recv_wait;
+        poll_list = group->polling_list;
+        for (count = 0; count < group->waiter->count; ++waiter)
+        {
+            PR_ASSERT(waiter < &group->waiter->recv_wait
+                + group->waiter->length);
+            if (NULL != *waiter)  /* a live one! */
+            {
+                if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
+                && (since_last_poll >= (*waiter)->timeout))
+                    _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
+                else
+                {
+                    if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
+                    {
+                        (*waiter)->timeout -= since_last_poll;
+                        if ((*waiter)->timeout < polling_interval)
+                            polling_interval = (*waiter)->timeout;
+                    }
+                    PR_ASSERT(poll_list < group->polling_list
+                        + group->polling_count);
+                    poll_list->fd = (*waiter)->fd;
+                    poll_list->in_flags = PR_POLL_READ;
+                    poll_list->out_flags = 0;
+#if 0
+                    printf(
+                        "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
+                        poll_list, count, poll_list->fd, (*waiter)->timeout);
+#endif
+                    poll_list += 1;
+                    count += 1;
+                }
+            }
+        }
+
+        PR_ASSERT(count == group->waiter->count);
+
+        /*
+        ** If there are no more threads waiting for completion,
+        ** we need to return.
+        */
+        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
+        && (1 == group->waiting_threads)) break;
+
+        if (0 == count) continue;  /* wait for new business */
+
+        group->last_poll = now;
+
+        PR_Unlock(group->ml);
+
+        count_ready = PR_Poll(group->polling_list, count, polling_interval);
+
+        PR_Lock(group->ml);
+
+        if (_prmw_running != group->state)
+        {
+            PR_SetError(PR_INVALID_STATE_ERROR, 0);
+            goto aborted;
+        }
+        if (-1 == count_ready)
+        {
+            goto failed_poll;  /* that's a shame */
+        }
+        else if (0 < count_ready)
+        {
+            for (poll_list = group->polling_list; count > 0;
+            poll_list++, count--)
+            {
+                PR_ASSERT(
+                    poll_list < group->polling_list + group->polling_count);
+                if (poll_list->out_flags != 0)
+                {
+                    waiter = _MW_LookupInternal(group, poll_list->fd);
+                    /*
+                    ** If 'waiter' is NULL, that means the wait receive
+                    ** descriptor has been canceled.
+                    */
+                    if (NULL != waiter)
+                        _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
+                }
+            }
+        }
+        /*
+        ** If there are no more threads waiting for completion,
+        ** we need to return.
+        ** This thread was "borrowed" to do the polling, but it really
+        ** belongs to the client.
+        */
+        if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
+        && (1 == group->waiting_threads)) break;
+    }
+
+    rv = PR_SUCCESS;
+
+aborted:
+failed_poll:
+failed_alloc:
+    group->poller = NULL;  /* we were that, now we ain't */
+    if ((_prmw_running == group->state) && (group->waiting_threads > 1))
+    {
+        /* Wake up one thread to become the new poller. */
+        PR_NotifyCondVar(group->io_complete);
+    }
+    return rv;  /* we return with the lock held */
+}  /* _MW_PollInternal */
+#endif /* !WINNT */
+
+static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
+{
+    PRMWGroupState rv = group->state;
+    /*
+    ** Looking at the group's fields is safe because
+    ** once the group's state is no longer running, it
+    ** cannot revert and there is a safe check on entry
+    ** to make sure no more threads are made to wait.
+    */
+    if ((_prmw_stopping == rv)
+    && (0 == group->waiting_threads))
+    {
+        rv = group->state = _prmw_stopped;
+        PR_NotifyCondVar(group->mw_manage);
+    }
+    return rv;
+}  /* MW_TestForShutdownInternal */
+
+#ifndef WINNT
+static void _MW_InitialRecv(PRCList *io_ready)
+{
+    PRRecvWait *desc = (PRRecvWait*)io_ready;
+    if ((NULL == desc->buffer.start)
+    || (0 == desc->buffer.length))
+        desc->bytesRecv = 0;
+    else
+    {
+        desc->bytesRecv = (desc->fd->methods->recv)(
+            desc->fd, desc->buffer.start,
+            desc->buffer.length, 0, desc->timeout);
+        if (desc->bytesRecv < 0)  /* SetError should already be there */
+            desc->outcome = PR_MW_FAILURE;
+    }
+}  /* _MW_InitialRecv */
+#endif
+
+#ifdef WINNT
+static void NT_TimeProc(void *arg)
+{
+    _MDOverlapped *overlapped = (_MDOverlapped *)arg;
+    PRRecvWait *desc = overlapped->data.mw.desc;
+    PRFileDesc *bottom;
+
+    if (InterlockedCompareExchange((LONG *)&desc->outcome,
+        (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
+    {
+        /* This wait recv descriptor has already completed. */
+        return;
+    }
+
+    /* close the osfd to abort the outstanding async io request */
+    /* $$$$
+    ** Little late to be checking if NSPR's on the bottom of stack,
+    ** but if we don't check, we can't assert that the private data
+    ** is what we think it is.
+    ** $$$$
+    */
+    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
+    PR_ASSERT(NULL != bottom);
+    if (NULL != bottom)  /* now what!?!?! */
+    {
+        bottom->secret->state = _PR_FILEDESC_CLOSED;
+        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
+        {
+            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
+            PR_ASSERT(!"What shall I do?");
+        }
+    }
+    return;
+}  /* NT_TimeProc */
+
+static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
+{
+    PRRecvWait **waiter;
+
+    _PR_MD_LOCK(&group->mdlock);
+    waiter = _MW_LookupInternal(group, fd);
+    if (NULL != waiter)
+    {
+        group->waiter->count -= 1;
+        *waiter = NULL;
+    }
+    _PR_MD_UNLOCK(&group->mdlock);
+    return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
+}
+
+PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
+{
+    PRRecvWait **waiter;
+
+    waiter = _MW_LookupInternal(group, fd);
+    if (NULL != waiter)
+    {
+        group->waiter->count -= 1;
+        *waiter = NULL;
+    }
+    return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
+}
+#endif /* WINNT */
+
+/******************************************************************/
+/******************************************************************/
+/********************** The public API portion ********************/
+/******************************************************************/
+/******************************************************************/
+PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
+    PRWaitGroup *group, PRRecvWait *desc)
+{
+    _PR_HashStory hrv;
+    PRStatus rv = PR_FAILURE;
+#ifdef WINNT
+    _MDOverlapped *overlapped;
+    HANDLE hFile;
+    BOOL bResult;
+    DWORD dwError;
+    PRFileDesc *bottom;
+#endif
+
+    if (!_pr_initialized) _PR_ImplicitInitialization();
+    if ((NULL == group) && (NULL == (group = MW_Init2())))
+    {
+        return rv;
+    }
+
+    PR_ASSERT(NULL != desc->fd);
+
+    desc->outcome = PR_MW_PENDING;  /* nice, well known value */
+    desc->bytesRecv = 0;  /* likewise, though this value is ambiguous */
+
+    PR_Lock(group->ml);
+
+    if (_prmw_running != group->state)
+    {
+        /* Not allowed to add after cancelling the group */
+        desc->outcome = PR_MW_INTERRUPT;
+        PR_SetError(PR_INVALID_STATE_ERROR, 0);
+        PR_Unlock(group->ml);
+        return rv;
+    }
+
+#ifdef WINNT
+    _PR_MD_LOCK(&group->mdlock);
+#endif
+
+    /*
+    ** If the waiter count is zero at this point, there's no telling
+    ** how long we've been idle. Therefore, initialize the beginning
+    ** of the timing interval. As long as the list doesn't go empty,
+    ** it will maintain itself.
+    */
+    if (0 == group->waiter->count)
+        group->last_poll = PR_IntervalNow();
+
+    do
+    {
+        hrv = MW_AddHashInternal(desc, group->waiter);
+        if (_prmw_rehash != hrv) break;
+        hrv = MW_ExpandHashInternal(group);  /* gruesome */
+        if (_prmw_success != hrv) break;
+    } while (PR_TRUE);
+
+#ifdef WINNT
+    _PR_MD_UNLOCK(&group->mdlock);
+#endif
+
+    PR_NotifyCondVar(group->new_business);  /* tell the world */
+    rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
+    PR_Unlock(group->ml);
+
+#ifdef WINNT
+    overlapped = PR_NEWZAP(_MDOverlapped);
+    if (NULL == overlapped)
+    {
+        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+        NT_HashRemove(group, desc->fd);
+        return rv;
+    }
+    overlapped->ioModel = _MD_MultiWaitIO;
+    overlapped->data.mw.desc = desc;
+    overlapped->data.mw.group = group;
+    if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
+    {
+        overlapped->data.mw.timer = CreateTimer(
+            desc->timeout,
+            NT_TimeProc,
+            overlapped);
+        if (0 == overlapped->data.mw.timer)
+        {
+            NT_HashRemove(group, desc->fd);
+            PR_DELETE(overlapped);
+            /*
+             * XXX It appears that a maximum of 16 timer events can
+             * be outstanding. GetLastError() returns 0 when I try it.
+             */
+            PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
+            return PR_FAILURE;
+        }
+    }
+
+    /* Reach to the bottom layer to get the OS fd */
+    bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
+    PR_ASSERT(NULL != bottom);
+    if (NULL == bottom)
+    {
+        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+        return PR_FAILURE;
+    }
+    hFile = (HANDLE)bottom->secret->md.osfd;
+    if (!bottom->secret->md.io_model_committed)
+    {
+        PRInt32 st;
+        st = _md_Associate(hFile);
+        PR_ASSERT(0 != st);
+        bottom->secret->md.io_model_committed = PR_TRUE;
+    }
+    bResult = ReadFile(hFile,
+        desc->buffer.start,
+        (DWORD)desc->buffer.length,
+        NULL,
+        &overlapped->overlapped);
+    if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
+    {
+        if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
+        {
+            if (InterlockedCompareExchange((LONG *)&desc->outcome,
+                (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
+                == (LONG)PR_MW_PENDING)
+            {
+                CancelTimer(overlapped->data.mw.timer);
+            }
+            NT_HashRemove(group, desc->fd);
+            PR_DELETE(overlapped);
+        }
+        _PR_MD_MAP_READ_ERROR(dwError);
+        rv = PR_FAILURE;
+    }
+#endif
+
+    return rv;
+}  /* PR_AddWaitFileDesc */
+
+PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
+{
+    PRCList *io_ready = NULL;
+#ifdef WINNT
+    PRThread *me = _PR_MD_CURRENT_THREAD();
+    _MDOverlapped *overlapped;
+#endif
+
+    if (!_pr_initialized) _PR_ImplicitInitialization();
+    if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
+
+    PR_Lock(group->ml);
+
+    if (_prmw_running != group->state)
+    {
+        PR_SetError(PR_INVALID_STATE_ERROR, 0);
+        goto invalid_state;
+    }
+
+    group->waiting_threads += 1;  /* the polling thread is counted */
+
+#ifdef WINNT
+    _PR_MD_LOCK(&group->mdlock);
+    while (PR_CLIST_IS_EMPTY(&group->io_ready))
+    {
+        _PR_THREAD_LOCK(me);
+        me->state = _PR_IO_WAIT;
+        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
+        if (!_PR_IS_NATIVE_THREAD(me))
+        {
+            _PR_SLEEPQ_LOCK(me->cpu);
+            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
+            _PR_SLEEPQ_UNLOCK(me->cpu);
+        }
+        _PR_THREAD_UNLOCK(me);
+        _PR_MD_UNLOCK(&group->mdlock);
+        PR_Unlock(group->ml);
+        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
+        me->state = _PR_RUNNING;
+        PR_Lock(group->ml);
+        _PR_MD_LOCK(&group->mdlock);
+        if (_PR_PENDING_INTERRUPT(me)) {
+            PR_REMOVE_LINK(&me->waitQLinks);
+            _PR_MD_UNLOCK(&group->mdlock);
+            me->flags &= ~_PR_INTERRUPT;
+            me->io_suspended = PR_FALSE;
+            PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
+            goto aborted;
+        }
+    }
+    io_ready = PR_LIST_HEAD(&group->io_ready);
+    PR_ASSERT(io_ready != NULL);
+    PR_REMOVE_LINK(io_ready);
+    _PR_MD_UNLOCK(&group->mdlock);
+    overlapped = (_MDOverlapped *)
+        ((char *)io_ready - offsetof(_MDOverlapped, data));
+    io_ready = &overlapped->data.mw.desc->internal;
+#else
+    do
+    {
+        /*
+        ** If the I/O ready list isn't empty, have this thread
+        ** return with the first receive wait object that's available.
+        */
+        if (PR_CLIST_IS_EMPTY(&group->io_ready))
+        {
+            /*
+            ** Is there a polling thread yet? If not, grab this thread
+            ** and use it.
+            */
+            if (NULL == group->poller)
+            {
+                /*
+                ** This thread will stay and do the polling until it
+                ** becomes the only one left to service a completion.
+                ** Then it will return and there will be none left to
+                ** actually poll or to run completions.
+                **
+                ** The polling function should only return w/ failure or
+                ** with some I/O ready.
+                */
+                if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
+            }
+            else
+            {
+                /*
+                ** There are four reasons a thread can be awakened from
+                ** a wait on the io_complete condition variable.
+                ** 1. Some I/O has completed, i.e., the io_ready list
+                **    is nonempty.
+                ** 2. The wait group is canceled.
+                ** 3. The thread is interrupted.
+                ** 4. The current polling thread has to leave and needs
+                **    a replacement.
+                ** The logic to find a new polling thread is made more
+                ** complicated by all the other possible events.
+                ** I tried my best to write the logic clearly, but
+                ** it is still full of if's with continue and goto.
+                */
+                PRStatus st;
+                do
+                {
+                    st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
+                    if (_prmw_running != group->state)
+                    {
+                        PR_SetError(PR_INVALID_STATE_ERROR, 0);
+                        goto aborted;
+                    }
+                    if (_MW_ABORTED(st) || (NULL == group->poller)) break;
+                } while (PR_CLIST_IS_EMPTY(&group->io_ready));
+
+                /*
+                ** The thread is interrupted and has to leave. It might
+                ** have also been awakened to process ready i/o or be the
+                ** new poller. To be safe, if either condition is true,
+                ** we awaken another thread to take its place.
+                */
+                if (_MW_ABORTED(st))
+                {
+                    if ((NULL == group->poller
+                        || !PR_CLIST_IS_EMPTY(&group->io_ready))
+                        && group->waiting_threads > 1)
+                        PR_NotifyCondVar(group->io_complete);
+                    goto aborted;
+                }
+
+                /*
+                ** A new poller is needed, but can I be the new poller?
+                ** If there is no i/o ready, sure. But if there is any
+                ** i/o ready, it has a higher priority. I want to
+                ** process the ready i/o first and wake up another
+                ** thread to be the new poller.
+                */
+                if (NULL == group->poller)
+                {
+                    if (PR_CLIST_IS_EMPTY(&group->io_ready))
+                        continue;
+                    if (group->waiting_threads > 1)
+                        PR_NotifyCondVar(group->io_complete);
+                }
+            }
+            PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
+        }
+        io_ready = PR_LIST_HEAD(&group->io_ready);
+        PR_NotifyCondVar(group->io_taken);
+        PR_ASSERT(io_ready != NULL);
+        PR_REMOVE_LINK(io_ready);
+    } while (NULL == io_ready);
+
+failed_poll:
+
+#endif
+
+aborted:
+
+    group->waiting_threads -= 1;
+invalid_state:
+    (void)MW_TestForShutdownInternal(group);
+    PR_Unlock(group->ml);
+
+failed_init:
+    if (NULL != io_ready)
+    {
+        /* If the operation failed, record the reason why */
+        switch (((PRRecvWait*)io_ready)->outcome)
+        {
+            case PR_MW_PENDING:
+                PR_ASSERT(0);
+                break;
+            case PR_MW_SUCCESS:
+#ifndef WINNT
+                _MW_InitialRecv(io_ready);
+#endif
+                break;
+#ifdef WINNT
+            case PR_MW_FAILURE:
+                _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
+                break;
+#endif
+            case PR_MW_TIMEOUT:
+                PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
+                break;
+            case PR_MW_INTERRUPT:
+                PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
+                break;
+            default: break;
+        }
+#ifdef WINNT
+        if (NULL != overlapped->data.mw.timer)
+        {
+            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
+                != overlapped->data.mw.desc->timeout);
+            CancelTimer(overlapped->data.mw.timer);
+        }
+        else
+        {
+            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
+                == overlapped->data.mw.desc->timeout);
+        }
+        PR_DELETE(overlapped);
+#endif
+    }
+    return (PRRecvWait*)io_ready;
+}  /* PR_WaitRecvReady */
+
+PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
+{
+#if !defined(WINNT)
+    PRRecvWait **recv_wait;
+#endif
+    PRStatus rv = PR_SUCCESS;
+    if (NULL == group) group = mw_state->group;
+    PR_ASSERT(NULL != group);
+    if (NULL == group)
+    {
+        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+        return PR_FAILURE;
+    }
+
+    PR_Lock(group->ml);
+
+    if (_prmw_running != group->state)
+    {
+        PR_SetError(PR_INVALID_STATE_ERROR, 0);
+        rv = PR_FAILURE;
+        goto unlock;
+    }
+
+#ifdef WINNT
+    if (InterlockedCompareExchange((LONG *)&desc->outcome,
+        (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
+    {
+        PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
+        PR_ASSERT(NULL != bottom);
+        if (NULL == bottom)
+        {
+            PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+            goto unlock;
+        }
+        bottom->secret->state = _PR_FILEDESC_CLOSED;
+#if 0
+        fprintf(stderr, "cancel wait recv: closing socket\n");
+#endif
+        if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
+        {
+            fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
+            exit(1);
+        }
+    }
+#else
+    if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
+    {
+        /* it was in the wait table */
+        _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
+        goto unlock;
+    }
+    if (!PR_CLIST_IS_EMPTY(&group->io_ready))
+    {
+        /* is it already complete? */
+        PRCList *head = PR_LIST_HEAD(&group->io_ready);
+        do
+        {
+            PRRecvWait *done = (PRRecvWait*)head;
+            if (done == desc) goto unlock;
+            head = PR_NEXT_LINK(head);
+        } while (head != &group->io_ready);
+    }
+    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+    rv = PR_FAILURE;
+
+#endif
+unlock:
+    PR_Unlock(group->ml);
+    return rv;
+}  /* PR_CancelWaitFileDesc */
+
+PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
+{
+    PRRecvWait **desc;
+    PRRecvWait *recv_wait = NULL;
+#ifdef WINNT
+    _MDOverlapped *overlapped;
+    PRRecvWait **end;
+    PRThread *me = _PR_MD_CURRENT_THREAD();
+#endif
+
+    if (NULL == group) group = mw_state->group;
+    PR_ASSERT(NULL != group);
+    if (NULL == group)
+    {
+        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+        return NULL;
+    }
+
+    PR_Lock(group->ml);
+    if (_prmw_stopped != group->state)
+    {
+        if (_prmw_running == group->state)
+            group->state = _prmw_stopping;  /* so nothing new comes in */
+        if (0 == group->waiting_threads)  /* is there anybody else? */
+            group->state = _prmw_stopped;  /* we can stop right now */
+        else
+        {
+            PR_NotifyAllCondVar(group->new_business);
+            PR_NotifyAllCondVar(group->io_complete);
+        }
+        while (_prmw_stopped != group->state)
+            (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
+    }
+
+#ifdef WINNT
+    _PR_MD_LOCK(&group->mdlock);
+#endif
+    /* make all the existing descriptors look done/interrupted */
+#ifdef WINNT
+    end = &group->waiter->recv_wait + group->waiter->length;
+    for (desc = &group->waiter->recv_wait; desc < end; ++desc)
+    {
+        if (NULL != *desc)
+        {
+            if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
+                (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
+                == (LONG)PR_MW_PENDING)
+            {
+                PRFileDesc *bottom = PR_GetIdentitiesLayer(
+                    (*desc)->fd, PR_NSPR_IO_LAYER);
+                PR_ASSERT(NULL != bottom);
+                if (NULL == bottom)
+                {
+                    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+                    goto invalid_arg;
+                }
+                bottom->secret->state = _PR_FILEDESC_CLOSED;
+#if 0
+                fprintf(stderr, "cancel wait group: closing socket\n");
+#endif
+                if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
+                {
+                    fprintf(stderr, "closesocket failed: %d\n",
+                        WSAGetLastError());
+                    exit(1);
+                }
+            }
+        }
+    }
+    while (group->waiter->count > 0)
+    {
+        _PR_THREAD_LOCK(me);
+        me->state = _PR_IO_WAIT;
+        PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
+        if (!_PR_IS_NATIVE_THREAD(me))
+        {
+            _PR_SLEEPQ_LOCK(me->cpu);
+            _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
+            _PR_SLEEPQ_UNLOCK(me->cpu);
+        }
+        _PR_THREAD_UNLOCK(me);
+        _PR_MD_UNLOCK(&group->mdlock);
+        PR_Unlock(group->ml);
+        _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
+        me->state = _PR_RUNNING;
+        PR_Lock(group->ml);
+        _PR_MD_LOCK(&group->mdlock);
+    }
+#else
+    for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
+    {
+        PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
+        if (NULL != *desc)
+            _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
+    }
+#endif
+
+    /* take first element of finished list and return it or NULL */
+    if (PR_CLIST_IS_EMPTY(&group->io_ready))
+        PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
+    else
+    {
+        PRCList *head = PR_LIST_HEAD(&group->io_ready);
+        PR_REMOVE_AND_INIT_LINK(head);
+#ifdef WINNT
+        overlapped = (_MDOverlapped *)
+            ((char *)head - offsetof(_MDOverlapped, data));
+        head = &overlapped->data.mw.desc->internal;
+        if (NULL != overlapped->data.mw.timer)
+        {
+            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
+                != overlapped->data.mw.desc->timeout);
+            CancelTimer(overlapped->data.mw.timer);
+        }
+        else
+        {
+            PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
+                == overlapped->data.mw.desc->timeout);
+        }
+        PR_DELETE(overlapped);
+#endif
+        recv_wait = (PRRecvWait*)head;
+    }
+#ifdef WINNT
+invalid_arg:
+    _PR_MD_UNLOCK(&group->mdlock);
+#endif
+    PR_Unlock(group->ml);
+
+    return recv_wait;
+}  /* PR_CancelWaitGroup */
+
+PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
+{
+    PRWaitGroup *wg;
+
+    if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
+    {
+        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+        goto failed;
+    }
+    /* the wait group itself */
+    wg->ml = PR_NewLock();
+    if (NULL == wg->ml) goto failed_lock;
+    wg->io_taken = PR_NewCondVar(wg->ml);
+    if (NULL == wg->io_taken) goto failed_cvar0;
+    wg->io_complete = PR_NewCondVar(wg->ml);
+    if (NULL == wg->io_complete) goto failed_cvar1;
+    wg->new_business = PR_NewCondVar(wg->ml);
+    if (NULL == wg->new_business) goto failed_cvar2;
+    wg->mw_manage = PR_NewCondVar(wg->ml);
+    if (NULL == wg->mw_manage) goto failed_cvar3;
+
+    PR_INIT_CLIST(&wg->group_link);
+    PR_INIT_CLIST(&wg->io_ready);
+
+    /* the waiters sequence */
+    wg->waiter = (_PRWaiterHash*)PR_CALLOC(
+        sizeof(_PRWaiterHash) +
+        (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
+    if (NULL == wg->waiter)
+    {
+        PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+        goto failed_waiter;
+    }
+    wg->waiter->count = 0;
+    wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
+
+#ifdef WINNT
+    _PR_MD_NEW_LOCK(&wg->mdlock);
+    PR_INIT_CLIST(&wg->wait_list);
+#endif /* WINNT */
+
+    PR_Lock(mw_lock);
+    PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
+    PR_Unlock(mw_lock);
+    return wg;
+
+failed_waiter:
+    PR_DestroyCondVar(wg->mw_manage);
+failed_cvar3:
+    PR_DestroyCondVar(wg->new_business);
+failed_cvar2:
+    PR_DestroyCondVar(wg->io_complete);
+failed_cvar1:
+    PR_DestroyCondVar(wg->io_taken);
+failed_cvar0:
+    PR_DestroyLock(wg->ml);
+failed_lock:
+    PR_DELETE(wg);
+    wg = NULL;
+
+failed:
+    return wg;
+}  /* MW_CreateWaitGroup */
+
+PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
+{
+    PRStatus rv = PR_SUCCESS;
+    if (NULL == group) group = mw_state->group;
+    PR_ASSERT(NULL != group);
+    if (NULL != group)
+    {
+        PR_Lock(group->ml);
+        if ((group->waiting_threads == 0)
+        && (group->waiter->count == 0)
+        && PR_CLIST_IS_EMPTY(&group->io_ready))
+        {
+            group->state = _prmw_stopped;
+        }
+        else
+        {
+            PR_SetError(PR_INVALID_STATE_ERROR, 0);
+            rv = PR_FAILURE;
+        }
+        PR_Unlock(group->ml);
+        if (PR_FAILURE == rv) return rv;
+
+        PR_Lock(mw_lock);
+        PR_REMOVE_LINK(&group->group_link);
+        PR_Unlock(mw_lock);
+
+#ifdef WINNT
+        /*
+         * XXX make sure wait_list is empty and waiter is empty.
+         * These must be checked while holding mdlock.
+         */
+        _PR_MD_FREE_LOCK(&group->mdlock);
+#endif
+
+        PR_DELETE(group->waiter);
+        PR_DELETE(group->polling_list);
+        PR_DestroyCondVar(group->mw_manage);
+        PR_DestroyCondVar(group->new_business);
+        PR_DestroyCondVar(group->io_complete);
+        PR_DestroyCondVar(group->io_taken);
+        PR_DestroyLock(group->ml);
+        if (group == mw_state->group) mw_state->group = NULL;
+        PR_DELETE(group);
+    }
+    else
+    {
+        /* The default wait group is not created yet. */
+        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+        rv = PR_FAILURE;
+    }
+    return rv;
+}  /* PR_DestroyWaitGroup */
+
+/**********************************************************************
+***********************************************************************
+******************** Wait group enumerations **************************
+***********************************************************************
+**********************************************************************/
+
+PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
+{
+    PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
+    if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
+    else
+    {
+        enumerator->group = group;
+        enumerator->seal = _PR_ENUM_SEALED;
+    }
+    return enumerator;
+}  /* PR_CreateMWaitEnumerator */
+
+PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
+{
+    PR_ASSERT(NULL != enumerator);
+    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
+    if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
+    {
+        PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+        return PR_FAILURE;
+    }
+    enumerator->seal = _PR_ENUM_UNSEALED;
+    PR_Free(enumerator);
+    return PR_SUCCESS;
+}  /* PR_DestroyMWaitEnumerator */
+
+PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
+    PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
+{
+    PRRecvWait *result = NULL;
+
+    /* entry point sanity checking */
+    PR_ASSERT(NULL != enumerator);
+    PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
+    if ((NULL == enumerator)
+    || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
+
+    /* beginning of enumeration */
+    if (NULL == previous)
+    {
+        if (NULL == enumerator->group)
+        {
+            enumerator->group = mw_state->group;
+            if (NULL == enumerator->group)
+            {
+                PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
+                return NULL;
+            }
+        }
+        enumerator->waiter = &enumerator->group->waiter->recv_wait;
+        enumerator->p_timestamp = enumerator->group->p_timestamp;
+        enumerator->thread = PR_GetCurrentThread();
+        enumerator->index = 0;
+    }
+    /* continuing an enumeration */
+    else
+    {
+        PRThread *me = PR_GetCurrentThread();
+        PR_ASSERT(me == enumerator->thread);
+        if (me != enumerator->thread) goto bad_argument;
+
+        /* need to restart the enumeration */
+        if (enumerator->p_timestamp != enumerator->group->p_timestamp)
+            return PR_EnumerateWaitGroup(enumerator, NULL);
+    }
+
+    /* actually progress the enumeration */
+#if defined(WINNT)
+    _PR_MD_LOCK(&enumerator->group->mdlock);
+#else
+    PR_Lock(enumerator->group->ml);
+#endif
+    while (enumerator->index++ < enumerator->group->waiter->length)
+    {
+        if (NULL != (result = *(enumerator->waiter)++)) break;
+    }
+#if defined(WINNT)
+    _PR_MD_UNLOCK(&enumerator->group->mdlock);
+#else
+    PR_Unlock(enumerator->group->ml);
+#endif
+
+    return result;  /* what we live for */
+
+bad_argument:
+    PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
+    return NULL;  /* probably ambiguous */
+}  /* PR_EnumerateWaitGroup */
+
+/* prmwait.c */
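
For orientation, here is a minimal sketch of how a client drives the multiwait API implemented above. It is illustrative only: the names one_receive, sock, buf, and len are hypothetical, the socket is assumed to be already connected, the 5-second timeout is arbitrary, and error handling is abbreviated.

    #include <string.h>
    #include "nspr.h"

    static void one_receive(PRFileDesc *sock, char *buf, PRSize len)
    {
        PRWaitGroup *group = PR_CreateWaitGroup(0);  /* size argument is ignored */
        PRRecvWait wait;
        PRRecvWait *ready;

        if (NULL == group) return;
        memset(&wait, 0, sizeof(wait));
        wait.fd = sock;
        wait.buffer.start = buf;
        wait.buffer.length = len;
        wait.timeout = PR_SecondsToInterval(5);

        if (PR_SUCCESS == PR_AddWaitFileDesc(group, &wait))
        {
            /* may be drafted as the polling thread; returns a finished descriptor */
            ready = PR_WaitRecvReady(group);
            if ((NULL != ready) && (PR_MW_SUCCESS == ready->outcome))
            {
                /* ready->bytesRecv bytes of data are now in buf */
            }
        }
        (void)PR_DestroyWaitGroup(group);
    }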