michael@0: // CoderMixer2MT.cpp michael@0: michael@0: #include "StdAfx.h" michael@0: michael@0: #include "CoderMixer2MT.h" michael@0: #include "CrossThreadProgress.h" michael@0: michael@0: using namespace NWindows; michael@0: using namespace NSynchronization; michael@0: michael@0: namespace NCoderMixer2 { michael@0: michael@0: CThreadCoderInfo::CThreadCoderInfo(UInt32 numInStreams, UInt32 numOutStreams): michael@0: ExitEvent(NULL), michael@0: CompressEvent(NULL), michael@0: CompressionCompletedEvent(NULL), michael@0: CCoderInfo(numInStreams, numOutStreams) michael@0: { michael@0: InStreams.Reserve(NumInStreams); michael@0: InStreamPointers.Reserve(NumInStreams); michael@0: OutStreams.Reserve(NumOutStreams); michael@0: OutStreamPointers.Reserve(NumOutStreams); michael@0: } michael@0: michael@0: void CThreadCoderInfo::CreateEvents() michael@0: { michael@0: CompressEvent = new CAutoResetEvent(false); michael@0: CompressionCompletedEvent = new CAutoResetEvent(false); michael@0: } michael@0: michael@0: CThreadCoderInfo::~CThreadCoderInfo() michael@0: { michael@0: if (CompressEvent != NULL) michael@0: delete CompressEvent; michael@0: if (CompressionCompletedEvent != NULL) michael@0: delete CompressionCompletedEvent; michael@0: } michael@0: michael@0: class CCoderInfoFlusher2 michael@0: { michael@0: CThreadCoderInfo *m_CoderInfo; michael@0: public: michael@0: CCoderInfoFlusher2(CThreadCoderInfo *coderInfo): m_CoderInfo(coderInfo) {} michael@0: ~CCoderInfoFlusher2() michael@0: { michael@0: int i; michael@0: for (i = 0; i < m_CoderInfo->InStreams.Size(); i++) michael@0: m_CoderInfo->InStreams[i].Release(); michael@0: for (i = 0; i < m_CoderInfo->OutStreams.Size(); i++) michael@0: m_CoderInfo->OutStreams[i].Release(); michael@0: m_CoderInfo->CompressionCompletedEvent->Set(); michael@0: } michael@0: }; michael@0: michael@0: bool CThreadCoderInfo::WaitAndCode() michael@0: { michael@0: HANDLE events[2] = { ExitEvent, *CompressEvent }; michael@0: DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); michael@0: if (waitResult == WAIT_OBJECT_0 + 0) michael@0: return false; michael@0: michael@0: { michael@0: InStreamPointers.Clear(); michael@0: OutStreamPointers.Clear(); michael@0: UInt32 i; michael@0: for (i = 0; i < NumInStreams; i++) michael@0: { michael@0: if (InSizePointers[i] != NULL) michael@0: InSizePointers[i] = &InSizes[i]; michael@0: InStreamPointers.Add(InStreams[i]); michael@0: } michael@0: for (i = 0; i < NumOutStreams; i++) michael@0: { michael@0: if (OutSizePointers[i] != NULL) michael@0: OutSizePointers[i] = &OutSizes[i]; michael@0: OutStreamPointers.Add(OutStreams[i]); michael@0: } michael@0: CCoderInfoFlusher2 coderInfoFlusher(this); michael@0: if (Coder) michael@0: Result = Coder->Code(InStreamPointers[0], michael@0: OutStreamPointers[0], michael@0: InSizePointers[0], michael@0: OutSizePointers[0], michael@0: Progress); michael@0: else michael@0: Result = Coder2->Code(&InStreamPointers.Front(), michael@0: &InSizePointers.Front(), michael@0: NumInStreams, michael@0: &OutStreamPointers.Front(), michael@0: &OutSizePointers.Front(), michael@0: NumOutStreams, michael@0: Progress); michael@0: } michael@0: return true; michael@0: } michael@0: michael@0: static void SetSizes(const UInt64 **srcSizes, CRecordVector &sizes, michael@0: CRecordVector &sizePointers, UInt32 numItems) michael@0: { michael@0: sizes.Clear(); michael@0: sizePointers.Clear(); michael@0: for(UInt32 i = 0; i < numItems; i++) michael@0: { michael@0: if (srcSizes == 0 || srcSizes[i] == NULL) michael@0: { michael@0: sizes.Add(0); michael@0: sizePointers.Add(NULL); michael@0: } michael@0: else michael@0: { michael@0: sizes.Add(*srcSizes[i]); michael@0: sizePointers.Add(&sizes.Back()); michael@0: } michael@0: } michael@0: } michael@0: michael@0: michael@0: void CThreadCoderInfo::SetCoderInfo(const UInt64 **inSizes, michael@0: const UInt64 **outSizes, ICompressProgressInfo *progress) michael@0: { michael@0: Progress = progress; michael@0: SetSizes(inSizes, InSizes, InSizePointers, NumInStreams); michael@0: SetSizes(outSizes, OutSizes, OutSizePointers, NumOutStreams); michael@0: } michael@0: michael@0: static DWORD WINAPI CoderThread(void *threadCoderInfo) michael@0: { michael@0: while(true) michael@0: { michael@0: if (!((CThreadCoderInfo *)threadCoderInfo)->WaitAndCode()) michael@0: return 0; michael@0: } michael@0: } michael@0: michael@0: ////////////////////////////////////// michael@0: // CCoderMixer2MT michael@0: michael@0: static DWORD WINAPI MainCoderThread(void *threadCoderInfo) michael@0: { michael@0: while(true) michael@0: { michael@0: if (!((CCoderMixer2MT *)threadCoderInfo)->MyCode()) michael@0: return 0; michael@0: } michael@0: } michael@0: michael@0: CCoderMixer2MT::CCoderMixer2MT() michael@0: { michael@0: if (!_mainThread.Create(MainCoderThread, this)) michael@0: throw 271825; michael@0: } michael@0: michael@0: CCoderMixer2MT::~CCoderMixer2MT() michael@0: { michael@0: _exitEvent.Set(); michael@0: _mainThread.Wait(); michael@0: for(int i = 0; i < _threads.Size(); i++) michael@0: { michael@0: _threads[i].Wait(); michael@0: _threads[i].Close(); michael@0: } michael@0: } michael@0: michael@0: void CCoderMixer2MT::SetBindInfo(const CBindInfo &bindInfo) michael@0: { michael@0: _bindInfo = bindInfo; michael@0: _streamBinders.Clear(); michael@0: for(int i = 0; i < _bindInfo.BindPairs.Size(); i++) michael@0: { michael@0: _streamBinders.Add(CStreamBinder()); michael@0: _streamBinders.Back().CreateEvents(); michael@0: } michael@0: } michael@0: michael@0: void CCoderMixer2MT::AddCoderCommon() michael@0: { michael@0: int index = _coderInfoVector.Size(); michael@0: const CCoderStreamsInfo &CoderStreamsInfo = _bindInfo.Coders[index]; michael@0: michael@0: CThreadCoderInfo threadCoderInfo(CoderStreamsInfo.NumInStreams, michael@0: CoderStreamsInfo.NumOutStreams); michael@0: _coderInfoVector.Add(threadCoderInfo); michael@0: _coderInfoVector.Back().CreateEvents(); michael@0: _coderInfoVector.Back().ExitEvent = _exitEvent; michael@0: _compressingCompletedEvents.Add(*_coderInfoVector.Back().CompressionCompletedEvent); michael@0: michael@0: NWindows::CThread newThread; michael@0: _threads.Add(newThread); michael@0: if (!_threads.Back().Create(CoderThread, &_coderInfoVector.Back())) michael@0: throw 271824; michael@0: } michael@0: michael@0: void CCoderMixer2MT::AddCoder(ICompressCoder *coder) michael@0: { michael@0: AddCoderCommon(); michael@0: _coderInfoVector.Back().Coder = coder; michael@0: } michael@0: michael@0: void CCoderMixer2MT::AddCoder2(ICompressCoder2 *coder) michael@0: { michael@0: AddCoderCommon(); michael@0: _coderInfoVector.Back().Coder2 = coder; michael@0: } michael@0: michael@0: /* michael@0: void CCoderMixer2MT::FinishAddingCoders() michael@0: { michael@0: for(int i = 0; i < _coderInfoVector.Size(); i++) michael@0: { michael@0: DWORD id; michael@0: HANDLE newThread = ::CreateThread(NULL, 0, CoderThread, michael@0: &_coderInfoVector[i], 0, &id); michael@0: if (newThread == 0) michael@0: throw 271824; michael@0: _threads.Add(newThread); michael@0: } michael@0: } michael@0: */ michael@0: michael@0: void CCoderMixer2MT::ReInit() michael@0: { michael@0: for(int i = 0; i < _streamBinders.Size(); i++) michael@0: _streamBinders[i].ReInit(); michael@0: } michael@0: michael@0: michael@0: STDMETHODIMP CCoderMixer2MT::Init(ISequentialInStream **inStreams, michael@0: ISequentialOutStream **outStreams) michael@0: { michael@0: if (_coderInfoVector.Size() != _bindInfo.Coders.Size()) michael@0: throw 0; michael@0: int i; michael@0: for(i = 0; i < _coderInfoVector.Size(); i++) michael@0: { michael@0: CThreadCoderInfo &coderInfo = _coderInfoVector[i]; michael@0: const CCoderStreamsInfo &coderStreamsInfo = _bindInfo.Coders[i]; michael@0: coderInfo.InStreams.Clear(); michael@0: UInt32 j; michael@0: for(j = 0; j < coderStreamsInfo.NumInStreams; j++) michael@0: coderInfo.InStreams.Add(NULL); michael@0: coderInfo.OutStreams.Clear(); michael@0: for(j = 0; j < coderStreamsInfo.NumOutStreams; j++) michael@0: coderInfo.OutStreams.Add(NULL); michael@0: } michael@0: michael@0: for(i = 0; i < _bindInfo.BindPairs.Size(); i++) michael@0: { michael@0: const CBindPair &bindPair = _bindInfo.BindPairs[i]; michael@0: UInt32 inCoderIndex, inCoderStreamIndex; michael@0: UInt32 outCoderIndex, outCoderStreamIndex; michael@0: _bindInfo.FindInStream(bindPair.InIndex, inCoderIndex, inCoderStreamIndex); michael@0: _bindInfo.FindOutStream(bindPair.OutIndex, outCoderIndex, outCoderStreamIndex); michael@0: michael@0: _streamBinders[i].CreateStreams( michael@0: &_coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex], michael@0: &_coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex]); michael@0: } michael@0: michael@0: for(i = 0; i < _bindInfo.InStreams.Size(); i++) michael@0: { michael@0: UInt32 inCoderIndex, inCoderStreamIndex; michael@0: _bindInfo.FindInStream(_bindInfo.InStreams[i], inCoderIndex, inCoderStreamIndex); michael@0: _coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex] = inStreams[i]; michael@0: } michael@0: michael@0: for(i = 0; i < _bindInfo.OutStreams.Size(); i++) michael@0: { michael@0: UInt32 outCoderIndex, outCoderStreamIndex; michael@0: _bindInfo.FindOutStream(_bindInfo.OutStreams[i], outCoderIndex, outCoderStreamIndex); michael@0: _coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex] = outStreams[i]; michael@0: } michael@0: return S_OK; michael@0: } michael@0: michael@0: michael@0: bool CCoderMixer2MT::MyCode() michael@0: { michael@0: HANDLE events[2] = { _exitEvent, _startCompressingEvent }; michael@0: DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); michael@0: if (waitResult == WAIT_OBJECT_0 + 0) michael@0: return false; michael@0: michael@0: for(int i = 0; i < _coderInfoVector.Size(); i++) michael@0: _coderInfoVector[i].CompressEvent->Set(); michael@0: DWORD result = ::WaitForMultipleObjects(_compressingCompletedEvents.Size(), michael@0: &_compressingCompletedEvents.Front(), TRUE, INFINITE); michael@0: michael@0: _compressingFinishedEvent.Set(); michael@0: michael@0: return true; michael@0: } michael@0: michael@0: michael@0: STDMETHODIMP CCoderMixer2MT::Code(ISequentialInStream **inStreams, michael@0: const UInt64 **inSizes, michael@0: UInt32 numInStreams, michael@0: ISequentialOutStream **outStreams, michael@0: const UInt64 **outSizes, michael@0: UInt32 numOutStreams, michael@0: ICompressProgressInfo *progress) michael@0: { michael@0: if (numInStreams != (UInt32)_bindInfo.InStreams.Size() || michael@0: numOutStreams != (UInt32)_bindInfo.OutStreams.Size()) michael@0: return E_INVALIDARG; michael@0: michael@0: Init(inStreams, outStreams); michael@0: michael@0: _compressingFinishedEvent.Reset(); // ? michael@0: michael@0: CCrossThreadProgress *progressSpec = new CCrossThreadProgress; michael@0: CMyComPtr crossProgress = progressSpec; michael@0: progressSpec->Init(); michael@0: _coderInfoVector[_progressCoderIndex].Progress = crossProgress; michael@0: michael@0: _startCompressingEvent.Set(); michael@0: michael@0: michael@0: while (true) michael@0: { michael@0: HANDLE events[2] = {_compressingFinishedEvent, progressSpec->ProgressEvent }; michael@0: DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); michael@0: if (waitResult == WAIT_OBJECT_0 + 0) michael@0: break; michael@0: if (progress != NULL) michael@0: progressSpec->Result = progress->SetRatioInfo(progressSpec->InSize, michael@0: progressSpec->OutSize); michael@0: else michael@0: progressSpec->Result = S_OK; michael@0: progressSpec->WaitEvent.Set(); michael@0: } michael@0: michael@0: int i; michael@0: for(i = 0; i < _coderInfoVector.Size(); i++) michael@0: { michael@0: HRESULT result = _coderInfoVector[i].Result; michael@0: if (result == S_FALSE) michael@0: return result; michael@0: } michael@0: for(i = 0; i < _coderInfoVector.Size(); i++) michael@0: { michael@0: HRESULT result = _coderInfoVector[i].Result; michael@0: if (result != S_OK && result != E_FAIL) michael@0: return result; michael@0: } michael@0: for(i = 0; i < _coderInfoVector.Size(); i++) michael@0: { michael@0: HRESULT result = _coderInfoVector[i].Result; michael@0: if (result != S_OK) michael@0: return result; michael@0: } michael@0: return S_OK; michael@0: } michael@0: michael@0: UInt64 CCoderMixer2MT::GetWriteProcessedSize(UInt32 binderIndex) const michael@0: { michael@0: return _streamBinders[binderIndex].ProcessedSize; michael@0: } michael@0: michael@0: }