michael@0: /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "TestHarness.h" michael@0: michael@0: #include "nsIThread.h" michael@0: #include "nsIRunnable.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "prprf.h" michael@0: #include "prinrval.h" michael@0: #include "nsCRT.h" michael@0: #include "nsIPipe.h" // new implementation michael@0: michael@0: #include "mozilla/Monitor.h" michael@0: using namespace mozilla; michael@0: michael@0: /** NS_NewPipe2 reimplemented, because it's not exported by XPCOM */ michael@0: nsresult TP_NewPipe2(nsIAsyncInputStream** input, michael@0: nsIAsyncOutputStream** output, michael@0: bool nonBlockingInput, michael@0: bool nonBlockingOutput, michael@0: uint32_t segmentSize, michael@0: uint32_t segmentCount, michael@0: nsIMemory* segmentAlloc) michael@0: { michael@0: nsCOMPtr pipe = do_CreateInstance("@mozilla.org/pipe;1"); michael@0: if (!pipe) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: nsresult rv = pipe->Init(nonBlockingInput, michael@0: nonBlockingOutput, michael@0: segmentSize, michael@0: segmentCount, michael@0: segmentAlloc); michael@0: michael@0: if (NS_FAILED(rv)) michael@0: return rv; michael@0: michael@0: pipe->GetInputStream(input); michael@0: pipe->GetOutputStream(output); michael@0: return NS_OK; michael@0: } michael@0: michael@0: /** NS_NewPipe reimplemented, because it's not exported by XPCOM */ michael@0: #define TP_DEFAULT_SEGMENT_SIZE 4096 michael@0: nsresult TP_NewPipe(nsIInputStream **pipeIn, michael@0: nsIOutputStream **pipeOut, michael@0: uint32_t segmentSize = 0, michael@0: uint32_t maxSize = 0, michael@0: bool nonBlockingInput = false, michael@0: bool nonBlockingOutput = false, michael@0: nsIMemory *segmentAlloc = nullptr); michael@0: nsresult TP_NewPipe(nsIInputStream **pipeIn, michael@0: nsIOutputStream **pipeOut, michael@0: uint32_t segmentSize, michael@0: uint32_t maxSize, michael@0: bool nonBlockingInput, michael@0: bool nonBlockingOutput, michael@0: nsIMemory *segmentAlloc) michael@0: { michael@0: if (segmentSize == 0) michael@0: segmentSize = TP_DEFAULT_SEGMENT_SIZE; michael@0: michael@0: // Handle maxSize of UINT32_MAX as a special case michael@0: uint32_t segmentCount; michael@0: if (maxSize == UINT32_MAX) michael@0: segmentCount = UINT32_MAX; michael@0: else michael@0: segmentCount = maxSize / segmentSize; michael@0: michael@0: nsIAsyncInputStream *in; michael@0: nsIAsyncOutputStream *out; michael@0: nsresult rv = TP_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, michael@0: segmentSize, segmentCount, segmentAlloc); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: *pipeIn = in; michael@0: *pipeOut = out; michael@0: return NS_OK; michael@0: } michael@0: michael@0: michael@0: #define KEY 0xa7 michael@0: #define ITERATIONS 33333 michael@0: char kTestPattern[] = "My hovercraft is full of eels.\n"; michael@0: michael@0: bool gTrace = false; michael@0: michael@0: static nsresult michael@0: WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten) michael@0: { michael@0: const char *p = buf; michael@0: *lenWritten = 0; michael@0: while (bufLen) { michael@0: uint32_t n; michael@0: nsresult rv = os->Write(p, bufLen, &n); michael@0: if (NS_FAILED(rv)) return rv; michael@0: p += n; michael@0: bufLen -= n; michael@0: *lenWritten += n; michael@0: } michael@0: return NS_OK; michael@0: } michael@0: michael@0: class nsReceiver : public nsIRunnable { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: NS_IMETHOD Run() { michael@0: nsresult rv; michael@0: char buf[101]; michael@0: uint32_t count; michael@0: PRIntervalTime start = PR_IntervalNow(); michael@0: while (true) { michael@0: rv = mIn->Read(buf, 100, &count); michael@0: if (NS_FAILED(rv)) { michael@0: printf("read failed\n"); michael@0: break; michael@0: } michael@0: if (count == 0) { michael@0: // printf("EOF count = %d\n", mCount); michael@0: break; michael@0: } michael@0: michael@0: if (gTrace) { michael@0: buf[count] = '\0'; michael@0: printf("read: %s\n", buf); michael@0: } michael@0: mCount += count; michael@0: } michael@0: PRIntervalTime end = PR_IntervalNow(); michael@0: printf("read %d bytes, time = %dms\n", mCount, michael@0: PR_IntervalToMilliseconds(end - start)); michael@0: return rv; michael@0: } michael@0: michael@0: nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) { michael@0: } michael@0: michael@0: uint32_t GetBytesRead() { return mCount; } michael@0: michael@0: protected: michael@0: nsCOMPtr mIn; michael@0: uint32_t mCount; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable) michael@0: michael@0: nsresult michael@0: TestPipe(nsIInputStream* in, nsIOutputStream* out) michael@0: { michael@0: nsCOMPtr receiver = new nsReceiver(in); michael@0: if (!receiver) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: nsresult rv; michael@0: michael@0: nsCOMPtr thread; michael@0: rv = NS_NewThread(getter_AddRefs(thread), receiver); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: uint32_t total = 0; michael@0: PRIntervalTime start = PR_IntervalNow(); michael@0: for (uint32_t i = 0; i < ITERATIONS; i++) { michael@0: uint32_t writeCount; michael@0: char *buf = PR_smprintf("%d %s", i, kTestPattern); michael@0: uint32_t len = strlen(buf); michael@0: rv = WriteAll(out, buf, len, &writeCount); michael@0: if (gTrace) { michael@0: printf("wrote: "); michael@0: for (uint32_t j = 0; j < writeCount; j++) { michael@0: putc(buf[j], stdout); michael@0: } michael@0: printf("\n"); michael@0: } michael@0: PR_smprintf_free(buf); michael@0: if (NS_FAILED(rv)) return rv; michael@0: total += writeCount; michael@0: } michael@0: rv = out->Close(); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: PRIntervalTime end = PR_IntervalNow(); michael@0: michael@0: thread->Shutdown(); michael@0: michael@0: printf("wrote %d bytes, time = %dms\n", total, michael@0: PR_IntervalToMilliseconds(end - start)); michael@0: NS_ASSERTION(receiver->GetBytesRead() == total, "didn't read everything"); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: //////////////////////////////////////////////////////////////////////////////// michael@0: michael@0: class nsShortReader : public nsIRunnable { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: NS_IMETHOD Run() { michael@0: nsresult rv; michael@0: char buf[101]; michael@0: uint32_t count; michael@0: uint32_t total = 0; michael@0: while (true) { michael@0: //if (gTrace) michael@0: // printf("calling Read\n"); michael@0: rv = mIn->Read(buf, 100, &count); michael@0: if (NS_FAILED(rv)) { michael@0: printf("read failed\n"); michael@0: break; michael@0: } michael@0: if (count == 0) { michael@0: break; michael@0: } michael@0: michael@0: if (gTrace) { michael@0: // For next |printf()| call and possible others elsewhere. michael@0: buf[count] = '\0'; michael@0: michael@0: printf("read %d bytes: %s\n", count, buf); michael@0: } michael@0: michael@0: Received(count); michael@0: total += count; michael@0: } michael@0: printf("read %d bytes\n", total); michael@0: return rv; michael@0: } michael@0: michael@0: nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) { michael@0: mMon = new Monitor("nsShortReader"); michael@0: } michael@0: michael@0: void Received(uint32_t count) { michael@0: MonitorAutoEnter mon(*mMon); michael@0: mReceived += count; michael@0: mon.Notify(); michael@0: } michael@0: michael@0: uint32_t WaitForReceipt(const uint32_t aWriteCount) { michael@0: MonitorAutoEnter mon(*mMon); michael@0: uint32_t result = mReceived; michael@0: michael@0: while (result < aWriteCount) { michael@0: mon.Wait(); michael@0: michael@0: NS_ASSERTION(mReceived > result, "failed to receive"); michael@0: result = mReceived; michael@0: } michael@0: michael@0: mReceived = 0; michael@0: return result; michael@0: } michael@0: michael@0: protected: michael@0: nsCOMPtr mIn; michael@0: uint32_t mReceived; michael@0: Monitor* mMon; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable) michael@0: michael@0: nsresult michael@0: TestShortWrites(nsIInputStream* in, nsIOutputStream* out) michael@0: { michael@0: nsCOMPtr receiver = new nsShortReader(in); michael@0: if (!receiver) michael@0: return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: nsresult rv; michael@0: michael@0: nsCOMPtr thread; michael@0: rv = NS_NewThread(getter_AddRefs(thread), receiver); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: uint32_t total = 0; michael@0: for (uint32_t i = 0; i < ITERATIONS; i++) { michael@0: uint32_t writeCount; michael@0: char* buf = PR_smprintf("%d %s", i, kTestPattern); michael@0: uint32_t len = strlen(buf); michael@0: len = len * rand() / RAND_MAX; michael@0: len = XPCOM_MAX(1, len); michael@0: rv = WriteAll(out, buf, len, &writeCount); michael@0: if (NS_FAILED(rv)) return rv; michael@0: NS_ASSERTION(writeCount == len, "didn't write enough"); michael@0: total += writeCount; michael@0: michael@0: if (gTrace) michael@0: printf("wrote %d bytes: %s\n", writeCount, buf); michael@0: PR_smprintf_free(buf); michael@0: //printf("calling Flush\n"); michael@0: out->Flush(); michael@0: //printf("calling WaitForReceipt\n"); michael@0: michael@0: #ifdef DEBUG michael@0: const uint32_t received = michael@0: #endif michael@0: receiver->WaitForReceipt(writeCount); michael@0: NS_ASSERTION(received == writeCount, "received wrong amount"); michael@0: } michael@0: rv = out->Close(); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: thread->Shutdown(); michael@0: michael@0: printf("wrote %d bytes\n", total); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: //////////////////////////////////////////////////////////////////////////////// michael@0: michael@0: class nsPump : public nsIRunnable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: michael@0: NS_IMETHOD Run() { michael@0: nsresult rv; michael@0: uint32_t count; michael@0: while (true) { michael@0: rv = mOut->WriteFrom(mIn, ~0U, &count); michael@0: if (NS_FAILED(rv)) { michael@0: printf("Write failed\n"); michael@0: break; michael@0: } michael@0: if (count == 0) { michael@0: printf("EOF count = %d\n", mCount); michael@0: break; michael@0: } michael@0: michael@0: if (gTrace) { michael@0: printf("Wrote: %d\n", count); michael@0: } michael@0: mCount += count; michael@0: } michael@0: mOut->Close(); michael@0: return rv; michael@0: } michael@0: michael@0: nsPump(nsIInputStream* in, michael@0: nsIOutputStream* out) michael@0: : mIn(in), mOut(out), mCount(0) { michael@0: } michael@0: michael@0: protected: michael@0: nsCOMPtr mIn; michael@0: nsCOMPtr mOut; michael@0: uint32_t mCount; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(nsPump, nsIRunnable) michael@0: michael@0: nsresult michael@0: TestChainedPipes() michael@0: { michael@0: nsresult rv; michael@0: printf("TestChainedPipes\n"); michael@0: michael@0: nsCOMPtr in1; michael@0: nsCOMPtr out1; michael@0: rv = TP_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: nsCOMPtr in2; michael@0: nsCOMPtr out2; michael@0: rv = TP_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: nsCOMPtr pump = new nsPump(in1, out2); michael@0: if (pump == nullptr) return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: nsCOMPtr thread; michael@0: rv = NS_NewThread(getter_AddRefs(thread), pump); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: nsCOMPtr receiver = new nsReceiver(in2); michael@0: if (receiver == nullptr) return NS_ERROR_OUT_OF_MEMORY; michael@0: michael@0: nsCOMPtr receiverThread; michael@0: rv = NS_NewThread(getter_AddRefs(receiverThread), receiver); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: uint32_t total = 0; michael@0: for (uint32_t i = 0; i < ITERATIONS; i++) { michael@0: uint32_t writeCount; michael@0: char* buf = PR_smprintf("%d %s", i, kTestPattern); michael@0: uint32_t len = strlen(buf); michael@0: len = len * rand() / RAND_MAX; michael@0: len = XPCOM_MAX(1, len); michael@0: rv = WriteAll(out1, buf, len, &writeCount); michael@0: if (NS_FAILED(rv)) return rv; michael@0: NS_ASSERTION(writeCount == len, "didn't write enough"); michael@0: total += writeCount; michael@0: michael@0: if (gTrace) michael@0: printf("wrote %d bytes: %s\n", writeCount, buf); michael@0: michael@0: PR_smprintf_free(buf); michael@0: } michael@0: printf("wrote total of %d bytes\n", total); michael@0: rv = out1->Close(); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: thread->Shutdown(); michael@0: receiverThread->Shutdown(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: //////////////////////////////////////////////////////////////////////////////// michael@0: michael@0: void michael@0: RunTests(uint32_t segSize, uint32_t segCount) michael@0: { michael@0: nsresult rv; michael@0: nsCOMPtr in; michael@0: nsCOMPtr out; michael@0: uint32_t bufSize = segSize * segCount; michael@0: printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize); michael@0: michael@0: printf("Testing long writes...\n"); michael@0: rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); michael@0: rv = TestPipe(in, out); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); michael@0: michael@0: printf("Testing short writes...\n"); michael@0: rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); michael@0: rv = TestShortWrites(in, out); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); michael@0: } michael@0: michael@0: //////////////////////////////////////////////////////////////////////////////// michael@0: michael@0: #if 0 michael@0: extern void michael@0: TestSegmentedBuffer(); michael@0: #endif michael@0: michael@0: int michael@0: main(int argc, char* argv[]) michael@0: { michael@0: nsresult rv; michael@0: michael@0: nsCOMPtr servMgr; michael@0: rv = NS_InitXPCOM2(getter_AddRefs(servMgr), nullptr, nullptr); michael@0: if (NS_FAILED(rv)) return rv; michael@0: michael@0: if (argc > 1 && nsCRT::strcmp(argv[1], "-trace") == 0) michael@0: gTrace = true; michael@0: michael@0: rv = TestChainedPipes(); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "TestChainedPipes failed"); michael@0: RunTests(16, 1); michael@0: RunTests(4096, 16); michael@0: michael@0: servMgr = 0; michael@0: rv = NS_ShutdownXPCOM(nullptr); michael@0: NS_ASSERTION(NS_SUCCEEDED(rv), "NS_ShutdownXPCOM failed"); michael@0: michael@0: return 0; michael@0: } michael@0: michael@0: ////////////////////////////////////////////////////////////////////////////////