nsprpub/pr/src/io/prmwait.c

Wed, 31 Dec 2014 06:55:46 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:55:46 +0100
changeset 1
ca08bd8f51b2
permissions
-rw-r--r--

Added tag TORBROWSER_REPLICA for changeset 6474c204b198

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
michael@0 2 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 3 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 5
michael@0 6 #include "primpl.h"
michael@0 7 #include "pprmwait.h"
michael@0 8
michael@0 9 #define _MW_REHASH_MAX 11
michael@0 10
michael@0 11 static PRLock *mw_lock = NULL;
michael@0 12 static _PRGlobalState *mw_state = NULL;
michael@0 13
michael@0 14 static PRIntervalTime max_polling_interval;
michael@0 15
michael@0 16 #ifdef WINNT
michael@0 17
michael@0 18 typedef struct TimerEvent {
michael@0 19 PRIntervalTime absolute;
michael@0 20 void (*func)(void *);
michael@0 21 void *arg;
michael@0 22 LONG ref_count;
michael@0 23 PRCList links;
michael@0 24 } TimerEvent;
michael@0 25
michael@0 26 #define TIMER_EVENT_PTR(_qp) \
michael@0 27 ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
michael@0 28
michael@0 29 struct {
michael@0 30 PRLock *ml;
michael@0 31 PRCondVar *new_timer;
michael@0 32 PRCondVar *cancel_timer;
michael@0 33 PRThread *manager_thread;
michael@0 34 PRCList timer_queue;
michael@0 35 } tm_vars;
michael@0 36
michael@0 37 static PRStatus TimerInit(void);
michael@0 38 static void TimerManager(void *arg);
michael@0 39 static TimerEvent *CreateTimer(PRIntervalTime timeout,
michael@0 40 void (*func)(void *), void *arg);
michael@0 41 static PRBool CancelTimer(TimerEvent *timer);
michael@0 42
michael@0 43 static void TimerManager(void *arg)
michael@0 44 {
michael@0 45 PRIntervalTime now;
michael@0 46 PRIntervalTime timeout;
michael@0 47 PRCList *head;
michael@0 48 TimerEvent *timer;
michael@0 49
michael@0 50 PR_Lock(tm_vars.ml);
michael@0 51 while (1)
michael@0 52 {
michael@0 53 if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
michael@0 54 {
michael@0 55 PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
michael@0 56 }
michael@0 57 else
michael@0 58 {
michael@0 59 now = PR_IntervalNow();
michael@0 60 head = PR_LIST_HEAD(&tm_vars.timer_queue);
michael@0 61 timer = TIMER_EVENT_PTR(head);
michael@0 62 if ((PRInt32) (now - timer->absolute) >= 0)
michael@0 63 {
michael@0 64 PR_REMOVE_LINK(head);
michael@0 65 /*
michael@0 66 * make its prev and next point to itself so that
michael@0 67 * it's obvious that it's not on the timer_queue.
michael@0 68 */
michael@0 69 PR_INIT_CLIST(head);
michael@0 70 PR_ASSERT(2 == timer->ref_count);
michael@0 71 PR_Unlock(tm_vars.ml);
michael@0 72 timer->func(timer->arg);
michael@0 73 PR_Lock(tm_vars.ml);
michael@0 74 timer->ref_count -= 1;
michael@0 75 if (0 == timer->ref_count)
michael@0 76 {
michael@0 77 PR_NotifyAllCondVar(tm_vars.cancel_timer);
michael@0 78 }
michael@0 79 }
michael@0 80 else
michael@0 81 {
michael@0 82 timeout = (PRIntervalTime)(timer->absolute - now);
michael@0 83 PR_WaitCondVar(tm_vars.new_timer, timeout);
michael@0 84 }
michael@0 85 }
michael@0 86 }
michael@0 87 PR_Unlock(tm_vars.ml);
michael@0 88 }
michael@0 89
michael@0 90 static TimerEvent *CreateTimer(
michael@0 91 PRIntervalTime timeout,
michael@0 92 void (*func)(void *),
michael@0 93 void *arg)
michael@0 94 {
michael@0 95 TimerEvent *timer;
michael@0 96 PRCList *links, *tail;
michael@0 97 TimerEvent *elem;
michael@0 98
michael@0 99 timer = PR_NEW(TimerEvent);
michael@0 100 if (NULL == timer)
michael@0 101 {
michael@0 102 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 103 return timer;
michael@0 104 }
michael@0 105 timer->absolute = PR_IntervalNow() + timeout;
michael@0 106 timer->func = func;
michael@0 107 timer->arg = arg;
michael@0 108 timer->ref_count = 2;
michael@0 109 PR_Lock(tm_vars.ml);
michael@0 110 tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
michael@0 111 while (links->prev != tail)
michael@0 112 {
michael@0 113 elem = TIMER_EVENT_PTR(links);
michael@0 114 if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
michael@0 115 {
michael@0 116 break;
michael@0 117 }
michael@0 118 links = links->prev;
michael@0 119 }
michael@0 120 PR_INSERT_AFTER(&timer->links, links);
michael@0 121 PR_NotifyCondVar(tm_vars.new_timer);
michael@0 122 PR_Unlock(tm_vars.ml);
michael@0 123 return timer;
michael@0 124 }
michael@0 125
michael@0 126 static PRBool CancelTimer(TimerEvent *timer)
michael@0 127 {
michael@0 128 PRBool canceled = PR_FALSE;
michael@0 129
michael@0 130 PR_Lock(tm_vars.ml);
michael@0 131 timer->ref_count -= 1;
michael@0 132 if (timer->links.prev == &timer->links)
michael@0 133 {
michael@0 134 while (timer->ref_count == 1)
michael@0 135 {
michael@0 136 PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
michael@0 137 }
michael@0 138 }
michael@0 139 else
michael@0 140 {
michael@0 141 PR_REMOVE_LINK(&timer->links);
michael@0 142 canceled = PR_TRUE;
michael@0 143 }
michael@0 144 PR_Unlock(tm_vars.ml);
michael@0 145 PR_DELETE(timer);
michael@0 146 return canceled;
michael@0 147 }
michael@0 148
michael@0 149 static PRStatus TimerInit(void)
michael@0 150 {
michael@0 151 tm_vars.ml = PR_NewLock();
michael@0 152 if (NULL == tm_vars.ml)
michael@0 153 {
michael@0 154 goto failed;
michael@0 155 }
michael@0 156 tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
michael@0 157 if (NULL == tm_vars.new_timer)
michael@0 158 {
michael@0 159 goto failed;
michael@0 160 }
michael@0 161 tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
michael@0 162 if (NULL == tm_vars.cancel_timer)
michael@0 163 {
michael@0 164 goto failed;
michael@0 165 }
michael@0 166 PR_INIT_CLIST(&tm_vars.timer_queue);
michael@0 167 tm_vars.manager_thread = PR_CreateThread(
michael@0 168 PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
michael@0 169 PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
michael@0 170 if (NULL == tm_vars.manager_thread)
michael@0 171 {
michael@0 172 goto failed;
michael@0 173 }
michael@0 174 return PR_SUCCESS;
michael@0 175
michael@0 176 failed:
michael@0 177 if (NULL != tm_vars.cancel_timer)
michael@0 178 {
michael@0 179 PR_DestroyCondVar(tm_vars.cancel_timer);
michael@0 180 }
michael@0 181 if (NULL != tm_vars.new_timer)
michael@0 182 {
michael@0 183 PR_DestroyCondVar(tm_vars.new_timer);
michael@0 184 }
michael@0 185 if (NULL != tm_vars.ml)
michael@0 186 {
michael@0 187 PR_DestroyLock(tm_vars.ml);
michael@0 188 }
michael@0 189 return PR_FAILURE;
michael@0 190 }
michael@0 191
michael@0 192 #endif /* WINNT */
michael@0 193
michael@0 194 /******************************************************************/
michael@0 195 /******************************************************************/
michael@0 196 /************************ The private portion *********************/
michael@0 197 /******************************************************************/
michael@0 198 /******************************************************************/
michael@0 199 void _PR_InitMW(void)
michael@0 200 {
michael@0 201 #ifdef WINNT
michael@0 202 /*
michael@0 203 * We use NT 4's InterlockedCompareExchange() to operate
michael@0 204 * on PRMWStatus variables.
michael@0 205 */
michael@0 206 PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
michael@0 207 TimerInit();
michael@0 208 #endif
michael@0 209 mw_lock = PR_NewLock();
michael@0 210 PR_ASSERT(NULL != mw_lock);
michael@0 211 mw_state = PR_NEWZAP(_PRGlobalState);
michael@0 212 PR_ASSERT(NULL != mw_state);
michael@0 213 PR_INIT_CLIST(&mw_state->group_list);
michael@0 214 max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
michael@0 215 } /* _PR_InitMW */
michael@0 216
michael@0 217 void _PR_CleanupMW(void)
michael@0 218 {
michael@0 219 PR_DestroyLock(mw_lock);
michael@0 220 mw_lock = NULL;
michael@0 221 if (mw_state->group) {
michael@0 222 PR_DestroyWaitGroup(mw_state->group);
michael@0 223 /* mw_state->group is set to NULL as a side effect. */
michael@0 224 }
michael@0 225 PR_DELETE(mw_state);
michael@0 226 } /* _PR_CleanupMW */
michael@0 227
michael@0 228 static PRWaitGroup *MW_Init2(void)
michael@0 229 {
michael@0 230 PRWaitGroup *group = mw_state->group; /* it's the null group */
michael@0 231 if (NULL == group) /* there is this special case */
michael@0 232 {
michael@0 233 group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
michael@0 234 if (NULL == group) goto failed_alloc;
michael@0 235 PR_Lock(mw_lock);
michael@0 236 if (NULL == mw_state->group)
michael@0 237 {
michael@0 238 mw_state->group = group;
michael@0 239 group = NULL;
michael@0 240 }
michael@0 241 PR_Unlock(mw_lock);
michael@0 242 if (group != NULL) (void)PR_DestroyWaitGroup(group);
michael@0 243 group = mw_state->group; /* somebody beat us to it */
michael@0 244 }
michael@0 245 failed_alloc:
michael@0 246 return group; /* whatever */
michael@0 247 } /* MW_Init2 */
michael@0 248
michael@0 249 static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
michael@0 250 {
michael@0 251 /*
michael@0 252 ** The entries are put in the table using the fd (PRFileDesc*) of
michael@0 253 ** the receive descriptor as the key. This allows us to locate
michael@0 254 ** the appropriate entry aqain when the poll operation finishes.
michael@0 255 **
michael@0 256 ** The pointer to the file descriptor object is first divided by
michael@0 257 ** the natural alignment of a pointer in the belief that object
michael@0 258 ** will have at least that many zeros in the low order bits.
michael@0 259 ** This may not be a good assuption.
michael@0 260 **
michael@0 261 ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
michael@0 262 ** that we declare defeat and force the table to be reconstructed.
michael@0 263 ** Since some fds might be added more than once, won't that cause
michael@0 264 ** collisions even in an empty table?
michael@0 265 */
michael@0 266 PRIntn rehash = _MW_REHASH_MAX;
michael@0 267 PRRecvWait **waiter;
michael@0 268 PRUintn hidx = _MW_HASH(desc->fd, hash->length);
michael@0 269 PRUintn hoffset = 0;
michael@0 270
michael@0 271 while (rehash-- > 0)
michael@0 272 {
michael@0 273 waiter = &hash->recv_wait;
michael@0 274 if (NULL == waiter[hidx])
michael@0 275 {
michael@0 276 waiter[hidx] = desc;
michael@0 277 hash->count += 1;
michael@0 278 #if 0
michael@0 279 printf("Adding 0x%x->0x%x ", desc, desc->fd);
michael@0 280 printf(
michael@0 281 "table[%u:%u:*%u]: 0x%x->0x%x\n",
michael@0 282 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
michael@0 283 #endif
michael@0 284 return _prmw_success;
michael@0 285 }
michael@0 286 if (desc == waiter[hidx])
michael@0 287 {
michael@0 288 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
michael@0 289 return _prmw_error;
michael@0 290 }
michael@0 291 #if 0
michael@0 292 printf("Failing 0x%x->0x%x ", desc, desc->fd);
michael@0 293 printf(
michael@0 294 "table[*%u:%u:%u]: 0x%x->0x%x\n",
michael@0 295 hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
michael@0 296 #endif
michael@0 297 if (0 == hoffset)
michael@0 298 {
michael@0 299 hoffset = _MW_HASH2(desc->fd, hash->length);
michael@0 300 PR_ASSERT(0 != hoffset);
michael@0 301 }
michael@0 302 hidx = (hidx + hoffset) % (hash->length);
michael@0 303 }
michael@0 304 return _prmw_rehash;
michael@0 305 } /* MW_AddHashInternal */
michael@0 306
michael@0 307 static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
michael@0 308 {
michael@0 309 PRRecvWait **desc;
michael@0 310 PRUint32 pidx, length;
michael@0 311 _PRWaiterHash *newHash, *oldHash = group->waiter;
michael@0 312 PRBool retry;
michael@0 313 _PR_HashStory hrv;
michael@0 314
michael@0 315 static const PRInt32 prime_number[] = {
michael@0 316 _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
michael@0 317 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
michael@0 318 PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
michael@0 319
michael@0 320 /* look up the next size we'd like to use for the hash table */
michael@0 321 for (pidx = 0; pidx < primes; ++pidx)
michael@0 322 {
michael@0 323 if (prime_number[pidx] == oldHash->length)
michael@0 324 {
michael@0 325 break;
michael@0 326 }
michael@0 327 }
michael@0 328 /* table size must be one of the prime numbers */
michael@0 329 PR_ASSERT(pidx < primes);
michael@0 330
michael@0 331 /* if pidx == primes - 1, we can't expand the table any more */
michael@0 332 while (pidx < primes - 1)
michael@0 333 {
michael@0 334 /* next size */
michael@0 335 ++pidx;
michael@0 336 length = prime_number[pidx];
michael@0 337
michael@0 338 /* allocate the new hash table and fill it in with the old */
michael@0 339 newHash = (_PRWaiterHash*)PR_CALLOC(
michael@0 340 sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
michael@0 341 if (NULL == newHash)
michael@0 342 {
michael@0 343 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 344 return _prmw_error;
michael@0 345 }
michael@0 346
michael@0 347 newHash->length = length;
michael@0 348 retry = PR_FALSE;
michael@0 349 for (desc = &oldHash->recv_wait;
michael@0 350 newHash->count < oldHash->count; ++desc)
michael@0 351 {
michael@0 352 PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
michael@0 353 if (NULL != *desc)
michael@0 354 {
michael@0 355 hrv = MW_AddHashInternal(*desc, newHash);
michael@0 356 PR_ASSERT(_prmw_error != hrv);
michael@0 357 if (_prmw_success != hrv)
michael@0 358 {
michael@0 359 PR_DELETE(newHash);
michael@0 360 retry = PR_TRUE;
michael@0 361 break;
michael@0 362 }
michael@0 363 }
michael@0 364 }
michael@0 365 if (retry) continue;
michael@0 366
michael@0 367 PR_DELETE(group->waiter);
michael@0 368 group->waiter = newHash;
michael@0 369 group->p_timestamp += 1;
michael@0 370 return _prmw_success;
michael@0 371 }
michael@0 372
michael@0 373 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 374 return _prmw_error; /* we're hosed */
michael@0 375 } /* MW_ExpandHashInternal */
michael@0 376
michael@0 377 #ifndef WINNT
michael@0 378 static void _MW_DoneInternal(
michael@0 379 PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
michael@0 380 {
michael@0 381 /*
michael@0 382 ** Add this receive wait object to the list of finished I/O
michael@0 383 ** operations for this particular group. If there are other
michael@0 384 ** threads waiting on the group, notify one. If not, arrange
michael@0 385 ** for this thread to return.
michael@0 386 */
michael@0 387
michael@0 388 #if 0
michael@0 389 printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
michael@0 390 #endif
michael@0 391 (*waiter)->outcome = outcome;
michael@0 392 PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
michael@0 393 PR_NotifyCondVar(group->io_complete);
michael@0 394 PR_ASSERT(0 != group->waiter->count);
michael@0 395 group->waiter->count -= 1;
michael@0 396 *waiter = NULL;
michael@0 397 } /* _MW_DoneInternal */
michael@0 398 #endif /* WINNT */
michael@0 399
michael@0 400 static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
michael@0 401 {
michael@0 402 /*
michael@0 403 ** Find the receive wait object corresponding to the file descriptor.
michael@0 404 ** Only search the wait group specified.
michael@0 405 */
michael@0 406 PRRecvWait **desc;
michael@0 407 PRIntn rehash = _MW_REHASH_MAX;
michael@0 408 _PRWaiterHash *hash = group->waiter;
michael@0 409 PRUintn hidx = _MW_HASH(fd, hash->length);
michael@0 410 PRUintn hoffset = 0;
michael@0 411
michael@0 412 while (rehash-- > 0)
michael@0 413 {
michael@0 414 desc = (&hash->recv_wait) + hidx;
michael@0 415 if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
michael@0 416 if (0 == hoffset)
michael@0 417 {
michael@0 418 hoffset = _MW_HASH2(fd, hash->length);
michael@0 419 PR_ASSERT(0 != hoffset);
michael@0 420 }
michael@0 421 hidx = (hidx + hoffset) % (hash->length);
michael@0 422 }
michael@0 423 return NULL;
michael@0 424 } /* _MW_LookupInternal */
michael@0 425
michael@0 426 #ifndef WINNT
michael@0 427 static PRStatus _MW_PollInternal(PRWaitGroup *group)
michael@0 428 {
michael@0 429 PRRecvWait **waiter;
michael@0 430 PRStatus rv = PR_FAILURE;
michael@0 431 PRInt32 count, count_ready;
michael@0 432 PRIntervalTime polling_interval;
michael@0 433
michael@0 434 group->poller = PR_GetCurrentThread();
michael@0 435
michael@0 436 while (PR_TRUE)
michael@0 437 {
michael@0 438 PRIntervalTime now, since_last_poll;
michael@0 439 PRPollDesc *poll_list;
michael@0 440
michael@0 441 while (0 == group->waiter->count)
michael@0 442 {
michael@0 443 PRStatus st;
michael@0 444 st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
michael@0 445 if (_prmw_running != group->state)
michael@0 446 {
michael@0 447 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 448 goto aborted;
michael@0 449 }
michael@0 450 if (_MW_ABORTED(st)) goto aborted;
michael@0 451 }
michael@0 452
michael@0 453 /*
michael@0 454 ** There's something to do. See if our existing polling list
michael@0 455 ** is large enough for what we have to do?
michael@0 456 */
michael@0 457
michael@0 458 while (group->polling_count < group->waiter->count)
michael@0 459 {
michael@0 460 PRUint32 old_count = group->waiter->count;
michael@0 461 PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
michael@0 462 PRSize new_size = sizeof(PRPollDesc) * new_count;
michael@0 463 PRPollDesc *old_polling_list = group->polling_list;
michael@0 464
michael@0 465 PR_Unlock(group->ml);
michael@0 466 poll_list = (PRPollDesc*)PR_CALLOC(new_size);
michael@0 467 if (NULL == poll_list)
michael@0 468 {
michael@0 469 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 470 PR_Lock(group->ml);
michael@0 471 goto failed_alloc;
michael@0 472 }
michael@0 473 if (NULL != old_polling_list)
michael@0 474 PR_DELETE(old_polling_list);
michael@0 475 PR_Lock(group->ml);
michael@0 476 if (_prmw_running != group->state)
michael@0 477 {
michael@0 478 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 479 goto aborted;
michael@0 480 }
michael@0 481 group->polling_list = poll_list;
michael@0 482 group->polling_count = new_count;
michael@0 483 }
michael@0 484
michael@0 485 now = PR_IntervalNow();
michael@0 486 polling_interval = max_polling_interval;
michael@0 487 since_last_poll = now - group->last_poll;
michael@0 488
michael@0 489 waiter = &group->waiter->recv_wait;
michael@0 490 poll_list = group->polling_list;
michael@0 491 for (count = 0; count < group->waiter->count; ++waiter)
michael@0 492 {
michael@0 493 PR_ASSERT(waiter < &group->waiter->recv_wait
michael@0 494 + group->waiter->length);
michael@0 495 if (NULL != *waiter) /* a live one! */
michael@0 496 {
michael@0 497 if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
michael@0 498 && (since_last_poll >= (*waiter)->timeout))
michael@0 499 _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
michael@0 500 else
michael@0 501 {
michael@0 502 if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
michael@0 503 {
michael@0 504 (*waiter)->timeout -= since_last_poll;
michael@0 505 if ((*waiter)->timeout < polling_interval)
michael@0 506 polling_interval = (*waiter)->timeout;
michael@0 507 }
michael@0 508 PR_ASSERT(poll_list < group->polling_list
michael@0 509 + group->polling_count);
michael@0 510 poll_list->fd = (*waiter)->fd;
michael@0 511 poll_list->in_flags = PR_POLL_READ;
michael@0 512 poll_list->out_flags = 0;
michael@0 513 #if 0
michael@0 514 printf(
michael@0 515 "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
michael@0 516 poll_list, count, poll_list->fd, (*waiter)->timeout);
michael@0 517 #endif
michael@0 518 poll_list += 1;
michael@0 519 count += 1;
michael@0 520 }
michael@0 521 }
michael@0 522 }
michael@0 523
michael@0 524 PR_ASSERT(count == group->waiter->count);
michael@0 525
michael@0 526 /*
michael@0 527 ** If there are no more threads waiting for completion,
michael@0 528 ** we need to return.
michael@0 529 */
michael@0 530 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 531 && (1 == group->waiting_threads)) break;
michael@0 532
michael@0 533 if (0 == count) continue; /* wait for new business */
michael@0 534
michael@0 535 group->last_poll = now;
michael@0 536
michael@0 537 PR_Unlock(group->ml);
michael@0 538
michael@0 539 count_ready = PR_Poll(group->polling_list, count, polling_interval);
michael@0 540
michael@0 541 PR_Lock(group->ml);
michael@0 542
michael@0 543 if (_prmw_running != group->state)
michael@0 544 {
michael@0 545 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 546 goto aborted;
michael@0 547 }
michael@0 548 if (-1 == count_ready)
michael@0 549 {
michael@0 550 goto failed_poll; /* that's a shame */
michael@0 551 }
michael@0 552 else if (0 < count_ready)
michael@0 553 {
michael@0 554 for (poll_list = group->polling_list; count > 0;
michael@0 555 poll_list++, count--)
michael@0 556 {
michael@0 557 PR_ASSERT(
michael@0 558 poll_list < group->polling_list + group->polling_count);
michael@0 559 if (poll_list->out_flags != 0)
michael@0 560 {
michael@0 561 waiter = _MW_LookupInternal(group, poll_list->fd);
michael@0 562 /*
michael@0 563 ** If 'waiter' is NULL, that means the wait receive
michael@0 564 ** descriptor has been canceled.
michael@0 565 */
michael@0 566 if (NULL != waiter)
michael@0 567 _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
michael@0 568 }
michael@0 569 }
michael@0 570 }
michael@0 571 /*
michael@0 572 ** If there are no more threads waiting for completion,
michael@0 573 ** we need to return.
michael@0 574 ** This thread was "borrowed" to do the polling, but it really
michael@0 575 ** belongs to the client.
michael@0 576 */
michael@0 577 if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 578 && (1 == group->waiting_threads)) break;
michael@0 579 }
michael@0 580
michael@0 581 rv = PR_SUCCESS;
michael@0 582
michael@0 583 aborted:
michael@0 584 failed_poll:
michael@0 585 failed_alloc:
michael@0 586 group->poller = NULL; /* we were that, not we ain't */
michael@0 587 if ((_prmw_running == group->state) && (group->waiting_threads > 1))
michael@0 588 {
michael@0 589 /* Wake up one thread to become the new poller. */
michael@0 590 PR_NotifyCondVar(group->io_complete);
michael@0 591 }
michael@0 592 return rv; /* we return with the lock held */
michael@0 593 } /* _MW_PollInternal */
michael@0 594 #endif /* !WINNT */
michael@0 595
michael@0 596 static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
michael@0 597 {
michael@0 598 PRMWGroupState rv = group->state;
michael@0 599 /*
michael@0 600 ** Looking at the group's fields is safe because
michael@0 601 ** once the group's state is no longer running, it
michael@0 602 ** cannot revert and there is a safe check on entry
michael@0 603 ** to make sure no more threads are made to wait.
michael@0 604 */
michael@0 605 if ((_prmw_stopping == rv)
michael@0 606 && (0 == group->waiting_threads))
michael@0 607 {
michael@0 608 rv = group->state = _prmw_stopped;
michael@0 609 PR_NotifyCondVar(group->mw_manage);
michael@0 610 }
michael@0 611 return rv;
michael@0 612 } /* MW_TestForShutdownInternal */
michael@0 613
michael@0 614 #ifndef WINNT
michael@0 615 static void _MW_InitialRecv(PRCList *io_ready)
michael@0 616 {
michael@0 617 PRRecvWait *desc = (PRRecvWait*)io_ready;
michael@0 618 if ((NULL == desc->buffer.start)
michael@0 619 || (0 == desc->buffer.length))
michael@0 620 desc->bytesRecv = 0;
michael@0 621 else
michael@0 622 {
michael@0 623 desc->bytesRecv = (desc->fd->methods->recv)(
michael@0 624 desc->fd, desc->buffer.start,
michael@0 625 desc->buffer.length, 0, desc->timeout);
michael@0 626 if (desc->bytesRecv < 0) /* SetError should already be there */
michael@0 627 desc->outcome = PR_MW_FAILURE;
michael@0 628 }
michael@0 629 } /* _MW_InitialRecv */
michael@0 630 #endif
michael@0 631
michael@0 632 #ifdef WINNT
michael@0 633 static void NT_TimeProc(void *arg)
michael@0 634 {
michael@0 635 _MDOverlapped *overlapped = (_MDOverlapped *)arg;
michael@0 636 PRRecvWait *desc = overlapped->data.mw.desc;
michael@0 637 PRFileDesc *bottom;
michael@0 638
michael@0 639 if (InterlockedCompareExchange((LONG *)&desc->outcome,
michael@0 640 (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
michael@0 641 {
michael@0 642 /* This wait recv descriptor has already completed. */
michael@0 643 return;
michael@0 644 }
michael@0 645
michael@0 646 /* close the osfd to abort the outstanding async io request */
michael@0 647 /* $$$$
michael@0 648 ** Little late to be checking if NSPR's on the bottom of stack,
michael@0 649 ** but if we don't check, we can't assert that the private data
michael@0 650 ** is what we think it is.
michael@0 651 ** $$$$
michael@0 652 */
michael@0 653 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
michael@0 654 PR_ASSERT(NULL != bottom);
michael@0 655 if (NULL != bottom) /* now what!?!?! */
michael@0 656 {
michael@0 657 bottom->secret->state = _PR_FILEDESC_CLOSED;
michael@0 658 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
michael@0 659 {
michael@0 660 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
michael@0 661 PR_ASSERT(!"What shall I do?");
michael@0 662 }
michael@0 663 }
michael@0 664 return;
michael@0 665 } /* NT_TimeProc */
michael@0 666
michael@0 667 static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
michael@0 668 {
michael@0 669 PRRecvWait **waiter;
michael@0 670
michael@0 671 _PR_MD_LOCK(&group->mdlock);
michael@0 672 waiter = _MW_LookupInternal(group, fd);
michael@0 673 if (NULL != waiter)
michael@0 674 {
michael@0 675 group->waiter->count -= 1;
michael@0 676 *waiter = NULL;
michael@0 677 }
michael@0 678 _PR_MD_UNLOCK(&group->mdlock);
michael@0 679 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
michael@0 680 }
michael@0 681
michael@0 682 PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
michael@0 683 {
michael@0 684 PRRecvWait **waiter;
michael@0 685
michael@0 686 waiter = _MW_LookupInternal(group, fd);
michael@0 687 if (NULL != waiter)
michael@0 688 {
michael@0 689 group->waiter->count -= 1;
michael@0 690 *waiter = NULL;
michael@0 691 }
michael@0 692 return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
michael@0 693 }
michael@0 694 #endif /* WINNT */
michael@0 695
michael@0 696 /******************************************************************/
michael@0 697 /******************************************************************/
michael@0 698 /********************** The public API portion ********************/
michael@0 699 /******************************************************************/
michael@0 700 /******************************************************************/
michael@0 701 PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
michael@0 702 PRWaitGroup *group, PRRecvWait *desc)
michael@0 703 {
michael@0 704 _PR_HashStory hrv;
michael@0 705 PRStatus rv = PR_FAILURE;
michael@0 706 #ifdef WINNT
michael@0 707 _MDOverlapped *overlapped;
michael@0 708 HANDLE hFile;
michael@0 709 BOOL bResult;
michael@0 710 DWORD dwError;
michael@0 711 PRFileDesc *bottom;
michael@0 712 #endif
michael@0 713
michael@0 714 if (!_pr_initialized) _PR_ImplicitInitialization();
michael@0 715 if ((NULL == group) && (NULL == (group = MW_Init2())))
michael@0 716 {
michael@0 717 return rv;
michael@0 718 }
michael@0 719
michael@0 720 PR_ASSERT(NULL != desc->fd);
michael@0 721
michael@0 722 desc->outcome = PR_MW_PENDING; /* nice, well known value */
michael@0 723 desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
michael@0 724
michael@0 725 PR_Lock(group->ml);
michael@0 726
michael@0 727 if (_prmw_running != group->state)
michael@0 728 {
michael@0 729 /* Not allowed to add after cancelling the group */
michael@0 730 desc->outcome = PR_MW_INTERRUPT;
michael@0 731 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 732 PR_Unlock(group->ml);
michael@0 733 return rv;
michael@0 734 }
michael@0 735
michael@0 736 #ifdef WINNT
michael@0 737 _PR_MD_LOCK(&group->mdlock);
michael@0 738 #endif
michael@0 739
michael@0 740 /*
michael@0 741 ** If the waiter count is zero at this point, there's no telling
michael@0 742 ** how long we've been idle. Therefore, initialize the beginning
michael@0 743 ** of the timing interval. As long as the list doesn't go empty,
michael@0 744 ** it will maintain itself.
michael@0 745 */
michael@0 746 if (0 == group->waiter->count)
michael@0 747 group->last_poll = PR_IntervalNow();
michael@0 748
michael@0 749 do
michael@0 750 {
michael@0 751 hrv = MW_AddHashInternal(desc, group->waiter);
michael@0 752 if (_prmw_rehash != hrv) break;
michael@0 753 hrv = MW_ExpandHashInternal(group); /* gruesome */
michael@0 754 if (_prmw_success != hrv) break;
michael@0 755 } while (PR_TRUE);
michael@0 756
michael@0 757 #ifdef WINNT
michael@0 758 _PR_MD_UNLOCK(&group->mdlock);
michael@0 759 #endif
michael@0 760
michael@0 761 PR_NotifyCondVar(group->new_business); /* tell the world */
michael@0 762 rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
michael@0 763 PR_Unlock(group->ml);
michael@0 764
michael@0 765 #ifdef WINNT
michael@0 766 overlapped = PR_NEWZAP(_MDOverlapped);
michael@0 767 if (NULL == overlapped)
michael@0 768 {
michael@0 769 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 770 NT_HashRemove(group, desc->fd);
michael@0 771 return rv;
michael@0 772 }
michael@0 773 overlapped->ioModel = _MD_MultiWaitIO;
michael@0 774 overlapped->data.mw.desc = desc;
michael@0 775 overlapped->data.mw.group = group;
michael@0 776 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
michael@0 777 {
michael@0 778 overlapped->data.mw.timer = CreateTimer(
michael@0 779 desc->timeout,
michael@0 780 NT_TimeProc,
michael@0 781 overlapped);
michael@0 782 if (0 == overlapped->data.mw.timer)
michael@0 783 {
michael@0 784 NT_HashRemove(group, desc->fd);
michael@0 785 PR_DELETE(overlapped);
michael@0 786 /*
michael@0 787 * XXX It appears that a maximum of 16 timer events can
michael@0 788 * be outstanding. GetLastError() returns 0 when I try it.
michael@0 789 */
michael@0 790 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
michael@0 791 return PR_FAILURE;
michael@0 792 }
michael@0 793 }
michael@0 794
michael@0 795 /* Reach to the bottom layer to get the OS fd */
michael@0 796 bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
michael@0 797 PR_ASSERT(NULL != bottom);
michael@0 798 if (NULL == bottom)
michael@0 799 {
michael@0 800 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 801 return PR_FAILURE;
michael@0 802 }
michael@0 803 hFile = (HANDLE)bottom->secret->md.osfd;
michael@0 804 if (!bottom->secret->md.io_model_committed)
michael@0 805 {
michael@0 806 PRInt32 st;
michael@0 807 st = _md_Associate(hFile);
michael@0 808 PR_ASSERT(0 != st);
michael@0 809 bottom->secret->md.io_model_committed = PR_TRUE;
michael@0 810 }
michael@0 811 bResult = ReadFile(hFile,
michael@0 812 desc->buffer.start,
michael@0 813 (DWORD)desc->buffer.length,
michael@0 814 NULL,
michael@0 815 &overlapped->overlapped);
michael@0 816 if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
michael@0 817 {
michael@0 818 if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
michael@0 819 {
michael@0 820 if (InterlockedCompareExchange((LONG *)&desc->outcome,
michael@0 821 (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
michael@0 822 == (LONG)PR_MW_PENDING)
michael@0 823 {
michael@0 824 CancelTimer(overlapped->data.mw.timer);
michael@0 825 }
michael@0 826 NT_HashRemove(group, desc->fd);
michael@0 827 PR_DELETE(overlapped);
michael@0 828 }
michael@0 829 _PR_MD_MAP_READ_ERROR(dwError);
michael@0 830 rv = PR_FAILURE;
michael@0 831 }
michael@0 832 #endif
michael@0 833
michael@0 834 return rv;
michael@0 835 } /* PR_AddWaitFileDesc */
michael@0 836
michael@0 837 PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
michael@0 838 {
michael@0 839 PRCList *io_ready = NULL;
michael@0 840 #ifdef WINNT
michael@0 841 PRThread *me = _PR_MD_CURRENT_THREAD();
michael@0 842 _MDOverlapped *overlapped;
michael@0 843 #endif
michael@0 844
michael@0 845 if (!_pr_initialized) _PR_ImplicitInitialization();
michael@0 846 if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
michael@0 847
michael@0 848 PR_Lock(group->ml);
michael@0 849
michael@0 850 if (_prmw_running != group->state)
michael@0 851 {
michael@0 852 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 853 goto invalid_state;
michael@0 854 }
michael@0 855
michael@0 856 group->waiting_threads += 1; /* the polling thread is counted */
michael@0 857
michael@0 858 #ifdef WINNT
michael@0 859 _PR_MD_LOCK(&group->mdlock);
michael@0 860 while (PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 861 {
michael@0 862 _PR_THREAD_LOCK(me);
michael@0 863 me->state = _PR_IO_WAIT;
michael@0 864 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
michael@0 865 if (!_PR_IS_NATIVE_THREAD(me))
michael@0 866 {
michael@0 867 _PR_SLEEPQ_LOCK(me->cpu);
michael@0 868 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
michael@0 869 _PR_SLEEPQ_UNLOCK(me->cpu);
michael@0 870 }
michael@0 871 _PR_THREAD_UNLOCK(me);
michael@0 872 _PR_MD_UNLOCK(&group->mdlock);
michael@0 873 PR_Unlock(group->ml);
michael@0 874 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
michael@0 875 me->state = _PR_RUNNING;
michael@0 876 PR_Lock(group->ml);
michael@0 877 _PR_MD_LOCK(&group->mdlock);
michael@0 878 if (_PR_PENDING_INTERRUPT(me)) {
michael@0 879 PR_REMOVE_LINK(&me->waitQLinks);
michael@0 880 _PR_MD_UNLOCK(&group->mdlock);
michael@0 881 me->flags &= ~_PR_INTERRUPT;
michael@0 882 me->io_suspended = PR_FALSE;
michael@0 883 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
michael@0 884 goto aborted;
michael@0 885 }
michael@0 886 }
michael@0 887 io_ready = PR_LIST_HEAD(&group->io_ready);
michael@0 888 PR_ASSERT(io_ready != NULL);
michael@0 889 PR_REMOVE_LINK(io_ready);
michael@0 890 _PR_MD_UNLOCK(&group->mdlock);
michael@0 891 overlapped = (_MDOverlapped *)
michael@0 892 ((char *)io_ready - offsetof(_MDOverlapped, data));
michael@0 893 io_ready = &overlapped->data.mw.desc->internal;
michael@0 894 #else
michael@0 895 do
michael@0 896 {
michael@0 897 /*
michael@0 898 ** If the I/O ready list isn't empty, have this thread
michael@0 899 ** return with the first receive wait object that's available.
michael@0 900 */
michael@0 901 if (PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 902 {
michael@0 903 /*
michael@0 904 ** Is there a polling thread yet? If not, grab this thread
michael@0 905 ** and use it.
michael@0 906 */
michael@0 907 if (NULL == group->poller)
michael@0 908 {
michael@0 909 /*
michael@0 910 ** This thread will stay do polling until it becomes the only one
michael@0 911 ** left to service a completion. Then it will return and there will
michael@0 912 ** be none left to actually poll or to run completions.
michael@0 913 **
michael@0 914 ** The polling function should only return w/ failure or
michael@0 915 ** with some I/O ready.
michael@0 916 */
michael@0 917 if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
michael@0 918 }
michael@0 919 else
michael@0 920 {
michael@0 921 /*
michael@0 922 ** There are four reasons a thread can be awakened from
michael@0 923 ** a wait on the io_complete condition variable.
michael@0 924 ** 1. Some I/O has completed, i.e., the io_ready list
michael@0 925 ** is nonempty.
michael@0 926 ** 2. The wait group is canceled.
michael@0 927 ** 3. The thread is interrupted.
michael@0 928 ** 4. The current polling thread has to leave and needs
michael@0 929 ** a replacement.
michael@0 930 ** The logic to find a new polling thread is made more
michael@0 931 ** complicated by all the other possible events.
michael@0 932 ** I tried my best to write the logic clearly, but
michael@0 933 ** it is still full of if's with continue and goto.
michael@0 934 */
michael@0 935 PRStatus st;
michael@0 936 do
michael@0 937 {
michael@0 938 st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
michael@0 939 if (_prmw_running != group->state)
michael@0 940 {
michael@0 941 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 942 goto aborted;
michael@0 943 }
michael@0 944 if (_MW_ABORTED(st) || (NULL == group->poller)) break;
michael@0 945 } while (PR_CLIST_IS_EMPTY(&group->io_ready));
michael@0 946
michael@0 947 /*
michael@0 948 ** The thread is interrupted and has to leave. It might
michael@0 949 ** have also been awakened to process ready i/o or be the
michael@0 950 ** new poller. To be safe, if either condition is true,
michael@0 951 ** we awaken another thread to take its place.
michael@0 952 */
michael@0 953 if (_MW_ABORTED(st))
michael@0 954 {
michael@0 955 if ((NULL == group->poller
michael@0 956 || !PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 957 && group->waiting_threads > 1)
michael@0 958 PR_NotifyCondVar(group->io_complete);
michael@0 959 goto aborted;
michael@0 960 }
michael@0 961
michael@0 962 /*
michael@0 963 ** A new poller is needed, but can I be the new poller?
michael@0 964 ** If there is no i/o ready, sure. But if there is any
michael@0 965 ** i/o ready, it has a higher priority. I want to
michael@0 966 ** process the ready i/o first and wake up another
michael@0 967 ** thread to be the new poller.
michael@0 968 */
michael@0 969 if (NULL == group->poller)
michael@0 970 {
michael@0 971 if (PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 972 continue;
michael@0 973 if (group->waiting_threads > 1)
michael@0 974 PR_NotifyCondVar(group->io_complete);
michael@0 975 }
michael@0 976 }
michael@0 977 PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
michael@0 978 }
michael@0 979 io_ready = PR_LIST_HEAD(&group->io_ready);
michael@0 980 PR_NotifyCondVar(group->io_taken);
michael@0 981 PR_ASSERT(io_ready != NULL);
michael@0 982 PR_REMOVE_LINK(io_ready);
michael@0 983 } while (NULL == io_ready);
michael@0 984
michael@0 985 failed_poll:
michael@0 986
michael@0 987 #endif
michael@0 988
michael@0 989 aborted:
michael@0 990
michael@0 991 group->waiting_threads -= 1;
michael@0 992 invalid_state:
michael@0 993 (void)MW_TestForShutdownInternal(group);
michael@0 994 PR_Unlock(group->ml);
michael@0 995
michael@0 996 failed_init:
michael@0 997 if (NULL != io_ready)
michael@0 998 {
michael@0 999 /* If the operation failed, record the reason why */
michael@0 1000 switch (((PRRecvWait*)io_ready)->outcome)
michael@0 1001 {
michael@0 1002 case PR_MW_PENDING:
michael@0 1003 PR_ASSERT(0);
michael@0 1004 break;
michael@0 1005 case PR_MW_SUCCESS:
michael@0 1006 #ifndef WINNT
michael@0 1007 _MW_InitialRecv(io_ready);
michael@0 1008 #endif
michael@0 1009 break;
michael@0 1010 #ifdef WINNT
michael@0 1011 case PR_MW_FAILURE:
michael@0 1012 _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
michael@0 1013 break;
michael@0 1014 #endif
michael@0 1015 case PR_MW_TIMEOUT:
michael@0 1016 PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
michael@0 1017 break;
michael@0 1018 case PR_MW_INTERRUPT:
michael@0 1019 PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
michael@0 1020 break;
michael@0 1021 default: break;
michael@0 1022 }
michael@0 1023 #ifdef WINNT
michael@0 1024 if (NULL != overlapped->data.mw.timer)
michael@0 1025 {
michael@0 1026 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
michael@0 1027 != overlapped->data.mw.desc->timeout);
michael@0 1028 CancelTimer(overlapped->data.mw.timer);
michael@0 1029 }
michael@0 1030 else
michael@0 1031 {
michael@0 1032 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
michael@0 1033 == overlapped->data.mw.desc->timeout);
michael@0 1034 }
michael@0 1035 PR_DELETE(overlapped);
michael@0 1036 #endif
michael@0 1037 }
michael@0 1038 return (PRRecvWait*)io_ready;
michael@0 1039 } /* PR_WaitRecvReady */
michael@0 1040
michael@0 1041 PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
michael@0 1042 {
michael@0 1043 #if !defined(WINNT)
michael@0 1044 PRRecvWait **recv_wait;
michael@0 1045 #endif
michael@0 1046 PRStatus rv = PR_SUCCESS;
michael@0 1047 if (NULL == group) group = mw_state->group;
michael@0 1048 PR_ASSERT(NULL != group);
michael@0 1049 if (NULL == group)
michael@0 1050 {
michael@0 1051 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1052 return PR_FAILURE;
michael@0 1053 }
michael@0 1054
michael@0 1055 PR_Lock(group->ml);
michael@0 1056
michael@0 1057 if (_prmw_running != group->state)
michael@0 1058 {
michael@0 1059 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 1060 rv = PR_FAILURE;
michael@0 1061 goto unlock;
michael@0 1062 }
michael@0 1063
michael@0 1064 #ifdef WINNT
michael@0 1065 if (InterlockedCompareExchange((LONG *)&desc->outcome,
michael@0 1066 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
michael@0 1067 {
michael@0 1068 PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
michael@0 1069 PR_ASSERT(NULL != bottom);
michael@0 1070 if (NULL == bottom)
michael@0 1071 {
michael@0 1072 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1073 goto unlock;
michael@0 1074 }
michael@0 1075 bottom->secret->state = _PR_FILEDESC_CLOSED;
michael@0 1076 #if 0
michael@0 1077 fprintf(stderr, "cancel wait recv: closing socket\n");
michael@0 1078 #endif
michael@0 1079 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
michael@0 1080 {
michael@0 1081 fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
michael@0 1082 exit(1);
michael@0 1083 }
michael@0 1084 }
michael@0 1085 #else
michael@0 1086 if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
michael@0 1087 {
michael@0 1088 /* it was in the wait table */
michael@0 1089 _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
michael@0 1090 goto unlock;
michael@0 1091 }
michael@0 1092 if (!PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 1093 {
michael@0 1094 /* is it already complete? */
michael@0 1095 PRCList *head = PR_LIST_HEAD(&group->io_ready);
michael@0 1096 do
michael@0 1097 {
michael@0 1098 PRRecvWait *done = (PRRecvWait*)head;
michael@0 1099 if (done == desc) goto unlock;
michael@0 1100 head = PR_NEXT_LINK(head);
michael@0 1101 } while (head != &group->io_ready);
michael@0 1102 }
michael@0 1103 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1104 rv = PR_FAILURE;
michael@0 1105
michael@0 1106 #endif
michael@0 1107 unlock:
michael@0 1108 PR_Unlock(group->ml);
michael@0 1109 return rv;
michael@0 1110 } /* PR_CancelWaitFileDesc */
michael@0 1111
michael@0 1112 PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
michael@0 1113 {
michael@0 1114 PRRecvWait **desc;
michael@0 1115 PRRecvWait *recv_wait = NULL;
michael@0 1116 #ifdef WINNT
michael@0 1117 _MDOverlapped *overlapped;
michael@0 1118 PRRecvWait **end;
michael@0 1119 PRThread *me = _PR_MD_CURRENT_THREAD();
michael@0 1120 #endif
michael@0 1121
michael@0 1122 if (NULL == group) group = mw_state->group;
michael@0 1123 PR_ASSERT(NULL != group);
michael@0 1124 if (NULL == group)
michael@0 1125 {
michael@0 1126 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1127 return NULL;
michael@0 1128 }
michael@0 1129
michael@0 1130 PR_Lock(group->ml);
michael@0 1131 if (_prmw_stopped != group->state)
michael@0 1132 {
michael@0 1133 if (_prmw_running == group->state)
michael@0 1134 group->state = _prmw_stopping; /* so nothing new comes in */
michael@0 1135 if (0 == group->waiting_threads) /* is there anybody else? */
michael@0 1136 group->state = _prmw_stopped; /* we can stop right now */
michael@0 1137 else
michael@0 1138 {
michael@0 1139 PR_NotifyAllCondVar(group->new_business);
michael@0 1140 PR_NotifyAllCondVar(group->io_complete);
michael@0 1141 }
michael@0 1142 while (_prmw_stopped != group->state)
michael@0 1143 (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
michael@0 1144 }
michael@0 1145
michael@0 1146 #ifdef WINNT
michael@0 1147 _PR_MD_LOCK(&group->mdlock);
michael@0 1148 #endif
michael@0 1149 /* make all the existing descriptors look done/interrupted */
michael@0 1150 #ifdef WINNT
michael@0 1151 end = &group->waiter->recv_wait + group->waiter->length;
michael@0 1152 for (desc = &group->waiter->recv_wait; desc < end; ++desc)
michael@0 1153 {
michael@0 1154 if (NULL != *desc)
michael@0 1155 {
michael@0 1156 if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
michael@0 1157 (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
michael@0 1158 == (LONG)PR_MW_PENDING)
michael@0 1159 {
michael@0 1160 PRFileDesc *bottom = PR_GetIdentitiesLayer(
michael@0 1161 (*desc)->fd, PR_NSPR_IO_LAYER);
michael@0 1162 PR_ASSERT(NULL != bottom);
michael@0 1163 if (NULL == bottom)
michael@0 1164 {
michael@0 1165 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1166 goto invalid_arg;
michael@0 1167 }
michael@0 1168 bottom->secret->state = _PR_FILEDESC_CLOSED;
michael@0 1169 #if 0
michael@0 1170 fprintf(stderr, "cancel wait group: closing socket\n");
michael@0 1171 #endif
michael@0 1172 if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
michael@0 1173 {
michael@0 1174 fprintf(stderr, "closesocket failed: %d\n",
michael@0 1175 WSAGetLastError());
michael@0 1176 exit(1);
michael@0 1177 }
michael@0 1178 }
michael@0 1179 }
michael@0 1180 }
michael@0 1181 while (group->waiter->count > 0)
michael@0 1182 {
michael@0 1183 _PR_THREAD_LOCK(me);
michael@0 1184 me->state = _PR_IO_WAIT;
michael@0 1185 PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
michael@0 1186 if (!_PR_IS_NATIVE_THREAD(me))
michael@0 1187 {
michael@0 1188 _PR_SLEEPQ_LOCK(me->cpu);
michael@0 1189 _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
michael@0 1190 _PR_SLEEPQ_UNLOCK(me->cpu);
michael@0 1191 }
michael@0 1192 _PR_THREAD_UNLOCK(me);
michael@0 1193 _PR_MD_UNLOCK(&group->mdlock);
michael@0 1194 PR_Unlock(group->ml);
michael@0 1195 _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
michael@0 1196 me->state = _PR_RUNNING;
michael@0 1197 PR_Lock(group->ml);
michael@0 1198 _PR_MD_LOCK(&group->mdlock);
michael@0 1199 }
michael@0 1200 #else
michael@0 1201 for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
michael@0 1202 {
michael@0 1203 PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
michael@0 1204 if (NULL != *desc)
michael@0 1205 _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
michael@0 1206 }
michael@0 1207 #endif
michael@0 1208
michael@0 1209 /* take first element of finished list and return it or NULL */
michael@0 1210 if (PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 1211 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
michael@0 1212 else
michael@0 1213 {
michael@0 1214 PRCList *head = PR_LIST_HEAD(&group->io_ready);
michael@0 1215 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1216 #ifdef WINNT
michael@0 1217 overlapped = (_MDOverlapped *)
michael@0 1218 ((char *)head - offsetof(_MDOverlapped, data));
michael@0 1219 head = &overlapped->data.mw.desc->internal;
michael@0 1220 if (NULL != overlapped->data.mw.timer)
michael@0 1221 {
michael@0 1222 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
michael@0 1223 != overlapped->data.mw.desc->timeout);
michael@0 1224 CancelTimer(overlapped->data.mw.timer);
michael@0 1225 }
michael@0 1226 else
michael@0 1227 {
michael@0 1228 PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
michael@0 1229 == overlapped->data.mw.desc->timeout);
michael@0 1230 }
michael@0 1231 PR_DELETE(overlapped);
michael@0 1232 #endif
michael@0 1233 recv_wait = (PRRecvWait*)head;
michael@0 1234 }
michael@0 1235 #ifdef WINNT
michael@0 1236 invalid_arg:
michael@0 1237 _PR_MD_UNLOCK(&group->mdlock);
michael@0 1238 #endif
michael@0 1239 PR_Unlock(group->ml);
michael@0 1240
michael@0 1241 return recv_wait;
michael@0 1242 } /* PR_CancelWaitGroup */
michael@0 1243
michael@0 1244 PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
michael@0 1245 {
michael@0 1246 PRWaitGroup *wg;
michael@0 1247
michael@0 1248 if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
michael@0 1249 {
michael@0 1250 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 1251 goto failed;
michael@0 1252 }
michael@0 1253 /* the wait group itself */
michael@0 1254 wg->ml = PR_NewLock();
michael@0 1255 if (NULL == wg->ml) goto failed_lock;
michael@0 1256 wg->io_taken = PR_NewCondVar(wg->ml);
michael@0 1257 if (NULL == wg->io_taken) goto failed_cvar0;
michael@0 1258 wg->io_complete = PR_NewCondVar(wg->ml);
michael@0 1259 if (NULL == wg->io_complete) goto failed_cvar1;
michael@0 1260 wg->new_business = PR_NewCondVar(wg->ml);
michael@0 1261 if (NULL == wg->new_business) goto failed_cvar2;
michael@0 1262 wg->mw_manage = PR_NewCondVar(wg->ml);
michael@0 1263 if (NULL == wg->mw_manage) goto failed_cvar3;
michael@0 1264
michael@0 1265 PR_INIT_CLIST(&wg->group_link);
michael@0 1266 PR_INIT_CLIST(&wg->io_ready);
michael@0 1267
michael@0 1268 /* the waiters sequence */
michael@0 1269 wg->waiter = (_PRWaiterHash*)PR_CALLOC(
michael@0 1270 sizeof(_PRWaiterHash) +
michael@0 1271 (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
michael@0 1272 if (NULL == wg->waiter)
michael@0 1273 {
michael@0 1274 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 1275 goto failed_waiter;
michael@0 1276 }
michael@0 1277 wg->waiter->count = 0;
michael@0 1278 wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
michael@0 1279
michael@0 1280 #ifdef WINNT
michael@0 1281 _PR_MD_NEW_LOCK(&wg->mdlock);
michael@0 1282 PR_INIT_CLIST(&wg->wait_list);
michael@0 1283 #endif /* WINNT */
michael@0 1284
michael@0 1285 PR_Lock(mw_lock);
michael@0 1286 PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
michael@0 1287 PR_Unlock(mw_lock);
michael@0 1288 return wg;
michael@0 1289
michael@0 1290 failed_waiter:
michael@0 1291 PR_DestroyCondVar(wg->mw_manage);
michael@0 1292 failed_cvar3:
michael@0 1293 PR_DestroyCondVar(wg->new_business);
michael@0 1294 failed_cvar2:
michael@0 1295 PR_DestroyCondVar(wg->io_complete);
michael@0 1296 failed_cvar1:
michael@0 1297 PR_DestroyCondVar(wg->io_taken);
michael@0 1298 failed_cvar0:
michael@0 1299 PR_DestroyLock(wg->ml);
michael@0 1300 failed_lock:
michael@0 1301 PR_DELETE(wg);
michael@0 1302 wg = NULL;
michael@0 1303
michael@0 1304 failed:
michael@0 1305 return wg;
michael@0 1306 } /* MW_CreateWaitGroup */
michael@0 1307
michael@0 1308 PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
michael@0 1309 {
michael@0 1310 PRStatus rv = PR_SUCCESS;
michael@0 1311 if (NULL == group) group = mw_state->group;
michael@0 1312 PR_ASSERT(NULL != group);
michael@0 1313 if (NULL != group)
michael@0 1314 {
michael@0 1315 PR_Lock(group->ml);
michael@0 1316 if ((group->waiting_threads == 0)
michael@0 1317 && (group->waiter->count == 0)
michael@0 1318 && PR_CLIST_IS_EMPTY(&group->io_ready))
michael@0 1319 {
michael@0 1320 group->state = _prmw_stopped;
michael@0 1321 }
michael@0 1322 else
michael@0 1323 {
michael@0 1324 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 1325 rv = PR_FAILURE;
michael@0 1326 }
michael@0 1327 PR_Unlock(group->ml);
michael@0 1328 if (PR_FAILURE == rv) return rv;
michael@0 1329
michael@0 1330 PR_Lock(mw_lock);
michael@0 1331 PR_REMOVE_LINK(&group->group_link);
michael@0 1332 PR_Unlock(mw_lock);
michael@0 1333
michael@0 1334 #ifdef WINNT
michael@0 1335 /*
michael@0 1336 * XXX make sure wait_list is empty and waiter is empty.
michael@0 1337 * These must be checked while holding mdlock.
michael@0 1338 */
michael@0 1339 _PR_MD_FREE_LOCK(&group->mdlock);
michael@0 1340 #endif
michael@0 1341
michael@0 1342 PR_DELETE(group->waiter);
michael@0 1343 PR_DELETE(group->polling_list);
michael@0 1344 PR_DestroyCondVar(group->mw_manage);
michael@0 1345 PR_DestroyCondVar(group->new_business);
michael@0 1346 PR_DestroyCondVar(group->io_complete);
michael@0 1347 PR_DestroyCondVar(group->io_taken);
michael@0 1348 PR_DestroyLock(group->ml);
michael@0 1349 if (group == mw_state->group) mw_state->group = NULL;
michael@0 1350 PR_DELETE(group);
michael@0 1351 }
michael@0 1352 else
michael@0 1353 {
michael@0 1354 /* The default wait group is not created yet. */
michael@0 1355 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1356 rv = PR_FAILURE;
michael@0 1357 }
michael@0 1358 return rv;
michael@0 1359 } /* PR_DestroyWaitGroup */
michael@0 1360
michael@0 1361 /**********************************************************************
michael@0 1362 ***********************************************************************
michael@0 1363 ******************** Wait group enumerations **************************
michael@0 1364 ***********************************************************************
michael@0 1365 **********************************************************************/
michael@0 1366
michael@0 1367 PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
michael@0 1368 {
michael@0 1369 PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
michael@0 1370 if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 1371 else
michael@0 1372 {
michael@0 1373 enumerator->group = group;
michael@0 1374 enumerator->seal = _PR_ENUM_SEALED;
michael@0 1375 }
michael@0 1376 return enumerator;
michael@0 1377 } /* PR_CreateMWaitEnumerator */
michael@0 1378
michael@0 1379 PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
michael@0 1380 {
michael@0 1381 PR_ASSERT(NULL != enumerator);
michael@0 1382 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
michael@0 1383 if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
michael@0 1384 {
michael@0 1385 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1386 return PR_FAILURE;
michael@0 1387 }
michael@0 1388 enumerator->seal = _PR_ENUM_UNSEALED;
michael@0 1389 PR_Free(enumerator);
michael@0 1390 return PR_SUCCESS;
michael@0 1391 } /* PR_DestroyMWaitEnumerator */
michael@0 1392
michael@0 1393 PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
michael@0 1394 PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
michael@0 1395 {
michael@0 1396 PRRecvWait *result = NULL;
michael@0 1397
michael@0 1398 /* entry point sanity checking */
michael@0 1399 PR_ASSERT(NULL != enumerator);
michael@0 1400 PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
michael@0 1401 if ((NULL == enumerator)
michael@0 1402 || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
michael@0 1403
michael@0 1404 /* beginning of enumeration */
michael@0 1405 if (NULL == previous)
michael@0 1406 {
michael@0 1407 if (NULL == enumerator->group)
michael@0 1408 {
michael@0 1409 enumerator->group = mw_state->group;
michael@0 1410 if (NULL == enumerator->group)
michael@0 1411 {
michael@0 1412 PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
michael@0 1413 return NULL;
michael@0 1414 }
michael@0 1415 }
michael@0 1416 enumerator->waiter = &enumerator->group->waiter->recv_wait;
michael@0 1417 enumerator->p_timestamp = enumerator->group->p_timestamp;
michael@0 1418 enumerator->thread = PR_GetCurrentThread();
michael@0 1419 enumerator->index = 0;
michael@0 1420 }
michael@0 1421 /* continuing an enumeration */
michael@0 1422 else
michael@0 1423 {
michael@0 1424 PRThread *me = PR_GetCurrentThread();
michael@0 1425 PR_ASSERT(me == enumerator->thread);
michael@0 1426 if (me != enumerator->thread) goto bad_argument;
michael@0 1427
michael@0 1428 /* need to restart the enumeration */
michael@0 1429 if (enumerator->p_timestamp != enumerator->group->p_timestamp)
michael@0 1430 return PR_EnumerateWaitGroup(enumerator, NULL);
michael@0 1431 }
michael@0 1432
michael@0 1433 /* actually progress the enumeration */
michael@0 1434 #if defined(WINNT)
michael@0 1435 _PR_MD_LOCK(&enumerator->group->mdlock);
michael@0 1436 #else
michael@0 1437 PR_Lock(enumerator->group->ml);
michael@0 1438 #endif
michael@0 1439 while (enumerator->index++ < enumerator->group->waiter->length)
michael@0 1440 {
michael@0 1441 if (NULL != (result = *(enumerator->waiter)++)) break;
michael@0 1442 }
michael@0 1443 #if defined(WINNT)
michael@0 1444 _PR_MD_UNLOCK(&enumerator->group->mdlock);
michael@0 1445 #else
michael@0 1446 PR_Unlock(enumerator->group->ml);
michael@0 1447 #endif
michael@0 1448
michael@0 1449 return result; /* what we live for */
michael@0 1450
michael@0 1451 bad_argument:
michael@0 1452 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1453 return NULL; /* probably ambiguous */
michael@0 1454 } /* PR_EnumerateWaitGroup */
michael@0 1455
michael@0 1456 /* prmwait.c */

mercurial