Wed, 31 Dec 2014 06:09:35 +0100
Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f for hacking purposes.
1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 #include "nsStreamTransportService.h"
6 #include "nsXPCOMCIDInternal.h"
7 #include "nsNetSegmentUtils.h"
8 #include "nsTransportUtils.h"
9 #include "nsStreamUtils.h"
10 #include "nsError.h"
11 #include "nsNetCID.h"
13 #include "nsIAsyncInputStream.h"
14 #include "nsIAsyncOutputStream.h"
15 #include "nsISeekableStream.h"
16 #include "nsIPipe.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsIThreadPool.h"
20 #include "mozilla/Services.h"
22 //-----------------------------------------------------------------------------
23 // nsInputStreamTransport
24 //
25 // Implements nsIInputStream as a wrapper around the real input stream. This
26 // allows the transport to support seeking, range-limiting, progress reporting,
27 // and close-when-done semantics while utilizing NS_AsyncCopy.
28 //-----------------------------------------------------------------------------
30 class nsInputStreamTransport : public nsITransport
31 , public nsIInputStream
32 {
33 public:
34 NS_DECL_THREADSAFE_ISUPPORTS
35 NS_DECL_NSITRANSPORT
36 NS_DECL_NSIINPUTSTREAM
38 nsInputStreamTransport(nsIInputStream *source,
39 uint64_t offset,
40 uint64_t limit,
41 bool closeWhenDone)
42 : mSource(source)
43 , mOffset(offset)
44 , mLimit(limit)
45 , mCloseWhenDone(closeWhenDone)
46 , mFirstTime(true)
47 , mInProgress(false)
48 {
49 }
51 virtual ~nsInputStreamTransport()
52 {
53 }
55 private:
56 nsCOMPtr<nsIAsyncInputStream> mPipeIn;
58 // while the copy is active, these members may only be accessed from the
59 // nsIInputStream implementation.
60 nsCOMPtr<nsITransportEventSink> mEventSink;
61 nsCOMPtr<nsIInputStream> mSource;
62 uint64_t mOffset;
63 uint64_t mLimit;
64 bool mCloseWhenDone;
65 bool mFirstTime;
67 // this variable serves as a lock to prevent the state of the transport
68 // from being modified once the copy is in progress.
69 bool mInProgress;
70 };
72 NS_IMPL_ISUPPORTS(nsInputStreamTransport,
73 nsITransport,
74 nsIInputStream)
76 /** nsITransport **/
78 NS_IMETHODIMP
79 nsInputStreamTransport::OpenInputStream(uint32_t flags,
80 uint32_t segsize,
81 uint32_t segcount,
82 nsIInputStream **result)
83 {
84 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
86 nsresult rv;
87 nsCOMPtr<nsIEventTarget> target =
88 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
89 if (NS_FAILED(rv)) return rv;
91 // XXX if the caller requests an unbuffered stream, then perhaps
92 // we'd want to simply return mSource; however, then we would
93 // not be reading mSource on a background thread. is this ok?
95 bool nonblocking = !(flags & OPEN_BLOCKING);
97 net_ResolveSegmentParams(segsize, segcount);
99 nsCOMPtr<nsIAsyncOutputStream> pipeOut;
100 rv = NS_NewPipe2(getter_AddRefs(mPipeIn),
101 getter_AddRefs(pipeOut),
102 nonblocking, true,
103 segsize, segcount);
104 if (NS_FAILED(rv)) return rv;
106 mInProgress = true;
108 // startup async copy process...
109 rv = NS_AsyncCopy(this, pipeOut, target,
110 NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize);
111 if (NS_SUCCEEDED(rv))
112 NS_ADDREF(*result = mPipeIn);
114 return rv;
115 }
117 NS_IMETHODIMP
118 nsInputStreamTransport::OpenOutputStream(uint32_t flags,
119 uint32_t segsize,
120 uint32_t segcount,
121 nsIOutputStream **result)
122 {
123 // this transport only supports reading!
124 NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
125 return NS_ERROR_UNEXPECTED;
126 }
128 NS_IMETHODIMP
129 nsInputStreamTransport::Close(nsresult reason)
130 {
131 if (NS_SUCCEEDED(reason))
132 reason = NS_BASE_STREAM_CLOSED;
134 return mPipeIn->CloseWithStatus(reason);
135 }
137 NS_IMETHODIMP
138 nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink,
139 nsIEventTarget *target)
140 {
141 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
143 if (target)
144 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
145 sink, target);
147 mEventSink = sink;
148 return NS_OK;
149 }
151 /** nsIInputStream **/
153 NS_IMETHODIMP
154 nsInputStreamTransport::Close()
155 {
156 if (mCloseWhenDone)
157 mSource->Close();
159 // make additional reads return early...
160 mOffset = mLimit = 0;
161 return NS_OK;
162 }
164 NS_IMETHODIMP
165 nsInputStreamTransport::Available(uint64_t *result)
166 {
167 return NS_ERROR_NOT_IMPLEMENTED;
168 }
170 NS_IMETHODIMP
171 nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result)
172 {
173 if (mFirstTime) {
174 mFirstTime = false;
175 if (mOffset != 0) {
176 // read from current position if offset equal to max
177 if (mOffset != UINT64_MAX) {
178 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSource);
179 if (seekable)
180 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
181 }
182 // reset offset to zero so we can use it to enforce limit
183 mOffset = 0;
184 }
185 }
187 // limit amount read
188 uint64_t max = mLimit - mOffset;
189 if (max == 0) {
190 *result = 0;
191 return NS_OK;
192 }
194 if (count > max)
195 count = static_cast<uint32_t>(max);
197 nsresult rv = mSource->Read(buf, count, result);
199 if (NS_SUCCEEDED(rv)) {
200 mOffset += *result;
201 if (mEventSink)
202 mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset,
203 mLimit);
204 }
205 return rv;
206 }
208 NS_IMETHODIMP
209 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure,
210 uint32_t count, uint32_t *result)
211 {
212 return NS_ERROR_NOT_IMPLEMENTED;
213 }
215 NS_IMETHODIMP
216 nsInputStreamTransport::IsNonBlocking(bool *result)
217 {
218 *result = false;
219 return NS_OK;
220 }
222 //-----------------------------------------------------------------------------
223 // nsOutputStreamTransport
224 //
225 // Implements nsIOutputStream as a wrapper around the real input stream. This
226 // allows the transport to support seeking, range-limiting, progress reporting,
227 // and close-when-done semantics while utilizing NS_AsyncCopy.
228 //-----------------------------------------------------------------------------
230 class nsOutputStreamTransport : public nsITransport
231 , public nsIOutputStream
232 {
233 public:
234 NS_DECL_THREADSAFE_ISUPPORTS
235 NS_DECL_NSITRANSPORT
236 NS_DECL_NSIOUTPUTSTREAM
238 nsOutputStreamTransport(nsIOutputStream *sink,
239 uint64_t offset,
240 uint64_t limit,
241 bool closeWhenDone)
242 : mSink(sink)
243 , mOffset(offset)
244 , mLimit(limit)
245 , mCloseWhenDone(closeWhenDone)
246 , mFirstTime(true)
247 , mInProgress(false)
248 {
249 }
251 virtual ~nsOutputStreamTransport()
252 {
253 }
255 private:
256 nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
258 // while the copy is active, these members may only be accessed from the
259 // nsIOutputStream implementation.
260 nsCOMPtr<nsITransportEventSink> mEventSink;
261 nsCOMPtr<nsIOutputStream> mSink;
262 uint64_t mOffset;
263 uint64_t mLimit;
264 bool mCloseWhenDone;
265 bool mFirstTime;
267 // this variable serves as a lock to prevent the state of the transport
268 // from being modified once the copy is in progress.
269 bool mInProgress;
270 };
272 NS_IMPL_ISUPPORTS(nsOutputStreamTransport,
273 nsITransport,
274 nsIOutputStream)
276 /** nsITransport **/
278 NS_IMETHODIMP
279 nsOutputStreamTransport::OpenInputStream(uint32_t flags,
280 uint32_t segsize,
281 uint32_t segcount,
282 nsIInputStream **result)
283 {
284 // this transport only supports writing!
285 NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream");
286 return NS_ERROR_UNEXPECTED;
287 }
289 NS_IMETHODIMP
290 nsOutputStreamTransport::OpenOutputStream(uint32_t flags,
291 uint32_t segsize,
292 uint32_t segcount,
293 nsIOutputStream **result)
294 {
295 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
297 nsresult rv;
298 nsCOMPtr<nsIEventTarget> target =
299 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
300 if (NS_FAILED(rv)) return rv;
302 // XXX if the caller requests an unbuffered stream, then perhaps
303 // we'd want to simply return mSink; however, then we would
304 // not be writing to mSink on a background thread. is this ok?
306 bool nonblocking = !(flags & OPEN_BLOCKING);
308 net_ResolveSegmentParams(segsize, segcount);
310 nsCOMPtr<nsIAsyncInputStream> pipeIn;
311 rv = NS_NewPipe2(getter_AddRefs(pipeIn),
312 getter_AddRefs(mPipeOut),
313 true, nonblocking,
314 segsize, segcount);
315 if (NS_FAILED(rv)) return rv;
317 mInProgress = true;
319 // startup async copy process...
320 rv = NS_AsyncCopy(pipeIn, this, target,
321 NS_ASYNCCOPY_VIA_READSEGMENTS, segsize);
322 if (NS_SUCCEEDED(rv))
323 NS_ADDREF(*result = mPipeOut);
325 return rv;
326 }
328 NS_IMETHODIMP
329 nsOutputStreamTransport::Close(nsresult reason)
330 {
331 if (NS_SUCCEEDED(reason))
332 reason = NS_BASE_STREAM_CLOSED;
334 return mPipeOut->CloseWithStatus(reason);
335 }
337 NS_IMETHODIMP
338 nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink,
339 nsIEventTarget *target)
340 {
341 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
343 if (target)
344 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
345 sink, target);
347 mEventSink = sink;
348 return NS_OK;
349 }
351 /** nsIOutputStream **/
353 NS_IMETHODIMP
354 nsOutputStreamTransport::Close()
355 {
356 if (mCloseWhenDone)
357 mSink->Close();
359 // make additional writes return early...
360 mOffset = mLimit = 0;
361 return NS_OK;
362 }
364 NS_IMETHODIMP
365 nsOutputStreamTransport::Flush()
366 {
367 return NS_OK;
368 }
370 NS_IMETHODIMP
371 nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result)
372 {
373 if (mFirstTime) {
374 mFirstTime = false;
375 if (mOffset != 0) {
376 // write to current position if offset equal to max
377 if (mOffset != UINT64_MAX) {
378 nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSink);
379 if (seekable)
380 seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
381 }
382 // reset offset to zero so we can use it to enforce limit
383 mOffset = 0;
384 }
385 }
387 // limit amount written
388 uint64_t max = mLimit - mOffset;
389 if (max == 0) {
390 *result = 0;
391 return NS_OK;
392 }
394 if (count > max)
395 count = static_cast<uint32_t>(max);
397 nsresult rv = mSink->Write(buf, count, result);
399 if (NS_SUCCEEDED(rv)) {
400 mOffset += *result;
401 if (mEventSink)
402 mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset,
403 mLimit);
404 }
405 return rv;
406 }
408 NS_IMETHODIMP
409 nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure,
410 uint32_t count, uint32_t *result)
411 {
412 return NS_ERROR_NOT_IMPLEMENTED;
413 }
415 NS_IMETHODIMP
416 nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result)
417 {
418 return NS_ERROR_NOT_IMPLEMENTED;
419 }
421 NS_IMETHODIMP
422 nsOutputStreamTransport::IsNonBlocking(bool *result)
423 {
424 *result = false;
425 return NS_OK;
426 }
428 #ifdef MOZ_NUWA_PROCESS
429 #include "ipc/Nuwa.h"
431 class STSThreadPoolListener : public nsIThreadPoolListener
432 {
433 public:
434 NS_DECL_THREADSAFE_ISUPPORTS
435 NS_DECL_NSITHREADPOOLLISTENER
437 STSThreadPoolListener() {}
438 ~STSThreadPoolListener() {}
439 };
441 NS_IMPL_ISUPPORTS(STSThreadPoolListener, nsIThreadPoolListener)
443 NS_IMETHODIMP
444 STSThreadPoolListener::OnThreadCreated()
445 {
446 if (IsNuwaProcess()) {
447 NuwaMarkCurrentThread(nullptr, nullptr);
448 }
449 return NS_OK;
450 }
452 NS_IMETHODIMP
453 STSThreadPoolListener::OnThreadShuttingDown()
454 {
455 return NS_OK;
456 }
458 #endif // MOZ_NUWA_PROCESS
460 //-----------------------------------------------------------------------------
461 // nsStreamTransportService
462 //-----------------------------------------------------------------------------
464 nsStreamTransportService::~nsStreamTransportService()
465 {
466 NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
467 }
469 nsresult
470 nsStreamTransportService::Init()
471 {
472 mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID);
473 NS_ENSURE_STATE(mPool);
475 // Configure the pool
476 mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
477 mPool->SetThreadLimit(25);
478 mPool->SetIdleThreadLimit(1);
479 mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
480 #ifdef MOZ_NUWA_PROCESS
481 if (IsNuwaProcess()) {
482 mPool->SetListener(new STSThreadPoolListener());
483 }
484 #endif
486 nsCOMPtr<nsIObserverService> obsSvc =
487 mozilla::services::GetObserverService();
488 if (obsSvc)
489 obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
490 return NS_OK;
491 }
493 NS_IMPL_ISUPPORTS(nsStreamTransportService,
494 nsIStreamTransportService,
495 nsIEventTarget,
496 nsIObserver)
498 NS_IMETHODIMP
499 nsStreamTransportService::Dispatch(nsIRunnable *task, uint32_t flags)
500 {
501 NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED);
502 return mPool->Dispatch(task, flags);
503 }
505 NS_IMETHODIMP
506 nsStreamTransportService::IsOnCurrentThread(bool *result)
507 {
508 NS_ENSURE_TRUE(mPool, NS_ERROR_NOT_INITIALIZED);
509 return mPool->IsOnCurrentThread(result);
510 }
512 NS_IMETHODIMP
513 nsStreamTransportService::CreateInputTransport(nsIInputStream *stream,
514 int64_t offset,
515 int64_t limit,
516 bool closeWhenDone,
517 nsITransport **result)
518 {
519 nsInputStreamTransport *trans =
520 new nsInputStreamTransport(stream, offset, limit, closeWhenDone);
521 if (!trans)
522 return NS_ERROR_OUT_OF_MEMORY;
523 NS_ADDREF(*result = trans);
524 return NS_OK;
525 }
527 NS_IMETHODIMP
528 nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream,
529 int64_t offset,
530 int64_t limit,
531 bool closeWhenDone,
532 nsITransport **result)
533 {
534 nsOutputStreamTransport *trans =
535 new nsOutputStreamTransport(stream, offset, limit, closeWhenDone);
536 if (!trans)
537 return NS_ERROR_OUT_OF_MEMORY;
538 NS_ADDREF(*result = trans);
539 return NS_OK;
540 }
542 NS_IMETHODIMP
543 nsStreamTransportService::Observe(nsISupports *subject, const char *topic,
544 const char16_t *data)
545 {
546 NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
548 if (mPool) {
549 mPool->Shutdown();
550 mPool = nullptr;
551 }
552 return NS_OK;
553 }