michael@0: /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "prio.h" michael@0: #include "prprf.h" michael@0: #include "prlog.h" michael@0: #include "prmem.h" michael@0: #include "pratom.h" michael@0: #include "prlock.h" michael@0: #include "prmwait.h" michael@0: #include "prclist.h" michael@0: #include "prerror.h" michael@0: #include "prinrval.h" michael@0: #include "prnetdb.h" michael@0: #include "prthread.h" michael@0: michael@0: #include "plstr.h" michael@0: #include "plerror.h" michael@0: #include "plgetopt.h" michael@0: michael@0: #include michael@0: michael@0: typedef struct Shared michael@0: { michael@0: const char *title; michael@0: PRLock *list_lock; michael@0: PRWaitGroup *group; michael@0: PRIntervalTime timeout; michael@0: } Shared; michael@0: michael@0: typedef enum Verbosity {silent, quiet, chatty, noisy} Verbosity; michael@0: michael@0: static PRFileDesc *debug = NULL; michael@0: static PRInt32 desc_allocated = 0; michael@0: static PRUint16 default_port = 12273; michael@0: static enum Verbosity verbosity = quiet; michael@0: static PRInt32 ops_required = 1000, ops_done = 0; michael@0: static PRThreadScope thread_scope = PR_LOCAL_THREAD; michael@0: static PRIntn client_threads = 20, worker_threads = 2, wait_objects = 50; michael@0: michael@0: #if defined(DEBUG) michael@0: #define MW_ASSERT(_expr) \ michael@0: ((_expr)?((void)0):_MW_Assert(# _expr,__FILE__,__LINE__)) michael@0: static void _MW_Assert(const char *s, const char *file, PRIntn ln) michael@0: { michael@0: if (NULL != debug) PL_FPrintError(debug, NULL); michael@0: PR_Assert(s, file, ln); michael@0: } /* _MW_Assert */ michael@0: #else michael@0: #define MW_ASSERT(_expr) michael@0: #endif michael@0: michael@0: static void PrintRecvDesc(PRRecvWait *desc, const char *msg) michael@0: { michael@0: const char *tag[] = { michael@0: "PR_MW_INTERRUPT", "PR_MW_TIMEOUT", michael@0: "PR_MW_FAILURE", "PR_MW_SUCCESS", "PR_MW_PENDING"}; michael@0: PR_fprintf( michael@0: debug, "%s: PRRecvWait(@0x%x): {fd: 0x%x, outcome: %s, tmo: %u}\n", michael@0: msg, desc, desc->fd, tag[desc->outcome + 3], desc->timeout); michael@0: } /* PrintRecvDesc */ michael@0: michael@0: static Shared *MakeShared(const char *title) michael@0: { michael@0: Shared *shared = PR_NEWZAP(Shared); michael@0: shared->group = PR_CreateWaitGroup(1); michael@0: shared->timeout = PR_SecondsToInterval(1); michael@0: shared->list_lock = PR_NewLock(); michael@0: shared->title = title; michael@0: return shared; michael@0: } /* MakeShared */ michael@0: michael@0: static void DestroyShared(Shared *shared) michael@0: { michael@0: PRStatus rv; michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: destroying group\n", shared->title); michael@0: rv = PR_DestroyWaitGroup(shared->group); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: PR_DestroyLock(shared->list_lock); michael@0: PR_DELETE(shared); michael@0: } /* DestroyShared */ michael@0: michael@0: static PRRecvWait *CreateRecvWait(PRFileDesc *fd, PRIntervalTime timeout) michael@0: { michael@0: PRRecvWait *desc_out = PR_NEWZAP(PRRecvWait); michael@0: MW_ASSERT(NULL != desc_out); michael@0: michael@0: MW_ASSERT(NULL != fd); michael@0: desc_out->fd = fd; michael@0: desc_out->timeout = timeout; michael@0: desc_out->buffer.length = 120; michael@0: desc_out->buffer.start = PR_CALLOC(120); michael@0: michael@0: PR_AtomicIncrement(&desc_allocated); michael@0: michael@0: if (verbosity > chatty) michael@0: PrintRecvDesc(desc_out, "Allocated"); michael@0: return desc_out; michael@0: } /* CreateRecvWait */ michael@0: michael@0: static void DestroyRecvWait(PRRecvWait *desc_out) michael@0: { michael@0: if (verbosity > chatty) michael@0: PrintRecvDesc(desc_out, "Destroying"); michael@0: PR_Close(desc_out->fd); michael@0: if (NULL != desc_out->buffer.start) michael@0: PR_DELETE(desc_out->buffer.start); michael@0: PR_Free(desc_out); michael@0: (void)PR_AtomicDecrement(&desc_allocated); michael@0: } /* DestroyRecvWait */ michael@0: michael@0: static void CancelGroup(Shared *shared) michael@0: { michael@0: PRRecvWait *desc_out; michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s Reclaiming wait descriptors\n", shared->title); michael@0: michael@0: do michael@0: { michael@0: desc_out = PR_CancelWaitGroup(shared->group); michael@0: if (NULL != desc_out) DestroyRecvWait(desc_out); michael@0: } while (NULL != desc_out); michael@0: michael@0: MW_ASSERT(0 == desc_allocated); michael@0: MW_ASSERT(PR_GROUP_EMPTY_ERROR == PR_GetError()); michael@0: } /* CancelGroup */ michael@0: michael@0: static void PR_CALLBACK ClientThread(void* arg) michael@0: { michael@0: PRStatus rv; michael@0: PRInt32 bytes; michael@0: PRIntn empty_flags = 0; michael@0: PRNetAddr server_address; michael@0: unsigned char buffer[100]; michael@0: Shared *shared = (Shared*)arg; michael@0: PRFileDesc *server = PR_NewTCPSocket(); michael@0: if ((NULL == server) michael@0: && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) return; michael@0: MW_ASSERT(NULL != server); michael@0: michael@0: if (verbosity > chatty) michael@0: PR_fprintf(debug, "%s: Server socket @0x%x\n", shared->title, server); michael@0: michael@0: /* Initialize the buffer so that Purify won't complain */ michael@0: memset(buffer, 0, sizeof(buffer)); michael@0: michael@0: rv = PR_InitializeNetAddr(PR_IpAddrLoopback, default_port, &server_address); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: Client opening connection\n", shared->title); michael@0: rv = PR_Connect(server, &server_address, PR_INTERVAL_NO_TIMEOUT); michael@0: michael@0: if (PR_FAILURE == rv) michael@0: { michael@0: if (verbosity > silent) PL_FPrintError(debug, "Client connect failed"); michael@0: return; michael@0: } michael@0: michael@0: while (ops_done < ops_required) michael@0: { michael@0: bytes = PR_Send( michael@0: server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); michael@0: if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; michael@0: MW_ASSERT(sizeof(buffer) == bytes); michael@0: if (verbosity > chatty) michael@0: PR_fprintf( michael@0: debug, "%s: Client sent %d bytes\n", michael@0: shared->title, sizeof(buffer)); michael@0: bytes = PR_Recv( michael@0: server, buffer, sizeof(buffer), empty_flags, PR_INTERVAL_NO_TIMEOUT); michael@0: if (verbosity > chatty) michael@0: PR_fprintf( michael@0: debug, "%s: Client received %d bytes\n", michael@0: shared->title, sizeof(buffer)); michael@0: if ((-1 == bytes) && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) break; michael@0: MW_ASSERT(sizeof(buffer) == bytes); michael@0: PR_Sleep(shared->timeout); michael@0: } michael@0: rv = PR_Close(server); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: } /* ClientThread */ michael@0: michael@0: static void OneInThenCancelled(Shared *shared) michael@0: { michael@0: PRStatus rv; michael@0: PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); michael@0: michael@0: shared->timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: michael@0: desc_in->fd = PR_NewTCPSocket(); michael@0: desc_in->timeout = shared->timeout; michael@0: michael@0: if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); michael@0: michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: if (verbosity > chatty) PrintRecvDesc(desc_in, "Cancelling"); michael@0: rv = PR_CancelWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: desc_out = PR_WaitRecvReady(shared->group); michael@0: MW_ASSERT(desc_out == desc_in); michael@0: MW_ASSERT(PR_MW_INTERRUPT == desc_out->outcome); michael@0: MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); michael@0: if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); michael@0: michael@0: rv = PR_Close(desc_in->fd); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: destroying group\n", shared->title); michael@0: michael@0: PR_DELETE(desc_in); michael@0: } /* OneInThenCancelled */ michael@0: michael@0: static void OneOpOneThread(Shared *shared) michael@0: { michael@0: PRStatus rv; michael@0: PRRecvWait *desc_out, *desc_in = PR_NEWZAP(PRRecvWait); michael@0: michael@0: desc_in->fd = PR_NewTCPSocket(); michael@0: desc_in->timeout = shared->timeout; michael@0: michael@0: if (verbosity > chatty) PrintRecvDesc(desc_in, "Adding desc"); michael@0: michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: desc_out = PR_WaitRecvReady(shared->group); michael@0: MW_ASSERT(desc_out == desc_in); michael@0: MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); michael@0: MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); michael@0: if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); michael@0: michael@0: rv = PR_Close(desc_in->fd); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: PR_DELETE(desc_in); michael@0: } /* OneOpOneThread */ michael@0: michael@0: static void ManyOpOneThread(Shared *shared) michael@0: { michael@0: PRStatus rv; michael@0: PRIntn index; michael@0: PRRecvWait *desc_in; michael@0: PRRecvWait *desc_out; michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: adding %d descs\n", shared->title, wait_objects); michael@0: michael@0: for (index = 0; index < wait_objects; ++index) michael@0: { michael@0: desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); michael@0: michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: michael@0: while (ops_done < ops_required) michael@0: { michael@0: desc_out = PR_WaitRecvReady(shared->group); michael@0: MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); michael@0: MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); michael@0: if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready/readding"); michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_out); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: (void)PR_AtomicIncrement(&ops_done); michael@0: } michael@0: michael@0: CancelGroup(shared); michael@0: } /* ManyOpOneThread */ michael@0: michael@0: static void PR_CALLBACK SomeOpsThread(void *arg) michael@0: { michael@0: PRRecvWait *desc_out; michael@0: PRStatus rv = PR_SUCCESS; michael@0: Shared *shared = (Shared*)arg; michael@0: do /* until interrupted */ michael@0: { michael@0: desc_out = PR_WaitRecvReady(shared->group); michael@0: if (NULL == desc_out) michael@0: { michael@0: MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); michael@0: if (verbosity > quiet) PR_fprintf(debug, "Aborted\n"); michael@0: break; michael@0: } michael@0: MW_ASSERT(PR_MW_TIMEOUT == desc_out->outcome); michael@0: MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); michael@0: if (verbosity > chatty) PrintRecvDesc(desc_out, "Ready"); michael@0: michael@0: if (verbosity > chatty) PrintRecvDesc(desc_out, "Re-Adding"); michael@0: desc_out->timeout = shared->timeout; michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_out); michael@0: PR_AtomicIncrement(&ops_done); michael@0: if (ops_done > ops_required) break; michael@0: } while (PR_SUCCESS == rv); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } /* SomeOpsThread */ michael@0: michael@0: static void SomeOpsSomeThreads(Shared *shared) michael@0: { michael@0: PRStatus rv; michael@0: PRThread **thread; michael@0: PRIntn index; michael@0: PRRecvWait *desc_in; michael@0: michael@0: thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); michael@0: michael@0: /* Create some threads */ michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: creating threads\n", shared->title); michael@0: for (index = 0; index < worker_threads; ++index) michael@0: { michael@0: thread[index] = PR_CreateThread( michael@0: PR_USER_THREAD, SomeOpsThread, shared, michael@0: PR_PRIORITY_HIGH, thread_scope, michael@0: PR_JOINABLE_THREAD, 16 * 1024); michael@0: } michael@0: michael@0: /* then create some operations */ michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: creating desc\n", shared->title); michael@0: for (index = 0; index < wait_objects; ++index) michael@0: { michael@0: desc_in = CreateRecvWait(PR_NewTCPSocket(), shared->timeout); michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: sleeping\n", shared->title); michael@0: while (ops_done < ops_required) PR_Sleep(shared->timeout); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: interrupting/joining threads\n", shared->title); michael@0: for (index = 0; index < worker_threads; ++index) michael@0: { michael@0: rv = PR_Interrupt(thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_JoinThread(thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: PR_DELETE(thread); michael@0: michael@0: CancelGroup(shared); michael@0: } /* SomeOpsSomeThreads */ michael@0: michael@0: static PRStatus ServiceRequest(Shared *shared, PRRecvWait *desc) michael@0: { michael@0: PRInt32 bytes_out; michael@0: michael@0: if (verbosity > chatty) michael@0: PR_fprintf( michael@0: debug, "%s: Service received %d bytes\n", michael@0: shared->title, desc->bytesRecv); michael@0: michael@0: if (0 == desc->bytesRecv) goto quitting; michael@0: if ((-1 == desc->bytesRecv) michael@0: && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; michael@0: michael@0: bytes_out = PR_Send( michael@0: desc->fd, desc->buffer.start, desc->bytesRecv, 0, shared->timeout); michael@0: if (verbosity > chatty) michael@0: PR_fprintf( michael@0: debug, "%s: Service sent %d bytes\n", michael@0: shared->title, bytes_out); michael@0: michael@0: if ((-1 == bytes_out) michael@0: && (PR_PENDING_INTERRUPT_ERROR == PR_GetError())) goto aborted; michael@0: MW_ASSERT(bytes_out == desc->bytesRecv); michael@0: michael@0: return PR_SUCCESS; michael@0: michael@0: aborted: michael@0: quitting: michael@0: return PR_FAILURE; michael@0: } /* ServiceRequest */ michael@0: michael@0: static void PR_CALLBACK ServiceThread(void *arg) michael@0: { michael@0: PRStatus rv = PR_SUCCESS; michael@0: PRRecvWait *desc_out = NULL; michael@0: Shared *shared = (Shared*)arg; michael@0: do /* until interrupted */ michael@0: { michael@0: if (NULL != desc_out) michael@0: { michael@0: desc_out->timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: if (verbosity > chatty) michael@0: PrintRecvDesc(desc_out, "Service re-adding"); michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_out); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: michael@0: desc_out = PR_WaitRecvReady(shared->group); michael@0: if (NULL == desc_out) michael@0: { michael@0: MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); michael@0: break; michael@0: } michael@0: michael@0: switch (desc_out->outcome) michael@0: { michael@0: case PR_MW_SUCCESS: michael@0: { michael@0: PR_AtomicIncrement(&ops_done); michael@0: if (verbosity > chatty) michael@0: PrintRecvDesc(desc_out, "Service ready"); michael@0: rv = ServiceRequest(shared, desc_out); michael@0: break; michael@0: } michael@0: case PR_MW_INTERRUPT: michael@0: MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); michael@0: rv = PR_FAILURE; /* if interrupted, then exit */ michael@0: break; michael@0: case PR_MW_TIMEOUT: michael@0: MW_ASSERT(PR_IO_TIMEOUT_ERROR == PR_GetError()); michael@0: case PR_MW_FAILURE: michael@0: if (verbosity > silent) michael@0: PL_FPrintError(debug, "RecvReady failure"); michael@0: break; michael@0: default: michael@0: break; michael@0: } michael@0: } while (PR_SUCCESS == rv); michael@0: michael@0: if (NULL != desc_out) DestroyRecvWait(desc_out); michael@0: michael@0: } /* ServiceThread */ michael@0: michael@0: static void PR_CALLBACK EnumerationThread(void *arg) michael@0: { michael@0: PRStatus rv; michael@0: PRIntn count; michael@0: PRRecvWait *desc; michael@0: Shared *shared = (Shared*)arg; michael@0: PRIntervalTime five_seconds = PR_SecondsToInterval(5); michael@0: PRMWaitEnumerator *enumerator = PR_CreateMWaitEnumerator(shared->group); michael@0: MW_ASSERT(NULL != enumerator); michael@0: michael@0: while (PR_SUCCESS == PR_Sleep(five_seconds)) michael@0: { michael@0: count = 0; michael@0: desc = NULL; michael@0: while (NULL != (desc = PR_EnumerateWaitGroup(enumerator, desc))) michael@0: { michael@0: if (verbosity > chatty) PrintRecvDesc(desc, shared->title); michael@0: count += 1; michael@0: } michael@0: if (verbosity > silent) michael@0: PR_fprintf(debug, michael@0: "%s Enumerated %d objects\n", shared->title, count); michael@0: } michael@0: michael@0: MW_ASSERT(PR_PENDING_INTERRUPT_ERROR == PR_GetError()); michael@0: michael@0: michael@0: rv = PR_DestroyMWaitEnumerator(enumerator); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } /* EnumerationThread */ michael@0: michael@0: static void PR_CALLBACK ServerThread(void *arg) michael@0: { michael@0: PRStatus rv; michael@0: PRIntn index; michael@0: PRRecvWait *desc_in; michael@0: PRThread **worker_thread; michael@0: Shared *shared = (Shared*)arg; michael@0: PRFileDesc *listener, *service; michael@0: PRNetAddr server_address, client_address; michael@0: michael@0: worker_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * worker_threads); michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: Server creating worker_threads\n", shared->title); michael@0: for (index = 0; index < worker_threads; ++index) michael@0: { michael@0: worker_thread[index] = PR_CreateThread( michael@0: PR_USER_THREAD, ServiceThread, shared, michael@0: PR_PRIORITY_HIGH, thread_scope, michael@0: PR_JOINABLE_THREAD, 16 * 1024); michael@0: } michael@0: michael@0: rv = PR_InitializeNetAddr(PR_IpAddrAny, default_port, &server_address); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: listener = PR_NewTCPSocket(); MW_ASSERT(NULL != listener); michael@0: if (verbosity > chatty) michael@0: PR_fprintf( michael@0: debug, "%s: Server listener socket @0x%x\n", michael@0: shared->title, listener); michael@0: rv = PR_Bind(listener, &server_address); MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_Listen(listener, 10); MW_ASSERT(PR_SUCCESS == rv); michael@0: while (ops_done < ops_required) michael@0: { michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: Server accepting connection\n", shared->title); michael@0: service = PR_Accept(listener, &client_address, PR_INTERVAL_NO_TIMEOUT); michael@0: if (NULL == service) michael@0: { michael@0: if (PR_PENDING_INTERRUPT_ERROR == PR_GetError()) break; michael@0: PL_PrintError("Accept failed"); michael@0: MW_ASSERT(!"Accept failed"); michael@0: } michael@0: else michael@0: { michael@0: desc_in = CreateRecvWait(service, shared->timeout); michael@0: desc_in->timeout = PR_INTERVAL_NO_TIMEOUT; michael@0: if (verbosity > chatty) michael@0: PrintRecvDesc(desc_in, "Service adding"); michael@0: rv = PR_AddWaitFileDesc(shared->group, desc_in); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: } michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: Server interrupting worker_threads\n", shared->title); michael@0: for (index = 0; index < worker_threads; ++index) michael@0: { michael@0: rv = PR_Interrupt(worker_thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_JoinThread(worker_thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: PR_DELETE(worker_thread); michael@0: michael@0: PR_Close(listener); michael@0: michael@0: CancelGroup(shared); michael@0: michael@0: } /* ServerThread */ michael@0: michael@0: static void RealOneGroupIO(Shared *shared) michael@0: { michael@0: /* michael@0: ** Create a server that listens for connections and then services michael@0: ** requests that come in over those connections. The server never michael@0: ** deletes a connection and assumes a basic RPC model of operation. michael@0: ** michael@0: ** Use worker_threads threads to service how every many open ports michael@0: ** there might be. michael@0: ** michael@0: ** Oh, ya. Almost forget. Create (some) clients as well. michael@0: */ michael@0: PRStatus rv; michael@0: PRIntn index; michael@0: PRThread *server_thread, *enumeration_thread, **client_thread; michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: creating server_thread\n", shared->title); michael@0: michael@0: server_thread = PR_CreateThread( michael@0: PR_USER_THREAD, ServerThread, shared, michael@0: PR_PRIORITY_HIGH, thread_scope, michael@0: PR_JOINABLE_THREAD, 16 * 1024); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: creating enumeration_thread\n", shared->title); michael@0: michael@0: enumeration_thread = PR_CreateThread( michael@0: PR_USER_THREAD, EnumerationThread, shared, michael@0: PR_PRIORITY_HIGH, thread_scope, michael@0: PR_JOINABLE_THREAD, 16 * 1024); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: snoozing before creating clients\n", shared->title); michael@0: PR_Sleep(5 * shared->timeout); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: creating client_threads\n", shared->title); michael@0: client_thread = (PRThread**)PR_CALLOC(sizeof(PRThread*) * client_threads); michael@0: for (index = 0; index < client_threads; ++index) michael@0: { michael@0: client_thread[index] = PR_CreateThread( michael@0: PR_USER_THREAD, ClientThread, shared, michael@0: PR_PRIORITY_NORMAL, thread_scope, michael@0: PR_JOINABLE_THREAD, 16 * 1024); michael@0: } michael@0: michael@0: while (ops_done < ops_required) PR_Sleep(shared->timeout); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: interrupting/joining client_threads\n", shared->title); michael@0: for (index = 0; index < client_threads; ++index) michael@0: { michael@0: rv = PR_Interrupt(client_thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_JoinThread(client_thread[index]); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } michael@0: PR_DELETE(client_thread); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: interrupting/joining enumeration_thread\n", shared->title); michael@0: rv = PR_Interrupt(enumeration_thread); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_JoinThread(enumeration_thread); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: michael@0: if (verbosity > quiet) michael@0: PR_fprintf(debug, "%s: interrupting/joining server_thread\n", shared->title); michael@0: rv = PR_Interrupt(server_thread); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: rv = PR_JoinThread(server_thread); michael@0: MW_ASSERT(PR_SUCCESS == rv); michael@0: } /* RealOneGroupIO */ michael@0: michael@0: static void RunThisOne( michael@0: void (*func)(Shared*), const char *name, const char *test_name) michael@0: { michael@0: Shared *shared; michael@0: if ((NULL == test_name) || (0 == PL_strcmp(name, test_name))) michael@0: { michael@0: if (verbosity > silent) michael@0: PR_fprintf(debug, "%s()\n", name); michael@0: shared = MakeShared(name); michael@0: ops_done = 0; michael@0: func(shared); /* run the test */ michael@0: MW_ASSERT(0 == desc_allocated); michael@0: DestroyShared(shared); michael@0: } michael@0: } /* RunThisOne */ michael@0: michael@0: static Verbosity ChangeVerbosity(Verbosity verbosity, PRIntn delta) michael@0: { michael@0: PRIntn verbage = (PRIntn)verbosity; michael@0: return (Verbosity)(verbage += delta); michael@0: } /* ChangeVerbosity */ michael@0: michael@0: int main(int argc, char **argv) michael@0: { michael@0: PLOptStatus os; michael@0: const char *test_name = NULL; michael@0: PLOptState *opt = PL_CreateOptState(argc, argv, "dqGc:o:p:t:w:"); michael@0: michael@0: while (PL_OPT_EOL != (os = PL_GetNextOpt(opt))) michael@0: { michael@0: if (PL_OPT_BAD == os) continue; michael@0: switch (opt->option) michael@0: { michael@0: case 0: michael@0: test_name = opt->value; michael@0: break; michael@0: case 'd': /* debug mode */ michael@0: if (verbosity < noisy) michael@0: verbosity = ChangeVerbosity(verbosity, 1); michael@0: break; michael@0: case 'q': /* debug mode */ michael@0: if (verbosity > silent) michael@0: verbosity = ChangeVerbosity(verbosity, -1); michael@0: break; michael@0: case 'G': /* use global threads */ michael@0: thread_scope = PR_GLOBAL_THREAD; michael@0: break; michael@0: case 'c': /* number of client threads */ michael@0: client_threads = atoi(opt->value); michael@0: break; michael@0: case 'o': /* operations to compelete */ michael@0: ops_required = atoi(opt->value); michael@0: break; michael@0: case 'p': /* default port */ michael@0: default_port = atoi(opt->value); michael@0: break; michael@0: case 't': /* number of threads waiting */ michael@0: worker_threads = atoi(opt->value); michael@0: break; michael@0: case 'w': /* number of wait objects */ michael@0: wait_objects = atoi(opt->value); michael@0: break; michael@0: default: michael@0: break; michael@0: } michael@0: } michael@0: PL_DestroyOptState(opt); michael@0: michael@0: if (verbosity > 0) michael@0: debug = PR_GetSpecialFD(PR_StandardError); michael@0: michael@0: RunThisOne(OneInThenCancelled, "OneInThenCancelled", test_name); michael@0: RunThisOne(OneOpOneThread, "OneOpOneThread", test_name); michael@0: RunThisOne(ManyOpOneThread, "ManyOpOneThread", test_name); michael@0: RunThisOne(SomeOpsSomeThreads, "SomeOpsSomeThreads", test_name); michael@0: RunThisOne(RealOneGroupIO, "RealOneGroupIO", test_name); michael@0: return 0; michael@0: } /* main */ michael@0: michael@0: /* multwait.c */