1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/netwerk/base/src/nsAsyncStreamCopier.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,422 @@ 1.4 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.5 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.6 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.7 + 1.8 +#include "nsAsyncStreamCopier.h" 1.9 +#include "nsIOService.h" 1.10 +#include "nsIEventTarget.h" 1.11 +#include "nsStreamUtils.h" 1.12 +#include "nsThreadUtils.h" 1.13 +#include "nsNetUtil.h" 1.14 +#include "prlog.h" 1.15 + 1.16 +using namespace mozilla; 1.17 + 1.18 +#undef LOG 1.19 +#if defined(PR_LOGGING) 1.20 +// 1.21 +// NSPR_LOG_MODULES=nsStreamCopier:5 1.22 +// 1.23 +static PRLogModuleInfo *gStreamCopierLog = nullptr; 1.24 +#endif 1.25 +#define LOG(args) PR_LOG(gStreamCopierLog, PR_LOG_DEBUG, args) 1.26 + 1.27 +/** 1.28 + * An event used to perform initialization off the main thread. 1.29 + */ 1.30 +class AsyncApplyBufferingPolicyEvent MOZ_FINAL: public nsRunnable 1.31 +{ 1.32 +public: 1.33 + /** 1.34 + * @param aCopier 1.35 + * The nsAsyncStreamCopier requesting the information. 1.36 + */ 1.37 + AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier) 1.38 + : mCopier(aCopier) 1.39 + , mTarget(NS_GetCurrentThread()) 1.40 + { } 1.41 + NS_METHOD Run() 1.42 + { 1.43 + nsresult rv = mCopier->ApplyBufferingPolicy(); 1.44 + if (NS_FAILED(rv)) { 1.45 + mCopier->Cancel(rv); 1.46 + return NS_OK; 1.47 + } 1.48 + 1.49 + nsCOMPtr<nsIRunnable> event = NS_NewRunnableMethod(mCopier, &nsAsyncStreamCopier::AsyncCopyInternal); 1.50 + rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); 1.51 + MOZ_ASSERT(NS_SUCCEEDED(rv)); 1.52 + 1.53 + if (NS_FAILED(rv)) { 1.54 + mCopier->Cancel(rv); 1.55 + } 1.56 + return NS_OK; 1.57 + } 1.58 +private: 1.59 + nsRefPtr<nsAsyncStreamCopier> mCopier; 1.60 + nsCOMPtr<nsIEventTarget> mTarget; 1.61 +}; 1.62 + 1.63 + 1.64 + 1.65 +//----------------------------------------------------------------------------- 1.66 + 1.67 +nsAsyncStreamCopier::nsAsyncStreamCopier() 1.68 + : mLock("nsAsyncStreamCopier.mLock") 1.69 + , mMode(NS_ASYNCCOPY_VIA_READSEGMENTS) 1.70 + , mChunkSize(nsIOService::gDefaultSegmentSize) 1.71 + , mStatus(NS_OK) 1.72 + , mIsPending(false) 1.73 + , mShouldSniffBuffering(false) 1.74 +{ 1.75 +#if defined(PR_LOGGING) 1.76 + if (!gStreamCopierLog) 1.77 + gStreamCopierLog = PR_NewLogModule("nsStreamCopier"); 1.78 +#endif 1.79 + LOG(("Creating nsAsyncStreamCopier @%x\n", this)); 1.80 +} 1.81 + 1.82 +nsAsyncStreamCopier::~nsAsyncStreamCopier() 1.83 +{ 1.84 + LOG(("Destroying nsAsyncStreamCopier @%x\n", this)); 1.85 +} 1.86 + 1.87 +bool 1.88 +nsAsyncStreamCopier::IsComplete(nsresult *status) 1.89 +{ 1.90 + MutexAutoLock lock(mLock); 1.91 + if (status) 1.92 + *status = mStatus; 1.93 + return !mIsPending; 1.94 +} 1.95 + 1.96 +nsIRequest* 1.97 +nsAsyncStreamCopier::AsRequest() 1.98 +{ 1.99 + return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this)); 1.100 +} 1.101 + 1.102 +void 1.103 +nsAsyncStreamCopier::Complete(nsresult status) 1.104 +{ 1.105 + LOG(("nsAsyncStreamCopier::Complete [this=%p status=%x]\n", this, status)); 1.106 + 1.107 + nsCOMPtr<nsIRequestObserver> observer; 1.108 + nsCOMPtr<nsISupports> ctx; 1.109 + { 1.110 + MutexAutoLock lock(mLock); 1.111 + mCopierCtx = nullptr; 1.112 + 1.113 + if (mIsPending) { 1.114 + mIsPending = false; 1.115 + mStatus = status; 1.116 + 1.117 + // setup OnStopRequest callback and release references... 1.118 + observer = mObserver; 1.119 + mObserver = nullptr; 1.120 + } 1.121 + } 1.122 + 1.123 + if (observer) { 1.124 + LOG((" calling OnStopRequest [status=%x]\n", status)); 1.125 + observer->OnStopRequest(AsRequest(), ctx, status); 1.126 + } 1.127 +} 1.128 + 1.129 +void 1.130 +nsAsyncStreamCopier::OnAsyncCopyComplete(void *closure, nsresult status) 1.131 +{ 1.132 + nsAsyncStreamCopier *self = (nsAsyncStreamCopier *) closure; 1.133 + self->Complete(status); 1.134 + NS_RELEASE(self); // addref'd in AsyncCopy 1.135 +} 1.136 + 1.137 +//----------------------------------------------------------------------------- 1.138 +// nsISupports 1.139 + 1.140 +// We cannot use simply NS_IMPL_ISUPPORTSx as both 1.141 +// nsIAsyncStreamCopier and nsIAsyncStreamCopier2 implement nsIRequest 1.142 + 1.143 +NS_IMPL_ADDREF(nsAsyncStreamCopier) 1.144 +NS_IMPL_RELEASE(nsAsyncStreamCopier) 1.145 +NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier) 1.146 +NS_INTERFACE_TABLE_BEGIN 1.147 +NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier) 1.148 +NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2) 1.149 +NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, nsIAsyncStreamCopier) 1.150 +NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, nsIAsyncStreamCopier) 1.151 +NS_INTERFACE_TABLE_END 1.152 +NS_INTERFACE_TABLE_TAIL 1.153 + 1.154 +//----------------------------------------------------------------------------- 1.155 +// nsIRequest 1.156 + 1.157 +NS_IMETHODIMP 1.158 +nsAsyncStreamCopier::GetName(nsACString &name) 1.159 +{ 1.160 + name.Truncate(); 1.161 + return NS_OK; 1.162 +} 1.163 + 1.164 +NS_IMETHODIMP 1.165 +nsAsyncStreamCopier::IsPending(bool *result) 1.166 +{ 1.167 + *result = !IsComplete(); 1.168 + return NS_OK; 1.169 +} 1.170 + 1.171 +NS_IMETHODIMP 1.172 +nsAsyncStreamCopier::GetStatus(nsresult *status) 1.173 +{ 1.174 + IsComplete(status); 1.175 + return NS_OK; 1.176 +} 1.177 + 1.178 +NS_IMETHODIMP 1.179 +nsAsyncStreamCopier::Cancel(nsresult status) 1.180 +{ 1.181 + nsCOMPtr<nsISupports> copierCtx; 1.182 + { 1.183 + MutexAutoLock lock(mLock); 1.184 + if (!mIsPending) 1.185 + return NS_OK; 1.186 + copierCtx.swap(mCopierCtx); 1.187 + } 1.188 + 1.189 + if (NS_SUCCEEDED(status)) { 1.190 + NS_WARNING("cancel with non-failure status code"); 1.191 + status = NS_BASE_STREAM_CLOSED; 1.192 + } 1.193 + 1.194 + if (copierCtx) 1.195 + NS_CancelAsyncCopy(copierCtx, status); 1.196 + 1.197 + return NS_OK; 1.198 +} 1.199 + 1.200 +NS_IMETHODIMP 1.201 +nsAsyncStreamCopier::Suspend() 1.202 +{ 1.203 + NS_NOTREACHED("nsAsyncStreamCopier::Suspend"); 1.204 + return NS_ERROR_NOT_IMPLEMENTED; 1.205 +} 1.206 + 1.207 +NS_IMETHODIMP 1.208 +nsAsyncStreamCopier::Resume() 1.209 +{ 1.210 + NS_NOTREACHED("nsAsyncStreamCopier::Resume"); 1.211 + return NS_ERROR_NOT_IMPLEMENTED; 1.212 +} 1.213 + 1.214 +NS_IMETHODIMP 1.215 +nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags *aLoadFlags) 1.216 +{ 1.217 + *aLoadFlags = LOAD_NORMAL; 1.218 + return NS_OK; 1.219 +} 1.220 + 1.221 +NS_IMETHODIMP 1.222 +nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags) 1.223 +{ 1.224 + return NS_OK; 1.225 +} 1.226 + 1.227 +NS_IMETHODIMP 1.228 +nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup **aLoadGroup) 1.229 +{ 1.230 + *aLoadGroup = nullptr; 1.231 + return NS_OK; 1.232 +} 1.233 + 1.234 +NS_IMETHODIMP 1.235 +nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup *aLoadGroup) 1.236 +{ 1.237 + return NS_OK; 1.238 +} 1.239 + 1.240 +nsresult 1.241 +nsAsyncStreamCopier::InitInternal(nsIInputStream *source, 1.242 + nsIOutputStream *sink, 1.243 + nsIEventTarget *target, 1.244 + uint32_t chunkSize, 1.245 + bool closeSource, 1.246 + bool closeSink) 1.247 +{ 1.248 + NS_ASSERTION(!mSource && !mSink, "Init() called more than once"); 1.249 + if (chunkSize == 0) { 1.250 + chunkSize = nsIOService::gDefaultSegmentSize; 1.251 + } 1.252 + mChunkSize = chunkSize; 1.253 + 1.254 + mSource = source; 1.255 + mSink = sink; 1.256 + mCloseSource = closeSource; 1.257 + mCloseSink = closeSink; 1.258 + 1.259 + if (target) { 1.260 + mTarget = target; 1.261 + } else { 1.262 + nsresult rv; 1.263 + mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); 1.264 + if (NS_FAILED(rv)) { 1.265 + return rv; 1.266 + } 1.267 + } 1.268 + 1.269 + return NS_OK; 1.270 +} 1.271 + 1.272 +//----------------------------------------------------------------------------- 1.273 +// nsIAsyncStreamCopier 1.274 + 1.275 +NS_IMETHODIMP 1.276 +nsAsyncStreamCopier::Init(nsIInputStream *source, 1.277 + nsIOutputStream *sink, 1.278 + nsIEventTarget *target, 1.279 + bool sourceBuffered, 1.280 + bool sinkBuffered, 1.281 + uint32_t chunkSize, 1.282 + bool closeSource, 1.283 + bool closeSink) 1.284 +{ 1.285 + NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered"); 1.286 + mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS 1.287 + : NS_ASYNCCOPY_VIA_WRITESEGMENTS; 1.288 + 1.289 + return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); 1.290 +} 1.291 + 1.292 +//----------------------------------------------------------------------------- 1.293 +// nsIAsyncStreamCopier2 1.294 + 1.295 +NS_IMETHODIMP 1.296 +nsAsyncStreamCopier::Init(nsIInputStream *source, 1.297 + nsIOutputStream *sink, 1.298 + nsIEventTarget *target, 1.299 + uint32_t chunkSize, 1.300 + bool closeSource, 1.301 + bool closeSink) 1.302 +{ 1.303 + mShouldSniffBuffering = true; 1.304 + 1.305 + return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); 1.306 +} 1.307 + 1.308 +/** 1.309 + * Detect whether the input or the output stream is buffered, 1.310 + * bufferize one of them if neither is buffered. 1.311 + */ 1.312 +nsresult 1.313 +nsAsyncStreamCopier::ApplyBufferingPolicy() 1.314 +{ 1.315 + // This function causes I/O, it must not be executed on the main 1.316 + // thread. 1.317 + MOZ_ASSERT(!NS_IsMainThread()); 1.318 + 1.319 + if (NS_OutputStreamIsBuffered(mSink)) { 1.320 + // Sink is buffered, no need to perform additional buffering 1.321 + mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; 1.322 + return NS_OK; 1.323 + } 1.324 + if (NS_InputStreamIsBuffered(mSource)) { 1.325 + // Source is buffered, no need to perform additional buffering 1.326 + mMode = NS_ASYNCCOPY_VIA_READSEGMENTS; 1.327 + return NS_OK; 1.328 + } 1.329 + 1.330 + // No buffering, let's buffer the sink 1.331 + nsresult rv; 1.332 + nsCOMPtr<nsIBufferedOutputStream> sink = 1.333 + do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv); 1.334 + if (NS_FAILED(rv)) { 1.335 + return rv; 1.336 + } 1.337 + 1.338 + rv = sink->Init(mSink, mChunkSize); 1.339 + if (NS_FAILED(rv)) { 1.340 + return rv; 1.341 + } 1.342 + 1.343 + mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; 1.344 + mSink = sink; 1.345 + return NS_OK; 1.346 +} 1.347 + 1.348 +//----------------------------------------------------------------------------- 1.349 +// Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2 1.350 + 1.351 +NS_IMETHODIMP 1.352 +nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx) 1.353 +{ 1.354 + LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%x]\n", this, observer)); 1.355 + 1.356 + NS_ASSERTION(mSource && mSink, "not initialized"); 1.357 + nsresult rv; 1.358 + 1.359 + if (observer) { 1.360 + // build proxy for observer events 1.361 + rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx); 1.362 + if (NS_FAILED(rv)) return rv; 1.363 + } 1.364 + 1.365 + // from this point forward, AsyncCopy is going to return NS_OK. any errors 1.366 + // will be reported via OnStopRequest. 1.367 + mIsPending = true; 1.368 + 1.369 + if (mObserver) { 1.370 + rv = mObserver->OnStartRequest(AsRequest(), nullptr); 1.371 + if (NS_FAILED(rv)) 1.372 + Cancel(rv); 1.373 + } 1.374 + 1.375 + if (!mShouldSniffBuffering) { 1.376 + // No buffer sniffing required, let's proceed 1.377 + AsyncCopyInternal(); 1.378 + return NS_OK; 1.379 + } 1.380 + 1.381 + if (NS_IsMainThread()) { 1.382 + // Don't perform buffer sniffing on the main thread 1.383 + nsCOMPtr<AsyncApplyBufferingPolicyEvent> event 1.384 + = new AsyncApplyBufferingPolicyEvent(this); 1.385 + rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); 1.386 + if (NS_FAILED(rv)) { 1.387 + Cancel(rv); 1.388 + } 1.389 + return NS_OK; 1.390 + } 1.391 + 1.392 + // We're not going to block the main thread, so let's sniff here 1.393 + rv = ApplyBufferingPolicy(); 1.394 + if (NS_FAILED(rv)) { 1.395 + Cancel(rv); 1.396 + } 1.397 + AsyncCopyInternal(); 1.398 + return NS_OK; 1.399 +} 1.400 + 1.401 +// Launch async copy. 1.402 +// All errors are reported through the observer. 1.403 +void 1.404 +nsAsyncStreamCopier::AsyncCopyInternal() 1.405 +{ 1.406 + MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS 1.407 + || mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS); 1.408 + 1.409 + nsresult rv; 1.410 + // we want to receive progress notifications; release happens in 1.411 + // OnAsyncCopyComplete. 1.412 + NS_ADDREF_THIS(); 1.413 + { 1.414 + MutexAutoLock lock(mLock); 1.415 + rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize, 1.416 + OnAsyncCopyComplete, this, mCloseSource, mCloseSink, 1.417 + getter_AddRefs(mCopierCtx)); 1.418 + } 1.419 + if (NS_FAILED(rv)) { 1.420 + NS_RELEASE_THIS(); 1.421 + Cancel(rv); 1.422 + } 1.423 +} 1.424 + 1.425 +