nsprpub/pr/tests/thruput.c

Fri, 16 Jan 2015 04:50:19 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Fri, 16 Jan 2015 04:50:19 +0100
branch
TOR_BUG_9701
changeset 13
44a2da4a2ab2
permissions
-rw-r--r--

Replace accessor implementation with direct member state manipulation, by
request https://trac.torproject.org/projects/tor/ticket/9701#comment:32

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
michael@0 2 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 3 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 5
michael@0 6 /*
michael@0 7 ** File: thruput.c
michael@0 8 ** Description: Test server's throughput capability comparing various
michael@0 9 ** implementation strategies.
michael@0 10 **
michael@0 11 ** Note: Requires a server machine and an arbitrary number of
michael@0 12 ** clients to bang on it. Trust the numbers on the server
michael@0 13 ** more than those being displayed by the various clients.
michael@0 14 */
michael@0 15
michael@0 16 #include "prerror.h"
michael@0 17 #include "prinrval.h"
michael@0 18 #include "prinit.h"
michael@0 19 #include "prio.h"
michael@0 20 #include "prlock.h"
michael@0 21 #include "prmem.h"
michael@0 22 #include "prnetdb.h"
michael@0 23 #include "prprf.h"
michael@0 24 #include "prthread.h"
michael@0 25 #include "pprio.h"
michael@0 26 #include "plerror.h"
michael@0 27 #include "plgetopt.h"
michael@0 28
/* Size of the text buffer PrintAddress() formats an address into. */
#define ADDR_BUFFER 100
/* Port the server binds to and the clients connect to. */
#define PORT_NUMBER 51877
/* Seconds between throughput reports (fed to PR_SecondsToInterval). */
#define SAMPLING_INTERVAL 10
/* Default size, in bytes, of each worker's send/receive buffer. */
#define BUFFER_SIZE (32 * 1024)

/*
** Run-time settings. The values below are the defaults; main() overrides
** them from the command line before any worker thread is started.
*/
static PRInt32 domain = PR_AF_INET;     /* address family for PR_Socket; -6 selects PR_AF_INET6 */
static PRInt32 protocol = 6; /* TCP; -X switches this to 36 (XTP) */
static PRFileDesc *err = NULL;          /* all output goes here; set to standard error in main() */
static PRIntn concurrency = 1;          /* -C; handed to PR_SetConcurrency when not 1 */
static PRInt32 xport_buffer = -1;       /* -B; socket buffer size in bytes, -1 leaves the option unset */
static PRUint32 initial_streams = 1;    /* -s; streams the client opens before pausing */
static PRInt32 buffer_size = BUFFER_SIZE;   /* -b; bytes per PR_Send/PR_Recv call */
static PRThreadScope thread_scope = PR_LOCAL_THREAD;    /* -G selects PR_GLOBAL_THREAD */
michael@0 42
/*
** State shared by every worker thread in the process. A single instance
** is allocated (zero-filled) in main() and reached through 'shared'.
*/
typedef struct Shared
{
    PRLock *ml;                 /* held while sampled and timein are updated */
    PRUint32 sampled;           /* bytes transferred since timein */
    PRUint32 threads;           /* streams started so far; printed with each report */
    PRIntervalTime timein;      /* when the current sampling period began */
    PRNetAddr server_address;   /* filled in by Client(); what each Clientel connects to */
} Shared;

/* The one shared-state instance; NULL until main() allocates it. */
static Shared *shared = NULL;
michael@0 53
michael@0 54 static PRStatus PrintAddress(const PRNetAddr* address)
michael@0 55 {
michael@0 56 char buffer[ADDR_BUFFER];
michael@0 57 PRStatus rv = PR_NetAddrToString(address, buffer, sizeof(buffer));
michael@0 58 if (PR_SUCCESS == rv)
michael@0 59 PR_fprintf(err, "%s:%u\n", buffer, PR_ntohs(address->inet.port));
michael@0 60 else PL_FPrintError(err, "PR_NetAddrToString");
michael@0 61 return rv;
michael@0 62 } /* PrintAddress */
michael@0 63
michael@0 64
michael@0 65 static void PR_CALLBACK Clientel(void *arg)
michael@0 66 {
michael@0 67 PRStatus rv;
michael@0 68 PRFileDesc *xport;
michael@0 69 PRInt32 bytes, sampled;
michael@0 70 PRIntervalTime now, interval;
michael@0 71 PRBool do_display = PR_FALSE;
michael@0 72 Shared *shared = (Shared*)arg;
michael@0 73 char *buffer = (char*)PR_Malloc(buffer_size);
michael@0 74 PRNetAddr *server_address = &shared->server_address;
michael@0 75 PRIntervalTime connect_timeout = PR_SecondsToInterval(5);
michael@0 76 PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL);
michael@0 77
michael@0 78 PR_fprintf(err, "Client connecting to ");
michael@0 79 (void)PrintAddress(server_address);
michael@0 80
michael@0 81 do
michael@0 82 {
michael@0 83 xport = PR_Socket(domain, PR_SOCK_STREAM, protocol);
michael@0 84 if (NULL == xport)
michael@0 85 {
michael@0 86 PL_FPrintError(err, "PR_Socket");
michael@0 87 return;
michael@0 88 }
michael@0 89
michael@0 90 if (xport_buffer != -1)
michael@0 91 {
michael@0 92 PRSocketOptionData data;
michael@0 93 data.option = PR_SockOpt_RecvBufferSize;
michael@0 94 data.value.recv_buffer_size = (PRSize)xport_buffer;
michael@0 95 rv = PR_SetSocketOption(xport, &data);
michael@0 96 if (PR_FAILURE == rv)
michael@0 97 PL_FPrintError(err, "PR_SetSocketOption - ignored");
michael@0 98 data.option = PR_SockOpt_SendBufferSize;
michael@0 99 data.value.send_buffer_size = (PRSize)xport_buffer;
michael@0 100 rv = PR_SetSocketOption(xport, &data);
michael@0 101 if (PR_FAILURE == rv)
michael@0 102 PL_FPrintError(err, "PR_SetSocketOption - ignored");
michael@0 103 }
michael@0 104
michael@0 105 rv = PR_Connect(xport, server_address, connect_timeout);
michael@0 106 if (PR_FAILURE == rv)
michael@0 107 {
michael@0 108 PL_FPrintError(err, "PR_Connect");
michael@0 109 if (PR_IO_TIMEOUT_ERROR != PR_GetError())
michael@0 110 PR_Sleep(connect_timeout);
michael@0 111 PR_Close(xport); /* delete it and start over */
michael@0 112 }
michael@0 113 } while (PR_FAILURE == rv);
michael@0 114
michael@0 115 do
michael@0 116 {
michael@0 117 bytes = PR_Recv(
michael@0 118 xport, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT);
michael@0 119 PR_Lock(shared->ml);
michael@0 120 now = PR_IntervalNow();
michael@0 121 shared->sampled += bytes;
michael@0 122 interval = now - shared->timein;
michael@0 123 if (interval > sampling_interval)
michael@0 124 {
michael@0 125 sampled = shared->sampled;
michael@0 126 shared->timein = now;
michael@0 127 shared->sampled = 0;
michael@0 128 do_display = PR_TRUE;
michael@0 129 }
michael@0 130 PR_Unlock(shared->ml);
michael@0 131
michael@0 132 if (do_display)
michael@0 133 {
michael@0 134 PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval);
michael@0 135 PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate);
michael@0 136 do_display = PR_FALSE;
michael@0 137 }
michael@0 138
michael@0 139 } while (bytes > 0);
michael@0 140 } /* Clientel */
michael@0 141
michael@0 142 static void Client(const char *server_name)
michael@0 143 {
michael@0 144 PRStatus rv;
michael@0 145 PRHostEnt host;
michael@0 146 char buffer[PR_NETDB_BUF_SIZE];
michael@0 147 PRIntervalTime dally = PR_SecondsToInterval(60);
michael@0 148 PR_fprintf(err, "Translating the name %s\n", server_name);
michael@0 149 rv = PR_GetHostByName(server_name, buffer, sizeof(buffer), &host);
michael@0 150 if (PR_FAILURE == rv)
michael@0 151 PL_FPrintError(err, "PR_GetHostByName");
michael@0 152 else
michael@0 153 {
michael@0 154 if (PR_EnumerateHostEnt(
michael@0 155 0, &host, PORT_NUMBER, &shared->server_address) < 0)
michael@0 156 PL_FPrintError(err, "PR_EnumerateHostEnt");
michael@0 157 else
michael@0 158 {
michael@0 159 do
michael@0 160 {
michael@0 161 shared->threads += 1;
michael@0 162 (void)PR_CreateThread(
michael@0 163 PR_USER_THREAD, Clientel, shared,
michael@0 164 PR_PRIORITY_NORMAL, thread_scope,
michael@0 165 PR_UNJOINABLE_THREAD, 8 * 1024);
michael@0 166 if (shared->threads == initial_streams)
michael@0 167 {
michael@0 168 PR_Sleep(dally);
michael@0 169 initial_streams += 1;
michael@0 170 }
michael@0 171 } while (PR_TRUE);
michael@0 172 }
michael@0 173 }
michael@0 174 }
michael@0 175
michael@0 176 static void PR_CALLBACK Servette(void *arg)
michael@0 177 {
michael@0 178 PRInt32 bytes, sampled;
michael@0 179 PRIntervalTime now, interval;
michael@0 180 PRBool do_display = PR_FALSE;
michael@0 181 PRFileDesc *client = (PRFileDesc*)arg;
michael@0 182 char *buffer = (char*)PR_Malloc(buffer_size);
michael@0 183 PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL);
michael@0 184
michael@0 185 if (xport_buffer != -1)
michael@0 186 {
michael@0 187 PRStatus rv;
michael@0 188 PRSocketOptionData data;
michael@0 189 data.option = PR_SockOpt_RecvBufferSize;
michael@0 190 data.value.recv_buffer_size = (PRSize)xport_buffer;
michael@0 191 rv = PR_SetSocketOption(client, &data);
michael@0 192 if (PR_FAILURE == rv)
michael@0 193 PL_FPrintError(err, "PR_SetSocketOption - ignored");
michael@0 194 data.option = PR_SockOpt_SendBufferSize;
michael@0 195 data.value.send_buffer_size = (PRSize)xport_buffer;
michael@0 196 rv = PR_SetSocketOption(client, &data);
michael@0 197 if (PR_FAILURE == rv)
michael@0 198 PL_FPrintError(err, "PR_SetSocketOption - ignored");
michael@0 199 }
michael@0 200
michael@0 201 do
michael@0 202 {
michael@0 203 bytes = PR_Send(
michael@0 204 client, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT);
michael@0 205
michael@0 206 PR_Lock(shared->ml);
michael@0 207 now = PR_IntervalNow();
michael@0 208 shared->sampled += bytes;
michael@0 209 interval = now - shared->timein;
michael@0 210 if (interval > sampling_interval)
michael@0 211 {
michael@0 212 sampled = shared->sampled;
michael@0 213 shared->timein = now;
michael@0 214 shared->sampled = 0;
michael@0 215 do_display = PR_TRUE;
michael@0 216 }
michael@0 217 PR_Unlock(shared->ml);
michael@0 218
michael@0 219 if (do_display)
michael@0 220 {
michael@0 221 PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval);
michael@0 222 PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate);
michael@0 223 do_display = PR_FALSE;
michael@0 224 }
michael@0 225 } while (bytes > 0);
michael@0 226 } /* Servette */
michael@0 227
michael@0 228 static void Server(void)
michael@0 229 {
michael@0 230 PRStatus rv;
michael@0 231 PRNetAddr server_address, client_address;
michael@0 232 PRFileDesc *xport = PR_Socket(domain, PR_SOCK_STREAM, protocol);
michael@0 233
michael@0 234 if (NULL == xport)
michael@0 235 {
michael@0 236 PL_FPrintError(err, "PR_Socket");
michael@0 237 return;
michael@0 238 }
michael@0 239
michael@0 240 rv = PR_InitializeNetAddr(PR_IpAddrAny, PORT_NUMBER, &server_address);
michael@0 241 if (PR_FAILURE == rv) PL_FPrintError(err, "PR_InitializeNetAddr");
michael@0 242 else
michael@0 243 {
michael@0 244 rv = PR_Bind(xport, &server_address);
michael@0 245 if (PR_FAILURE == rv) PL_FPrintError(err, "PR_Bind");
michael@0 246 else
michael@0 247 {
michael@0 248 PRFileDesc *client;
michael@0 249 rv = PR_Listen(xport, 10);
michael@0 250 PR_fprintf(err, "Server listening on ");
michael@0 251 (void)PrintAddress(&server_address);
michael@0 252 do
michael@0 253 {
michael@0 254 client = PR_Accept(
michael@0 255 xport, &client_address, PR_INTERVAL_NO_TIMEOUT);
michael@0 256 if (NULL == client) PL_FPrintError(err, "PR_Accept");
michael@0 257 else
michael@0 258 {
michael@0 259 PR_fprintf(err, "Server accepting from ");
michael@0 260 (void)PrintAddress(&client_address);
michael@0 261 shared->threads += 1;
michael@0 262 (void)PR_CreateThread(
michael@0 263 PR_USER_THREAD, Servette, client,
michael@0 264 PR_PRIORITY_NORMAL, thread_scope,
michael@0 265 PR_UNJOINABLE_THREAD, 8 * 1024);
michael@0 266 }
michael@0 267 } while (PR_TRUE);
michael@0 268
michael@0 269 }
michael@0 270 }
michael@0 271 } /* Server */
michael@0 272
michael@0 273 static void Help(void)
michael@0 274 {
michael@0 275 PR_fprintf(err, "Usage: [-h] [<server>]\n");
michael@0 276 PR_fprintf(err, "\t-s <n> Initial # of connections (default: 1)\n");
michael@0 277 PR_fprintf(err, "\t-C <n> Set 'concurrency' (default: 1)\n");
michael@0 278 PR_fprintf(err, "\t-b <nK> Client buffer size (default: 32k)\n");
michael@0 279 PR_fprintf(err, "\t-B <nK> Transport recv/send buffer size (default: sys)\n");
michael@0 280 PR_fprintf(err, "\t-G Use GLOBAL threads (default: LOCAL)\n");
michael@0 281 PR_fprintf(err, "\t-X Use XTP transport (default: TCP)\n");
michael@0 282 PR_fprintf(err, "\t-6 Use IPv6 (default: IPv4)\n");
michael@0 283 PR_fprintf(err, "\t-h This message and nothing else\n");
michael@0 284 PR_fprintf(err, "\t<server> DNS name of server\n");
michael@0 285 PR_fprintf(err, "\t\tIf <server> is not specified, this host will be\n");
michael@0 286 PR_fprintf(err, "\t\tthe server and not act as a client.\n");
michael@0 287 } /* Help */
michael@0 288
michael@0 289 int main(int argc, char **argv)
michael@0 290 {
michael@0 291 PLOptStatus os;
michael@0 292 const char *server_name = NULL;
michael@0 293 PLOptState *opt = PL_CreateOptState(argc, argv, "hGX6C:b:s:B:");
michael@0 294
michael@0 295 err = PR_GetSpecialFD(PR_StandardError);
michael@0 296
michael@0 297 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
michael@0 298 {
michael@0 299 if (PL_OPT_BAD == os) continue;
michael@0 300 switch (opt->option)
michael@0 301 {
michael@0 302 case 0: /* Name of server */
michael@0 303 server_name = opt->value;
michael@0 304 break;
michael@0 305 case 'G': /* Globular threads */
michael@0 306 thread_scope = PR_GLOBAL_THREAD;
michael@0 307 break;
michael@0 308 case 'X': /* Use XTP as the transport */
michael@0 309 protocol = 36;
michael@0 310 break;
michael@0 311 case '6': /* Use IPv6 */
michael@0 312 domain = PR_AF_INET6;
michael@0 313 break;
michael@0 314 case 's': /* initial_streams */
michael@0 315 initial_streams = atoi(opt->value);
michael@0 316 break;
michael@0 317 case 'C': /* concurrency */
michael@0 318 concurrency = atoi(opt->value);
michael@0 319 break;
michael@0 320 case 'b': /* buffer size */
michael@0 321 buffer_size = 1024 * atoi(opt->value);
michael@0 322 break;
michael@0 323 case 'B': /* buffer size */
michael@0 324 xport_buffer = 1024 * atoi(opt->value);
michael@0 325 break;
michael@0 326 case 'h': /* user wants some guidance */
michael@0 327 default:
michael@0 328 Help(); /* so give him an earful */
michael@0 329 return 2; /* but not a lot else */
michael@0 330 }
michael@0 331 }
michael@0 332 PL_DestroyOptState(opt);
michael@0 333
michael@0 334 shared = PR_NEWZAP(Shared);
michael@0 335 shared->ml = PR_NewLock();
michael@0 336
michael@0 337 PR_fprintf(err,
michael@0 338 "This machine is %s\n",
michael@0 339 (NULL == server_name) ? "the SERVER" : "a CLIENT");
michael@0 340
michael@0 341 PR_fprintf(err,
michael@0 342 "Transport being used is %s\n",
michael@0 343 (6 == protocol) ? "TCP" : "XTP");
michael@0 344
michael@0 345 if (PR_GLOBAL_THREAD == thread_scope)
michael@0 346 {
michael@0 347 if (1 != concurrency)
michael@0 348 {
michael@0 349 PR_fprintf(err, " **Concurrency > 1 and GLOBAL threads!?!?\n");
michael@0 350 PR_fprintf(err, " **Ignoring concurrency\n");
michael@0 351 concurrency = 1;
michael@0 352 }
michael@0 353 }
michael@0 354
michael@0 355 if (1 != concurrency)
michael@0 356 {
michael@0 357 PR_SetConcurrency(concurrency);
michael@0 358 PR_fprintf(err, "Concurrency set to %u\n", concurrency);
michael@0 359 }
michael@0 360
michael@0 361 PR_fprintf(err,
michael@0 362 "All threads will be %s\n",
michael@0 363 (PR_GLOBAL_THREAD == thread_scope) ? "GLOBAL" : "LOCAL");
michael@0 364
michael@0 365 PR_fprintf(err, "Client buffer size will be %u\n", buffer_size);
michael@0 366
michael@0 367 if (-1 != xport_buffer)
michael@0 368 PR_fprintf(
michael@0 369 err, "Transport send & receive buffer size will be %u\n", xport_buffer);
michael@0 370
michael@0 371
michael@0 372 if (NULL == server_name) Server();
michael@0 373 else Client(server_name);
michael@0 374
michael@0 375 return 0;
michael@0 376 } /* main */
michael@0 377
michael@0 378 /* thruput.c */
michael@0 379

mercurial