nsprpub/pr/tests/thrpool_client.c

changeset 0
6474c204b198
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/nsprpub/pr/tests/thrpool_client.c	Wed Dec 31 06:09:35 2014 +0100
     1.3 @@ -0,0 +1,344 @@
     1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public
     1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this
     1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     1.8 +
     1.9 +/***********************************************************************
    1.10 +**
    1.11 +** Name: thrpool_client.c
    1.12 +**
    1.13 +** Description: Test threadpool functionality.
    1.14 +**
    1.15 +** Modification History:
    1.16 +*/
    1.17 +#include "primpl.h"
    1.18 +
    1.19 +#include "plgetopt.h"
    1.20 +
    1.21 +#include <stdio.h>
    1.22 +#include <string.h>
    1.23 +#include <errno.h>
    1.24 +#ifdef XP_UNIX
    1.25 +#include <sys/mman.h>
    1.26 +#endif
    1.27 +#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS)
    1.28 +#include <pthread.h>
    1.29 +#endif
    1.30 +
    1.31 +#ifdef WIN32
    1.32 +#include <process.h>
    1.33 +#endif
    1.34 +
    1.35 +static int _debug_on = 0;
    1.36 +static int server_port = -1;
    1.37 +static char *program_name = NULL;
    1.38 +
    1.39 +#include "obsolete/prsem.h"
    1.40 +
    1.41 +#ifdef XP_PC
    1.42 +#define mode_t int
    1.43 +#endif
    1.44 +
    1.45 +#define DPRINTF(arg) if (_debug_on) printf arg
    1.46 +
    1.47 +#define    BUF_DATA_SIZE    (2 * 1024)
    1.48 +#define TCP_MESG_SIZE    1024
    1.49 +#define NUM_TCP_CLIENTS            10	/* for a listen queue depth of 5 */
    1.50 +
    1.51 +#define NUM_TCP_CONNECTIONS_PER_CLIENT    10
    1.52 +#define NUM_TCP_MESGS_PER_CONNECTION    10
    1.53 +#define TCP_SERVER_PORT            10000
    1.54 +
    1.55 +static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS;
    1.56 +static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT;
    1.57 +static PRInt32 tcp_mesg_size = TCP_MESG_SIZE;
    1.58 +static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION;
    1.59 +
    1.60 +int failed_already=0;
    1.61 +
    1.62 +typedef struct buffer {
    1.63 +    char    data[BUF_DATA_SIZE];
    1.64 +} buffer;
    1.65 +
    1.66 +PRNetAddr tcp_server_addr, udp_server_addr;
    1.67 +
    1.68 +typedef struct Client_Param {
    1.69 +    PRNetAddr server_addr;
    1.70 +    PRMonitor *exit_mon;    /* monitor to signal on exit */
    1.71 +    PRInt32 *exit_counter;    /* counter to decrement, before exit */
    1.72 +    PRInt32    datalen;
    1.73 +} Client_Param;
    1.74 +
    1.75 +/*
    1.76 + * readn
    1.77 + *    read data from sockfd into buf
    1.78 + */
    1.79 +static PRInt32
    1.80 +readn(PRFileDesc *sockfd, char *buf, int len)
    1.81 +{
    1.82 +    int rem;
    1.83 +    int bytes;
    1.84 +    int offset = 0;
    1.85 +	PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT;
    1.86 +
    1.87 +    for (rem=len; rem; offset += bytes, rem -= bytes) {
    1.88 +        DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n",
    1.89 +            PR_GetCurrentThread(), rem));
    1.90 +        bytes = PR_Recv(sockfd, buf + offset, rem, 0,
    1.91 +            	timeout);
    1.92 +        DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n",
    1.93 +            PR_GetCurrentThread(), bytes));
    1.94 +        if (bytes < 0) {
    1.95 +			return -1;
    1.96 +		}	
    1.97 +    }
    1.98 +    return len;
    1.99 +}
   1.100 +
   1.101 +/*
   1.102 + * writen
   1.103 + *    write data from buf to sockfd
   1.104 + */
   1.105 +static PRInt32
   1.106 +writen(PRFileDesc *sockfd, char *buf, int len)
   1.107 +{
   1.108 +    int rem;
   1.109 +    int bytes;
   1.110 +    int offset = 0;
   1.111 +
   1.112 +    for (rem=len; rem; offset += bytes, rem -= bytes) {
   1.113 +        DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n",
   1.114 +            PR_GetCurrentThread(), rem));
   1.115 +        bytes = PR_Send(sockfd, buf + offset, rem, 0,
   1.116 +            PR_INTERVAL_NO_TIMEOUT);
   1.117 +        DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n",
   1.118 +            PR_GetCurrentThread(), bytes));
   1.119 +        if (bytes <= 0)
   1.120 +            return -1;
   1.121 +    }
   1.122 +    return len;
   1.123 +}
   1.124 +
   1.125 +/*
   1.126 + * TCP_Client
   1.127 + *    Client job
   1.128 + *    Connect to the server at the address specified in the argument.
   1.129 + *    Fill in a buffer, write data to server, read it back and check
   1.130 + *    for data corruption.
   1.131 + *    Close the socket for server connection
   1.132 + */
   1.133 +static void PR_CALLBACK
   1.134 +TCP_Client(void *arg)
   1.135 +{
   1.136 +    Client_Param *cp = (Client_Param *) arg;
   1.137 +    PRFileDesc *sockfd;
   1.138 +    buffer *in_buf, *out_buf;
   1.139 +    union PRNetAddr netaddr;
   1.140 +    PRInt32 bytes, i, j;
   1.141 +
   1.142 +
   1.143 +    DPRINTF(("TCP client started\n"));
   1.144 +    bytes = cp->datalen;
   1.145 +    out_buf = PR_NEW(buffer);
   1.146 +    if (out_buf == NULL) {
   1.147 +        fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
   1.148 +        failed_already=1;
   1.149 +        return;
   1.150 +    }
   1.151 +    in_buf = PR_NEW(buffer);
   1.152 +    if (in_buf == NULL) {
   1.153 +        fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name);
   1.154 +        failed_already=1;
   1.155 +        return;
   1.156 +    }
   1.157 +    netaddr.inet.family = cp->server_addr.inet.family;
   1.158 +    netaddr.inet.port = cp->server_addr.inet.port;
   1.159 +    netaddr.inet.ip = cp->server_addr.inet.ip;
   1.160 +
   1.161 +    for (i = 0; i < num_tcp_connections_per_client; i++) {
   1.162 +        if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) {
   1.163 +            fprintf(stderr,"%s: PR_OpenTCPSocket failed\n", program_name);
   1.164 +            failed_already=1;
   1.165 +            return;
   1.166 +        }
   1.167 +
   1.168 +        DPRINTF(("TCP client connecting to server:%d\n", server_port));
   1.169 +        if (PR_Connect(sockfd, &netaddr,PR_INTERVAL_NO_TIMEOUT) < 0){
   1.170 +        	fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n",
   1.171 +            		PR_GetError(), PR_GetOSError());
   1.172 +            failed_already=1;
   1.173 +            return;
   1.174 +        }
   1.175 +        for (j = 0; j < num_tcp_mesgs_per_connection; j++) {
   1.176 +            /*
   1.177 +             * fill in random data
   1.178 +             */
   1.179 +            memset(out_buf->data, ((PRInt32) (&netaddr)) + i + j, bytes);
   1.180 +            /*
   1.181 +             * write to server
   1.182 +             */
   1.183 +            if (writen(sockfd, out_buf->data, bytes) < bytes) {
   1.184 +                fprintf(stderr,"%s: ERROR - TCP_Client:writen\n", program_name);
   1.185 +                failed_already=1;
   1.186 +                return;
   1.187 +            }
   1.188 +			/*
   1.189 +            DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n",
   1.190 +                PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data))));
   1.191 +			*/
   1.192 +            if (readn(sockfd, in_buf->data, bytes) < bytes) {
   1.193 +                fprintf(stderr,"%s: ERROR - TCP_Client:readn\n", program_name);
   1.194 +                failed_already=1;
   1.195 +                return;
   1.196 +            }
   1.197 +            /*
   1.198 +             * verify the data read
   1.199 +             */
   1.200 +            if (memcmp(in_buf->data, out_buf->data, bytes) != 0) {
   1.201 +                fprintf(stderr,"%s: ERROR - data corruption\n", program_name);
   1.202 +                failed_already=1;
   1.203 +                return;
   1.204 +            }
   1.205 +        }
   1.206 +        /*
   1.207 +         * shutdown reads and writes
   1.208 +         */
   1.209 +        if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) {
   1.210 +            fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name);
   1.211 +            failed_already=1;
   1.212 +        }
   1.213 +        PR_Close(sockfd);
   1.214 +    }
   1.215 +
   1.216 +    PR_DELETE(out_buf);
   1.217 +    PR_DELETE(in_buf);
   1.218 +
   1.219 +    /*
   1.220 +     * Decrement exit_counter and notify parent thread
   1.221 +     */
   1.222 +
   1.223 +    PR_EnterMonitor(cp->exit_mon);
   1.224 +    --(*cp->exit_counter);
   1.225 +    PR_Notify(cp->exit_mon);
   1.226 +    PR_ExitMonitor(cp->exit_mon);
   1.227 +    DPRINTF(("TCP_Client exiting\n"));
   1.228 +}
   1.229 +
   1.230 +/*
   1.231 + * TCP_Socket_Client_Server_Test    - concurrent server test
   1.232 + *    
   1.233 + *    Each client connects to the server and sends a chunk of data
   1.234 + *    For each connection, server reads the data
   1.235 + *    from the client and sends it back to the client, unmodified.
   1.236 + *    Each client checks that data received from server is same as the
   1.237 + *    data it sent to the server.
   1.238 + *
   1.239 + */
   1.240 +
   1.241 +static PRInt32
   1.242 +TCP_Socket_Client_Server_Test(void)
   1.243 +{
   1.244 +    int i;
   1.245 +    Client_Param *cparamp;
   1.246 +    PRMonitor *mon2;
   1.247 +    PRInt32    datalen;
   1.248 +    PRInt32    connections = 0;
   1.249 +	PRThread *thr;
   1.250 +
   1.251 +    datalen = tcp_mesg_size;
   1.252 +    connections = 0;
   1.253 +
   1.254 +    mon2 = PR_NewMonitor();
   1.255 +    if (mon2 == NULL) {
   1.256 +        fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name);
   1.257 +        failed_already=1;
   1.258 +        return -1;
   1.259 +    }
   1.260 +
   1.261 +    /*
   1.262 +     * Start client jobs
   1.263 +     */
   1.264 +    cparamp = PR_NEW(Client_Param);
   1.265 +    if (cparamp == NULL) {
   1.266 +        fprintf(stderr,"%s: PR_NEW failed\n", program_name);
   1.267 +        failed_already=1;
   1.268 +        return -1;
   1.269 +    }
   1.270 +    cparamp->server_addr.inet.family = PR_AF_INET;
   1.271 +    cparamp->server_addr.inet.port = PR_htons(server_port);
   1.272 +    cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK);
   1.273 +    cparamp->exit_mon = mon2;
   1.274 +    cparamp->exit_counter = &connections;
   1.275 +    cparamp->datalen = datalen;
   1.276 +    for (i = 0; i < num_tcp_clients; i++) {
   1.277 +		thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void *)cparamp,
   1.278 +        		PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, 0);
   1.279 +        if (NULL == thr) {
   1.280 +            fprintf(stderr,"%s: PR_CreateThread failed\n", program_name);
   1.281 +            failed_already=1;
   1.282 +            return -1;
   1.283 +        }
   1.284 +    	PR_EnterMonitor(mon2);
   1.285 +        connections++;
   1.286 +    	PR_ExitMonitor(mon2);
   1.287 +        DPRINTF(("Created TCP client = 0x%lx\n", thr));
   1.288 +    }
   1.289 +    /* Wait for client jobs to exit */
   1.290 +    PR_EnterMonitor(mon2);
   1.291 +    while (0 != connections) {
   1.292 +        PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT);
   1.293 +        DPRINTF(("Client job count = %d\n", connections));
   1.294 +    }
   1.295 +    PR_ExitMonitor(mon2);
   1.296 +    printf("%30s","TCP_Socket_Client_Server_Test:");
   1.297 +    printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l,
   1.298 +        num_tcp_clients, num_tcp_connections_per_client);
   1.299 +    printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":",
   1.300 +        num_tcp_mesgs_per_connection, tcp_mesg_size);
   1.301 +
   1.302 +    PR_DELETE(cparamp);
   1.303 +    return 0;
   1.304 +}
   1.305 +
   1.306 +/************************************************************************/
   1.307 +
   1.308 +int main(int argc, char **argv)
   1.309 +{
   1.310 +    /*
   1.311 +     * -d           debug mode
   1.312 +     */
   1.313 +    PLOptStatus os;
   1.314 +    PLOptState *opt;
   1.315 +	program_name = argv[0];
   1.316 +
   1.317 +    opt = PL_CreateOptState(argc, argv, "dp:");
   1.318 +    while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
   1.319 +    {
   1.320 +        if (PL_OPT_BAD == os) continue;
   1.321 +        switch (opt->option)
   1.322 +        {
   1.323 +        case 'd':  /* debug mode */
   1.324 +            _debug_on = 1;
   1.325 +            break;
   1.326 +        case 'p':
   1.327 +            server_port = atoi(opt->value);
   1.328 +            break;
   1.329 +        default:
   1.330 +            break;
   1.331 +        }
   1.332 +    }
   1.333 +    PL_DestroyOptState(opt);
   1.334 +
   1.335 +    PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
   1.336 +    PR_STDIO_INIT();
   1.337 +
   1.338 +    PR_SetConcurrency(4);
   1.339 +
   1.340 +	TCP_Socket_Client_Server_Test();
   1.341 +
   1.342 +    PR_Cleanup();
   1.343 +    if (failed_already)
   1.344 +		return 1;
   1.345 +    else
   1.346 +		return 0;
   1.347 +}

mercurial