1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/nsprpub/pr/tests/multiwait.c Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,693 @@ 1.4 +/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.8 + 1.9 +#include "prio.h" 1.10 +#include "prprf.h" 1.11 +#include "prlog.h" 1.12 +#include "prmem.h" 1.13 +#include "pratom.h" 1.14 +#include "prlock.h" 1.15 +#include "prmwait.h" 1.16 +#include "prclist.h" 1.17 +#include "prerror.h" 1.18 +#include "prinrval.h" 1.19 +#include "prnetdb.h" 1.20 +#include "prthread.h" 1.21 + 1.22 +#include "plstr.h" 1.23 +#include "plerror.h" 1.24 +#include "plgetopt.h" 1.25 + 1.26 +#include <string.h> 1.27 + 1.28 +typedef struct Shared 1.29 +{ 1.30 + const char *title; 1.31 + PRLock *list_lock; 1.32 + PRWaitGroup *group; 1.33 + PRIntervalTime timeout; 1.34 +} Shared; 1.35 + 1.36 +typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity; 1.37 + 1.38 +static PRFileDesc *debug = NULL; 1.39 +static PRInt32 desc_allocated = 0; 1.40 +static PRUint16 default_port = 12273; 1.41 +static enum Verbosity verbosity = quiet; 1.42 +static PRInt32 ops_required = 1000, ops_done = 0; 1.43 +static PRThreadScope thread_scope = PR_LOCAL_THREAD; 1.44 +static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; 1.45 + 1.46 +#if defined(DEBUG) 1.47 +#define MW_ASSERT(_expr) \ 1.48 + ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__)) 1.49 +static void _MW_Assert(const char *s, const char *file, PRIntn ln) 1.50 +{ 1.51 + if (NULL != debug) PL_FPrintError(debug, NULL); 1.52 + PR_Assert(s, file, ln); 1.53 +} /* _MW_Assert */ 1.54 +#else 1.55 +#define MW_ASSERT(_expr) 1.56 +#endif 1.57 + 1.58 +static void PrintRecvDesc(PRRecvWait *desc, const char *msg) 1.59 +{ 1.60 + const char *tag[] = { 1.61 + "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", 1.62 + "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; 1.63 + PR_fprintf( 1.64 + debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", 1.65 + msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); 1.66 +} /* PrintRecvDesc */ 1.67 + 1.68 +static Shared *MakeShared(const char *title) 1.69 +{ 1.70 + Shared *shared = PR_NEWZAP(Shared); 1.71 + shared->group = PR_CreateWaitGroup(1); 1.72 + shared->timeout = PR_SecondsToInterval(1); 1.73 + shared->list_lock = PR_NewLock(); 1.74 + shared->title = title; 1.75 + return shared; 1.76 +} /* MakeShared */ 1.77 + 1.78 +static void DestroyShared(Shared *shared) 1.79 +{ 1.80 + PRStatus rv; 1.81 + if (verbosity > quiet) 1.82 + PR_fprintf(debug, "%s: destroying group\n", shared->title); 1.83 + rv = PR_DestroyWaitGroup(shared->group); 1.84 + MW_ASSERT(PR_SUCCESS == rv); 1.85 + PR_DestroyLock(shared->list_lock); 1.86 + PR_DELETE(shared); 1.87 +} /* DestroyShared */ 1.88 + 1.89 +static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout) 1.90 +{ 1.91 + PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait); 1.92 + MW_ASSERT(NULL != desc_out); 1.93 + 1.94 + MW_ASSERT(NULL != fd); 1.95 + desc_out->fd = fd; 1.96 + desc_out->timeout = timeout; 1.97 + desc_out->buffer.length = 120; 1.98 + desc_out->buffer.start = PR_CALLOC(120); 1.99 + 1.100 + PR_AtomicIncrement(&desc_allocated); 1.101 + 1.102 + if (verbosity > chatty) 1.103 + PrintRecvDesc(desc_out, "Allocated"); 1.104 + return desc_out; 1.105 +} /* CreateRecvWait */ 1.106 + 1.107 +static void DestroyRecvWait(PRRecvWait *desc_out) 1.108 +{ 1.109 + if (verbosity > chatty) 1.110 + PrintRecvDesc(desc_out, "Destroying"); 1.111 + PR_Close(desc_out->fd); 1.112 + if (NULL != desc_out->buffer.start) 1.113 + PR_DELETE(desc_out->buffer.start); 1.114 + PR_Free(desc_out); 1.115 + (void)PR_AtomicDecrement(&desc_allocated); 1.116 +} /* DestroyRecvWait */ 1.117 + 1.118 +static void CancelGroup(Shared *shared) 1.119 +{ 1.120 + PRRecvWait *desc_out; 1.121 + 1.122 + if (verbosity > quiet) 1.123 + PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); 1.124 + 1.125 + do 1.126 + { 1.127 + desc_out = PR_CancelWaitGroup(shared->group); 1.128 + if (NULL != desc_out) DestroyRecvWait(desc_out); 1.129 + } while (NULL != desc_out); 1.130 + 1.131 + MW_ASSERT(0 == desc_allocated); 1.132 + MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError()); 1.133 +} /* CancelGroup */ 1.134 + 1.135 +static void PR_CALLBACK ClientThread(void* arg) 1.136 +{ 1.137 + PRStatus rv; 1.138 + PRInt32 bytes; 1.139 + PRIntn empty_flags = 0; 1.140 + PRNetAddr server_address; 1.141 + unsigned char buffer[100]; 1.142 + Shared *shared = (Shared*)arg; 1.143 + PRFileDesc *server = PR_NewTCPSocket(); 1.144 + if ((NULL == server) 1.145 + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; 1.146 + MW_ASSERT(NULL != server); 1.147 + 1.148 + if (verbosity > chatty) 1.149 + PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); 1.150 + 1.151 + /* Initialize the buffer so that Purify won't complain */ 1.152 + memset(buffer, 0, sizeof(buffer)); 1.153 + 1.154 + rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); 1.155 + MW_ASSERT(PR_SUCCESS == rv); 1.156 + 1.157 + if (verbosity > quiet) 1.158 + PR_fprintf(debug, "%s: Client opening connection\n", shared->title); 1.159 + rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); 1.160 + 1.161 + if (PR_FAILURE == rv) 1.162 + { 1.163 + if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); 1.164 + return; 1.165 + } 1.166 + 1.167 + while (ops_done < ops_required) 1.168 + { 1.169 + bytes = PR_Send( 1.170 + server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); 1.171 + if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; 1.172 + MW_ASSERT(sizeof(buffer) == bytes); 1.173 + if (verbosity > chatty) 1.174 + PR_fprintf( 1.175 + debug, "%s: Client sent %d bytes\n", 1.176 + shared->title, sizeof(buffer)); 1.177 + bytes = PR_Recv( 1.178 + server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); 1.179 + if (verbosity > chatty) 1.180 + PR_fprintf( 1.181 + debug, "%s: Client received %d bytes\n", 1.182 + shared->title, sizeof(buffer)); 1.183 + if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; 1.184 + MW_ASSERT(sizeof(buffer) == bytes); 1.185 + PR_Sleep(shared->timeout); 1.186 + } 1.187 + rv = PR_Close(server); 1.188 + MW_ASSERT(PR_SUCCESS == rv); 1.189 + 1.190 +} /* ClientThread */ 1.191 + 1.192 +static void OneInThenCancelled(Shared *shared) 1.193 +{ 1.194 + PRStatus rv; 1.195 + PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); 1.196 + 1.197 + shared->timeout = PR_INTERVAL_NO_TIMEOUT; 1.198 + 1.199 + desc_in->fd = PR_NewTCPSocket(); 1.200 + desc_in->timeout = shared->timeout; 1.201 + 1.202 + if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); 1.203 + 1.204 + rv = PR_AddWaitFileDesc(shared->group, desc_in); 1.205 + MW_ASSERT(PR_SUCCESS == rv); 1.206 + 1.207 + if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); 1.208 + rv = PR_CancelWaitFileDesc(shared->group, desc_in); 1.209 + MW_ASSERT(PR_SUCCESS == rv); 1.210 + 1.211 + desc_out = PR_WaitRecvReady(shared->group); 1.212 + MW_ASSERT(desc_out == desc_in); 1.213 + MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); 1.214 + MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 1.215 + if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); 1.216 + 1.217 + rv = PR_Close(desc_in->fd); 1.218 + MW_ASSERT(PR_SUCCESS == rv); 1.219 + 1.220 + if (verbosity > quiet) 1.221 + PR_fprintf(debug, "%s: destroying group\n", shared->title); 1.222 + 1.223 + PR_DELETE(desc_in); 1.224 +} /* OneInThenCancelled */ 1.225 + 1.226 +static void OneOpOneThread(Shared *shared) 1.227 +{ 1.228 + PRStatus rv; 1.229 + PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); 1.230 + 1.231 + desc_in->fd = PR_NewTCPSocket(); 1.232 + desc_in->timeout = shared->timeout; 1.233 + 1.234 + if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); 1.235 + 1.236 + rv = PR_AddWaitFileDesc(shared->group, desc_in); 1.237 + MW_ASSERT(PR_SUCCESS == rv); 1.238 + desc_out = PR_WaitRecvReady(shared->group); 1.239 + MW_ASSERT(desc_out == desc_in); 1.240 + MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 1.241 + MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 1.242 + if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); 1.243 + 1.244 + rv = PR_Close(desc_in->fd); 1.245 + MW_ASSERT(PR_SUCCESS == rv); 1.246 + 1.247 + PR_DELETE(desc_in); 1.248 +} /* OneOpOneThread */ 1.249 + 1.250 +static void ManyOpOneThread(Shared *shared) 1.251 +{ 1.252 + PRStatus rv; 1.253 + PRIntn index; 1.254 + PRRecvWait *desc_in; 1.255 + PRRecvWait *desc_out; 1.256 + 1.257 + if (verbosity > quiet) 1.258 + PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); 1.259 + 1.260 + for (index = 0; index < wait_objects; ++index) 1.261 + { 1.262 + desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); 1.263 + 1.264 + rv = PR_AddWaitFileDesc(shared->group, desc_in); 1.265 + MW_ASSERT(PR_SUCCESS == rv); 1.266 + } 1.267 + 1.268 + while (ops_done < ops_required) 1.269 + { 1.270 + desc_out = PR_WaitRecvReady(shared->group); 1.271 + MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 1.272 + MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 1.273 + if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); 1.274 + rv = PR_AddWaitFileDesc(shared->group, desc_out); 1.275 + MW_ASSERT(PR_SUCCESS == rv); 1.276 + (void)PR_AtomicIncrement(&ops_done); 1.277 + } 1.278 + 1.279 + CancelGroup(shared); 1.280 +} /* ManyOpOneThread */ 1.281 + 1.282 +static void PR_CALLBACK SomeOpsThread(void *arg) 1.283 +{ 1.284 + PRRecvWait *desc_out; 1.285 + PRStatus rv = PR_SUCCESS; 1.286 + Shared *shared = (Shared*)arg; 1.287 + do /* until interrupted */ 1.288 + { 1.289 + desc_out = PR_WaitRecvReady(shared->group); 1.290 + if (NULL == desc_out) 1.291 + { 1.292 + MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 1.293 + if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); 1.294 + break; 1.295 + } 1.296 + MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); 1.297 + MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 1.298 + if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); 1.299 + 1.300 + if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); 1.301 + desc_out->timeout = shared->timeout; 1.302 + rv = PR_AddWaitFileDesc(shared->group, desc_out); 1.303 + PR_AtomicIncrement(&ops_done); 1.304 + if (ops_done > ops_required) break; 1.305 + } while (PR_SUCCESS == rv); 1.306 + MW_ASSERT(PR_SUCCESS == rv); 1.307 +} /* SomeOpsThread */ 1.308 + 1.309 +static void SomeOpsSomeThreads(Shared *shared) 1.310 +{ 1.311 + PRStatus rv; 1.312 + PRThread **thread; 1.313 + PRIntn index; 1.314 + PRRecvWait *desc_in; 1.315 + 1.316 + thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); 1.317 + 1.318 + /* Create some threads */ 1.319 + 1.320 + if (verbosity > quiet) 1.321 + PR_fprintf(debug, "%s: creating threads\n", shared->title); 1.322 + for (index = 0; index < worker_threads; ++index) 1.323 + { 1.324 + thread[index] = PR_CreateThread( 1.325 + PR_USER_THREAD, SomeOpsThread, shared, 1.326 + PR_PRIORITY_HIGH, thread_scope, 1.327 + PR_JOINABLE_THREAD, 16 * 1024); 1.328 + } 1.329 + 1.330 + /* then create some operations */ 1.331 + if (verbosity > quiet) 1.332 + PR_fprintf(debug, "%s: creating desc\n", shared->title); 1.333 + for (index = 0; index < wait_objects; ++index) 1.334 + { 1.335 + desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); 1.336 + rv = PR_AddWaitFileDesc(shared->group, desc_in); 1.337 + MW_ASSERT(PR_SUCCESS == rv); 1.338 + } 1.339 + 1.340 + if (verbosity > quiet) 1.341 + PR_fprintf(debug, "%s: sleeping\n", shared->title); 1.342 + while (ops_done < ops_required) PR_Sleep(shared->timeout); 1.343 + 1.344 + if (verbosity > quiet) 1.345 + PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); 1.346 + for (index = 0; index < worker_threads; ++index) 1.347 + { 1.348 + rv = PR_Interrupt(thread[index]); 1.349 + MW_ASSERT(PR_SUCCESS == rv); 1.350 + rv = PR_JoinThread(thread[index]); 1.351 + MW_ASSERT(PR_SUCCESS == rv); 1.352 + } 1.353 + PR_DELETE(thread); 1.354 + 1.355 + CancelGroup(shared); 1.356 +} /* SomeOpsSomeThreads */ 1.357 + 1.358 +static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc) 1.359 +{ 1.360 + PRInt32 bytes_out; 1.361 + 1.362 + if (verbosity > chatty) 1.363 + PR_fprintf( 1.364 + debug, "%s: Service received %d bytes\n", 1.365 + shared->title, desc->bytesRecv); 1.366 + 1.367 + if (0 == desc->bytesRecv) goto quitting; 1.368 + if ((-1 == desc->bytesRecv) 1.369 + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; 1.370 + 1.371 + bytes_out = PR_Send( 1.372 + desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); 1.373 + if (verbosity > chatty) 1.374 + PR_fprintf( 1.375 + debug, "%s: Service sent %d bytes\n", 1.376 + shared->title, bytes_out); 1.377 + 1.378 + if ((-1 == bytes_out) 1.379 + && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; 1.380 + MW_ASSERT(bytes_out == desc->bytesRecv); 1.381 + 1.382 + return PR_SUCCESS; 1.383 + 1.384 +aborted: 1.385 +quitting: 1.386 + return PR_FAILURE; 1.387 +} /* ServiceRequest */ 1.388 + 1.389 +static void PR_CALLBACK ServiceThread(void *arg) 1.390 +{ 1.391 + PRStatus rv = PR_SUCCESS; 1.392 + PRRecvWait *desc_out = NULL; 1.393 + Shared *shared = (Shared*)arg; 1.394 + do /* until interrupted */ 1.395 + { 1.396 + if (NULL != desc_out) 1.397 + { 1.398 + desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; 1.399 + if (verbosity > chatty) 1.400 + PrintRecvDesc(desc_out, "Service re-adding"); 1.401 + rv = PR_AddWaitFileDesc(shared->group, desc_out); 1.402 + MW_ASSERT(PR_SUCCESS == rv); 1.403 + } 1.404 + 1.405 + desc_out = PR_WaitRecvReady(shared->group); 1.406 + if (NULL == desc_out) 1.407 + { 1.408 + MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 1.409 + break; 1.410 + } 1.411 + 1.412 + switch (desc_out->outcome) 1.413 + { 1.414 + case PR_MW_SUCCESS: 1.415 + { 1.416 + PR_AtomicIncrement(&ops_done); 1.417 + if (verbosity > chatty) 1.418 + PrintRecvDesc(desc_out, "Service ready"); 1.419 + rv = ServiceRequest(shared, desc_out); 1.420 + break; 1.421 + } 1.422 + case PR_MW_INTERRUPT: 1.423 + MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 1.424 + rv = PR_FAILURE; /* if interrupted, then exit */ 1.425 + break; 1.426 + case PR_MW_TIMEOUT: 1.427 + MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); 1.428 + case PR_MW_FAILURE: 1.429 + if (verbosity > silent) 1.430 + PL_FPrintError(debug, "RecvReady failure"); 1.431 + break; 1.432 + default: 1.433 + break; 1.434 + } 1.435 + } while (PR_SUCCESS == rv); 1.436 + 1.437 + if (NULL != desc_out) DestroyRecvWait(desc_out); 1.438 + 1.439 +} /* ServiceThread */ 1.440 + 1.441 +static void PR_CALLBACK EnumerationThread(void *arg) 1.442 +{ 1.443 + PRStatus rv; 1.444 + PRIntn count; 1.445 + PRRecvWait *desc; 1.446 + Shared *shared = (Shared*)arg; 1.447 + PRIntervalTime five_seconds = PR_SecondsToInterval(5); 1.448 + PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group); 1.449 + MW_ASSERT(NULL != enumerator); 1.450 + 1.451 + while (PR_SUCCESS == PR_Sleep(five_seconds)) 1.452 + { 1.453 + count = 0; 1.454 + desc = NULL; 1.455 + while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) 1.456 + { 1.457 + if (verbosity > chatty) PrintRecvDesc(desc, shared->title); 1.458 + count += 1; 1.459 + } 1.460 + if (verbosity > silent) 1.461 + PR_fprintf(debug, 1.462 + "%s Enumerated %d objects\n", shared->title, count); 1.463 + } 1.464 + 1.465 + MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); 1.466 + 1.467 + 1.468 + rv = PR_DestroyMWaitEnumerator(enumerator); 1.469 + MW_ASSERT(PR_SUCCESS == rv); 1.470 +} /* EnumerationThread */ 1.471 + 1.472 +static void PR_CALLBACK ServerThread(void *arg) 1.473 +{ 1.474 + PRStatus rv; 1.475 + PRIntn index; 1.476 + PRRecvWait *desc_in; 1.477 + PRThread **worker_thread; 1.478 + Shared *shared = (Shared*)arg; 1.479 + PRFileDesc *listener, *service; 1.480 + PRNetAddr server_address, client_address; 1.481 + 1.482 + worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); 1.483 + if (verbosity > quiet) 1.484 + PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); 1.485 + for (index = 0; index < worker_threads; ++index) 1.486 + { 1.487 + worker_thread[index] = PR_CreateThread( 1.488 + PR_USER_THREAD, ServiceThread, shared, 1.489 + PR_PRIORITY_HIGH, thread_scope, 1.490 + PR_JOINABLE_THREAD, 16 * 1024); 1.491 + } 1.492 + 1.493 + rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); 1.494 + MW_ASSERT(PR_SUCCESS == rv); 1.495 + 1.496 + listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener); 1.497 + if (verbosity > chatty) 1.498 + PR_fprintf( 1.499 + debug, "%s: Server listener socket @0x%x\n", 1.500 + shared->title, listener); 1.501 + rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv); 1.502 + rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv); 1.503 + while (ops_done < ops_required) 1.504 + { 1.505 + if (verbosity > quiet) 1.506 + PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); 1.507 + service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); 1.508 + if (NULL == service) 1.509 + { 1.510 + if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break; 1.511 + PL_PrintError("Accept failed"); 1.512 + MW_ASSERT(!"Accept failed"); 1.513 + } 1.514 + else 1.515 + { 1.516 + desc_in = CreateRecvWait(service, shared->timeout); 1.517 + desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; 1.518 + if (verbosity > chatty) 1.519 + PrintRecvDesc(desc_in, "Service adding"); 1.520 + rv = PR_AddWaitFileDesc(shared->group, desc_in); 1.521 + MW_ASSERT(PR_SUCCESS == rv); 1.522 + } 1.523 + } 1.524 + 1.525 + if (verbosity > quiet) 1.526 + PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title); 1.527 + for (index = 0; index < worker_threads; ++index) 1.528 + { 1.529 + rv = PR_Interrupt(worker_thread[index]); 1.530 + MW_ASSERT(PR_SUCCESS == rv); 1.531 + rv = PR_JoinThread(worker_thread[index]); 1.532 + MW_ASSERT(PR_SUCCESS == rv); 1.533 + } 1.534 + PR_DELETE(worker_thread); 1.535 + 1.536 + PR_Close(listener); 1.537 + 1.538 + CancelGroup(shared); 1.539 + 1.540 +} /* ServerThread */ 1.541 + 1.542 +static void RealOneGroupIO(Shared *shared) 1.543 +{ 1.544 + /* 1.545 + ** Create a server that listens for connections and then services 1.546 + ** requests that come in over those connections. The server never 1.547 + ** deletes a connection and assumes a basic RPC model of operation. 1.548 + ** 1.549 + ** Use worker_threads threads to service how every many open ports 1.550 + ** there might be. 1.551 + ** 1.552 + ** Oh, ya. Almost forget. Create (some) clients as well. 1.553 + */ 1.554 + PRStatus rv; 1.555 + PRIntn index; 1.556 + PRThread *server_thread, *enumeration_thread, **client_thread; 1.557 + 1.558 + if (verbosity > quiet) 1.559 + PR_fprintf(debug, "%s: creating server_thread\n", shared->title); 1.560 + 1.561 + server_thread = PR_CreateThread( 1.562 + PR_USER_THREAD, ServerThread, shared, 1.563 + PR_PRIORITY_HIGH, thread_scope, 1.564 + PR_JOINABLE_THREAD, 16 * 1024); 1.565 + 1.566 + if (verbosity > quiet) 1.567 + PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); 1.568 + 1.569 + enumeration_thread = PR_CreateThread( 1.570 + PR_USER_THREAD, EnumerationThread, shared, 1.571 + PR_PRIORITY_HIGH, thread_scope, 1.572 + PR_JOINABLE_THREAD, 16 * 1024); 1.573 + 1.574 + if (verbosity > quiet) 1.575 + PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); 1.576 + PR_Sleep(5 * shared->timeout); 1.577 + 1.578 + if (verbosity > quiet) 1.579 + PR_fprintf(debug, "%s: creating client_threads\n", shared->title); 1.580 + client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); 1.581 + for (index = 0; index < client_threads; ++index) 1.582 + { 1.583 + client_thread[index] = PR_CreateThread( 1.584 + PR_USER_THREAD, ClientThread, shared, 1.585 + PR_PRIORITY_NORMAL, thread_scope, 1.586 + PR_JOINABLE_THREAD, 16 * 1024); 1.587 + } 1.588 + 1.589 + while (ops_done < ops_required) PR_Sleep(shared->timeout); 1.590 + 1.591 + if (verbosity > quiet) 1.592 + PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title); 1.593 + for (index = 0; index < client_threads; ++index) 1.594 + { 1.595 + rv = PR_Interrupt(client_thread[index]); 1.596 + MW_ASSERT(PR_SUCCESS == rv); 1.597 + rv = PR_JoinThread(client_thread[index]); 1.598 + MW_ASSERT(PR_SUCCESS == rv); 1.599 + } 1.600 + PR_DELETE(client_thread); 1.601 + 1.602 + if (verbosity > quiet) 1.603 + PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title); 1.604 + rv = PR_Interrupt(enumeration_thread); 1.605 + MW_ASSERT(PR_SUCCESS == rv); 1.606 + rv = PR_JoinThread(enumeration_thread); 1.607 + MW_ASSERT(PR_SUCCESS == rv); 1.608 + 1.609 + if (verbosity > quiet) 1.610 + PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title); 1.611 + rv = PR_Interrupt(server_thread); 1.612 + MW_ASSERT(PR_SUCCESS == rv); 1.613 + rv = PR_JoinThread(server_thread); 1.614 + MW_ASSERT(PR_SUCCESS == rv); 1.615 +} /* RealOneGroupIO */ 1.616 + 1.617 +static void RunThisOne( 1.618 + void (*func)(Shared*), const char *name, const char *test_name) 1.619 +{ 1.620 + Shared *shared; 1.621 + if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) 1.622 + { 1.623 + if (verbosity > silent) 1.624 + PR_fprintf(debug, "%s()\n", name); 1.625 + shared = MakeShared(name); 1.626 + ops_done = 0; 1.627 + func(shared); /* run the test */ 1.628 + MW_ASSERT(0 == desc_allocated); 1.629 + DestroyShared(shared); 1.630 + } 1.631 +} /* RunThisOne */ 1.632 + 1.633 +static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) 1.634 +{ 1.635 + PRIntn verbage = (PRIntn)verbosity; 1.636 + return (Verbosity)(verbage += delta); 1.637 +} /* ChangeVerbosity */ 1.638 + 1.639 +int main(int argc, char **argv) 1.640 +{ 1.641 + PLOptStatus os; 1.642 + const char *test_name = NULL; 1.643 + PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:"); 1.644 + 1.645 + while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) 1.646 + { 1.647 + if (PL_OPT_BAD == os) continue; 1.648 + switch (opt->option) 1.649 + { 1.650 + case 0: 1.651 + test_name = opt->value; 1.652 + break; 1.653 + case 'd': /* debug mode */ 1.654 + if (verbosity < noisy) 1.655 + verbosity = ChangeVerbosity(verbosity, 1); 1.656 + break; 1.657 + case 'q': /* debug mode */ 1.658 + if (verbosity > silent) 1.659 + verbosity = ChangeVerbosity(verbosity, -1); 1.660 + break; 1.661 + case 'G': /* use global threads */ 1.662 + thread_scope = PR_GLOBAL_THREAD; 1.663 + break; 1.664 + case 'c': /* number of client threads */ 1.665 + client_threads = atoi(opt->value); 1.666 + break; 1.667 + case 'o': /* operations to compelete */ 1.668 + ops_required = atoi(opt->value); 1.669 + break; 1.670 + case 'p': /* default port */ 1.671 + default_port = atoi(opt->value); 1.672 + break; 1.673 + case 't': /* number of threads waiting */ 1.674 + worker_threads = atoi(opt->value); 1.675 + break; 1.676 + case 'w': /* number of wait objects */ 1.677 + wait_objects = atoi(opt->value); 1.678 + break; 1.679 + default: 1.680 + break; 1.681 + } 1.682 + } 1.683 + PL_DestroyOptState(opt); 1.684 + 1.685 + if (verbosity > 0) 1.686 + debug = PR_GetSpecialFD(PR_StandardError); 1.687 + 1.688 + RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); 1.689 + RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); 1.690 + RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name); 1.691 + RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name); 1.692 + RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name); 1.693 + return 0; 1.694 +} /* main */ 1.695 + 1.696 +/* multwait.c */