nsprpub/pr/tests/multiwait.c

Wed, 31 Dec 2014 06:09:35 +0100

author
Michael Schloh von Bennewitz <michael@schloh.com>
date
Wed, 31 Dec 2014 06:09:35 +0100
changeset 0
6474c204b198
permissions
-rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purpose.

     1 /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
     2 /* This Source Code Form is subject to the terms of the Mozilla Public
     3  * License, v. 2.0. If a copy of the MPL was not distributed with this
     4  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
     6 #include "prio.h"
     7 #include "prprf.h"
     8 #include "prlog.h"
     9 #include "prmem.h"
    10 #include "pratom.h"
    11 #include "prlock.h"
    12 #include "prmwait.h"
    13 #include "prclist.h"
    14 #include "prerror.h"
    15 #include "prinrval.h"
    16 #include "prnetdb.h"
    17 #include "prthread.h"
    19 #include "plstr.h"
    20 #include "plerror.h"
    21 #include "plgetopt.h"
    23 #include <string.h>
    25 typedef struct Shared
    26 {
    27     const char *title;
    28     PRLock *list_lock;
    29     PRWaitGroup *group;
    30     PRIntervalTime timeout;
    31 } Shared;
    33 typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity;
    35 static PRFileDesc *debug = NULL;
    36 static PRInt32 desc_allocated = 0;
    37 static PRUint16 default_port = 12273;
    38 static enum Verbosity verbosity = quiet;
    39 static PRInt32 ops_required = 1000, ops_done = 0;
    40 static PRThreadScope thread_scope = PR_LOCAL_THREAD;
    41 static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50;
    43 #if defined(DEBUG)
    44 #define MW_ASSERT(_expr) \
    45     ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__))
    46 static void _MW_Assert(const char *s, const char *file, PRIntn ln)
    47 {
    48     if (NULL != debug) PL_FPrintError(debug, NULL);
    49     PR_Assert(s, file, ln);
    50 }  /* _MW_Assert */
    51 #else
    52 #define MW_ASSERT(_expr)
    53 #endif
    55 static void PrintRecvDesc(PRRecvWait *desc, const char *msg)
    56 {
    57     const char *tag[] = {
    58         "PR_MW_INTERRUPT", "PR_MW_TIMEOUT",
    59         "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"};
    60     PR_fprintf(
    61         debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n",
    62         msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout);
    63 }  /* PrintRecvDesc */
    65 static Shared *MakeShared(const char *title)
    66 {
    67     Shared *shared = PR_NEWZAP(Shared);
    68     shared->group = PR_CreateWaitGroup(1);
    69     shared->timeout = PR_SecondsToInterval(1);
    70     shared->list_lock = PR_NewLock();
    71     shared->title = title;
    72     return shared;
    73 }  /* MakeShared */
    75 static void DestroyShared(Shared *shared)
    76 {
    77     PRStatus rv;
    78     if (verbosity > quiet)
    79         PR_fprintf(debug, "%s: destroying group\n", shared->title);
    80     rv = PR_DestroyWaitGroup(shared->group);
    81     MW_ASSERT(PR_SUCCESS == rv);
    82     PR_DestroyLock(shared->list_lock);
    83     PR_DELETE(shared);
    84 }  /* DestroyShared */
    86 static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout)
    87 {
    88     PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait);
    89     MW_ASSERT(NULL != desc_out);
    91     MW_ASSERT(NULL != fd);
    92     desc_out->fd = fd;
    93     desc_out->timeout = timeout;
    94     desc_out->buffer.length = 120;
    95     desc_out->buffer.start = PR_CALLOC(120);
    97     PR_AtomicIncrement(&desc_allocated);
    99     if (verbosity > chatty)
   100         PrintRecvDesc(desc_out, "Allocated");
   101     return desc_out;
   102 }  /* CreateRecvWait */
   104 static void DestroyRecvWait(PRRecvWait *desc_out)
   105 {
   106     if (verbosity > chatty)
   107         PrintRecvDesc(desc_out, "Destroying");
   108     PR_Close(desc_out->fd);
   109     if (NULL != desc_out->buffer.start)
   110         PR_DELETE(desc_out->buffer.start);
   111     PR_Free(desc_out);
   112     (void)PR_AtomicDecrement(&desc_allocated);
   113 }  /* DestroyRecvWait */
   115 static void CancelGroup(Shared *shared)
   116 {
   117     PRRecvWait *desc_out;
   119     if (verbosity > quiet)
   120         PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title);
   122     do
   123     {
   124         desc_out = PR_CancelWaitGroup(shared->group);
   125         if (NULL != desc_out) DestroyRecvWait(desc_out);
   126     } while (NULL != desc_out);
   128     MW_ASSERT(0 == desc_allocated);
   129     MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError());
   130 }  /* CancelGroup */
   132 static void PR_CALLBACK ClientThread(void* arg)
   133 {
   134     PRStatus rv;
   135     PRInt32 bytes;
   136     PRIntn empty_flags = 0;
   137     PRNetAddr server_address;
   138     unsigned char buffer[100];
   139     Shared *shared = (Shared*)arg;
   140     PRFileDesc *server = PR_NewTCPSocket();
   141     if ((NULL == server)
   142     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return;
   143     MW_ASSERT(NULL != server);
   145     if (verbosity > chatty)
   146         PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server);
   148     /* Initialize the buffer so that Purify won't complain */
   149     memset(buffer, 0, sizeof(buffer));
   151     rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address);
   152     MW_ASSERT(PR_SUCCESS == rv);
   154     if (verbosity > quiet)
   155         PR_fprintf(debug, "%s: Client opening connection\n", shared->title);
   156     rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT);
   158     if (PR_FAILURE == rv)
   159     {
   160         if (verbosity > silent) PL_FPrintError(debug, "Client connect failed");
   161         return;
   162     }
   164     while (ops_done < ops_required)
   165     {
   166         bytes = PR_Send(
   167             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
   168         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
   169         MW_ASSERT(sizeof(buffer) == bytes);
   170         if (verbosity > chatty)
   171             PR_fprintf(
   172                 debug, "%s: Client sent %d bytes\n",
   173                 shared->title, sizeof(buffer));
   174         bytes = PR_Recv(
   175             server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT);
   176         if (verbosity > chatty)
   177             PR_fprintf(
   178                 debug, "%s: Client received %d bytes\n",
   179                 shared->title, sizeof(buffer));
   180         if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break;
   181         MW_ASSERT(sizeof(buffer) == bytes);
   182         PR_Sleep(shared->timeout);
   183     }
   184     rv = PR_Close(server);
   185     MW_ASSERT(PR_SUCCESS == rv);
   187 }  /* ClientThread */
   189 static void OneInThenCancelled(Shared *shared)
   190 {
   191     PRStatus rv;
   192     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
   194     shared->timeout = PR_INTERVAL_NO_TIMEOUT;
   196     desc_in->fd = PR_NewTCPSocket();
   197     desc_in->timeout = shared->timeout;
   199     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
   201     rv = PR_AddWaitFileDesc(shared->group, desc_in);
   202     MW_ASSERT(PR_SUCCESS == rv);
   204     if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling");
   205     rv = PR_CancelWaitFileDesc(shared->group, desc_in);
   206     MW_ASSERT(PR_SUCCESS == rv);
   208     desc_out = PR_WaitRecvReady(shared->group);
   209     MW_ASSERT(desc_out == desc_in);
   210     MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome);
   211     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
   212     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
   214     rv = PR_Close(desc_in->fd);
   215     MW_ASSERT(PR_SUCCESS == rv);
   217     if (verbosity > quiet)
   218         PR_fprintf(debug, "%s: destroying group\n", shared->title);
   220     PR_DELETE(desc_in);
   221 }  /* OneInThenCancelled */
   223 static void OneOpOneThread(Shared *shared)
   224 {
   225     PRStatus rv;
   226     PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait);
   228     desc_in->fd = PR_NewTCPSocket();
   229     desc_in->timeout = shared->timeout;
   231     if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc");
   233     rv = PR_AddWaitFileDesc(shared->group, desc_in);
   234     MW_ASSERT(PR_SUCCESS == rv);
   235     desc_out = PR_WaitRecvReady(shared->group);
   236     MW_ASSERT(desc_out == desc_in);
   237     MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
   238     MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
   239     if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
   241     rv = PR_Close(desc_in->fd);
   242     MW_ASSERT(PR_SUCCESS == rv);
   244     PR_DELETE(desc_in);
   245 }  /* OneOpOneThread */
   247 static void ManyOpOneThread(Shared *shared)
   248 {
   249     PRStatus rv;
   250     PRIntn index;
   251     PRRecvWait *desc_in;
   252     PRRecvWait *desc_out;
   254     if (verbosity > quiet)
   255         PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects);
   257     for (index = 0; index < wait_objects; ++index)
   258     {
   259         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
   261         rv = PR_AddWaitFileDesc(shared->group, desc_in);
   262         MW_ASSERT(PR_SUCCESS == rv);
   263     }
   265     while (ops_done < ops_required)
   266     {
   267         desc_out = PR_WaitRecvReady(shared->group);
   268         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
   269         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
   270         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding");
   271         rv = PR_AddWaitFileDesc(shared->group, desc_out);
   272         MW_ASSERT(PR_SUCCESS == rv);
   273         (void)PR_AtomicIncrement(&ops_done);
   274     }
   276     CancelGroup(shared);
   277 }  /* ManyOpOneThread */
   279 static void PR_CALLBACK SomeOpsThread(void *arg)
   280 {
   281     PRRecvWait *desc_out;
   282     PRStatus rv = PR_SUCCESS;
   283     Shared *shared = (Shared*)arg;
   284     do  /* until interrupted */
   285     {
   286         desc_out = PR_WaitRecvReady(shared->group);
   287         if (NULL == desc_out)
   288         {
   289             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
   290             if (verbosity > quiet) PR_fprintf(debug, "Aborted\n");
   291             break;
   292         }
   293         MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome);
   294         MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
   295         if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready");
   297         if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding");
   298         desc_out->timeout = shared->timeout;
   299         rv = PR_AddWaitFileDesc(shared->group, desc_out);
   300         PR_AtomicIncrement(&ops_done);
   301         if (ops_done > ops_required) break;
   302     } while (PR_SUCCESS == rv);
   303     MW_ASSERT(PR_SUCCESS == rv);
   304 }  /* SomeOpsThread */
   306 static void SomeOpsSomeThreads(Shared *shared)
   307 {
   308     PRStatus rv;
   309     PRThread **thread;
   310     PRIntn index;
   311     PRRecvWait *desc_in;
   313     thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
   315     /* Create some threads */
   317     if (verbosity > quiet)
   318         PR_fprintf(debug, "%s: creating threads\n", shared->title);
   319     for (index = 0; index < worker_threads; ++index)
   320     {
   321         thread[index] = PR_CreateThread(
   322             PR_USER_THREAD, SomeOpsThread, shared,
   323             PR_PRIORITY_HIGH, thread_scope,
   324             PR_JOINABLE_THREAD, 16 * 1024);
   325     }
   327     /* then create some operations */
   328     if (verbosity > quiet)
   329         PR_fprintf(debug, "%s: creating desc\n", shared->title);
   330     for (index = 0; index < wait_objects; ++index)
   331     {
   332         desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout);
   333         rv = PR_AddWaitFileDesc(shared->group, desc_in);
   334         MW_ASSERT(PR_SUCCESS == rv);
   335     }
   337     if (verbosity > quiet)
   338         PR_fprintf(debug, "%s: sleeping\n", shared->title);
   339     while (ops_done < ops_required) PR_Sleep(shared->timeout);
   341     if (verbosity > quiet)
   342         PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title);
   343     for (index = 0; index < worker_threads; ++index)
   344     {
   345         rv = PR_Interrupt(thread[index]);
   346         MW_ASSERT(PR_SUCCESS == rv);
   347         rv = PR_JoinThread(thread[index]);
   348         MW_ASSERT(PR_SUCCESS == rv);
   349     }
   350     PR_DELETE(thread);
   352     CancelGroup(shared);
   353 }  /* SomeOpsSomeThreads */
   355 static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc)
   356 {
   357     PRInt32 bytes_out;
   359     if (verbosity > chatty)
   360         PR_fprintf(
   361             debug, "%s: Service received %d bytes\n",
   362             shared->title, desc->bytesRecv);
   364     if (0 == desc->bytesRecv) goto quitting;
   365     if ((-1 == desc->bytesRecv)
   366     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
   368     bytes_out = PR_Send(
   369         desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout);
   370     if (verbosity > chatty)
   371         PR_fprintf(
   372             debug, "%s: Service sent %d bytes\n",
   373             shared->title, bytes_out);
   375     if ((-1 == bytes_out)
   376     && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted;
   377     MW_ASSERT(bytes_out == desc->bytesRecv);
   379     return PR_SUCCESS;
   381 aborted:
   382 quitting:
   383     return PR_FAILURE;
   384 }  /* ServiceRequest */
   386 static void PR_CALLBACK ServiceThread(void *arg)
   387 {
   388     PRStatus rv = PR_SUCCESS;
   389     PRRecvWait *desc_out = NULL;
   390     Shared *shared = (Shared*)arg;
   391     do  /* until interrupted */
   392     {
   393         if (NULL != desc_out)
   394         {
   395             desc_out->timeout = PR_INTERVAL_NO_TIMEOUT;
   396             if (verbosity > chatty)
   397                 PrintRecvDesc(desc_out, "Service re-adding");
   398             rv = PR_AddWaitFileDesc(shared->group, desc_out);
   399             MW_ASSERT(PR_SUCCESS == rv);
   400         }
   402         desc_out = PR_WaitRecvReady(shared->group);
   403         if (NULL == desc_out)
   404         {
   405             MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
   406             break;
   407         }
   409         switch (desc_out->outcome)
   410         {
   411             case PR_MW_SUCCESS:
   412             {
   413                 PR_AtomicIncrement(&ops_done);
   414                 if (verbosity > chatty)
   415                     PrintRecvDesc(desc_out, "Service ready");
   416                 rv = ServiceRequest(shared, desc_out);
   417                 break;
   418             }
   419             case PR_MW_INTERRUPT:
   420                 MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
   421                 rv = PR_FAILURE;  /* if interrupted, then exit */
   422                 break;
   423             case PR_MW_TIMEOUT:
   424                 MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError());
   425             case PR_MW_FAILURE:
   426                 if (verbosity > silent)
   427                     PL_FPrintError(debug, "RecvReady failure");
   428                 break;
   429             default:
   430                 break;
   431         }
   432     } while (PR_SUCCESS == rv);
   434     if (NULL != desc_out) DestroyRecvWait(desc_out);
   436 }  /* ServiceThread */
   438 static void PR_CALLBACK EnumerationThread(void *arg)
   439 {
   440     PRStatus rv;
   441     PRIntn count;
   442     PRRecvWait *desc;
   443     Shared *shared = (Shared*)arg;
   444     PRIntervalTime five_seconds = PR_SecondsToInterval(5);
   445     PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group);
   446     MW_ASSERT(NULL != enumerator);
   448     while (PR_SUCCESS == PR_Sleep(five_seconds))
   449     {
   450         count = 0;
   451         desc = NULL;
   452         while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc)))
   453         {
   454             if (verbosity > chatty) PrintRecvDesc(desc, shared->title);
   455             count += 1;
   456         }
   457         if (verbosity > silent)
   458             PR_fprintf(debug,
   459                 "%s Enumerated %d objects\n", shared->title, count);
   460     }
   462     MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError());
   465     rv = PR_DestroyMWaitEnumerator(enumerator);
   466     MW_ASSERT(PR_SUCCESS == rv);
   467 }  /* EnumerationThread */
   469 static void PR_CALLBACK ServerThread(void *arg)
   470 {
   471     PRStatus rv;
   472     PRIntn index;
   473     PRRecvWait *desc_in;
   474     PRThread **worker_thread;
   475     Shared *shared = (Shared*)arg;
   476     PRFileDesc *listener, *service;
   477     PRNetAddr server_address, client_address;
   479     worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads);
   480     if (verbosity > quiet)
   481         PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title);
   482     for (index = 0; index < worker_threads; ++index)
   483     {
   484         worker_thread[index] = PR_CreateThread(
   485             PR_USER_THREAD, ServiceThread, shared,
   486             PR_PRIORITY_HIGH, thread_scope,
   487             PR_JOINABLE_THREAD, 16 * 1024);
   488     }
   490     rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address);
   491     MW_ASSERT(PR_SUCCESS == rv);
   493     listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener);
   494     if (verbosity > chatty)
   495         PR_fprintf(
   496             debug, "%s: Server listener socket @0x%x\n",
   497             shared->title, listener);
   498     rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv);
   499     rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv);
   500     while (ops_done < ops_required)
   501     {
   502         if (verbosity > quiet)
   503             PR_fprintf(debug, "%s: Server accepting connection\n", shared->title);
   504         service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT);
   505         if (NULL == service)
   506         {
   507             if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break;
   508             PL_PrintError("Accept failed");
   509             MW_ASSERT(!"Accept failed");
   510         }
   511         else
   512         {
   513             desc_in = CreateRecvWait(service, shared->timeout);
   514             desc_in->timeout = PR_INTERVAL_NO_TIMEOUT;
   515             if (verbosity > chatty)
   516                 PrintRecvDesc(desc_in, "Service adding");
   517             rv = PR_AddWaitFileDesc(shared->group, desc_in);
   518             MW_ASSERT(PR_SUCCESS == rv);
   519         }
   520     }
   522     if (verbosity > quiet)
   523         PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title);
   524     for (index = 0; index < worker_threads; ++index)
   525     {
   526         rv = PR_Interrupt(worker_thread[index]);
   527         MW_ASSERT(PR_SUCCESS == rv);
   528         rv = PR_JoinThread(worker_thread[index]);
   529         MW_ASSERT(PR_SUCCESS == rv);
   530     }
   531     PR_DELETE(worker_thread);
   533     PR_Close(listener);
   535     CancelGroup(shared);
   537 }  /* ServerThread */
   539 static void RealOneGroupIO(Shared *shared)
   540 {
   541     /*
   542     ** Create a server that listens for connections and then services
   543     ** requests that come in over those connections. The server never
   544     ** deletes a connection and assumes a basic RPC model of operation.
   545     **
   546     ** Use worker_threads threads to service how every many open ports
   547     ** there might be.
   548     **
   549     ** Oh, ya. Almost forget. Create (some) clients as well.
   550     */
   551     PRStatus rv;
   552     PRIntn index;
   553     PRThread *server_thread, *enumeration_thread, **client_thread;
   555     if (verbosity > quiet)
   556         PR_fprintf(debug, "%s: creating server_thread\n", shared->title);
   558     server_thread = PR_CreateThread(
   559         PR_USER_THREAD, ServerThread, shared,
   560         PR_PRIORITY_HIGH, thread_scope,
   561         PR_JOINABLE_THREAD, 16 * 1024);
   563     if (verbosity > quiet)
   564         PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title);
   566     enumeration_thread = PR_CreateThread(
   567         PR_USER_THREAD, EnumerationThread, shared,
   568         PR_PRIORITY_HIGH, thread_scope,
   569         PR_JOINABLE_THREAD, 16 * 1024);
   571     if (verbosity > quiet)
   572         PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title);
   573     PR_Sleep(5 * shared->timeout);
   575     if (verbosity > quiet)
   576         PR_fprintf(debug, "%s: creating client_threads\n", shared->title);
   577     client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads);
   578     for (index = 0; index < client_threads; ++index)
   579     {
   580         client_thread[index] = PR_CreateThread(
   581             PR_USER_THREAD, ClientThread, shared,
   582             PR_PRIORITY_NORMAL, thread_scope,
   583             PR_JOINABLE_THREAD, 16 * 1024);
   584     }
   586     while (ops_done < ops_required) PR_Sleep(shared->timeout);
   588     if (verbosity > quiet)
   589         PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title);
   590     for (index = 0; index < client_threads; ++index)
   591     {
   592         rv = PR_Interrupt(client_thread[index]);
   593         MW_ASSERT(PR_SUCCESS == rv);
   594         rv = PR_JoinThread(client_thread[index]);
   595         MW_ASSERT(PR_SUCCESS == rv);
   596     }
   597     PR_DELETE(client_thread);
   599     if (verbosity > quiet)
   600         PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title);
   601     rv = PR_Interrupt(enumeration_thread);
   602     MW_ASSERT(PR_SUCCESS == rv);
   603     rv = PR_JoinThread(enumeration_thread);
   604     MW_ASSERT(PR_SUCCESS == rv);
   606     if (verbosity > quiet)
   607         PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title);
   608     rv = PR_Interrupt(server_thread);
   609     MW_ASSERT(PR_SUCCESS == rv);
   610     rv = PR_JoinThread(server_thread);
   611     MW_ASSERT(PR_SUCCESS == rv);
   612 }  /* RealOneGroupIO */
   614 static void RunThisOne(
   615     void (*func)(Shared*), const char *name, const char *test_name)
   616 {
   617     Shared *shared;
   618     if ((NULL == test_name) || (0 == PL_strcmp(name, test_name)))
   619     {
   620         if (verbosity > silent)
   621             PR_fprintf(debug, "%s()\n", name);
   622         shared = MakeShared(name);
   623         ops_done = 0;
   624         func(shared);  /* run the test */
   625         MW_ASSERT(0 == desc_allocated);
   626         DestroyShared(shared);
   627     }
   628 }  /* RunThisOne */
   630 static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta)
   631 {
   632     PRIntn verbage = (PRIntn)verbosity;
   633     return (Verbosity)(verbage += delta);
   634 }  /* ChangeVerbosity */
   636 int main(int argc, char **argv)
   637 {
   638     PLOptStatus os;
   639     const char *test_name = NULL;
   640     PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:");
   642     while (PL_OPT_EOL != (os = PL_GetNextOpt(opt)))
   643     {
   644         if (PL_OPT_BAD == os) continue;
   645         switch (opt->option)
   646         {
   647         case 0:
   648             test_name = opt->value;
   649             break;
   650         case 'd':  /* debug mode */
   651             if (verbosity < noisy)
   652                 verbosity = ChangeVerbosity(verbosity, 1);
   653             break;
   654         case 'q':  /* debug mode */
   655             if (verbosity > silent)
   656                 verbosity = ChangeVerbosity(verbosity, -1);
   657             break;
   658         case 'G':  /* use global threads */
   659             thread_scope = PR_GLOBAL_THREAD;
   660             break;
   661         case 'c':  /* number of client threads */
   662             client_threads = atoi(opt->value);
   663             break;
   664         case 'o':  /* operations to compelete */
   665             ops_required = atoi(opt->value);
   666             break;
   667         case 'p':  /* default port */
   668             default_port = atoi(opt->value);
   669             break;
   670         case 't':  /* number of threads waiting */
   671             worker_threads = atoi(opt->value);
   672             break;
   673         case 'w':  /* number of wait objects */
   674             wait_objects = atoi(opt->value);
   675             break;
   676         default:
   677             break;
   678         }
   679     }
   680     PL_DestroyOptState(opt);
   682     if (verbosity > 0)
   683         debug = PR_GetSpecialFD(PR_StandardError);
   685     RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name);
   686     RunThisOne(OneOpOneThread, "OneOpOneThread", test_name);
   687     RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name);
   688     RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name);
   689     RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name);
   690     return 0;
   691 }  /* main */
   693 /* multiwait.c */

mercurial