|
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
5 |
|
#include "prio.h"
#include "prprf.h"
#include "prlog.h"
#include "prmem.h"
#include "pratom.h"
#include "prlock.h"
#include "prmwait.h"
#include "prclist.h"
#include "prerror.h"
#include "prinrval.h"
#include "prnetdb.h"
#include "prthread.h"

#include "plstr.h"
#include "plerror.h"
#include "plgetopt.h"

#include <errno.h>
#include <stdlib.h>
#include <string.h>
|
24 |
|
25 typedef struct Shared |
|
26 { |
|
27 const char *title; |
|
28 PRLock *list_lock; |
|
29 PRWaitGroup *group; |
|
30 PRIntervalTime timeout; |
|
31 } Shared; |
|
32 |
|
/*
** Logging levels, in increasing order of chattiness. The code compares
** them with '<' and '>', so the declaration order is significant.
*/
typedef enum Verbosity
{
    silent,
    quiet,
    chatty,
    noisy
} Verbosity;
|
34 |
|
35 static PRFileDesc *debug = NULL; |
|
36 static PRInt32 desc_allocated = 0; |
|
37 static PRUint16 default_port = 12273; |
|
38 static enum Verbosity verbosity = quiet; |
|
39 static PRInt32 ops_required = 1000, ops_done = 0; |
|
40 static PRThreadScope thread_scope = PR_LOCAL_THREAD; |
|
41 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; |
|
42 |
|
/*
** MW_ASSERT - test-local assertion.
**
** In DEBUG builds a failing expression first dumps the current NSPR
** error to the debug stream (when one is open) and then calls
** PR_Assert(). In other builds the expression is NOT evaluated, so it
** must never carry side effects; the macro still expands to a valid
** void expression so it can be used anywhere a statement is expected.
*/
#if defined(DEBUG)
#define MW_ASSERT(_expr) \
    ((_expr)?((void)0):MW_AssertFailed(# _expr,__FILE__,__LINE__))
/* Helper for MW_ASSERT; the name avoids the reserved "_" + uppercase form. */
static void MW_AssertFailed(const char *s, const char *file, PRIntn ln)
{
    if (NULL != debug) PL_FPrintError(debug, NULL);
    PR_Assert(s, file, ln);
}  /* MW_AssertFailed */
#else
#define MW_ASSERT(_expr) ((void)0)
#endif
|
54 |
|
55 static void PrintRecvDesc(PRRecvWait *desc, const char *msg) |
|
56 { |
|
57 const char *tag[] = { |
|
58 "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", |
|
59 "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; |
|
60 PR_fprintf( |
|
61 debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", |
|
62 msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); |
|
63 } /* PrintRecvDesc */ |
|
64 |
|
65 static Shared *MakeShared(const char *title) |
|
66 { |
|
67 Shared *shared = PR_NEWZAP(Shared); |
|
68 shared->group = PR_CreateWaitGroup(1); |
|
69 shared->timeout = PR_SecondsToInterval(1); |
|
70 shared->list_lock = PR_NewLock(); |
|
71 shared->title = title; |
|
72 return shared; |
|
73 } /* MakeShared */ |
|
74 |
|
75 static void DestroyShared(Shared *shared) |
|
76 { |
|
77 PRStatus rv; |
|
78 if (verbosity > quiet) |
|
79 PR_fprintf(debug, "%s: destroying group\n", shared->title); |
|
80 rv = PR_DestroyWaitGroup(shared->group); |
|
81 MW_ASSERT(PR_SUCCESS == rv); |
|
82 PR_DestroyLock(shared->list_lock); |
|
83 PR_DELETE(shared); |
|
84 } /* DestroyShared */ |
|
85 |
|
86 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout) |
|
87 { |
|
88 PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait); |
|
89 MW_ASSERT(NULL != desc_out); |
|
90 |
|
91 MW_ASSERT(NULL != fd); |
|
92 desc_out->fd = fd; |
|
93 desc_out->timeout = timeout; |
|
94 desc_out->buffer.length = 120; |
|
95 desc_out->buffer.start = PR_CALLOC(120); |
|
96 |
|
97 PR_AtomicIncrement(&desc_allocated); |
|
98 |
|
99 if (verbosity > chatty) |
|
100 PrintRecvDesc(desc_out, "Allocated"); |
|
101 return desc_out; |
|
102 } /* CreateRecvWait */ |
|
103 |
|
104 static void DestroyRecvWait(PRRecvWait *desc_out) |
|
105 { |
|
106 if (verbosity > chatty) |
|
107 PrintRecvDesc(desc_out, "Destroying"); |
|
108 PR_Close(desc_out->fd); |
|
109 if (NULL != desc_out->buffer.start) |
|
110 PR_DELETE(desc_out->buffer.start); |
|
111 PR_Free(desc_out); |
|
112 (void)PR_AtomicDecrement(&desc_allocated); |
|
113 } /* DestroyRecvWait */ |
|
114 |
|
115 static void CancelGroup(Shared *shared) |
|
116 { |
|
117 PRRecvWait *desc_out; |
|
118 |
|
119 if (verbosity > quiet) |
|
120 PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); |
|
121 |
|
122 do |
|
123 { |
|
124 desc_out = PR_CancelWaitGroup(shared->group); |
|
125 if (NULL != desc_out) DestroyRecvWait(desc_out); |
|
126 } while (NULL != desc_out); |
|
127 |
|
128 MW_ASSERT(0 == desc_allocated); |
|
129 MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError()); |
|
130 } /* CancelGroup */ |
|
131 |
|
132 static void PR_CALLBACK ClientThread(void* arg) |
|
133 { |
|
134 PRStatus rv; |
|
135 PRInt32 bytes; |
|
136 PRIntn empty_flags = 0; |
|
137 PRNetAddr server_address; |
|
138 unsigned char buffer[100]; |
|
139 Shared *shared = (Shared*)arg; |
|
140 PRFileDesc *server = PR_NewTCPSocket(); |
|
141 if ((NULL == server) |
|
142 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; |
|
143 MW_ASSERT(NULL != server); |
|
144 |
|
145 if (verbosity > chatty) |
|
146 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); |
|
147 |
|
148 /* Initialize the buffer so that Purify won't complain */ |
|
149 memset(buffer, 0, sizeof(buffer)); |
|
150 |
|
151 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); |
|
152 MW_ASSERT(PR_SUCCESS == rv); |
|
153 |
|
154 if (verbosity > quiet) |
|
155 PR_fprintf(debug, "%s: Client opening connection\n", shared->title); |
|
156 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); |
|
157 |
|
158 if (PR_FAILURE == rv) |
|
159 { |
|
160 if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); |
|
161 return; |
|
162 } |
|
163 |
|
164 while (ops_done < ops_required) |
|
165 { |
|
166 bytes = PR_Send( |
|
167 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); |
|
168 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; |
|
169 MW_ASSERT(sizeof(buffer) == bytes); |
|
170 if (verbosity > chatty) |
|
171 PR_fprintf( |
|
172 debug, "%s: Client sent %d bytes\n", |
|
173 shared->title, sizeof(buffer)); |
|
174 bytes = PR_Recv( |
|
175 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); |
|
176 if (verbosity > chatty) |
|
177 PR_fprintf( |
|
178 debug, "%s: Client received %d bytes\n", |
|
179 shared->title, sizeof(buffer)); |
|
180 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; |
|
181 MW_ASSERT(sizeof(buffer) == bytes); |
|
182 PR_Sleep(shared->timeout); |
|
183 } |
|
184 rv = PR_Close(server); |
|
185 MW_ASSERT(PR_SUCCESS == rv); |
|
186 |
|
187 } /* ClientThread */ |
|
188 |
|
189 static void OneInThenCancelled(Shared *shared) |
|
190 { |
|
191 PRStatus rv; |
|
192 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); |
|
193 |
|
194 shared->timeout = PR_INTERVAL_NO_TIMEOUT; |
|
195 |
|
196 desc_in->fd = PR_NewTCPSocket(); |
|
197 desc_in->timeout = shared->timeout; |
|
198 |
|
199 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); |
|
200 |
|
201 rv = PR_AddWaitFileDesc(shared->group, desc_in); |
|
202 MW_ASSERT(PR_SUCCESS == rv); |
|
203 |
|
204 if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); |
|
205 rv = PR_CancelWaitFileDesc(shared->group, desc_in); |
|
206 MW_ASSERT(PR_SUCCESS == rv); |
|
207 |
|
208 desc_out = PR_WaitRecvReady(shared->group); |
|
209 MW_ASSERT(desc_out == desc_in); |
|
210 MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); |
|
211 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
|
212 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
|
213 |
|
214 rv = PR_Close(desc_in->fd); |
|
215 MW_ASSERT(PR_SUCCESS == rv); |
|
216 |
|
217 if (verbosity > quiet) |
|
218 PR_fprintf(debug, "%s: destroying group\n", shared->title); |
|
219 |
|
220 PR_DELETE(desc_in); |
|
221 } /* OneInThenCancelled */ |
|
222 |
|
223 static void OneOpOneThread(Shared *shared) |
|
224 { |
|
225 PRStatus rv; |
|
226 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); |
|
227 |
|
228 desc_in->fd = PR_NewTCPSocket(); |
|
229 desc_in->timeout = shared->timeout; |
|
230 |
|
231 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); |
|
232 |
|
233 rv = PR_AddWaitFileDesc(shared->group, desc_in); |
|
234 MW_ASSERT(PR_SUCCESS == rv); |
|
235 desc_out = PR_WaitRecvReady(shared->group); |
|
236 MW_ASSERT(desc_out == desc_in); |
|
237 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
|
238 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
|
239 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
|
240 |
|
241 rv = PR_Close(desc_in->fd); |
|
242 MW_ASSERT(PR_SUCCESS == rv); |
|
243 |
|
244 PR_DELETE(desc_in); |
|
245 } /* OneOpOneThread */ |
|
246 |
|
247 static void ManyOpOneThread(Shared *shared) |
|
248 { |
|
249 PRStatus rv; |
|
250 PRIntn index; |
|
251 PRRecvWait *desc_in; |
|
252 PRRecvWait *desc_out; |
|
253 |
|
254 if (verbosity > quiet) |
|
255 PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); |
|
256 |
|
257 for (index = 0; index < wait_objects; ++index) |
|
258 { |
|
259 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); |
|
260 |
|
261 rv = PR_AddWaitFileDesc(shared->group, desc_in); |
|
262 MW_ASSERT(PR_SUCCESS == rv); |
|
263 } |
|
264 |
|
265 while (ops_done < ops_required) |
|
266 { |
|
267 desc_out = PR_WaitRecvReady(shared->group); |
|
268 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
|
269 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
|
270 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); |
|
271 rv = PR_AddWaitFileDesc(shared->group, desc_out); |
|
272 MW_ASSERT(PR_SUCCESS == rv); |
|
273 (void)PR_AtomicIncrement(&ops_done); |
|
274 } |
|
275 |
|
276 CancelGroup(shared); |
|
277 } /* ManyOpOneThread */ |
|
278 |
|
279 static void PR_CALLBACK SomeOpsThread(void *arg) |
|
280 { |
|
281 PRRecvWait *desc_out; |
|
282 PRStatus rv = PR_SUCCESS; |
|
283 Shared *shared = (Shared*)arg; |
|
284 do /* until interrupted */ |
|
285 { |
|
286 desc_out = PR_WaitRecvReady(shared->group); |
|
287 if (NULL == desc_out) |
|
288 { |
|
289 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
|
290 if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); |
|
291 break; |
|
292 } |
|
293 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); |
|
294 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
|
295 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); |
|
296 |
|
297 if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); |
|
298 desc_out->timeout = shared->timeout; |
|
299 rv = PR_AddWaitFileDesc(shared->group, desc_out); |
|
300 PR_AtomicIncrement(&ops_done); |
|
301 if (ops_done > ops_required) break; |
|
302 } while (PR_SUCCESS == rv); |
|
303 MW_ASSERT(PR_SUCCESS == rv); |
|
304 } /* SomeOpsThread */ |
|
305 |
|
306 static void SomeOpsSomeThreads(Shared *shared) |
|
307 { |
|
308 PRStatus rv; |
|
309 PRThread **thread; |
|
310 PRIntn index; |
|
311 PRRecvWait *desc_in; |
|
312 |
|
313 thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); |
|
314 |
|
315 /* Create some threads */ |
|
316 |
|
317 if (verbosity > quiet) |
|
318 PR_fprintf(debug, "%s: creating threads\n", shared->title); |
|
319 for (index = 0; index < worker_threads; ++index) |
|
320 { |
|
321 thread[index] = PR_CreateThread( |
|
322 PR_USER_THREAD, SomeOpsThread, shared, |
|
323 PR_PRIORITY_HIGH, thread_scope, |
|
324 PR_JOINABLE_THREAD, 16 * 1024); |
|
325 } |
|
326 |
|
327 /* then create some operations */ |
|
328 if (verbosity > quiet) |
|
329 PR_fprintf(debug, "%s: creating desc\n", shared->title); |
|
330 for (index = 0; index < wait_objects; ++index) |
|
331 { |
|
332 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); |
|
333 rv = PR_AddWaitFileDesc(shared->group, desc_in); |
|
334 MW_ASSERT(PR_SUCCESS == rv); |
|
335 } |
|
336 |
|
337 if (verbosity > quiet) |
|
338 PR_fprintf(debug, "%s: sleeping\n", shared->title); |
|
339 while (ops_done < ops_required) PR_Sleep(shared->timeout); |
|
340 |
|
341 if (verbosity > quiet) |
|
342 PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); |
|
343 for (index = 0; index < worker_threads; ++index) |
|
344 { |
|
345 rv = PR_Interrupt(thread[index]); |
|
346 MW_ASSERT(PR_SUCCESS == rv); |
|
347 rv = PR_JoinThread(thread[index]); |
|
348 MW_ASSERT(PR_SUCCESS == rv); |
|
349 } |
|
350 PR_DELETE(thread); |
|
351 |
|
352 CancelGroup(shared); |
|
353 } /* SomeOpsSomeThreads */ |
|
354 |
|
355 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc) |
|
356 { |
|
357 PRInt32 bytes_out; |
|
358 |
|
359 if (verbosity > chatty) |
|
360 PR_fprintf( |
|
361 debug, "%s: Service received %d bytes\n", |
|
362 shared->title, desc->bytesRecv); |
|
363 |
|
364 if (0 == desc->bytesRecv) goto quitting; |
|
365 if ((-1 == desc->bytesRecv) |
|
366 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; |
|
367 |
|
368 bytes_out = PR_Send( |
|
369 desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); |
|
370 if (verbosity > chatty) |
|
371 PR_fprintf( |
|
372 debug, "%s: Service sent %d bytes\n", |
|
373 shared->title, bytes_out); |
|
374 |
|
375 if ((-1 == bytes_out) |
|
376 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; |
|
377 MW_ASSERT(bytes_out == desc->bytesRecv); |
|
378 |
|
379 return PR_SUCCESS; |
|
380 |
|
381 aborted: |
|
382 quitting: |
|
383 return PR_FAILURE; |
|
384 } /* ServiceRequest */ |
|
385 |
|
386 static void PR_CALLBACK ServiceThread(void *arg) |
|
387 { |
|
388 PRStatus rv = PR_SUCCESS; |
|
389 PRRecvWait *desc_out = NULL; |
|
390 Shared *shared = (Shared*)arg; |
|
391 do /* until interrupted */ |
|
392 { |
|
393 if (NULL != desc_out) |
|
394 { |
|
395 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; |
|
396 if (verbosity > chatty) |
|
397 PrintRecvDesc(desc_out, "Service re-adding"); |
|
398 rv = PR_AddWaitFileDesc(shared->group, desc_out); |
|
399 MW_ASSERT(PR_SUCCESS == rv); |
|
400 } |
|
401 |
|
402 desc_out = PR_WaitRecvReady(shared->group); |
|
403 if (NULL == desc_out) |
|
404 { |
|
405 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
|
406 break; |
|
407 } |
|
408 |
|
409 switch (desc_out->outcome) |
|
410 { |
|
411 case PR_MW_SUCCESS: |
|
412 { |
|
413 PR_AtomicIncrement(&ops_done); |
|
414 if (verbosity > chatty) |
|
415 PrintRecvDesc(desc_out, "Service ready"); |
|
416 rv = ServiceRequest(shared, desc_out); |
|
417 break; |
|
418 } |
|
419 case PR_MW_INTERRUPT: |
|
420 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
|
421 rv = PR_FAILURE; /* if interrupted, then exit */ |
|
422 break; |
|
423 case PR_MW_TIMEOUT: |
|
424 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); |
|
425 case PR_MW_FAILURE: |
|
426 if (verbosity > silent) |
|
427 PL_FPrintError(debug, "RecvReady failure"); |
|
428 break; |
|
429 default: |
|
430 break; |
|
431 } |
|
432 } while (PR_SUCCESS == rv); |
|
433 |
|
434 if (NULL != desc_out) DestroyRecvWait(desc_out); |
|
435 |
|
436 } /* ServiceThread */ |
|
437 |
|
438 static void PR_CALLBACK EnumerationThread(void *arg) |
|
439 { |
|
440 PRStatus rv; |
|
441 PRIntn count; |
|
442 PRRecvWait *desc; |
|
443 Shared *shared = (Shared*)arg; |
|
444 PRIntervalTime five_seconds = PR_SecondsToInterval(5); |
|
445 PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group); |
|
446 MW_ASSERT(NULL != enumerator); |
|
447 |
|
448 while (PR_SUCCESS == PR_Sleep(five_seconds)) |
|
449 { |
|
450 count = 0; |
|
451 desc = NULL; |
|
452 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) |
|
453 { |
|
454 if (verbosity > chatty) PrintRecvDesc(desc, shared->title); |
|
455 count += 1; |
|
456 } |
|
457 if (verbosity > silent) |
|
458 PR_fprintf(debug, |
|
459 "%s Enumerated %d objects\n", shared->title, count); |
|
460 } |
|
461 |
|
462 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); |
|
463 |
|
464 |
|
465 rv = PR_DestroyMWaitEnumerator(enumerator); |
|
466 MW_ASSERT(PR_SUCCESS == rv); |
|
467 } /* EnumerationThread */ |
|
468 |
|
469 static void PR_CALLBACK ServerThread(void *arg) |
|
470 { |
|
471 PRStatus rv; |
|
472 PRIntn index; |
|
473 PRRecvWait *desc_in; |
|
474 PRThread **worker_thread; |
|
475 Shared *shared = (Shared*)arg; |
|
476 PRFileDesc *listener, *service; |
|
477 PRNetAddr server_address, client_address; |
|
478 |
|
479 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); |
|
480 if (verbosity > quiet) |
|
481 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); |
|
482 for (index = 0; index < worker_threads; ++index) |
|
483 { |
|
484 worker_thread[index] = PR_CreateThread( |
|
485 PR_USER_THREAD, ServiceThread, shared, |
|
486 PR_PRIORITY_HIGH, thread_scope, |
|
487 PR_JOINABLE_THREAD, 16 * 1024); |
|
488 } |
|
489 |
|
490 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); |
|
491 MW_ASSERT(PR_SUCCESS == rv); |
|
492 |
|
493 listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener); |
|
494 if (verbosity > chatty) |
|
495 PR_fprintf( |
|
496 debug, "%s: Server listener socket @0x%x\n", |
|
497 shared->title, listener); |
|
498 rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv); |
|
499 rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv); |
|
500 while (ops_done < ops_required) |
|
501 { |
|
502 if (verbosity > quiet) |
|
503 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); |
|
504 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); |
|
505 if (NULL == service) |
|
506 { |
|
507 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break; |
|
508 PL_PrintError("Accept failed"); |
|
509 MW_ASSERT(!"Accept failed"); |
|
510 } |
|
511 else |
|
512 { |
|
513 desc_in = CreateRecvWait(service, shared->timeout); |
|
514 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; |
|
515 if (verbosity > chatty) |
|
516 PrintRecvDesc(desc_in, "Service adding"); |
|
517 rv = PR_AddWaitFileDesc(shared->group, desc_in); |
|
518 MW_ASSERT(PR_SUCCESS == rv); |
|
519 } |
|
520 } |
|
521 |
|
522 if (verbosity > quiet) |
|
523 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title); |
|
524 for (index = 0; index < worker_threads; ++index) |
|
525 { |
|
526 rv = PR_Interrupt(worker_thread[index]); |
|
527 MW_ASSERT(PR_SUCCESS == rv); |
|
528 rv = PR_JoinThread(worker_thread[index]); |
|
529 MW_ASSERT(PR_SUCCESS == rv); |
|
530 } |
|
531 PR_DELETE(worker_thread); |
|
532 |
|
533 PR_Close(listener); |
|
534 |
|
535 CancelGroup(shared); |
|
536 |
|
537 } /* ServerThread */ |
|
538 |
|
539 static void RealOneGroupIO(Shared *shared) |
|
540 { |
|
541 /* |
|
542 ** Create a server that listens for connections and then services |
|
543 ** requests that come in over those connections. The server never |
|
544 ** deletes a connection and assumes a basic RPC model of operation. |
|
545 ** |
|
546 ** Use worker_threads threads to service how every many open ports |
|
547 ** there might be. |
|
548 ** |
|
549 ** Oh, ya. Almost forget. Create (some) clients as well. |
|
550 */ |
|
551 PRStatus rv; |
|
552 PRIntn index; |
|
553 PRThread *server_thread, *enumeration_thread, **client_thread; |
|
554 |
|
555 if (verbosity > quiet) |
|
556 PR_fprintf(debug, "%s: creating server_thread\n", shared->title); |
|
557 |
|
558 server_thread = PR_CreateThread( |
|
559 PR_USER_THREAD, ServerThread, shared, |
|
560 PR_PRIORITY_HIGH, thread_scope, |
|
561 PR_JOINABLE_THREAD, 16 * 1024); |
|
562 |
|
563 if (verbosity > quiet) |
|
564 PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); |
|
565 |
|
566 enumeration_thread = PR_CreateThread( |
|
567 PR_USER_THREAD, EnumerationThread, shared, |
|
568 PR_PRIORITY_HIGH, thread_scope, |
|
569 PR_JOINABLE_THREAD, 16 * 1024); |
|
570 |
|
571 if (verbosity > quiet) |
|
572 PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); |
|
573 PR_Sleep(5 * shared->timeout); |
|
574 |
|
575 if (verbosity > quiet) |
|
576 PR_fprintf(debug, "%s: creating client_threads\n", shared->title); |
|
577 client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); |
|
578 for (index = 0; index < client_threads; ++index) |
|
579 { |
|
580 client_thread[index] = PR_CreateThread( |
|
581 PR_USER_THREAD, ClientThread, shared, |
|
582 PR_PRIORITY_NORMAL, thread_scope, |
|
583 PR_JOINABLE_THREAD, 16 * 1024); |
|
584 } |
|
585 |
|
586 while (ops_done < ops_required) PR_Sleep(shared->timeout); |
|
587 |
|
588 if (verbosity > quiet) |
|
589 PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title); |
|
590 for (index = 0; index < client_threads; ++index) |
|
591 { |
|
592 rv = PR_Interrupt(client_thread[index]); |
|
593 MW_ASSERT(PR_SUCCESS == rv); |
|
594 rv = PR_JoinThread(client_thread[index]); |
|
595 MW_ASSERT(PR_SUCCESS == rv); |
|
596 } |
|
597 PR_DELETE(client_thread); |
|
598 |
|
599 if (verbosity > quiet) |
|
600 PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title); |
|
601 rv = PR_Interrupt(enumeration_thread); |
|
602 MW_ASSERT(PR_SUCCESS == rv); |
|
603 rv = PR_JoinThread(enumeration_thread); |
|
604 MW_ASSERT(PR_SUCCESS == rv); |
|
605 |
|
606 if (verbosity > quiet) |
|
607 PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title); |
|
608 rv = PR_Interrupt(server_thread); |
|
609 MW_ASSERT(PR_SUCCESS == rv); |
|
610 rv = PR_JoinThread(server_thread); |
|
611 MW_ASSERT(PR_SUCCESS == rv); |
|
612 } /* RealOneGroupIO */ |
|
613 |
|
614 static void RunThisOne( |
|
615 void (*func)(Shared*), const char *name, const char *test_name) |
|
616 { |
|
617 Shared *shared; |
|
618 if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) |
|
619 { |
|
620 if (verbosity > silent) |
|
621 PR_fprintf(debug, "%s()\n", name); |
|
622 shared = MakeShared(name); |
|
623 ops_done = 0; |
|
624 func(shared); /* run the test */ |
|
625 MW_ASSERT(0 == desc_allocated); |
|
626 DestroyShared(shared); |
|
627 } |
|
628 } /* RunThisOne */ |
|
629 |
|
630 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) |
|
631 { |
|
632 PRIntn verbage = (PRIntn)verbosity; |
|
633 return (Verbosity)(verbage += delta); |
|
634 } /* ChangeVerbosity */ |
|
635 |
|
636 int main(int argc, char **argv) |
|
637 { |
|
638 PLOptStatus os; |
|
639 const char *test_name = NULL; |
|
640 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:"); |
|
641 |
|
642 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) |
|
643 { |
|
644 if (PL_OPT_BAD == os) continue; |
|
645 switch (opt->option) |
|
646 { |
|
647 case 0: |
|
648 test_name = opt->value; |
|
649 break; |
|
650 case 'd': /* debug mode */ |
|
651 if (verbosity < noisy) |
|
652 verbosity = ChangeVerbosity(verbosity, 1); |
|
653 break; |
|
654 case 'q': /* debug mode */ |
|
655 if (verbosity > silent) |
|
656 verbosity = ChangeVerbosity(verbosity, -1); |
|
657 break; |
|
658 case 'G': /* use global threads */ |
|
659 thread_scope = PR_GLOBAL_THREAD; |
|
660 break; |
|
661 case 'c': /* number of client threads */ |
|
662 client_threads = atoi(opt->value); |
|
663 break; |
|
664 case 'o': /* operations to compelete */ |
|
665 ops_required = atoi(opt->value); |
|
666 break; |
|
667 case 'p': /* default port */ |
|
668 default_port = atoi(opt->value); |
|
669 break; |
|
670 case 't': /* number of threads waiting */ |
|
671 worker_threads = atoi(opt->value); |
|
672 break; |
|
673 case 'w': /* number of wait objects */ |
|
674 wait_objects = atoi(opt->value); |
|
675 break; |
|
676 default: |
|
677 break; |
|
678 } |
|
679 } |
|
680 PL_DestroyOptState(opt); |
|
681 |
|
682 if (verbosity > 0) |
|
683 debug = PR_GetSpecialFD(PR_StandardError); |
|
684 |
|
685 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); |
|
686 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); |
|
687 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name); |
|
688 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name); |
|
689 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name); |
|
690 return 0; |
|
691 } /* main */ |
|
692 |
|
/* multwait.c */