nsprpub/pr/src/misc/prtpool.c

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
michael@0 2 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 3 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 5
michael@0 6 #include "nspr.h"
michael@0 7
michael@0 8 /*
michael@0 9 * Thread pools
michael@0 10 * Thread pools create and manage threads to provide support for
michael@0 11 * scheduling jobs onto one or more threads.
michael@0 12 *
michael@0 13 */
michael@0 14 #ifdef OPT_WINNT
michael@0 15 #include <windows.h>
michael@0 16 #endif
michael@0 17
michael@0 18 /*
michael@0 19 * worker thread
michael@0 20 */
michael@0 21 typedef struct wthread {
michael@0 22 PRCList links;
michael@0 23 PRThread *thread;
michael@0 24 } wthread;
michael@0 25
michael@0 26 /*
michael@0 27 * queue of timer jobs
michael@0 28 */
michael@0 29 typedef struct timer_jobq {
michael@0 30 PRCList list;
michael@0 31 PRLock *lock;
michael@0 32 PRCondVar *cv;
michael@0 33 PRInt32 cnt;
michael@0 34 PRCList wthreads;
michael@0 35 } timer_jobq;
michael@0 36
michael@0 37 /*
michael@0 38 * queue of jobs
michael@0 39 */
michael@0 40 typedef struct tp_jobq {
michael@0 41 PRCList list;
michael@0 42 PRInt32 cnt;
michael@0 43 PRLock *lock;
michael@0 44 PRCondVar *cv;
michael@0 45 PRCList wthreads;
michael@0 46 #ifdef OPT_WINNT
michael@0 47 HANDLE nt_completion_port;
michael@0 48 #endif
michael@0 49 } tp_jobq;
michael@0 50
michael@0 51 /*
michael@0 52 * queue of IO jobs
michael@0 53 */
michael@0 54 typedef struct io_jobq {
michael@0 55 PRCList list;
michael@0 56 PRPollDesc *pollfds;
michael@0 57 PRInt32 npollfds;
michael@0 58 PRJob **polljobs;
michael@0 59 PRLock *lock;
michael@0 60 PRInt32 cnt;
michael@0 61 PRFileDesc *notify_fd;
michael@0 62 PRCList wthreads;
michael@0 63 } io_jobq;
michael@0 64
michael@0 65 /*
michael@0 66 * Threadpool
michael@0 67 */
michael@0 68 struct PRThreadPool {
michael@0 69 PRInt32 init_threads;
michael@0 70 PRInt32 max_threads;
michael@0 71 PRInt32 current_threads;
michael@0 72 PRInt32 idle_threads;
michael@0 73 PRUint32 stacksize;
michael@0 74 tp_jobq jobq;
michael@0 75 io_jobq ioq;
michael@0 76 timer_jobq timerq;
michael@0 77 PRLock *join_lock; /* used with jobp->join_cv */
michael@0 78 PRCondVar *shutdown_cv;
michael@0 79 PRBool shutdown;
michael@0 80 };
michael@0 81
michael@0 82 typedef enum io_op_type
michael@0 83 { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
michael@0 84
michael@0 85 #ifdef OPT_WINNT
michael@0 86 typedef struct NT_notifier {
michael@0 87 OVERLAPPED overlapped; /* must be first */
michael@0 88 PRJob *jobp;
michael@0 89 } NT_notifier;
michael@0 90 #endif
michael@0 91
michael@0 92 struct PRJob {
michael@0 93 PRCList links; /* for linking jobs */
michael@0 94 PRBool on_ioq; /* job on ioq */
michael@0 95 PRBool on_timerq; /* job on timerq */
michael@0 96 PRJobFn job_func;
michael@0 97 void *job_arg;
michael@0 98 PRCondVar *join_cv;
michael@0 99 PRBool join_wait; /* == PR_TRUE, when waiting to join */
michael@0 100 PRCondVar *cancel_cv; /* for cancelling IO jobs */
michael@0 101 PRBool cancel_io; /* for cancelling IO jobs */
michael@0 102 PRThreadPool *tpool; /* back pointer to thread pool */
michael@0 103 PRJobIoDesc *iod;
michael@0 104 io_op_type io_op;
michael@0 105 PRInt16 io_poll_flags;
michael@0 106 PRNetAddr *netaddr;
michael@0 107 PRIntervalTime timeout; /* relative value */
michael@0 108 PRIntervalTime absolute;
michael@0 109 #ifdef OPT_WINNT
michael@0 110 NT_notifier nt_notifier;
michael@0 111 #endif
michael@0 112 };
michael@0 113
michael@0 114 #define JOB_LINKS_PTR(_qp) \
michael@0 115 ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))
michael@0 116
michael@0 117 #define WTHREAD_LINKS_PTR(_qp) \
michael@0 118 ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
michael@0 119
michael@0 120 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
michael@0 121
michael@0 122 #define JOIN_NOTIFY(_jobp) \
michael@0 123 PR_BEGIN_MACRO \
michael@0 124 PR_Lock(_jobp->tpool->join_lock); \
michael@0 125 _jobp->join_wait = PR_FALSE; \
michael@0 126 PR_NotifyCondVar(_jobp->join_cv); \
michael@0 127 PR_Unlock(_jobp->tpool->join_lock); \
michael@0 128 PR_END_MACRO
michael@0 129
michael@0 130 #define CANCEL_IO_JOB(jobp) \
michael@0 131 PR_BEGIN_MACRO \
michael@0 132 jobp->cancel_io = PR_FALSE; \
michael@0 133 jobp->on_ioq = PR_FALSE; \
michael@0 134 PR_REMOVE_AND_INIT_LINK(&jobp->links); \
michael@0 135 tp->ioq.cnt--; \
michael@0 136 PR_NotifyCondVar(jobp->cancel_cv); \
michael@0 137 PR_END_MACRO
michael@0 138
michael@0 139 static void delete_job(PRJob *jobp);
michael@0 140 static PRThreadPool * alloc_threadpool(void);
michael@0 141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
michael@0 142 static void notify_ioq(PRThreadPool *tp);
michael@0 143 static void notify_timerq(PRThreadPool *tp);
michael@0 144
michael@0 145 /*
michael@0 146 * locks are acquired in the following order
michael@0 147 *
michael@0 148 * tp->ioq.lock,tp->timerq.lock
michael@0 149 * |
michael@0 150 * V
michael@0 151 * tp->jobq->lock
michael@0 152 */
michael@0 153
michael@0 154 /*
michael@0 155 * worker thread function
michael@0 156 */
michael@0 157 static void wstart(void *arg)
michael@0 158 {
michael@0 159 PRThreadPool *tp = (PRThreadPool *) arg;
michael@0 160 PRCList *head;
michael@0 161
michael@0 162 /*
michael@0 163 * execute jobs until shutdown
michael@0 164 */
michael@0 165 while (!tp->shutdown) {
michael@0 166 PRJob *jobp;
michael@0 167 #ifdef OPT_WINNT
michael@0 168 BOOL rv;
michael@0 169 DWORD unused, shutdown;
michael@0 170 LPOVERLAPPED olp;
michael@0 171
michael@0 172 PR_Lock(tp->jobq.lock);
michael@0 173 tp->idle_threads++;
michael@0 174 PR_Unlock(tp->jobq.lock);
michael@0 175 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
michael@0 176 &unused, &shutdown, &olp, INFINITE);
michael@0 177
michael@0 178 PR_ASSERT(rv);
michael@0 179 if (shutdown)
michael@0 180 break;
michael@0 181 jobp = ((NT_notifier *) olp)->jobp;
michael@0 182 PR_Lock(tp->jobq.lock);
michael@0 183 tp->idle_threads--;
michael@0 184 tp->jobq.cnt--;
michael@0 185 PR_Unlock(tp->jobq.lock);
michael@0 186 #else
michael@0 187
michael@0 188 PR_Lock(tp->jobq.lock);
michael@0 189 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
michael@0 190 tp->idle_threads++;
michael@0 191 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
michael@0 192 tp->idle_threads--;
michael@0 193 }
michael@0 194 if (tp->shutdown) {
michael@0 195 PR_Unlock(tp->jobq.lock);
michael@0 196 break;
michael@0 197 }
michael@0 198 head = PR_LIST_HEAD(&tp->jobq.list);
michael@0 199 /*
michael@0 200 * remove job from queue
michael@0 201 */
michael@0 202 PR_REMOVE_AND_INIT_LINK(head);
michael@0 203 tp->jobq.cnt--;
michael@0 204 jobp = JOB_LINKS_PTR(head);
michael@0 205 PR_Unlock(tp->jobq.lock);
michael@0 206 #endif
michael@0 207
michael@0 208 jobp->job_func(jobp->job_arg);
michael@0 209 if (!JOINABLE_JOB(jobp)) {
michael@0 210 delete_job(jobp);
michael@0 211 } else {
michael@0 212 JOIN_NOTIFY(jobp);
michael@0 213 }
michael@0 214 }
michael@0 215 PR_Lock(tp->jobq.lock);
michael@0 216 tp->current_threads--;
michael@0 217 PR_Unlock(tp->jobq.lock);
michael@0 218 }
michael@0 219
michael@0 220 /*
michael@0 221 * add a job to the work queue
michael@0 222 */
michael@0 223 static void
michael@0 224 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
michael@0 225 {
michael@0 226 /*
michael@0 227 * add to jobq
michael@0 228 */
michael@0 229 #ifdef OPT_WINNT
michael@0 230 PR_Lock(tp->jobq.lock);
michael@0 231 tp->jobq.cnt++;
michael@0 232 PR_Unlock(tp->jobq.lock);
michael@0 233 /*
michael@0 234 * notify worker thread(s)
michael@0 235 */
michael@0 236 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
michael@0 237 FALSE, &jobp->nt_notifier.overlapped);
michael@0 238 #else
michael@0 239 PR_Lock(tp->jobq.lock);
michael@0 240 PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
michael@0 241 tp->jobq.cnt++;
michael@0 242 if ((tp->idle_threads < tp->jobq.cnt) &&
michael@0 243 (tp->current_threads < tp->max_threads)) {
michael@0 244 wthread *wthrp;
michael@0 245 /*
michael@0 246 * increment thread count and unlock the jobq lock
michael@0 247 */
michael@0 248 tp->current_threads++;
michael@0 249 PR_Unlock(tp->jobq.lock);
michael@0 250 /* create new worker thread */
michael@0 251 wthrp = PR_NEWZAP(wthread);
michael@0 252 if (wthrp) {
michael@0 253 wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
michael@0 254 tp, PR_PRIORITY_NORMAL,
michael@0 255 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
michael@0 256 if (NULL == wthrp->thread) {
michael@0 257 PR_DELETE(wthrp); /* this sets wthrp to NULL */
michael@0 258 }
michael@0 259 }
michael@0 260 PR_Lock(tp->jobq.lock);
michael@0 261 if (NULL == wthrp) {
michael@0 262 tp->current_threads--;
michael@0 263 } else {
michael@0 264 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
michael@0 265 }
michael@0 266 }
michael@0 267 /*
michael@0 268 * wakeup a worker thread
michael@0 269 */
michael@0 270 PR_NotifyCondVar(tp->jobq.cv);
michael@0 271 PR_Unlock(tp->jobq.lock);
michael@0 272 #endif
michael@0 273 }
michael@0 274
michael@0 275 /*
michael@0 276 * io worker thread function
michael@0 277 */
michael@0 278 static void io_wstart(void *arg)
michael@0 279 {
michael@0 280 PRThreadPool *tp = (PRThreadPool *) arg;
michael@0 281 int pollfd_cnt, pollfds_used;
michael@0 282 int rv;
michael@0 283 PRCList *qp, *nextqp;
michael@0 284 PRPollDesc *pollfds;
michael@0 285 PRJob **polljobs;
michael@0 286 int poll_timeout;
michael@0 287 PRIntervalTime now;
michael@0 288
michael@0 289 /*
michael@0 290 * scan io_jobq
michael@0 291 * construct poll list
michael@0 292 * call PR_Poll
michael@0 293 * for all fds, for which poll returns true, move the job to
michael@0 294 * jobq and wakeup worker thread.
michael@0 295 */
michael@0 296 while (!tp->shutdown) {
michael@0 297 PRJob *jobp;
michael@0 298
michael@0 299 pollfd_cnt = tp->ioq.cnt + 10;
michael@0 300 if (pollfd_cnt > tp->ioq.npollfds) {
michael@0 301
michael@0 302 /*
michael@0 303 * re-allocate pollfd array if the current one is not large
michael@0 304 * enough
michael@0 305 */
michael@0 306 if (NULL != tp->ioq.pollfds)
michael@0 307 PR_Free(tp->ioq.pollfds);
michael@0 308 tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
michael@0 309 (sizeof(PRPollDesc) + sizeof(PRJob *)));
michael@0 310 PR_ASSERT(NULL != tp->ioq.pollfds);
michael@0 311 /*
michael@0 312 * array of pollfds
michael@0 313 */
michael@0 314 pollfds = tp->ioq.pollfds;
michael@0 315 tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
michael@0 316 /*
michael@0 317 * parallel array of jobs
michael@0 318 */
michael@0 319 polljobs = tp->ioq.polljobs;
michael@0 320 tp->ioq.npollfds = pollfd_cnt;
michael@0 321 }
michael@0 322
michael@0 323 pollfds_used = 0;
michael@0 324 /*
michael@0 325 * add the notify fd; used for unblocking io thread(s)
michael@0 326 */
michael@0 327 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
michael@0 328 pollfds[pollfds_used].in_flags = PR_POLL_READ;
michael@0 329 pollfds[pollfds_used].out_flags = 0;
michael@0 330 polljobs[pollfds_used] = NULL;
michael@0 331 pollfds_used++;
michael@0 332 /*
michael@0 333 * fill in the pollfd array
michael@0 334 */
michael@0 335 PR_Lock(tp->ioq.lock);
michael@0 336 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
michael@0 337 nextqp = qp->next;
michael@0 338 jobp = JOB_LINKS_PTR(qp);
michael@0 339 if (jobp->cancel_io) {
michael@0 340 CANCEL_IO_JOB(jobp);
michael@0 341 continue;
michael@0 342 }
michael@0 343 if (pollfds_used == (pollfd_cnt))
michael@0 344 break;
michael@0 345 pollfds[pollfds_used].fd = jobp->iod->socket;
michael@0 346 pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
michael@0 347 pollfds[pollfds_used].out_flags = 0;
michael@0 348 polljobs[pollfds_used] = jobp;
michael@0 349
michael@0 350 pollfds_used++;
michael@0 351 }
michael@0 352 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
michael@0 353 qp = tp->ioq.list.next;
michael@0 354 jobp = JOB_LINKS_PTR(qp);
michael@0 355 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
michael@0 356 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 357 else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
michael@0 358 poll_timeout = PR_INTERVAL_NO_WAIT;
michael@0 359 else {
michael@0 360 poll_timeout = jobp->absolute - PR_IntervalNow();
michael@0 361 if (poll_timeout <= 0) /* already timed out */
michael@0 362 poll_timeout = PR_INTERVAL_NO_WAIT;
michael@0 363 }
michael@0 364 } else {
michael@0 365 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 366 }
michael@0 367 PR_Unlock(tp->ioq.lock);
michael@0 368
michael@0 369 /*
michael@0 370 * XXXX
michael@0 371 * should retry if more jobs have been added to the queue?
michael@0 372 *
michael@0 373 */
michael@0 374 PR_ASSERT(pollfds_used <= pollfd_cnt);
michael@0 375 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
michael@0 376
michael@0 377 if (tp->shutdown) {
michael@0 378 break;
michael@0 379 }
michael@0 380
michael@0 381 if (rv > 0) {
michael@0 382 /*
michael@0 383 * at least one io event is set
michael@0 384 */
michael@0 385 PRStatus rval_status;
michael@0 386 PRInt32 index;
michael@0 387
michael@0 388 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
michael@0 389 /*
michael@0 390 * reset the pollable event, if notified
michael@0 391 */
michael@0 392 if (pollfds[0].out_flags & PR_POLL_READ) {
michael@0 393 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
michael@0 394 PR_ASSERT(PR_SUCCESS == rval_status);
michael@0 395 }
michael@0 396
michael@0 397 for(index = 1; index < (pollfds_used); index++) {
michael@0 398 PRInt16 events = pollfds[index].in_flags;
michael@0 399 PRInt16 revents = pollfds[index].out_flags;
michael@0 400 jobp = polljobs[index];
michael@0 401
michael@0 402 if ((revents & PR_POLL_NVAL) || /* busted in all cases */
michael@0 403 (revents & PR_POLL_ERR) ||
michael@0 404 ((events & PR_POLL_WRITE) &&
michael@0 405 (revents & PR_POLL_HUP))) { /* write op & hup */
michael@0 406 PR_Lock(tp->ioq.lock);
michael@0 407 if (jobp->cancel_io) {
michael@0 408 CANCEL_IO_JOB(jobp);
michael@0 409 PR_Unlock(tp->ioq.lock);
michael@0 410 continue;
michael@0 411 }
michael@0 412 PR_REMOVE_AND_INIT_LINK(&jobp->links);
michael@0 413 tp->ioq.cnt--;
michael@0 414 jobp->on_ioq = PR_FALSE;
michael@0 415 PR_Unlock(tp->ioq.lock);
michael@0 416
michael@0 417 /* set error */
michael@0 418 if (PR_POLL_NVAL & revents)
michael@0 419 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
michael@0 420 else if (PR_POLL_HUP & revents)
michael@0 421 jobp->iod->error = PR_CONNECT_RESET_ERROR;
michael@0 422 else
michael@0 423 jobp->iod->error = PR_IO_ERROR;
michael@0 424
michael@0 425 /*
michael@0 426 * add to jobq
michael@0 427 */
michael@0 428 add_to_jobq(tp, jobp);
michael@0 429 } else if (revents) {
michael@0 430 /*
michael@0 431 * add to jobq
michael@0 432 */
michael@0 433 PR_Lock(tp->ioq.lock);
michael@0 434 if (jobp->cancel_io) {
michael@0 435 CANCEL_IO_JOB(jobp);
michael@0 436 PR_Unlock(tp->ioq.lock);
michael@0 437 continue;
michael@0 438 }
michael@0 439 PR_REMOVE_AND_INIT_LINK(&jobp->links);
michael@0 440 tp->ioq.cnt--;
michael@0 441 jobp->on_ioq = PR_FALSE;
michael@0 442 PR_Unlock(tp->ioq.lock);
michael@0 443
michael@0 444 if (jobp->io_op == JOB_IO_CONNECT) {
michael@0 445 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
michael@0 446 jobp->iod->error = 0;
michael@0 447 else
michael@0 448 jobp->iod->error = PR_GetError();
michael@0 449 } else
michael@0 450 jobp->iod->error = 0;
michael@0 451
michael@0 452 add_to_jobq(tp, jobp);
michael@0 453 }
michael@0 454 }
michael@0 455 }
michael@0 456 /*
michael@0 457 * timeout processing
michael@0 458 */
michael@0 459 now = PR_IntervalNow();
michael@0 460 PR_Lock(tp->ioq.lock);
michael@0 461 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
michael@0 462 nextqp = qp->next;
michael@0 463 jobp = JOB_LINKS_PTR(qp);
michael@0 464 if (jobp->cancel_io) {
michael@0 465 CANCEL_IO_JOB(jobp);
michael@0 466 continue;
michael@0 467 }
michael@0 468 if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
michael@0 469 break;
michael@0 470 if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
michael@0 471 ((PRInt32)(jobp->absolute - now) > 0))
michael@0 472 break;
michael@0 473 PR_REMOVE_AND_INIT_LINK(&jobp->links);
michael@0 474 tp->ioq.cnt--;
michael@0 475 jobp->on_ioq = PR_FALSE;
michael@0 476 jobp->iod->error = PR_IO_TIMEOUT_ERROR;
michael@0 477 add_to_jobq(tp, jobp);
michael@0 478 }
michael@0 479 PR_Unlock(tp->ioq.lock);
michael@0 480 }
michael@0 481 }
michael@0 482
michael@0 483 /*
michael@0 484 * timer worker thread function
michael@0 485 */
michael@0 486 static void timer_wstart(void *arg)
michael@0 487 {
michael@0 488 PRThreadPool *tp = (PRThreadPool *) arg;
michael@0 489 PRCList *qp;
michael@0 490 PRIntervalTime timeout;
michael@0 491 PRIntervalTime now;
michael@0 492
michael@0 493 /*
michael@0 494 * call PR_WaitCondVar with minimum value of all timeouts
michael@0 495 */
michael@0 496 while (!tp->shutdown) {
michael@0 497 PRJob *jobp;
michael@0 498
michael@0 499 PR_Lock(tp->timerq.lock);
michael@0 500 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
michael@0 501 timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 502 } else {
michael@0 503 PRCList *qp;
michael@0 504
michael@0 505 qp = tp->timerq.list.next;
michael@0 506 jobp = JOB_LINKS_PTR(qp);
michael@0 507
michael@0 508 timeout = jobp->absolute - PR_IntervalNow();
michael@0 509 if (timeout <= 0)
michael@0 510 timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
michael@0 511 }
michael@0 512 if (PR_INTERVAL_NO_WAIT != timeout)
michael@0 513 PR_WaitCondVar(tp->timerq.cv, timeout);
michael@0 514 if (tp->shutdown) {
michael@0 515 PR_Unlock(tp->timerq.lock);
michael@0 516 break;
michael@0 517 }
michael@0 518 /*
michael@0 519 * move expired-timer jobs to jobq
michael@0 520 */
michael@0 521 now = PR_IntervalNow();
michael@0 522 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
michael@0 523 qp = tp->timerq.list.next;
michael@0 524 jobp = JOB_LINKS_PTR(qp);
michael@0 525
michael@0 526 if ((PRInt32)(jobp->absolute - now) > 0) {
michael@0 527 break;
michael@0 528 }
michael@0 529 /*
michael@0 530 * job timed out
michael@0 531 */
michael@0 532 PR_REMOVE_AND_INIT_LINK(&jobp->links);
michael@0 533 tp->timerq.cnt--;
michael@0 534 jobp->on_timerq = PR_FALSE;
michael@0 535 add_to_jobq(tp, jobp);
michael@0 536 }
michael@0 537 PR_Unlock(tp->timerq.lock);
michael@0 538 }
michael@0 539 }
michael@0 540
michael@0 541 static void
michael@0 542 delete_threadpool(PRThreadPool *tp)
michael@0 543 {
michael@0 544 if (NULL != tp) {
michael@0 545 if (NULL != tp->shutdown_cv)
michael@0 546 PR_DestroyCondVar(tp->shutdown_cv);
michael@0 547 if (NULL != tp->jobq.cv)
michael@0 548 PR_DestroyCondVar(tp->jobq.cv);
michael@0 549 if (NULL != tp->jobq.lock)
michael@0 550 PR_DestroyLock(tp->jobq.lock);
michael@0 551 if (NULL != tp->join_lock)
michael@0 552 PR_DestroyLock(tp->join_lock);
michael@0 553 #ifdef OPT_WINNT
michael@0 554 if (NULL != tp->jobq.nt_completion_port)
michael@0 555 CloseHandle(tp->jobq.nt_completion_port);
michael@0 556 #endif
michael@0 557 /* Timer queue */
michael@0 558 if (NULL != tp->timerq.cv)
michael@0 559 PR_DestroyCondVar(tp->timerq.cv);
michael@0 560 if (NULL != tp->timerq.lock)
michael@0 561 PR_DestroyLock(tp->timerq.lock);
michael@0 562
michael@0 563 if (NULL != tp->ioq.lock)
michael@0 564 PR_DestroyLock(tp->ioq.lock);
michael@0 565 if (NULL != tp->ioq.pollfds)
michael@0 566 PR_Free(tp->ioq.pollfds);
michael@0 567 if (NULL != tp->ioq.notify_fd)
michael@0 568 PR_DestroyPollableEvent(tp->ioq.notify_fd);
michael@0 569 PR_Free(tp);
michael@0 570 }
michael@0 571 return;
michael@0 572 }
michael@0 573
michael@0 574 static PRThreadPool *
michael@0 575 alloc_threadpool(void)
michael@0 576 {
michael@0 577 PRThreadPool *tp;
michael@0 578
michael@0 579 tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
michael@0 580 if (NULL == tp)
michael@0 581 goto failed;
michael@0 582 tp->jobq.lock = PR_NewLock();
michael@0 583 if (NULL == tp->jobq.lock)
michael@0 584 goto failed;
michael@0 585 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
michael@0 586 if (NULL == tp->jobq.cv)
michael@0 587 goto failed;
michael@0 588 tp->join_lock = PR_NewLock();
michael@0 589 if (NULL == tp->join_lock)
michael@0 590 goto failed;
michael@0 591 #ifdef OPT_WINNT
michael@0 592 tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
michael@0 593 NULL, 0, 0);
michael@0 594 if (NULL == tp->jobq.nt_completion_port)
michael@0 595 goto failed;
michael@0 596 #endif
michael@0 597
michael@0 598 tp->ioq.lock = PR_NewLock();
michael@0 599 if (NULL == tp->ioq.lock)
michael@0 600 goto failed;
michael@0 601
michael@0 602 /* Timer queue */
michael@0 603
michael@0 604 tp->timerq.lock = PR_NewLock();
michael@0 605 if (NULL == tp->timerq.lock)
michael@0 606 goto failed;
michael@0 607 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
michael@0 608 if (NULL == tp->timerq.cv)
michael@0 609 goto failed;
michael@0 610
michael@0 611 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
michael@0 612 if (NULL == tp->shutdown_cv)
michael@0 613 goto failed;
michael@0 614 tp->ioq.notify_fd = PR_NewPollableEvent();
michael@0 615 if (NULL == tp->ioq.notify_fd)
michael@0 616 goto failed;
michael@0 617 return tp;
michael@0 618 failed:
michael@0 619 delete_threadpool(tp);
michael@0 620 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 621 return NULL;
michael@0 622 }
michael@0 623
michael@0 624 /* Create thread pool */
michael@0 625 PR_IMPLEMENT(PRThreadPool *)
michael@0 626 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
michael@0 627 PRUint32 stacksize)
michael@0 628 {
michael@0 629 PRThreadPool *tp;
michael@0 630 PRThread *thr;
michael@0 631 int i;
michael@0 632 wthread *wthrp;
michael@0 633
michael@0 634 tp = alloc_threadpool();
michael@0 635 if (NULL == tp)
michael@0 636 return NULL;
michael@0 637
michael@0 638 tp->init_threads = initial_threads;
michael@0 639 tp->max_threads = max_threads;
michael@0 640 tp->stacksize = stacksize;
michael@0 641 PR_INIT_CLIST(&tp->jobq.list);
michael@0 642 PR_INIT_CLIST(&tp->ioq.list);
michael@0 643 PR_INIT_CLIST(&tp->timerq.list);
michael@0 644 PR_INIT_CLIST(&tp->jobq.wthreads);
michael@0 645 PR_INIT_CLIST(&tp->ioq.wthreads);
michael@0 646 PR_INIT_CLIST(&tp->timerq.wthreads);
michael@0 647 tp->shutdown = PR_FALSE;
michael@0 648
michael@0 649 PR_Lock(tp->jobq.lock);
michael@0 650 for(i=0; i < initial_threads; ++i) {
michael@0 651
michael@0 652 thr = PR_CreateThread(PR_USER_THREAD, wstart,
michael@0 653 tp, PR_PRIORITY_NORMAL,
michael@0 654 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
michael@0 655 PR_ASSERT(thr);
michael@0 656 wthrp = PR_NEWZAP(wthread);
michael@0 657 PR_ASSERT(wthrp);
michael@0 658 wthrp->thread = thr;
michael@0 659 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
michael@0 660 }
michael@0 661 tp->current_threads = initial_threads;
michael@0 662
michael@0 663 thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
michael@0 664 tp, PR_PRIORITY_NORMAL,
michael@0 665 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
michael@0 666 PR_ASSERT(thr);
michael@0 667 wthrp = PR_NEWZAP(wthread);
michael@0 668 PR_ASSERT(wthrp);
michael@0 669 wthrp->thread = thr;
michael@0 670 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
michael@0 671
michael@0 672 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
michael@0 673 tp, PR_PRIORITY_NORMAL,
michael@0 674 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
michael@0 675 PR_ASSERT(thr);
michael@0 676 wthrp = PR_NEWZAP(wthread);
michael@0 677 PR_ASSERT(wthrp);
michael@0 678 wthrp->thread = thr;
michael@0 679 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
michael@0 680
michael@0 681 PR_Unlock(tp->jobq.lock);
michael@0 682 return tp;
michael@0 683 }
michael@0 684
michael@0 685 static void
michael@0 686 delete_job(PRJob *jobp)
michael@0 687 {
michael@0 688 if (NULL != jobp) {
michael@0 689 if (NULL != jobp->join_cv) {
michael@0 690 PR_DestroyCondVar(jobp->join_cv);
michael@0 691 jobp->join_cv = NULL;
michael@0 692 }
michael@0 693 if (NULL != jobp->cancel_cv) {
michael@0 694 PR_DestroyCondVar(jobp->cancel_cv);
michael@0 695 jobp->cancel_cv = NULL;
michael@0 696 }
michael@0 697 PR_DELETE(jobp);
michael@0 698 }
michael@0 699 }
michael@0 700
michael@0 701 static PRJob *
michael@0 702 alloc_job(PRBool joinable, PRThreadPool *tp)
michael@0 703 {
michael@0 704 PRJob *jobp;
michael@0 705
michael@0 706 jobp = PR_NEWZAP(PRJob);
michael@0 707 if (NULL == jobp)
michael@0 708 goto failed;
michael@0 709 if (joinable) {
michael@0 710 jobp->join_cv = PR_NewCondVar(tp->join_lock);
michael@0 711 jobp->join_wait = PR_TRUE;
michael@0 712 if (NULL == jobp->join_cv)
michael@0 713 goto failed;
michael@0 714 } else {
michael@0 715 jobp->join_cv = NULL;
michael@0 716 }
michael@0 717 #ifdef OPT_WINNT
michael@0 718 jobp->nt_notifier.jobp = jobp;
michael@0 719 #endif
michael@0 720 return jobp;
michael@0 721 failed:
michael@0 722 delete_job(jobp);
michael@0 723 PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
michael@0 724 return NULL;
michael@0 725 }
michael@0 726
michael@0 727 /* queue a job */
michael@0 728 PR_IMPLEMENT(PRJob *)
michael@0 729 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
michael@0 730 {
michael@0 731 PRJob *jobp;
michael@0 732
michael@0 733 jobp = alloc_job(joinable, tpool);
michael@0 734 if (NULL == jobp)
michael@0 735 return NULL;
michael@0 736
michael@0 737 jobp->job_func = fn;
michael@0 738 jobp->job_arg = arg;
michael@0 739 jobp->tpool = tpool;
michael@0 740
michael@0 741 add_to_jobq(tpool, jobp);
michael@0 742 return jobp;
michael@0 743 }
michael@0 744
michael@0 745 /* queue a job, when a socket is readable or writeable */
michael@0 746 static PRJob *
michael@0 747 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
michael@0 748 PRBool joinable, io_op_type op)
michael@0 749 {
michael@0 750 PRJob *jobp;
michael@0 751 PRIntervalTime now;
michael@0 752
michael@0 753 jobp = alloc_job(joinable, tpool);
michael@0 754 if (NULL == jobp) {
michael@0 755 return NULL;
michael@0 756 }
michael@0 757
michael@0 758 /*
michael@0 759 * Add a new job to io_jobq
michael@0 760 * wakeup io worker thread
michael@0 761 */
michael@0 762
michael@0 763 jobp->job_func = fn;
michael@0 764 jobp->job_arg = arg;
michael@0 765 jobp->tpool = tpool;
michael@0 766 jobp->iod = iod;
michael@0 767 if (JOB_IO_READ == op) {
michael@0 768 jobp->io_op = JOB_IO_READ;
michael@0 769 jobp->io_poll_flags = PR_POLL_READ;
michael@0 770 } else if (JOB_IO_WRITE == op) {
michael@0 771 jobp->io_op = JOB_IO_WRITE;
michael@0 772 jobp->io_poll_flags = PR_POLL_WRITE;
michael@0 773 } else if (JOB_IO_ACCEPT == op) {
michael@0 774 jobp->io_op = JOB_IO_ACCEPT;
michael@0 775 jobp->io_poll_flags = PR_POLL_READ;
michael@0 776 } else if (JOB_IO_CONNECT == op) {
michael@0 777 jobp->io_op = JOB_IO_CONNECT;
michael@0 778 jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
michael@0 779 } else {
michael@0 780 delete_job(jobp);
michael@0 781 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 782 return NULL;
michael@0 783 }
michael@0 784
michael@0 785 jobp->timeout = iod->timeout;
michael@0 786 if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
michael@0 787 (PR_INTERVAL_NO_WAIT == iod->timeout)) {
michael@0 788 jobp->absolute = iod->timeout;
michael@0 789 } else {
michael@0 790 now = PR_IntervalNow();
michael@0 791 jobp->absolute = now + iod->timeout;
michael@0 792 }
michael@0 793
michael@0 794
michael@0 795 PR_Lock(tpool->ioq.lock);
michael@0 796
michael@0 797 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
michael@0 798 (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
michael@0 799 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
michael@0 800 } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
michael@0 801 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
michael@0 802 } else {
michael@0 803 PRCList *qp;
michael@0 804 PRJob *tmp_jobp;
michael@0 805 /*
michael@0 806 * insert into the timeout-sorted ioq
michael@0 807 */
michael@0 808 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
michael@0 809 qp = qp->prev) {
michael@0 810 tmp_jobp = JOB_LINKS_PTR(qp);
michael@0 811 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
michael@0 812 break;
michael@0 813 }
michael@0 814 }
michael@0 815 PR_INSERT_AFTER(&jobp->links,qp);
michael@0 816 }
michael@0 817
michael@0 818 jobp->on_ioq = PR_TRUE;
michael@0 819 tpool->ioq.cnt++;
michael@0 820 /*
michael@0 821 * notify io worker thread(s)
michael@0 822 */
michael@0 823 PR_Unlock(tpool->ioq.lock);
michael@0 824 notify_ioq(tpool);
michael@0 825 return jobp;
michael@0 826 }
michael@0 827
michael@0 828 /* queue a job, when a socket is readable */
michael@0 829 PR_IMPLEMENT(PRJob *)
michael@0 830 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
michael@0 831 PRBool joinable)
michael@0 832 {
michael@0 833 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
michael@0 834 }
michael@0 835
michael@0 836 /* queue a job, when a socket is writeable */
michael@0 837 PR_IMPLEMENT(PRJob *)
michael@0 838 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
michael@0 839 PRBool joinable)
michael@0 840 {
michael@0 841 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
michael@0 842 }
michael@0 843
michael@0 844
michael@0 845 /* queue a job, when a socket has a pending connection */
michael@0 846 PR_IMPLEMENT(PRJob *)
michael@0 847 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
michael@0 848 void * arg, PRBool joinable)
michael@0 849 {
michael@0 850 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
michael@0 851 }
michael@0 852
michael@0 853 /* queue a job, when a socket can be connected */
michael@0 854 PR_IMPLEMENT(PRJob *)
michael@0 855 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
michael@0 856 const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
michael@0 857 {
michael@0 858 PRStatus rv;
michael@0 859 PRErrorCode err;
michael@0 860
michael@0 861 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
michael@0 862 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
michael@0 863 /* connection pending */
michael@0 864 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
michael@0 865 } else {
michael@0 866 /*
michael@0 867 * connection succeeded or failed; add to jobq right away
michael@0 868 */
michael@0 869 if (rv == PR_FAILURE)
michael@0 870 iod->error = err;
michael@0 871 else
michael@0 872 iod->error = 0;
michael@0 873 return(PR_QueueJob(tpool, fn, arg, joinable));
michael@0 874 }
michael@0 875 }
michael@0 876
michael@0 877 /* queue a job, when a timer expires */
michael@0 878 PR_IMPLEMENT(PRJob *)
michael@0 879 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
michael@0 880 PRJobFn fn, void * arg, PRBool joinable)
michael@0 881 {
michael@0 882 PRIntervalTime now;
michael@0 883 PRJob *jobp;
michael@0 884
michael@0 885 if (PR_INTERVAL_NO_TIMEOUT == timeout) {
michael@0 886 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 887 return NULL;
michael@0 888 }
michael@0 889 if (PR_INTERVAL_NO_WAIT == timeout) {
michael@0 890 /*
michael@0 891 * no waiting; add to jobq right away
michael@0 892 */
michael@0 893 return(PR_QueueJob(tpool, fn, arg, joinable));
michael@0 894 }
michael@0 895 jobp = alloc_job(joinable, tpool);
michael@0 896 if (NULL == jobp) {
michael@0 897 return NULL;
michael@0 898 }
michael@0 899
michael@0 900 /*
michael@0 901 * Add a new job to timer_jobq
michael@0 902 * wakeup timer worker thread
michael@0 903 */
michael@0 904
michael@0 905 jobp->job_func = fn;
michael@0 906 jobp->job_arg = arg;
michael@0 907 jobp->tpool = tpool;
michael@0 908 jobp->timeout = timeout;
michael@0 909
michael@0 910 now = PR_IntervalNow();
michael@0 911 jobp->absolute = now + timeout;
michael@0 912
michael@0 913
michael@0 914 PR_Lock(tpool->timerq.lock);
michael@0 915 jobp->on_timerq = PR_TRUE;
michael@0 916 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
michael@0 917 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
michael@0 918 else {
michael@0 919 PRCList *qp;
michael@0 920 PRJob *tmp_jobp;
michael@0 921 /*
michael@0 922 * insert into the sorted timer jobq
michael@0 923 */
michael@0 924 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
michael@0 925 qp = qp->prev) {
michael@0 926 tmp_jobp = JOB_LINKS_PTR(qp);
michael@0 927 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
michael@0 928 break;
michael@0 929 }
michael@0 930 }
michael@0 931 PR_INSERT_AFTER(&jobp->links,qp);
michael@0 932 }
michael@0 933 tpool->timerq.cnt++;
michael@0 934 /*
michael@0 935 * notify timer worker thread(s)
michael@0 936 */
michael@0 937 notify_timerq(tpool);
michael@0 938 PR_Unlock(tpool->timerq.lock);
michael@0 939 return jobp;
michael@0 940 }
michael@0 941
michael@0 942 static void
michael@0 943 notify_timerq(PRThreadPool *tp)
michael@0 944 {
michael@0 945 /*
michael@0 946 * wakeup the timer thread(s)
michael@0 947 */
michael@0 948 PR_NotifyCondVar(tp->timerq.cv);
michael@0 949 }
michael@0 950
michael@0 951 static void
michael@0 952 notify_ioq(PRThreadPool *tp)
michael@0 953 {
michael@0 954 PRStatus rval_status;
michael@0 955
michael@0 956 /*
michael@0 957 * wakeup the io thread(s)
michael@0 958 */
michael@0 959 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
michael@0 960 PR_ASSERT(PR_SUCCESS == rval_status);
michael@0 961 }
michael@0 962
michael@0 963 /*
michael@0 964 * cancel a job
michael@0 965 *
michael@0 966 * XXXX: is this needed? likely to be removed
michael@0 967 */
michael@0 968 PR_IMPLEMENT(PRStatus)
michael@0 969 PR_CancelJob(PRJob *jobp) {
michael@0 970
michael@0 971 PRStatus rval = PR_FAILURE;
michael@0 972 PRThreadPool *tp;
michael@0 973
michael@0 974 if (jobp->on_timerq) {
michael@0 975 /*
michael@0 976 * now, check again while holding the timerq lock
michael@0 977 */
michael@0 978 tp = jobp->tpool;
michael@0 979 PR_Lock(tp->timerq.lock);
michael@0 980 if (jobp->on_timerq) {
michael@0 981 jobp->on_timerq = PR_FALSE;
michael@0 982 PR_REMOVE_AND_INIT_LINK(&jobp->links);
michael@0 983 tp->timerq.cnt--;
michael@0 984 PR_Unlock(tp->timerq.lock);
michael@0 985 if (!JOINABLE_JOB(jobp)) {
michael@0 986 delete_job(jobp);
michael@0 987 } else {
michael@0 988 JOIN_NOTIFY(jobp);
michael@0 989 }
michael@0 990 rval = PR_SUCCESS;
michael@0 991 } else
michael@0 992 PR_Unlock(tp->timerq.lock);
michael@0 993 } else if (jobp->on_ioq) {
michael@0 994 /*
michael@0 995 * now, check again while holding the ioq lock
michael@0 996 */
michael@0 997 tp = jobp->tpool;
michael@0 998 PR_Lock(tp->ioq.lock);
michael@0 999 if (jobp->on_ioq) {
michael@0 1000 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
michael@0 1001 if (NULL == jobp->cancel_cv) {
michael@0 1002 PR_Unlock(tp->ioq.lock);
michael@0 1003 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
michael@0 1004 return PR_FAILURE;
michael@0 1005 }
michael@0 1006 /*
michael@0 1007 * mark job 'cancelled' and notify io thread(s)
michael@0 1008 * XXXX:
michael@0 1009 * this assumes there is only one io thread; when there
michael@0 1010 * are multiple threads, the io thread processing this job
michael@0 1011 * must be notified.
michael@0 1012 */
michael@0 1013 jobp->cancel_io = PR_TRUE;
michael@0 1014 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
michael@0 1015 notify_ioq(tp);
michael@0 1016 PR_Lock(tp->ioq.lock);
michael@0 1017 while (jobp->cancel_io)
michael@0 1018 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
michael@0 1019 PR_Unlock(tp->ioq.lock);
michael@0 1020 PR_ASSERT(!jobp->on_ioq);
michael@0 1021 if (!JOINABLE_JOB(jobp)) {
michael@0 1022 delete_job(jobp);
michael@0 1023 } else {
michael@0 1024 JOIN_NOTIFY(jobp);
michael@0 1025 }
michael@0 1026 rval = PR_SUCCESS;
michael@0 1027 } else
michael@0 1028 PR_Unlock(tp->ioq.lock);
michael@0 1029 }
michael@0 1030 if (PR_FAILURE == rval)
michael@0 1031 PR_SetError(PR_INVALID_STATE_ERROR, 0);
michael@0 1032 return rval;
michael@0 1033 }
michael@0 1034
michael@0 1035 /* join a job, wait until completion */
michael@0 1036 PR_IMPLEMENT(PRStatus)
michael@0 1037 PR_JoinJob(PRJob *jobp)
michael@0 1038 {
michael@0 1039 if (!JOINABLE_JOB(jobp)) {
michael@0 1040 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
michael@0 1041 return PR_FAILURE;
michael@0 1042 }
michael@0 1043 PR_Lock(jobp->tpool->join_lock);
michael@0 1044 while(jobp->join_wait)
michael@0 1045 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
michael@0 1046 PR_Unlock(jobp->tpool->join_lock);
michael@0 1047 delete_job(jobp);
michael@0 1048 return PR_SUCCESS;
michael@0 1049 }
michael@0 1050
michael@0 1051 /* shutdown threadpool */
michael@0 1052 PR_IMPLEMENT(PRStatus)
michael@0 1053 PR_ShutdownThreadPool(PRThreadPool *tpool)
michael@0 1054 {
michael@0 1055 PRStatus rval = PR_SUCCESS;
michael@0 1056
michael@0 1057 PR_Lock(tpool->jobq.lock);
michael@0 1058 tpool->shutdown = PR_TRUE;
michael@0 1059 PR_NotifyAllCondVar(tpool->shutdown_cv);
michael@0 1060 PR_Unlock(tpool->jobq.lock);
michael@0 1061
michael@0 1062 return rval;
michael@0 1063 }
michael@0 1064
michael@0 1065 /*
michael@0 1066 * join thread pool
michael@0 1067 * wait for termination of worker threads
michael@0 1068 * reclaim threadpool resources
michael@0 1069 */
michael@0 1070 PR_IMPLEMENT(PRStatus)
michael@0 1071 PR_JoinThreadPool(PRThreadPool *tpool)
michael@0 1072 {
michael@0 1073 PRStatus rval = PR_SUCCESS;
michael@0 1074 PRCList *head;
michael@0 1075 PRStatus rval_status;
michael@0 1076
michael@0 1077 PR_Lock(tpool->jobq.lock);
michael@0 1078 while (!tpool->shutdown)
michael@0 1079 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
michael@0 1080
michael@0 1081 /*
michael@0 1082 * wakeup worker threads
michael@0 1083 */
michael@0 1084 #ifdef OPT_WINNT
michael@0 1085 /*
michael@0 1086 * post shutdown notification for all threads
michael@0 1087 */
michael@0 1088 {
michael@0 1089 int i;
michael@0 1090 for(i=0; i < tpool->current_threads; i++) {
michael@0 1091 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
michael@0 1092 TRUE, NULL);
michael@0 1093 }
michael@0 1094 }
michael@0 1095 #else
michael@0 1096 PR_NotifyAllCondVar(tpool->jobq.cv);
michael@0 1097 #endif
michael@0 1098
michael@0 1099 /*
michael@0 1100 * wakeup io thread(s)
michael@0 1101 */
michael@0 1102 notify_ioq(tpool);
michael@0 1103
michael@0 1104 /*
michael@0 1105 * wakeup timer thread(s)
michael@0 1106 */
michael@0 1107 PR_Lock(tpool->timerq.lock);
michael@0 1108 notify_timerq(tpool);
michael@0 1109 PR_Unlock(tpool->timerq.lock);
michael@0 1110
michael@0 1111 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
michael@0 1112 wthread *wthrp;
michael@0 1113
michael@0 1114 head = PR_LIST_HEAD(&tpool->jobq.wthreads);
michael@0 1115 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1116 PR_Unlock(tpool->jobq.lock);
michael@0 1117 wthrp = WTHREAD_LINKS_PTR(head);
michael@0 1118 rval_status = PR_JoinThread(wthrp->thread);
michael@0 1119 PR_ASSERT(PR_SUCCESS == rval_status);
michael@0 1120 PR_DELETE(wthrp);
michael@0 1121 PR_Lock(tpool->jobq.lock);
michael@0 1122 }
michael@0 1123 PR_Unlock(tpool->jobq.lock);
michael@0 1124 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
michael@0 1125 wthread *wthrp;
michael@0 1126
michael@0 1127 head = PR_LIST_HEAD(&tpool->ioq.wthreads);
michael@0 1128 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1129 wthrp = WTHREAD_LINKS_PTR(head);
michael@0 1130 rval_status = PR_JoinThread(wthrp->thread);
michael@0 1131 PR_ASSERT(PR_SUCCESS == rval_status);
michael@0 1132 PR_DELETE(wthrp);
michael@0 1133 }
michael@0 1134
michael@0 1135 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
michael@0 1136 wthread *wthrp;
michael@0 1137
michael@0 1138 head = PR_LIST_HEAD(&tpool->timerq.wthreads);
michael@0 1139 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1140 wthrp = WTHREAD_LINKS_PTR(head);
michael@0 1141 rval_status = PR_JoinThread(wthrp->thread);
michael@0 1142 PR_ASSERT(PR_SUCCESS == rval_status);
michael@0 1143 PR_DELETE(wthrp);
michael@0 1144 }
michael@0 1145
michael@0 1146 /*
michael@0 1147 * Delete queued jobs
michael@0 1148 */
michael@0 1149 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
michael@0 1150 PRJob *jobp;
michael@0 1151
michael@0 1152 head = PR_LIST_HEAD(&tpool->jobq.list);
michael@0 1153 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1154 jobp = JOB_LINKS_PTR(head);
michael@0 1155 tpool->jobq.cnt--;
michael@0 1156 delete_job(jobp);
michael@0 1157 }
michael@0 1158
michael@0 1159 /* delete io jobs */
michael@0 1160 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
michael@0 1161 PRJob *jobp;
michael@0 1162
michael@0 1163 head = PR_LIST_HEAD(&tpool->ioq.list);
michael@0 1164 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1165 tpool->ioq.cnt--;
michael@0 1166 jobp = JOB_LINKS_PTR(head);
michael@0 1167 delete_job(jobp);
michael@0 1168 }
michael@0 1169
michael@0 1170 /* delete timer jobs */
michael@0 1171 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
michael@0 1172 PRJob *jobp;
michael@0 1173
michael@0 1174 head = PR_LIST_HEAD(&tpool->timerq.list);
michael@0 1175 PR_REMOVE_AND_INIT_LINK(head);
michael@0 1176 tpool->timerq.cnt--;
michael@0 1177 jobp = JOB_LINKS_PTR(head);
michael@0 1178 delete_job(jobp);
michael@0 1179 }
michael@0 1180
michael@0 1181 PR_ASSERT(0 == tpool->jobq.cnt);
michael@0 1182 PR_ASSERT(0 == tpool->ioq.cnt);
michael@0 1183 PR_ASSERT(0 == tpool->timerq.cnt);
michael@0 1184
michael@0 1185 delete_threadpool(tpool);
michael@0 1186 return rval;
michael@0 1187 }

mercurial