1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/nsprpub/pr/tests/thrpool_client.c Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,344 @@ 1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.8 + 1.9 +/*********************************************************************** 1.10 +** 1.11 +** Name: thrpool_client.c 1.12 +** 1.13 +** Description: Test threadpool functionality. 1.14 +** 1.15 +** Modification History: 1.16 +*/ 1.17 +#include "primpl.h" 1.18 + 1.19 +#include "plgetopt.h" 1.20 + 1.21 +#include <stdio.h> 1.22 +#include <string.h> 1.23 +#include <errno.h> 1.24 +#ifdef XP_UNIX 1.25 +#include <sys/mman.h> 1.26 +#endif 1.27 +#if defined(_PR_PTHREADS) && !defined(_PR_DCETHREADS) 1.28 +#include <pthread.h> 1.29 +#endif 1.30 + 1.31 +#ifdef WIN32 1.32 +#include <process.h> 1.33 +#endif 1.34 + 1.35 +static int _debug_on = 0; 1.36 +static int server_port = -1; 1.37 +static char *program_name = NULL; 1.38 + 1.39 +#include "obsolete/prsem.h" 1.40 + 1.41 +#ifdef XP_PC 1.42 +#define mode_t int 1.43 +#endif 1.44 + 1.45 +#define DPRINTF(arg) if (_debug_on) printf arg 1.46 + 1.47 +#define BUF_DATA_SIZE (2 * 1024) 1.48 +#define TCP_MESG_SIZE 1024 1.49 +#define NUM_TCP_CLIENTS 10 /* for a listen queue depth of 5 */ 1.50 + 1.51 +#define NUM_TCP_CONNECTIONS_PER_CLIENT 10 1.52 +#define NUM_TCP_MESGS_PER_CONNECTION 10 1.53 +#define TCP_SERVER_PORT 10000 1.54 + 1.55 +static PRInt32 num_tcp_clients = NUM_TCP_CLIENTS; 1.56 +static PRInt32 num_tcp_connections_per_client = NUM_TCP_CONNECTIONS_PER_CLIENT; 1.57 +static PRInt32 tcp_mesg_size = TCP_MESG_SIZE; 1.58 +static PRInt32 num_tcp_mesgs_per_connection = NUM_TCP_MESGS_PER_CONNECTION; 1.59 + 1.60 +int failed_already=0; 1.61 + 1.62 +typedef struct buffer { 1.63 + char data[BUF_DATA_SIZE]; 1.64 +} buffer; 1.65 + 1.66 +PRNetAddr tcp_server_addr, udp_server_addr; 1.67 + 1.68 +typedef struct Client_Param { 1.69 + PRNetAddr server_addr; 1.70 + PRMonitor *exit_mon; /* monitor to signal on exit */ 1.71 + PRInt32 *exit_counter; /* counter to decrement, before exit */ 1.72 + PRInt32 datalen; 1.73 +} Client_Param; 1.74 + 1.75 +/* 1.76 + * readn 1.77 + * read data from sockfd into buf 1.78 + */ 1.79 +static PRInt32 1.80 +readn(PRFileDesc *sockfd, char *buf, int len) 1.81 +{ 1.82 + int rem; 1.83 + int bytes; 1.84 + int offset = 0; 1.85 + PRIntervalTime timeout = PR_INTERVAL_NO_TIMEOUT; 1.86 + 1.87 + for (rem=len; rem; offset += bytes, rem -= bytes) { 1.88 + DPRINTF(("thread = 0x%lx: calling PR_Recv, bytes = %d\n", 1.89 + PR_GetCurrentThread(), rem)); 1.90 + bytes = PR_Recv(sockfd, buf + offset, rem, 0, 1.91 + timeout); 1.92 + DPRINTF(("thread = 0x%lx: returning from PR_Recv, bytes = %d\n", 1.93 + PR_GetCurrentThread(), bytes)); 1.94 + if (bytes < 0) { 1.95 + return -1; 1.96 + } 1.97 + } 1.98 + return len; 1.99 +} 1.100 + 1.101 +/* 1.102 + * writen 1.103 + * write data from buf to sockfd 1.104 + */ 1.105 +static PRInt32 1.106 +writen(PRFileDesc *sockfd, char *buf, int len) 1.107 +{ 1.108 + int rem; 1.109 + int bytes; 1.110 + int offset = 0; 1.111 + 1.112 + for (rem=len; rem; offset += bytes, rem -= bytes) { 1.113 + DPRINTF(("thread = 0x%lx: calling PR_Send, bytes = %d\n", 1.114 + PR_GetCurrentThread(), rem)); 1.115 + bytes = PR_Send(sockfd, buf + offset, rem, 0, 1.116 + PR_INTERVAL_NO_TIMEOUT); 1.117 + DPRINTF(("thread = 0x%lx: returning from PR_Send, bytes = %d\n", 1.118 + PR_GetCurrentThread(), bytes)); 1.119 + if (bytes <= 0) 1.120 + return -1; 1.121 + } 1.122 + return len; 1.123 +} 1.124 + 1.125 +/* 1.126 + * TCP_Client 1.127 + * Client job 1.128 + * Connect to the server at the address specified in the argument. 1.129 + * Fill in a buffer, write data to server, read it back and check 1.130 + * for data corruption. 1.131 + * Close the socket for server connection 1.132 + */ 1.133 +static void PR_CALLBACK 1.134 +TCP_Client(void *arg) 1.135 +{ 1.136 + Client_Param *cp = (Client_Param *) arg; 1.137 + PRFileDesc *sockfd; 1.138 + buffer *in_buf, *out_buf; 1.139 + union PRNetAddr netaddr; 1.140 + PRInt32 bytes, i, j; 1.141 + 1.142 + 1.143 + DPRINTF(("TCP client started\n")); 1.144 + bytes = cp->datalen; 1.145 + out_buf = PR_NEW(buffer); 1.146 + if (out_buf == NULL) { 1.147 + fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name); 1.148 + failed_already=1; 1.149 + return; 1.150 + } 1.151 + in_buf = PR_NEW(buffer); 1.152 + if (in_buf == NULL) { 1.153 + fprintf(stderr,"%s: failed to alloc buffer struct\n", program_name); 1.154 + failed_already=1; 1.155 + return; 1.156 + } 1.157 + netaddr.inet.family = cp->server_addr.inet.family; 1.158 + netaddr.inet.port = cp->server_addr.inet.port; 1.159 + netaddr.inet.ip = cp->server_addr.inet.ip; 1.160 + 1.161 + for (i = 0; i < num_tcp_connections_per_client; i++) { 1.162 + if ((sockfd = PR_OpenTCPSocket(PR_AF_INET)) == NULL) { 1.163 + fprintf(stderr,"%s: PR_OpenTCPSocket failed\n", program_name); 1.164 + failed_already=1; 1.165 + return; 1.166 + } 1.167 + 1.168 + DPRINTF(("TCP client connecting to server:%d\n", server_port)); 1.169 + if (PR_Connect(sockfd, &netaddr,PR_INTERVAL_NO_TIMEOUT) < 0){ 1.170 + fprintf(stderr, "PR_Connect failed: (%ld, %ld)\n", 1.171 + PR_GetError(), PR_GetOSError()); 1.172 + failed_already=1; 1.173 + return; 1.174 + } 1.175 + for (j = 0; j < num_tcp_mesgs_per_connection; j++) { 1.176 + /* 1.177 + * fill in random data 1.178 + */ 1.179 + memset(out_buf->data, ((PRInt32) (&netaddr)) + i + j, bytes); 1.180 + /* 1.181 + * write to server 1.182 + */ 1.183 + if (writen(sockfd, out_buf->data, bytes) < bytes) { 1.184 + fprintf(stderr,"%s: ERROR - TCP_Client:writen\n", program_name); 1.185 + failed_already=1; 1.186 + return; 1.187 + } 1.188 + /* 1.189 + DPRINTF(("TCP Client [0x%lx]: out_buf = 0x%lx out_buf[0] = 0x%lx\n", 1.190 + PR_GetCurrentThread(), out_buf, (*((int *) out_buf->data)))); 1.191 + */ 1.192 + if (readn(sockfd, in_buf->data, bytes) < bytes) { 1.193 + fprintf(stderr,"%s: ERROR - TCP_Client:readn\n", program_name); 1.194 + failed_already=1; 1.195 + return; 1.196 + } 1.197 + /* 1.198 + * verify the data read 1.199 + */ 1.200 + if (memcmp(in_buf->data, out_buf->data, bytes) != 0) { 1.201 + fprintf(stderr,"%s: ERROR - data corruption\n", program_name); 1.202 + failed_already=1; 1.203 + return; 1.204 + } 1.205 + } 1.206 + /* 1.207 + * shutdown reads and writes 1.208 + */ 1.209 + if (PR_Shutdown(sockfd, PR_SHUTDOWN_BOTH) < 0) { 1.210 + fprintf(stderr,"%s: ERROR - PR_Shutdown\n", program_name); 1.211 + failed_already=1; 1.212 + } 1.213 + PR_Close(sockfd); 1.214 + } 1.215 + 1.216 + PR_DELETE(out_buf); 1.217 + PR_DELETE(in_buf); 1.218 + 1.219 + /* 1.220 + * Decrement exit_counter and notify parent thread 1.221 + */ 1.222 + 1.223 + PR_EnterMonitor(cp->exit_mon); 1.224 + --(*cp->exit_counter); 1.225 + PR_Notify(cp->exit_mon); 1.226 + PR_ExitMonitor(cp->exit_mon); 1.227 + DPRINTF(("TCP_Client exiting\n")); 1.228 +} 1.229 + 1.230 +/* 1.231 + * TCP_Socket_Client_Server_Test - concurrent server test 1.232 + * 1.233 + * Each client connects to the server and sends a chunk of data 1.234 + * For each connection, server reads the data 1.235 + * from the client and sends it back to the client, unmodified. 1.236 + * Each client checks that data received from server is same as the 1.237 + * data it sent to the server. 1.238 + * 1.239 + */ 1.240 + 1.241 +static PRInt32 1.242 +TCP_Socket_Client_Server_Test(void) 1.243 +{ 1.244 + int i; 1.245 + Client_Param *cparamp; 1.246 + PRMonitor *mon2; 1.247 + PRInt32 datalen; 1.248 + PRInt32 connections = 0; 1.249 + PRThread *thr; 1.250 + 1.251 + datalen = tcp_mesg_size; 1.252 + connections = 0; 1.253 + 1.254 + mon2 = PR_NewMonitor(); 1.255 + if (mon2 == NULL) { 1.256 + fprintf(stderr,"%s: PR_NewMonitor failed\n", program_name); 1.257 + failed_already=1; 1.258 + return -1; 1.259 + } 1.260 + 1.261 + /* 1.262 + * Start client jobs 1.263 + */ 1.264 + cparamp = PR_NEW(Client_Param); 1.265 + if (cparamp == NULL) { 1.266 + fprintf(stderr,"%s: PR_NEW failed\n", program_name); 1.267 + failed_already=1; 1.268 + return -1; 1.269 + } 1.270 + cparamp->server_addr.inet.family = PR_AF_INET; 1.271 + cparamp->server_addr.inet.port = PR_htons(server_port); 1.272 + cparamp->server_addr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK); 1.273 + cparamp->exit_mon = mon2; 1.274 + cparamp->exit_counter = &connections; 1.275 + cparamp->datalen = datalen; 1.276 + for (i = 0; i < num_tcp_clients; i++) { 1.277 + thr = PR_CreateThread(PR_USER_THREAD, TCP_Client, (void *)cparamp, 1.278 + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, 0); 1.279 + if (NULL == thr) { 1.280 + fprintf(stderr,"%s: PR_CreateThread failed\n", program_name); 1.281 + failed_already=1; 1.282 + return -1; 1.283 + } 1.284 + PR_EnterMonitor(mon2); 1.285 + connections++; 1.286 + PR_ExitMonitor(mon2); 1.287 + DPRINTF(("Created TCP client = 0x%lx\n", thr)); 1.288 + } 1.289 + /* Wait for client jobs to exit */ 1.290 + PR_EnterMonitor(mon2); 1.291 + while (0 != connections) { 1.292 + PR_Wait(mon2, PR_INTERVAL_NO_TIMEOUT); 1.293 + DPRINTF(("Client job count = %d\n", connections)); 1.294 + } 1.295 + PR_ExitMonitor(mon2); 1.296 + printf("%30s","TCP_Socket_Client_Server_Test:"); 1.297 + printf("%2ld Server %2ld Clients %2ld connections_per_client\n",1l, 1.298 + num_tcp_clients, num_tcp_connections_per_client); 1.299 + printf("%30s %2ld messages_per_connection %4ld bytes_per_message\n",":", 1.300 + num_tcp_mesgs_per_connection, tcp_mesg_size); 1.301 + 1.302 + PR_DELETE(cparamp); 1.303 + return 0; 1.304 +} 1.305 + 1.306 +/************************************************************************/ 1.307 + 1.308 +int main(int argc, char **argv) 1.309 +{ 1.310 + /* 1.311 + * -d debug mode 1.312 + */ 1.313 + PLOptStatus os; 1.314 + PLOptState *opt; 1.315 + program_name = argv[0]; 1.316 + 1.317 + opt = PL_CreateOptState(argc, argv, "dp:"); 1.318 + while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) 1.319 + { 1.320 + if (PL_OPT_BAD == os) continue; 1.321 + switch (opt->option) 1.322 + { 1.323 + case 'd': /* debug mode */ 1.324 + _debug_on = 1; 1.325 + break; 1.326 + case 'p': 1.327 + server_port = atoi(opt->value); 1.328 + break; 1.329 + default: 1.330 + break; 1.331 + } 1.332 + } 1.333 + PL_DestroyOptState(opt); 1.334 + 1.335 + PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0); 1.336 + PR_STDIO_INIT(); 1.337 + 1.338 + PR_SetConcurrency(4); 1.339 + 1.340 + TCP_Socket_Client_Server_Test(); 1.341 + 1.342 + PR_Cleanup(); 1.343 + if (failed_already) 1.344 + return 1; 1.345 + else 1.346 + return 0; 1.347 +}