michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "nspr.h" michael@0: michael@0: /* michael@0: * Thread pools michael@0: * Thread pools create and manage threads to provide support for michael@0: * scheduling jobs onto one or more threads. michael@0: * michael@0: */ michael@0: #ifdef OPT_WINNT michael@0: #include michael@0: #endif michael@0: michael@0: /* michael@0: * worker thread michael@0: */ michael@0: typedef struct wthread { michael@0: PRCList links; michael@0: PRThread *thread; michael@0: } wthread; michael@0: michael@0: /* michael@0: * queue of timer jobs michael@0: */ michael@0: typedef struct timer_jobq { michael@0: PRCList list; michael@0: PRLock *lock; michael@0: PRCondVar *cv; michael@0: PRInt32 cnt; michael@0: PRCList wthreads; michael@0: } timer_jobq; michael@0: michael@0: /* michael@0: * queue of jobs michael@0: */ michael@0: typedef struct tp_jobq { michael@0: PRCList list; michael@0: PRInt32 cnt; michael@0: PRLock *lock; michael@0: PRCondVar *cv; michael@0: PRCList wthreads; michael@0: #ifdef OPT_WINNT michael@0: HANDLE nt_completion_port; michael@0: #endif michael@0: } tp_jobq; michael@0: michael@0: /* michael@0: * queue of IO jobs michael@0: */ michael@0: typedef struct io_jobq { michael@0: PRCList list; michael@0: PRPollDesc *pollfds; michael@0: PRInt32 npollfds; michael@0: PRJob **polljobs; michael@0: PRLock *lock; michael@0: PRInt32 cnt; michael@0: PRFileDesc *notify_fd; michael@0: PRCList wthreads; michael@0: } io_jobq; michael@0: michael@0: /* michael@0: * Threadpool michael@0: */ michael@0: struct PRThreadPool { michael@0: PRInt32 init_threads; michael@0: PRInt32 max_threads; michael@0: PRInt32 current_threads; michael@0: PRInt32 idle_threads; michael@0: PRUint32 stacksize; michael@0: tp_jobq jobq; michael@0: io_jobq ioq; michael@0: timer_jobq timerq; michael@0: PRLock *join_lock; /* used with jobp->join_cv */ michael@0: PRCondVar *shutdown_cv; michael@0: PRBool shutdown; michael@0: }; michael@0: michael@0: typedef enum io_op_type michael@0: { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; michael@0: michael@0: #ifdef OPT_WINNT michael@0: typedef struct NT_notifier { michael@0: OVERLAPPED overlapped; /* must be first */ michael@0: PRJob *jobp; michael@0: } NT_notifier; michael@0: #endif michael@0: michael@0: struct PRJob { michael@0: PRCList links; /* for linking jobs */ michael@0: PRBool on_ioq; /* job on ioq */ michael@0: PRBool on_timerq; /* job on timerq */ michael@0: PRJobFn job_func; michael@0: void *job_arg; michael@0: PRCondVar *join_cv; michael@0: PRBool join_wait; /* == PR_TRUE, when waiting to join */ michael@0: PRCondVar *cancel_cv; /* for cancelling IO jobs */ michael@0: PRBool cancel_io; /* for cancelling IO jobs */ michael@0: PRThreadPool *tpool; /* back pointer to thread pool */ michael@0: PRJobIoDesc *iod; michael@0: io_op_type io_op; michael@0: PRInt16 io_poll_flags; michael@0: PRNetAddr *netaddr; michael@0: PRIntervalTime timeout; /* relative value */ michael@0: PRIntervalTime absolute; michael@0: #ifdef OPT_WINNT michael@0: NT_notifier nt_notifier; michael@0: #endif michael@0: }; michael@0: michael@0: #define JOB_LINKS_PTR(_qp) \ michael@0: ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links))) michael@0: michael@0: #define WTHREAD_LINKS_PTR(_qp) \ michael@0: ((wthread *) ((char *) (_qp) - offsetof(wthread, links))) michael@0: michael@0: #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv) michael@0: michael@0: #define JOIN_NOTIFY(_jobp) \ michael@0: PR_BEGIN_MACRO \ michael@0: PR_Lock(_jobp->tpool->join_lock); \ michael@0: _jobp->join_wait = PR_FALSE; \ michael@0: PR_NotifyCondVar(_jobp->join_cv); \ michael@0: PR_Unlock(_jobp->tpool->join_lock); \ michael@0: PR_END_MACRO michael@0: michael@0: #define CANCEL_IO_JOB(jobp) \ michael@0: PR_BEGIN_MACRO \ michael@0: jobp->cancel_io = PR_FALSE; \ michael@0: jobp->on_ioq = PR_FALSE; \ michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); \ michael@0: tp->ioq.cnt--; \ michael@0: PR_NotifyCondVar(jobp->cancel_cv); \ michael@0: PR_END_MACRO michael@0: michael@0: static void delete_job(PRJob *jobp); michael@0: static PRThreadPool * alloc_threadpool(void); michael@0: static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); michael@0: static void notify_ioq(PRThreadPool *tp); michael@0: static void notify_timerq(PRThreadPool *tp); michael@0: michael@0: /* michael@0: * locks are acquired in the following order michael@0: * michael@0: * tp->ioq.lock,tp->timerq.lock michael@0: * | michael@0: * V michael@0: * tp->jobq->lock michael@0: */ michael@0: michael@0: /* michael@0: * worker thread function michael@0: */ michael@0: static void wstart(void *arg) michael@0: { michael@0: PRThreadPool *tp = (PRThreadPool *) arg; michael@0: PRCList *head; michael@0: michael@0: /* michael@0: * execute jobs until shutdown michael@0: */ michael@0: while (!tp->shutdown) { michael@0: PRJob *jobp; michael@0: #ifdef OPT_WINNT michael@0: BOOL rv; michael@0: DWORD unused, shutdown; michael@0: LPOVERLAPPED olp; michael@0: michael@0: PR_Lock(tp->jobq.lock); michael@0: tp->idle_threads++; michael@0: PR_Unlock(tp->jobq.lock); michael@0: rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, michael@0: &unused, &shutdown, &olp, INFINITE); michael@0: michael@0: PR_ASSERT(rv); michael@0: if (shutdown) michael@0: break; michael@0: jobp = ((NT_notifier *) olp)->jobp; michael@0: PR_Lock(tp->jobq.lock); michael@0: tp->idle_threads--; michael@0: tp->jobq.cnt--; michael@0: PR_Unlock(tp->jobq.lock); michael@0: #else michael@0: michael@0: PR_Lock(tp->jobq.lock); michael@0: while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) { michael@0: tp->idle_threads++; michael@0: PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT); michael@0: tp->idle_threads--; michael@0: } michael@0: if (tp->shutdown) { michael@0: PR_Unlock(tp->jobq.lock); michael@0: break; michael@0: } michael@0: head = PR_LIST_HEAD(&tp->jobq.list); michael@0: /* michael@0: * remove job from queue michael@0: */ michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: tp->jobq.cnt--; michael@0: jobp = JOB_LINKS_PTR(head); michael@0: PR_Unlock(tp->jobq.lock); michael@0: #endif michael@0: michael@0: jobp->job_func(jobp->job_arg); michael@0: if (!JOINABLE_JOB(jobp)) { michael@0: delete_job(jobp); michael@0: } else { michael@0: JOIN_NOTIFY(jobp); michael@0: } michael@0: } michael@0: PR_Lock(tp->jobq.lock); michael@0: tp->current_threads--; michael@0: PR_Unlock(tp->jobq.lock); michael@0: } michael@0: michael@0: /* michael@0: * add a job to the work queue michael@0: */ michael@0: static void michael@0: add_to_jobq(PRThreadPool *tp, PRJob *jobp) michael@0: { michael@0: /* michael@0: * add to jobq michael@0: */ michael@0: #ifdef OPT_WINNT michael@0: PR_Lock(tp->jobq.lock); michael@0: tp->jobq.cnt++; michael@0: PR_Unlock(tp->jobq.lock); michael@0: /* michael@0: * notify worker thread(s) michael@0: */ michael@0: PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, michael@0: FALSE, &jobp->nt_notifier.overlapped); michael@0: #else michael@0: PR_Lock(tp->jobq.lock); michael@0: PR_APPEND_LINK(&jobp->links,&tp->jobq.list); michael@0: tp->jobq.cnt++; michael@0: if ((tp->idle_threads < tp->jobq.cnt) && michael@0: (tp->current_threads < tp->max_threads)) { michael@0: wthread *wthrp; michael@0: /* michael@0: * increment thread count and unlock the jobq lock michael@0: */ michael@0: tp->current_threads++; michael@0: PR_Unlock(tp->jobq.lock); michael@0: /* create new worker thread */ michael@0: wthrp = PR_NEWZAP(wthread); michael@0: if (wthrp) { michael@0: wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, michael@0: tp, PR_PRIORITY_NORMAL, michael@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); michael@0: if (NULL == wthrp->thread) { michael@0: PR_DELETE(wthrp); /* this sets wthrp to NULL */ michael@0: } michael@0: } michael@0: PR_Lock(tp->jobq.lock); michael@0: if (NULL == wthrp) { michael@0: tp->current_threads--; michael@0: } else { michael@0: PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); michael@0: } michael@0: } michael@0: /* michael@0: * wakeup a worker thread michael@0: */ michael@0: PR_NotifyCondVar(tp->jobq.cv); michael@0: PR_Unlock(tp->jobq.lock); michael@0: #endif michael@0: } michael@0: michael@0: /* michael@0: * io worker thread function michael@0: */ michael@0: static void io_wstart(void *arg) michael@0: { michael@0: PRThreadPool *tp = (PRThreadPool *) arg; michael@0: int pollfd_cnt, pollfds_used; michael@0: int rv; michael@0: PRCList *qp, *nextqp; michael@0: PRPollDesc *pollfds; michael@0: PRJob **polljobs; michael@0: int poll_timeout; michael@0: PRIntervalTime now; michael@0: michael@0: /* michael@0: * scan io_jobq michael@0: * construct poll list michael@0: * call PR_Poll michael@0: * for all fds, for which poll returns true, move the job to michael@0: * jobq and wakeup worker thread. michael@0: */ michael@0: while (!tp->shutdown) { michael@0: PRJob *jobp; michael@0: michael@0: pollfd_cnt = tp->ioq.cnt + 10; michael@0: if (pollfd_cnt > tp->ioq.npollfds) { michael@0: michael@0: /* michael@0: * re-allocate pollfd array if the current one is not large michael@0: * enough michael@0: */ michael@0: if (NULL != tp->ioq.pollfds) michael@0: PR_Free(tp->ioq.pollfds); michael@0: tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * michael@0: (sizeof(PRPollDesc) + sizeof(PRJob *))); michael@0: PR_ASSERT(NULL != tp->ioq.pollfds); michael@0: /* michael@0: * array of pollfds michael@0: */ michael@0: pollfds = tp->ioq.pollfds; michael@0: tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); michael@0: /* michael@0: * parallel array of jobs michael@0: */ michael@0: polljobs = tp->ioq.polljobs; michael@0: tp->ioq.npollfds = pollfd_cnt; michael@0: } michael@0: michael@0: pollfds_used = 0; michael@0: /* michael@0: * add the notify fd; used for unblocking io thread(s) michael@0: */ michael@0: pollfds[pollfds_used].fd = tp->ioq.notify_fd; michael@0: pollfds[pollfds_used].in_flags = PR_POLL_READ; michael@0: pollfds[pollfds_used].out_flags = 0; michael@0: polljobs[pollfds_used] = NULL; michael@0: pollfds_used++; michael@0: /* michael@0: * fill in the pollfd array michael@0: */ michael@0: PR_Lock(tp->ioq.lock); michael@0: for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { michael@0: nextqp = qp->next; michael@0: jobp = JOB_LINKS_PTR(qp); michael@0: if (jobp->cancel_io) { michael@0: CANCEL_IO_JOB(jobp); michael@0: continue; michael@0: } michael@0: if (pollfds_used == (pollfd_cnt)) michael@0: break; michael@0: pollfds[pollfds_used].fd = jobp->iod->socket; michael@0: pollfds[pollfds_used].in_flags = jobp->io_poll_flags; michael@0: pollfds[pollfds_used].out_flags = 0; michael@0: polljobs[pollfds_used] = jobp; michael@0: michael@0: pollfds_used++; michael@0: } michael@0: if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) { michael@0: qp = tp->ioq.list.next; michael@0: jobp = JOB_LINKS_PTR(qp); michael@0: if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) michael@0: poll_timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: else if (PR_INTERVAL_NO_WAIT == jobp->timeout) michael@0: poll_timeout = PR_INTERVAL_NO_WAIT; michael@0: else { michael@0: poll_timeout = jobp->absolute - PR_IntervalNow(); michael@0: if (poll_timeout <= 0) /* already timed out */ michael@0: poll_timeout = PR_INTERVAL_NO_WAIT; michael@0: } michael@0: } else { michael@0: poll_timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: } michael@0: PR_Unlock(tp->ioq.lock); michael@0: michael@0: /* michael@0: * XXXX michael@0: * should retry if more jobs have been added to the queue? michael@0: * michael@0: */ michael@0: PR_ASSERT(pollfds_used <= pollfd_cnt); michael@0: rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); michael@0: michael@0: if (tp->shutdown) { michael@0: break; michael@0: } michael@0: michael@0: if (rv > 0) { michael@0: /* michael@0: * at least one io event is set michael@0: */ michael@0: PRStatus rval_status; michael@0: PRInt32 index; michael@0: michael@0: PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd); michael@0: /* michael@0: * reset the pollable event, if notified michael@0: */ michael@0: if (pollfds[0].out_flags & PR_POLL_READ) { michael@0: rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); michael@0: PR_ASSERT(PR_SUCCESS == rval_status); michael@0: } michael@0: michael@0: for(index = 1; index < (pollfds_used); index++) { michael@0: PRInt16 events = pollfds[index].in_flags; michael@0: PRInt16 revents = pollfds[index].out_flags; michael@0: jobp = polljobs[index]; michael@0: michael@0: if ((revents & PR_POLL_NVAL) || /* busted in all cases */ michael@0: (revents & PR_POLL_ERR) || michael@0: ((events & PR_POLL_WRITE) && michael@0: (revents & PR_POLL_HUP))) { /* write op & hup */ michael@0: PR_Lock(tp->ioq.lock); michael@0: if (jobp->cancel_io) { michael@0: CANCEL_IO_JOB(jobp); michael@0: PR_Unlock(tp->ioq.lock); michael@0: continue; michael@0: } michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); michael@0: tp->ioq.cnt--; michael@0: jobp->on_ioq = PR_FALSE; michael@0: PR_Unlock(tp->ioq.lock); michael@0: michael@0: /* set error */ michael@0: if (PR_POLL_NVAL & revents) michael@0: jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR; michael@0: else if (PR_POLL_HUP & revents) michael@0: jobp->iod->error = PR_CONNECT_RESET_ERROR; michael@0: else michael@0: jobp->iod->error = PR_IO_ERROR; michael@0: michael@0: /* michael@0: * add to jobq michael@0: */ michael@0: add_to_jobq(tp, jobp); michael@0: } else if (revents) { michael@0: /* michael@0: * add to jobq michael@0: */ michael@0: PR_Lock(tp->ioq.lock); michael@0: if (jobp->cancel_io) { michael@0: CANCEL_IO_JOB(jobp); michael@0: PR_Unlock(tp->ioq.lock); michael@0: continue; michael@0: } michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); michael@0: tp->ioq.cnt--; michael@0: jobp->on_ioq = PR_FALSE; michael@0: PR_Unlock(tp->ioq.lock); michael@0: michael@0: if (jobp->io_op == JOB_IO_CONNECT) { michael@0: if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) michael@0: jobp->iod->error = 0; michael@0: else michael@0: jobp->iod->error = PR_GetError(); michael@0: } else michael@0: jobp->iod->error = 0; michael@0: michael@0: add_to_jobq(tp, jobp); michael@0: } michael@0: } michael@0: } michael@0: /* michael@0: * timeout processing michael@0: */ michael@0: now = PR_IntervalNow(); michael@0: PR_Lock(tp->ioq.lock); michael@0: for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { michael@0: nextqp = qp->next; michael@0: jobp = JOB_LINKS_PTR(qp); michael@0: if (jobp->cancel_io) { michael@0: CANCEL_IO_JOB(jobp); michael@0: continue; michael@0: } michael@0: if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout) michael@0: break; michael@0: if ((PR_INTERVAL_NO_WAIT != jobp->timeout) && michael@0: ((PRInt32)(jobp->absolute - now) > 0)) michael@0: break; michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); michael@0: tp->ioq.cnt--; michael@0: jobp->on_ioq = PR_FALSE; michael@0: jobp->iod->error = PR_IO_TIMEOUT_ERROR; michael@0: add_to_jobq(tp, jobp); michael@0: } michael@0: PR_Unlock(tp->ioq.lock); michael@0: } michael@0: } michael@0: michael@0: /* michael@0: * timer worker thread function michael@0: */ michael@0: static void timer_wstart(void *arg) michael@0: { michael@0: PRThreadPool *tp = (PRThreadPool *) arg; michael@0: PRCList *qp; michael@0: PRIntervalTime timeout; michael@0: PRIntervalTime now; michael@0: michael@0: /* michael@0: * call PR_WaitCondVar with minimum value of all timeouts michael@0: */ michael@0: while (!tp->shutdown) { michael@0: PRJob *jobp; michael@0: michael@0: PR_Lock(tp->timerq.lock); michael@0: if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) { michael@0: timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: } else { michael@0: PRCList *qp; michael@0: michael@0: qp = tp->timerq.list.next; michael@0: jobp = JOB_LINKS_PTR(qp); michael@0: michael@0: timeout = jobp->absolute - PR_IntervalNow(); michael@0: if (timeout <= 0) michael@0: timeout = PR_INTERVAL_NO_WAIT; /* already timed out */ michael@0: } michael@0: if (PR_INTERVAL_NO_WAIT != timeout) michael@0: PR_WaitCondVar(tp->timerq.cv, timeout); michael@0: if (tp->shutdown) { michael@0: PR_Unlock(tp->timerq.lock); michael@0: break; michael@0: } michael@0: /* michael@0: * move expired-timer jobs to jobq michael@0: */ michael@0: now = PR_IntervalNow(); michael@0: while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) { michael@0: qp = tp->timerq.list.next; michael@0: jobp = JOB_LINKS_PTR(qp); michael@0: michael@0: if ((PRInt32)(jobp->absolute - now) > 0) { michael@0: break; michael@0: } michael@0: /* michael@0: * job timed out michael@0: */ michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); michael@0: tp->timerq.cnt--; michael@0: jobp->on_timerq = PR_FALSE; michael@0: add_to_jobq(tp, jobp); michael@0: } michael@0: PR_Unlock(tp->timerq.lock); michael@0: } michael@0: } michael@0: michael@0: static void michael@0: delete_threadpool(PRThreadPool *tp) michael@0: { michael@0: if (NULL != tp) { michael@0: if (NULL != tp->shutdown_cv) michael@0: PR_DestroyCondVar(tp->shutdown_cv); michael@0: if (NULL != tp->jobq.cv) michael@0: PR_DestroyCondVar(tp->jobq.cv); michael@0: if (NULL != tp->jobq.lock) michael@0: PR_DestroyLock(tp->jobq.lock); michael@0: if (NULL != tp->join_lock) michael@0: PR_DestroyLock(tp->join_lock); michael@0: #ifdef OPT_WINNT michael@0: if (NULL != tp->jobq.nt_completion_port) michael@0: CloseHandle(tp->jobq.nt_completion_port); michael@0: #endif michael@0: /* Timer queue */ michael@0: if (NULL != tp->timerq.cv) michael@0: PR_DestroyCondVar(tp->timerq.cv); michael@0: if (NULL != tp->timerq.lock) michael@0: PR_DestroyLock(tp->timerq.lock); michael@0: michael@0: if (NULL != tp->ioq.lock) michael@0: PR_DestroyLock(tp->ioq.lock); michael@0: if (NULL != tp->ioq.pollfds) michael@0: PR_Free(tp->ioq.pollfds); michael@0: if (NULL != tp->ioq.notify_fd) michael@0: PR_DestroyPollableEvent(tp->ioq.notify_fd); michael@0: PR_Free(tp); michael@0: } michael@0: return; michael@0: } michael@0: michael@0: static PRThreadPool * michael@0: alloc_threadpool(void) michael@0: { michael@0: PRThreadPool *tp; michael@0: michael@0: tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp)); michael@0: if (NULL == tp) michael@0: goto failed; michael@0: tp->jobq.lock = PR_NewLock(); michael@0: if (NULL == tp->jobq.lock) michael@0: goto failed; michael@0: tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); michael@0: if (NULL == tp->jobq.cv) michael@0: goto failed; michael@0: tp->join_lock = PR_NewLock(); michael@0: if (NULL == tp->join_lock) michael@0: goto failed; michael@0: #ifdef OPT_WINNT michael@0: tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, michael@0: NULL, 0, 0); michael@0: if (NULL == tp->jobq.nt_completion_port) michael@0: goto failed; michael@0: #endif michael@0: michael@0: tp->ioq.lock = PR_NewLock(); michael@0: if (NULL == tp->ioq.lock) michael@0: goto failed; michael@0: michael@0: /* Timer queue */ michael@0: michael@0: tp->timerq.lock = PR_NewLock(); michael@0: if (NULL == tp->timerq.lock) michael@0: goto failed; michael@0: tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); michael@0: if (NULL == tp->timerq.cv) michael@0: goto failed; michael@0: michael@0: tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); michael@0: if (NULL == tp->shutdown_cv) michael@0: goto failed; michael@0: tp->ioq.notify_fd = PR_NewPollableEvent(); michael@0: if (NULL == tp->ioq.notify_fd) michael@0: goto failed; michael@0: return tp; michael@0: failed: michael@0: delete_threadpool(tp); michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: michael@0: /* Create thread pool */ michael@0: PR_IMPLEMENT(PRThreadPool *) michael@0: PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, michael@0: PRUint32 stacksize) michael@0: { michael@0: PRThreadPool *tp; michael@0: PRThread *thr; michael@0: int i; michael@0: wthread *wthrp; michael@0: michael@0: tp = alloc_threadpool(); michael@0: if (NULL == tp) michael@0: return NULL; michael@0: michael@0: tp->init_threads = initial_threads; michael@0: tp->max_threads = max_threads; michael@0: tp->stacksize = stacksize; michael@0: PR_INIT_CLIST(&tp->jobq.list); michael@0: PR_INIT_CLIST(&tp->ioq.list); michael@0: PR_INIT_CLIST(&tp->timerq.list); michael@0: PR_INIT_CLIST(&tp->jobq.wthreads); michael@0: PR_INIT_CLIST(&tp->ioq.wthreads); michael@0: PR_INIT_CLIST(&tp->timerq.wthreads); michael@0: tp->shutdown = PR_FALSE; michael@0: michael@0: PR_Lock(tp->jobq.lock); michael@0: for(i=0; i < initial_threads; ++i) { michael@0: michael@0: thr = PR_CreateThread(PR_USER_THREAD, wstart, michael@0: tp, PR_PRIORITY_NORMAL, michael@0: PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); michael@0: PR_ASSERT(thr); michael@0: wthrp = PR_NEWZAP(wthread); michael@0: PR_ASSERT(wthrp); michael@0: wthrp->thread = thr; michael@0: PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads); michael@0: } michael@0: tp->current_threads = initial_threads; michael@0: michael@0: thr = PR_CreateThread(PR_USER_THREAD, io_wstart, michael@0: tp, PR_PRIORITY_NORMAL, michael@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); michael@0: PR_ASSERT(thr); michael@0: wthrp = PR_NEWZAP(wthread); michael@0: PR_ASSERT(wthrp); michael@0: wthrp->thread = thr; michael@0: PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads); michael@0: michael@0: thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, michael@0: tp, PR_PRIORITY_NORMAL, michael@0: PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); michael@0: PR_ASSERT(thr); michael@0: wthrp = PR_NEWZAP(wthread); michael@0: PR_ASSERT(wthrp); michael@0: wthrp->thread = thr; michael@0: PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads); michael@0: michael@0: PR_Unlock(tp->jobq.lock); michael@0: return tp; michael@0: } michael@0: michael@0: static void michael@0: delete_job(PRJob *jobp) michael@0: { michael@0: if (NULL != jobp) { michael@0: if (NULL != jobp->join_cv) { michael@0: PR_DestroyCondVar(jobp->join_cv); michael@0: jobp->join_cv = NULL; michael@0: } michael@0: if (NULL != jobp->cancel_cv) { michael@0: PR_DestroyCondVar(jobp->cancel_cv); michael@0: jobp->cancel_cv = NULL; michael@0: } michael@0: PR_DELETE(jobp); michael@0: } michael@0: } michael@0: michael@0: static PRJob * michael@0: alloc_job(PRBool joinable, PRThreadPool *tp) michael@0: { michael@0: PRJob *jobp; michael@0: michael@0: jobp = PR_NEWZAP(PRJob); michael@0: if (NULL == jobp) michael@0: goto failed; michael@0: if (joinable) { michael@0: jobp->join_cv = PR_NewCondVar(tp->join_lock); michael@0: jobp->join_wait = PR_TRUE; michael@0: if (NULL == jobp->join_cv) michael@0: goto failed; michael@0: } else { michael@0: jobp->join_cv = NULL; michael@0: } michael@0: #ifdef OPT_WINNT michael@0: jobp->nt_notifier.jobp = jobp; michael@0: #endif michael@0: return jobp; michael@0: failed: michael@0: delete_job(jobp); michael@0: PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: michael@0: /* queue a job */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) michael@0: { michael@0: PRJob *jobp; michael@0: michael@0: jobp = alloc_job(joinable, tpool); michael@0: if (NULL == jobp) michael@0: return NULL; michael@0: michael@0: jobp->job_func = fn; michael@0: jobp->job_arg = arg; michael@0: jobp->tpool = tpool; michael@0: michael@0: add_to_jobq(tpool, jobp); michael@0: return jobp; michael@0: } michael@0: michael@0: /* queue a job, when a socket is readable or writeable */ michael@0: static PRJob * michael@0: queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, michael@0: PRBool joinable, io_op_type op) michael@0: { michael@0: PRJob *jobp; michael@0: PRIntervalTime now; michael@0: michael@0: jobp = alloc_job(joinable, tpool); michael@0: if (NULL == jobp) { michael@0: return NULL; michael@0: } michael@0: michael@0: /* michael@0: * Add a new job to io_jobq michael@0: * wakeup io worker thread michael@0: */ michael@0: michael@0: jobp->job_func = fn; michael@0: jobp->job_arg = arg; michael@0: jobp->tpool = tpool; michael@0: jobp->iod = iod; michael@0: if (JOB_IO_READ == op) { michael@0: jobp->io_op = JOB_IO_READ; michael@0: jobp->io_poll_flags = PR_POLL_READ; michael@0: } else if (JOB_IO_WRITE == op) { michael@0: jobp->io_op = JOB_IO_WRITE; michael@0: jobp->io_poll_flags = PR_POLL_WRITE; michael@0: } else if (JOB_IO_ACCEPT == op) { michael@0: jobp->io_op = JOB_IO_ACCEPT; michael@0: jobp->io_poll_flags = PR_POLL_READ; michael@0: } else if (JOB_IO_CONNECT == op) { michael@0: jobp->io_op = JOB_IO_CONNECT; michael@0: jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT; michael@0: } else { michael@0: delete_job(jobp); michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: michael@0: jobp->timeout = iod->timeout; michael@0: if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) || michael@0: (PR_INTERVAL_NO_WAIT == iod->timeout)) { michael@0: jobp->absolute = iod->timeout; michael@0: } else { michael@0: now = PR_IntervalNow(); michael@0: jobp->absolute = now + iod->timeout; michael@0: } michael@0: michael@0: michael@0: PR_Lock(tpool->ioq.lock); michael@0: michael@0: if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) || michael@0: (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) { michael@0: PR_APPEND_LINK(&jobp->links,&tpool->ioq.list); michael@0: } else if (PR_INTERVAL_NO_WAIT == iod->timeout) { michael@0: PR_INSERT_LINK(&jobp->links,&tpool->ioq.list); michael@0: } else { michael@0: PRCList *qp; michael@0: PRJob *tmp_jobp; michael@0: /* michael@0: * insert into the timeout-sorted ioq michael@0: */ michael@0: for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; michael@0: qp = qp->prev) { michael@0: tmp_jobp = JOB_LINKS_PTR(qp); michael@0: if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { michael@0: break; michael@0: } michael@0: } michael@0: PR_INSERT_AFTER(&jobp->links,qp); michael@0: } michael@0: michael@0: jobp->on_ioq = PR_TRUE; michael@0: tpool->ioq.cnt++; michael@0: /* michael@0: * notify io worker thread(s) michael@0: */ michael@0: PR_Unlock(tpool->ioq.lock); michael@0: notify_ioq(tpool); michael@0: return jobp; michael@0: } michael@0: michael@0: /* queue a job, when a socket is readable */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, michael@0: PRBool joinable) michael@0: { michael@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); michael@0: } michael@0: michael@0: /* queue a job, when a socket is writeable */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, michael@0: PRBool joinable) michael@0: { michael@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); michael@0: } michael@0: michael@0: michael@0: /* queue a job, when a socket has a pending connection */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, michael@0: void * arg, PRBool joinable) michael@0: { michael@0: return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); michael@0: } michael@0: michael@0: /* queue a job, when a socket can be connected */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, michael@0: const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) michael@0: { michael@0: PRStatus rv; michael@0: PRErrorCode err; michael@0: michael@0: rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT); michael@0: if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){ michael@0: /* connection pending */ michael@0: return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); michael@0: } else { michael@0: /* michael@0: * connection succeeded or failed; add to jobq right away michael@0: */ michael@0: if (rv == PR_FAILURE) michael@0: iod->error = err; michael@0: else michael@0: iod->error = 0; michael@0: return(PR_QueueJob(tpool, fn, arg, joinable)); michael@0: } michael@0: } michael@0: michael@0: /* queue a job, when a timer expires */ michael@0: PR_IMPLEMENT(PRJob *) michael@0: PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, michael@0: PRJobFn fn, void * arg, PRBool joinable) michael@0: { michael@0: PRIntervalTime now; michael@0: PRJob *jobp; michael@0: michael@0: if (PR_INTERVAL_NO_TIMEOUT == timeout) { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return NULL; michael@0: } michael@0: if (PR_INTERVAL_NO_WAIT == timeout) { michael@0: /* michael@0: * no waiting; add to jobq right away michael@0: */ michael@0: return(PR_QueueJob(tpool, fn, arg, joinable)); michael@0: } michael@0: jobp = alloc_job(joinable, tpool); michael@0: if (NULL == jobp) { michael@0: return NULL; michael@0: } michael@0: michael@0: /* michael@0: * Add a new job to timer_jobq michael@0: * wakeup timer worker thread michael@0: */ michael@0: michael@0: jobp->job_func = fn; michael@0: jobp->job_arg = arg; michael@0: jobp->tpool = tpool; michael@0: jobp->timeout = timeout; michael@0: michael@0: now = PR_IntervalNow(); michael@0: jobp->absolute = now + timeout; michael@0: michael@0: michael@0: PR_Lock(tpool->timerq.lock); michael@0: jobp->on_timerq = PR_TRUE; michael@0: if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)) michael@0: PR_APPEND_LINK(&jobp->links,&tpool->timerq.list); michael@0: else { michael@0: PRCList *qp; michael@0: PRJob *tmp_jobp; michael@0: /* michael@0: * insert into the sorted timer jobq michael@0: */ michael@0: for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; michael@0: qp = qp->prev) { michael@0: tmp_jobp = JOB_LINKS_PTR(qp); michael@0: if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { michael@0: break; michael@0: } michael@0: } michael@0: PR_INSERT_AFTER(&jobp->links,qp); michael@0: } michael@0: tpool->timerq.cnt++; michael@0: /* michael@0: * notify timer worker thread(s) michael@0: */ michael@0: notify_timerq(tpool); michael@0: PR_Unlock(tpool->timerq.lock); michael@0: return jobp; michael@0: } michael@0: michael@0: static void michael@0: notify_timerq(PRThreadPool *tp) michael@0: { michael@0: /* michael@0: * wakeup the timer thread(s) michael@0: */ michael@0: PR_NotifyCondVar(tp->timerq.cv); michael@0: } michael@0: michael@0: static void michael@0: notify_ioq(PRThreadPool *tp) michael@0: { michael@0: PRStatus rval_status; michael@0: michael@0: /* michael@0: * wakeup the io thread(s) michael@0: */ michael@0: rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); michael@0: PR_ASSERT(PR_SUCCESS == rval_status); michael@0: } michael@0: michael@0: /* michael@0: * cancel a job michael@0: * michael@0: * XXXX: is this needed? likely to be removed michael@0: */ michael@0: PR_IMPLEMENT(PRStatus) michael@0: PR_CancelJob(PRJob *jobp) { michael@0: michael@0: PRStatus rval = PR_FAILURE; michael@0: PRThreadPool *tp; michael@0: michael@0: if (jobp->on_timerq) { michael@0: /* michael@0: * now, check again while holding the timerq lock michael@0: */ michael@0: tp = jobp->tpool; michael@0: PR_Lock(tp->timerq.lock); michael@0: if (jobp->on_timerq) { michael@0: jobp->on_timerq = PR_FALSE; michael@0: PR_REMOVE_AND_INIT_LINK(&jobp->links); michael@0: tp->timerq.cnt--; michael@0: PR_Unlock(tp->timerq.lock); michael@0: if (!JOINABLE_JOB(jobp)) { michael@0: delete_job(jobp); michael@0: } else { michael@0: JOIN_NOTIFY(jobp); michael@0: } michael@0: rval = PR_SUCCESS; michael@0: } else michael@0: PR_Unlock(tp->timerq.lock); michael@0: } else if (jobp->on_ioq) { michael@0: /* michael@0: * now, check again while holding the ioq lock michael@0: */ michael@0: tp = jobp->tpool; michael@0: PR_Lock(tp->ioq.lock); michael@0: if (jobp->on_ioq) { michael@0: jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); michael@0: if (NULL == jobp->cancel_cv) { michael@0: PR_Unlock(tp->ioq.lock); michael@0: PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0); michael@0: return PR_FAILURE; michael@0: } michael@0: /* michael@0: * mark job 'cancelled' and notify io thread(s) michael@0: * XXXX: michael@0: * this assumes there is only one io thread; when there michael@0: * are multiple threads, the io thread processing this job michael@0: * must be notified. michael@0: */ michael@0: jobp->cancel_io = PR_TRUE; michael@0: PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ michael@0: notify_ioq(tp); michael@0: PR_Lock(tp->ioq.lock); michael@0: while (jobp->cancel_io) michael@0: PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT); michael@0: PR_Unlock(tp->ioq.lock); michael@0: PR_ASSERT(!jobp->on_ioq); michael@0: if (!JOINABLE_JOB(jobp)) { michael@0: delete_job(jobp); michael@0: } else { michael@0: JOIN_NOTIFY(jobp); michael@0: } michael@0: rval = PR_SUCCESS; michael@0: } else michael@0: PR_Unlock(tp->ioq.lock); michael@0: } michael@0: if (PR_FAILURE == rval) michael@0: PR_SetError(PR_INVALID_STATE_ERROR, 0); michael@0: return rval; michael@0: } michael@0: michael@0: /* join a job, wait until completion */ michael@0: PR_IMPLEMENT(PRStatus) michael@0: PR_JoinJob(PRJob *jobp) michael@0: { michael@0: if (!JOINABLE_JOB(jobp)) { michael@0: PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); michael@0: return PR_FAILURE; michael@0: } michael@0: PR_Lock(jobp->tpool->join_lock); michael@0: while(jobp->join_wait) michael@0: PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT); michael@0: PR_Unlock(jobp->tpool->join_lock); michael@0: delete_job(jobp); michael@0: return PR_SUCCESS; michael@0: } michael@0: michael@0: /* shutdown threadpool */ michael@0: PR_IMPLEMENT(PRStatus) michael@0: PR_ShutdownThreadPool(PRThreadPool *tpool) michael@0: { michael@0: PRStatus rval = PR_SUCCESS; michael@0: michael@0: PR_Lock(tpool->jobq.lock); michael@0: tpool->shutdown = PR_TRUE; michael@0: PR_NotifyAllCondVar(tpool->shutdown_cv); michael@0: PR_Unlock(tpool->jobq.lock); michael@0: michael@0: return rval; michael@0: } michael@0: michael@0: /* michael@0: * join thread pool michael@0: * wait for termination of worker threads michael@0: * reclaim threadpool resources michael@0: */ michael@0: PR_IMPLEMENT(PRStatus) michael@0: PR_JoinThreadPool(PRThreadPool *tpool) michael@0: { michael@0: PRStatus rval = PR_SUCCESS; michael@0: PRCList *head; michael@0: PRStatus rval_status; michael@0: michael@0: PR_Lock(tpool->jobq.lock); michael@0: while (!tpool->shutdown) michael@0: PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT); michael@0: michael@0: /* michael@0: * wakeup worker threads michael@0: */ michael@0: #ifdef OPT_WINNT michael@0: /* michael@0: * post shutdown notification for all threads michael@0: */ michael@0: { michael@0: int i; michael@0: for(i=0; i < tpool->current_threads; i++) { michael@0: PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, michael@0: TRUE, NULL); michael@0: } michael@0: } michael@0: #else michael@0: PR_NotifyAllCondVar(tpool->jobq.cv); michael@0: #endif michael@0: michael@0: /* michael@0: * wakeup io thread(s) michael@0: */ michael@0: notify_ioq(tpool); michael@0: michael@0: /* michael@0: * wakeup timer thread(s) michael@0: */ michael@0: PR_Lock(tpool->timerq.lock); michael@0: notify_timerq(tpool); michael@0: PR_Unlock(tpool->timerq.lock); michael@0: michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) { michael@0: wthread *wthrp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->jobq.wthreads); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: PR_Unlock(tpool->jobq.lock); michael@0: wthrp = WTHREAD_LINKS_PTR(head); michael@0: rval_status = PR_JoinThread(wthrp->thread); michael@0: PR_ASSERT(PR_SUCCESS == rval_status); michael@0: PR_DELETE(wthrp); michael@0: PR_Lock(tpool->jobq.lock); michael@0: } michael@0: PR_Unlock(tpool->jobq.lock); michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) { michael@0: wthread *wthrp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->ioq.wthreads); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: wthrp = WTHREAD_LINKS_PTR(head); michael@0: rval_status = PR_JoinThread(wthrp->thread); michael@0: PR_ASSERT(PR_SUCCESS == rval_status); michael@0: PR_DELETE(wthrp); michael@0: } michael@0: michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) { michael@0: wthread *wthrp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->timerq.wthreads); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: wthrp = WTHREAD_LINKS_PTR(head); michael@0: rval_status = PR_JoinThread(wthrp->thread); michael@0: PR_ASSERT(PR_SUCCESS == rval_status); michael@0: PR_DELETE(wthrp); michael@0: } michael@0: michael@0: /* michael@0: * Delete queued jobs michael@0: */ michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) { michael@0: PRJob *jobp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->jobq.list); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: jobp = JOB_LINKS_PTR(head); michael@0: tpool->jobq.cnt--; michael@0: delete_job(jobp); michael@0: } michael@0: michael@0: /* delete io jobs */ michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) { michael@0: PRJob *jobp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->ioq.list); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: tpool->ioq.cnt--; michael@0: jobp = JOB_LINKS_PTR(head); michael@0: delete_job(jobp); michael@0: } michael@0: michael@0: /* delete timer jobs */ michael@0: while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) { michael@0: PRJob *jobp; michael@0: michael@0: head = PR_LIST_HEAD(&tpool->timerq.list); michael@0: PR_REMOVE_AND_INIT_LINK(head); michael@0: tpool->timerq.cnt--; michael@0: jobp = JOB_LINKS_PTR(head); michael@0: delete_job(jobp); michael@0: } michael@0: michael@0: PR_ASSERT(0 == tpool->jobq.cnt); michael@0: PR_ASSERT(0 == tpool->ioq.cnt); michael@0: PR_ASSERT(0 == tpool->timerq.cnt); michael@0: michael@0: delete_threadpool(tpool); michael@0: return rval; michael@0: }