1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/xpcom/io/nsInputStreamTee.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,356 @@ 1.4 +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ 1.5 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.6 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.7 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.8 + 1.9 +#include <stdlib.h> 1.10 +#include "prlog.h" 1.11 + 1.12 +#include "mozilla/Mutex.h" 1.13 +#include "mozilla/Attributes.h" 1.14 +#include "nsIInputStreamTee.h" 1.15 +#include "nsIInputStream.h" 1.16 +#include "nsIOutputStream.h" 1.17 +#include "nsCOMPtr.h" 1.18 +#include "nsAutoPtr.h" 1.19 +#include "nsIEventTarget.h" 1.20 +#include "nsThreadUtils.h" 1.21 + 1.22 +using namespace mozilla; 1.23 + 1.24 +#ifdef LOG 1.25 +#undef LOG 1.26 +#endif 1.27 +#ifdef PR_LOGGING 1.28 +static PRLogModuleInfo* 1.29 +GetTeeLog() 1.30 +{ 1.31 + static PRLogModuleInfo *sLog; 1.32 + if (!sLog) 1.33 + sLog = PR_NewLogModule("nsInputStreamTee"); 1.34 + return sLog; 1.35 +} 1.36 +#define LOG(args) PR_LOG(GetTeeLog(), PR_LOG_DEBUG, args) 1.37 +#else 1.38 +#define LOG(args) 1.39 +#endif 1.40 + 1.41 +class nsInputStreamTee MOZ_FINAL : public nsIInputStreamTee 1.42 +{ 1.43 +public: 1.44 + NS_DECL_THREADSAFE_ISUPPORTS 1.45 + NS_DECL_NSIINPUTSTREAM 1.46 + NS_DECL_NSIINPUTSTREAMTEE 1.47 + 1.48 + nsInputStreamTee(); 1.49 + bool SinkIsValid(); 1.50 + void InvalidateSink(); 1.51 + 1.52 +private: 1.53 + ~nsInputStreamTee() {} 1.54 + 1.55 + nsresult TeeSegment(const char *buf, uint32_t count); 1.56 + 1.57 + static NS_METHOD WriteSegmentFun(nsIInputStream *, void *, const char *, 1.58 + uint32_t, uint32_t, uint32_t *); 1.59 + 1.60 +private: 1.61 + nsCOMPtr<nsIInputStream> mSource; 1.62 + nsCOMPtr<nsIOutputStream> mSink; 1.63 + nsCOMPtr<nsIEventTarget> mEventTarget; 1.64 + nsWriteSegmentFun mWriter; // for implementing ReadSegments 1.65 + void *mClosure; // for implementing ReadSegments 1.66 + nsAutoPtr<Mutex> mLock; // synchronize access to mSinkIsValid 1.67 + bool mSinkIsValid; // False if TeeWriteEvent fails 1.68 +}; 1.69 + 1.70 +class nsInputStreamTeeWriteEvent : public nsRunnable { 1.71 +public: 1.72 + // aTee's lock is held across construction of this object 1.73 + nsInputStreamTeeWriteEvent(const char *aBuf, uint32_t aCount, 1.74 + nsIOutputStream *aSink, 1.75 + nsInputStreamTee *aTee) 1.76 + { 1.77 + // copy the buffer - will be free'd by dtor 1.78 + mBuf = (char *)malloc(aCount); 1.79 + if (mBuf) memcpy(mBuf, (char *)aBuf, aCount); 1.80 + mCount = aCount; 1.81 + mSink = aSink; 1.82 + bool isNonBlocking; 1.83 + mSink->IsNonBlocking(&isNonBlocking); 1.84 + NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking"); 1.85 + mTee = aTee; 1.86 + } 1.87 + 1.88 + NS_IMETHOD Run() 1.89 + { 1.90 + if (!mBuf) { 1.91 + NS_WARNING("nsInputStreamTeeWriteEvent::Run() " 1.92 + "memory not allocated\n"); 1.93 + return NS_OK; 1.94 + } 1.95 + NS_ABORT_IF_FALSE(mSink, "mSink is null!"); 1.96 + 1.97 + // The output stream could have been invalidated between when 1.98 + // this event was dispatched and now, so check before writing. 1.99 + if (!mTee->SinkIsValid()) { 1.100 + return NS_OK; 1.101 + } 1.102 + 1.103 + LOG(("nsInputStreamTeeWriteEvent::Run() [%p]" 1.104 + "will write %u bytes to %p\n", 1.105 + this, mCount, mSink.get())); 1.106 + 1.107 + uint32_t totalBytesWritten = 0; 1.108 + while (mCount) { 1.109 + nsresult rv; 1.110 + uint32_t bytesWritten = 0; 1.111 + rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten); 1.112 + if (NS_FAILED(rv)) { 1.113 + LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %x in writing", 1.114 + this,rv)); 1.115 + mTee->InvalidateSink(); 1.116 + break; 1.117 + } 1.118 + totalBytesWritten += bytesWritten; 1.119 + NS_ASSERTION(bytesWritten <= mCount, "wrote too much"); 1.120 + mCount -= bytesWritten; 1.121 + } 1.122 + return NS_OK; 1.123 + } 1.124 + 1.125 +protected: 1.126 + virtual ~nsInputStreamTeeWriteEvent() 1.127 + { 1.128 + if (mBuf) free(mBuf); 1.129 + mBuf = nullptr; 1.130 + } 1.131 + 1.132 +private: 1.133 + char *mBuf; 1.134 + uint32_t mCount; 1.135 + nsCOMPtr<nsIOutputStream> mSink; 1.136 + // back pointer to the tee that created this runnable 1.137 + nsRefPtr<nsInputStreamTee> mTee; 1.138 +}; 1.139 + 1.140 +nsInputStreamTee::nsInputStreamTee(): mLock(nullptr) 1.141 + , mSinkIsValid(true) 1.142 +{ 1.143 +} 1.144 + 1.145 +bool 1.146 +nsInputStreamTee::SinkIsValid() 1.147 +{ 1.148 + MutexAutoLock lock(*mLock); 1.149 + return mSinkIsValid; 1.150 +} 1.151 + 1.152 +void 1.153 +nsInputStreamTee::InvalidateSink() 1.154 +{ 1.155 + MutexAutoLock lock(*mLock); 1.156 + mSinkIsValid = false; 1.157 +} 1.158 + 1.159 +nsresult 1.160 +nsInputStreamTee::TeeSegment(const char *buf, uint32_t count) 1.161 +{ 1.162 + if (!mSink) return NS_OK; // nothing to do 1.163 + if (mLock) { // asynchronous case 1.164 + NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null."); 1.165 + if (!SinkIsValid()) { 1.166 + return NS_OK; // nothing to do 1.167 + } 1.168 + nsRefPtr<nsIRunnable> event = 1.169 + new nsInputStreamTeeWriteEvent(buf, count, mSink, this); 1.170 + LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n", 1.171 + this, count)); 1.172 + return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL); 1.173 + } else { // synchronous case 1.174 + NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null."); 1.175 + nsresult rv; 1.176 + uint32_t totalBytesWritten = 0; 1.177 + while (count) { 1.178 + uint32_t bytesWritten = 0; 1.179 + rv = mSink->Write(buf + totalBytesWritten, count, &bytesWritten); 1.180 + if (NS_FAILED(rv)) { 1.181 + // ok, this is not a fatal error... just drop our reference to mSink 1.182 + // and continue on as if nothing happened. 1.183 + NS_WARNING("Write failed (non-fatal)"); 1.184 + // catch possible misuse of the input stream tee 1.185 + NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream"); 1.186 + mSink = 0; 1.187 + break; 1.188 + } 1.189 + totalBytesWritten += bytesWritten; 1.190 + NS_ASSERTION(bytesWritten <= count, "wrote too much"); 1.191 + count -= bytesWritten; 1.192 + } 1.193 + return NS_OK; 1.194 + } 1.195 +} 1.196 + 1.197 +NS_METHOD 1.198 +nsInputStreamTee::WriteSegmentFun(nsIInputStream *in, void *closure, const char *fromSegment, 1.199 + uint32_t offset, uint32_t count, uint32_t *writeCount) 1.200 +{ 1.201 + nsInputStreamTee *tee = reinterpret_cast<nsInputStreamTee *>(closure); 1.202 + 1.203 + nsresult rv = tee->mWriter(in, tee->mClosure, fromSegment, offset, count, writeCount); 1.204 + if (NS_FAILED(rv) || (*writeCount == 0)) { 1.205 + NS_ASSERTION((NS_FAILED(rv) ? (*writeCount == 0) : true), 1.206 + "writer returned an error with non-zero writeCount"); 1.207 + return rv; 1.208 + } 1.209 + 1.210 + return tee->TeeSegment(fromSegment, *writeCount); 1.211 +} 1.212 + 1.213 +NS_IMPL_ISUPPORTS(nsInputStreamTee, 1.214 + nsIInputStreamTee, 1.215 + nsIInputStream) 1.216 +NS_IMETHODIMP 1.217 +nsInputStreamTee::Close() 1.218 +{ 1.219 + if (NS_WARN_IF(!mSource)) 1.220 + return NS_ERROR_NOT_INITIALIZED; 1.221 + nsresult rv = mSource->Close(); 1.222 + mSource = 0; 1.223 + mSink = 0; 1.224 + return rv; 1.225 +} 1.226 + 1.227 +NS_IMETHODIMP 1.228 +nsInputStreamTee::Available(uint64_t *avail) 1.229 +{ 1.230 + if (NS_WARN_IF(!mSource)) 1.231 + return NS_ERROR_NOT_INITIALIZED; 1.232 + return mSource->Available(avail); 1.233 +} 1.234 + 1.235 +NS_IMETHODIMP 1.236 +nsInputStreamTee::Read(char *buf, uint32_t count, uint32_t *bytesRead) 1.237 +{ 1.238 + if (NS_WARN_IF(!mSource)) 1.239 + return NS_ERROR_NOT_INITIALIZED; 1.240 + 1.241 + nsresult rv = mSource->Read(buf, count, bytesRead); 1.242 + if (NS_FAILED(rv) || (*bytesRead == 0)) 1.243 + return rv; 1.244 + 1.245 + return TeeSegment(buf, *bytesRead); 1.246 +} 1.247 + 1.248 +NS_IMETHODIMP 1.249 +nsInputStreamTee::ReadSegments(nsWriteSegmentFun writer, 1.250 + void *closure, 1.251 + uint32_t count, 1.252 + uint32_t *bytesRead) 1.253 +{ 1.254 + if (NS_WARN_IF(!mSource)) 1.255 + return NS_ERROR_NOT_INITIALIZED; 1.256 + 1.257 + mWriter = writer; 1.258 + mClosure = closure; 1.259 + 1.260 + return mSource->ReadSegments(WriteSegmentFun, this, count, bytesRead); 1.261 +} 1.262 + 1.263 +NS_IMETHODIMP 1.264 +nsInputStreamTee::IsNonBlocking(bool *result) 1.265 +{ 1.266 + if (NS_WARN_IF(!mSource)) 1.267 + return NS_ERROR_NOT_INITIALIZED; 1.268 + return mSource->IsNonBlocking(result); 1.269 +} 1.270 + 1.271 +NS_IMETHODIMP 1.272 +nsInputStreamTee::SetSource(nsIInputStream *source) 1.273 +{ 1.274 + mSource = source; 1.275 + return NS_OK; 1.276 +} 1.277 + 1.278 +NS_IMETHODIMP 1.279 +nsInputStreamTee::GetSource(nsIInputStream **source) 1.280 +{ 1.281 + NS_IF_ADDREF(*source = mSource); 1.282 + return NS_OK; 1.283 +} 1.284 + 1.285 +NS_IMETHODIMP 1.286 +nsInputStreamTee::SetSink(nsIOutputStream *sink) 1.287 +{ 1.288 +#ifdef DEBUG 1.289 + if (sink) { 1.290 + bool nonBlocking; 1.291 + nsresult rv = sink->IsNonBlocking(&nonBlocking); 1.292 + if (NS_FAILED(rv) || nonBlocking) 1.293 + NS_ERROR("sink should be a blocking stream"); 1.294 + } 1.295 +#endif 1.296 + mSink = sink; 1.297 + return NS_OK; 1.298 +} 1.299 + 1.300 +NS_IMETHODIMP 1.301 +nsInputStreamTee::GetSink(nsIOutputStream **sink) 1.302 +{ 1.303 + NS_IF_ADDREF(*sink = mSink); 1.304 + return NS_OK; 1.305 +} 1.306 + 1.307 +NS_IMETHODIMP 1.308 +nsInputStreamTee::SetEventTarget(nsIEventTarget *anEventTarget) 1.309 +{ 1.310 + mEventTarget = anEventTarget; 1.311 + if (mEventTarget) { 1.312 + // Only need synchronization if this is an async tee 1.313 + mLock = new Mutex("nsInputStreamTee.mLock"); 1.314 + } 1.315 + return NS_OK; 1.316 +} 1.317 + 1.318 +NS_IMETHODIMP 1.319 +nsInputStreamTee::GetEventTarget(nsIEventTarget **anEventTarget) 1.320 +{ 1.321 + NS_IF_ADDREF(*anEventTarget = mEventTarget); 1.322 + return NS_OK; 1.323 +} 1.324 + 1.325 + 1.326 +nsresult 1.327 +NS_NewInputStreamTeeAsync(nsIInputStream **result, 1.328 + nsIInputStream *source, 1.329 + nsIOutputStream *sink, 1.330 + nsIEventTarget *anEventTarget) 1.331 +{ 1.332 + nsresult rv; 1.333 + 1.334 + nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee(); 1.335 + if (!tee) 1.336 + return NS_ERROR_OUT_OF_MEMORY; 1.337 + 1.338 + rv = tee->SetSource(source); 1.339 + if (NS_FAILED(rv)) return rv; 1.340 + 1.341 + rv = tee->SetSink(sink); 1.342 + if (NS_FAILED(rv)) return rv; 1.343 + 1.344 + rv = tee->SetEventTarget(anEventTarget); 1.345 + if (NS_FAILED(rv)) return rv; 1.346 + 1.347 + NS_ADDREF(*result = tee); 1.348 + return rv; 1.349 +} 1.350 + 1.351 +nsresult 1.352 +NS_NewInputStreamTee(nsIInputStream **result, 1.353 + nsIInputStream *source, 1.354 + nsIOutputStream *sink) 1.355 +{ 1.356 + return NS_NewInputStreamTeeAsync(result, source, sink, nullptr); 1.357 +} 1.358 + 1.359 +#undef LOG