|
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
|
2 /* This Source Code Form is subject to the terms of the Mozilla Public |
|
3 * License, v. 2.0. If a copy of the MPL was not distributed with this |
|
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
|
5 |
|
6 #include <stdlib.h> |
|
7 #include "prlog.h" |
|
8 |
|
9 #include "mozilla/Mutex.h" |
|
10 #include "mozilla/Attributes.h" |
|
11 #include "nsIInputStreamTee.h" |
|
12 #include "nsIInputStream.h" |
|
13 #include "nsIOutputStream.h" |
|
14 #include "nsCOMPtr.h" |
|
15 #include "nsAutoPtr.h" |
|
16 #include "nsIEventTarget.h" |
|
17 #include "nsThreadUtils.h" |
|
18 |
|
19 using namespace mozilla; |
|
20 |
|
21 #ifdef LOG |
|
22 #undef LOG |
|
23 #endif |
|
24 #ifdef PR_LOGGING |
|
25 static PRLogModuleInfo* |
|
26 GetTeeLog() |
|
27 { |
|
28 static PRLogModuleInfo *sLog; |
|
29 if (!sLog) |
|
30 sLog = PR_NewLogModule("nsInputStreamTee"); |
|
31 return sLog; |
|
32 } |
|
33 #define LOG(args) PR_LOG(GetTeeLog(), PR_LOG_DEBUG, args) |
|
34 #else |
|
35 #define LOG(args) |
|
36 #endif |
|
37 |
|
38 class nsInputStreamTee MOZ_FINAL : public nsIInputStreamTee |
|
39 { |
|
40 public: |
|
41 NS_DECL_THREADSAFE_ISUPPORTS |
|
42 NS_DECL_NSIINPUTSTREAM |
|
43 NS_DECL_NSIINPUTSTREAMTEE |
|
44 |
|
45 nsInputStreamTee(); |
|
46 bool SinkIsValid(); |
|
47 void InvalidateSink(); |
|
48 |
|
49 private: |
|
50 ~nsInputStreamTee() {} |
|
51 |
|
52 nsresult TeeSegment(const char *buf, uint32_t count); |
|
53 |
|
54 static NS_METHOD WriteSegmentFun(nsIInputStream *, void *, const char *, |
|
55 uint32_t, uint32_t, uint32_t *); |
|
56 |
|
57 private: |
|
58 nsCOMPtr<nsIInputStream> mSource; |
|
59 nsCOMPtr<nsIOutputStream> mSink; |
|
60 nsCOMPtr<nsIEventTarget> mEventTarget; |
|
61 nsWriteSegmentFun mWriter; // for implementing ReadSegments |
|
62 void *mClosure; // for implementing ReadSegments |
|
63 nsAutoPtr<Mutex> mLock; // synchronize access to mSinkIsValid |
|
64 bool mSinkIsValid; // False if TeeWriteEvent fails |
|
65 }; |
|
66 |
|
67 class nsInputStreamTeeWriteEvent : public nsRunnable { |
|
68 public: |
|
69 // aTee's lock is held across construction of this object |
|
70 nsInputStreamTeeWriteEvent(const char *aBuf, uint32_t aCount, |
|
71 nsIOutputStream *aSink, |
|
72 nsInputStreamTee *aTee) |
|
73 { |
|
74 // copy the buffer - will be free'd by dtor |
|
75 mBuf = (char *)malloc(aCount); |
|
76 if (mBuf) memcpy(mBuf, (char *)aBuf, aCount); |
|
77 mCount = aCount; |
|
78 mSink = aSink; |
|
79 bool isNonBlocking; |
|
80 mSink->IsNonBlocking(&isNonBlocking); |
|
81 NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking"); |
|
82 mTee = aTee; |
|
83 } |
|
84 |
|
85 NS_IMETHOD Run() |
|
86 { |
|
87 if (!mBuf) { |
|
88 NS_WARNING("nsInputStreamTeeWriteEvent::Run() " |
|
89 "memory not allocated\n"); |
|
90 return NS_OK; |
|
91 } |
|
92 NS_ABORT_IF_FALSE(mSink, "mSink is null!"); |
|
93 |
|
94 // The output stream could have been invalidated between when |
|
95 // this event was dispatched and now, so check before writing. |
|
96 if (!mTee->SinkIsValid()) { |
|
97 return NS_OK; |
|
98 } |
|
99 |
|
100 LOG(("nsInputStreamTeeWriteEvent::Run() [%p]" |
|
101 "will write %u bytes to %p\n", |
|
102 this, mCount, mSink.get())); |
|
103 |
|
104 uint32_t totalBytesWritten = 0; |
|
105 while (mCount) { |
|
106 nsresult rv; |
|
107 uint32_t bytesWritten = 0; |
|
108 rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten); |
|
109 if (NS_FAILED(rv)) { |
|
110 LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %x in writing", |
|
111 this,rv)); |
|
112 mTee->InvalidateSink(); |
|
113 break; |
|
114 } |
|
115 totalBytesWritten += bytesWritten; |
|
116 NS_ASSERTION(bytesWritten <= mCount, "wrote too much"); |
|
117 mCount -= bytesWritten; |
|
118 } |
|
119 return NS_OK; |
|
120 } |
|
121 |
|
122 protected: |
|
123 virtual ~nsInputStreamTeeWriteEvent() |
|
124 { |
|
125 if (mBuf) free(mBuf); |
|
126 mBuf = nullptr; |
|
127 } |
|
128 |
|
129 private: |
|
130 char *mBuf; |
|
131 uint32_t mCount; |
|
132 nsCOMPtr<nsIOutputStream> mSink; |
|
133 // back pointer to the tee that created this runnable |
|
134 nsRefPtr<nsInputStreamTee> mTee; |
|
135 }; |
|
136 |
|
137 nsInputStreamTee::nsInputStreamTee(): mLock(nullptr) |
|
138 , mSinkIsValid(true) |
|
139 { |
|
140 } |
|
141 |
|
142 bool |
|
143 nsInputStreamTee::SinkIsValid() |
|
144 { |
|
145 MutexAutoLock lock(*mLock); |
|
146 return mSinkIsValid; |
|
147 } |
|
148 |
|
149 void |
|
150 nsInputStreamTee::InvalidateSink() |
|
151 { |
|
152 MutexAutoLock lock(*mLock); |
|
153 mSinkIsValid = false; |
|
154 } |
|
155 |
|
156 nsresult |
|
157 nsInputStreamTee::TeeSegment(const char *buf, uint32_t count) |
|
158 { |
|
159 if (!mSink) return NS_OK; // nothing to do |
|
160 if (mLock) { // asynchronous case |
|
161 NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null."); |
|
162 if (!SinkIsValid()) { |
|
163 return NS_OK; // nothing to do |
|
164 } |
|
165 nsRefPtr<nsIRunnable> event = |
|
166 new nsInputStreamTeeWriteEvent(buf, count, mSink, this); |
|
167 LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", |
|
168 this, count)); |
|
169 return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL); |
|
170 } else { // synchronous case |
|
171 NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null."); |
|
172 nsresult rv; |
|
173 uint32_t totalBytesWritten = 0; |
|
174 while (count) { |
|
175 uint32_t bytesWritten = 0; |
|
176 rv = mSink->Write(buf + totalBytesWritten, count, &bytesWritten); |
|
177 if (NS_FAILED(rv)) { |
|
178 // ok, this is not a fatal error... just drop our reference to mSink |
|
179 // and continue on as if nothing happened. |
|
180 NS_WARNING("Write failed (non-fatal)"); |
|
181 // catch possible misuse of the input stream tee |
|
182 NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream"); |
|
183 mSink = 0; |
|
184 break; |
|
185 } |
|
186 totalBytesWritten += bytesWritten; |
|
187 NS_ASSERTION(bytesWritten <= count, "wrote too much"); |
|
188 count -= bytesWritten; |
|
189 } |
|
190 return NS_OK; |
|
191 } |
|
192 } |
|
193 |
|
194 NS_METHOD |
|
195 nsInputStreamTee::WriteSegmentFun(nsIInputStream *in, void *closure, const char *fromSegment, |
|
196 uint32_t offset, uint32_t count, uint32_t *writeCount) |
|
197 { |
|
198 nsInputStreamTee *tee = reinterpret_cast<nsInputStreamTee *>(closure); |
|
199 |
|
200 nsresult rv = tee->mWriter(in, tee->mClosure, fromSegment, offset, count, writeCount); |
|
201 if (NS_FAILED(rv) || (*writeCount == 0)) { |
|
202 NS_ASSERTION((NS_FAILED(rv) ? (*writeCount == 0) : true), |
|
203 "writer returned an error with non-zero writeCount"); |
|
204 return rv; |
|
205 } |
|
206 |
|
207 return tee->TeeSegment(fromSegment, *writeCount); |
|
208 } |
|
209 |
|
210 NS_IMPL_ISUPPORTS(nsInputStreamTee, |
|
211 nsIInputStreamTee, |
|
212 nsIInputStream) |
|
213 NS_IMETHODIMP |
|
214 nsInputStreamTee::Close() |
|
215 { |
|
216 if (NS_WARN_IF(!mSource)) |
|
217 return NS_ERROR_NOT_INITIALIZED; |
|
218 nsresult rv = mSource->Close(); |
|
219 mSource = 0; |
|
220 mSink = 0; |
|
221 return rv; |
|
222 } |
|
223 |
|
224 NS_IMETHODIMP |
|
225 nsInputStreamTee::Available(uint64_t *avail) |
|
226 { |
|
227 if (NS_WARN_IF(!mSource)) |
|
228 return NS_ERROR_NOT_INITIALIZED; |
|
229 return mSource->Available(avail); |
|
230 } |
|
231 |
|
232 NS_IMETHODIMP |
|
233 nsInputStreamTee::Read(char *buf, uint32_t count, uint32_t *bytesRead) |
|
234 { |
|
235 if (NS_WARN_IF(!mSource)) |
|
236 return NS_ERROR_NOT_INITIALIZED; |
|
237 |
|
238 nsresult rv = mSource->Read(buf, count, bytesRead); |
|
239 if (NS_FAILED(rv) || (*bytesRead == 0)) |
|
240 return rv; |
|
241 |
|
242 return TeeSegment(buf, *bytesRead); |
|
243 } |
|
244 |
|
245 NS_IMETHODIMP |
|
246 nsInputStreamTee::ReadSegments(nsWriteSegmentFun writer, |
|
247 void *closure, |
|
248 uint32_t count, |
|
249 uint32_t *bytesRead) |
|
250 { |
|
251 if (NS_WARN_IF(!mSource)) |
|
252 return NS_ERROR_NOT_INITIALIZED; |
|
253 |
|
254 mWriter = writer; |
|
255 mClosure = closure; |
|
256 |
|
257 return mSource->ReadSegments(WriteSegmentFun, this, count, bytesRead); |
|
258 } |
|
259 |
|
260 NS_IMETHODIMP |
|
261 nsInputStreamTee::IsNonBlocking(bool *result) |
|
262 { |
|
263 if (NS_WARN_IF(!mSource)) |
|
264 return NS_ERROR_NOT_INITIALIZED; |
|
265 return mSource->IsNonBlocking(result); |
|
266 } |
|
267 |
|
268 NS_IMETHODIMP |
|
269 nsInputStreamTee::SetSource(nsIInputStream *source) |
|
270 { |
|
271 mSource = source; |
|
272 return NS_OK; |
|
273 } |
|
274 |
|
275 NS_IMETHODIMP |
|
276 nsInputStreamTee::GetSource(nsIInputStream **source) |
|
277 { |
|
278 NS_IF_ADDREF(*source = mSource); |
|
279 return NS_OK; |
|
280 } |
|
281 |
|
282 NS_IMETHODIMP |
|
283 nsInputStreamTee::SetSink(nsIOutputStream *sink) |
|
284 { |
|
285 #ifdef DEBUG |
|
286 if (sink) { |
|
287 bool nonBlocking; |
|
288 nsresult rv = sink->IsNonBlocking(&nonBlocking); |
|
289 if (NS_FAILED(rv) || nonBlocking) |
|
290 NS_ERROR("sink should be a blocking stream"); |
|
291 } |
|
292 #endif |
|
293 mSink = sink; |
|
294 return NS_OK; |
|
295 } |
|
296 |
|
297 NS_IMETHODIMP |
|
298 nsInputStreamTee::GetSink(nsIOutputStream **sink) |
|
299 { |
|
300 NS_IF_ADDREF(*sink = mSink); |
|
301 return NS_OK; |
|
302 } |
|
303 |
|
304 NS_IMETHODIMP |
|
305 nsInputStreamTee::SetEventTarget(nsIEventTarget *anEventTarget) |
|
306 { |
|
307 mEventTarget = anEventTarget; |
|
308 if (mEventTarget) { |
|
309 // Only need synchronization if this is an async tee |
|
310 mLock = new Mutex("nsInputStreamTee.mLock"); |
|
311 } |
|
312 return NS_OK; |
|
313 } |
|
314 |
|
315 NS_IMETHODIMP |
|
316 nsInputStreamTee::GetEventTarget(nsIEventTarget **anEventTarget) |
|
317 { |
|
318 NS_IF_ADDREF(*anEventTarget = mEventTarget); |
|
319 return NS_OK; |
|
320 } |
|
321 |
|
322 |
|
323 nsresult |
|
324 NS_NewInputStreamTeeAsync(nsIInputStream **result, |
|
325 nsIInputStream *source, |
|
326 nsIOutputStream *sink, |
|
327 nsIEventTarget *anEventTarget) |
|
328 { |
|
329 nsresult rv; |
|
330 |
|
331 nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee(); |
|
332 if (!tee) |
|
333 return NS_ERROR_OUT_OF_MEMORY; |
|
334 |
|
335 rv = tee->SetSource(source); |
|
336 if (NS_FAILED(rv)) return rv; |
|
337 |
|
338 rv = tee->SetSink(sink); |
|
339 if (NS_FAILED(rv)) return rv; |
|
340 |
|
341 rv = tee->SetEventTarget(anEventTarget); |
|
342 if (NS_FAILED(rv)) return rv; |
|
343 |
|
344 NS_ADDREF(*result = tee); |
|
345 return rv; |
|
346 } |
|
347 |
|
348 nsresult |
|
349 NS_NewInputStreamTee(nsIInputStream **result, |
|
350 nsIInputStream *source, |
|
351 nsIOutputStream *sink) |
|
352 { |
|
353 return NS_NewInputStreamTeeAsync(result, source, sink, nullptr); |
|
354 } |
|
355 |
|
356 #undef LOG |