1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/nsprpub/pr/src/misc/prtpool.c Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,1187 @@ 1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.8 + 1.9 +#include "nspr.h" 1.10 + 1.11 +/* 1.12 + * Thread pools 1.13 + * Thread pools create and manage threads to provide support for 1.14 + * scheduling jobs onto one or more threads. 1.15 + * 1.16 + */ 1.17 +#ifdef OPT_WINNT 1.18 +#include <windows.h> 1.19 +#endif 1.20 + 1.21 +/* 1.22 + * worker thread 1.23 + */ 1.24 +typedef struct wthread { 1.25 + PRCList links; 1.26 + PRThread *thread; 1.27 +} wthread; 1.28 + 1.29 +/* 1.30 + * queue of timer jobs 1.31 + */ 1.32 +typedef struct timer_jobq { 1.33 + PRCList list; 1.34 + PRLock *lock; 1.35 + PRCondVar *cv; 1.36 + PRInt32 cnt; 1.37 + PRCList wthreads; 1.38 +} timer_jobq; 1.39 + 1.40 +/* 1.41 + * queue of jobs 1.42 + */ 1.43 +typedef struct tp_jobq { 1.44 + PRCList list; 1.45 + PRInt32 cnt; 1.46 + PRLock *lock; 1.47 + PRCondVar *cv; 1.48 + PRCList wthreads; 1.49 +#ifdef OPT_WINNT 1.50 + HANDLE nt_completion_port; 1.51 +#endif 1.52 +} tp_jobq; 1.53 + 1.54 +/* 1.55 + * queue of IO jobs 1.56 + */ 1.57 +typedef struct io_jobq { 1.58 + PRCList list; 1.59 + PRPollDesc *pollfds; 1.60 + PRInt32 npollfds; 1.61 + PRJob **polljobs; 1.62 + PRLock *lock; 1.63 + PRInt32 cnt; 1.64 + PRFileDesc *notify_fd; 1.65 + PRCList wthreads; 1.66 +} io_jobq; 1.67 + 1.68 +/* 1.69 + * Threadpool 1.70 + */ 1.71 +struct PRThreadPool { 1.72 + PRInt32 init_threads; 1.73 + PRInt32 max_threads; 1.74 + PRInt32 current_threads; 1.75 + PRInt32 idle_threads; 1.76 + PRUint32 stacksize; 1.77 + tp_jobq jobq; 1.78 + io_jobq ioq; 1.79 + timer_jobq timerq; 1.80 + PRLock *join_lock; /* used with jobp->join_cv */ 1.81 + PRCondVar *shutdown_cv; 1.82 + PRBool shutdown; 1.83 +}; 1.84 + 1.85 +typedef enum io_op_type 1.86 + { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; 1.87 + 1.88 +#ifdef OPT_WINNT 1.89 +typedef struct NT_notifier { 1.90 + OVERLAPPED overlapped; /* must be first */ 1.91 + PRJob *jobp; 1.92 +} NT_notifier; 1.93 +#endif 1.94 + 1.95 +struct PRJob { 1.96 + PRCList links; /* for linking jobs */ 1.97 + PRBool on_ioq; /* job on ioq */ 1.98 + PRBool on_timerq; /* job on timerq */ 1.99 + PRJobFn job_func; 1.100 + void *job_arg; 1.101 + PRCondVar *join_cv; 1.102 + PRBool join_wait; /* == PR_TRUE, when waiting to join */ 1.103 + PRCondVar *cancel_cv; /* for cancelling IO jobs */ 1.104 + PRBool cancel_io; /* for cancelling IO jobs */ 1.105 + PRThreadPool *tpool; /* back pointer to thread pool */ 1.106 + PRJobIoDesc *iod; 1.107 + io_op_type io_op; 1.108 + PRInt16 io_poll_flags; 1.109 + PRNetAddr *netaddr; 1.110 + PRIntervalTime timeout; /* relative value */ 1.111 + PRIntervalTime absolute; 1.112 +#ifdef OPT_WINNT 1.113 + NT_notifier nt_notifier; 1.114 +#endif 1.115 +}; 1.116 + 1.117 +#define JOB_LINKS_PTR(_qp) \ 1.118 + ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) 1.119 + 1.120 +#define WTHREAD_LINKS_PTR(_qp) \ 1.121 + ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) 1.122 + 1.123 +#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) 1.124 + 1.125 +#define JOIN_NOTIFY(_jobp) \ 1.126 + PR_BEGIN_MACRO \ 1.127 + PR_Lock(_jobp->tpool->join_lock); \ 1.128 + _jobp->join_wait = PR_FALSE; \ 1.129 + PR_NotifyCondVar(_jobp->join_cv); \ 1.130 + PR_Unlock(_jobp->tpool->join_lock); \ 1.131 + PR_END_MACRO 1.132 + 1.133 +#define CANCEL_IO_JOB(jobp) \ 1.134 + PR_BEGIN_MACRO \ 1.135 + jobp->cancel_io = PR_FALSE; \ 1.136 + jobp->on_ioq = PR_FALSE; \ 1.137 + PR_REMOVE_AND_INIT_LINK(&jobp->links); \ 1.138 + tp->ioq.cnt--; \ 1.139 + PR_NotifyCondVar(jobp->cancel_cv); \ 1.140 + PR_END_MACRO 1.141 + 1.142 +static void delete_job(PRJob *jobp); 1.143 +static PRThreadPool * alloc_threadpool(void); 1.144 +static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); 1.145 +static void notify_ioq(PRThreadPool *tp); 1.146 +static void notify_timerq(PRThreadPool *tp); 1.147 + 1.148 +/* 1.149 + * locks are acquired in the following order 1.150 + * 1.151 + * tp->ioq.lock,tp->timerq.lock 1.152 + * | 1.153 + * V 1.154 + * tp->jobq->lock 1.155 + */ 1.156 + 1.157 +/* 1.158 + * worker thread function 1.159 + */ 1.160 +static void wstart(void *arg) 1.161 +{ 1.162 +PRThreadPool *tp = (PRThreadPool *) arg; 1.163 +PRCList *head; 1.164 + 1.165 + /* 1.166 + * execute jobs until shutdown 1.167 + */ 1.168 + while (!tp->shutdown) { 1.169 + PRJob *jobp; 1.170 +#ifdef OPT_WINNT 1.171 + BOOL rv; 1.172 + DWORD unused, shutdown; 1.173 + LPOVERLAPPED olp; 1.174 + 1.175 + PR_Lock(tp->jobq.lock); 1.176 + tp->idle_threads++; 1.177 + PR_Unlock(tp->jobq.lock); 1.178 + rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, 1.179 + &unused, &shutdown, &olp, INFINITE); 1.180 + 1.181 + PR_ASSERT(rv); 1.182 + if (shutdown) 1.183 + break; 1.184 + jobp = ((NT_notifier *) olp)->jobp; 1.185 + PR_Lock(tp->jobq.lock); 1.186 + tp->idle_threads--; 1.187 + tp->jobq.cnt--; 1.188 + PR_Unlock(tp->jobq.lock); 1.189 +#else 1.190 + 1.191 + PR_Lock(tp->jobq.lock); 1.192 + while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { 1.193 + tp->idle_threads++; 1.194 + PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); 1.195 + tp->idle_threads--; 1.196 + } 1.197 + if (tp->shutdown) { 1.198 + PR_Unlock(tp->jobq.lock); 1.199 + break; 1.200 + } 1.201 + head = PR_LIST_HEAD(&tp->jobq.list); 1.202 + /* 1.203 + * remove job from queue 1.204 + */ 1.205 + PR_REMOVE_AND_INIT_LINK(head); 1.206 + tp->jobq.cnt--; 1.207 + jobp = JOB_LINKS_PTR(head); 1.208 + PR_Unlock(tp->jobq.lock); 1.209 +#endif 1.210 + 1.211 + jobp->job_func(jobp->job_arg); 1.212 + if (!JOINABLE_JOB(jobp)) { 1.213 + delete_job(jobp); 1.214 + } else { 1.215 + JOIN_NOTIFY(jobp); 1.216 + } 1.217 + } 1.218 + PR_Lock(tp->jobq.lock); 1.219 + tp->current_threads--; 1.220 + PR_Unlock(tp->jobq.lock); 1.221 +} 1.222 + 1.223 +/* 1.224 + * add a job to the work queue 1.225 + */ 1.226 +static void 1.227 +add_to_jobq(PRThreadPool *tp, PRJob *jobp) 1.228 +{ 1.229 + /* 1.230 + * add to jobq 1.231 + */ 1.232 +#ifdef OPT_WINNT 1.233 + PR_Lock(tp->jobq.lock); 1.234 + tp->jobq.cnt++; 1.235 + PR_Unlock(tp->jobq.lock); 1.236 + /* 1.237 + * notify worker thread(s) 1.238 + */ 1.239 + PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, 1.240 + FALSE, &jobp->nt_notifier.overlapped); 1.241 +#else 1.242 + PR_Lock(tp->jobq.lock); 1.243 + PR_APPEND_LINK(&jobp->links,&tp->jobq.list); 1.244 + tp->jobq.cnt++; 1.245 + if ((tp->idle_threads < tp->jobq.cnt) && 1.246 + (tp->current_threads < tp->max_threads)) { 1.247 + wthread *wthrp; 1.248 + /* 1.249 + * increment thread count and unlock the jobq lock 1.250 + */ 1.251 + tp->current_threads++; 1.252 + PR_Unlock(tp->jobq.lock); 1.253 + /* create new worker thread */ 1.254 + wthrp = PR_NEWZAP(wthread); 1.255 + if (wthrp) { 1.256 + wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, 1.257 + tp, PR_PRIORITY_NORMAL, 1.258 + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); 1.259 + if (NULL == wthrp->thread) { 1.260 + PR_DELETE(wthrp); /* this sets wthrp to NULL */ 1.261 + } 1.262 + } 1.263 + PR_Lock(tp->jobq.lock); 1.264 + if (NULL == wthrp) { 1.265 + tp->current_threads--; 1.266 + } else { 1.267 + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); 1.268 + } 1.269 + } 1.270 + /* 1.271 + * wakeup a worker thread 1.272 + */ 1.273 + PR_NotifyCondVar(tp->jobq.cv); 1.274 + PR_Unlock(tp->jobq.lock); 1.275 +#endif 1.276 +} 1.277 + 1.278 +/* 1.279 + * io worker thread function 1.280 + */ 1.281 +static void io_wstart(void *arg) 1.282 +{ 1.283 +PRThreadPool *tp = (PRThreadPool *) arg; 1.284 +int pollfd_cnt, pollfds_used; 1.285 +int rv; 1.286 +PRCList *qp, *nextqp; 1.287 +PRPollDesc *pollfds; 1.288 +PRJob **polljobs; 1.289 +int poll_timeout; 1.290 +PRIntervalTime now; 1.291 + 1.292 + /* 1.293 + * scan io_jobq 1.294 + * construct poll list 1.295 + * call PR_Poll 1.296 + * for all fds, for which poll returns true, move the job to 1.297 + * jobq and wakeup worker thread. 1.298 + */ 1.299 + while (!tp->shutdown) { 1.300 + PRJob *jobp; 1.301 + 1.302 + pollfd_cnt = tp->ioq.cnt + 10; 1.303 + if (pollfd_cnt > tp->ioq.npollfds) { 1.304 + 1.305 + /* 1.306 + * re-allocate pollfd array if the current one is not large 1.307 + * enough 1.308 + */ 1.309 + if (NULL != tp->ioq.pollfds) 1.310 + PR_Free(tp->ioq.pollfds); 1.311 + tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * 1.312 + (sizeof(PRPollDesc) + sizeof(PRJob *))); 1.313 + PR_ASSERT(NULL != tp->ioq.pollfds); 1.314 + /* 1.315 + * array of pollfds 1.316 + */ 1.317 + pollfds = tp->ioq.pollfds; 1.318 + tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); 1.319 + /* 1.320 + * parallel array of jobs 1.321 + */ 1.322 + polljobs = tp->ioq.polljobs; 1.323 + tp->ioq.npollfds = pollfd_cnt; 1.324 + } 1.325 + 1.326 + pollfds_used = 0; 1.327 + /* 1.328 + * add the notify fd; used for unblocking io thread(s) 1.329 + */ 1.330 + pollfds[pollfds_used].fd = tp->ioq.notify_fd; 1.331 + pollfds[pollfds_used].in_flags = PR_POLL_READ; 1.332 + pollfds[pollfds_used].out_flags = 0; 1.333 + polljobs[pollfds_used] = NULL; 1.334 + pollfds_used++; 1.335 + /* 1.336 + * fill in the pollfd array 1.337 + */ 1.338 + PR_Lock(tp->ioq.lock); 1.339 + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { 1.340 + nextqp = qp->next; 1.341 + jobp = JOB_LINKS_PTR(qp); 1.342 + if (jobp->cancel_io) { 1.343 + CANCEL_IO_JOB(jobp); 1.344 + continue; 1.345 + } 1.346 + if (pollfds_used == (pollfd_cnt)) 1.347 + break; 1.348 + pollfds[pollfds_used].fd = jobp->iod->socket; 1.349 + pollfds[pollfds_used].in_flags = jobp->io_poll_flags; 1.350 + pollfds[pollfds_used].out_flags = 0; 1.351 + polljobs[pollfds_used] = jobp; 1.352 + 1.353 + pollfds_used++; 1.354 + } 1.355 + if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { 1.356 + qp = tp->ioq.list.next; 1.357 + jobp = JOB_LINKS_PTR(qp); 1.358 + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) 1.359 + poll_timeout = PR_INTERVAL_NO_TIMEOUT; 1.360 + else if (PR_INTERVAL_NO_WAIT == jobp->timeout) 1.361 + poll_timeout = PR_INTERVAL_NO_WAIT; 1.362 + else { 1.363 + poll_timeout = jobp->absolute - PR_IntervalNow(); 1.364 + if (poll_timeout <= 0) /* already timed out */ 1.365 + poll_timeout = PR_INTERVAL_NO_WAIT; 1.366 + } 1.367 + } else { 1.368 + poll_timeout = PR_INTERVAL_NO_TIMEOUT; 1.369 + } 1.370 + PR_Unlock(tp->ioq.lock); 1.371 + 1.372 + /* 1.373 + * XXXX 1.374 + * should retry if more jobs have been added to the queue? 1.375 + * 1.376 + */ 1.377 + PR_ASSERT(pollfds_used <= pollfd_cnt); 1.378 + rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); 1.379 + 1.380 + if (tp->shutdown) { 1.381 + break; 1.382 + } 1.383 + 1.384 + if (rv > 0) { 1.385 + /* 1.386 + * at least one io event is set 1.387 + */ 1.388 + PRStatus rval_status; 1.389 + PRInt32 index; 1.390 + 1.391 + PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); 1.392 + /* 1.393 + * reset the pollable event, if notified 1.394 + */ 1.395 + if (pollfds[0].out_flags & PR_POLL_READ) { 1.396 + rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); 1.397 + PR_ASSERT(PR_SUCCESS == rval_status); 1.398 + } 1.399 + 1.400 + for(index = 1; index < (pollfds_used); index++) { 1.401 + PRInt16 events = pollfds[index].in_flags; 1.402 + PRInt16 revents = pollfds[index].out_flags; 1.403 + jobp = polljobs[index]; 1.404 + 1.405 + if ((revents & PR_POLL_NVAL) || /* busted in all cases */ 1.406 + (revents & PR_POLL_ERR) || 1.407 + ((events & PR_POLL_WRITE) && 1.408 + (revents & PR_POLL_HUP))) { /* write op & hup */ 1.409 + PR_Lock(tp->ioq.lock); 1.410 + if (jobp->cancel_io) { 1.411 + CANCEL_IO_JOB(jobp); 1.412 + PR_Unlock(tp->ioq.lock); 1.413 + continue; 1.414 + } 1.415 + PR_REMOVE_AND_INIT_LINK(&jobp->links); 1.416 + tp->ioq.cnt--; 1.417 + jobp->on_ioq = PR_FALSE; 1.418 + PR_Unlock(tp->ioq.lock); 1.419 + 1.420 + /* set error */ 1.421 + if (PR_POLL_NVAL & revents) 1.422 + jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; 1.423 + else if (PR_POLL_HUP & revents) 1.424 + jobp->iod->error = PR_CONNECT_RESET_ERROR; 1.425 + else 1.426 + jobp->iod->error = PR_IO_ERROR; 1.427 + 1.428 + /* 1.429 + * add to jobq 1.430 + */ 1.431 + add_to_jobq(tp, jobp); 1.432 + } else if (revents) { 1.433 + /* 1.434 + * add to jobq 1.435 + */ 1.436 + PR_Lock(tp->ioq.lock); 1.437 + if (jobp->cancel_io) { 1.438 + CANCEL_IO_JOB(jobp); 1.439 + PR_Unlock(tp->ioq.lock); 1.440 + continue; 1.441 + } 1.442 + PR_REMOVE_AND_INIT_LINK(&jobp->links); 1.443 + tp->ioq.cnt--; 1.444 + jobp->on_ioq = PR_FALSE; 1.445 + PR_Unlock(tp->ioq.lock); 1.446 + 1.447 + if (jobp->io_op == JOB_IO_CONNECT) { 1.448 + if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) 1.449 + jobp->iod->error = 0; 1.450 + else 1.451 + jobp->iod->error = PR_GetError(); 1.452 + } else 1.453 + jobp->iod->error = 0; 1.454 + 1.455 + add_to_jobq(tp, jobp); 1.456 + } 1.457 + } 1.458 + } 1.459 + /* 1.460 + * timeout processing 1.461 + */ 1.462 + now = PR_IntervalNow(); 1.463 + PR_Lock(tp->ioq.lock); 1.464 + for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { 1.465 + nextqp = qp->next; 1.466 + jobp = JOB_LINKS_PTR(qp); 1.467 + if (jobp->cancel_io) { 1.468 + CANCEL_IO_JOB(jobp); 1.469 + continue; 1.470 + } 1.471 + if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) 1.472 + break; 1.473 + if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && 1.474 + ((PRInt32)(jobp->absolute - now) > 0)) 1.475 + break; 1.476 + PR_REMOVE_AND_INIT_LINK(&jobp->links); 1.477 + tp->ioq.cnt--; 1.478 + jobp->on_ioq = PR_FALSE; 1.479 + jobp->iod->error = PR_IO_TIMEOUT_ERROR; 1.480 + add_to_jobq(tp, jobp); 1.481 + } 1.482 + PR_Unlock(tp->ioq.lock); 1.483 + } 1.484 +} 1.485 + 1.486 +/* 1.487 + * timer worker thread function 1.488 + */ 1.489 +static void timer_wstart(void *arg) 1.490 +{ 1.491 +PRThreadPool *tp = (PRThreadPool *) arg; 1.492 +PRCList *qp; 1.493 +PRIntervalTime timeout; 1.494 +PRIntervalTime now; 1.495 + 1.496 + /* 1.497 + * call PR_WaitCondVar with minimum value of all timeouts 1.498 + */ 1.499 + while (!tp->shutdown) { 1.500 + PRJob *jobp; 1.501 + 1.502 + PR_Lock(tp->timerq.lock); 1.503 + if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { 1.504 + timeout = PR_INTERVAL_NO_TIMEOUT; 1.505 + } else { 1.506 + PRCList *qp; 1.507 + 1.508 + qp = tp->timerq.list.next; 1.509 + jobp = JOB_LINKS_PTR(qp); 1.510 + 1.511 + timeout = jobp->absolute - PR_IntervalNow(); 1.512 + if (timeout <= 0) 1.513 + timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ 1.514 + } 1.515 + if (PR_INTERVAL_NO_WAIT != timeout) 1.516 + PR_WaitCondVar(tp->timerq.cv, timeout); 1.517 + if (tp->shutdown) { 1.518 + PR_Unlock(tp->timerq.lock); 1.519 + break; 1.520 + } 1.521 + /* 1.522 + * move expired-timer jobs to jobq 1.523 + */ 1.524 + now = PR_IntervalNow(); 1.525 + while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { 1.526 + qp = tp->timerq.list.next; 1.527 + jobp = JOB_LINKS_PTR(qp); 1.528 + 1.529 + if ((PRInt32)(jobp->absolute - now) > 0) { 1.530 + break; 1.531 + } 1.532 + /* 1.533 + * job timed out 1.534 + */ 1.535 + PR_REMOVE_AND_INIT_LINK(&jobp->links); 1.536 + tp->timerq.cnt--; 1.537 + jobp->on_timerq = PR_FALSE; 1.538 + add_to_jobq(tp, jobp); 1.539 + } 1.540 + PR_Unlock(tp->timerq.lock); 1.541 + } 1.542 +} 1.543 + 1.544 +static void 1.545 +delete_threadpool(PRThreadPool *tp) 1.546 +{ 1.547 + if (NULL != tp) { 1.548 + if (NULL != tp->shutdown_cv) 1.549 + PR_DestroyCondVar(tp->shutdown_cv); 1.550 + if (NULL != tp->jobq.cv) 1.551 + PR_DestroyCondVar(tp->jobq.cv); 1.552 + if (NULL != tp->jobq.lock) 1.553 + PR_DestroyLock(tp->jobq.lock); 1.554 + if (NULL != tp->join_lock) 1.555 + PR_DestroyLock(tp->join_lock); 1.556 +#ifdef OPT_WINNT 1.557 + if (NULL != tp->jobq.nt_completion_port) 1.558 + CloseHandle(tp->jobq.nt_completion_port); 1.559 +#endif 1.560 + /* Timer queue */ 1.561 + if (NULL != tp->timerq.cv) 1.562 + PR_DestroyCondVar(tp->timerq.cv); 1.563 + if (NULL != tp->timerq.lock) 1.564 + PR_DestroyLock(tp->timerq.lock); 1.565 + 1.566 + if (NULL != tp->ioq.lock) 1.567 + PR_DestroyLock(tp->ioq.lock); 1.568 + if (NULL != tp->ioq.pollfds) 1.569 + PR_Free(tp->ioq.pollfds); 1.570 + if (NULL != tp->ioq.notify_fd) 1.571 + PR_DestroyPollableEvent(tp->ioq.notify_fd); 1.572 + PR_Free(tp); 1.573 + } 1.574 + return; 1.575 +} 1.576 + 1.577 +static PRThreadPool * 1.578 +alloc_threadpool(void) 1.579 +{ 1.580 +PRThreadPool *tp; 1.581 + 1.582 + tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); 1.583 + if (NULL == tp) 1.584 + goto failed; 1.585 + tp->jobq.lock = PR_NewLock(); 1.586 + if (NULL == tp->jobq.lock) 1.587 + goto failed; 1.588 + tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); 1.589 + if (NULL == tp->jobq.cv) 1.590 + goto failed; 1.591 + tp->join_lock = PR_NewLock(); 1.592 + if (NULL == tp->join_lock) 1.593 + goto failed; 1.594 +#ifdef OPT_WINNT 1.595 + tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 1.596 + NULL, 0, 0); 1.597 + if (NULL == tp->jobq.nt_completion_port) 1.598 + goto failed; 1.599 +#endif 1.600 + 1.601 + tp->ioq.lock = PR_NewLock(); 1.602 + if (NULL == tp->ioq.lock) 1.603 + goto failed; 1.604 + 1.605 + /* Timer queue */ 1.606 + 1.607 + tp->timerq.lock = PR_NewLock(); 1.608 + if (NULL == tp->timerq.lock) 1.609 + goto failed; 1.610 + tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); 1.611 + if (NULL == tp->timerq.cv) 1.612 + goto failed; 1.613 + 1.614 + tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); 1.615 + if (NULL == tp->shutdown_cv) 1.616 + goto failed; 1.617 + tp->ioq.notify_fd = PR_NewPollableEvent(); 1.618 + if (NULL == tp->ioq.notify_fd) 1.619 + goto failed; 1.620 + return tp; 1.621 +failed: 1.622 + delete_threadpool(tp); 1.623 + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 1.624 + return NULL; 1.625 +} 1.626 + 1.627 +/* Create thread pool */ 1.628 +PR_IMPLEMENT(PRThreadPool *) 1.629 +PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, 1.630 + PRUint32 stacksize) 1.631 +{ 1.632 +PRThreadPool *tp; 1.633 +PRThread *thr; 1.634 +int i; 1.635 +wthread *wthrp; 1.636 + 1.637 + tp = alloc_threadpool(); 1.638 + if (NULL == tp) 1.639 + return NULL; 1.640 + 1.641 + tp->init_threads = initial_threads; 1.642 + tp->max_threads = max_threads; 1.643 + tp->stacksize = stacksize; 1.644 + PR_INIT_CLIST(&tp->jobq.list); 1.645 + PR_INIT_CLIST(&tp->ioq.list); 1.646 + PR_INIT_CLIST(&tp->timerq.list); 1.647 + PR_INIT_CLIST(&tp->jobq.wthreads); 1.648 + PR_INIT_CLIST(&tp->ioq.wthreads); 1.649 + PR_INIT_CLIST(&tp->timerq.wthreads); 1.650 + tp->shutdown = PR_FALSE; 1.651 + 1.652 + PR_Lock(tp->jobq.lock); 1.653 + for(i=0; i < initial_threads; ++i) { 1.654 + 1.655 + thr = PR_CreateThread(PR_USER_THREAD, wstart, 1.656 + tp, PR_PRIORITY_NORMAL, 1.657 + PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); 1.658 + PR_ASSERT(thr); 1.659 + wthrp = PR_NEWZAP(wthread); 1.660 + PR_ASSERT(wthrp); 1.661 + wthrp->thread = thr; 1.662 + PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); 1.663 + } 1.664 + tp->current_threads = initial_threads; 1.665 + 1.666 + thr = PR_CreateThread(PR_USER_THREAD, io_wstart, 1.667 + tp, PR_PRIORITY_NORMAL, 1.668 + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); 1.669 + PR_ASSERT(thr); 1.670 + wthrp = PR_NEWZAP(wthread); 1.671 + PR_ASSERT(wthrp); 1.672 + wthrp->thread = thr; 1.673 + PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); 1.674 + 1.675 + thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, 1.676 + tp, PR_PRIORITY_NORMAL, 1.677 + PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); 1.678 + PR_ASSERT(thr); 1.679 + wthrp = PR_NEWZAP(wthread); 1.680 + PR_ASSERT(wthrp); 1.681 + wthrp->thread = thr; 1.682 + PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); 1.683 + 1.684 + PR_Unlock(tp->jobq.lock); 1.685 + return tp; 1.686 +} 1.687 + 1.688 +static void 1.689 +delete_job(PRJob *jobp) 1.690 +{ 1.691 + if (NULL != jobp) { 1.692 + if (NULL != jobp->join_cv) { 1.693 + PR_DestroyCondVar(jobp->join_cv); 1.694 + jobp->join_cv = NULL; 1.695 + } 1.696 + if (NULL != jobp->cancel_cv) { 1.697 + PR_DestroyCondVar(jobp->cancel_cv); 1.698 + jobp->cancel_cv = NULL; 1.699 + } 1.700 + PR_DELETE(jobp); 1.701 + } 1.702 +} 1.703 + 1.704 +static PRJob * 1.705 +alloc_job(PRBool joinable, PRThreadPool *tp) 1.706 +{ 1.707 + PRJob *jobp; 1.708 + 1.709 + jobp = PR_NEWZAP(PRJob); 1.710 + if (NULL == jobp) 1.711 + goto failed; 1.712 + if (joinable) { 1.713 + jobp->join_cv = PR_NewCondVar(tp->join_lock); 1.714 + jobp->join_wait = PR_TRUE; 1.715 + if (NULL == jobp->join_cv) 1.716 + goto failed; 1.717 + } else { 1.718 + jobp->join_cv = NULL; 1.719 + } 1.720 +#ifdef OPT_WINNT 1.721 + jobp->nt_notifier.jobp = jobp; 1.722 +#endif 1.723 + return jobp; 1.724 +failed: 1.725 + delete_job(jobp); 1.726 + PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); 1.727 + return NULL; 1.728 +} 1.729 + 1.730 +/* queue a job */ 1.731 +PR_IMPLEMENT(PRJob *) 1.732 +PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) 1.733 +{ 1.734 + PRJob *jobp; 1.735 + 1.736 + jobp = alloc_job(joinable, tpool); 1.737 + if (NULL == jobp) 1.738 + return NULL; 1.739 + 1.740 + jobp->job_func = fn; 1.741 + jobp->job_arg = arg; 1.742 + jobp->tpool = tpool; 1.743 + 1.744 + add_to_jobq(tpool, jobp); 1.745 + return jobp; 1.746 +} 1.747 + 1.748 +/* queue a job, when a socket is readable or writeable */ 1.749 +static PRJob * 1.750 +queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, 1.751 + PRBool joinable, io_op_type op) 1.752 +{ 1.753 + PRJob *jobp; 1.754 + PRIntervalTime now; 1.755 + 1.756 + jobp = alloc_job(joinable, tpool); 1.757 + if (NULL == jobp) { 1.758 + return NULL; 1.759 + } 1.760 + 1.761 + /* 1.762 + * Add a new job to io_jobq 1.763 + * wakeup io worker thread 1.764 + */ 1.765 + 1.766 + jobp->job_func = fn; 1.767 + jobp->job_arg = arg; 1.768 + jobp->tpool = tpool; 1.769 + jobp->iod = iod; 1.770 + if (JOB_IO_READ == op) { 1.771 + jobp->io_op = JOB_IO_READ; 1.772 + jobp->io_poll_flags = PR_POLL_READ; 1.773 + } else if (JOB_IO_WRITE == op) { 1.774 + jobp->io_op = JOB_IO_WRITE; 1.775 + jobp->io_poll_flags = PR_POLL_WRITE; 1.776 + } else if (JOB_IO_ACCEPT == op) { 1.777 + jobp->io_op = JOB_IO_ACCEPT; 1.778 + jobp->io_poll_flags = PR_POLL_READ; 1.779 + } else if (JOB_IO_CONNECT == op) { 1.780 + jobp->io_op = JOB_IO_CONNECT; 1.781 + jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; 1.782 + } else { 1.783 + delete_job(jobp); 1.784 + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1.785 + return NULL; 1.786 + } 1.787 + 1.788 + jobp->timeout = iod->timeout; 1.789 + if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || 1.790 + (PR_INTERVAL_NO_WAIT == iod->timeout)) { 1.791 + jobp->absolute = iod->timeout; 1.792 + } else { 1.793 + now = PR_IntervalNow(); 1.794 + jobp->absolute = now + iod->timeout; 1.795 + } 1.796 + 1.797 + 1.798 + PR_Lock(tpool->ioq.lock); 1.799 + 1.800 + if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || 1.801 + (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { 1.802 + PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); 1.803 + } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { 1.804 + PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); 1.805 + } else { 1.806 + PRCList *qp; 1.807 + PRJob *tmp_jobp; 1.808 + /* 1.809 + * insert into the timeout-sorted ioq 1.810 + */ 1.811 + for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; 1.812 + qp = qp->prev) { 1.813 + tmp_jobp = JOB_LINKS_PTR(qp); 1.814 + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { 1.815 + break; 1.816 + } 1.817 + } 1.818 + PR_INSERT_AFTER(&jobp->links,qp); 1.819 + } 1.820 + 1.821 + jobp->on_ioq = PR_TRUE; 1.822 + tpool->ioq.cnt++; 1.823 + /* 1.824 + * notify io worker thread(s) 1.825 + */ 1.826 + PR_Unlock(tpool->ioq.lock); 1.827 + notify_ioq(tpool); 1.828 + return jobp; 1.829 +} 1.830 + 1.831 +/* queue a job, when a socket is readable */ 1.832 +PR_IMPLEMENT(PRJob *) 1.833 +PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, 1.834 + PRBool joinable) 1.835 +{ 1.836 + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); 1.837 +} 1.838 + 1.839 +/* queue a job, when a socket is writeable */ 1.840 +PR_IMPLEMENT(PRJob *) 1.841 +PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, 1.842 + PRBool joinable) 1.843 +{ 1.844 + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); 1.845 +} 1.846 + 1.847 + 1.848 +/* queue a job, when a socket has a pending connection */ 1.849 +PR_IMPLEMENT(PRJob *) 1.850 +PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, 1.851 + void * arg, PRBool joinable) 1.852 +{ 1.853 + return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); 1.854 +} 1.855 + 1.856 +/* queue a job, when a socket can be connected */ 1.857 +PR_IMPLEMENT(PRJob *) 1.858 +PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, 1.859 + const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) 1.860 +{ 1.861 + PRStatus rv; 1.862 + PRErrorCode err; 1.863 + 1.864 + rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); 1.865 + if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ 1.866 + /* connection pending */ 1.867 + return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); 1.868 + } else { 1.869 + /* 1.870 + * connection succeeded or failed; add to jobq right away 1.871 + */ 1.872 + if (rv == PR_FAILURE) 1.873 + iod->error = err; 1.874 + else 1.875 + iod->error = 0; 1.876 + return(PR_QueueJob(tpool, fn, arg, joinable)); 1.877 + } 1.878 +} 1.879 + 1.880 +/* queue a job, when a timer expires */ 1.881 +PR_IMPLEMENT(PRJob *) 1.882 +PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, 1.883 + PRJobFn fn, void * arg, PRBool joinable) 1.884 +{ 1.885 + PRIntervalTime now; 1.886 + PRJob *jobp; 1.887 + 1.888 + if (PR_INTERVAL_NO_TIMEOUT == timeout) { 1.889 + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1.890 + return NULL; 1.891 + } 1.892 + if (PR_INTERVAL_NO_WAIT == timeout) { 1.893 + /* 1.894 + * no waiting; add to jobq right away 1.895 + */ 1.896 + return(PR_QueueJob(tpool, fn, arg, joinable)); 1.897 + } 1.898 + jobp = alloc_job(joinable, tpool); 1.899 + if (NULL == jobp) { 1.900 + return NULL; 1.901 + } 1.902 + 1.903 + /* 1.904 + * Add a new job to timer_jobq 1.905 + * wakeup timer worker thread 1.906 + */ 1.907 + 1.908 + jobp->job_func = fn; 1.909 + jobp->job_arg = arg; 1.910 + jobp->tpool = tpool; 1.911 + jobp->timeout = timeout; 1.912 + 1.913 + now = PR_IntervalNow(); 1.914 + jobp->absolute = now + timeout; 1.915 + 1.916 + 1.917 + PR_Lock(tpool->timerq.lock); 1.918 + jobp->on_timerq = PR_TRUE; 1.919 + if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) 1.920 + PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); 1.921 + else { 1.922 + PRCList *qp; 1.923 + PRJob *tmp_jobp; 1.924 + /* 1.925 + * insert into the sorted timer jobq 1.926 + */ 1.927 + for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; 1.928 + qp = qp->prev) { 1.929 + tmp_jobp = JOB_LINKS_PTR(qp); 1.930 + if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { 1.931 + break; 1.932 + } 1.933 + } 1.934 + PR_INSERT_AFTER(&jobp->links,qp); 1.935 + } 1.936 + tpool->timerq.cnt++; 1.937 + /* 1.938 + * notify timer worker thread(s) 1.939 + */ 1.940 + notify_timerq(tpool); 1.941 + PR_Unlock(tpool->timerq.lock); 1.942 + return jobp; 1.943 +} 1.944 + 1.945 +static void 1.946 +notify_timerq(PRThreadPool *tp) 1.947 +{ 1.948 + /* 1.949 + * wakeup the timer thread(s) 1.950 + */ 1.951 + PR_NotifyCondVar(tp->timerq.cv); 1.952 +} 1.953 + 1.954 +static void 1.955 +notify_ioq(PRThreadPool *tp) 1.956 +{ 1.957 +PRStatus rval_status; 1.958 + 1.959 + /* 1.960 + * wakeup the io thread(s) 1.961 + */ 1.962 + rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); 1.963 + PR_ASSERT(PR_SUCCESS == rval_status); 1.964 +} 1.965 + 1.966 +/* 1.967 + * cancel a job 1.968 + * 1.969 + * XXXX: is this needed? likely to be removed 1.970 + */ 1.971 +PR_IMPLEMENT(PRStatus) 1.972 +PR_CancelJob(PRJob *jobp) { 1.973 + 1.974 + PRStatus rval = PR_FAILURE; 1.975 + PRThreadPool *tp; 1.976 + 1.977 + if (jobp->on_timerq) { 1.978 + /* 1.979 + * now, check again while holding the timerq lock 1.980 + */ 1.981 + tp = jobp->tpool; 1.982 + PR_Lock(tp->timerq.lock); 1.983 + if (jobp->on_timerq) { 1.984 + jobp->on_timerq = PR_FALSE; 1.985 + PR_REMOVE_AND_INIT_LINK(&jobp->links); 1.986 + tp->timerq.cnt--; 1.987 + PR_Unlock(tp->timerq.lock); 1.988 + if (!JOINABLE_JOB(jobp)) { 1.989 + delete_job(jobp); 1.990 + } else { 1.991 + JOIN_NOTIFY(jobp); 1.992 + } 1.993 + rval = PR_SUCCESS; 1.994 + } else 1.995 + PR_Unlock(tp->timerq.lock); 1.996 + } else if (jobp->on_ioq) { 1.997 + /* 1.998 + * now, check again while holding the ioq lock 1.999 + */ 1.1000 + tp = jobp->tpool; 1.1001 + PR_Lock(tp->ioq.lock); 1.1002 + if (jobp->on_ioq) { 1.1003 + jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); 1.1004 + if (NULL == jobp->cancel_cv) { 1.1005 + PR_Unlock(tp->ioq.lock); 1.1006 + PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); 1.1007 + return PR_FAILURE; 1.1008 + } 1.1009 + /* 1.1010 + * mark job 'cancelled' and notify io thread(s) 1.1011 + * XXXX: 1.1012 + * this assumes there is only one io thread; when there 1.1013 + * are multiple threads, the io thread processing this job 1.1014 + * must be notified. 1.1015 + */ 1.1016 + jobp->cancel_io = PR_TRUE; 1.1017 + PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ 1.1018 + notify_ioq(tp); 1.1019 + PR_Lock(tp->ioq.lock); 1.1020 + while (jobp->cancel_io) 1.1021 + PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); 1.1022 + PR_Unlock(tp->ioq.lock); 1.1023 + PR_ASSERT(!jobp->on_ioq); 1.1024 + if (!JOINABLE_JOB(jobp)) { 1.1025 + delete_job(jobp); 1.1026 + } else { 1.1027 + JOIN_NOTIFY(jobp); 1.1028 + } 1.1029 + rval = PR_SUCCESS; 1.1030 + } else 1.1031 + PR_Unlock(tp->ioq.lock); 1.1032 + } 1.1033 + if (PR_FAILURE == rval) 1.1034 + PR_SetError(PR_INVALID_STATE_ERROR, 0); 1.1035 + return rval; 1.1036 +} 1.1037 + 1.1038 +/* join a job, wait until completion */ 1.1039 +PR_IMPLEMENT(PRStatus) 1.1040 +PR_JoinJob(PRJob *jobp) 1.1041 +{ 1.1042 + if (!JOINABLE_JOB(jobp)) { 1.1043 + PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); 1.1044 + return PR_FAILURE; 1.1045 + } 1.1046 + PR_Lock(jobp->tpool->join_lock); 1.1047 + while(jobp->join_wait) 1.1048 + PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); 1.1049 + PR_Unlock(jobp->tpool->join_lock); 1.1050 + delete_job(jobp); 1.1051 + return PR_SUCCESS; 1.1052 +} 1.1053 + 1.1054 +/* shutdown threadpool */ 1.1055 +PR_IMPLEMENT(PRStatus) 1.1056 +PR_ShutdownThreadPool(PRThreadPool *tpool) 1.1057 +{ 1.1058 +PRStatus rval = PR_SUCCESS; 1.1059 + 1.1060 + PR_Lock(tpool->jobq.lock); 1.1061 + tpool->shutdown = PR_TRUE; 1.1062 + PR_NotifyAllCondVar(tpool->shutdown_cv); 1.1063 + PR_Unlock(tpool->jobq.lock); 1.1064 + 1.1065 + return rval; 1.1066 +} 1.1067 + 1.1068 +/* 1.1069 + * join thread pool 1.1070 + * wait for termination of worker threads 1.1071 + * reclaim threadpool resources 1.1072 + */ 1.1073 +PR_IMPLEMENT(PRStatus) 1.1074 +PR_JoinThreadPool(PRThreadPool *tpool) 1.1075 +{ 1.1076 +PRStatus rval = PR_SUCCESS; 1.1077 +PRCList *head; 1.1078 +PRStatus rval_status; 1.1079 + 1.1080 + PR_Lock(tpool->jobq.lock); 1.1081 + while (!tpool->shutdown) 1.1082 + PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); 1.1083 + 1.1084 + /* 1.1085 + * wakeup worker threads 1.1086 + */ 1.1087 +#ifdef OPT_WINNT 1.1088 + /* 1.1089 + * post shutdown notification for all threads 1.1090 + */ 1.1091 + { 1.1092 + int i; 1.1093 + for(i=0; i < tpool->current_threads; i++) { 1.1094 + PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, 1.1095 + TRUE, NULL); 1.1096 + } 1.1097 + } 1.1098 +#else 1.1099 + PR_NotifyAllCondVar(tpool->jobq.cv); 1.1100 +#endif 1.1101 + 1.1102 + /* 1.1103 + * wakeup io thread(s) 1.1104 + */ 1.1105 + notify_ioq(tpool); 1.1106 + 1.1107 + /* 1.1108 + * wakeup timer thread(s) 1.1109 + */ 1.1110 + PR_Lock(tpool->timerq.lock); 1.1111 + notify_timerq(tpool); 1.1112 + PR_Unlock(tpool->timerq.lock); 1.1113 + 1.1114 + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { 1.1115 + wthread *wthrp; 1.1116 + 1.1117 + head = PR_LIST_HEAD(&tpool->jobq.wthreads); 1.1118 + PR_REMOVE_AND_INIT_LINK(head); 1.1119 + PR_Unlock(tpool->jobq.lock); 1.1120 + wthrp = WTHREAD_LINKS_PTR(head); 1.1121 + rval_status = PR_JoinThread(wthrp->thread); 1.1122 + PR_ASSERT(PR_SUCCESS == rval_status); 1.1123 + PR_DELETE(wthrp); 1.1124 + PR_Lock(tpool->jobq.lock); 1.1125 + } 1.1126 + PR_Unlock(tpool->jobq.lock); 1.1127 + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { 1.1128 + wthread *wthrp; 1.1129 + 1.1130 + head = PR_LIST_HEAD(&tpool->ioq.wthreads); 1.1131 + PR_REMOVE_AND_INIT_LINK(head); 1.1132 + wthrp = WTHREAD_LINKS_PTR(head); 1.1133 + rval_status = PR_JoinThread(wthrp->thread); 1.1134 + PR_ASSERT(PR_SUCCESS == rval_status); 1.1135 + PR_DELETE(wthrp); 1.1136 + } 1.1137 + 1.1138 + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { 1.1139 + wthread *wthrp; 1.1140 + 1.1141 + head = PR_LIST_HEAD(&tpool->timerq.wthreads); 1.1142 + PR_REMOVE_AND_INIT_LINK(head); 1.1143 + wthrp = WTHREAD_LINKS_PTR(head); 1.1144 + rval_status = PR_JoinThread(wthrp->thread); 1.1145 + PR_ASSERT(PR_SUCCESS == rval_status); 1.1146 + PR_DELETE(wthrp); 1.1147 + } 1.1148 + 1.1149 + /* 1.1150 + * Delete queued jobs 1.1151 + */ 1.1152 + while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { 1.1153 + PRJob *jobp; 1.1154 + 1.1155 + head = PR_LIST_HEAD(&tpool->jobq.list); 1.1156 + PR_REMOVE_AND_INIT_LINK(head); 1.1157 + jobp = JOB_LINKS_PTR(head); 1.1158 + tpool->jobq.cnt--; 1.1159 + delete_job(jobp); 1.1160 + } 1.1161 + 1.1162 + /* delete io jobs */ 1.1163 + while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { 1.1164 + PRJob *jobp; 1.1165 + 1.1166 + head = PR_LIST_HEAD(&tpool->ioq.list); 1.1167 + PR_REMOVE_AND_INIT_LINK(head); 1.1168 + tpool->ioq.cnt--; 1.1169 + jobp = JOB_LINKS_PTR(head); 1.1170 + delete_job(jobp); 1.1171 + } 1.1172 + 1.1173 + /* delete timer jobs */ 1.1174 + while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { 1.1175 + PRJob *jobp; 1.1176 + 1.1177 + head = PR_LIST_HEAD(&tpool->timerq.list); 1.1178 + PR_REMOVE_AND_INIT_LINK(head); 1.1179 + tpool->timerq.cnt--; 1.1180 + jobp = JOB_LINKS_PTR(head); 1.1181 + delete_job(jobp); 1.1182 + } 1.1183 + 1.1184 + PR_ASSERT(0 == tpool->jobq.cnt); 1.1185 + PR_ASSERT(0 == tpool->ioq.cnt); 1.1186 + PR_ASSERT(0 == tpool->timerq.cnt); 1.1187 + 1.1188 + delete_threadpool(tpool); 1.1189 + return rval; 1.1190 +}