diff -r 000000000000 -r 6474c204b198 xpcom/tests/TestPipes.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/xpcom/tests/TestPipes.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,469 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "TestHarness.h" + +#include "nsIThread.h" +#include "nsIRunnable.h" +#include "nsThreadUtils.h" +#include "prprf.h" +#include "prinrval.h" +#include "nsCRT.h" +#include "nsIPipe.h" // new implementation + +#include "mozilla/Monitor.h" +using namespace mozilla; + +/** NS_NewPipe2 reimplemented, because it's not exported by XPCOM */ +nsresult TP_NewPipe2(nsIAsyncInputStream** input, + nsIAsyncOutputStream** output, + bool nonBlockingInput, + bool nonBlockingOutput, + uint32_t segmentSize, + uint32_t segmentCount, + nsIMemory* segmentAlloc) +{ + nsCOMPtr pipe = do_CreateInstance("@mozilla.org/pipe;1"); + if (!pipe) + return NS_ERROR_OUT_OF_MEMORY; + + nsresult rv = pipe->Init(nonBlockingInput, + nonBlockingOutput, + segmentSize, + segmentCount, + segmentAlloc); + + if (NS_FAILED(rv)) + return rv; + + pipe->GetInputStream(input); + pipe->GetOutputStream(output); + return NS_OK; +} + +/** NS_NewPipe reimplemented, because it's not exported by XPCOM */ +#define TP_DEFAULT_SEGMENT_SIZE 4096 +nsresult TP_NewPipe(nsIInputStream **pipeIn, + nsIOutputStream **pipeOut, + uint32_t segmentSize = 0, + uint32_t maxSize = 0, + bool nonBlockingInput = false, + bool nonBlockingOutput = false, + nsIMemory *segmentAlloc = nullptr); +nsresult TP_NewPipe(nsIInputStream **pipeIn, + nsIOutputStream **pipeOut, + uint32_t segmentSize, + uint32_t maxSize, + bool nonBlockingInput, + bool nonBlockingOutput, + nsIMemory *segmentAlloc) +{ + if (segmentSize == 0) + segmentSize = TP_DEFAULT_SEGMENT_SIZE; + + // Handle maxSize of UINT32_MAX as a special case + uint32_t segmentCount; + if (maxSize == UINT32_MAX) + segmentCount = UINT32_MAX; + else + segmentCount = maxSize / segmentSize; + + nsIAsyncInputStream *in; + nsIAsyncOutputStream *out; + nsresult rv = TP_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, + segmentSize, segmentCount, segmentAlloc); + if (NS_FAILED(rv)) return rv; + + *pipeIn = in; + *pipeOut = out; + return NS_OK; +} + + +#define KEY 0xa7 +#define ITERATIONS 33333 +char kTestPattern[] = "My hovercraft is full of eels.\n"; + +bool gTrace = false; + +static nsresult +WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten) +{ + const char *p = buf; + *lenWritten = 0; + while (bufLen) { + uint32_t n; + nsresult rv = os->Write(p, bufLen, &n); + if (NS_FAILED(rv)) return rv; + p += n; + bufLen -= n; + *lenWritten += n; + } + return NS_OK; +} + +class nsReceiver : public nsIRunnable { +public: + NS_DECL_THREADSAFE_ISUPPORTS + + NS_IMETHOD Run() { + nsresult rv; + char buf[101]; + uint32_t count; + PRIntervalTime start = PR_IntervalNow(); + while (true) { + rv = mIn->Read(buf, 100, &count); + if (NS_FAILED(rv)) { + printf("read failed\n"); + break; + } + if (count == 0) { +// printf("EOF count = %d\n", mCount); + break; + } + + if (gTrace) { + buf[count] = '\0'; + printf("read: %s\n", buf); + } + mCount += count; + } + PRIntervalTime end = PR_IntervalNow(); + printf("read %d bytes, time = %dms\n", mCount, + PR_IntervalToMilliseconds(end - start)); + return rv; + } + + nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) { + } + + uint32_t GetBytesRead() { return mCount; } + +protected: + nsCOMPtr mIn; + uint32_t mCount; +}; + +NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable) + +nsresult +TestPipe(nsIInputStream* in, nsIOutputStream* out) +{ + nsCOMPtr receiver = new nsReceiver(in); + if (!receiver) + return NS_ERROR_OUT_OF_MEMORY; + + nsresult rv; + + nsCOMPtr thread; + rv = NS_NewThread(getter_AddRefs(thread), receiver); + if (NS_FAILED(rv)) return rv; + + uint32_t total = 0; + PRIntervalTime start = PR_IntervalNow(); + for (uint32_t i = 0; i < ITERATIONS; i++) { + uint32_t writeCount; + char *buf = PR_smprintf("%d %s", i, kTestPattern); + uint32_t len = strlen(buf); + rv = WriteAll(out, buf, len, &writeCount); + if (gTrace) { + printf("wrote: "); + for (uint32_t j = 0; j < writeCount; j++) { + putc(buf[j], stdout); + } + printf("\n"); + } + PR_smprintf_free(buf); + if (NS_FAILED(rv)) return rv; + total += writeCount; + } + rv = out->Close(); + if (NS_FAILED(rv)) return rv; + + PRIntervalTime end = PR_IntervalNow(); + + thread->Shutdown(); + + printf("wrote %d bytes, time = %dms\n", total, + PR_IntervalToMilliseconds(end - start)); + NS_ASSERTION(receiver->GetBytesRead() == total, "didn't read everything"); + + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +class nsShortReader : public nsIRunnable { +public: + NS_DECL_THREADSAFE_ISUPPORTS + + NS_IMETHOD Run() { + nsresult rv; + char buf[101]; + uint32_t count; + uint32_t total = 0; + while (true) { + //if (gTrace) + // printf("calling Read\n"); + rv = mIn->Read(buf, 100, &count); + if (NS_FAILED(rv)) { + printf("read failed\n"); + break; + } + if (count == 0) { + break; + } + + if (gTrace) { + // For next |printf()| call and possible others elsewhere. + buf[count] = '\0'; + + printf("read %d bytes: %s\n", count, buf); + } + + Received(count); + total += count; + } + printf("read %d bytes\n", total); + return rv; + } + + nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) { + mMon = new Monitor("nsShortReader"); + } + + void Received(uint32_t count) { + MonitorAutoEnter mon(*mMon); + mReceived += count; + mon.Notify(); + } + + uint32_t WaitForReceipt(const uint32_t aWriteCount) { + MonitorAutoEnter mon(*mMon); + uint32_t result = mReceived; + + while (result < aWriteCount) { + mon.Wait(); + + NS_ASSERTION(mReceived > result, "failed to receive"); + result = mReceived; + } + + mReceived = 0; + return result; + } + +protected: + nsCOMPtr mIn; + uint32_t mReceived; + Monitor* mMon; +}; + +NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable) + +nsresult +TestShortWrites(nsIInputStream* in, nsIOutputStream* out) +{ + nsCOMPtr receiver = new nsShortReader(in); + if (!receiver) + return NS_ERROR_OUT_OF_MEMORY; + + nsresult rv; + + nsCOMPtr thread; + rv = NS_NewThread(getter_AddRefs(thread), receiver); + if (NS_FAILED(rv)) return rv; + + uint32_t total = 0; + for (uint32_t i = 0; i < ITERATIONS; i++) { + uint32_t writeCount; + char* buf = PR_smprintf("%d %s", i, kTestPattern); + uint32_t len = strlen(buf); + len = len * rand() / RAND_MAX; + len = XPCOM_MAX(1, len); + rv = WriteAll(out, buf, len, &writeCount); + if (NS_FAILED(rv)) return rv; + NS_ASSERTION(writeCount == len, "didn't write enough"); + total += writeCount; + + if (gTrace) + printf("wrote %d bytes: %s\n", writeCount, buf); + PR_smprintf_free(buf); + //printf("calling Flush\n"); + out->Flush(); + //printf("calling WaitForReceipt\n"); + +#ifdef DEBUG + const uint32_t received = +#endif + receiver->WaitForReceipt(writeCount); + NS_ASSERTION(received == writeCount, "received wrong amount"); + } + rv = out->Close(); + if (NS_FAILED(rv)) return rv; + + thread->Shutdown(); + + printf("wrote %d bytes\n", total); + + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +class nsPump : public nsIRunnable +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + + NS_IMETHOD Run() { + nsresult rv; + uint32_t count; + while (true) { + rv = mOut->WriteFrom(mIn, ~0U, &count); + if (NS_FAILED(rv)) { + printf("Write failed\n"); + break; + } + if (count == 0) { + printf("EOF count = %d\n", mCount); + break; + } + + if (gTrace) { + printf("Wrote: %d\n", count); + } + mCount += count; + } + mOut->Close(); + return rv; + } + + nsPump(nsIInputStream* in, + nsIOutputStream* out) + : mIn(in), mOut(out), mCount(0) { + } + +protected: + nsCOMPtr mIn; + nsCOMPtr mOut; + uint32_t mCount; +}; + +NS_IMPL_ISUPPORTS(nsPump, nsIRunnable) + +nsresult +TestChainedPipes() +{ + nsresult rv; + printf("TestChainedPipes\n"); + + nsCOMPtr in1; + nsCOMPtr out1; + rv = TP_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999); + if (NS_FAILED(rv)) return rv; + + nsCOMPtr in2; + nsCOMPtr out2; + rv = TP_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401); + if (NS_FAILED(rv)) return rv; + + nsCOMPtr pump = new nsPump(in1, out2); + if (pump == nullptr) return NS_ERROR_OUT_OF_MEMORY; + + nsCOMPtr thread; + rv = NS_NewThread(getter_AddRefs(thread), pump); + if (NS_FAILED(rv)) return rv; + + nsCOMPtr receiver = new nsReceiver(in2); + if (receiver == nullptr) return NS_ERROR_OUT_OF_MEMORY; + + nsCOMPtr receiverThread; + rv = NS_NewThread(getter_AddRefs(receiverThread), receiver); + if (NS_FAILED(rv)) return rv; + + uint32_t total = 0; + for (uint32_t i = 0; i < ITERATIONS; i++) { + uint32_t writeCount; + char* buf = PR_smprintf("%d %s", i, kTestPattern); + uint32_t len = strlen(buf); + len = len * rand() / RAND_MAX; + len = XPCOM_MAX(1, len); + rv = WriteAll(out1, buf, len, &writeCount); + if (NS_FAILED(rv)) return rv; + NS_ASSERTION(writeCount == len, "didn't write enough"); + total += writeCount; + + if (gTrace) + printf("wrote %d bytes: %s\n", writeCount, buf); + + PR_smprintf_free(buf); + } + printf("wrote total of %d bytes\n", total); + rv = out1->Close(); + if (NS_FAILED(rv)) return rv; + + thread->Shutdown(); + receiverThread->Shutdown(); + + return NS_OK; +} + +//////////////////////////////////////////////////////////////////////////////// + +void +RunTests(uint32_t segSize, uint32_t segCount) +{ + nsresult rv; + nsCOMPtr in; + nsCOMPtr out; + uint32_t bufSize = segSize * segCount; + printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize); + + printf("Testing long writes...\n"); + rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); + NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); + rv = TestPipe(in, out); + NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); + + printf("Testing short writes...\n"); + rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); + NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); + rv = TestShortWrites(in, out); + NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); +} + +//////////////////////////////////////////////////////////////////////////////// + +#if 0 +extern void +TestSegmentedBuffer(); +#endif + +int +main(int argc, char* argv[]) +{ + nsresult rv; + + nsCOMPtr servMgr; + rv = NS_InitXPCOM2(getter_AddRefs(servMgr), nullptr, nullptr); + if (NS_FAILED(rv)) return rv; + + if (argc > 1 && nsCRT::strcmp(argv[1], "-trace") == 0) + gTrace = true; + + rv = TestChainedPipes(); + NS_ASSERTION(NS_SUCCEEDED(rv), "TestChainedPipes failed"); + RunTests(16, 1); + RunTests(4096, 16); + + servMgr = 0; + rv = NS_ShutdownXPCOM(nullptr); + NS_ASSERTION(NS_SUCCEEDED(rv), "NS_ShutdownXPCOM failed"); + + return 0; +} + +////////////////////////////////////////////////////////////////////////////////