nsprpub/pr/src/misc/prtpool.c

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* This Source Code Form is subject to the terms of the Mozilla Public
     3  * License, v. 2.0. If a copy of the MPL was not distributed with this
     4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     6 #include "nspr.h"
     8 /*
     9  * Thread pools
    10  *	Thread pools create and manage threads to provide support for
    11  *	scheduling jobs onto one or more threads.
    12  *
    13  */
    14 #ifdef OPT_WINNT
    15 #include <windows.h>
    16 #endif
    18 /*
    19  * worker thread
    20  */
    21 typedef struct wthread {
    22 	PRCList		links;
    23 	PRThread	*thread;
    24 } wthread;
    26 /*
    27  * queue of timer jobs
    28  */
    29 typedef struct timer_jobq {
    30 	PRCList		list;
    31 	PRLock		*lock;
    32 	PRCondVar	*cv;
    33 	PRInt32		cnt;
    34 	PRCList 	wthreads;
    35 } timer_jobq;
    37 /*
    38  * queue of jobs
    39  */
    40 typedef struct tp_jobq {
    41 	PRCList		list;
    42 	PRInt32		cnt;
    43 	PRLock		*lock;
    44 	PRCondVar	*cv;
    45 	PRCList 	wthreads;
    46 #ifdef OPT_WINNT
    47 	HANDLE		nt_completion_port;
    48 #endif
    49 } tp_jobq;
    51 /*
    52  * queue of IO jobs
    53  */
    54 typedef struct io_jobq {
    55 	PRCList		list;
    56 	PRPollDesc  *pollfds;
    57 	PRInt32  	npollfds;
    58 	PRJob		**polljobs;
    59 	PRLock		*lock;
    60 	PRInt32		cnt;
    61 	PRFileDesc	*notify_fd;
    62 	PRCList 	wthreads;
    63 } io_jobq;
    65 /*
    66  * Threadpool
    67  */
    68 struct PRThreadPool {
    69 	PRInt32		init_threads;
    70 	PRInt32		max_threads;
    71 	PRInt32		current_threads;
    72 	PRInt32		idle_threads;
    73 	PRUint32	stacksize;
    74 	tp_jobq		jobq;
    75 	io_jobq		ioq;
    76 	timer_jobq	timerq;
    77 	PRLock		*join_lock;		/* used with jobp->join_cv */
    78 	PRCondVar	*shutdown_cv;
    79 	PRBool		shutdown;
    80 };
    82 typedef enum io_op_type
    83 	{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
    85 #ifdef OPT_WINNT
    86 typedef struct NT_notifier {
    87 	OVERLAPPED overlapped;		/* must be first */
    88 	PRJob	*jobp;
    89 } NT_notifier;
    90 #endif
    92 struct PRJob {
    93 	PRCList			links;		/* 	for linking jobs */
    94 	PRBool			on_ioq;		/* job on ioq */
    95 	PRBool			on_timerq;	/* job on timerq */
    96 	PRJobFn			job_func;
    97 	void 			*job_arg;
    98 	PRCondVar		*join_cv;
    99 	PRBool			join_wait;	/* == PR_TRUE, when waiting to join */
   100 	PRCondVar		*cancel_cv;	/* for cancelling IO jobs */
   101 	PRBool			cancel_io;	/* for cancelling IO jobs */
   102 	PRThreadPool	*tpool;		/* back pointer to thread pool */
   103 	PRJobIoDesc		*iod;
   104 	io_op_type		io_op;
   105 	PRInt16			io_poll_flags;
   106 	PRNetAddr		*netaddr;
   107 	PRIntervalTime	timeout;	/* relative value */
   108 	PRIntervalTime	absolute;
   109 #ifdef OPT_WINNT
   110 	NT_notifier		nt_notifier;	
   111 #endif
   112 };
   114 #define JOB_LINKS_PTR(_qp) \
   115     ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))
   117 #define WTHREAD_LINKS_PTR(_qp) \
   118     ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
   120 #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
   122 #define JOIN_NOTIFY(_jobp)								\
   123 				PR_BEGIN_MACRO							\
   124 				PR_Lock(_jobp->tpool->join_lock);		\
   125 				_jobp->join_wait = PR_FALSE;			\
   126 				PR_NotifyCondVar(_jobp->join_cv);		\
   127 				PR_Unlock(_jobp->tpool->join_lock);		\
   128 				PR_END_MACRO
   130 #define CANCEL_IO_JOB(jobp)								\
   131 				PR_BEGIN_MACRO							\
   132 				jobp->cancel_io = PR_FALSE;				\
   133 				jobp->on_ioq = PR_FALSE;				\
   134 				PR_REMOVE_AND_INIT_LINK(&jobp->links);	\
   135 				tp->ioq.cnt--;							\
   136 				PR_NotifyCondVar(jobp->cancel_cv);		\
   137 				PR_END_MACRO
   139 static void delete_job(PRJob *jobp);
   140 static PRThreadPool * alloc_threadpool(void);
   141 static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
   142 static void notify_ioq(PRThreadPool *tp);
   143 static void notify_timerq(PRThreadPool *tp);
   145 /*
   146  * locks are acquired in the following order
   147  *
   148  *	tp->ioq.lock,tp->timerq.lock
   149  *			|
   150  *			V
    151  *		tp->jobq.lock
   152  */
   154 /*
   155  * worker thread function
   156  */
   157 static void wstart(void *arg)
   158 {
   159 PRThreadPool *tp = (PRThreadPool *) arg;
   160 PRCList *head;
   162 	/*
   163 	 * execute jobs until shutdown
   164 	 */
   165 	while (!tp->shutdown) {
   166 		PRJob *jobp;
   167 #ifdef OPT_WINNT
   168 		BOOL rv;
   169 		DWORD unused, shutdown;
   170 		LPOVERLAPPED olp;
   172 		PR_Lock(tp->jobq.lock);
   173 		tp->idle_threads++;
   174 		PR_Unlock(tp->jobq.lock);
   175 		rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
   176 					&unused, &shutdown, &olp, INFINITE);
   178 		PR_ASSERT(rv);
   179 		if (shutdown)
   180 			break;
   181 		jobp = ((NT_notifier *) olp)->jobp;
   182 		PR_Lock(tp->jobq.lock);
   183 		tp->idle_threads--;
   184 		tp->jobq.cnt--;
   185 		PR_Unlock(tp->jobq.lock);
   186 #else
   188 		PR_Lock(tp->jobq.lock);
   189 		while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
   190 			tp->idle_threads++;
   191 			PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
   192 			tp->idle_threads--;
   193 		}	
   194 		if (tp->shutdown) {
   195 			PR_Unlock(tp->jobq.lock);
   196 			break;
   197 		}
   198 		head = PR_LIST_HEAD(&tp->jobq.list);
   199 		/*
   200 		 * remove job from queue
   201 		 */
   202 		PR_REMOVE_AND_INIT_LINK(head);
   203 		tp->jobq.cnt--;
   204 		jobp = JOB_LINKS_PTR(head);
   205 		PR_Unlock(tp->jobq.lock);
   206 #endif
   208 		jobp->job_func(jobp->job_arg);
   209 		if (!JOINABLE_JOB(jobp)) {
   210 			delete_job(jobp);
   211 		} else {
   212 			JOIN_NOTIFY(jobp);
   213 		}
   214 	}
   215 	PR_Lock(tp->jobq.lock);
   216 	tp->current_threads--;
   217 	PR_Unlock(tp->jobq.lock);
   218 }
   220 /*
   221  * add a job to the work queue
   222  */
   223 static void
   224 add_to_jobq(PRThreadPool *tp, PRJob *jobp)
   225 {
   226 	/*
   227 	 * add to jobq
   228 	 */
   229 #ifdef OPT_WINNT
   230 	PR_Lock(tp->jobq.lock);
   231 	tp->jobq.cnt++;
   232 	PR_Unlock(tp->jobq.lock);
   233 	/*
   234 	 * notify worker thread(s)
   235 	 */
   236 	PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
   237             FALSE, &jobp->nt_notifier.overlapped);
   238 #else
   239 	PR_Lock(tp->jobq.lock);
   240 	PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
   241 	tp->jobq.cnt++;
   242 	if ((tp->idle_threads < tp->jobq.cnt) &&
   243 					(tp->current_threads < tp->max_threads)) {
   244 		wthread *wthrp;
   245 		/*
   246 		 * increment thread count and unlock the jobq lock
   247 		 */
   248 		tp->current_threads++;
   249 		PR_Unlock(tp->jobq.lock);
   250 		/* create new worker thread */
   251 		wthrp = PR_NEWZAP(wthread);
   252 		if (wthrp) {
   253 			wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
   254 						tp, PR_PRIORITY_NORMAL,
   255 						PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
   256 			if (NULL == wthrp->thread) {
   257 				PR_DELETE(wthrp);  /* this sets wthrp to NULL */
   258 			}
   259 		}
   260 		PR_Lock(tp->jobq.lock);
   261 		if (NULL == wthrp) {
   262 			tp->current_threads--;
   263 		} else {
   264 			PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
   265 		}
   266 	}
   267 	/*
   268 	 * wakeup a worker thread
   269 	 */
   270 	PR_NotifyCondVar(tp->jobq.cv);
   271 	PR_Unlock(tp->jobq.lock);
   272 #endif
   273 }
   275 /*
   276  * io worker thread function
   277  */
   278 static void io_wstart(void *arg)
   279 {
   280 PRThreadPool *tp = (PRThreadPool *) arg;
   281 int pollfd_cnt, pollfds_used;
   282 int rv;
   283 PRCList *qp, *nextqp;
   284 PRPollDesc *pollfds;
   285 PRJob **polljobs;
   286 int poll_timeout;
   287 PRIntervalTime now;
   289 	/*
   290 	 * scan io_jobq
   291 	 * construct poll list
   292 	 * call PR_Poll
   293 	 * for all fds, for which poll returns true, move the job to
   294 	 * jobq and wakeup worker thread.
   295 	 */
   296 	while (!tp->shutdown) {
   297 		PRJob *jobp;
   299 		pollfd_cnt = tp->ioq.cnt + 10;
   300 		if (pollfd_cnt > tp->ioq.npollfds) {
   302 			/*
   303 			 * re-allocate pollfd array if the current one is not large
   304 			 * enough
   305 			 */
   306 			if (NULL != tp->ioq.pollfds)
   307 				PR_Free(tp->ioq.pollfds);
   308 			tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
   309 						(sizeof(PRPollDesc) + sizeof(PRJob *)));
   310 			PR_ASSERT(NULL != tp->ioq.pollfds);
   311 			/*
   312 			 * array of pollfds
   313 			 */
   314 			pollfds = tp->ioq.pollfds;
   315 			tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
   316 			/*
   317 			 * parallel array of jobs
   318 			 */
   319 			polljobs = tp->ioq.polljobs;
   320 			tp->ioq.npollfds = pollfd_cnt;
   321 		}
   323 		pollfds_used = 0;
   324 		/*
   325 		 * add the notify fd; used for unblocking io thread(s)
   326 		 */
   327 		pollfds[pollfds_used].fd = tp->ioq.notify_fd;
   328 		pollfds[pollfds_used].in_flags = PR_POLL_READ;
   329 		pollfds[pollfds_used].out_flags = 0;
   330 		polljobs[pollfds_used] = NULL;
   331 		pollfds_used++;
   332 		/*
   333 		 * fill in the pollfd array
   334 		 */
   335 		PR_Lock(tp->ioq.lock);
   336 		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
   337 			nextqp = qp->next;
   338 			jobp = JOB_LINKS_PTR(qp);
   339 			if (jobp->cancel_io) {
   340 				CANCEL_IO_JOB(jobp);
   341 				continue;
   342 			}
   343 			if (pollfds_used == (pollfd_cnt))
   344 				break;
   345 			pollfds[pollfds_used].fd = jobp->iod->socket;
   346 			pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
   347 			pollfds[pollfds_used].out_flags = 0;
   348 			polljobs[pollfds_used] = jobp;
   350 			pollfds_used++;
   351 		}
   352 		if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
   353 			qp = tp->ioq.list.next;
   354 			jobp = JOB_LINKS_PTR(qp);
   355 			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
   356 				poll_timeout = PR_INTERVAL_NO_TIMEOUT;
   357 			else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
   358 				poll_timeout = PR_INTERVAL_NO_WAIT;
   359 			else {
   360 				poll_timeout = jobp->absolute - PR_IntervalNow();
   361 				if (poll_timeout <= 0) /* already timed out */
   362 					poll_timeout = PR_INTERVAL_NO_WAIT;
   363 			}
   364 		} else {
   365 			poll_timeout = PR_INTERVAL_NO_TIMEOUT;
   366 		}
   367 		PR_Unlock(tp->ioq.lock);
   369 		/*
   370 		 * XXXX
   371 		 * should retry if more jobs have been added to the queue?
   372 		 *
   373 		 */
   374 		PR_ASSERT(pollfds_used <= pollfd_cnt);
   375 		rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
   377 		if (tp->shutdown) {
   378 			break;
   379 		}
   381 		if (rv > 0) {
   382 			/*
   383 			 * at least one io event is set
   384 			 */
   385 			PRStatus rval_status;
   386 			PRInt32 index;
   388 			PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
   389 			/*
   390 			 * reset the pollable event, if notified
   391 			 */
   392 			if (pollfds[0].out_flags & PR_POLL_READ) {
   393 				rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
   394 				PR_ASSERT(PR_SUCCESS == rval_status);
   395 			}
   397 			for(index = 1; index < (pollfds_used); index++) {
   398                 PRInt16 events = pollfds[index].in_flags;
   399                 PRInt16 revents = pollfds[index].out_flags;	
   400 				jobp = polljobs[index];	
   402                 if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
   403                 	(revents & PR_POLL_ERR) ||
   404                 			((events & PR_POLL_WRITE) &&
   405 							(revents & PR_POLL_HUP))) { /* write op & hup */
   406 					PR_Lock(tp->ioq.lock);
   407 					if (jobp->cancel_io) {
   408 						CANCEL_IO_JOB(jobp);
   409 						PR_Unlock(tp->ioq.lock);
   410 						continue;
   411 					}
   412 					PR_REMOVE_AND_INIT_LINK(&jobp->links);
   413 					tp->ioq.cnt--;
   414 					jobp->on_ioq = PR_FALSE;
   415 					PR_Unlock(tp->ioq.lock);
   417 					/* set error */
   418                     if (PR_POLL_NVAL & revents)
   419 						jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
   420                     else if (PR_POLL_HUP & revents)
   421 						jobp->iod->error = PR_CONNECT_RESET_ERROR;
   422                     else 
   423 						jobp->iod->error = PR_IO_ERROR;
   425 					/*
   426 					 * add to jobq
   427 					 */
   428 					add_to_jobq(tp, jobp);
   429 				} else if (revents) {
   430 					/*
   431 					 * add to jobq
   432 					 */
   433 					PR_Lock(tp->ioq.lock);
   434 					if (jobp->cancel_io) {
   435 						CANCEL_IO_JOB(jobp);
   436 						PR_Unlock(tp->ioq.lock);
   437 						continue;
   438 					}
   439 					PR_REMOVE_AND_INIT_LINK(&jobp->links);
   440 					tp->ioq.cnt--;
   441 					jobp->on_ioq = PR_FALSE;
   442 					PR_Unlock(tp->ioq.lock);
   444 					if (jobp->io_op == JOB_IO_CONNECT) {
   445 						if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
   446 							jobp->iod->error = 0;
   447 						else
   448 							jobp->iod->error = PR_GetError();
   449 					} else
   450 						jobp->iod->error = 0;
   452 					add_to_jobq(tp, jobp);
   453 				}
   454 			}
   455 		}
   456 		/*
   457 		 * timeout processing
   458 		 */
   459 		now = PR_IntervalNow();
   460 		PR_Lock(tp->ioq.lock);
   461 		for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
   462 			nextqp = qp->next;
   463 			jobp = JOB_LINKS_PTR(qp);
   464 			if (jobp->cancel_io) {
   465 				CANCEL_IO_JOB(jobp);
   466 				continue;
   467 			}
   468 			if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
   469 				break;
   470 			if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
   471 								((PRInt32)(jobp->absolute - now) > 0))
   472 				break;
   473 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
   474 			tp->ioq.cnt--;
   475 			jobp->on_ioq = PR_FALSE;
   476 			jobp->iod->error = PR_IO_TIMEOUT_ERROR;
   477 			add_to_jobq(tp, jobp);
   478 		}
   479 		PR_Unlock(tp->ioq.lock);
   480 	}
   481 }
   483 /*
   484  * timer worker thread function
   485  */
   486 static void timer_wstart(void *arg)
   487 {
   488 PRThreadPool *tp = (PRThreadPool *) arg;
   489 PRCList *qp;
   490 PRIntervalTime timeout;
   491 PRIntervalTime now;
   493 	/*
   494 	 * call PR_WaitCondVar with minimum value of all timeouts
   495 	 */
   496 	while (!tp->shutdown) {
   497 		PRJob *jobp;
   499 		PR_Lock(tp->timerq.lock);
   500 		if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
   501 			timeout = PR_INTERVAL_NO_TIMEOUT;
   502 		} else {
   503 			PRCList *qp;
   505 			qp = tp->timerq.list.next;
   506 			jobp = JOB_LINKS_PTR(qp);
   508 			timeout = jobp->absolute - PR_IntervalNow();
   509             if (timeout <= 0)
   510 				timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
   511 		}
   512 		if (PR_INTERVAL_NO_WAIT != timeout)
   513 			PR_WaitCondVar(tp->timerq.cv, timeout);
   514 		if (tp->shutdown) {
   515 			PR_Unlock(tp->timerq.lock);
   516 			break;
   517 		}
   518 		/*
   519 		 * move expired-timer jobs to jobq
   520 		 */
   521 		now = PR_IntervalNow();	
   522 		while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
   523 			qp = tp->timerq.list.next;
   524 			jobp = JOB_LINKS_PTR(qp);
   526 			if ((PRInt32)(jobp->absolute - now) > 0) {
   527 				break;
   528 			}
   529 			/*
   530 			 * job timed out
   531 			 */
   532 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
   533 			tp->timerq.cnt--;
   534 			jobp->on_timerq = PR_FALSE;
   535 			add_to_jobq(tp, jobp);
   536 		}
   537 		PR_Unlock(tp->timerq.lock);
   538 	}
   539 }
   541 static void
   542 delete_threadpool(PRThreadPool *tp)
   543 {
   544 	if (NULL != tp) {
   545 		if (NULL != tp->shutdown_cv)
   546 			PR_DestroyCondVar(tp->shutdown_cv);
   547 		if (NULL != tp->jobq.cv)
   548 			PR_DestroyCondVar(tp->jobq.cv);
   549 		if (NULL != tp->jobq.lock)
   550 			PR_DestroyLock(tp->jobq.lock);
   551 		if (NULL != tp->join_lock)
   552 			PR_DestroyLock(tp->join_lock);
   553 #ifdef OPT_WINNT
   554 		if (NULL != tp->jobq.nt_completion_port)
   555 			CloseHandle(tp->jobq.nt_completion_port);
   556 #endif
   557 		/* Timer queue */
   558 		if (NULL != tp->timerq.cv)
   559 			PR_DestroyCondVar(tp->timerq.cv);
   560 		if (NULL != tp->timerq.lock)
   561 			PR_DestroyLock(tp->timerq.lock);
   563 		if (NULL != tp->ioq.lock)
   564 			PR_DestroyLock(tp->ioq.lock);
   565 		if (NULL != tp->ioq.pollfds)
   566 			PR_Free(tp->ioq.pollfds);
   567 		if (NULL != tp->ioq.notify_fd)
   568 			PR_DestroyPollableEvent(tp->ioq.notify_fd);
   569 		PR_Free(tp);
   570 	}
   571 	return;
   572 }
   574 static PRThreadPool *
   575 alloc_threadpool(void)
   576 {
   577 PRThreadPool *tp;
   579 	tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
   580 	if (NULL == tp)
   581 		goto failed;
   582 	tp->jobq.lock = PR_NewLock();
   583 	if (NULL == tp->jobq.lock)
   584 		goto failed;
   585 	tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
   586 	if (NULL == tp->jobq.cv)
   587 		goto failed;
   588 	tp->join_lock = PR_NewLock();
   589 	if (NULL == tp->join_lock)
   590 		goto failed;
   591 #ifdef OPT_WINNT
   592 	tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
   593 									NULL, 0, 0);
   594 	if (NULL == tp->jobq.nt_completion_port)
   595 		goto failed;
   596 #endif
   598 	tp->ioq.lock = PR_NewLock();
   599 	if (NULL == tp->ioq.lock)
   600 		goto failed;
   602 	/* Timer queue */
   604 	tp->timerq.lock = PR_NewLock();
   605 	if (NULL == tp->timerq.lock)
   606 		goto failed;
   607 	tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
   608 	if (NULL == tp->timerq.cv)
   609 		goto failed;
   611 	tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
   612 	if (NULL == tp->shutdown_cv)
   613 		goto failed;
   614 	tp->ioq.notify_fd = PR_NewPollableEvent();
   615 	if (NULL == tp->ioq.notify_fd)
   616 		goto failed;
   617 	return tp;
   618 failed:
   619 	delete_threadpool(tp);
   620 	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   621 	return NULL;
   622 }
   624 /* Create thread pool */
   625 PR_IMPLEMENT(PRThreadPool *)
   626 PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
   627                                 PRUint32 stacksize)
   628 {
   629 PRThreadPool *tp;
   630 PRThread *thr;
   631 int i;
   632 wthread *wthrp;
   634 	tp = alloc_threadpool();
   635 	if (NULL == tp)
   636 		return NULL;
   638 	tp->init_threads = initial_threads;
   639 	tp->max_threads = max_threads;
   640 	tp->stacksize = stacksize;
   641 	PR_INIT_CLIST(&tp->jobq.list);
   642 	PR_INIT_CLIST(&tp->ioq.list);
   643 	PR_INIT_CLIST(&tp->timerq.list);
   644 	PR_INIT_CLIST(&tp->jobq.wthreads);
   645 	PR_INIT_CLIST(&tp->ioq.wthreads);
   646 	PR_INIT_CLIST(&tp->timerq.wthreads);
   647 	tp->shutdown = PR_FALSE;
   649 	PR_Lock(tp->jobq.lock);
   650 	for(i=0; i < initial_threads; ++i) {
   652 		thr = PR_CreateThread(PR_USER_THREAD, wstart,
   653 						tp, PR_PRIORITY_NORMAL,
   654 						PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
   655 		PR_ASSERT(thr);
   656 		wthrp = PR_NEWZAP(wthread);
   657 		PR_ASSERT(wthrp);
   658 		wthrp->thread = thr;
   659 		PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
   660 	}
   661 	tp->current_threads = initial_threads;
   663 	thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
   664 					tp, PR_PRIORITY_NORMAL,
   665 					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
   666 	PR_ASSERT(thr);
   667 	wthrp = PR_NEWZAP(wthread);
   668 	PR_ASSERT(wthrp);
   669 	wthrp->thread = thr;
   670 	PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
   672 	thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
   673 					tp, PR_PRIORITY_NORMAL,
   674 					PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
   675 	PR_ASSERT(thr);
   676 	wthrp = PR_NEWZAP(wthread);
   677 	PR_ASSERT(wthrp);
   678 	wthrp->thread = thr;
   679 	PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
   681 	PR_Unlock(tp->jobq.lock);
   682 	return tp;
   683 }
   685 static void
   686 delete_job(PRJob *jobp)
   687 {
   688 	if (NULL != jobp) {
   689 		if (NULL != jobp->join_cv) {
   690 			PR_DestroyCondVar(jobp->join_cv);
   691 			jobp->join_cv = NULL;
   692 		}
   693 		if (NULL != jobp->cancel_cv) {
   694 			PR_DestroyCondVar(jobp->cancel_cv);
   695 			jobp->cancel_cv = NULL;
   696 		}
   697 		PR_DELETE(jobp);
   698 	}
   699 }
   701 static PRJob *
   702 alloc_job(PRBool joinable, PRThreadPool *tp)
   703 {
   704 	PRJob *jobp;
   706 	jobp = PR_NEWZAP(PRJob);
   707 	if (NULL == jobp) 
   708 		goto failed;
   709 	if (joinable) {
   710 		jobp->join_cv = PR_NewCondVar(tp->join_lock);
   711 		jobp->join_wait = PR_TRUE;
   712 		if (NULL == jobp->join_cv)
   713 			goto failed;
   714 	} else {
   715 		jobp->join_cv = NULL;
   716 	}
   717 #ifdef OPT_WINNT
   718 	jobp->nt_notifier.jobp = jobp;
   719 #endif
   720 	return jobp;
   721 failed:
   722 	delete_job(jobp);
   723 	PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
   724 	return NULL;
   725 }
   727 /* queue a job */
   728 PR_IMPLEMENT(PRJob *)
   729 PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
   730 {
   731 	PRJob *jobp;
   733 	jobp = alloc_job(joinable, tpool);
   734 	if (NULL == jobp)
   735 		return NULL;
   737 	jobp->job_func = fn;
   738 	jobp->job_arg = arg;
   739 	jobp->tpool = tpool;
   741 	add_to_jobq(tpool, jobp);
   742 	return jobp;
   743 }
   745 /* queue a job, when a socket is readable or writeable */
   746 static PRJob *
   747 queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
   748 				PRBool joinable, io_op_type op)
   749 {
   750 	PRJob *jobp;
   751 	PRIntervalTime now;
   753 	jobp = alloc_job(joinable, tpool);
   754 	if (NULL == jobp) {
   755 		return NULL;
   756 	}
   758 	/*
   759 	 * Add a new job to io_jobq
   760 	 * wakeup io worker thread
   761 	 */
   763 	jobp->job_func = fn;
   764 	jobp->job_arg = arg;
   765 	jobp->tpool = tpool;
   766 	jobp->iod = iod;
   767 	if (JOB_IO_READ == op) {
   768 		jobp->io_op = JOB_IO_READ;
   769 		jobp->io_poll_flags = PR_POLL_READ;
   770 	} else if (JOB_IO_WRITE == op) {
   771 		jobp->io_op = JOB_IO_WRITE;
   772 		jobp->io_poll_flags = PR_POLL_WRITE;
   773 	} else if (JOB_IO_ACCEPT == op) {
   774 		jobp->io_op = JOB_IO_ACCEPT;
   775 		jobp->io_poll_flags = PR_POLL_READ;
   776 	} else if (JOB_IO_CONNECT == op) {
   777 		jobp->io_op = JOB_IO_CONNECT;
   778 		jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
   779 	} else {
   780 		delete_job(jobp);
   781 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   782 		return NULL;
   783 	}
   785 	jobp->timeout = iod->timeout;
   786 	if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
   787 			(PR_INTERVAL_NO_WAIT == iod->timeout)) {
   788 		jobp->absolute = iod->timeout;
   789 	} else {
   790 		now = PR_IntervalNow();
   791 		jobp->absolute = now + iod->timeout;
   792 	}
   795 	PR_Lock(tpool->ioq.lock);
   797 	if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
   798 			(PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
   799 		PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
   800 	} else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
   801 		PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
   802 	} else {
   803 		PRCList *qp;
   804 		PRJob *tmp_jobp;
   805 		/*
   806 		 * insert into the timeout-sorted ioq
   807 		 */
   808 		for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
   809 							qp = qp->prev) {
   810 			tmp_jobp = JOB_LINKS_PTR(qp);
   811 			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
   812 				break;
   813 			}
   814 		}
   815 		PR_INSERT_AFTER(&jobp->links,qp);
   816 	}
   818 	jobp->on_ioq = PR_TRUE;
   819 	tpool->ioq.cnt++;
   820 	/*
   821 	 * notify io worker thread(s)
   822 	 */
   823 	PR_Unlock(tpool->ioq.lock);
   824 	notify_ioq(tpool);
   825 	return jobp;
   826 }
   828 /* queue a job, when a socket is readable */
   829 PR_IMPLEMENT(PRJob *)
   830 PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
   831 											PRBool joinable)
   832 {
   833 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
   834 }
   836 /* queue a job, when a socket is writeable */
   837 PR_IMPLEMENT(PRJob *)
   838 PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
   839 										PRBool joinable)
   840 {
   841 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
   842 }
   845 /* queue a job, when a socket has a pending connection */
   846 PR_IMPLEMENT(PRJob *)
   847 PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
   848 								void * arg, PRBool joinable)
   849 {
   850 	return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
   851 }
   853 /* queue a job, when a socket can be connected */
   854 PR_IMPLEMENT(PRJob *)
   855 PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
   856 			const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
   857 {
   858 	PRStatus rv;
   859 	PRErrorCode err;
   861 	rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
   862 	if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
   863 		/* connection pending */
   864 		return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
   865 	} else {
   866 		/*
   867 		 * connection succeeded or failed; add to jobq right away
   868 		 */
   869 		if (rv == PR_FAILURE)
   870 			iod->error = err;
   871 		else
   872 			iod->error = 0;
   873 		return(PR_QueueJob(tpool, fn, arg, joinable));
   874 	}
   875 }
   877 /* queue a job, when a timer expires */
   878 PR_IMPLEMENT(PRJob *)
   879 PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
   880 							PRJobFn fn, void * arg, PRBool joinable)
   881 {
   882 	PRIntervalTime now;
   883 	PRJob *jobp;
   885 	if (PR_INTERVAL_NO_TIMEOUT == timeout) {
   886 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
   887 		return NULL;
   888 	}
   889 	if (PR_INTERVAL_NO_WAIT == timeout) {
   890 		/*
   891 		 * no waiting; add to jobq right away
   892 		 */
   893 		return(PR_QueueJob(tpool, fn, arg, joinable));
   894 	}
   895 	jobp = alloc_job(joinable, tpool);
   896 	if (NULL == jobp) {
   897 		return NULL;
   898 	}
   900 	/*
   901 	 * Add a new job to timer_jobq
   902 	 * wakeup timer worker thread
   903 	 */
   905 	jobp->job_func = fn;
   906 	jobp->job_arg = arg;
   907 	jobp->tpool = tpool;
   908 	jobp->timeout = timeout;
   910 	now = PR_IntervalNow();
   911 	jobp->absolute = now + timeout;
   914 	PR_Lock(tpool->timerq.lock);
   915 	jobp->on_timerq = PR_TRUE;
   916 	if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
   917 		PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
   918 	else {
   919 		PRCList *qp;
   920 		PRJob *tmp_jobp;
   921 		/*
   922 		 * insert into the sorted timer jobq
   923 		 */
   924 		for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
   925 							qp = qp->prev) {
   926 			tmp_jobp = JOB_LINKS_PTR(qp);
   927 			if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
   928 				break;
   929 			}
   930 		}
   931 		PR_INSERT_AFTER(&jobp->links,qp);
   932 	}
   933 	tpool->timerq.cnt++;
   934 	/*
   935 	 * notify timer worker thread(s)
   936 	 */
   937 	notify_timerq(tpool);
   938 	PR_Unlock(tpool->timerq.lock);
   939 	return jobp;
   940 }
   942 static void
   943 notify_timerq(PRThreadPool *tp)
   944 {
   945 	/*
   946 	 * wakeup the timer thread(s)
   947 	 */
   948 	PR_NotifyCondVar(tp->timerq.cv);
   949 }
   951 static void
   952 notify_ioq(PRThreadPool *tp)
   953 {
   954 PRStatus rval_status;
   956 	/*
   957 	 * wakeup the io thread(s)
   958 	 */
   959 	rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
   960 	PR_ASSERT(PR_SUCCESS == rval_status);
   961 }
   963 /*
   964  * cancel a job
   965  *
   966  *	XXXX: is this needed? likely to be removed
   967  */
   968 PR_IMPLEMENT(PRStatus)
   969 PR_CancelJob(PRJob *jobp) {
   971 	PRStatus rval = PR_FAILURE;
   972 	PRThreadPool *tp;
   974 	if (jobp->on_timerq) {
   975 		/*
   976 		 * now, check again while holding the timerq lock
   977 		 */
   978 		tp = jobp->tpool;
   979 		PR_Lock(tp->timerq.lock);
   980 		if (jobp->on_timerq) {
   981 			jobp->on_timerq = PR_FALSE;
   982 			PR_REMOVE_AND_INIT_LINK(&jobp->links);
   983 			tp->timerq.cnt--;
   984 			PR_Unlock(tp->timerq.lock);
   985 			if (!JOINABLE_JOB(jobp)) {
   986 				delete_job(jobp);
   987 			} else {
   988 				JOIN_NOTIFY(jobp);
   989 			}
   990 			rval = PR_SUCCESS;
   991 		} else
   992 			PR_Unlock(tp->timerq.lock);
   993 	} else if (jobp->on_ioq) {
   994 		/*
   995 		 * now, check again while holding the ioq lock
   996 		 */
   997 		tp = jobp->tpool;
   998 		PR_Lock(tp->ioq.lock);
   999 		if (jobp->on_ioq) {
  1000 			jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
  1001 			if (NULL == jobp->cancel_cv) {
  1002 				PR_Unlock(tp->ioq.lock);
  1003 				PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
  1004 				return PR_FAILURE;
  1006 			/*
  1007 			 * mark job 'cancelled' and notify io thread(s)
  1008 			 * XXXX:
  1009 			 *		this assumes there is only one io thread; when there
  1010 			 * 		are multiple threads, the io thread processing this job
  1011 			 * 		must be notified.
  1012 			 */
  1013 			jobp->cancel_io = PR_TRUE;
  1014 			PR_Unlock(tp->ioq.lock);	/* release, reacquire ioq lock */
  1015 			notify_ioq(tp);
  1016 			PR_Lock(tp->ioq.lock);
  1017 			while (jobp->cancel_io)
  1018 				PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
  1019 			PR_Unlock(tp->ioq.lock);
  1020 			PR_ASSERT(!jobp->on_ioq);
  1021 			if (!JOINABLE_JOB(jobp)) {
  1022 				delete_job(jobp);
  1023 			} else {
  1024 				JOIN_NOTIFY(jobp);
  1026 			rval = PR_SUCCESS;
  1027 		} else
  1028 			PR_Unlock(tp->ioq.lock);
  1030 	if (PR_FAILURE == rval)
  1031 		PR_SetError(PR_INVALID_STATE_ERROR, 0);
  1032 	return rval;
  1035 /* join a job, wait until completion */
  1036 PR_IMPLEMENT(PRStatus)
  1037 PR_JoinJob(PRJob *jobp)
  1039 	if (!JOINABLE_JOB(jobp)) {
  1040 		PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
  1041 		return PR_FAILURE;
  1043 	PR_Lock(jobp->tpool->join_lock);
  1044 	while(jobp->join_wait)
  1045 		PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
  1046 	PR_Unlock(jobp->tpool->join_lock);
  1047 	delete_job(jobp);
  1048 	return PR_SUCCESS;
  1051 /* shutdown threadpool */
  1052 PR_IMPLEMENT(PRStatus)
  1053 PR_ShutdownThreadPool(PRThreadPool *tpool)
  1055 PRStatus rval = PR_SUCCESS;
  1057 	PR_Lock(tpool->jobq.lock);
  1058 	tpool->shutdown = PR_TRUE;
  1059 	PR_NotifyAllCondVar(tpool->shutdown_cv);
  1060 	PR_Unlock(tpool->jobq.lock);
  1062 	return rval;
  1065 /*
  1066  * join thread pool
  1067  * 	wait for termination of worker threads
  1068  *	reclaim threadpool resources
  1069  */
  1070 PR_IMPLEMENT(PRStatus)
  1071 PR_JoinThreadPool(PRThreadPool *tpool)
  1073 PRStatus rval = PR_SUCCESS;
  1074 PRCList *head;
  1075 PRStatus rval_status;
  1077 	PR_Lock(tpool->jobq.lock);
  1078 	while (!tpool->shutdown)
  1079 		PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
  1081 	/*
  1082 	 * wakeup worker threads
  1083 	 */
  1084 #ifdef OPT_WINNT
  1085 	/*
  1086 	 * post shutdown notification for all threads
  1087 	 */
  1089 		int i;
  1090 		for(i=0; i < tpool->current_threads; i++) {
  1091 			PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
  1092 												TRUE, NULL);
  1095 #else
  1096 	PR_NotifyAllCondVar(tpool->jobq.cv);
  1097 #endif
  1099 	/*
  1100 	 * wakeup io thread(s)
  1101 	 */
  1102 	notify_ioq(tpool);
  1104 	/*
  1105 	 * wakeup timer thread(s)
  1106 	 */
  1107 	PR_Lock(tpool->timerq.lock);
  1108 	notify_timerq(tpool);
  1109 	PR_Unlock(tpool->timerq.lock);
  1111 	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
  1112 		wthread *wthrp;
  1114 		head = PR_LIST_HEAD(&tpool->jobq.wthreads);
  1115 		PR_REMOVE_AND_INIT_LINK(head);
  1116 		PR_Unlock(tpool->jobq.lock);
  1117 		wthrp = WTHREAD_LINKS_PTR(head);
  1118 		rval_status = PR_JoinThread(wthrp->thread);
  1119 		PR_ASSERT(PR_SUCCESS == rval_status);
  1120 		PR_DELETE(wthrp);
  1121 		PR_Lock(tpool->jobq.lock);
  1123 	PR_Unlock(tpool->jobq.lock);
  1124 	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
  1125 		wthread *wthrp;
  1127 		head = PR_LIST_HEAD(&tpool->ioq.wthreads);
  1128 		PR_REMOVE_AND_INIT_LINK(head);
  1129 		wthrp = WTHREAD_LINKS_PTR(head);
  1130 		rval_status = PR_JoinThread(wthrp->thread);
  1131 		PR_ASSERT(PR_SUCCESS == rval_status);
  1132 		PR_DELETE(wthrp);
  1135 	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
  1136 		wthread *wthrp;
  1138 		head = PR_LIST_HEAD(&tpool->timerq.wthreads);
  1139 		PR_REMOVE_AND_INIT_LINK(head);
  1140 		wthrp = WTHREAD_LINKS_PTR(head);
  1141 		rval_status = PR_JoinThread(wthrp->thread);
  1142 		PR_ASSERT(PR_SUCCESS == rval_status);
  1143 		PR_DELETE(wthrp);
  1146 	/*
  1147 	 * Delete queued jobs
  1148 	 */
  1149 	while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
  1150 		PRJob *jobp;
  1152 		head = PR_LIST_HEAD(&tpool->jobq.list);
  1153 		PR_REMOVE_AND_INIT_LINK(head);
  1154 		jobp = JOB_LINKS_PTR(head);
  1155 		tpool->jobq.cnt--;
  1156 		delete_job(jobp);
  1159 	/* delete io jobs */
  1160 	while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
  1161 		PRJob *jobp;
  1163 		head = PR_LIST_HEAD(&tpool->ioq.list);
  1164 		PR_REMOVE_AND_INIT_LINK(head);
  1165 		tpool->ioq.cnt--;
  1166 		jobp = JOB_LINKS_PTR(head);
  1167 		delete_job(jobp);
  1170 	/* delete timer jobs */
  1171 	while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
  1172 		PRJob *jobp;
  1174 		head = PR_LIST_HEAD(&tpool->timerq.list);
  1175 		PR_REMOVE_AND_INIT_LINK(head);
  1176 		tpool->timerq.cnt--;
  1177 		jobp = JOB_LINKS_PTR(head);
  1178 		delete_job(jobp);
  1181 	PR_ASSERT(0 == tpool->jobq.cnt);
  1182 	PR_ASSERT(0 == tpool->ioq.cnt);
  1183 	PR_ASSERT(0 == tpool->timerq.cnt);
  1185 	delete_threadpool(tpool);
  1186 	return rval;

mercurial