1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/netwerk/base/src/nsStreamTransportService.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,553 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +#include "nsStreamTransportService.h" 1.9 +#include "nsXPCOMCIDInternal.h" 1.10 +#include "nsNetSegmentUtils.h" 1.11 +#include "nsTransportUtils.h" 1.12 +#include "nsStreamUtils.h" 1.13 +#include "nsError.h" 1.14 +#include "nsNetCID.h" 1.15 + 1.16 +#include "nsIAsyncInputStream.h" 1.17 +#include "nsIAsyncOutputStream.h" 1.18 +#include "nsISeekableStream.h" 1.19 +#include "nsIPipe.h" 1.20 +#include "nsITransport.h" 1.21 +#include "nsIObserverService.h" 1.22 +#include "nsIThreadPool.h" 1.23 +#include "mozilla/Services.h" 1.24 + 1.25 +//----------------------------------------------------------------------------- 1.26 +// nsInputStreamTransport 1.27 +// 1.28 +// Implements nsIInputStream as a wrapper around the real input stream. This 1.29 +// allows the transport to support seeking, range-limiting, progress reporting, 1.30 +// and close-when-done semantics while utilizing NS_AsyncCopy. 1.31 +//----------------------------------------------------------------------------- 1.32 + 1.33 +class nsInputStreamTransport : public nsITransport 1.34 + , public nsIInputStream 1.35 +{ 1.36 +public: 1.37 + NS_DECL_THREADSAFE_ISUPPORTS 1.38 + NS_DECL_NSITRANSPORT 1.39 + NS_DECL_NSIINPUTSTREAM 1.40 + 1.41 + nsInputStreamTransport(nsIInputStream *source, 1.42 + uint64_t offset, 1.43 + uint64_t limit, 1.44 + bool closeWhenDone) 1.45 + : mSource(source) 1.46 + , mOffset(offset) 1.47 + , mLimit(limit) 1.48 + , mCloseWhenDone(closeWhenDone) 1.49 + , mFirstTime(true) 1.50 + , mInProgress(false) 1.51 + { 1.52 + } 1.53 + 1.54 + virtual ~nsInputStreamTransport() 1.55 + { 1.56 + } 1.57 + 1.58 +private: 1.59 + nsCOMPtr<nsIAsyncInputStream> mPipeIn; 1.60 + 1.61 + // while the copy is active, these members may only be accessed from the 1.62 + // nsIInputStream implementation. 1.63 + nsCOMPtr<nsITransportEventSink> mEventSink; 1.64 + nsCOMPtr<nsIInputStream> mSource; 1.65 + uint64_t mOffset; 1.66 + uint64_t mLimit; 1.67 + bool mCloseWhenDone; 1.68 + bool mFirstTime; 1.69 + 1.70 + // this variable serves as a lock to prevent the state of the transport 1.71 + // from being modified once the copy is in progress. 1.72 + bool mInProgress; 1.73 +}; 1.74 + 1.75 +NS_IMPL_ISUPPORTS(nsInputStreamTransport, 1.76 + nsITransport, 1.77 + nsIInputStream) 1.78 + 1.79 +/** nsITransport **/ 1.80 + 1.81 +NS_IMETHODIMP 1.82 +nsInputStreamTransport::OpenInputStream(uint32_t flags, 1.83 + uint32_t segsize, 1.84 + uint32_t segcount, 1.85 + nsIInputStream **result) 1.86 +{ 1.87 + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 1.88 + 1.89 + nsresult rv; 1.90 + nsCOMPtr<nsIEventTarget> target = 1.91 + do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); 1.92 + if (NS_FAILED(rv)) return rv; 1.93 + 1.94 + // XXX if the caller requests an unbuffered stream, then perhaps 1.95 + // we'd want to simply return mSource; however, then we would 1.96 + // not be reading mSource on a background thread. is this ok? 1.97 + 1.98 + bool nonblocking = !(flags & OPEN_BLOCKING); 1.99 + 1.100 + net_ResolveSegmentParams(segsize, segcount); 1.101 + 1.102 + nsCOMPtr<nsIAsyncOutputStream> pipeOut; 1.103 + rv = NS_NewPipe2(getter_AddRefs(mPipeIn), 1.104 + getter_AddRefs(pipeOut), 1.105 + nonblocking, true, 1.106 + segsize, segcount); 1.107 + if (NS_FAILED(rv)) return rv; 1.108 + 1.109 + mInProgress = true; 1.110 + 1.111 + // startup async copy process... 1.112 + rv = NS_AsyncCopy(this, pipeOut, target, 1.113 + NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize); 1.114 + if (NS_SUCCEEDED(rv)) 1.115 + NS_ADDREF(*result = mPipeIn); 1.116 + 1.117 + return rv; 1.118 +} 1.119 + 1.120 +NS_IMETHODIMP 1.121 +nsInputStreamTransport::OpenOutputStream(uint32_t flags, 1.122 + uint32_t segsize, 1.123 + uint32_t segcount, 1.124 + nsIOutputStream **result) 1.125 +{ 1.126 + // this transport only supports reading! 1.127 + NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream"); 1.128 + return NS_ERROR_UNEXPECTED; 1.129 +} 1.130 + 1.131 +NS_IMETHODIMP 1.132 +nsInputStreamTransport::Close(nsresult reason) 1.133 +{ 1.134 + if (NS_SUCCEEDED(reason)) 1.135 + reason = NS_BASE_STREAM_CLOSED; 1.136 + 1.137 + return mPipeIn->CloseWithStatus(reason); 1.138 +} 1.139 + 1.140 +NS_IMETHODIMP 1.141 +nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink, 1.142 + nsIEventTarget *target) 1.143 +{ 1.144 + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 1.145 + 1.146 + if (target) 1.147 + return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), 1.148 + sink, target); 1.149 + 1.150 + mEventSink = sink; 1.151 + return NS_OK; 1.152 +} 1.153 + 1.154 +/** nsIInputStream **/ 1.155 + 1.156 +NS_IMETHODIMP 1.157 +nsInputStreamTransport::Close() 1.158 +{ 1.159 + if (mCloseWhenDone) 1.160 + mSource->Close(); 1.161 + 1.162 + // make additional reads return early... 1.163 + mOffset = mLimit = 0; 1.164 + return NS_OK; 1.165 +} 1.166 + 1.167 +NS_IMETHODIMP 1.168 +nsInputStreamTransport::Available(uint64_t *result) 1.169 +{ 1.170 + return NS_ERROR_NOT_IMPLEMENTED; 1.171 +} 1.172 + 1.173 +NS_IMETHODIMP 1.174 +nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) 1.175 +{ 1.176 + if (mFirstTime) { 1.177 + mFirstTime = false; 1.178 + if (mOffset != 0) { 1.179 + // read from current position if offset equal to max 1.180 + if (mOffset != UINT64_MAX) { 1.181 + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSource); 1.182 + if (seekable) 1.183 + seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); 1.184 + } 1.185 + // reset offset to zero so we can use it to enforce limit 1.186 + mOffset = 0; 1.187 + } 1.188 + } 1.189 + 1.190 + // limit amount read 1.191 + uint64_t max = mLimit - mOffset; 1.192 + if (max == 0) { 1.193 + *result = 0; 1.194 + return NS_OK; 1.195 + } 1.196 + 1.197 + if (count > max) 1.198 + count = static_cast<uint32_t>(max); 1.199 + 1.200 + nsresult rv = mSource->Read(buf, count, result); 1.201 + 1.202 + if (NS_SUCCEEDED(rv)) { 1.203 + mOffset += *result; 1.204 + if (mEventSink) 1.205 + mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, 1.206 + mLimit); 1.207 + } 1.208 + return rv; 1.209 +} 1.210 + 1.211 +NS_IMETHODIMP 1.212 +nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure, 1.213 + uint32_t count, uint32_t *result) 1.214 +{ 1.215 + return NS_ERROR_NOT_IMPLEMENTED; 1.216 +} 1.217 + 1.218 +NS_IMETHODIMP 1.219 +nsInputStreamTransport::IsNonBlocking(bool *result) 1.220 +{ 1.221 + *result = false; 1.222 + return NS_OK; 1.223 +} 1.224 + 1.225 +//----------------------------------------------------------------------------- 1.226 +// nsOutputStreamTransport 1.227 +// 1.228 +// Implements nsIOutputStream as a wrapper around the real input stream. This 1.229 +// allows the transport to support seeking, range-limiting, progress reporting, 1.230 +// and close-when-done semantics while utilizing NS_AsyncCopy. 1.231 +//----------------------------------------------------------------------------- 1.232 + 1.233 +class nsOutputStreamTransport : public nsITransport 1.234 + , public nsIOutputStream 1.235 +{ 1.236 +public: 1.237 + NS_DECL_THREADSAFE_ISUPPORTS 1.238 + NS_DECL_NSITRANSPORT 1.239 + NS_DECL_NSIOUTPUTSTREAM 1.240 + 1.241 + nsOutputStreamTransport(nsIOutputStream *sink, 1.242 + uint64_t offset, 1.243 + uint64_t limit, 1.244 + bool closeWhenDone) 1.245 + : mSink(sink) 1.246 + , mOffset(offset) 1.247 + , mLimit(limit) 1.248 + , mCloseWhenDone(closeWhenDone) 1.249 + , mFirstTime(true) 1.250 + , mInProgress(false) 1.251 + { 1.252 + } 1.253 + 1.254 + virtual ~nsOutputStreamTransport() 1.255 + { 1.256 + } 1.257 + 1.258 +private: 1.259 + nsCOMPtr<nsIAsyncOutputStream> mPipeOut; 1.260 + 1.261 + // while the copy is active, these members may only be accessed from the 1.262 + // nsIOutputStream implementation. 1.263 + nsCOMPtr<nsITransportEventSink> mEventSink; 1.264 + nsCOMPtr<nsIOutputStream> mSink; 1.265 + uint64_t mOffset; 1.266 + uint64_t mLimit; 1.267 + bool mCloseWhenDone; 1.268 + bool mFirstTime; 1.269 + 1.270 + // this variable serves as a lock to prevent the state of the transport 1.271 + // from being modified once the copy is in progress. 1.272 + bool mInProgress; 1.273 +}; 1.274 + 1.275 +NS_IMPL_ISUPPORTS(nsOutputStreamTransport, 1.276 + nsITransport, 1.277 + nsIOutputStream) 1.278 + 1.279 +/** nsITransport **/ 1.280 + 1.281 +NS_IMETHODIMP 1.282 +nsOutputStreamTransport::OpenInputStream(uint32_t flags, 1.283 + uint32_t segsize, 1.284 + uint32_t segcount, 1.285 + nsIInputStream **result) 1.286 +{ 1.287 + // this transport only supports writing! 1.288 + NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream"); 1.289 + return NS_ERROR_UNEXPECTED; 1.290 +} 1.291 + 1.292 +NS_IMETHODIMP 1.293 +nsOutputStreamTransport::OpenOutputStream(uint32_t flags, 1.294 + uint32_t segsize, 1.295 + uint32_t segcount, 1.296 + nsIOutputStream **result) 1.297 +{ 1.298 + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 1.299 + 1.300 + nsresult rv; 1.301 + nsCOMPtr<nsIEventTarget> target = 1.302 + do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); 1.303 + if (NS_FAILED(rv)) return rv; 1.304 + 1.305 + // XXX if the caller requests an unbuffered stream, then perhaps 1.306 + // we'd want to simply return mSink; however, then we would 1.307 + // not be writing to mSink on a background thread. is this ok? 1.308 + 1.309 + bool nonblocking = !(flags & OPEN_BLOCKING); 1.310 + 1.311 + net_ResolveSegmentParams(segsize, segcount); 1.312 + 1.313 + nsCOMPtr<nsIAsyncInputStream> pipeIn; 1.314 + rv = NS_NewPipe2(getter_AddRefs(pipeIn), 1.315 + getter_AddRefs(mPipeOut), 1.316 + true, nonblocking, 1.317 + segsize, segcount); 1.318 + if (NS_FAILED(rv)) return rv; 1.319 + 1.320 + mInProgress = true; 1.321 + 1.322 + // startup async copy process... 1.323 + rv = NS_AsyncCopy(pipeIn, this, target, 1.324 + NS_ASYNCCOPY_VIA_READSEGMENTS, segsize); 1.325 + if (NS_SUCCEEDED(rv)) 1.326 + NS_ADDREF(*result = mPipeOut); 1.327 + 1.328 + return rv; 1.329 +} 1.330 + 1.331 +NS_IMETHODIMP 1.332 +nsOutputStreamTransport::Close(nsresult reason) 1.333 +{ 1.334 + if (NS_SUCCEEDED(reason)) 1.335 + reason = NS_BASE_STREAM_CLOSED; 1.336 + 1.337 + return mPipeOut->CloseWithStatus(reason); 1.338 +} 1.339 + 1.340 +NS_IMETHODIMP 1.341 +nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink, 1.342 + nsIEventTarget *target) 1.343 +{ 1.344 + NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS); 1.345 + 1.346 + if (target) 1.347 + return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), 1.348 + sink, target); 1.349 + 1.350 + mEventSink = sink; 1.351 + return NS_OK; 1.352 +} 1.353 + 1.354 +/** nsIOutputStream **/ 1.355 + 1.356 +NS_IMETHODIMP 1.357 +nsOutputStreamTransport::Close() 1.358 +{ 1.359 + if (mCloseWhenDone) 1.360 + mSink->Close(); 1.361 + 1.362 + // make additional writes return early... 1.363 + mOffset = mLimit = 0; 1.364 + return NS_OK; 1.365 +} 1.366 + 1.367 +NS_IMETHODIMP 1.368 +nsOutputStreamTransport::Flush() 1.369 +{ 1.370 + return NS_OK; 1.371 +} 1.372 + 1.373 +NS_IMETHODIMP 1.374 +nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result) 1.375 +{ 1.376 + if (mFirstTime) { 1.377 + mFirstTime = false; 1.378 + if (mOffset != 0) { 1.379 + // write to current position if offset equal to max 1.380 + if (mOffset != UINT64_MAX) { 1.381 + nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSink); 1.382 + if (seekable) 1.383 + seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset); 1.384 + } 1.385 + // reset offset to zero so we can use it to enforce limit 1.386 + mOffset = 0; 1.387 + } 1.388 + } 1.389 + 1.390 + // limit amount written 1.391 + uint64_t max = mLimit - mOffset; 1.392 + if (max == 0) { 1.393 + *result = 0; 1.394 + return NS_OK; 1.395 + } 1.396 + 1.397 + if (count > max) 1.398 + count = static_cast<uint32_t>(max); 1.399 + 1.400 + nsresult rv = mSink->Write(buf, count, result); 1.401 + 1.402 + if (NS_SUCCEEDED(rv)) { 1.403 + mOffset += *result; 1.404 + if (mEventSink) 1.405 + mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset, 1.406 + mLimit); 1.407 + } 1.408 + return rv; 1.409 +} 1.410 + 1.411 +NS_IMETHODIMP 1.412 +nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure, 1.413 + uint32_t count, uint32_t *result) 1.414 +{ 1.415 + return NS_ERROR_NOT_IMPLEMENTED; 1.416 +} 1.417 + 1.418 +NS_IMETHODIMP 1.419 +nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result) 1.420 +{ 1.421 + return NS_ERROR_NOT_IMPLEMENTED; 1.422 +} 1.423 + 1.424 +NS_IMETHODIMP 1.425 +nsOutputStreamTransport::IsNonBlocking(bool *result) 1.426 +{ 1.427 + *result = false; 1.428 + return NS_OK; 1.429 +} 1.430 + 1.431 +#ifdef MOZ_NUWA_PROCESS 1.432 +#include "ipc/Nuwa.h" 1.433 + 1.434 +class STSThreadPoolListener : public nsIThreadPoolListener 1.435 +{ 1.436 +public: 1.437 + NS_DECL_THREADSAFE_ISUPPORTS 1.438 + NS_DECL_NSITHREADPOOLLISTENER 1.439 + 1.440 + STSThreadPoolListener() {} 1.441 + ~STSThreadPoolListener() {} 1.442 +}; 1.443 + 1.444 +NS_IMPL_ISUPPORTS(STSThreadPoolListener, nsIThreadPoolListener) 1.445 + 1.446 +NS_IMETHODIMP 1.447 +STSThreadPoolListener::OnThreadCreated() 1.448 +{ 1.449 + if (IsNuwaProcess()) { 1.450 + NuwaMarkCurrentThread(nullptr, nullptr); 1.451 + } 1.452 + return NS_OK; 1.453 +} 1.454 + 1.455 +NS_IMETHODIMP 1.456 +STSThreadPoolListener::OnThreadShuttingDown() 1.457 +{ 1.458 + return NS_OK; 1.459 +} 1.460 + 1.461 +#endif // MOZ_NUWA_PROCESS 1.462 + 1.463 +//----------------------------------------------------------------------------- 1.464 +// nsStreamTransportService 1.465 +//----------------------------------------------------------------------------- 1.466 + 1.467 +nsStreamTransportService::~nsStreamTransportService() 1.468 +{ 1.469 + NS_ASSERTION(!mPool, "thread pool wasn't shutdown"); 1.470 +} 1.471 + 1.472 +nsresult 1.473 +nsStreamTransportService::Init() 1.474 +{ 1.475 + mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID); 1.476 + NS_ENSURE_STATE(mPool); 1.477 + 1.478 + // Configure the pool 1.479 + mPool->SetName(NS_LITERAL_CSTRING("StreamTrans")); 1.480 + mPool->SetThreadLimit(25); 1.481 + mPool->SetIdleThreadLimit(1); 1.482 + mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30)); 1.483 +#ifdef MOZ_NUWA_PROCESS 1.484 + if (IsNuwaProcess()) { 1.485 + mPool->SetListener(new STSThreadPoolListener()); 1.486 + } 1.487 +#endif 1.488 + 1.489 + nsCOMPtr<nsIObserverService> obsSvc = 1.490 + mozilla::services::GetObserverService(); 1.491 + if (obsSvc) 1.492 + obsSvc->AddObserver(this, "xpcom-shutdown-threads", false); 1.493 + return NS_OK; 1.494 +} 1.495 + 1.496 +NS_IMPL_ISUPPORTS(nsStreamTransportService, 1.497 + nsIStreamTransportService, 1.498 + nsIEventTarget, 1.499 + nsIObserver) 1.500 + 1.501 +NS_IMETHODIMP 1.502 +nsStreamTransportService::Dispatch(nsIRunnable *task, uint32_t flags) 1.503 +{ 1.504 + NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED); 1.505 + return mPool->Dispatch(task, flags); 1.506 +} 1.507 + 1.508 +NS_IMETHODIMP 1.509 +nsStreamTransportService::IsOnCurrentThread(bool *result) 1.510 +{ 1.511 + NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED); 1.512 + return mPool->IsOnCurrentThread(result); 1.513 +} 1.514 + 1.515 +NS_IMETHODIMP 1.516 +nsStreamTransportService::CreateInputTransport(nsIInputStream *stream, 1.517 + int64_t offset, 1.518 + int64_t limit, 1.519 + bool closeWhenDone, 1.520 + nsITransport **result) 1.521 +{ 1.522 + nsInputStreamTransport *trans = 1.523 + new nsInputStreamTransport(stream, offset, limit, closeWhenDone); 1.524 + if (!trans) 1.525 + return NS_ERROR_OUT_OF_MEMORY; 1.526 + NS_ADDREF(*result = trans); 1.527 + return NS_OK; 1.528 +} 1.529 + 1.530 +NS_IMETHODIMP 1.531 +nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream, 1.532 + int64_t offset, 1.533 + int64_t limit, 1.534 + bool closeWhenDone, 1.535 + nsITransport **result) 1.536 +{ 1.537 + nsOutputStreamTransport *trans = 1.538 + new nsOutputStreamTransport(stream, offset, limit, closeWhenDone); 1.539 + if (!trans) 1.540 + return NS_ERROR_OUT_OF_MEMORY; 1.541 + NS_ADDREF(*result = trans); 1.542 + return NS_OK; 1.543 +} 1.544 + 1.545 +NS_IMETHODIMP 1.546 +nsStreamTransportService::Observe(nsISupports *subject, const char *topic, 1.547 + const char16_t *data) 1.548 +{ 1.549 + NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops"); 1.550 + 1.551 + if (mPool) { 1.552 + mPool->Shutdown(); 1.553 + mPool = nullptr; 1.554 + } 1.555 + return NS_OK; 1.556 +}