Fri, 16 Jan 2015 04:50:19 +0100
Replace accessor implementation with direct member state manipulation, by
request https://trac.torproject.org/projects/tor/ticket/9701#comment:32
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 /***********************************************************************
7 **
8 ** This server simulates a server running in loopback mode.
9 **
10 ** The idea is that a single server is created. The server initially creates
11 ** a number of worker threads. Then, with the server running, a number of
12 ** clients are created which start requesting service from the server.
13 **
14 **
15 ** Modification History:
16 ** 19-May-97 AGarcia- Converted the test to accommodate the debug_mode flag.
17 ** The debug mode will print all of the printfs associated with this test.
18 ** The regress mode will be the default mode. Since the regress tool limits
19 ** the output to a one line status:PASS or FAIL,all of the printf statements
20 ** have been handled with an if (debug_mode) statement.
21 ** 04-June-97 AGarcia removed the Test_Result function. Regress tool has been updated to
22 ** recognize the return code from the main program.
23 ***********************************************************************/
25 /***********************************************************************
26 ** Includes
27 ***********************************************************************/
28 /* Used to get the command line option */
29 #include "plgetopt.h"
31 #include "nspr.h"
32 #include "pprthred.h"
34 #include <string.h>
/* TCP port that ServerSetup() binds to and ClientThreadFunc() connects to. */
36 #define PORT 15004
/* Passed as the stack-size argument of every PR_CreateThread() call in this file. */
37 #define THREAD_STACKSIZE 0
/*
 * Test parameters.  main() overwrites all four before the test runs: with
 * fixed values in regress mode, or with values read from stdin in debug mode.
 *   _iterations  - total number of requests shared by all client threads
 *   _clients     - number of client threads RunClients() starts
 *   _client_data - bytes a client sends per request
 *   _server_data - bytes the server sends back per request
 */
39 static int _iterations = 1000;
40 static int _clients = 1;
41 static int _client_data = 250;
42 static int _server_data = (8*1024);
/* Thread scopes used for server-side and client-side PR_CreateThread() calls;
 * assigned in do_workKU(). */
44 static PRThreadScope ServerScope, ClientScope;
/* Names passed as the "waiter" argument of Set/WaitServerState for debug output. */
46 #define SERVER "Server"
47 #define MAIN "Main"
/*
 * Server life-cycle states.  WaitServerState() tests them with a bitwise AND,
 * so the non-zero states are distinct bits and may be OR-ed into a mask.
 * SERVER_STATE_STARTUP is 0 and therefore can never satisfy such a wait.
 */
49 #define SERVER_STATE_STARTUP 0
50 #define SERVER_STATE_READY 1
51 #define SERVER_STATE_DYING 2
52 #define SERVER_STATE_DEAD 4
/* Current server state; read and written only with ServerStateCVLock held,
 * with changes announced through ServerStateCV. */
53 int ServerState;
54 PRLock *ServerStateCVLock;
55 PRCondVar *ServerStateCV;
/*
 * Verbose tracing.  Without DEBUGPRINTS the macro expands to nothing, so a
 * call such as DPRINTF("x", y) is left as a parenthesized comma expression
 * whose value is discarded.
 */
57 #ifdef DEBUGPRINTS
58 #define DPRINTF printf
59 #else
60 #define DPRINTF
61 #endif
/* Non-zero once a failure has been recorded; main() returns 1 in that case. */
63 PRIntn failed_already=0;
/* Set to 1 by the -d command line option; gates all diagnostic output. */
64 PRIntn debug_mode;
/* Forward declaration: do_work() is defined after the client/server code. */
66 static void do_work(void);
68 /* --- Server state functions --------------------------------------------- */
69 void
70 SetServerState(char *waiter, PRInt32 state)
71 {
72 PR_Lock(ServerStateCVLock);
73 ServerState = state;
74 PR_NotifyCondVar(ServerStateCV);
76 if (debug_mode) DPRINTF("\t%s changed state to %d\n", waiter, state);
78 PR_Unlock(ServerStateCVLock);
79 }
81 int
82 WaitServerState(char *waiter, PRInt32 state)
83 {
84 PRInt32 rv;
86 PR_Lock(ServerStateCVLock);
88 if (debug_mode) DPRINTF("\t%s waiting for state %d\n", waiter, state);
90 while(!(ServerState & state))
91 PR_WaitCondVar(ServerStateCV, PR_INTERVAL_NO_TIMEOUT);
92 rv = ServerState;
94 if (debug_mode) DPRINTF("\t%s resuming from wait for state %d; state now %d\n",
95 waiter, state, ServerState);
96 PR_Unlock(ServerStateCVLock);
98 return rv;
99 }
101 /* --- Server Functions ------------------------------------------- */
/* Held by a worker while it decides whether to start another worker thread. */
103 PRLock *workerThreadsLock;
/* Number of worker threads created so far (bumped with PR_AtomicIncrement). */
104 PRInt32 workerThreads;
/* Number of workers currently servicing an accepted connection
 * (adjusted with PR_AtomicIncrement / PR_AtomicDecrement). */
105 PRInt32 workerThreadsBusy;
107 void
108 WorkerThreadFunc(void *_listenSock)
109 {
110 PRFileDesc *listenSock = (PRFileDesc *)_listenSock;
111 PRInt32 bytesRead;
112 PRInt32 bytesWritten;
113 char *dataBuf;
114 char *sendBuf;
116 if (debug_mode) DPRINTF("\tServer buffer is %d bytes; %d data, %d netaddrs\n",
117 _client_data+(2*sizeof(PRNetAddr))+32, _client_data, (2*sizeof(PRNetAddr))+32);
118 dataBuf = (char *)PR_MALLOC(_client_data + 2*sizeof(PRNetAddr) + 32);
119 if (!dataBuf)
120 if (debug_mode) printf("\tServer could not malloc space!?\n");
121 sendBuf = (char *)PR_MALLOC(_server_data *sizeof(char));
122 if (!sendBuf)
123 if (debug_mode) printf("\tServer could not malloc space!?\n");
125 if (debug_mode) DPRINTF("\tServer worker thread running\n");
127 while(1) {
128 PRInt32 bytesToRead = _client_data;
129 PRInt32 bytesToWrite = _server_data;
130 PRFileDesc *newSock;
131 PRNetAddr *rAddr;
132 PRInt32 loops = 0;
134 loops++;
136 if (debug_mode) DPRINTF("\tServer thread going into accept\n");
138 bytesRead = PR_AcceptRead(listenSock,
139 &newSock,
140 &rAddr,
141 dataBuf,
142 bytesToRead,
143 PR_INTERVAL_NO_TIMEOUT);
145 if (bytesRead < 0) {
146 if (debug_mode) printf("\tServer error in accept (%d)\n", bytesRead);
147 continue;
148 }
150 if (debug_mode) DPRINTF("\tServer accepted connection (%d bytes)\n", bytesRead);
152 PR_AtomicIncrement(&workerThreadsBusy);
153 #ifdef SYMBIAN
154 if (workerThreadsBusy == workerThreads && workerThreads<1) {
155 #else
156 if (workerThreadsBusy == workerThreads) {
157 #endif
158 PR_Lock(workerThreadsLock);
159 if (workerThreadsBusy == workerThreads) {
160 PRThread *WorkerThread;
162 WorkerThread = PR_CreateThread(
163 PR_SYSTEM_THREAD,
164 WorkerThreadFunc,
165 listenSock,
166 PR_PRIORITY_NORMAL,
167 ServerScope,
168 PR_UNJOINABLE_THREAD,
169 THREAD_STACKSIZE);
171 if (!WorkerThread) {
172 if (debug_mode) printf("Error creating client thread %d\n", workerThreads);
173 } else {
174 PR_AtomicIncrement(&workerThreads);
175 if (debug_mode) DPRINTF("\tServer creates worker (%d)\n", workerThreads);
176 }
177 }
178 PR_Unlock(workerThreadsLock);
179 }
181 bytesToRead -= bytesRead;
182 while (bytesToRead) {
183 bytesRead = PR_Recv(newSock,
184 dataBuf,
185 bytesToRead,
186 0,
187 PR_INTERVAL_NO_TIMEOUT);
188 if (bytesRead < 0) {
189 if (debug_mode) printf("\tServer error receiving data (%d)\n", bytesRead);
190 continue;
191 }
192 if (debug_mode) DPRINTF("\tServer received %d bytes\n", bytesRead);
193 }
195 bytesWritten = PR_Send(newSock,
196 sendBuf,
197 bytesToWrite,
198 0,
199 PR_INTERVAL_NO_TIMEOUT);
200 if (bytesWritten != _server_data) {
201 if (debug_mode) printf("\tError sending data to client (%d, %d)\n",
202 bytesWritten, PR_GetOSError());
203 } else {
204 if (debug_mode) DPRINTF("\tServer sent %d bytes\n", bytesWritten);
205 }
207 PR_Close(newSock);
208 PR_AtomicDecrement(&workerThreadsBusy);
209 }
210 }
212 PRFileDesc *
213 ServerSetup(void)
214 {
215 PRFileDesc *listenSocket;
216 PRSocketOptionData sockOpt;
217 PRNetAddr serverAddr;
218 PRThread *WorkerThread;
220 if ( (listenSocket = PR_NewTCPSocket()) == NULL) {
221 if (debug_mode) printf("\tServer error creating listen socket\n");
222 else failed_already=1;
223 return NULL;
224 }
226 sockOpt.option = PR_SockOpt_Reuseaddr;
227 sockOpt.value.reuse_addr = PR_TRUE;
228 if ( PR_SetSocketOption(listenSocket, &sockOpt) == PR_FAILURE) {
229 if (debug_mode) printf("\tServer error setting socket option: OS error %d\n",
230 PR_GetOSError());
231 else failed_already=1;
232 PR_Close(listenSocket);
233 return NULL;
234 }
236 memset(&serverAddr, 0, sizeof(PRNetAddr));
237 serverAddr.inet.family = PR_AF_INET;
238 serverAddr.inet.port = PR_htons(PORT);
239 serverAddr.inet.ip = PR_htonl(PR_INADDR_ANY);
241 if ( PR_Bind(listenSocket, &serverAddr) == PR_FAILURE) {
242 if (debug_mode) printf("\tServer error binding to server address: OS error %d\n",
243 PR_GetOSError());
244 else failed_already=1;
245 PR_Close(listenSocket);
246 return NULL;
247 }
249 if ( PR_Listen(listenSocket, 128) == PR_FAILURE) {
250 if (debug_mode) printf("\tServer error listening to server socket\n");
251 else failed_already=1;
252 PR_Close(listenSocket);
254 return NULL;
255 }
257 /* Create Clients */
258 workerThreads = 0;
259 workerThreadsBusy = 0;
261 workerThreadsLock = PR_NewLock();
263 WorkerThread = PR_CreateThread(
264 PR_SYSTEM_THREAD,
265 WorkerThreadFunc,
266 listenSocket,
267 PR_PRIORITY_NORMAL,
268 ServerScope,
269 PR_UNJOINABLE_THREAD,
270 THREAD_STACKSIZE);
272 if (!WorkerThread) {
273 if (debug_mode) printf("error creating working thread\n");
274 PR_Close(listenSocket);
275 return NULL;
276 }
277 PR_AtomicIncrement(&workerThreads);
278 if (debug_mode) DPRINTF("\tServer created primordial worker thread\n");
280 return listenSocket;
281 }
283 /* The main server loop */
284 void
285 ServerThreadFunc(void *unused)
286 {
287 PRFileDesc *listenSocket;
289 /* Do setup */
290 listenSocket = ServerSetup();
292 if (!listenSocket) {
293 SetServerState(SERVER, SERVER_STATE_DEAD);
294 } else {
296 if (debug_mode) DPRINTF("\tServer up\n");
298 /* Tell clients they can start now. */
299 SetServerState(SERVER, SERVER_STATE_READY);
301 /* Now wait for server death signal */
302 WaitServerState(SERVER, SERVER_STATE_DYING);
304 /* Cleanup */
305 SetServerState(SERVER, SERVER_STATE_DEAD);
306 }
307 }
309 /* --- Client Functions ------------------------------------------- */
/* Requests still to be issued; shared by all clients and decremented with
 * PR_AtomicDecrement after each request. */
311 PRInt32 numRequests;
/* Client threads that have not finished yet; changed and awaited under
 * clientMonitor. */
312 PRInt32 numClients;
/* Monitor on which RunClients() waits for numClients to reach zero. */
313 PRMonitor *clientMonitor;
315 void
316 ClientThreadFunc(void *unused)
317 {
318 PRNetAddr serverAddr;
319 PRFileDesc *clientSocket;
320 char *sendBuf;
321 char *recvBuf;
322 PRInt32 rv;
323 PRInt32 bytesNeeded;
325 sendBuf = (char *)PR_MALLOC(_client_data * sizeof(char));
326 if (!sendBuf)
327 if (debug_mode) printf("\tClient could not malloc space!?\n");
328 recvBuf = (char *)PR_MALLOC(_server_data * sizeof(char));
329 if (!recvBuf)
330 if (debug_mode) printf("\tClient could not malloc space!?\n");
332 memset(&serverAddr, 0, sizeof(PRNetAddr));
333 serverAddr.inet.family = PR_AF_INET;
334 serverAddr.inet.port = PR_htons(PORT);
335 serverAddr.inet.ip = PR_htonl(PR_INADDR_LOOPBACK);
337 while(numRequests > 0) {
339 if ( (numRequests % 10) == 0 )
340 if (debug_mode) printf(".");
341 if (debug_mode) DPRINTF("\tClient starting request %d\n", numRequests);
343 clientSocket = PR_NewTCPSocket();
344 if (!clientSocket) {
345 if (debug_mode) printf("Client error creating socket: OS error %d\n",
346 PR_GetOSError());
347 continue;
348 }
350 if (debug_mode) DPRINTF("\tClient connecting\n");
352 rv = PR_Connect(clientSocket,
353 &serverAddr,
354 PR_INTERVAL_NO_TIMEOUT);
355 if (!clientSocket) {
356 if (debug_mode) printf("\tClient error connecting\n");
357 continue;
358 }
360 if (debug_mode) DPRINTF("\tClient connected\n");
362 rv = PR_Send(clientSocket,
363 sendBuf,
364 _client_data,
365 0,
366 PR_INTERVAL_NO_TIMEOUT);
367 if (rv != _client_data) {
368 if (debug_mode) printf("Client error sending data (%d)\n", rv);
369 PR_Close(clientSocket);
370 continue;
371 }
373 if (debug_mode) DPRINTF("\tClient sent %d bytes\n", rv);
375 bytesNeeded = _server_data;
376 while(bytesNeeded) {
377 rv = PR_Recv(clientSocket,
378 recvBuf,
379 bytesNeeded,
380 0,
381 PR_INTERVAL_NO_TIMEOUT);
382 if (rv <= 0) {
383 if (debug_mode) printf("Client error receiving data (%d) (%d/%d)\n",
384 rv, (_server_data - bytesNeeded), _server_data);
385 break;
386 }
387 if (debug_mode) DPRINTF("\tClient received %d bytes; need %d more\n", rv, bytesNeeded - rv);
388 bytesNeeded -= rv;
389 }
391 PR_Close(clientSocket);
393 PR_AtomicDecrement(&numRequests);
394 }
396 PR_EnterMonitor(clientMonitor);
397 --numClients;
398 PR_Notify(clientMonitor);
399 PR_ExitMonitor(clientMonitor);
401 PR_DELETE(sendBuf);
402 PR_DELETE(recvBuf);
403 }
405 void
406 RunClients(void)
407 {
408 PRInt32 index;
410 numRequests = _iterations;
411 numClients = _clients;
412 clientMonitor = PR_NewMonitor();
414 for (index=0; index<_clients; index++) {
415 PRThread *clientThread;
418 clientThread = PR_CreateThread(
419 PR_USER_THREAD,
420 ClientThreadFunc,
421 NULL,
422 PR_PRIORITY_NORMAL,
423 ClientScope,
424 PR_UNJOINABLE_THREAD,
425 THREAD_STACKSIZE);
427 if (!clientThread) {
428 if (debug_mode) printf("\terror creating client thread %d\n", index);
429 } else
430 if (debug_mode) DPRINTF("\tMain created client %d/%d\n", index+1, _clients);
432 }
434 PR_EnterMonitor(clientMonitor);
435 while(numClients)
436 PR_Wait(clientMonitor, PR_INTERVAL_NO_TIMEOUT);
437 PR_ExitMonitor(clientMonitor);
438 }
440 /* --- Main Function ---------------------------------------------- */
442 static
443 void do_work()
444 {
445 PRThread *ServerThread;
446 PRInt32 state;
448 SetServerState(MAIN, SERVER_STATE_STARTUP);
449 ServerThread = PR_CreateThread(
450 PR_USER_THREAD,
451 ServerThreadFunc,
452 NULL,
453 PR_PRIORITY_NORMAL,
454 ServerScope,
455 PR_JOINABLE_THREAD,
456 THREAD_STACKSIZE);
457 if (!ServerThread) {
458 if (debug_mode) printf("error creating main server thread\n");
459 return;
460 }
462 /* Wait for server to be ready */
463 state = WaitServerState(MAIN, SERVER_STATE_READY|SERVER_STATE_DEAD);
465 if (!(state & SERVER_STATE_DEAD)) {
466 /* Run Test Clients */
467 RunClients();
469 /* Send death signal to server */
470 SetServerState(MAIN, SERVER_STATE_DYING);
471 }
473 PR_JoinThread(ServerThread);
474 }
477 static void do_workKU(void)
478 {
479 ServerScope = PR_GLOBAL_THREAD;
480 ClientScope = PR_LOCAL_THREAD;
481 do_work();
482 }
486 static void Measure(void (*func)(void), const char *msg)
487 {
488 PRIntervalTime start, stop;
489 double d;
491 start = PR_IntervalNow();
492 (*func)();
493 stop = PR_IntervalNow();
495 d = (double)PR_IntervalToMicroseconds(stop - start);
497 if (debug_mode) printf("\n%40s: %6.2f usec\n", msg, d / _iterations);
498 }
501 int main(int argc, char **argv)
502 {
503 /* The command line argument: -d is used to determine if the test is being run
504 in debug mode. The regress tool requires only one line output:PASS or FAIL.
505 All of the printfs associated with this test has been handled with a if (debug_mode)
506 test.
507 Usage: test_name -d
508 */
509 PLOptStatus os;
510 PLOptState *opt = PL_CreateOptState(argc, argv, "d:");
511 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
512 {
513 if (PL_OPT_BAD == os) continue;
514 switch (opt->option)
515 {
516 case 'd': /* debug mode */
517 debug_mode = 1;
518 break;
519 default:
520 break;
521 }
522 }
523 PL_DestroyOptState(opt);
525 /* main test */
526 #ifndef SYMBIAN
527 if (debug_mode) {
528 printf("Enter number of iterations: \n");
529 scanf("%d", &_iterations);
530 printf("Enter number of clients : \n");
531 scanf("%d", &_clients);
532 printf("Enter size of client data : \n");
533 scanf("%d", &_client_data);
534 printf("Enter size of server data : \n");
535 scanf("%d", &_server_data);
536 }
537 else
538 #endif
539 {
540 _iterations = 7;
541 _clients = 7;
542 _client_data = 100;
543 _server_data = 100;
544 }
546 if (debug_mode) {
547 printf("\n\n%d iterations with %d client threads.\n",
548 _iterations, _clients);
549 printf("Sending %d bytes of client data and %d bytes of server data\n",
550 _client_data, _server_data);
551 }
552 PR_Init(PR_USER_THREAD, PR_PRIORITY_NORMAL, 0);
553 PR_STDIO_INIT();
555 PR_SetThreadRecycleMode(64);
557 ServerStateCVLock = PR_NewLock();
558 ServerStateCV = PR_NewCondVar(ServerStateCVLock);
560 Measure(do_workKU, "server loop kernel/user");
562 PR_Cleanup();
563 if(failed_already)
564 return 1;
565 else
566 return 0;
568 }