|
1 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
2 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
4 |
|
5 #include "nsAsyncStreamCopier.h" |
|
6 #include "nsIOService.h" |
|
7 #include "nsIEventTarget.h" |
|
8 #include "nsStreamUtils.h" |
|
9 #include "nsThreadUtils.h" |
|
10 #include "nsNetUtil.h" |
|
11 #include "prlog.h" |
|
12 |
|
13 using namespace mozilla; |
|
14 |
|
15 #undef LOG |
|
16 #if defined(PR_LOGGING) |
|
17 // |
|
18 // NSPR_LOG_MODULES=nsStreamCopier:5 |
|
19 // |
|
20 static PRLogModuleInfo *gStreamCopierLog = nullptr; |
|
21 #endif |
|
22 #define LOG(args) PR_LOG(gStreamCopierLog, PR_LOG_DEBUG, args) |
|
23 |
|
24 /** |
|
25 * An event used to perform initialization off the main thread. |
|
26 */ |
|
27 class AsyncApplyBufferingPolicyEvent MOZ_FINAL: public nsRunnable |
|
28 { |
|
29 public: |
|
30 /** |
|
31 * @param aCopier |
|
32 * The nsAsyncStreamCopier requesting the information. |
|
33 */ |
|
34 AsyncApplyBufferingPolicyEvent(nsAsyncStreamCopier* aCopier) |
|
35 : mCopier(aCopier) |
|
36 , mTarget(NS_GetCurrentThread()) |
|
37 { } |
|
38 NS_METHOD Run() |
|
39 { |
|
40 nsresult rv = mCopier->ApplyBufferingPolicy(); |
|
41 if (NS_FAILED(rv)) { |
|
42 mCopier->Cancel(rv); |
|
43 return NS_OK; |
|
44 } |
|
45 |
|
46 nsCOMPtr<nsIRunnable> event = NS_NewRunnableMethod(mCopier, &nsAsyncStreamCopier::AsyncCopyInternal); |
|
47 rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); |
|
48 MOZ_ASSERT(NS_SUCCEEDED(rv)); |
|
49 |
|
50 if (NS_FAILED(rv)) { |
|
51 mCopier->Cancel(rv); |
|
52 } |
|
53 return NS_OK; |
|
54 } |
|
55 private: |
|
56 nsRefPtr<nsAsyncStreamCopier> mCopier; |
|
57 nsCOMPtr<nsIEventTarget> mTarget; |
|
58 }; |
|
59 |
|
60 |
|
61 |
|
62 //----------------------------------------------------------------------------- |
|
63 |
|
64 nsAsyncStreamCopier::nsAsyncStreamCopier() |
|
65 : mLock("nsAsyncStreamCopier.mLock") |
|
66 , mMode(NS_ASYNCCOPY_VIA_READSEGMENTS) |
|
67 , mChunkSize(nsIOService::gDefaultSegmentSize) |
|
68 , mStatus(NS_OK) |
|
69 , mIsPending(false) |
|
70 , mShouldSniffBuffering(false) |
|
71 { |
|
72 #if defined(PR_LOGGING) |
|
73 if (!gStreamCopierLog) |
|
74 gStreamCopierLog = PR_NewLogModule("nsStreamCopier"); |
|
75 #endif |
|
76 LOG(("Creating nsAsyncStreamCopier @%x\n", this)); |
|
77 } |
|
78 |
|
79 nsAsyncStreamCopier::~nsAsyncStreamCopier() |
|
80 { |
|
81 LOG(("Destroying nsAsyncStreamCopier @%x\n", this)); |
|
82 } |
|
83 |
|
84 bool |
|
85 nsAsyncStreamCopier::IsComplete(nsresult *status) |
|
86 { |
|
87 MutexAutoLock lock(mLock); |
|
88 if (status) |
|
89 *status = mStatus; |
|
90 return !mIsPending; |
|
91 } |
|
92 |
|
93 nsIRequest* |
|
94 nsAsyncStreamCopier::AsRequest() |
|
95 { |
|
96 return static_cast<nsIRequest*>(static_cast<nsIAsyncStreamCopier*>(this)); |
|
97 } |
|
98 |
|
99 void |
|
100 nsAsyncStreamCopier::Complete(nsresult status) |
|
101 { |
|
102 LOG(("nsAsyncStreamCopier::Complete [this=%p status=%x]\n", this, status)); |
|
103 |
|
104 nsCOMPtr<nsIRequestObserver> observer; |
|
105 nsCOMPtr<nsISupports> ctx; |
|
106 { |
|
107 MutexAutoLock lock(mLock); |
|
108 mCopierCtx = nullptr; |
|
109 |
|
110 if (mIsPending) { |
|
111 mIsPending = false; |
|
112 mStatus = status; |
|
113 |
|
114 // setup OnStopRequest callback and release references... |
|
115 observer = mObserver; |
|
116 mObserver = nullptr; |
|
117 } |
|
118 } |
|
119 |
|
120 if (observer) { |
|
121 LOG((" calling OnStopRequest [status=%x]\n", status)); |
|
122 observer->OnStopRequest(AsRequest(), ctx, status); |
|
123 } |
|
124 } |
|
125 |
|
126 void |
|
127 nsAsyncStreamCopier::OnAsyncCopyComplete(void *closure, nsresult status) |
|
128 { |
|
129 nsAsyncStreamCopier *self = (nsAsyncStreamCopier *) closure; |
|
130 self->Complete(status); |
|
131 NS_RELEASE(self); // addref'd in AsyncCopy |
|
132 } |
|
133 |
|
134 //----------------------------------------------------------------------------- |
|
135 // nsISupports |
|
136 |
|
// We cannot simply use NS_IMPL_ISUPPORTSx, because both
// nsIAsyncStreamCopier and nsIAsyncStreamCopier2 derive from nsIRequest.
// The interface table is therefore spelled out by hand, and the ambiguous
// bases (nsIRequest, nsISupports) are resolved through nsIAsyncStreamCopier.

NS_IMPL_ADDREF(nsAsyncStreamCopier)
NS_IMPL_RELEASE(nsAsyncStreamCopier)
NS_INTERFACE_TABLE_HEAD(nsAsyncStreamCopier)
    NS_INTERFACE_TABLE_BEGIN
        NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier)
        NS_INTERFACE_TABLE_ENTRY(nsAsyncStreamCopier, nsIAsyncStreamCopier2)
        NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsIRequest, nsIAsyncStreamCopier)
        NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsAsyncStreamCopier, nsISupports, nsIAsyncStreamCopier)
    NS_INTERFACE_TABLE_END
NS_INTERFACE_TABLE_TAIL
|
150 |
|
151 //----------------------------------------------------------------------------- |
|
152 // nsIRequest |
|
153 |
|
154 NS_IMETHODIMP |
|
155 nsAsyncStreamCopier::GetName(nsACString &name) |
|
156 { |
|
157 name.Truncate(); |
|
158 return NS_OK; |
|
159 } |
|
160 |
|
161 NS_IMETHODIMP |
|
162 nsAsyncStreamCopier::IsPending(bool *result) |
|
163 { |
|
164 *result = !IsComplete(); |
|
165 return NS_OK; |
|
166 } |
|
167 |
|
168 NS_IMETHODIMP |
|
169 nsAsyncStreamCopier::GetStatus(nsresult *status) |
|
170 { |
|
171 IsComplete(status); |
|
172 return NS_OK; |
|
173 } |
|
174 |
|
175 NS_IMETHODIMP |
|
176 nsAsyncStreamCopier::Cancel(nsresult status) |
|
177 { |
|
178 nsCOMPtr<nsISupports> copierCtx; |
|
179 { |
|
180 MutexAutoLock lock(mLock); |
|
181 if (!mIsPending) |
|
182 return NS_OK; |
|
183 copierCtx.swap(mCopierCtx); |
|
184 } |
|
185 |
|
186 if (NS_SUCCEEDED(status)) { |
|
187 NS_WARNING("cancel with non-failure status code"); |
|
188 status = NS_BASE_STREAM_CLOSED; |
|
189 } |
|
190 |
|
191 if (copierCtx) |
|
192 NS_CancelAsyncCopy(copierCtx, status); |
|
193 |
|
194 return NS_OK; |
|
195 } |
|
196 |
|
197 NS_IMETHODIMP |
|
198 nsAsyncStreamCopier::Suspend() |
|
199 { |
|
200 NS_NOTREACHED("nsAsyncStreamCopier::Suspend"); |
|
201 return NS_ERROR_NOT_IMPLEMENTED; |
|
202 } |
|
203 |
|
204 NS_IMETHODIMP |
|
205 nsAsyncStreamCopier::Resume() |
|
206 { |
|
207 NS_NOTREACHED("nsAsyncStreamCopier::Resume"); |
|
208 return NS_ERROR_NOT_IMPLEMENTED; |
|
209 } |
|
210 |
|
211 NS_IMETHODIMP |
|
212 nsAsyncStreamCopier::GetLoadFlags(nsLoadFlags *aLoadFlags) |
|
213 { |
|
214 *aLoadFlags = LOAD_NORMAL; |
|
215 return NS_OK; |
|
216 } |
|
217 |
|
218 NS_IMETHODIMP |
|
219 nsAsyncStreamCopier::SetLoadFlags(nsLoadFlags aLoadFlags) |
|
220 { |
|
221 return NS_OK; |
|
222 } |
|
223 |
|
224 NS_IMETHODIMP |
|
225 nsAsyncStreamCopier::GetLoadGroup(nsILoadGroup **aLoadGroup) |
|
226 { |
|
227 *aLoadGroup = nullptr; |
|
228 return NS_OK; |
|
229 } |
|
230 |
|
231 NS_IMETHODIMP |
|
232 nsAsyncStreamCopier::SetLoadGroup(nsILoadGroup *aLoadGroup) |
|
233 { |
|
234 return NS_OK; |
|
235 } |
|
236 |
|
237 nsresult |
|
238 nsAsyncStreamCopier::InitInternal(nsIInputStream *source, |
|
239 nsIOutputStream *sink, |
|
240 nsIEventTarget *target, |
|
241 uint32_t chunkSize, |
|
242 bool closeSource, |
|
243 bool closeSink) |
|
244 { |
|
245 NS_ASSERTION(!mSource && !mSink, "Init() called more than once"); |
|
246 if (chunkSize == 0) { |
|
247 chunkSize = nsIOService::gDefaultSegmentSize; |
|
248 } |
|
249 mChunkSize = chunkSize; |
|
250 |
|
251 mSource = source; |
|
252 mSink = sink; |
|
253 mCloseSource = closeSource; |
|
254 mCloseSink = closeSink; |
|
255 |
|
256 if (target) { |
|
257 mTarget = target; |
|
258 } else { |
|
259 nsresult rv; |
|
260 mTarget = do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv); |
|
261 if (NS_FAILED(rv)) { |
|
262 return rv; |
|
263 } |
|
264 } |
|
265 |
|
266 return NS_OK; |
|
267 } |
|
268 |
|
269 //----------------------------------------------------------------------------- |
|
270 // nsIAsyncStreamCopier |
|
271 |
|
272 NS_IMETHODIMP |
|
273 nsAsyncStreamCopier::Init(nsIInputStream *source, |
|
274 nsIOutputStream *sink, |
|
275 nsIEventTarget *target, |
|
276 bool sourceBuffered, |
|
277 bool sinkBuffered, |
|
278 uint32_t chunkSize, |
|
279 bool closeSource, |
|
280 bool closeSink) |
|
281 { |
|
282 NS_ASSERTION(sourceBuffered || sinkBuffered, "at least one stream must be buffered"); |
|
283 mMode = sourceBuffered ? NS_ASYNCCOPY_VIA_READSEGMENTS |
|
284 : NS_ASYNCCOPY_VIA_WRITESEGMENTS; |
|
285 |
|
286 return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); |
|
287 } |
|
288 |
|
289 //----------------------------------------------------------------------------- |
|
290 // nsIAsyncStreamCopier2 |
|
291 |
|
292 NS_IMETHODIMP |
|
293 nsAsyncStreamCopier::Init(nsIInputStream *source, |
|
294 nsIOutputStream *sink, |
|
295 nsIEventTarget *target, |
|
296 uint32_t chunkSize, |
|
297 bool closeSource, |
|
298 bool closeSink) |
|
299 { |
|
300 mShouldSniffBuffering = true; |
|
301 |
|
302 return InitInternal(source, sink, target, chunkSize, closeSource, closeSink); |
|
303 } |
|
304 |
|
305 /** |
|
306 * Detect whether the input or the output stream is buffered, |
|
307 * bufferize one of them if neither is buffered. |
|
308 */ |
|
309 nsresult |
|
310 nsAsyncStreamCopier::ApplyBufferingPolicy() |
|
311 { |
|
312 // This function causes I/O, it must not be executed on the main |
|
313 // thread. |
|
314 MOZ_ASSERT(!NS_IsMainThread()); |
|
315 |
|
316 if (NS_OutputStreamIsBuffered(mSink)) { |
|
317 // Sink is buffered, no need to perform additional buffering |
|
318 mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; |
|
319 return NS_OK; |
|
320 } |
|
321 if (NS_InputStreamIsBuffered(mSource)) { |
|
322 // Source is buffered, no need to perform additional buffering |
|
323 mMode = NS_ASYNCCOPY_VIA_READSEGMENTS; |
|
324 return NS_OK; |
|
325 } |
|
326 |
|
327 // No buffering, let's buffer the sink |
|
328 nsresult rv; |
|
329 nsCOMPtr<nsIBufferedOutputStream> sink = |
|
330 do_CreateInstance(NS_BUFFEREDOUTPUTSTREAM_CONTRACTID, &rv); |
|
331 if (NS_FAILED(rv)) { |
|
332 return rv; |
|
333 } |
|
334 |
|
335 rv = sink->Init(mSink, mChunkSize); |
|
336 if (NS_FAILED(rv)) { |
|
337 return rv; |
|
338 } |
|
339 |
|
340 mMode = NS_ASYNCCOPY_VIA_WRITESEGMENTS; |
|
341 mSink = sink; |
|
342 return NS_OK; |
|
343 } |
|
344 |
|
345 //----------------------------------------------------------------------------- |
|
346 // Both nsIAsyncStreamCopier and nsIAsyncStreamCopier2 |
|
347 |
|
348 NS_IMETHODIMP |
|
349 nsAsyncStreamCopier::AsyncCopy(nsIRequestObserver *observer, nsISupports *ctx) |
|
350 { |
|
351 LOG(("nsAsyncStreamCopier::AsyncCopy [this=%p observer=%x]\n", this, observer)); |
|
352 |
|
353 NS_ASSERTION(mSource && mSink, "not initialized"); |
|
354 nsresult rv; |
|
355 |
|
356 if (observer) { |
|
357 // build proxy for observer events |
|
358 rv = NS_NewRequestObserverProxy(getter_AddRefs(mObserver), observer, ctx); |
|
359 if (NS_FAILED(rv)) return rv; |
|
360 } |
|
361 |
|
362 // from this point forward, AsyncCopy is going to return NS_OK. any errors |
|
363 // will be reported via OnStopRequest. |
|
364 mIsPending = true; |
|
365 |
|
366 if (mObserver) { |
|
367 rv = mObserver->OnStartRequest(AsRequest(), nullptr); |
|
368 if (NS_FAILED(rv)) |
|
369 Cancel(rv); |
|
370 } |
|
371 |
|
372 if (!mShouldSniffBuffering) { |
|
373 // No buffer sniffing required, let's proceed |
|
374 AsyncCopyInternal(); |
|
375 return NS_OK; |
|
376 } |
|
377 |
|
378 if (NS_IsMainThread()) { |
|
379 // Don't perform buffer sniffing on the main thread |
|
380 nsCOMPtr<AsyncApplyBufferingPolicyEvent> event |
|
381 = new AsyncApplyBufferingPolicyEvent(this); |
|
382 rv = mTarget->Dispatch(event, NS_DISPATCH_NORMAL); |
|
383 if (NS_FAILED(rv)) { |
|
384 Cancel(rv); |
|
385 } |
|
386 return NS_OK; |
|
387 } |
|
388 |
|
389 // We're not going to block the main thread, so let's sniff here |
|
390 rv = ApplyBufferingPolicy(); |
|
391 if (NS_FAILED(rv)) { |
|
392 Cancel(rv); |
|
393 } |
|
394 AsyncCopyInternal(); |
|
395 return NS_OK; |
|
396 } |
|
397 |
|
398 // Launch async copy. |
|
399 // All errors are reported through the observer. |
|
400 void |
|
401 nsAsyncStreamCopier::AsyncCopyInternal() |
|
402 { |
|
403 MOZ_ASSERT(mMode == NS_ASYNCCOPY_VIA_READSEGMENTS |
|
404 || mMode == NS_ASYNCCOPY_VIA_WRITESEGMENTS); |
|
405 |
|
406 nsresult rv; |
|
407 // we want to receive progress notifications; release happens in |
|
408 // OnAsyncCopyComplete. |
|
409 NS_ADDREF_THIS(); |
|
410 { |
|
411 MutexAutoLock lock(mLock); |
|
412 rv = NS_AsyncCopy(mSource, mSink, mTarget, mMode, mChunkSize, |
|
413 OnAsyncCopyComplete, this, mCloseSource, mCloseSink, |
|
414 getter_AddRefs(mCopierCtx)); |
|
415 } |
|
416 if (NS_FAILED(rv)) { |
|
417 NS_RELEASE_THIS(); |
|
418 Cancel(rv); |
|
419 } |
|
420 } |
|
421 |
|
422 |