1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/xpcom/tests/TestPipes.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,469 @@ 1.4 +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.8 + 1.9 +#include "TestHarness.h" 1.10 + 1.11 +#include "nsIThread.h" 1.12 +#include "nsIRunnable.h" 1.13 +#include "nsThreadUtils.h" 1.14 +#include "prprf.h" 1.15 +#include "prinrval.h" 1.16 +#include "nsCRT.h" 1.17 +#include "nsIPipe.h" // new implementation 1.18 + 1.19 +#include "mozilla/Monitor.h" 1.20 +using namespace mozilla; 1.21 + 1.22 +/** NS_NewPipe2 reimplemented, because it's not exported by XPCOM */ 1.23 +nsresult TP_NewPipe2(nsIAsyncInputStream** input, 1.24 + nsIAsyncOutputStream** output, 1.25 + bool nonBlockingInput, 1.26 + bool nonBlockingOutput, 1.27 + uint32_t segmentSize, 1.28 + uint32_t segmentCount, 1.29 + nsIMemory* segmentAlloc) 1.30 +{ 1.31 + nsCOMPtr<nsIPipe> pipe = do_CreateInstance("@mozilla.org/pipe;1"); 1.32 + if (!pipe) 1.33 + return NS_ERROR_OUT_OF_MEMORY; 1.34 + 1.35 + nsresult rv = pipe->Init(nonBlockingInput, 1.36 + nonBlockingOutput, 1.37 + segmentSize, 1.38 + segmentCount, 1.39 + segmentAlloc); 1.40 + 1.41 + if (NS_FAILED(rv)) 1.42 + return rv; 1.43 + 1.44 + pipe->GetInputStream(input); 1.45 + pipe->GetOutputStream(output); 1.46 + return NS_OK; 1.47 +} 1.48 + 1.49 +/** NS_NewPipe reimplemented, because it's not exported by XPCOM */ 1.50 +#define TP_DEFAULT_SEGMENT_SIZE 4096 1.51 +nsresult TP_NewPipe(nsIInputStream **pipeIn, 1.52 + nsIOutputStream **pipeOut, 1.53 + uint32_t segmentSize = 0, 1.54 + uint32_t maxSize = 0, 1.55 + bool nonBlockingInput = false, 1.56 + bool nonBlockingOutput = false, 1.57 + nsIMemory *segmentAlloc = nullptr); 1.58 +nsresult TP_NewPipe(nsIInputStream **pipeIn, 1.59 + nsIOutputStream **pipeOut, 1.60 + uint32_t segmentSize, 1.61 + uint32_t maxSize, 1.62 + bool nonBlockingInput, 1.63 + bool nonBlockingOutput, 1.64 + nsIMemory *segmentAlloc) 1.65 +{ 1.66 + if (segmentSize == 0) 1.67 + segmentSize = TP_DEFAULT_SEGMENT_SIZE; 1.68 + 1.69 + // Handle maxSize of UINT32_MAX as a special case 1.70 + uint32_t segmentCount; 1.71 + if (maxSize == UINT32_MAX) 1.72 + segmentCount = UINT32_MAX; 1.73 + else 1.74 + segmentCount = maxSize / segmentSize; 1.75 + 1.76 + nsIAsyncInputStream *in; 1.77 + nsIAsyncOutputStream *out; 1.78 + nsresult rv = TP_NewPipe2(&in, &out, nonBlockingInput, nonBlockingOutput, 1.79 + segmentSize, segmentCount, segmentAlloc); 1.80 + if (NS_FAILED(rv)) return rv; 1.81 + 1.82 + *pipeIn = in; 1.83 + *pipeOut = out; 1.84 + return NS_OK; 1.85 +} 1.86 + 1.87 + 1.88 +#define KEY 0xa7 1.89 +#define ITERATIONS 33333 1.90 +char kTestPattern[] = "My hovercraft is full of eels.\n"; 1.91 + 1.92 +bool gTrace = false; 1.93 + 1.94 +static nsresult 1.95 +WriteAll(nsIOutputStream *os, const char *buf, uint32_t bufLen, uint32_t *lenWritten) 1.96 +{ 1.97 + const char *p = buf; 1.98 + *lenWritten = 0; 1.99 + while (bufLen) { 1.100 + uint32_t n; 1.101 + nsresult rv = os->Write(p, bufLen, &n); 1.102 + if (NS_FAILED(rv)) return rv; 1.103 + p += n; 1.104 + bufLen -= n; 1.105 + *lenWritten += n; 1.106 + } 1.107 + return NS_OK; 1.108 +} 1.109 + 1.110 +class nsReceiver : public nsIRunnable { 1.111 +public: 1.112 + NS_DECL_THREADSAFE_ISUPPORTS 1.113 + 1.114 + NS_IMETHOD Run() { 1.115 + nsresult rv; 1.116 + char buf[101]; 1.117 + uint32_t count; 1.118 + PRIntervalTime start = PR_IntervalNow(); 1.119 + while (true) { 1.120 + rv = mIn->Read(buf, 100, &count); 1.121 + if (NS_FAILED(rv)) { 1.122 + printf("read failed\n"); 1.123 + break; 1.124 + } 1.125 + if (count == 0) { 1.126 +// printf("EOF count = %d\n", mCount); 1.127 + break; 1.128 + } 1.129 + 1.130 + if (gTrace) { 1.131 + buf[count] = '\0'; 1.132 + printf("read: %s\n", buf); 1.133 + } 1.134 + mCount += count; 1.135 + } 1.136 + PRIntervalTime end = PR_IntervalNow(); 1.137 + printf("read %d bytes, time = %dms\n", mCount, 1.138 + PR_IntervalToMilliseconds(end - start)); 1.139 + return rv; 1.140 + } 1.141 + 1.142 + nsReceiver(nsIInputStream* in) : mIn(in), mCount(0) { 1.143 + } 1.144 + 1.145 + uint32_t GetBytesRead() { return mCount; } 1.146 + 1.147 +protected: 1.148 + nsCOMPtr<nsIInputStream> mIn; 1.149 + uint32_t mCount; 1.150 +}; 1.151 + 1.152 +NS_IMPL_ISUPPORTS(nsReceiver, nsIRunnable) 1.153 + 1.154 +nsresult 1.155 +TestPipe(nsIInputStream* in, nsIOutputStream* out) 1.156 +{ 1.157 + nsCOMPtr<nsReceiver> receiver = new nsReceiver(in); 1.158 + if (!receiver) 1.159 + return NS_ERROR_OUT_OF_MEMORY; 1.160 + 1.161 + nsresult rv; 1.162 + 1.163 + nsCOMPtr<nsIThread> thread; 1.164 + rv = NS_NewThread(getter_AddRefs(thread), receiver); 1.165 + if (NS_FAILED(rv)) return rv; 1.166 + 1.167 + uint32_t total = 0; 1.168 + PRIntervalTime start = PR_IntervalNow(); 1.169 + for (uint32_t i = 0; i < ITERATIONS; i++) { 1.170 + uint32_t writeCount; 1.171 + char *buf = PR_smprintf("%d %s", i, kTestPattern); 1.172 + uint32_t len = strlen(buf); 1.173 + rv = WriteAll(out, buf, len, &writeCount); 1.174 + if (gTrace) { 1.175 + printf("wrote: "); 1.176 + for (uint32_t j = 0; j < writeCount; j++) { 1.177 + putc(buf[j], stdout); 1.178 + } 1.179 + printf("\n"); 1.180 + } 1.181 + PR_smprintf_free(buf); 1.182 + if (NS_FAILED(rv)) return rv; 1.183 + total += writeCount; 1.184 + } 1.185 + rv = out->Close(); 1.186 + if (NS_FAILED(rv)) return rv; 1.187 + 1.188 + PRIntervalTime end = PR_IntervalNow(); 1.189 + 1.190 + thread->Shutdown(); 1.191 + 1.192 + printf("wrote %d bytes, time = %dms\n", total, 1.193 + PR_IntervalToMilliseconds(end - start)); 1.194 + NS_ASSERTION(receiver->GetBytesRead() == total, "didn't read everything"); 1.195 + 1.196 + return NS_OK; 1.197 +} 1.198 + 1.199 +//////////////////////////////////////////////////////////////////////////////// 1.200 + 1.201 +class nsShortReader : public nsIRunnable { 1.202 +public: 1.203 + NS_DECL_THREADSAFE_ISUPPORTS 1.204 + 1.205 + NS_IMETHOD Run() { 1.206 + nsresult rv; 1.207 + char buf[101]; 1.208 + uint32_t count; 1.209 + uint32_t total = 0; 1.210 + while (true) { 1.211 + //if (gTrace) 1.212 + // printf("calling Read\n"); 1.213 + rv = mIn->Read(buf, 100, &count); 1.214 + if (NS_FAILED(rv)) { 1.215 + printf("read failed\n"); 1.216 + break; 1.217 + } 1.218 + if (count == 0) { 1.219 + break; 1.220 + } 1.221 + 1.222 + if (gTrace) { 1.223 + // For next |printf()| call and possible others elsewhere. 1.224 + buf[count] = '\0'; 1.225 + 1.226 + printf("read %d bytes: %s\n", count, buf); 1.227 + } 1.228 + 1.229 + Received(count); 1.230 + total += count; 1.231 + } 1.232 + printf("read %d bytes\n", total); 1.233 + return rv; 1.234 + } 1.235 + 1.236 + nsShortReader(nsIInputStream* in) : mIn(in), mReceived(0) { 1.237 + mMon = new Monitor("nsShortReader"); 1.238 + } 1.239 + 1.240 + void Received(uint32_t count) { 1.241 + MonitorAutoEnter mon(*mMon); 1.242 + mReceived += count; 1.243 + mon.Notify(); 1.244 + } 1.245 + 1.246 + uint32_t WaitForReceipt(const uint32_t aWriteCount) { 1.247 + MonitorAutoEnter mon(*mMon); 1.248 + uint32_t result = mReceived; 1.249 + 1.250 + while (result < aWriteCount) { 1.251 + mon.Wait(); 1.252 + 1.253 + NS_ASSERTION(mReceived > result, "failed to receive"); 1.254 + result = mReceived; 1.255 + } 1.256 + 1.257 + mReceived = 0; 1.258 + return result; 1.259 + } 1.260 + 1.261 +protected: 1.262 + nsCOMPtr<nsIInputStream> mIn; 1.263 + uint32_t mReceived; 1.264 + Monitor* mMon; 1.265 +}; 1.266 + 1.267 +NS_IMPL_ISUPPORTS(nsShortReader, nsIRunnable) 1.268 + 1.269 +nsresult 1.270 +TestShortWrites(nsIInputStream* in, nsIOutputStream* out) 1.271 +{ 1.272 + nsCOMPtr<nsShortReader> receiver = new nsShortReader(in); 1.273 + if (!receiver) 1.274 + return NS_ERROR_OUT_OF_MEMORY; 1.275 + 1.276 + nsresult rv; 1.277 + 1.278 + nsCOMPtr<nsIThread> thread; 1.279 + rv = NS_NewThread(getter_AddRefs(thread), receiver); 1.280 + if (NS_FAILED(rv)) return rv; 1.281 + 1.282 + uint32_t total = 0; 1.283 + for (uint32_t i = 0; i < ITERATIONS; i++) { 1.284 + uint32_t writeCount; 1.285 + char* buf = PR_smprintf("%d %s", i, kTestPattern); 1.286 + uint32_t len = strlen(buf); 1.287 + len = len * rand() / RAND_MAX; 1.288 + len = XPCOM_MAX(1, len); 1.289 + rv = WriteAll(out, buf, len, &writeCount); 1.290 + if (NS_FAILED(rv)) return rv; 1.291 + NS_ASSERTION(writeCount == len, "didn't write enough"); 1.292 + total += writeCount; 1.293 + 1.294 + if (gTrace) 1.295 + printf("wrote %d bytes: %s\n", writeCount, buf); 1.296 + PR_smprintf_free(buf); 1.297 + //printf("calling Flush\n"); 1.298 + out->Flush(); 1.299 + //printf("calling WaitForReceipt\n"); 1.300 + 1.301 +#ifdef DEBUG 1.302 + const uint32_t received = 1.303 +#endif 1.304 + receiver->WaitForReceipt(writeCount); 1.305 + NS_ASSERTION(received == writeCount, "received wrong amount"); 1.306 + } 1.307 + rv = out->Close(); 1.308 + if (NS_FAILED(rv)) return rv; 1.309 + 1.310 + thread->Shutdown(); 1.311 + 1.312 + printf("wrote %d bytes\n", total); 1.313 + 1.314 + return NS_OK; 1.315 +} 1.316 + 1.317 +//////////////////////////////////////////////////////////////////////////////// 1.318 + 1.319 +class nsPump : public nsIRunnable 1.320 +{ 1.321 +public: 1.322 + NS_DECL_THREADSAFE_ISUPPORTS 1.323 + 1.324 + NS_IMETHOD Run() { 1.325 + nsresult rv; 1.326 + uint32_t count; 1.327 + while (true) { 1.328 + rv = mOut->WriteFrom(mIn, ~0U, &count); 1.329 + if (NS_FAILED(rv)) { 1.330 + printf("Write failed\n"); 1.331 + break; 1.332 + } 1.333 + if (count == 0) { 1.334 + printf("EOF count = %d\n", mCount); 1.335 + break; 1.336 + } 1.337 + 1.338 + if (gTrace) { 1.339 + printf("Wrote: %d\n", count); 1.340 + } 1.341 + mCount += count; 1.342 + } 1.343 + mOut->Close(); 1.344 + return rv; 1.345 + } 1.346 + 1.347 + nsPump(nsIInputStream* in, 1.348 + nsIOutputStream* out) 1.349 + : mIn(in), mOut(out), mCount(0) { 1.350 + } 1.351 + 1.352 +protected: 1.353 + nsCOMPtr<nsIInputStream> mIn; 1.354 + nsCOMPtr<nsIOutputStream> mOut; 1.355 + uint32_t mCount; 1.356 +}; 1.357 + 1.358 +NS_IMPL_ISUPPORTS(nsPump, nsIRunnable) 1.359 + 1.360 +nsresult 1.361 +TestChainedPipes() 1.362 +{ 1.363 + nsresult rv; 1.364 + printf("TestChainedPipes\n"); 1.365 + 1.366 + nsCOMPtr<nsIInputStream> in1; 1.367 + nsCOMPtr<nsIOutputStream> out1; 1.368 + rv = TP_NewPipe(getter_AddRefs(in1), getter_AddRefs(out1), 20, 1999); 1.369 + if (NS_FAILED(rv)) return rv; 1.370 + 1.371 + nsCOMPtr<nsIInputStream> in2; 1.372 + nsCOMPtr<nsIOutputStream> out2; 1.373 + rv = TP_NewPipe(getter_AddRefs(in2), getter_AddRefs(out2), 200, 401); 1.374 + if (NS_FAILED(rv)) return rv; 1.375 + 1.376 + nsCOMPtr<nsPump> pump = new nsPump(in1, out2); 1.377 + if (pump == nullptr) return NS_ERROR_OUT_OF_MEMORY; 1.378 + 1.379 + nsCOMPtr<nsIThread> thread; 1.380 + rv = NS_NewThread(getter_AddRefs(thread), pump); 1.381 + if (NS_FAILED(rv)) return rv; 1.382 + 1.383 + nsCOMPtr<nsReceiver> receiver = new nsReceiver(in2); 1.384 + if (receiver == nullptr) return NS_ERROR_OUT_OF_MEMORY; 1.385 + 1.386 + nsCOMPtr<nsIThread> receiverThread; 1.387 + rv = NS_NewThread(getter_AddRefs(receiverThread), receiver); 1.388 + if (NS_FAILED(rv)) return rv; 1.389 + 1.390 + uint32_t total = 0; 1.391 + for (uint32_t i = 0; i < ITERATIONS; i++) { 1.392 + uint32_t writeCount; 1.393 + char* buf = PR_smprintf("%d %s", i, kTestPattern); 1.394 + uint32_t len = strlen(buf); 1.395 + len = len * rand() / RAND_MAX; 1.396 + len = XPCOM_MAX(1, len); 1.397 + rv = WriteAll(out1, buf, len, &writeCount); 1.398 + if (NS_FAILED(rv)) return rv; 1.399 + NS_ASSERTION(writeCount == len, "didn't write enough"); 1.400 + total += writeCount; 1.401 + 1.402 + if (gTrace) 1.403 + printf("wrote %d bytes: %s\n", writeCount, buf); 1.404 + 1.405 + PR_smprintf_free(buf); 1.406 + } 1.407 + printf("wrote total of %d bytes\n", total); 1.408 + rv = out1->Close(); 1.409 + if (NS_FAILED(rv)) return rv; 1.410 + 1.411 + thread->Shutdown(); 1.412 + receiverThread->Shutdown(); 1.413 + 1.414 + return NS_OK; 1.415 +} 1.416 + 1.417 +//////////////////////////////////////////////////////////////////////////////// 1.418 + 1.419 +void 1.420 +RunTests(uint32_t segSize, uint32_t segCount) 1.421 +{ 1.422 + nsresult rv; 1.423 + nsCOMPtr<nsIInputStream> in; 1.424 + nsCOMPtr<nsIOutputStream> out; 1.425 + uint32_t bufSize = segSize * segCount; 1.426 + printf("Testing New Pipes: segment size %d buffer size %d\n", segSize, bufSize); 1.427 + 1.428 + printf("Testing long writes...\n"); 1.429 + rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); 1.430 + NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); 1.431 + rv = TestPipe(in, out); 1.432 + NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); 1.433 + 1.434 + printf("Testing short writes...\n"); 1.435 + rv = TP_NewPipe(getter_AddRefs(in), getter_AddRefs(out), segSize, bufSize); 1.436 + NS_ASSERTION(NS_SUCCEEDED(rv), "TP_NewPipe failed"); 1.437 + rv = TestShortWrites(in, out); 1.438 + NS_ASSERTION(NS_SUCCEEDED(rv), "TestPipe failed"); 1.439 +} 1.440 + 1.441 +//////////////////////////////////////////////////////////////////////////////// 1.442 + 1.443 +#if 0 1.444 +extern void 1.445 +TestSegmentedBuffer(); 1.446 +#endif 1.447 + 1.448 +int 1.449 +main(int argc, char* argv[]) 1.450 +{ 1.451 + nsresult rv; 1.452 + 1.453 + nsCOMPtr<nsIServiceManager> servMgr; 1.454 + rv = NS_InitXPCOM2(getter_AddRefs(servMgr), nullptr, nullptr); 1.455 + if (NS_FAILED(rv)) return rv; 1.456 + 1.457 + if (argc > 1 && nsCRT::strcmp(argv[1], "-trace") == 0) 1.458 + gTrace = true; 1.459 + 1.460 + rv = TestChainedPipes(); 1.461 + NS_ASSERTION(NS_SUCCEEDED(rv), "TestChainedPipes failed"); 1.462 + RunTests(16, 1); 1.463 + RunTests(4096, 16); 1.464 + 1.465 + servMgr = 0; 1.466 + rv = NS_ShutdownXPCOM(nullptr); 1.467 + NS_ASSERTION(NS_SUCCEEDED(rv), "NS_ShutdownXPCOM failed"); 1.468 + 1.469 + return 0; 1.470 +} 1.471 + 1.472 +////////////////////////////////////////////////////////////////////////////////