michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: /* michael@0: ** File: thruput.c michael@0: ** Description: Test server's throughput capability comparing various michael@0: ** implmentation strategies. michael@0: ** michael@0: ** Note: Requires a server machine and an aribitrary number of michael@0: ** clients to bang on it. Trust the numbers on the server michael@0: ** more than those being displayed by the various clients. michael@0: */ michael@0: michael@0: #include "prerror.h" michael@0: #include "prinrval.h" michael@0: #include "prinit.h" michael@0: #include "prio.h" michael@0: #include "prlock.h" michael@0: #include "prmem.h" michael@0: #include "prnetdb.h" michael@0: #include "prprf.h" michael@0: #include "prthread.h" michael@0: #include "pprio.h" michael@0: #include "plerror.h" michael@0: #include "plgetopt.h" michael@0: michael@0: #define ADDR_BUFFER 100 michael@0: #define PORT_NUMBER 51877 michael@0: #define SAMPLING_INTERVAL 10 michael@0: #define BUFFER_SIZE (32 * 1024) michael@0: michael@0: static PRInt32 domain = PR_AF_INET; michael@0: static PRInt32 protocol = 6; /* TCP */ michael@0: static PRFileDesc *err = NULL; michael@0: static PRIntn concurrency = 1; michael@0: static PRInt32 xport_buffer = -1; michael@0: static PRUint32 initial_streams = 1; michael@0: static PRInt32 buffer_size = BUFFER_SIZE; michael@0: static PRThreadScope thread_scope = PR_LOCAL_THREAD; michael@0: michael@0: typedef struct Shared michael@0: { michael@0: PRLock *ml; michael@0: PRUint32 sampled; michael@0: PRUint32 threads; michael@0: PRIntervalTime timein; michael@0: PRNetAddr server_address; michael@0: } Shared; michael@0: michael@0: static Shared *shared = NULL; michael@0: michael@0: static PRStatus PrintAddress(const PRNetAddr* address) michael@0: { michael@0: char buffer[ADDR_BUFFER]; michael@0: PRStatus rv = PR_NetAddrToString(address, buffer, sizeof(buffer)); michael@0: if (PR_SUCCESS == rv) michael@0: PR_fprintf(err, "%s:%u\n", buffer, PR_ntohs(address->inet.port)); michael@0: else PL_FPrintError(err, "PR_NetAddrToString"); michael@0: return rv; michael@0: } /* PrintAddress */ michael@0: michael@0: michael@0: static void PR_CALLBACK Clientel(void *arg) michael@0: { michael@0: PRStatus rv; michael@0: PRFileDesc *xport; michael@0: PRInt32 bytes, sampled; michael@0: PRIntervalTime now, interval; michael@0: PRBool do_display = PR_FALSE; michael@0: Shared *shared = (Shared*)arg; michael@0: char *buffer = (char*)PR_Malloc(buffer_size); michael@0: PRNetAddr *server_address = &shared->server_address; michael@0: PRIntervalTime connect_timeout = PR_SecondsToInterval(5); michael@0: PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL); michael@0: michael@0: PR_fprintf(err, "Client connecting to "); michael@0: (void)PrintAddress(server_address); michael@0: michael@0: do michael@0: { michael@0: xport = PR_Socket(domain, PR_SOCK_STREAM, protocol); michael@0: if (NULL == xport) michael@0: { michael@0: PL_FPrintError(err, "PR_Socket"); michael@0: return; michael@0: } michael@0: michael@0: if (xport_buffer != -1) michael@0: { michael@0: PRSocketOptionData data; michael@0: data.option = PR_SockOpt_RecvBufferSize; michael@0: data.value.recv_buffer_size = (PRSize)xport_buffer; michael@0: rv = PR_SetSocketOption(xport, &data); michael@0: if (PR_FAILURE == rv) michael@0: PL_FPrintError(err, "PR_SetSocketOption - ignored"); michael@0: data.option = PR_SockOpt_SendBufferSize; michael@0: data.value.send_buffer_size = (PRSize)xport_buffer; michael@0: rv = PR_SetSocketOption(xport, &data); michael@0: if (PR_FAILURE == rv) michael@0: PL_FPrintError(err, "PR_SetSocketOption - ignored"); michael@0: } michael@0: michael@0: rv = PR_Connect(xport, server_address, connect_timeout); michael@0: if (PR_FAILURE == rv) michael@0: { michael@0: PL_FPrintError(err, "PR_Connect"); michael@0: if (PR_IO_TIMEOUT_ERROR != PR_GetError()) michael@0: PR_Sleep(connect_timeout); michael@0: PR_Close(xport); /* delete it and start over */ michael@0: } michael@0: } while (PR_FAILURE == rv); michael@0: michael@0: do michael@0: { michael@0: bytes = PR_Recv( michael@0: xport, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT); michael@0: PR_Lock(shared->ml); michael@0: now = PR_IntervalNow(); michael@0: shared->sampled += bytes; michael@0: interval = now - shared->timein; michael@0: if (interval > sampling_interval) michael@0: { michael@0: sampled = shared->sampled; michael@0: shared->timein = now; michael@0: shared->sampled = 0; michael@0: do_display = PR_TRUE; michael@0: } michael@0: PR_Unlock(shared->ml); michael@0: michael@0: if (do_display) michael@0: { michael@0: PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval); michael@0: PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate); michael@0: do_display = PR_FALSE; michael@0: } michael@0: michael@0: } while (bytes > 0); michael@0: } /* Clientel */ michael@0: michael@0: static void Client(const char *server_name) michael@0: { michael@0: PRStatus rv; michael@0: PRHostEnt host; michael@0: char buffer[PR_NETDB_BUF_SIZE]; michael@0: PRIntervalTime dally = PR_SecondsToInterval(60); michael@0: PR_fprintf(err, "Translating the name %s\n", server_name); michael@0: rv = PR_GetHostByName(server_name, buffer, sizeof(buffer), &host); michael@0: if (PR_FAILURE == rv) michael@0: PL_FPrintError(err, "PR_GetHostByName"); michael@0: else michael@0: { michael@0: if (PR_EnumerateHostEnt( michael@0: 0, &host, PORT_NUMBER, &shared->server_address) < 0) michael@0: PL_FPrintError(err, "PR_EnumerateHostEnt"); michael@0: else michael@0: { michael@0: do michael@0: { michael@0: shared->threads += 1; michael@0: (void)PR_CreateThread( michael@0: PR_USER_THREAD, Clientel, shared, michael@0: PR_PRIORITY_NORMAL, thread_scope, michael@0: PR_UNJOINABLE_THREAD, 8 * 1024); michael@0: if (shared->threads == initial_streams) michael@0: { michael@0: PR_Sleep(dally); michael@0: initial_streams += 1; michael@0: } michael@0: } while (PR_TRUE); michael@0: } michael@0: } michael@0: } michael@0: michael@0: static void PR_CALLBACK Servette(void *arg) michael@0: { michael@0: PRInt32 bytes, sampled; michael@0: PRIntervalTime now, interval; michael@0: PRBool do_display = PR_FALSE; michael@0: PRFileDesc *client = (PRFileDesc*)arg; michael@0: char *buffer = (char*)PR_Malloc(buffer_size); michael@0: PRIntervalTime sampling_interval = PR_SecondsToInterval(SAMPLING_INTERVAL); michael@0: michael@0: if (xport_buffer != -1) michael@0: { michael@0: PRStatus rv; michael@0: PRSocketOptionData data; michael@0: data.option = PR_SockOpt_RecvBufferSize; michael@0: data.value.recv_buffer_size = (PRSize)xport_buffer; michael@0: rv = PR_SetSocketOption(client, &data); michael@0: if (PR_FAILURE == rv) michael@0: PL_FPrintError(err, "PR_SetSocketOption - ignored"); michael@0: data.option = PR_SockOpt_SendBufferSize; michael@0: data.value.send_buffer_size = (PRSize)xport_buffer; michael@0: rv = PR_SetSocketOption(client, &data); michael@0: if (PR_FAILURE == rv) michael@0: PL_FPrintError(err, "PR_SetSocketOption - ignored"); michael@0: } michael@0: michael@0: do michael@0: { michael@0: bytes = PR_Send( michael@0: client, buffer, buffer_size, 0, PR_INTERVAL_NO_TIMEOUT); michael@0: michael@0: PR_Lock(shared->ml); michael@0: now = PR_IntervalNow(); michael@0: shared->sampled += bytes; michael@0: interval = now - shared->timein; michael@0: if (interval > sampling_interval) michael@0: { michael@0: sampled = shared->sampled; michael@0: shared->timein = now; michael@0: shared->sampled = 0; michael@0: do_display = PR_TRUE; michael@0: } michael@0: PR_Unlock(shared->ml); michael@0: michael@0: if (do_display) michael@0: { michael@0: PRUint32 rate = sampled / PR_IntervalToMilliseconds(interval); michael@0: PR_fprintf(err, "%u streams @ %u Kbytes/sec\n", shared->threads, rate); michael@0: do_display = PR_FALSE; michael@0: } michael@0: } while (bytes > 0); michael@0: } /* Servette */ michael@0: michael@0: static void Server(void) michael@0: { michael@0: PRStatus rv; michael@0: PRNetAddr server_address, client_address; michael@0: PRFileDesc *xport = PR_Socket(domain, PR_SOCK_STREAM, protocol); michael@0: michael@0: if (NULL == xport) michael@0: { michael@0: PL_FPrintError(err, "PR_Socket"); michael@0: return; michael@0: } michael@0: michael@0: rv = PR_InitializeNetAddr(PR_IpAddrAny, PORT_NUMBER, &server_address); michael@0: if (PR_FAILURE == rv) PL_FPrintError(err, "PR_InitializeNetAddr"); michael@0: else michael@0: { michael@0: rv = PR_Bind(xport, &server_address); michael@0: if (PR_FAILURE == rv) PL_FPrintError(err, "PR_Bind"); michael@0: else michael@0: { michael@0: PRFileDesc *client; michael@0: rv = PR_Listen(xport, 10); michael@0: PR_fprintf(err, "Server listening on "); michael@0: (void)PrintAddress(&server_address); michael@0: do michael@0: { michael@0: client = PR_Accept( michael@0: xport, &client_address, PR_INTERVAL_NO_TIMEOUT); michael@0: if (NULL == client) PL_FPrintError(err, "PR_Accept"); michael@0: else michael@0: { michael@0: PR_fprintf(err, "Server accepting from "); michael@0: (void)PrintAddress(&client_address); michael@0: shared->threads += 1; michael@0: (void)PR_CreateThread( michael@0: PR_USER_THREAD, Servette, client, michael@0: PR_PRIORITY_NORMAL, thread_scope, michael@0: PR_UNJOINABLE_THREAD, 8 * 1024); michael@0: } michael@0: } while (PR_TRUE); michael@0: michael@0: } michael@0: } michael@0: } /* Server */ michael@0: michael@0: static void Help(void) michael@0: { michael@0: PR_fprintf(err, "Usage: [-h] []\n"); michael@0: PR_fprintf(err, "\t-s Initial # of connections (default: 1)\n"); michael@0: PR_fprintf(err, "\t-C Set 'concurrency' (default: 1)\n"); michael@0: PR_fprintf(err, "\t-b Client buffer size (default: 32k)\n"); michael@0: PR_fprintf(err, "\t-B Transport recv/send buffer size (default: sys)\n"); michael@0: PR_fprintf(err, "\t-G Use GLOBAL threads (default: LOCAL)\n"); michael@0: PR_fprintf(err, "\t-X Use XTP transport (default: TCP)\n"); michael@0: PR_fprintf(err, "\t-6 Use IPv6 (default: IPv4)\n"); michael@0: PR_fprintf(err, "\t-h This message and nothing else\n"); michael@0: PR_fprintf(err, "\t DNS name of server\n"); michael@0: PR_fprintf(err, "\t\tIf is not specified, this host will be\n"); michael@0: PR_fprintf(err, "\t\tthe server and not act as a client.\n"); michael@0: } /* Help */ michael@0: michael@0: int main(int argc, char **argv) michael@0: { michael@0: PLOptStatus os; michael@0: const char *server_name = NULL; michael@0: PLOptState *opt = PL_CreateOptState(argc, argv, "hGX6C:b:s:B:"); michael@0: michael@0: err = PR_GetSpecialFD(PR_StandardError); michael@0: michael@0: while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) michael@0: { michael@0: if (PL_OPT_BAD == os) continue; michael@0: switch (opt->option) michael@0: { michael@0: case 0: /* Name of server */ michael@0: server_name = opt->value; michael@0: break; michael@0: case 'G': /* Globular threads */ michael@0: thread_scope = PR_GLOBAL_THREAD; michael@0: break; michael@0: case 'X': /* Use XTP as the transport */ michael@0: protocol = 36; michael@0: break; michael@0: case '6': /* Use IPv6 */ michael@0: domain = PR_AF_INET6; michael@0: break; michael@0: case 's': /* initial_streams */ michael@0: initial_streams = atoi(opt->value); michael@0: break; michael@0: case 'C': /* concurrency */ michael@0: concurrency = atoi(opt->value); michael@0: break; michael@0: case 'b': /* buffer size */ michael@0: buffer_size = 1024 * atoi(opt->value); michael@0: break; michael@0: case 'B': /* buffer size */ michael@0: xport_buffer = 1024 * atoi(opt->value); michael@0: break; michael@0: case 'h': /* user wants some guidance */ michael@0: default: michael@0: Help(); /* so give him an earful */ michael@0: return 2; /* but not a lot else */ michael@0: } michael@0: } michael@0: PL_DestroyOptState(opt); michael@0: michael@0: shared = PR_NEWZAP(Shared); michael@0: shared->ml = PR_NewLock(); michael@0: michael@0: PR_fprintf(err, michael@0: "This machine is %s\n", michael@0: (NULL == server_name) ? "the SERVER" : "a CLIENT"); michael@0: michael@0: PR_fprintf(err, michael@0: "Transport being used is %s\n", michael@0: (6 == protocol) ? "TCP" : "XTP"); michael@0: michael@0: if (PR_GLOBAL_THREAD == thread_scope) michael@0: { michael@0: if (1 != concurrency) michael@0: { michael@0: PR_fprintf(err, " **Concurrency > 1 and GLOBAL threads!?!?\n"); michael@0: PR_fprintf(err, " **Ignoring concurrency\n"); michael@0: concurrency = 1; michael@0: } michael@0: } michael@0: michael@0: if (1 != concurrency) michael@0: { michael@0: PR_SetConcurrency(concurrency); michael@0: PR_fprintf(err, "Concurrency set to %u\n", concurrency); michael@0: } michael@0: michael@0: PR_fprintf(err, michael@0: "All threads will be %s\n", michael@0: (PR_GLOBAL_THREAD == thread_scope) ? "GLOBAL" : "LOCAL"); michael@0: michael@0: PR_fprintf(err, "Client buffer size will be %u\n", buffer_size); michael@0: michael@0: if (-1 != xport_buffer) michael@0: PR_fprintf( michael@0: err, "Transport send & receive buffer size will be %u\n", xport_buffer); michael@0: michael@0: michael@0: if (NULL == server_name) Server(); michael@0: else Client(server_name); michael@0: michael@0: return 0; michael@0: } /* main */ michael@0: michael@0: /* thruput.c */ michael@0: