nsprpub/pr/tests/thrpool_client.c

Fri, 16 Jan 2015 18:13:44 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Fri, 16 Jan 2015 18:13:44 +0100
branch
TOR_BUG_9701
changeset 14
925c144e1f1f
permissions
-rw-r--r--

Integrate suggestion from review to improve consistency with existing code.

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* This Source Code Form is subject to the terms of the Mozilla Public
     3  * License, v. 2.0. If a copy of the MPL was not distributed with this
     4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     6 /***********************************************************************
     7 **
     8 ** Name: thrpool_client.c
     9 **
    10 ** Description: Test threadpool functionality.
    11 **
    12 ** Modification History:
    13 */
    14 #include "primpl.h"
    16 #include "plgetopt.h"
    18 #include <stdio.h>
    19 #include <string.h>
    20 #include <errno.h>
    21 #ifdef XP_UNIX
    22 #include <sys/mman.h>
    23 #endif
    24 #if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
    25 #include <pthread.h>
    26 #endif
    28 #ifdef WIN32
    29 #include <process.h>
    30 #endif
    32 static int _debug_on = 0;
    33 static int server_port = -1;
    34 static char *program_name = NULL;
    36 #include "obsolete/prsem.h"
    38 #ifdef XP_PC
    39 #define mode_t int
    40 #endif
    42 #define DPRINTF(arg) if (_debug_on) printf arg
    44 #define    BUF_DATA_SIZE    (2 * 1024)
    45 #define TCP_MESG_SIZE    1024
    46 #define NUM_TCP_CLIENTS            10	/* for a listen queue depth of 5 */
    48 #define NUM_TCP_CONNECTIONS_PER_CLIENT    10
    49 #define NUM_TCP_MESGS_PER_CONNECTION    10
    50 #define TCP_SERVER_PORT            10000
    52 static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
    53 static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
    54 static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
    55 static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
    57 int failed_already=0;
    59 typedef struct buffer {
    60     char    data[BUF_DATA_SIZE];
    61 } buffer;
    63 PRNetAddr tcp_server_addr, udp_server_addr;
    65 typedef struct Client_Param {
    66     PRNetAddr server_addr;
    67     PRMonitor *exit_mon;    /* monitor to signal on exit */
    68     PRInt32 *exit_counter;    /* counter to decrement, before exit */
    69     PRInt32    datalen;
    70 } Client_Param;
    72 /*
    73  * readn
    74  *    read data from sockfd into buf
    75  */
    76 static PRInt32
    77 readn(PRFileDesc *sockfd, char *buf, int len)
    78 {
    79     int rem;
    80     int bytes;
    81     int offset = 0;
    82 	PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
    84     for (rem=len; rem; offset += bytes, rem -= bytes) {
    85         DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n",
    86             PR_GetCurrentThread(), rem));
    87         bytes = PR_Recv(sockfd, buf + offset, rem, 0,
    88             	timeout);
    89         DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n",
    90             PR_GetCurrentThread(), bytes));
    91         if (bytes < 0) {
    92 			return -1;
    93 		}	
    94     }
    95     return len;
    96 }
    98 /*
    99  * writen
   100  *    write data from buf to sockfd
   101  */
   102 static PRInt32
   103 writen(PRFileDesc *sockfd, char *buf, int len)
   104 {
   105     int rem;
   106     int bytes;
   107     int offset = 0;
   109     for (rem=len; rem; offset += bytes, rem -= bytes) {
   110         DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n",
   111             PR_GetCurrentThread(), rem));
   112         bytes = PR_Send(sockfd, buf + offset, rem, 0,
   113             PR_INTERVAL_NO_TIMEOUT);
   114         DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n",
   115             PR_GetCurrentThread(), bytes));
   116         if (bytes <= 0)
   117             return -1;
   118     }
   119     return len;
   120 }
   122 /*
   123  * TCP_Client
   124  *    Client job
   125  *    Connect to the server at the address specified in the argument.
   126  *    Fill in a buffer, write data to server, read it back and check
   127  *    for data corruption.
   128  *    Close the socket for server connection
   129  */
   130 static void PR_CALLBACK
   131 TCP_Client(void *arg)
   132 {
   133     Client_Param *cp = (Client_Param *) arg;
   134     PRFileDesc *sockfd;
   135     buffer *in_buf, *out_buf;
   136     union PRNetAddr netaddr;
   137     PRInt32 bytes, i, j;
   140     DPRINTF(("TCP client started\n"));
   141     bytes = cp->datalen;
   142     out_buf = PR_NEW(buffer);
   143     if (out_buf == NULL) {
   144         fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
   145         failed_already=1;
   146         return;
   147     }
   148     in_buf = PR_NEW(buffer);
   149     if (in_buf == NULL) {
   150         fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
   151         failed_already=1;
   152         return;
   153     }
   154     netaddr.inet.family = cp->server_addr.inet.family;
   155     netaddr.inet.port = cp->server_addr.inet.port;
   156     netaddr.inet.ip = cp->server_addr.inet.ip;
   158     for (i = 0; i < num_tcp_connections_per_client; i++) {
   159         if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) {
   160             fprintf(stderr,"%s: PR_OpenTCPSocket failed\n", program_name);
   161             failed_already=1;
   162             return;
   163         }
   165         DPRINTF(("TCP client connecting to server:%d\n", server_port));
   166         if (PR_Connect(sockfd, &netaddr,PR_INTERVAL_NO_TIMEOUT) < 0){
   167         	fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n",
   168             		PR_GetError(), PR_GetOSError());
   169             failed_already=1;
   170             return;
   171         }
   172         for (j = 0; j < num_tcp_mesgs_per_connection; j++) {
   173             /*
   174              * fill in random data
   175              */
   176             memset(out_buf->data, ((PRInt32) (&netaddr)) + i + j, bytes);
   177             /*
   178              * write to server
   179              */
   180             if (writen(sockfd, out_buf->data, bytes) < bytes) {
   181                 fprintf(stderr,"%s: ERROR - TCP_Client:writen\n", program_name);
   182                 failed_already=1;
   183                 return;
   184             }
   185 			/*
   186             DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n",
   187                 PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data))));
   188 			*/
   189             if (readn(sockfd, in_buf->data, bytes) < bytes) {
   190                 fprintf(stderr,"%s: ERROR - TCP_Client:readn\n", program_name);
   191                 failed_already=1;
   192                 return;
   193             }
   194             /*
   195              * verify the data read
   196              */
   197             if (memcmp(in_buf->data, out_buf->data, bytes) != 0) {
   198                 fprintf(stderr,"%s: ERROR - data corruption\n", program_name);
   199                 failed_already=1;
   200                 return;
   201             }
   202         }
   203         /*
   204          * shutdown reads and writes
   205          */
   206         if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
   207             fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
   208             failed_already=1;
   209         }
   210         PR_Close(sockfd);
   211     }
   213     PR_DELETE(out_buf);
   214     PR_DELETE(in_buf);
   216     /*
   217      * Decrement exit_counter and notify parent thread
   218      */
   220     PR_EnterMonitor(cp->exit_mon);
   221     --(*cp->exit_counter);
   222     PR_Notify(cp->exit_mon);
   223     PR_ExitMonitor(cp->exit_mon);
   224     DPRINTF(("TCP_Client exiting\n"));
   225 }
   227 /*
   228  * TCP_Socket_Client_Server_Test    - concurrent server test
   229  *    
   230  *    Each client connects to the server and sends a chunk of data
   231  *    For each connection, server reads the data
   232  *    from the client and sends it back to the client, unmodified.
   233  *    Each client checks that data received from server is same as the
   234  *    data it sent to the server.
   235  *
   236  */
   238 static PRInt32
   239 TCP_Socket_Client_Server_Test(void)
   240 {
   241     int i;
   242     Client_Param *cparamp;
   243     PRMonitor *mon2;
   244     PRInt32    datalen;
   245     PRInt32    connections = 0;
   246 	PRThread *thr;
   248     datalen = tcp_mesg_size;
   249     connections = 0;
   251     mon2 = PR_NewMonitor();
   252     if (mon2 == NULL) {
   253         fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
   254         failed_already=1;
   255         return -1;
   256     }
   258     /*
   259      * Start client jobs
   260      */
   261     cparamp = PR_NEW(Client_Param);
   262     if (cparamp == NULL) {
   263         fprintf(stderr,"%s: PR_NEW failed\n", program_name);
   264         failed_already=1;
   265         return -1;
   266     }
   267     cparamp->server_addr.inet.family = PR_AF_INET;
   268     cparamp->server_addr.inet.port = PR_htons(server_port);
   269     cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK);
   270     cparamp->exit_mon = mon2;
   271     cparamp->exit_counter = &connections;
   272     cparamp->datalen = datalen;
   273     for (i = 0; i < num_tcp_clients; i++) {
   274 		thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void *)cparamp,
   275         		PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, 0);
   276         if (NULL == thr) {
   277             fprintf(stderr,"%s: PR_CreateThread failed\n", program_name);
   278             failed_already=1;
   279             return -1;
   280         }
   281     	PR_EnterMonitor(mon2);
   282         connections++;
   283     	PR_ExitMonitor(mon2);
   284         DPRINTF(("Created TCP client = 0x%lx\n", thr));
   285     }
   286     /* Wait for client jobs to exit */
   287     PR_EnterMonitor(mon2);
   288     while (0 != connections) {
   289         PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT);
   290         DPRINTF(("Client job count = %d\n", connections));
   291     }
   292     PR_ExitMonitor(mon2);
   293     printf("%30s","TCP_Socket_Client_Server_Test:");
   294     printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
   295         num_tcp_clients, num_tcp_connections_per_client);
   296     printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
   297         num_tcp_mesgs_per_connection, tcp_mesg_size);
   299     PR_DELETE(cparamp);
   300     return 0;
   301 }
   303 /************************************************************************/
   305 int main(int argc, char **argv)
   306 {
   307     /*
   308      * -d           debug mode
   309      */
   310     PLOptStatus os;
   311     PLOptState *opt;
   312 	program_name = argv[0];
   314     opt = PL_CreateOptState(argc, argv, "dp:");
   315     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
   316     {
   317         if (PL_OPT_BAD == os) continue;
   318         switch (opt->option)
   319         {
   320         case 'd':  /* debug mode */
   321             _debug_on = 1;
   322             break;
   323         case 'p':
   324             server_port = atoi(opt->value);
   325             break;
   326         default:
   327             break;
   328         }
   329     }
   330     PL_DestroyOptState(opt);
   332     PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
   333     PR_STDIO_INIT();
   335     PR_SetConcurrency(4);
   337 	TCP_Socket_Client_Server_Test();
   339     PR_Cleanup();
   340     if (failed_already)
   341 		return 1;
   342     else
   343 		return 0;
   344 }

mercurial