nsprpub/pr/tests/thrpool_server.c

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* This Source Code Form is subject to the terms of the Mozilla Public
     3  * License, v. 2.0. If a copy of the MPL was not distributed with this
     4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     6 /***********************************************************************
     7 **
      8 ** Name: thrpool_server.c
     9 **
    10 ** Description: Test threadpool functionality.
    11 **
    12 ** Modification History:
    13 */
    14 #include "primpl.h"
    16 #include "plgetopt.h"
    18 #include <stdio.h>
    19 #include <string.h>
    20 #include <errno.h>
    21 #ifdef XP_UNIX
    22 #include <sys/mman.h>
    23 #endif
    24 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
    25 #include <pthread.h>
    26 #endif
    28 /* for getcwd */
    29 #if defined(XP_UNIX) || defined (XP_OS2) || defined(XP_BEOS)
    30 #include <unistd.h>
    31 #elif defined(XP_PC)
    32 #include <direct.h>
    33 #endif
    35 #ifdef WIN32
    36 #include <process.h>
    37 #endif
    39 static int _debug_on = 0;
    40 static char *program_name = NULL;
    41 static void serve_client_write(void *arg);
    43 #include "obsolete/prsem.h"
    45 #ifdef XP_PC
    46 #define mode_t int
    47 #endif
    49 #define DPRINTF(arg) if (_debug_on) printf arg
    52 #define BUF_DATA_SIZE    (2 * 1024)
    53 #define TCP_MESG_SIZE    1024
    54 #define NUM_TCP_CLIENTS  10	/* for a listen queue depth of 5 */
    57 #define NUM_TCP_CONNECTIONS_PER_CLIENT  10
    58 #define NUM_TCP_MESGS_PER_CONNECTION    10
    59 #define TCP_SERVER_PORT            		10000
    60 #define SERVER_MAX_BIND_COUNT        	100
    62 #ifdef WINCE
    63 char *getcwd(char *buf, size_t size)
    64 {
    65     wchar_t wpath[MAX_PATH];
    66     _wgetcwd(wpath, MAX_PATH);
    67     WideCharToMultiByte(CP_ACP, 0, wpath, -1, buf, size, 0, 0);
    68 }
    70 #define perror(s)
    71 #endif
    73 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
    74 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
    75 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
    76 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
    77 static void TCP_Server_Accept(void *arg);
    80 int failed_already=0;
    81 typedef struct buffer {
    82     char    data[BUF_DATA_SIZE];
    83 } buffer;
    86 typedef struct Server_Param {
    87     PRJobIoDesc iod;    /* socket to read from/write to    */
    88     PRInt32    	datalen;    /* bytes of data transfered in each read/write */
    89     PRNetAddr	netaddr;
    90     PRMonitor	*exit_mon;    /* monitor to signal on exit            */
    91     PRInt32		*job_counterp;    /* counter to decrement, before exit        */
    92     PRInt32		conn_counter;    /* counter to decrement, before exit        */
    93 	PRThreadPool *tp;
    94 } Server_Param;
    96 typedef struct Serve_Client_Param {
    97     PRJobIoDesc iod;    /* socket to read from/write to    */
    98     PRInt32    datalen;    /* bytes of data transfered in each read/write */
    99     PRMonitor *exit_mon;    /* monitor to signal on exit            */
   100     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
   101 	PRThreadPool *tp;
   102 } Serve_Client_Param;
   104 typedef struct Session {
   105     PRJobIoDesc iod;    /* socket to read from/write to    */
   106 	buffer 	*in_buf;
   107 	PRInt32 bytes;
   108 	PRInt32 msg_num;
   109 	PRInt32 bytes_read;
   110     PRMonitor *exit_mon;    /* monitor to signal on exit            */
   111     PRInt32 *job_counterp;    /* counter to decrement, before exit        */
   112 	PRThreadPool *tp;
   113 } Session;
   115 static void
   116 serve_client_read(void *arg)
   117 {
   118 	Session *sp = (Session *) arg;
   119     int rem;
   120     int bytes;
   121     int offset;
   122 	PRFileDesc *sockfd;
   123 	char *buf;
   124 	PRJob *jobp;
   126 	PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
   128 	sockfd = sp->iod.socket;
   129 	buf = sp->in_buf->data;
   131     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
   132 	PR_ASSERT(sp->bytes_read < sp->bytes);
   134 	offset = sp->bytes_read;
   135 	rem = sp->bytes - offset;
   136 	bytes = PR_Recv(sockfd, buf + offset, rem, 0, timeout);
   137 	if (bytes < 0) {
   138 		return;
   139 	}
   140 	sp->bytes_read += bytes;
   141 	sp->iod.timeout = PR_SecondsToInterval(60);
   142 	if (sp->bytes_read <  sp->bytes) {
   143 		jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
   144 							PR_FALSE);
   145 		PR_ASSERT(NULL != jobp);
   146 		return;
   147 	}
   148 	PR_ASSERT(sp->bytes_read == sp->bytes);
   149 	DPRINTF(("serve_client: read complete, msg(%d) \n", sp->msg_num));
   151 	sp->iod.timeout = PR_SecondsToInterval(60);
   152 	jobp = PR_QueueJob_Write(sp->tp, &sp->iod, serve_client_write, sp,
   153 							PR_FALSE);
   154 	PR_ASSERT(NULL != jobp);
   156     return;
   157 }
   159 static void
   160 serve_client_write(void *arg)
   161 {
   162 	Session *sp = (Session *) arg;
   163     int bytes;
   164 	PRFileDesc *sockfd;
   165 	char *buf;
   166 	PRJob *jobp;
   168 	sockfd = sp->iod.socket;
   169 	buf = sp->in_buf->data;
   171     PR_ASSERT(sp->msg_num < num_tcp_mesgs_per_connection);
   173 	bytes = PR_Send(sockfd, buf, sp->bytes, 0, PR_INTERVAL_NO_TIMEOUT);
   174 	PR_ASSERT(bytes == sp->bytes);
   176 	if (bytes < 0) {
   177 		return;
   178 	}
   179 	DPRINTF(("serve_client: write complete, msg(%d) \n", sp->msg_num));
   180     sp->msg_num++;
   181     if (sp->msg_num < num_tcp_mesgs_per_connection) {
   182 		sp->bytes_read = 0;
   183 		sp->iod.timeout = PR_SecondsToInterval(60);
   184 		jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
   185 							PR_FALSE);
   186 		PR_ASSERT(NULL != jobp);
   187 		return;
   188 	}
   190 	DPRINTF(("serve_client: read/write complete, msg(%d) \n", sp->msg_num));
   191     if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
   192         fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
   193     }
   195     PR_Close(sockfd);
   196     PR_EnterMonitor(sp->exit_mon);
   197     --(*sp->job_counterp);
   198     PR_Notify(sp->exit_mon);
   199     PR_ExitMonitor(sp->exit_mon);
   201     PR_DELETE(sp->in_buf);
   202     PR_DELETE(sp);
   204     return;
   205 }
   207 /*
   208  * Serve_Client
   209  *    Thread, started by the server, for serving a client connection.
   210  *    Reads data from socket and writes it back, unmodified, and
   211  *    closes the socket
   212  */
   213 static void PR_CALLBACK
   214 Serve_Client(void *arg)
   215 {
   216     Serve_Client_Param *scp = (Serve_Client_Param *) arg;
   217     buffer *in_buf;
   218 	Session *sp;
   219 	PRJob *jobp;
   221 	sp = PR_NEW(Session);
   222 	sp->iod = scp->iod;
   224     in_buf = PR_NEW(buffer);
   225     if (in_buf == NULL) {
   226         fprintf(stderr,"%s: failed to alloc buffer struct\n",program_name);
   227         failed_already=1;
   228         return;
   229     }
   231 	sp->in_buf = in_buf;
   232 	sp->bytes = scp->datalen;
   233 	sp->msg_num = 0;
   234 	sp->bytes_read = 0;
   235 	sp->tp = scp->tp;
   236    	sp->exit_mon = scp->exit_mon;
   237     sp->job_counterp = scp->job_counterp;
   239 	sp->iod.timeout = PR_SecondsToInterval(60);
   240 	jobp = PR_QueueJob_Read(sp->tp, &sp->iod, serve_client_read, sp,
   241 							PR_FALSE);
   242 	PR_ASSERT(NULL != jobp);
   243 	PR_DELETE(scp);
   244 }
   246 static void
   247 print_stats(void *arg)
   248 {
   249     Server_Param *sp = (Server_Param *) arg;
   250     PRThreadPool *tp = sp->tp;
   251     PRInt32 counter;
   252 	PRJob *jobp;
   254 	PR_EnterMonitor(sp->exit_mon);
   255 	counter = (*sp->job_counterp);
   256 	PR_ExitMonitor(sp->exit_mon);
   258 	printf("PRINT_STATS: #client connections = %d\n",counter);
   261 	jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
   262 						print_stats, sp, PR_FALSE);
   264 	PR_ASSERT(NULL != jobp);
   265 }
   267 static int job_counter = 0;
   268 /*
   269  * TCP Server
   270  *    Server binds an address to a socket, starts a client process and
   271  *	  listens for incoming connections.
   272  *    Each client connects to the server and sends a chunk of data
   273  *    Starts a Serve_Client job for each incoming connection, to read
   274  *    the data from the client and send it back to the client, unmodified.
   275  *    Each client checks that data received from server is same as the
   276  *    data it sent to the server.
   277  *	  Finally, the threadpool is shutdown
   278  */
   279 static void PR_CALLBACK
   280 TCP_Server(void *arg)
   281 {
   282     PRThreadPool *tp = (PRThreadPool *) arg;
   283     Server_Param *sp;
   284     PRFileDesc *sockfd;
   285     PRNetAddr netaddr;
   286 	PRMonitor *sc_mon;
   287 	PRJob *jobp;
   288 	int i;
   289 	PRStatus rval;
   291     /*
   292      * Create a tcp socket
   293      */
   294     if ((sockfd = PR_NewTCPSocket()) == NULL) {
   295         fprintf(stderr,"%s: PR_NewTCPSocket failed\n", program_name);
   296         return;
   297     }
   298     memset(&netaddr, 0 , sizeof(netaddr));
   299     netaddr.inet.family = PR_AF_INET;
   300     netaddr.inet.port = PR_htons(TCP_SERVER_PORT);
   301     netaddr.inet.ip = PR_htonl(PR_INADDR_ANY);
   302     /*
   303      * try a few times to bind server's address, if addresses are in
   304      * use
   305      */
   306 	i = 0;
   307     while (PR_Bind(sockfd, &netaddr) < 0) {
   308         if (PR_GetError() == PR_ADDRESS_IN_USE_ERROR) {
   309             netaddr.inet.port += 2;
   310             if (i++ < SERVER_MAX_BIND_COUNT)
   311                 continue;
   312         }
   313         fprintf(stderr,"%s: ERROR - PR_Bind failed\n", program_name);
   314         perror("PR_Bind");
   315         failed_already=1;
   316         return;
   317     }
   319     if (PR_Listen(sockfd, 32) < 0) {
   320         fprintf(stderr,"%s: ERROR - PR_Listen failed\n", program_name);
   321         failed_already=1;
   322         return;
   323     }
   325     if (PR_GetSockName(sockfd, &netaddr) < 0) {
   326         fprintf(stderr,"%s: ERROR - PR_GetSockName failed\n", program_name);
   327         failed_already=1;
   328         return;
   329     }
   331     DPRINTF((
   332 	"TCP_Server: PR_BIND netaddr.inet.ip = 0x%lx, netaddr.inet.port = %d\n",
   333         netaddr.inet.ip, netaddr.inet.port));
   335 	sp = PR_NEW(Server_Param);
   336 	if (sp == NULL) {
   337 		fprintf(stderr,"%s: PR_NEW failed\n", program_name);
   338 		failed_already=1;
   339 		return;
   340 	}
   341 	sp->iod.socket = sockfd;
   342 	sp->iod.timeout = PR_SecondsToInterval(60);
   343 	sp->datalen = tcp_mesg_size;
   344 	sp->exit_mon = sc_mon;
   345 	sp->job_counterp = &job_counter;
   346 	sp->conn_counter = 0;
   347 	sp->tp = tp;
   348 	sp->netaddr = netaddr;
   350 	/* create and cancel an io job */
   351 	jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
   352 							PR_FALSE);
   353 	PR_ASSERT(NULL != jobp);
   354 	rval = PR_CancelJob(jobp);
   355 	PR_ASSERT(PR_SUCCESS == rval);
   357 	/*
   358 	 * create the client process
   359 	 */
   360 	{
   361 #define MAX_ARGS 4
   362 		char *argv[MAX_ARGS + 1];
   363 		int index = 0;
   364 		char port[32];
   365         char path[1024 + sizeof("/thrpool_client")];
   367         getcwd(path, sizeof(path));
   369         (void)strcat(path, "/thrpool_client");
   370 #ifdef XP_PC
   371         (void)strcat(path, ".exe");
   372 #endif
   373         argv[index++] = path;
   374 		sprintf(port,"%d",PR_ntohs(netaddr.inet.port));
   375         if (_debug_on)
   376         {
   377             argv[index++] = "-d";
   378             argv[index++] = "-p";
   379             argv[index++] = port;
   380             argv[index++] = NULL;
   381         } else {
   382             argv[index++] = "-p";
   383             argv[index++] = port;
   384 			argv[index++] = NULL;
   385 		}
   386 		PR_ASSERT(MAX_ARGS >= (index - 1));
   388         DPRINTF(("creating client process %s ...\n", path));
   389         if (PR_FAILURE == PR_CreateProcessDetached(path, argv, NULL, NULL)) {
   390         	fprintf(stderr,
   391 				"thrpool_server: ERROR - PR_CreateProcessDetached failed\n");
   392         	failed_already=1;
   393         	return;
   394 		}
   395 	}
   397     sc_mon = PR_NewMonitor();
   398     if (sc_mon == NULL) {
   399         fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
   400         failed_already=1;
   401         return;
   402     }
   404 	sp->iod.socket = sockfd;
   405 	sp->iod.timeout = PR_SecondsToInterval(60);
   406 	sp->datalen = tcp_mesg_size;
   407 	sp->exit_mon = sc_mon;
   408 	sp->job_counterp = &job_counter;
   409 	sp->conn_counter = 0;
   410 	sp->tp = tp;
   411 	sp->netaddr = netaddr;
   413 	/* create and cancel a timer job */
   414 	jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(5000),
   415 						print_stats, sp, PR_FALSE);
   416 	PR_ASSERT(NULL != jobp);
   417 	rval = PR_CancelJob(jobp);
   418 	PR_ASSERT(PR_SUCCESS == rval);
   420     DPRINTF(("TCP_Server: Accepting connections \n"));
   422 	jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
   423 							PR_FALSE);
   424 	PR_ASSERT(NULL != jobp);
   425 	return;
   426 }
   428 static void
   429 TCP_Server_Accept(void *arg)
   430 {
   431     Server_Param *sp = (Server_Param *) arg;
   432     PRThreadPool *tp = sp->tp;
   433     Serve_Client_Param *scp;
   434 	PRFileDesc *newsockfd;
   435 	PRJob *jobp;
   437 	if ((newsockfd = PR_Accept(sp->iod.socket, &sp->netaddr,
   438 		PR_INTERVAL_NO_TIMEOUT)) == NULL) {
   439 		fprintf(stderr,"%s: ERROR - PR_Accept failed\n", program_name);
   440 		failed_already=1;
   441 		goto exit;
   442 	}
   443 	scp = PR_NEW(Serve_Client_Param);
   444 	if (scp == NULL) {
   445 		fprintf(stderr,"%s: PR_NEW failed\n", program_name);
   446 		failed_already=1;
   447 		goto exit;
   448 	}
   450 	/*
   451 	 * Start a Serve_Client job for each incoming connection
   452 	 */
   453 	scp->iod.socket = newsockfd;
   454 	scp->iod.timeout = PR_SecondsToInterval(60);
   455 	scp->datalen = tcp_mesg_size;
   456 	scp->exit_mon = sp->exit_mon;
   457 	scp->job_counterp = sp->job_counterp;
   458 	scp->tp = sp->tp;
   460 	PR_EnterMonitor(sp->exit_mon);
   461 	(*sp->job_counterp)++;
   462 	PR_ExitMonitor(sp->exit_mon);
   463 	jobp = PR_QueueJob(tp, Serve_Client, scp,
   464 						PR_FALSE);
   466 	PR_ASSERT(NULL != jobp);
   467 	DPRINTF(("TCP_Server: Created Serve_Client = 0x%lx\n", jobp));
   469 	/*
   470 	 * single-threaded update; no lock needed
   471 	 */
   472     sp->conn_counter++;
   473     if (sp->conn_counter <
   474 			(num_tcp_clients * num_tcp_connections_per_client)) {
   475 		jobp = PR_QueueJob_Accept(tp, &sp->iod, TCP_Server_Accept, sp,
   476 								PR_FALSE);
   477 		PR_ASSERT(NULL != jobp);
   478 		return;
   479 	}
   480 	jobp = PR_QueueJob_Timer(tp, PR_MillisecondsToInterval(500),
   481 						print_stats, sp, PR_FALSE);
   483 	PR_ASSERT(NULL != jobp);
   484 	DPRINTF(("TCP_Server: Created print_stats timer job = 0x%lx\n", jobp));
   486 exit:
   487 	PR_EnterMonitor(sp->exit_mon);
   488     /* Wait for server jobs to finish */
   489     while (0 != *sp->job_counterp) {
   490         PR_Wait(sp->exit_mon, PR_INTERVAL_NO_TIMEOUT);
   491         DPRINTF(("TCP_Server: conn_counter = %d\n",
   492 												*sp->job_counterp));
   493     }
   495     PR_ExitMonitor(sp->exit_mon);
   496     if (sp->iod.socket) {
   497         PR_Close(sp->iod.socket);
   498     }
   499 	PR_DestroyMonitor(sp->exit_mon);
   500     printf("%30s","TCP_Socket_Client_Server_Test:");
   501     printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
   502         num_tcp_clients, num_tcp_connections_per_client);
   503     printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
   504         num_tcp_mesgs_per_connection, tcp_mesg_size);
   506 	DPRINTF(("%s: calling PR_ShutdownThreadPool\n", program_name));
   507 	PR_ShutdownThreadPool(sp->tp);
   508 	PR_DELETE(sp);
   509 }
   511 /************************************************************************/
   513 #define DEFAULT_INITIAL_THREADS		4
   514 #define DEFAULT_MAX_THREADS			100
   515 #define DEFAULT_STACKSIZE			(512 * 1024)
   517 int main(int argc, char **argv)
   518 {
   519 	PRInt32 initial_threads = DEFAULT_INITIAL_THREADS;
   520 	PRInt32 max_threads = DEFAULT_MAX_THREADS;
   521 	PRInt32 stacksize = DEFAULT_STACKSIZE;
   522 	PRThreadPool *tp = NULL;
   523 	PRStatus rv;
   524 	PRJob *jobp;
   526     /*
   527      * -d           debug mode
   528      */
   529     PLOptStatus os;
   530     PLOptState *opt;
   532 	program_name = argv[0];
   533     opt = PL_CreateOptState(argc, argv, "d");
   534     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
   535     {
   536         if (PL_OPT_BAD == os) continue;
   537         switch (opt->option)
   538         {
   539         case 'd':  /* debug mode */
   540             _debug_on = 1;
   541             break;
   542         default:
   543             break;
   544         }
   545     }
   546     PL_DestroyOptState(opt);
   548     PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
   549     PR_STDIO_INIT();
   551     PR_SetConcurrency(4);
   553 	tp = PR_CreateThreadPool(initial_threads, max_threads, stacksize);
   554     if (NULL == tp) {
   555         printf("PR_CreateThreadPool failed\n");
   556         failed_already=1;
   557         goto done;
   558 	}
   559 	jobp = PR_QueueJob(tp, TCP_Server, tp, PR_TRUE);
   560 	rv = PR_JoinJob(jobp);		
   561 	PR_ASSERT(PR_SUCCESS == rv);
   563 	DPRINTF(("%s: calling PR_JoinThreadPool\n", program_name));
   564 	rv = PR_JoinThreadPool(tp);
   565 	PR_ASSERT(PR_SUCCESS == rv);
   566 	DPRINTF(("%s: returning from PR_JoinThreadPool\n", program_name));
   568 done:
   569     PR_Cleanup();
   570     if (failed_already) return 1;
   571     else return 0;
   572 }

mercurial