Wed, 31 Dec 2014 06:55:50 +0100
Added tag UPSTREAM_283F7C6 for changeset ca08bd8f51b2
1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 /*
7 *
8 * Notes:
9 * [1] lth. The call to Sleep() is a hack to get the test case to run
10 * on Windows 95. Without it, the test case fails with an error
11 * WSAECONNRESET following a recv() call. The error is caused by the
12 * server side thread termination without a shutdown() or closesocket()
13 * call. Windows documentation suggests that this is predicted
14 * behavior; that other platforms get away with it is ... serendipity.
15 * The test case should shutdown() or closesocket() before
16 * thread termination. I didn't have time to figure out where or how
17 * to do it. The Sleep() call inserts enough delay to allow the
18 * client side to recv() all his data before the server side thread
19 * terminates. Whew! ...
20 *
21 ** Modification History:
22 * 14-May-97 AGarcia- Converted the test to accommodate the debug_mode flag.
23 * The debug mode will print all of the printfs associated with this test.
24 * The regress mode will be the default mode. Since the regress tool limits
25 * the output to a one line status:PASS or FAIL,all of the printf statements
26 * have been handled with an if (debug_mode) statement.
27 */
29 #include "prclist.h"
30 #include "prcvar.h"
31 #include "prerror.h"
32 #include "prinit.h"
33 #include "prinrval.h"
34 #include "prio.h"
35 #include "prlock.h"
36 #include "prlog.h"
37 #include "prtime.h"
38 #include "prmem.h"
39 #include "prnetdb.h"
40 #include "prprf.h"
41 #include "prthread.h"
43 #include "pprio.h"
44 #include "primpl.h"
46 #include "plstr.h"
47 #include "plerror.h"
48 #include "plgetopt.h"
50 #include <stdlib.h>
51 #include <string.h>
53 #if defined(XP_UNIX)
54 #include <math.h>
55 #endif
57 /*
58 ** This is the beginning of the test
59 */
/* Flag words handed to PR_Recv() and PR_Send(). */
#define RECV_FLAGS              0
#define SEND_FLAGS              0

/* Defaults for the low/high arguments of PR_SetFDCacheSize() (-f / -F). */
#define DEFAULT_LOW             0
#define DEFAULT_HIGH            0

/* Size of the scratch buffer main() gives to PR_GetHostByName(). */
#define BUFFER_SIZE             1024

/* Listener defaults: PR_Listen() backlog and the TCP port used by both sides. */
#define DEFAULT_BACKLOG         5
#define DEFAULT_PORT            12849

/* Default number of client threads and of workers allowed in accept. */
#define DEFAULT_CLIENTS         1
#define ALLOWED_IN_ACCEPT       1

/* Modulus applied to rand() by the client (transfer size, pause length). */
#define DEFAULT_CLIPPING        1000

/* Default bounds on the number of server worker threads. */
#define DEFAULT_WORKERS_MIN     1
#define DEFAULT_WORKERS_MAX     1

/* Server name that means "run the server inside this process". */
#define DEFAULT_SERVER          "localhost"

/* Default test duration, in seconds. */
#define DEFAULT_EXECUTION_TIME  10

/* I/O timeouts in milliseconds (converted with PR_MillisecondsToInterval). */
#define DEFAULT_CLIENT_TIMEOUT  4000
#define DEFAULT_SERVER_TIMEOUT  4000

/* Priority given to worker threads by CreateWorker(). */
#define DEFAULT_SERVER_PRIORITY PR_PRIORITY_HIGH
79 typedef enum CSState_e {cs_init, cs_run, cs_stop, cs_exit} CSState_t;
81 static void PR_CALLBACK Worker(void *arg);
82 typedef struct CSPool_s CSPool_t;
83 typedef struct CSWorker_s CSWorker_t;
84 typedef struct CSServer_s CSServer_t;
85 typedef enum Verbosity
86 {
87 TEST_LOG_ALWAYS,
88 TEST_LOG_ERROR,
89 TEST_LOG_WARNING,
90 TEST_LOG_NOTICE,
91 TEST_LOG_INFO,
92 TEST_LOG_STATUS,
93 TEST_LOG_VERBOSE
94 } Verbosity;
96 static PRInt32 domain = AF_INET;
97 static PRInt32 protocol = 6; /* TCP */
98 static PRFileDesc *debug_out = NULL;
99 static PRBool debug_mode = PR_FALSE;
100 static PRBool pthread_stats = PR_FALSE;
101 static Verbosity verbosity = TEST_LOG_ALWAYS;
102 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
104 struct CSWorker_s
105 {
106 PRCList element; /* list of the server's workers */
108 PRThread *thread; /* this worker objects thread */
109 CSServer_t *server; /* back pointer to server structure */
110 };
112 struct CSPool_s
113 {
114 PRCondVar *exiting;
115 PRCondVar *acceptComplete;
116 PRUint32 accepting, active, workers;
117 };
119 struct CSServer_s
120 {
121 PRCList list; /* head of worker list */
123 PRLock *ml;
124 PRThread *thread; /* the main server thread */
125 PRCondVar *stateChange;
127 PRUint16 port; /* port we're listening on */
128 PRUint32 backlog; /* size of our listener backlog */
129 PRFileDesc *listener; /* the fd accepting connections */
131 CSPool_t pool; /* statistics on worker threads */
132 CSState_t state; /* the server's state */
133 struct /* controlling worker counts */
134 {
135 PRUint32 minimum, maximum, accepting;
136 } workers;
138 /* statistics */
139 PRIntervalTime started, stopped;
140 PRUint32 operations, bytesTransferred;
141 };
143 typedef struct CSDescriptor_s
144 {
145 PRInt32 size; /* size of transfer */
146 char filename[60]; /* filename, null padded */
147 } CSDescriptor_t;
149 typedef struct CSClient_s
150 {
151 PRLock *ml;
152 PRThread *thread;
153 PRCondVar *stateChange;
154 PRNetAddr serverAddress;
156 CSState_t state;
158 /* statistics */
159 PRIntervalTime started, stopped;
160 PRUint32 operations, bytesTransferred;
161 } CSClient_t;
163 #define TEST_LOG(l, p, a) \
164 do { \
165 if (debug_mode || (p <= verbosity)) printf a; \
166 } while (0)
168 PRLogModuleInfo *cltsrv_log_file = NULL;
170 #define MY_ASSERT(_expr) \
171 ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))
173 #define TEST_ASSERT(_expr) \
174 ((_expr)?((void)0):_MY_Assert(# _expr,__FILE__,__LINE__))
176 static void _MY_Assert(const char *s, const char *file, PRIntn ln)
177 {
178 PL_PrintError(NULL);
179 PR_Assert(s, file, ln);
180 } /* _MY_Assert */
182 static PRBool Aborted(PRStatus rv)
183 {
184 return ((PR_FAILURE == rv) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) ?
185 PR_TRUE : PR_FALSE;
186 }
188 static void TimeOfDayMessage(const char *msg, PRThread* me)
189 {
190 char buffer[100];
191 PRExplodedTime tod;
192 PR_ExplodeTime(PR_Now(), PR_LocalTimeParameters, &tod);
193 (void)PR_FormatTime(buffer, sizeof(buffer), "%H:%M:%S", &tod);
195 TEST_LOG(
196 cltsrv_log_file, TEST_LOG_ALWAYS,
197 ("%s(0x%p): %s\n", msg, me, buffer));
198 } /* TimeOfDayMessage */
201 static void PR_CALLBACK Client(void *arg)
202 {
203 PRStatus rv;
204 PRIntn index;
205 char buffer[1024];
206 PRFileDesc *fd = NULL;
207 PRUintn clipping = DEFAULT_CLIPPING;
208 PRThread *me = PR_GetCurrentThread();
209 CSClient_t *client = (CSClient_t*)arg;
210 CSDescriptor_t *descriptor = PR_NEW(CSDescriptor_t);
211 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_CLIENT_TIMEOUT);
214 for (index = 0; index < sizeof(buffer); ++index)
215 buffer[index] = (char)index;
217 client->started = PR_IntervalNow();
219 PR_Lock(client->ml);
220 client->state = cs_run;
221 PR_NotifyCondVar(client->stateChange);
222 PR_Unlock(client->ml);
224 TimeOfDayMessage("Client started at", me);
226 while (cs_run == client->state)
227 {
228 PRInt32 bytes, descbytes, filebytes, netbytes;
230 (void)PR_NetAddrToString(&client->serverAddress, buffer, sizeof(buffer));
231 TEST_LOG(cltsrv_log_file, TEST_LOG_INFO,
232 ("\tClient(0x%p): connecting to server at %s\n", me, buffer));
234 fd = PR_Socket(domain, SOCK_STREAM, protocol);
235 TEST_ASSERT(NULL != fd);
236 rv = PR_Connect(fd, &client->serverAddress, timeout);
237 if (PR_FAILURE == rv)
238 {
239 TEST_LOG(
240 cltsrv_log_file, TEST_LOG_ERROR,
241 ("\tClient(0x%p): conection failed (%d, %d)\n",
242 me, PR_GetError(), PR_GetOSError()));
243 goto aborted;
244 }
246 memset(descriptor, 0, sizeof(*descriptor));
247 descriptor->size = PR_htonl(descbytes = rand() % clipping);
248 PR_snprintf(
249 descriptor->filename, sizeof(descriptor->filename),
250 "CS%p%p-%p.dat", client->started, me, client->operations);
251 TEST_LOG(
252 cltsrv_log_file, TEST_LOG_VERBOSE,
253 ("\tClient(0x%p): sending descriptor for %u bytes\n", me, descbytes));
254 bytes = PR_Send(
255 fd, descriptor, sizeof(*descriptor), SEND_FLAGS, timeout);
256 if (sizeof(CSDescriptor_t) != bytes)
257 {
258 if (Aborted(PR_FAILURE)) goto aborted;
259 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
260 {
261 TEST_LOG(
262 cltsrv_log_file, TEST_LOG_ERROR,
263 ("\tClient(0x%p): send descriptor timeout\n", me));
264 goto retry;
265 }
266 }
267 TEST_ASSERT(sizeof(*descriptor) == bytes);
269 netbytes = 0;
270 while (netbytes < descbytes)
271 {
272 filebytes = sizeof(buffer);
273 if ((descbytes - netbytes) < filebytes)
274 filebytes = descbytes - netbytes;
275 TEST_LOG(
276 cltsrv_log_file, TEST_LOG_VERBOSE,
277 ("\tClient(0x%p): sending %d bytes\n", me, filebytes));
278 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
279 if (filebytes != bytes)
280 {
281 if (Aborted(PR_FAILURE)) goto aborted;
282 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
283 {
284 TEST_LOG(
285 cltsrv_log_file, TEST_LOG_ERROR,
286 ("\tClient(0x%p): send data timeout\n", me));
287 goto retry;
288 }
289 }
290 TEST_ASSERT(bytes == filebytes);
291 netbytes += bytes;
292 }
293 filebytes = 0;
294 while (filebytes < descbytes)
295 {
296 netbytes = sizeof(buffer);
297 if ((descbytes - filebytes) < netbytes)
298 netbytes = descbytes - filebytes;
299 TEST_LOG(
300 cltsrv_log_file, TEST_LOG_VERBOSE,
301 ("\tClient(0x%p): receiving %d bytes\n", me, netbytes));
302 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
303 if (-1 == bytes)
304 {
305 if (Aborted(PR_FAILURE))
306 {
307 TEST_LOG(
308 cltsrv_log_file, TEST_LOG_ERROR,
309 ("\tClient(0x%p): receive data aborted\n", me));
310 goto aborted;
311 }
312 else if (PR_IO_TIMEOUT_ERROR == PR_GetError())
313 TEST_LOG(
314 cltsrv_log_file, TEST_LOG_ERROR,
315 ("\tClient(0x%p): receive data timeout\n", me));
316 else
317 TEST_LOG(
318 cltsrv_log_file, TEST_LOG_ERROR,
319 ("\tClient(0x%p): receive error (%d, %d)\n",
320 me, PR_GetError(), PR_GetOSError()));
321 goto retry;
322 }
323 if (0 == bytes)
324 {
325 TEST_LOG(
326 cltsrv_log_file, TEST_LOG_ERROR,
327 ("\t\tClient(0x%p): unexpected end of stream\n",
328 PR_GetCurrentThread()));
329 break;
330 }
331 filebytes += bytes;
332 }
334 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
335 if (Aborted(rv)) goto aborted;
336 TEST_ASSERT(PR_SUCCESS == rv);
337 retry:
338 (void)PR_Close(fd); fd = NULL;
339 TEST_LOG(
340 cltsrv_log_file, TEST_LOG_INFO,
341 ("\tClient(0x%p): disconnected from server\n", me));
343 PR_Lock(client->ml);
344 client->operations += 1;
345 client->bytesTransferred += 2 * descbytes;
346 rv = PR_WaitCondVar(client->stateChange, rand() % clipping);
347 PR_Unlock(client->ml);
348 if (Aborted(rv)) break;
349 }
351 aborted:
352 client->stopped = PR_IntervalNow();
354 PR_ClearInterrupt();
355 if (NULL != fd) rv = PR_Close(fd);
357 PR_Lock(client->ml);
358 client->state = cs_exit;
359 PR_NotifyCondVar(client->stateChange);
360 PR_Unlock(client->ml);
361 PR_DELETE(descriptor);
362 TEST_LOG(
363 cltsrv_log_file, TEST_LOG_ALWAYS,
364 ("\tClient(0x%p): stopped after %u operations and %u bytes\n",
365 PR_GetCurrentThread(), client->operations, client->bytesTransferred));
367 } /* Client */
369 static PRStatus ProcessRequest(PRFileDesc *fd, CSServer_t *server)
370 {
371 PRStatus drv, rv;
372 char buffer[1024];
373 PRFileDesc *file = NULL;
374 PRThread * me = PR_GetCurrentThread();
375 PRInt32 bytes, descbytes, netbytes, filebytes = 0;
376 CSDescriptor_t *descriptor = PR_NEW(CSDescriptor_t);
377 PRIntervalTime timeout = PR_MillisecondsToInterval(DEFAULT_SERVER_TIMEOUT);
379 TEST_LOG(
380 cltsrv_log_file, TEST_LOG_VERBOSE,
381 ("\tProcessRequest(0x%p): receiving desciptor\n", me));
382 bytes = PR_Recv(
383 fd, descriptor, sizeof(*descriptor), RECV_FLAGS, timeout);
384 if (-1 == bytes)
385 {
386 rv = PR_FAILURE;
387 if (Aborted(rv)) goto exit;
388 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
389 {
390 TEST_LOG(
391 cltsrv_log_file, TEST_LOG_ERROR,
392 ("\tProcessRequest(0x%p): receive timeout\n", me));
393 }
394 goto exit;
395 }
396 if (0 == bytes)
397 {
398 rv = PR_FAILURE;
399 TEST_LOG(
400 cltsrv_log_file, TEST_LOG_ERROR,
401 ("\tProcessRequest(0x%p): unexpected end of file\n", me));
402 goto exit;
403 }
404 descbytes = PR_ntohl(descriptor->size);
405 TEST_ASSERT(sizeof(*descriptor) == bytes);
407 TEST_LOG(
408 cltsrv_log_file, TEST_LOG_VERBOSE,
409 ("\t\tProcessRequest(0x%p): read descriptor {%d, %s}\n",
410 me, descbytes, descriptor->filename));
412 file = PR_Open(
413 descriptor->filename, (PR_CREATE_FILE | PR_WRONLY), 0666);
414 if (NULL == file)
415 {
416 rv = PR_FAILURE;
417 if (Aborted(rv)) goto aborted;
418 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
419 {
420 TEST_LOG(
421 cltsrv_log_file, TEST_LOG_ERROR,
422 ("\tProcessRequest(0x%p): open file timeout\n", me));
423 goto aborted;
424 }
425 }
426 TEST_ASSERT(NULL != file);
428 filebytes = 0;
429 while (filebytes < descbytes)
430 {
431 netbytes = sizeof(buffer);
432 if ((descbytes - filebytes) < netbytes)
433 netbytes = descbytes - filebytes;
434 TEST_LOG(
435 cltsrv_log_file, TEST_LOG_VERBOSE,
436 ("\tProcessRequest(0x%p): receive %d bytes\n", me, netbytes));
437 bytes = PR_Recv(fd, buffer, netbytes, RECV_FLAGS, timeout);
438 if (-1 == bytes)
439 {
440 rv = PR_FAILURE;
441 if (Aborted(rv)) goto aborted;
442 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
443 {
444 TEST_LOG(
445 cltsrv_log_file, TEST_LOG_ERROR,
446 ("\t\tProcessRequest(0x%p): receive data timeout\n", me));
447 goto aborted;
448 }
449 /*
450 * XXX: I got (PR_CONNECT_RESET_ERROR, ERROR_NETNAME_DELETED)
451 * on NT here. This is equivalent to ECONNRESET on Unix.
452 * -wtc
453 */
454 TEST_LOG(
455 cltsrv_log_file, TEST_LOG_WARNING,
456 ("\t\tProcessRequest(0x%p): unexpected error (%d, %d)\n",
457 me, PR_GetError(), PR_GetOSError()));
458 goto aborted;
459 }
460 if(0 == bytes)
461 {
462 TEST_LOG(
463 cltsrv_log_file, TEST_LOG_WARNING,
464 ("\t\tProcessRequest(0x%p): unexpected end of stream\n", me));
465 rv = PR_FAILURE;
466 goto aborted;
467 }
468 filebytes += bytes;
469 netbytes = bytes;
470 /* The byte count for PR_Write should be positive */
471 MY_ASSERT(netbytes > 0);
472 TEST_LOG(
473 cltsrv_log_file, TEST_LOG_VERBOSE,
474 ("\tProcessRequest(0x%p): write %d bytes to file\n", me, netbytes));
475 bytes = PR_Write(file, buffer, netbytes);
476 if (netbytes != bytes)
477 {
478 rv = PR_FAILURE;
479 if (Aborted(rv)) goto aborted;
480 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
481 {
482 TEST_LOG(
483 cltsrv_log_file, TEST_LOG_ERROR,
484 ("\t\tProcessRequest(0x%p): write file timeout\n", me));
485 goto aborted;
486 }
487 }
488 TEST_ASSERT(bytes > 0);
489 }
491 PR_Lock(server->ml);
492 server->operations += 1;
493 server->bytesTransferred += filebytes;
494 PR_Unlock(server->ml);
496 rv = PR_Close(file);
497 if (Aborted(rv)) goto aborted;
498 TEST_ASSERT(PR_SUCCESS == rv);
499 file = NULL;
501 TEST_LOG(
502 cltsrv_log_file, TEST_LOG_VERBOSE,
503 ("\t\tProcessRequest(0x%p): opening %s\n", me, descriptor->filename));
504 file = PR_Open(descriptor->filename, PR_RDONLY, 0);
505 if (NULL == file)
506 {
507 rv = PR_FAILURE;
508 if (Aborted(rv)) goto aborted;
509 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
510 {
511 TEST_LOG(
512 cltsrv_log_file, TEST_LOG_ERROR,
513 ("\t\tProcessRequest(0x%p): open file timeout\n",
514 PR_GetCurrentThread()));
515 goto aborted;
516 }
517 TEST_LOG(
518 cltsrv_log_file, TEST_LOG_ERROR,
519 ("\t\tProcessRequest(0x%p): other file open error (%u, %u)\n",
520 me, PR_GetError(), PR_GetOSError()));
521 goto aborted;
522 }
523 TEST_ASSERT(NULL != file);
525 netbytes = 0;
526 while (netbytes < descbytes)
527 {
528 filebytes = sizeof(buffer);
529 if ((descbytes - netbytes) < filebytes)
530 filebytes = descbytes - netbytes;
531 TEST_LOG(
532 cltsrv_log_file, TEST_LOG_VERBOSE,
533 ("\tProcessRequest(0x%p): read %d bytes from file\n", me, filebytes));
534 bytes = PR_Read(file, buffer, filebytes);
535 if (filebytes != bytes)
536 {
537 rv = PR_FAILURE;
538 if (Aborted(rv)) goto aborted;
539 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
540 TEST_LOG(
541 cltsrv_log_file, TEST_LOG_ERROR,
542 ("\t\tProcessRequest(0x%p): read file timeout\n", me));
543 else
544 TEST_LOG(
545 cltsrv_log_file, TEST_LOG_ERROR,
546 ("\t\tProcessRequest(0x%p): other file error (%d, %d)\n",
547 me, PR_GetError(), PR_GetOSError()));
548 goto aborted;
549 }
550 TEST_ASSERT(bytes > 0);
551 netbytes += bytes;
552 filebytes = bytes;
553 TEST_LOG(
554 cltsrv_log_file, TEST_LOG_VERBOSE,
555 ("\t\tProcessRequest(0x%p): sending %d bytes\n", me, filebytes));
556 bytes = PR_Send(fd, buffer, filebytes, SEND_FLAGS, timeout);
557 if (filebytes != bytes)
558 {
559 rv = PR_FAILURE;
560 if (Aborted(rv)) goto aborted;
561 if (PR_IO_TIMEOUT_ERROR == PR_GetError())
562 {
563 TEST_LOG(
564 cltsrv_log_file, TEST_LOG_ERROR,
565 ("\t\tProcessRequest(0x%p): send data timeout\n", me));
566 goto aborted;
567 }
568 break;
569 }
570 TEST_ASSERT(bytes > 0);
571 }
573 PR_Lock(server->ml);
574 server->bytesTransferred += filebytes;
575 PR_Unlock(server->ml);
577 rv = PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
578 if (Aborted(rv)) goto aborted;
580 rv = PR_Close(file);
581 if (Aborted(rv)) goto aborted;
582 TEST_ASSERT(PR_SUCCESS == rv);
583 file = NULL;
585 aborted:
586 PR_ClearInterrupt();
587 if (NULL != file) PR_Close(file);
588 drv = PR_Delete(descriptor->filename);
589 TEST_ASSERT(PR_SUCCESS == drv);
590 exit:
591 TEST_LOG(
592 cltsrv_log_file, TEST_LOG_VERBOSE,
593 ("\t\tProcessRequest(0x%p): Finished\n", me));
595 PR_DELETE(descriptor);
597 #if defined(WIN95)
598 PR_Sleep(PR_MillisecondsToInterval(200)); /* lth. see note [1] */
599 #endif
600 return rv;
601 } /* ProcessRequest */
603 static PRStatus CreateWorker(CSServer_t *server, CSPool_t *pool)
604 {
605 CSWorker_t *worker = PR_NEWZAP(CSWorker_t);
606 worker->server = server;
607 PR_INIT_CLIST(&worker->element);
608 worker->thread = PR_CreateThread(
609 PR_USER_THREAD, Worker, worker,
610 DEFAULT_SERVER_PRIORITY, thread_scope,
611 PR_UNJOINABLE_THREAD, 0);
612 if (NULL == worker->thread)
613 {
614 PR_DELETE(worker);
615 return PR_FAILURE;
616 }
618 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
619 ("\tCreateWorker(0x%p): create new worker (0x%p)\n",
620 PR_GetCurrentThread(), worker->thread));
622 return PR_SUCCESS;
623 } /* CreateWorker */
625 static void PR_CALLBACK Worker(void *arg)
626 {
627 PRStatus rv;
628 PRNetAddr from;
629 PRFileDesc *fd = NULL;
630 PRThread *me = PR_GetCurrentThread();
631 CSWorker_t *worker = (CSWorker_t*)arg;
632 CSServer_t *server = worker->server;
633 CSPool_t *pool = &server->pool;
635 TEST_LOG(
636 cltsrv_log_file, TEST_LOG_NOTICE,
637 ("\t\tWorker(0x%p): started [%u]\n", me, pool->workers + 1));
639 PR_Lock(server->ml);
640 PR_APPEND_LINK(&worker->element, &server->list);
641 pool->workers += 1; /* define our existance */
643 while (cs_run == server->state)
644 {
645 while (pool->accepting >= server->workers.accepting)
646 {
647 TEST_LOG(
648 cltsrv_log_file, TEST_LOG_VERBOSE,
649 ("\t\tWorker(0x%p): waiting for accept slot[%d]\n",
650 me, pool->accepting));
651 rv = PR_WaitCondVar(pool->acceptComplete, PR_INTERVAL_NO_TIMEOUT);
652 if (Aborted(rv) || (cs_run != server->state))
653 {
654 TEST_LOG(
655 cltsrv_log_file, TEST_LOG_NOTICE,
656 ("\tWorker(0x%p): has been %s\n",
657 me, (Aborted(rv) ? "interrupted" : "stopped")));
658 goto exit;
659 }
660 }
661 pool->accepting += 1; /* how many are really in accept */
662 PR_Unlock(server->ml);
664 TEST_LOG(
665 cltsrv_log_file, TEST_LOG_VERBOSE,
666 ("\t\tWorker(0x%p): calling accept\n", me));
667 fd = PR_Accept(server->listener, &from, PR_INTERVAL_NO_TIMEOUT);
669 PR_Lock(server->ml);
670 pool->accepting -= 1;
671 PR_NotifyCondVar(pool->acceptComplete);
673 if ((NULL == fd) && Aborted(PR_FAILURE))
674 {
675 if (NULL != server->listener)
676 {
677 PR_Close(server->listener);
678 server->listener = NULL;
679 }
680 goto exit;
681 }
683 if (NULL != fd)
684 {
685 /*
686 ** Create another worker of the total number of workers is
687 ** less than the minimum specified or we have none left in
688 ** accept() AND we're not over the maximum.
689 ** This sort of presumes that the number allowed in accept
690 ** is at least as many as the minimum. Otherwise we'll keep
691 ** creating new threads and deleting them soon after.
692 */
693 PRBool another =
694 ((pool->workers < server->workers.minimum) ||
695 ((0 == pool->accepting)
696 && (pool->workers < server->workers.maximum))) ?
697 PR_TRUE : PR_FALSE;
698 pool->active += 1;
699 PR_Unlock(server->ml);
701 if (another) (void)CreateWorker(server, pool);
703 rv = ProcessRequest(fd, server);
704 if (PR_SUCCESS != rv)
705 TEST_LOG(
706 cltsrv_log_file, TEST_LOG_ERROR,
707 ("\t\tWorker(0x%p): server process ended abnormally\n", me));
708 (void)PR_Close(fd); fd = NULL;
710 PR_Lock(server->ml);
711 pool->active -= 1;
712 }
713 }
715 exit:
716 PR_ClearInterrupt();
717 PR_Unlock(server->ml);
719 if (NULL != fd)
720 {
721 (void)PR_Shutdown(fd, PR_SHUTDOWN_BOTH);
722 (void)PR_Close(fd);
723 }
725 TEST_LOG(
726 cltsrv_log_file, TEST_LOG_NOTICE,
727 ("\t\tWorker(0x%p): exiting [%u]\n", PR_GetCurrentThread(), pool->workers));
729 PR_Lock(server->ml);
730 pool->workers -= 1; /* undefine our existance */
731 PR_REMOVE_AND_INIT_LINK(&worker->element);
732 PR_NotifyCondVar(pool->exiting);
733 PR_Unlock(server->ml);
735 PR_DELETE(worker); /* destruction of the "worker" object */
737 } /* Worker */
739 static void PR_CALLBACK Server(void *arg)
740 {
741 PRStatus rv;
742 PRNetAddr serverAddress;
743 PRThread *me = PR_GetCurrentThread();
744 CSServer_t *server = (CSServer_t*)arg;
745 PRSocketOptionData sockOpt;
747 server->listener = PR_Socket(domain, SOCK_STREAM, protocol);
749 sockOpt.option = PR_SockOpt_Reuseaddr;
750 sockOpt.value.reuse_addr = PR_TRUE;
751 rv = PR_SetSocketOption(server->listener, &sockOpt);
752 TEST_ASSERT(PR_SUCCESS == rv);
754 memset(&serverAddress, 0, sizeof(serverAddress));
755 if (PR_AF_INET6 != domain)
756 rv = PR_InitializeNetAddr(PR_IpAddrAny, DEFAULT_PORT, &serverAddress);
757 else
758 rv = PR_SetNetAddr(PR_IpAddrAny, PR_AF_INET6, DEFAULT_PORT,
759 &serverAddress);
760 rv = PR_Bind(server->listener, &serverAddress);
761 TEST_ASSERT(PR_SUCCESS == rv);
763 rv = PR_Listen(server->listener, server->backlog);
764 TEST_ASSERT(PR_SUCCESS == rv);
766 server->started = PR_IntervalNow();
767 TimeOfDayMessage("Server started at", me);
769 PR_Lock(server->ml);
770 server->state = cs_run;
771 PR_NotifyCondVar(server->stateChange);
772 PR_Unlock(server->ml);
774 /*
775 ** Create the first worker (actually, a thread that accepts
776 ** connections and then processes the work load as needed).
777 ** From this point on, additional worker threads are created
778 ** as they are needed by existing worker threads.
779 */
780 rv = CreateWorker(server, &server->pool);
781 TEST_ASSERT(PR_SUCCESS == rv);
783 /*
784 ** From here on this thread is merely hanging around as the contact
785 ** point for the main test driver. It's just waiting for the driver
786 ** to declare the test complete.
787 */
788 TEST_LOG(
789 cltsrv_log_file, TEST_LOG_VERBOSE,
790 ("\tServer(0x%p): waiting for state change\n", me));
792 PR_Lock(server->ml);
793 while ((cs_run == server->state) && !Aborted(rv))
794 {
795 rv = PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
796 }
797 PR_Unlock(server->ml);
798 PR_ClearInterrupt();
800 TEST_LOG(
801 cltsrv_log_file, TEST_LOG_INFO,
802 ("\tServer(0x%p): shutting down workers\n", me));
804 /*
805 ** Get all the worker threads to exit. They know how to
806 ** clean up after themselves, so this is just a matter of
807 ** waiting for clorine in the pool to take effect. During
808 ** this stage we're ignoring interrupts.
809 */
810 server->workers.minimum = server->workers.maximum = 0;
812 PR_Lock(server->ml);
813 while (!PR_CLIST_IS_EMPTY(&server->list))
814 {
815 PRCList *head = PR_LIST_HEAD(&server->list);
816 CSWorker_t *worker = (CSWorker_t*)head;
817 TEST_LOG(
818 cltsrv_log_file, TEST_LOG_VERBOSE,
819 ("\tServer(0x%p): interrupting worker(0x%p)\n", me, worker));
820 rv = PR_Interrupt(worker->thread);
821 TEST_ASSERT(PR_SUCCESS == rv);
822 PR_REMOVE_AND_INIT_LINK(head);
823 }
825 while (server->pool.workers > 0)
826 {
827 TEST_LOG(
828 cltsrv_log_file, TEST_LOG_NOTICE,
829 ("\tServer(0x%p): waiting for %u workers to exit\n",
830 me, server->pool.workers));
831 (void)PR_WaitCondVar(server->pool.exiting, PR_INTERVAL_NO_TIMEOUT);
832 }
834 server->state = cs_exit;
835 PR_NotifyCondVar(server->stateChange);
836 PR_Unlock(server->ml);
838 TEST_LOG(
839 cltsrv_log_file, TEST_LOG_ALWAYS,
840 ("\tServer(0x%p): stopped after %u operations and %u bytes\n",
841 me, server->operations, server->bytesTransferred));
843 if (NULL != server->listener) PR_Close(server->listener);
844 server->stopped = PR_IntervalNow();
846 } /* Server */
848 static void WaitForCompletion(PRIntn execution)
849 {
850 while (execution > 0)
851 {
852 PRIntn dally = (execution > 30) ? 30 : execution;
853 PR_Sleep(PR_SecondsToInterval(dally));
854 if (pthread_stats) PT_FPrintStats(debug_out, "\nPThread Statistics\n");
855 execution -= dally;
856 }
857 } /* WaitForCompletion */
859 static void Help(void)
860 {
861 PR_fprintf(debug_out, "cltsrv test program usage:\n");
862 PR_fprintf(debug_out, "\t-a <n> threads allowed in accept (5)\n");
863 PR_fprintf(debug_out, "\t-b <n> backlock for listen (5)\n");
864 PR_fprintf(debug_out, "\t-c <threads> number of clients to create (1)\n");
865 PR_fprintf(debug_out, "\t-f <low> low water mark for fd caching (0)\n");
866 PR_fprintf(debug_out, "\t-F <high> high water mark for fd caching (0)\n");
867 PR_fprintf(debug_out, "\t-w <threads> minimal number of server threads (1)\n");
868 PR_fprintf(debug_out, "\t-W <threads> maximum number of server threads (1)\n");
869 PR_fprintf(debug_out, "\t-e <seconds> duration of the test in seconds (10)\n");
870 PR_fprintf(debug_out, "\t-s <string> dsn name of server (localhost)\n");
871 PR_fprintf(debug_out, "\t-G use GLOBAL threads (LOCAL)\n");
872 PR_fprintf(debug_out, "\t-X use XTP as transport (TCP)\n");
873 PR_fprintf(debug_out, "\t-6 Use IPv6 (IPv4)\n");
874 PR_fprintf(debug_out, "\t-v verbosity (accumulative) (0)\n");
875 PR_fprintf(debug_out, "\t-p pthread statistics (FALSE)\n");
876 PR_fprintf(debug_out, "\t-d debug mode (FALSE)\n");
877 PR_fprintf(debug_out, "\t-h this message\n");
878 } /* Help */
880 static Verbosity IncrementVerbosity(void)
881 {
882 PRIntn verboge = (PRIntn)verbosity + 1;
883 return (Verbosity)verboge;
884 } /* IncrementVerbosity */
886 int main(int argc, char** argv)
887 {
888 PRUintn index;
889 PRBool boolean;
890 CSClient_t *client;
891 PRStatus rv, joinStatus;
892 CSServer_t *server = NULL;
894 PRUintn backlog = DEFAULT_BACKLOG;
895 PRUintn clients = DEFAULT_CLIENTS;
896 const char *serverName = DEFAULT_SERVER;
897 PRBool serverIsLocal = PR_TRUE;
898 PRUintn accepting = ALLOWED_IN_ACCEPT;
899 PRUintn workersMin = DEFAULT_WORKERS_MIN;
900 PRUintn workersMax = DEFAULT_WORKERS_MAX;
901 PRIntn execution = DEFAULT_EXECUTION_TIME;
902 PRIntn low = DEFAULT_LOW, high = DEFAULT_HIGH;
904 /*
905 * -G use global threads
906 * -a <n> threads allowed in accept
907 * -b <n> backlock for listen
908 * -c <threads> number of clients to create
909 * -f <low> low water mark for caching FDs
910 * -F <high> high water mark for caching FDs
911 * -w <threads> minimal number of server threads
912 * -W <threads> maximum number of server threads
913 * -e <seconds> duration of the test in seconds
914 * -s <string> dsn name of server (implies no server here)
915 * -v verbosity
916 */
918 PLOptStatus os;
919 PLOptState *opt = PL_CreateOptState(argc, argv, "GX6b:a:c:f:F:w:W:e:s:vdhp");
921 debug_out = PR_GetSpecialFD(PR_StandardError);
923 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
924 {
925 if (PL_OPT_BAD == os) continue;
926 switch (opt->option)
927 {
928 case 'G': /* use global threads */
929 thread_scope = PR_GLOBAL_THREAD;
930 break;
931 case 'X': /* use XTP as transport */
932 protocol = 36;
933 break;
934 case '6': /* Use IPv6 */
935 domain = PR_AF_INET6;
936 break;
937 case 'a': /* the value for accepting */
938 accepting = atoi(opt->value);
939 break;
940 case 'b': /* the value for backlock */
941 backlog = atoi(opt->value);
942 break;
943 case 'c': /* number of client threads */
944 clients = atoi(opt->value);
945 break;
946 case 'f': /* low water fd cache */
947 low = atoi(opt->value);
948 break;
949 case 'F': /* low water fd cache */
950 high = atoi(opt->value);
951 break;
952 case 'w': /* minimum server worker threads */
953 workersMin = atoi(opt->value);
954 break;
955 case 'W': /* maximum server worker threads */
956 workersMax = atoi(opt->value);
957 break;
958 case 'e': /* program execution time in seconds */
959 execution = atoi(opt->value);
960 break;
961 case 's': /* server's address */
962 serverName = opt->value;
963 break;
964 case 'v': /* verbosity */
965 verbosity = IncrementVerbosity();
966 break;
967 case 'd': /* debug mode */
968 debug_mode = PR_TRUE;
969 break;
970 case 'p': /* pthread mode */
971 pthread_stats = PR_TRUE;
972 break;
973 case 'h':
974 default:
975 Help();
976 return 2;
977 }
978 }
979 PL_DestroyOptState(opt);
981 if (0 != PL_strcmp(serverName, DEFAULT_SERVER)) serverIsLocal = PR_FALSE;
982 if (0 == execution) execution = DEFAULT_EXECUTION_TIME;
983 if (0 == workersMax) workersMax = DEFAULT_WORKERS_MAX;
984 if (0 == workersMin) workersMin = DEFAULT_WORKERS_MIN;
985 if (0 == accepting) accepting = ALLOWED_IN_ACCEPT;
986 if (0 == backlog) backlog = DEFAULT_BACKLOG;
988 if (workersMin > accepting) accepting = workersMin;
990 PR_STDIO_INIT();
991 TimeOfDayMessage("Client/Server started at", PR_GetCurrentThread());
993 cltsrv_log_file = PR_NewLogModule("cltsrv_log");
994 MY_ASSERT(NULL != cltsrv_log_file);
995 boolean = PR_SetLogFile("cltsrv.log");
996 MY_ASSERT(boolean);
998 rv = PR_SetFDCacheSize(low, high);
999 PR_ASSERT(PR_SUCCESS == rv);
1001 if (serverIsLocal)
1002 {
1003 /* Establish the server */
1004 TEST_LOG(
1005 cltsrv_log_file, TEST_LOG_INFO,
1006 ("main(0x%p): starting server\n", PR_GetCurrentThread()));
1008 server = PR_NEWZAP(CSServer_t);
1009 PR_INIT_CLIST(&server->list);
1010 server->state = cs_init;
1011 server->ml = PR_NewLock();
1012 server->backlog = backlog;
1013 server->port = DEFAULT_PORT;
1014 server->workers.minimum = workersMin;
1015 server->workers.maximum = workersMax;
1016 server->workers.accepting = accepting;
1017 server->stateChange = PR_NewCondVar(server->ml);
1018 server->pool.exiting = PR_NewCondVar(server->ml);
1019 server->pool.acceptComplete = PR_NewCondVar(server->ml);
1021 TEST_LOG(
1022 cltsrv_log_file, TEST_LOG_NOTICE,
1023 ("main(0x%p): creating server thread\n", PR_GetCurrentThread()));
1025 server->thread = PR_CreateThread(
1026 PR_USER_THREAD, Server, server, PR_PRIORITY_HIGH,
1027 thread_scope, PR_JOINABLE_THREAD, 0);
1028 TEST_ASSERT(NULL != server->thread);
1030 TEST_LOG(
1031 cltsrv_log_file, TEST_LOG_VERBOSE,
1032 ("main(0x%p): waiting for server init\n", PR_GetCurrentThread()));
1034 PR_Lock(server->ml);
1035 while (server->state == cs_init)
1036 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
1037 PR_Unlock(server->ml);
1039 TEST_LOG(
1040 cltsrv_log_file, TEST_LOG_VERBOSE,
1041 ("main(0x%p): server init complete (port #%d)\n",
1042 PR_GetCurrentThread(), server->port));
1043 }
1045 if (clients != 0)
1046 {
1047 /* Create all of the clients */
1048 PRHostEnt host;
1049 char buffer[BUFFER_SIZE];
1050 client = (CSClient_t*)PR_CALLOC(clients * sizeof(CSClient_t));
1052 TEST_LOG(
1053 cltsrv_log_file, TEST_LOG_VERBOSE,
1054 ("main(0x%p): creating %d client threads\n",
1055 PR_GetCurrentThread(), clients));
1057 if (!serverIsLocal)
1058 {
1059 rv = PR_GetHostByName(serverName, buffer, BUFFER_SIZE, &host);
1060 if (PR_SUCCESS != rv)
1061 {
1062 PL_FPrintError(PR_STDERR, "PR_GetHostByName");
1063 return 2;
1064 }
1065 }
1067 for (index = 0; index < clients; ++index)
1068 {
1069 client[index].state = cs_init;
1070 client[index].ml = PR_NewLock();
1071 if (serverIsLocal)
1072 {
1073 if (PR_AF_INET6 != domain)
1074 (void)PR_InitializeNetAddr(
1075 PR_IpAddrLoopback, DEFAULT_PORT,
1076 &client[index].serverAddress);
1077 else
1078 rv = PR_SetNetAddr(PR_IpAddrLoopback, PR_AF_INET6,
1079 DEFAULT_PORT, &client[index].serverAddress);
1080 }
1081 else
1082 {
1083 (void)PR_EnumerateHostEnt(
1084 0, &host, DEFAULT_PORT, &client[index].serverAddress);
1085 }
1086 client[index].stateChange = PR_NewCondVar(client[index].ml);
1087 TEST_LOG(
1088 cltsrv_log_file, TEST_LOG_INFO,
1089 ("main(0x%p): creating client threads\n", PR_GetCurrentThread()));
1090 client[index].thread = PR_CreateThread(
1091 PR_USER_THREAD, Client, &client[index], PR_PRIORITY_NORMAL,
1092 thread_scope, PR_JOINABLE_THREAD, 0);
1093 TEST_ASSERT(NULL != client[index].thread);
1094 PR_Lock(client[index].ml);
1095 while (cs_init == client[index].state)
1096 PR_WaitCondVar(client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
1097 PR_Unlock(client[index].ml);
1098 }
1099 }
1101 /* Then just let them go at it for a bit */
1102 TEST_LOG(
1103 cltsrv_log_file, TEST_LOG_ALWAYS,
1104 ("main(0x%p): waiting for execution interval (%d seconds)\n",
1105 PR_GetCurrentThread(), execution));
1107 WaitForCompletion(execution);
1109 TimeOfDayMessage("Shutting down", PR_GetCurrentThread());
1111 if (clients != 0)
1112 {
1113 for (index = 0; index < clients; ++index)
1114 {
1115 TEST_LOG(cltsrv_log_file, TEST_LOG_STATUS,
1116 ("main(0x%p): notifying client(0x%p) to stop\n",
1117 PR_GetCurrentThread(), client[index].thread));
1119 PR_Lock(client[index].ml);
1120 if (cs_run == client[index].state)
1121 {
1122 client[index].state = cs_stop;
1123 PR_Interrupt(client[index].thread);
1124 while (cs_stop == client[index].state)
1125 PR_WaitCondVar(
1126 client[index].stateChange, PR_INTERVAL_NO_TIMEOUT);
1127 }
1128 PR_Unlock(client[index].ml);
1130 TEST_LOG(cltsrv_log_file, TEST_LOG_VERBOSE,
1131 ("main(0x%p): joining client(0x%p)\n",
1132 PR_GetCurrentThread(), client[index].thread));
1134 joinStatus = PR_JoinThread(client[index].thread);
1135 TEST_ASSERT(PR_SUCCESS == joinStatus);
1136 PR_DestroyCondVar(client[index].stateChange);
1137 PR_DestroyLock(client[index].ml);
1138 }
1139 PR_DELETE(client);
1140 }
1142 if (NULL != server)
1143 {
1144 /* All clients joined - retrieve the server */
1145 TEST_LOG(
1146 cltsrv_log_file, TEST_LOG_NOTICE,
1147 ("main(0x%p): notifying server(0x%p) to stop\n",
1148 PR_GetCurrentThread(), server->thread));
1150 PR_Lock(server->ml);
1151 server->state = cs_stop;
1152 PR_Interrupt(server->thread);
1153 while (cs_exit != server->state)
1154 PR_WaitCondVar(server->stateChange, PR_INTERVAL_NO_TIMEOUT);
1155 PR_Unlock(server->ml);
1157 TEST_LOG(
1158 cltsrv_log_file, TEST_LOG_NOTICE,
1159 ("main(0x%p): joining server(0x%p)\n",
1160 PR_GetCurrentThread(), server->thread));
1161 joinStatus = PR_JoinThread(server->thread);
1162 TEST_ASSERT(PR_SUCCESS == joinStatus);
1164 PR_DestroyCondVar(server->stateChange);
1165 PR_DestroyCondVar(server->pool.exiting);
1166 PR_DestroyCondVar(server->pool.acceptComplete);
1167 PR_DestroyLock(server->ml);
1168 PR_DELETE(server);
1169 }
1171 TEST_LOG(
1172 cltsrv_log_file, TEST_LOG_ALWAYS,
1173 ("main(0x%p): test complete\n", PR_GetCurrentThread()));
1175 PT_FPrintStats(debug_out, "\nPThread Statistics\n");
1177 TimeOfDayMessage("Test exiting at", PR_GetCurrentThread());
1178 PR_Cleanup();
1179 return 0;
1180 } /* main */
1182 /* cltsrv.c */