michael@0: // StreamBinder.cpp michael@0: michael@0: #include "StdAfx.h" michael@0: michael@0: #include "StreamBinder.h" michael@0: #include "../../Common/Defs.h" michael@0: #include "../../Common/MyCom.h" michael@0: michael@0: using namespace NWindows; michael@0: using namespace NSynchronization; michael@0: michael@0: class CSequentialInStreamForBinder: michael@0: public ISequentialInStream, michael@0: public CMyUnknownImp michael@0: { michael@0: public: michael@0: MY_UNKNOWN_IMP michael@0: michael@0: STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize); michael@0: private: michael@0: CStreamBinder *m_StreamBinder; michael@0: public: michael@0: ~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); } michael@0: void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } michael@0: }; michael@0: michael@0: STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize) michael@0: { return m_StreamBinder->Read(data, size, processedSize); } michael@0: michael@0: class CSequentialOutStreamForBinder: michael@0: public ISequentialOutStream, michael@0: public CMyUnknownImp michael@0: { michael@0: public: michael@0: MY_UNKNOWN_IMP michael@0: michael@0: STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize); michael@0: michael@0: private: michael@0: CStreamBinder *m_StreamBinder; michael@0: public: michael@0: ~CSequentialOutStreamForBinder() { m_StreamBinder->CloseWrite(); } michael@0: void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; } michael@0: }; michael@0: michael@0: STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) michael@0: { return m_StreamBinder->Write(data, size, processedSize); } michael@0: michael@0: michael@0: ////////////////////////// michael@0: // CStreamBinder michael@0: // (_thereAreBytesToReadEvent && _bufferSize == 0) means that stream is finished. michael@0: michael@0: void CStreamBinder::CreateEvents() michael@0: { michael@0: _allBytesAreWritenEvent = new CManualResetEvent(true); michael@0: _thereAreBytesToReadEvent = new CManualResetEvent(false); michael@0: _readStreamIsClosedEvent = new CManualResetEvent(false); michael@0: } michael@0: michael@0: void CStreamBinder::ReInit() michael@0: { michael@0: _thereAreBytesToReadEvent->Reset(); michael@0: _readStreamIsClosedEvent->Reset(); michael@0: ProcessedSize = 0; michael@0: } michael@0: michael@0: CStreamBinder::~CStreamBinder() michael@0: { michael@0: if (_allBytesAreWritenEvent != NULL) michael@0: delete _allBytesAreWritenEvent; michael@0: if (_thereAreBytesToReadEvent != NULL) michael@0: delete _thereAreBytesToReadEvent; michael@0: if (_readStreamIsClosedEvent != NULL) michael@0: delete _readStreamIsClosedEvent; michael@0: } michael@0: michael@0: michael@0: michael@0: michael@0: void CStreamBinder::CreateStreams(ISequentialInStream **inStream, michael@0: ISequentialOutStream **outStream) michael@0: { michael@0: CSequentialInStreamForBinder *inStreamSpec = new michael@0: CSequentialInStreamForBinder; michael@0: CMyComPtr inStreamLoc(inStreamSpec); michael@0: inStreamSpec->SetBinder(this); michael@0: *inStream = inStreamLoc.Detach(); michael@0: michael@0: CSequentialOutStreamForBinder *outStreamSpec = new michael@0: CSequentialOutStreamForBinder; michael@0: CMyComPtr outStreamLoc(outStreamSpec); michael@0: outStreamSpec->SetBinder(this); michael@0: *outStream = outStreamLoc.Detach(); michael@0: michael@0: _buffer = NULL; michael@0: _bufferSize= 0; michael@0: ProcessedSize = 0; michael@0: } michael@0: michael@0: HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize) michael@0: { michael@0: UInt32 sizeToRead = size; michael@0: if (size > 0) michael@0: { michael@0: if(!_thereAreBytesToReadEvent->Lock()) michael@0: return E_FAIL; michael@0: sizeToRead = MyMin(_bufferSize, size); michael@0: if (_bufferSize > 0) michael@0: { michael@0: MoveMemory(data, _buffer, sizeToRead); michael@0: _buffer = ((const Byte *)_buffer) + sizeToRead; michael@0: _bufferSize -= sizeToRead; michael@0: if (_bufferSize == 0) michael@0: { michael@0: _thereAreBytesToReadEvent->Reset(); michael@0: _allBytesAreWritenEvent->Set(); michael@0: } michael@0: } michael@0: } michael@0: if (processedSize != NULL) michael@0: *processedSize = sizeToRead; michael@0: ProcessedSize += sizeToRead; michael@0: return S_OK; michael@0: } michael@0: michael@0: void CStreamBinder::CloseRead() michael@0: { michael@0: _readStreamIsClosedEvent->Set(); michael@0: } michael@0: michael@0: HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize) michael@0: { michael@0: if (size > 0) michael@0: { michael@0: _buffer = data; michael@0: _bufferSize = size; michael@0: _allBytesAreWritenEvent->Reset(); michael@0: _thereAreBytesToReadEvent->Set(); michael@0: michael@0: HANDLE events[2]; michael@0: events[0] = *_allBytesAreWritenEvent; michael@0: events[1] = *_readStreamIsClosedEvent; michael@0: DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); michael@0: if (waitResult != WAIT_OBJECT_0 + 0) michael@0: { michael@0: // ReadingWasClosed = true; michael@0: return E_FAIL; michael@0: } michael@0: // if(!_allBytesAreWritenEvent.Lock()) michael@0: // return E_FAIL; michael@0: } michael@0: if (processedSize != NULL) michael@0: *processedSize = size; michael@0: return S_OK; michael@0: } michael@0: michael@0: void CStreamBinder::CloseWrite() michael@0: { michael@0: // _bufferSize must be = 0 michael@0: _thereAreBytesToReadEvent->Set(); michael@0: }