nsprpub/pr/tests/multiwait.c

Wed, 31 Dec 2014 07:53:36 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 07:53:36 +0100
branch
TOR_BUG_3246
changeset 5
4ab42b5ab56c
permissions
-rw-r--r--

Correct small whitespace inconsistency, lost while renaming variables.

michael@0 1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
michael@0 2 /* This Source Code Form is subject to the terms of the Mozilla Public
michael@0 3 * License, v. 2.0. If a copy of the MPL was not distributed with this
michael@0 4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
michael@0 5
michael@0 6 #include "prio.h"
michael@0 7 #include "prprf.h"
michael@0 8 #include "prlog.h"
michael@0 9 #include "prmem.h"
michael@0 10 #include "pratom.h"
michael@0 11 #include "prlock.h"
michael@0 12 #include "prmwait.h"
michael@0 13 #include "prclist.h"
michael@0 14 #include "prerror.h"
michael@0 15 #include "prinrval.h"
michael@0 16 #include "prnetdb.h"
michael@0 17 #include "prthread.h"
michael@0 18
michael@0 19 #include "plstr.h"
michael@0 20 #include "plerror.h"
michael@0 21 #include "plgetopt.h"
michael@0 22
michael@0 23 #include <string.h>
michael@0 24
michael@0 25 typedef struct Shared
michael@0 26 {
michael@0 27 const char *title;
michael@0 28 PRLock *list_lock;
michael@0 29 PRWaitGroup *group;
michael@0 30 PRIntervalTime timeout;
michael@0 31 } Shared;
michael@0 32
michael@0 33 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
michael@0 34
michael@0 35 static PRFileDesc *debug = NULL;
michael@0 36 static PRInt32 desc_allocated = 0;
michael@0 37 static PRUint16 default_port = 12273;
michael@0 38 static enum Verbosity verbosity = quiet;
michael@0 39 static PRInt32 ops_required = 1000, ops_done = 0;
michael@0 40 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
michael@0 41 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
michael@0 42
michael@0 43 #if defined(DEBUG)
michael@0 44 #define MW_ASSERT(_expr) \
michael@0 45 ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
michael@0 46 static void _MW_Assert(const char *s, const char *file, PRIntn ln)
michael@0 47 {
michael@0 48 if (NULL != debug) PL_FPrintError(debug, NULL);
michael@0 49 PR_Assert(s, file, ln);
michael@0 50 } /* _MW_Assert */
michael@0 51 #else
michael@0 52 #define MW_ASSERT(_expr)
michael@0 53 #endif
michael@0 54
michael@0 55 static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
michael@0 56 {
michael@0 57 const char *tag[] = {
michael@0 58 "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
michael@0 59 "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
michael@0 60 PR_fprintf(
michael@0 61 debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
michael@0 62 msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
michael@0 63 } /* PrintRecvDesc */
michael@0 64
michael@0 65 static Shared *MakeShared(const char *title)
michael@0 66 {
michael@0 67 Shared *shared = PR_NEWZAP(Shared);
michael@0 68 shared->group = PR_CreateWaitGroup(1);
michael@0 69 shared->timeout = PR_SecondsToInterval(1);
michael@0 70 shared->list_lock = PR_NewLock();
michael@0 71 shared->title = title;
michael@0 72 return shared;
michael@0 73 } /* MakeShared */
michael@0 74
michael@0 75 static void DestroyShared(Shared *shared)
michael@0 76 {
michael@0 77 PRStatus rv;
michael@0 78 if (verbosity > quiet)
michael@0 79 PR_fprintf(debug, "%s: destroying group\n", shared->title);
michael@0 80 rv = PR_DestroyWaitGroup(shared->group);
michael@0 81 MW_ASSERT(PR_SUCCESS == rv);
michael@0 82 PR_DestroyLock(shared->list_lock);
michael@0 83 PR_DELETE(shared);
michael@0 84 } /* DestroyShared */
michael@0 85
michael@0 86 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
michael@0 87 {
michael@0 88 PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
michael@0 89 MW_ASSERT(NULL != desc_out);
michael@0 90
michael@0 91 MW_ASSERT(NULL != fd);
michael@0 92 desc_out->fd = fd;
michael@0 93 desc_out->timeout = timeout;
michael@0 94 desc_out->buffer.length = 120;
michael@0 95 desc_out->buffer.start = PR_CALLOC(120);
michael@0 96
michael@0 97 PR_AtomicIncrement(&desc_allocated);
michael@0 98
michael@0 99 if (verbosity > chatty)
michael@0 100 PrintRecvDesc(desc_out, "Allocated");
michael@0 101 return desc_out;
michael@0 102 } /* CreateRecvWait */
michael@0 103
michael@0 104 static void DestroyRecvWait(PRRecvWait *desc_out)
michael@0 105 {
michael@0 106 if (verbosity > chatty)
michael@0 107 PrintRecvDesc(desc_out, "Destroying");
michael@0 108 PR_Close(desc_out->fd);
michael@0 109 if (NULL != desc_out->buffer.start)
michael@0 110 PR_DELETE(desc_out->buffer.start);
michael@0 111 PR_Free(desc_out);
michael@0 112 (void)PR_AtomicDecrement(&desc_allocated);
michael@0 113 } /* DestroyRecvWait */
michael@0 114
michael@0 115 static void CancelGroup(Shared *shared)
michael@0 116 {
michael@0 117 PRRecvWait *desc_out;
michael@0 118
michael@0 119 if (verbosity > quiet)
michael@0 120 PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
michael@0 121
michael@0 122 do
michael@0 123 {
michael@0 124 desc_out = PR_CancelWaitGroup(shared->group);
michael@0 125 if (NULL != desc_out) DestroyRecvWait(desc_out);
michael@0 126 } while (NULL != desc_out);
michael@0 127
michael@0 128 MW_ASSERT(0 == desc_allocated);
michael@0 129 MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
michael@0 130 } /* CancelGroup */
michael@0 131
michael@0 132 static void PR_CALLBACK ClientThread(void* arg)
michael@0 133 {
michael@0 134 PRStatus rv;
michael@0 135 PRInt32 bytes;
michael@0 136 PRIntn empty_flags = 0;
michael@0 137 PRNetAddr server_address;
michael@0 138 unsigned char buffer[100];
michael@0 139 Shared *shared = (Shared*)arg;
michael@0 140 PRFileDesc *server = PR_NewTCPSocket();
michael@0 141 if ((NULL == server)
michael@0 142 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
michael@0 143 MW_ASSERT(NULL != server);
michael@0 144
michael@0 145 if (verbosity > chatty)
michael@0 146 PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
michael@0 147
michael@0 148 /* Initialize the buffer so that Purify won't complain */
michael@0 149 memset(buffer, 0, sizeof(buffer));
michael@0 150
michael@0 151 rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
michael@0 152 MW_ASSERT(PR_SUCCESS == rv);
michael@0 153
michael@0 154 if (verbosity > quiet)
michael@0 155 PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
michael@0 156 rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
michael@0 157
michael@0 158 if (PR_FAILURE == rv)
michael@0 159 {
michael@0 160 if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
michael@0 161 return;
michael@0 162 }
michael@0 163
michael@0 164 while (ops_done < ops_required)
michael@0 165 {
michael@0 166 bytes = PR_Send(
michael@0 167 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
michael@0 168 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
michael@0 169 MW_ASSERT(sizeof(buffer) == bytes);
michael@0 170 if (verbosity > chatty)
michael@0 171 PR_fprintf(
michael@0 172 debug, "%s: Client sent %d bytes\n",
michael@0 173 shared->title, sizeof(buffer));
michael@0 174 bytes = PR_Recv(
michael@0 175 server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
michael@0 176 if (verbosity > chatty)
michael@0 177 PR_fprintf(
michael@0 178 debug, "%s: Client received %d bytes\n",
michael@0 179 shared->title, sizeof(buffer));
michael@0 180 if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
michael@0 181 MW_ASSERT(sizeof(buffer) == bytes);
michael@0 182 PR_Sleep(shared->timeout);
michael@0 183 }
michael@0 184 rv = PR_Close(server);
michael@0 185 MW_ASSERT(PR_SUCCESS == rv);
michael@0 186
michael@0 187 } /* ClientThread */
michael@0 188
michael@0 189 static void OneInThenCancelled(Shared *shared)
michael@0 190 {
michael@0 191 PRStatus rv;
michael@0 192 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
michael@0 193
michael@0 194 shared->timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 195
michael@0 196 desc_in->fd = PR_NewTCPSocket();
michael@0 197 desc_in->timeout = shared->timeout;
michael@0 198
michael@0 199 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
michael@0 200
michael@0 201 rv = PR_AddWaitFileDesc(shared->group, desc_in);
michael@0 202 MW_ASSERT(PR_SUCCESS == rv);
michael@0 203
michael@0 204 if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
michael@0 205 rv = PR_CancelWaitFileDesc(shared->group, desc_in);
michael@0 206 MW_ASSERT(PR_SUCCESS == rv);
michael@0 207
michael@0 208 desc_out = PR_WaitRecvReady(shared->group);
michael@0 209 MW_ASSERT(desc_out == desc_in);
michael@0 210 MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
michael@0 211 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
michael@0 212 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
michael@0 213
michael@0 214 rv = PR_Close(desc_in->fd);
michael@0 215 MW_ASSERT(PR_SUCCESS == rv);
michael@0 216
michael@0 217 if (verbosity > quiet)
michael@0 218 PR_fprintf(debug, "%s: destroying group\n", shared->title);
michael@0 219
michael@0 220 PR_DELETE(desc_in);
michael@0 221 } /* OneInThenCancelled */
michael@0 222
michael@0 223 static void OneOpOneThread(Shared *shared)
michael@0 224 {
michael@0 225 PRStatus rv;
michael@0 226 PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
michael@0 227
michael@0 228 desc_in->fd = PR_NewTCPSocket();
michael@0 229 desc_in->timeout = shared->timeout;
michael@0 230
michael@0 231 if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
michael@0 232
michael@0 233 rv = PR_AddWaitFileDesc(shared->group, desc_in);
michael@0 234 MW_ASSERT(PR_SUCCESS == rv);
michael@0 235 desc_out = PR_WaitRecvReady(shared->group);
michael@0 236 MW_ASSERT(desc_out == desc_in);
michael@0 237 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
michael@0 238 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
michael@0 239 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
michael@0 240
michael@0 241 rv = PR_Close(desc_in->fd);
michael@0 242 MW_ASSERT(PR_SUCCESS == rv);
michael@0 243
michael@0 244 PR_DELETE(desc_in);
michael@0 245 } /* OneOpOneThread */
michael@0 246
michael@0 247 static void ManyOpOneThread(Shared *shared)
michael@0 248 {
michael@0 249 PRStatus rv;
michael@0 250 PRIntn index;
michael@0 251 PRRecvWait *desc_in;
michael@0 252 PRRecvWait *desc_out;
michael@0 253
michael@0 254 if (verbosity > quiet)
michael@0 255 PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
michael@0 256
michael@0 257 for (index = 0; index < wait_objects; ++index)
michael@0 258 {
michael@0 259 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
michael@0 260
michael@0 261 rv = PR_AddWaitFileDesc(shared->group, desc_in);
michael@0 262 MW_ASSERT(PR_SUCCESS == rv);
michael@0 263 }
michael@0 264
michael@0 265 while (ops_done < ops_required)
michael@0 266 {
michael@0 267 desc_out = PR_WaitRecvReady(shared->group);
michael@0 268 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
michael@0 269 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
michael@0 270 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
michael@0 271 rv = PR_AddWaitFileDesc(shared->group, desc_out);
michael@0 272 MW_ASSERT(PR_SUCCESS == rv);
michael@0 273 (void)PR_AtomicIncrement(&ops_done);
michael@0 274 }
michael@0 275
michael@0 276 CancelGroup(shared);
michael@0 277 } /* ManyOpOneThread */
michael@0 278
michael@0 279 static void PR_CALLBACK SomeOpsThread(void *arg)
michael@0 280 {
michael@0 281 PRRecvWait *desc_out;
michael@0 282 PRStatus rv = PR_SUCCESS;
michael@0 283 Shared *shared = (Shared*)arg;
michael@0 284 do /* until interrupted */
michael@0 285 {
michael@0 286 desc_out = PR_WaitRecvReady(shared->group);
michael@0 287 if (NULL == desc_out)
michael@0 288 {
michael@0 289 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
michael@0 290 if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
michael@0 291 break;
michael@0 292 }
michael@0 293 MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
michael@0 294 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
michael@0 295 if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
michael@0 296
michael@0 297 if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
michael@0 298 desc_out->timeout = shared->timeout;
michael@0 299 rv = PR_AddWaitFileDesc(shared->group, desc_out);
michael@0 300 PR_AtomicIncrement(&ops_done);
michael@0 301 if (ops_done > ops_required) break;
michael@0 302 } while (PR_SUCCESS == rv);
michael@0 303 MW_ASSERT(PR_SUCCESS == rv);
michael@0 304 } /* SomeOpsThread */
michael@0 305
michael@0 306 static void SomeOpsSomeThreads(Shared *shared)
michael@0 307 {
michael@0 308 PRStatus rv;
michael@0 309 PRThread **thread;
michael@0 310 PRIntn index;
michael@0 311 PRRecvWait *desc_in;
michael@0 312
michael@0 313 thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
michael@0 314
michael@0 315 /* Create some threads */
michael@0 316
michael@0 317 if (verbosity > quiet)
michael@0 318 PR_fprintf(debug, "%s: creating threads\n", shared->title);
michael@0 319 for (index = 0; index < worker_threads; ++index)
michael@0 320 {
michael@0 321 thread[index] = PR_CreateThread(
michael@0 322 PR_USER_THREAD, SomeOpsThread, shared,
michael@0 323 PR_PRIORITY_HIGH, thread_scope,
michael@0 324 PR_JOINABLE_THREAD, 16 * 1024);
michael@0 325 }
michael@0 326
michael@0 327 /* then create some operations */
michael@0 328 if (verbosity > quiet)
michael@0 329 PR_fprintf(debug, "%s: creating desc\n", shared->title);
michael@0 330 for (index = 0; index < wait_objects; ++index)
michael@0 331 {
michael@0 332 desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
michael@0 333 rv = PR_AddWaitFileDesc(shared->group, desc_in);
michael@0 334 MW_ASSERT(PR_SUCCESS == rv);
michael@0 335 }
michael@0 336
michael@0 337 if (verbosity > quiet)
michael@0 338 PR_fprintf(debug, "%s: sleeping\n", shared->title);
michael@0 339 while (ops_done < ops_required) PR_Sleep(shared->timeout);
michael@0 340
michael@0 341 if (verbosity > quiet)
michael@0 342 PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
michael@0 343 for (index = 0; index < worker_threads; ++index)
michael@0 344 {
michael@0 345 rv = PR_Interrupt(thread[index]);
michael@0 346 MW_ASSERT(PR_SUCCESS == rv);
michael@0 347 rv = PR_JoinThread(thread[index]);
michael@0 348 MW_ASSERT(PR_SUCCESS == rv);
michael@0 349 }
michael@0 350 PR_DELETE(thread);
michael@0 351
michael@0 352 CancelGroup(shared);
michael@0 353 } /* SomeOpsSomeThreads */
michael@0 354
michael@0 355 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
michael@0 356 {
michael@0 357 PRInt32 bytes_out;
michael@0 358
michael@0 359 if (verbosity > chatty)
michael@0 360 PR_fprintf(
michael@0 361 debug, "%s: Service received %d bytes\n",
michael@0 362 shared->title, desc->bytesRecv);
michael@0 363
michael@0 364 if (0 == desc->bytesRecv) goto quitting;
michael@0 365 if ((-1 == desc->bytesRecv)
michael@0 366 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
michael@0 367
michael@0 368 bytes_out = PR_Send(
michael@0 369 desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
michael@0 370 if (verbosity > chatty)
michael@0 371 PR_fprintf(
michael@0 372 debug, "%s: Service sent %d bytes\n",
michael@0 373 shared->title, bytes_out);
michael@0 374
michael@0 375 if ((-1 == bytes_out)
michael@0 376 && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
michael@0 377 MW_ASSERT(bytes_out == desc->bytesRecv);
michael@0 378
michael@0 379 return PR_SUCCESS;
michael@0 380
michael@0 381 aborted:
michael@0 382 quitting:
michael@0 383 return PR_FAILURE;
michael@0 384 } /* ServiceRequest */
michael@0 385
michael@0 386 static void PR_CALLBACK ServiceThread(void *arg)
michael@0 387 {
michael@0 388 PRStatus rv = PR_SUCCESS;
michael@0 389 PRRecvWait *desc_out = NULL;
michael@0 390 Shared *shared = (Shared*)arg;
michael@0 391 do /* until interrupted */
michael@0 392 {
michael@0 393 if (NULL != desc_out)
michael@0 394 {
michael@0 395 desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 396 if (verbosity > chatty)
michael@0 397 PrintRecvDesc(desc_out, "Service re-adding");
michael@0 398 rv = PR_AddWaitFileDesc(shared->group, desc_out);
michael@0 399 MW_ASSERT(PR_SUCCESS == rv);
michael@0 400 }
michael@0 401
michael@0 402 desc_out = PR_WaitRecvReady(shared->group);
michael@0 403 if (NULL == desc_out)
michael@0 404 {
michael@0 405 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
michael@0 406 break;
michael@0 407 }
michael@0 408
michael@0 409 switch (desc_out->outcome)
michael@0 410 {
michael@0 411 case PR_MW_SUCCESS:
michael@0 412 {
michael@0 413 PR_AtomicIncrement(&ops_done);
michael@0 414 if (verbosity > chatty)
michael@0 415 PrintRecvDesc(desc_out, "Service ready");
michael@0 416 rv = ServiceRequest(shared, desc_out);
michael@0 417 break;
michael@0 418 }
michael@0 419 case PR_MW_INTERRUPT:
michael@0 420 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
michael@0 421 rv = PR_FAILURE; /* if interrupted, then exit */
michael@0 422 break;
michael@0 423 case PR_MW_TIMEOUT:
michael@0 424 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
michael@0 425 case PR_MW_FAILURE:
michael@0 426 if (verbosity > silent)
michael@0 427 PL_FPrintError(debug, "RecvReady failure");
michael@0 428 break;
michael@0 429 default:
michael@0 430 break;
michael@0 431 }
michael@0 432 } while (PR_SUCCESS == rv);
michael@0 433
michael@0 434 if (NULL != desc_out) DestroyRecvWait(desc_out);
michael@0 435
michael@0 436 } /* ServiceThread */
michael@0 437
michael@0 438 static void PR_CALLBACK EnumerationThread(void *arg)
michael@0 439 {
michael@0 440 PRStatus rv;
michael@0 441 PRIntn count;
michael@0 442 PRRecvWait *desc;
michael@0 443 Shared *shared = (Shared*)arg;
michael@0 444 PRIntervalTime five_seconds = PR_SecondsToInterval(5);
michael@0 445 PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
michael@0 446 MW_ASSERT(NULL != enumerator);
michael@0 447
michael@0 448 while (PR_SUCCESS == PR_Sleep(five_seconds))
michael@0 449 {
michael@0 450 count = 0;
michael@0 451 desc = NULL;
michael@0 452 while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
michael@0 453 {
michael@0 454 if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
michael@0 455 count += 1;
michael@0 456 }
michael@0 457 if (verbosity > silent)
michael@0 458 PR_fprintf(debug,
michael@0 459 "%s Enumerated %d objects\n", shared->title, count);
michael@0 460 }
michael@0 461
michael@0 462 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
michael@0 463
michael@0 464
michael@0 465 rv = PR_DestroyMWaitEnumerator(enumerator);
michael@0 466 MW_ASSERT(PR_SUCCESS == rv);
michael@0 467 } /* EnumerationThread */
michael@0 468
michael@0 469 static void PR_CALLBACK ServerThread(void *arg)
michael@0 470 {
michael@0 471 PRStatus rv;
michael@0 472 PRIntn index;
michael@0 473 PRRecvWait *desc_in;
michael@0 474 PRThread **worker_thread;
michael@0 475 Shared *shared = (Shared*)arg;
michael@0 476 PRFileDesc *listener, *service;
michael@0 477 PRNetAddr server_address, client_address;
michael@0 478
michael@0 479 worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
michael@0 480 if (verbosity > quiet)
michael@0 481 PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
michael@0 482 for (index = 0; index < worker_threads; ++index)
michael@0 483 {
michael@0 484 worker_thread[index] = PR_CreateThread(
michael@0 485 PR_USER_THREAD, ServiceThread, shared,
michael@0 486 PR_PRIORITY_HIGH, thread_scope,
michael@0 487 PR_JOINABLE_THREAD, 16 * 1024);
michael@0 488 }
michael@0 489
michael@0 490 rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
michael@0 491 MW_ASSERT(PR_SUCCESS == rv);
michael@0 492
michael@0 493 listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
michael@0 494 if (verbosity > chatty)
michael@0 495 PR_fprintf(
michael@0 496 debug, "%s: Server listener socket @0x%x\n",
michael@0 497 shared->title, listener);
michael@0 498 rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
michael@0 499 rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
michael@0 500 while (ops_done < ops_required)
michael@0 501 {
michael@0 502 if (verbosity > quiet)
michael@0 503 PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
michael@0 504 service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
michael@0 505 if (NULL == service)
michael@0 506 {
michael@0 507 if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
michael@0 508 PL_PrintError("Accept failed");
michael@0 509 MW_ASSERT(!"Accept failed");
michael@0 510 }
michael@0 511 else
michael@0 512 {
michael@0 513 desc_in = CreateRecvWait(service, shared->timeout);
michael@0 514 desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
michael@0 515 if (verbosity > chatty)
michael@0 516 PrintRecvDesc(desc_in, "Service adding");
michael@0 517 rv = PR_AddWaitFileDesc(shared->group, desc_in);
michael@0 518 MW_ASSERT(PR_SUCCESS == rv);
michael@0 519 }
michael@0 520 }
michael@0 521
michael@0 522 if (verbosity > quiet)
michael@0 523 PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
michael@0 524 for (index = 0; index < worker_threads; ++index)
michael@0 525 {
michael@0 526 rv = PR_Interrupt(worker_thread[index]);
michael@0 527 MW_ASSERT(PR_SUCCESS == rv);
michael@0 528 rv = PR_JoinThread(worker_thread[index]);
michael@0 529 MW_ASSERT(PR_SUCCESS == rv);
michael@0 530 }
michael@0 531 PR_DELETE(worker_thread);
michael@0 532
michael@0 533 PR_Close(listener);
michael@0 534
michael@0 535 CancelGroup(shared);
michael@0 536
michael@0 537 } /* ServerThread */
michael@0 538
michael@0 539 static void RealOneGroupIO(Shared *shared)
michael@0 540 {
michael@0 541 /*
michael@0 542 ** Create a server that listens for connections and then services
michael@0 543 ** requests that come in over those connections. The server never
michael@0 544 ** deletes a connection and assumes a basic RPC model of operation.
michael@0 545 **
michael@0 546 ** Use worker_threads threads to service how every many open ports
michael@0 547 ** there might be.
michael@0 548 **
michael@0 549 ** Oh, ya. Almost forget. Create (some) clients as well.
michael@0 550 */
michael@0 551 PRStatus rv;
michael@0 552 PRIntn index;
michael@0 553 PRThread *server_thread, *enumeration_thread, **client_thread;
michael@0 554
michael@0 555 if (verbosity > quiet)
michael@0 556 PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
michael@0 557
michael@0 558 server_thread = PR_CreateThread(
michael@0 559 PR_USER_THREAD, ServerThread, shared,
michael@0 560 PR_PRIORITY_HIGH, thread_scope,
michael@0 561 PR_JOINABLE_THREAD, 16 * 1024);
michael@0 562
michael@0 563 if (verbosity > quiet)
michael@0 564 PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
michael@0 565
michael@0 566 enumeration_thread = PR_CreateThread(
michael@0 567 PR_USER_THREAD, EnumerationThread, shared,
michael@0 568 PR_PRIORITY_HIGH, thread_scope,
michael@0 569 PR_JOINABLE_THREAD, 16 * 1024);
michael@0 570
michael@0 571 if (verbosity > quiet)
michael@0 572 PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
michael@0 573 PR_Sleep(5 * shared->timeout);
michael@0 574
michael@0 575 if (verbosity > quiet)
michael@0 576 PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
michael@0 577 client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
michael@0 578 for (index = 0; index < client_threads; ++index)
michael@0 579 {
michael@0 580 client_thread[index] = PR_CreateThread(
michael@0 581 PR_USER_THREAD, ClientThread, shared,
michael@0 582 PR_PRIORITY_NORMAL, thread_scope,
michael@0 583 PR_JOINABLE_THREAD, 16 * 1024);
michael@0 584 }
michael@0 585
michael@0 586 while (ops_done < ops_required) PR_Sleep(shared->timeout);
michael@0 587
michael@0 588 if (verbosity > quiet)
michael@0 589 PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
michael@0 590 for (index = 0; index < client_threads; ++index)
michael@0 591 {
michael@0 592 rv = PR_Interrupt(client_thread[index]);
michael@0 593 MW_ASSERT(PR_SUCCESS == rv);
michael@0 594 rv = PR_JoinThread(client_thread[index]);
michael@0 595 MW_ASSERT(PR_SUCCESS == rv);
michael@0 596 }
michael@0 597 PR_DELETE(client_thread);
michael@0 598
michael@0 599 if (verbosity > quiet)
michael@0 600 PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
michael@0 601 rv = PR_Interrupt(enumeration_thread);
michael@0 602 MW_ASSERT(PR_SUCCESS == rv);
michael@0 603 rv = PR_JoinThread(enumeration_thread);
michael@0 604 MW_ASSERT(PR_SUCCESS == rv);
michael@0 605
michael@0 606 if (verbosity > quiet)
michael@0 607 PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
michael@0 608 rv = PR_Interrupt(server_thread);
michael@0 609 MW_ASSERT(PR_SUCCESS == rv);
michael@0 610 rv = PR_JoinThread(server_thread);
michael@0 611 MW_ASSERT(PR_SUCCESS == rv);
michael@0 612 } /* RealOneGroupIO */
michael@0 613
michael@0 614 static void RunThisOne(
michael@0 615 void (*func)(Shared*), const char *name, const char *test_name)
michael@0 616 {
michael@0 617 Shared *shared;
michael@0 618 if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
michael@0 619 {
michael@0 620 if (verbosity > silent)
michael@0 621 PR_fprintf(debug, "%s()\n", name);
michael@0 622 shared = MakeShared(name);
michael@0 623 ops_done = 0;
michael@0 624 func(shared); /* run the test */
michael@0 625 MW_ASSERT(0 == desc_allocated);
michael@0 626 DestroyShared(shared);
michael@0 627 }
michael@0 628 } /* RunThisOne */
michael@0 629
michael@0 630 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
michael@0 631 {
michael@0 632 PRIntn verbage = (PRIntn)verbosity;
michael@0 633 return (Verbosity)(verbage += delta);
michael@0 634 } /* ChangeVerbosity */
michael@0 635
michael@0 636 int main(int argc, char **argv)
michael@0 637 {
michael@0 638 PLOptStatus os;
michael@0 639 const char *test_name = NULL;
michael@0 640 PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
michael@0 641
michael@0 642 while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
michael@0 643 {
michael@0 644 if (PL_OPT_BAD == os) continue;
michael@0 645 switch (opt->option)
michael@0 646 {
michael@0 647 case 0:
michael@0 648 test_name = opt->value;
michael@0 649 break;
michael@0 650 case 'd': /* debug mode */
michael@0 651 if (verbosity < noisy)
michael@0 652 verbosity = ChangeVerbosity(verbosity, 1);
michael@0 653 break;
michael@0 654 case 'q': /* debug mode */
michael@0 655 if (verbosity > silent)
michael@0 656 verbosity = ChangeVerbosity(verbosity, -1);
michael@0 657 break;
michael@0 658 case 'G': /* use global threads */
michael@0 659 thread_scope = PR_GLOBAL_THREAD;
michael@0 660 break;
michael@0 661 case 'c': /* number of client threads */
michael@0 662 client_threads = atoi(opt->value);
michael@0 663 break;
michael@0 664 case 'o': /* operations to compelete */
michael@0 665 ops_required = atoi(opt->value);
michael@0 666 break;
michael@0 667 case 'p': /* default port */
michael@0 668 default_port = atoi(opt->value);
michael@0 669 break;
michael@0 670 case 't': /* number of threads waiting */
michael@0 671 worker_threads = atoi(opt->value);
michael@0 672 break;
michael@0 673 case 'w': /* number of wait objects */
michael@0 674 wait_objects = atoi(opt->value);
michael@0 675 break;
michael@0 676 default:
michael@0 677 break;
michael@0 678 }
michael@0 679 }
michael@0 680 PL_DestroyOptState(opt);
michael@0 681
michael@0 682 if (verbosity > 0)
michael@0 683 debug = PR_GetSpecialFD(PR_StandardError);
michael@0 684
michael@0 685 RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
michael@0 686 RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
michael@0 687 RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
michael@0 688 RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
michael@0 689 RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
michael@0 690 return 0;
michael@0 691 } /* main */
michael@0 692
michael@0 693 /* multwait.c */

mercurial