|
1 // CoderMixer2MT.cpp |
|
2 |
|
3 #include "StdAfx.h" |
|
4 |
|
5 #include "CoderMixer2MT.h" |
|
6 #include "CrossThreadProgress.h" |
|
7 |
|
8 using namespace NWindows; |
|
9 using namespace NSynchronization; |
|
10 |
|
11 namespace NCoderMixer2 { |
|
12 |
|
13 CThreadCoderInfo::CThreadCoderInfo(UInt32 numInStreams, UInt32 numOutStreams): |
|
14 ExitEvent(NULL), |
|
15 CompressEvent(NULL), |
|
16 CompressionCompletedEvent(NULL), |
|
17 CCoderInfo(numInStreams, numOutStreams) |
|
18 { |
|
19 InStreams.Reserve(NumInStreams); |
|
20 InStreamPointers.Reserve(NumInStreams); |
|
21 OutStreams.Reserve(NumOutStreams); |
|
22 OutStreamPointers.Reserve(NumOutStreams); |
|
23 } |
|
24 |
|
25 void CThreadCoderInfo::CreateEvents() |
|
26 { |
|
27 CompressEvent = new CAutoResetEvent(false); |
|
28 CompressionCompletedEvent = new CAutoResetEvent(false); |
|
29 } |
|
30 |
|
31 CThreadCoderInfo::~CThreadCoderInfo() |
|
32 { |
|
33 if (CompressEvent != NULL) |
|
34 delete CompressEvent; |
|
35 if (CompressionCompletedEvent != NULL) |
|
36 delete CompressionCompletedEvent; |
|
37 } |
|
38 |
|
39 class CCoderInfoFlusher2 |
|
40 { |
|
41 CThreadCoderInfo *m_CoderInfo; |
|
42 public: |
|
43 CCoderInfoFlusher2(CThreadCoderInfo *coderInfo): m_CoderInfo(coderInfo) {} |
|
44 ~CCoderInfoFlusher2() |
|
45 { |
|
46 int i; |
|
47 for (i = 0; i < m_CoderInfo->InStreams.Size(); i++) |
|
48 m_CoderInfo->InStreams[i].Release(); |
|
49 for (i = 0; i < m_CoderInfo->OutStreams.Size(); i++) |
|
50 m_CoderInfo->OutStreams[i].Release(); |
|
51 m_CoderInfo->CompressionCompletedEvent->Set(); |
|
52 } |
|
53 }; |
|
54 |
|
55 bool CThreadCoderInfo::WaitAndCode() |
|
56 { |
|
57 HANDLE events[2] = { ExitEvent, *CompressEvent }; |
|
58 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); |
|
59 if (waitResult == WAIT_OBJECT_0 + 0) |
|
60 return false; |
|
61 |
|
62 { |
|
63 InStreamPointers.Clear(); |
|
64 OutStreamPointers.Clear(); |
|
65 UInt32 i; |
|
66 for (i = 0; i < NumInStreams; i++) |
|
67 { |
|
68 if (InSizePointers[i] != NULL) |
|
69 InSizePointers[i] = &InSizes[i]; |
|
70 InStreamPointers.Add(InStreams[i]); |
|
71 } |
|
72 for (i = 0; i < NumOutStreams; i++) |
|
73 { |
|
74 if (OutSizePointers[i] != NULL) |
|
75 OutSizePointers[i] = &OutSizes[i]; |
|
76 OutStreamPointers.Add(OutStreams[i]); |
|
77 } |
|
78 CCoderInfoFlusher2 coderInfoFlusher(this); |
|
79 if (Coder) |
|
80 Result = Coder->Code(InStreamPointers[0], |
|
81 OutStreamPointers[0], |
|
82 InSizePointers[0], |
|
83 OutSizePointers[0], |
|
84 Progress); |
|
85 else |
|
86 Result = Coder2->Code(&InStreamPointers.Front(), |
|
87 &InSizePointers.Front(), |
|
88 NumInStreams, |
|
89 &OutStreamPointers.Front(), |
|
90 &OutSizePointers.Front(), |
|
91 NumOutStreams, |
|
92 Progress); |
|
93 } |
|
94 return true; |
|
95 } |
|
96 |
|
97 static void SetSizes(const UInt64 **srcSizes, CRecordVector<UInt64> &sizes, |
|
98 CRecordVector<const UInt64 *> &sizePointers, UInt32 numItems) |
|
99 { |
|
100 sizes.Clear(); |
|
101 sizePointers.Clear(); |
|
102 for(UInt32 i = 0; i < numItems; i++) |
|
103 { |
|
104 if (srcSizes == 0 || srcSizes[i] == NULL) |
|
105 { |
|
106 sizes.Add(0); |
|
107 sizePointers.Add(NULL); |
|
108 } |
|
109 else |
|
110 { |
|
111 sizes.Add(*srcSizes[i]); |
|
112 sizePointers.Add(&sizes.Back()); |
|
113 } |
|
114 } |
|
115 } |
|
116 |
|
117 |
|
118 void CThreadCoderInfo::SetCoderInfo(const UInt64 **inSizes, |
|
119 const UInt64 **outSizes, ICompressProgressInfo *progress) |
|
120 { |
|
121 Progress = progress; |
|
122 SetSizes(inSizes, InSizes, InSizePointers, NumInStreams); |
|
123 SetSizes(outSizes, OutSizes, OutSizePointers, NumOutStreams); |
|
124 } |
|
125 |
|
126 static DWORD WINAPI CoderThread(void *threadCoderInfo) |
|
127 { |
|
128 while(true) |
|
129 { |
|
130 if (!((CThreadCoderInfo *)threadCoderInfo)->WaitAndCode()) |
|
131 return 0; |
|
132 } |
|
133 } |
|
134 |
|
135 ////////////////////////////////////// |
|
136 // CCoderMixer2MT |
|
137 |
|
138 static DWORD WINAPI MainCoderThread(void *threadCoderInfo) |
|
139 { |
|
140 while(true) |
|
141 { |
|
142 if (!((CCoderMixer2MT *)threadCoderInfo)->MyCode()) |
|
143 return 0; |
|
144 } |
|
145 } |
|
146 |
|
147 CCoderMixer2MT::CCoderMixer2MT() |
|
148 { |
|
149 if (!_mainThread.Create(MainCoderThread, this)) |
|
150 throw 271825; |
|
151 } |
|
152 |
|
153 CCoderMixer2MT::~CCoderMixer2MT() |
|
154 { |
|
155 _exitEvent.Set(); |
|
156 _mainThread.Wait(); |
|
157 for(int i = 0; i < _threads.Size(); i++) |
|
158 { |
|
159 _threads[i].Wait(); |
|
160 _threads[i].Close(); |
|
161 } |
|
162 } |
|
163 |
|
164 void CCoderMixer2MT::SetBindInfo(const CBindInfo &bindInfo) |
|
165 { |
|
166 _bindInfo = bindInfo; |
|
167 _streamBinders.Clear(); |
|
168 for(int i = 0; i < _bindInfo.BindPairs.Size(); i++) |
|
169 { |
|
170 _streamBinders.Add(CStreamBinder()); |
|
171 _streamBinders.Back().CreateEvents(); |
|
172 } |
|
173 } |
|
174 |
|
175 void CCoderMixer2MT::AddCoderCommon() |
|
176 { |
|
177 int index = _coderInfoVector.Size(); |
|
178 const CCoderStreamsInfo &CoderStreamsInfo = _bindInfo.Coders[index]; |
|
179 |
|
180 CThreadCoderInfo threadCoderInfo(CoderStreamsInfo.NumInStreams, |
|
181 CoderStreamsInfo.NumOutStreams); |
|
182 _coderInfoVector.Add(threadCoderInfo); |
|
183 _coderInfoVector.Back().CreateEvents(); |
|
184 _coderInfoVector.Back().ExitEvent = _exitEvent; |
|
185 _compressingCompletedEvents.Add(*_coderInfoVector.Back().CompressionCompletedEvent); |
|
186 |
|
187 NWindows::CThread newThread; |
|
188 _threads.Add(newThread); |
|
189 if (!_threads.Back().Create(CoderThread, &_coderInfoVector.Back())) |
|
190 throw 271824; |
|
191 } |
|
192 |
|
193 void CCoderMixer2MT::AddCoder(ICompressCoder *coder) |
|
194 { |
|
195 AddCoderCommon(); |
|
196 _coderInfoVector.Back().Coder = coder; |
|
197 } |
|
198 |
|
199 void CCoderMixer2MT::AddCoder2(ICompressCoder2 *coder) |
|
200 { |
|
201 AddCoderCommon(); |
|
202 _coderInfoVector.Back().Coder2 = coder; |
|
203 } |
|
204 |
|
205 /* |
|
206 void CCoderMixer2MT::FinishAddingCoders() |
|
207 { |
|
208 for(int i = 0; i < _coderInfoVector.Size(); i++) |
|
209 { |
|
210 DWORD id; |
|
211 HANDLE newThread = ::CreateThread(NULL, 0, CoderThread, |
|
212 &_coderInfoVector[i], 0, &id); |
|
213 if (newThread == 0) |
|
214 throw 271824; |
|
215 _threads.Add(newThread); |
|
216 } |
|
217 } |
|
218 */ |
|
219 |
|
220 void CCoderMixer2MT::ReInit() |
|
221 { |
|
222 for(int i = 0; i < _streamBinders.Size(); i++) |
|
223 _streamBinders[i].ReInit(); |
|
224 } |
|
225 |
|
226 |
|
227 STDMETHODIMP CCoderMixer2MT::Init(ISequentialInStream **inStreams, |
|
228 ISequentialOutStream **outStreams) |
|
229 { |
|
230 if (_coderInfoVector.Size() != _bindInfo.Coders.Size()) |
|
231 throw 0; |
|
232 int i; |
|
233 for(i = 0; i < _coderInfoVector.Size(); i++) |
|
234 { |
|
235 CThreadCoderInfo &coderInfo = _coderInfoVector[i]; |
|
236 const CCoderStreamsInfo &coderStreamsInfo = _bindInfo.Coders[i]; |
|
237 coderInfo.InStreams.Clear(); |
|
238 UInt32 j; |
|
239 for(j = 0; j < coderStreamsInfo.NumInStreams; j++) |
|
240 coderInfo.InStreams.Add(NULL); |
|
241 coderInfo.OutStreams.Clear(); |
|
242 for(j = 0; j < coderStreamsInfo.NumOutStreams; j++) |
|
243 coderInfo.OutStreams.Add(NULL); |
|
244 } |
|
245 |
|
246 for(i = 0; i < _bindInfo.BindPairs.Size(); i++) |
|
247 { |
|
248 const CBindPair &bindPair = _bindInfo.BindPairs[i]; |
|
249 UInt32 inCoderIndex, inCoderStreamIndex; |
|
250 UInt32 outCoderIndex, outCoderStreamIndex; |
|
251 _bindInfo.FindInStream(bindPair.InIndex, inCoderIndex, inCoderStreamIndex); |
|
252 _bindInfo.FindOutStream(bindPair.OutIndex, outCoderIndex, outCoderStreamIndex); |
|
253 |
|
254 _streamBinders[i].CreateStreams( |
|
255 &_coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex], |
|
256 &_coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex]); |
|
257 } |
|
258 |
|
259 for(i = 0; i < _bindInfo.InStreams.Size(); i++) |
|
260 { |
|
261 UInt32 inCoderIndex, inCoderStreamIndex; |
|
262 _bindInfo.FindInStream(_bindInfo.InStreams[i], inCoderIndex, inCoderStreamIndex); |
|
263 _coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex] = inStreams[i]; |
|
264 } |
|
265 |
|
266 for(i = 0; i < _bindInfo.OutStreams.Size(); i++) |
|
267 { |
|
268 UInt32 outCoderIndex, outCoderStreamIndex; |
|
269 _bindInfo.FindOutStream(_bindInfo.OutStreams[i], outCoderIndex, outCoderStreamIndex); |
|
270 _coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex] = outStreams[i]; |
|
271 } |
|
272 return S_OK; |
|
273 } |
|
274 |
|
275 |
|
276 bool CCoderMixer2MT::MyCode() |
|
277 { |
|
278 HANDLE events[2] = { _exitEvent, _startCompressingEvent }; |
|
279 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); |
|
280 if (waitResult == WAIT_OBJECT_0 + 0) |
|
281 return false; |
|
282 |
|
283 for(int i = 0; i < _coderInfoVector.Size(); i++) |
|
284 _coderInfoVector[i].CompressEvent->Set(); |
|
285 DWORD result = ::WaitForMultipleObjects(_compressingCompletedEvents.Size(), |
|
286 &_compressingCompletedEvents.Front(), TRUE, INFINITE); |
|
287 |
|
288 _compressingFinishedEvent.Set(); |
|
289 |
|
290 return true; |
|
291 } |
|
292 |
|
293 |
|
294 STDMETHODIMP CCoderMixer2MT::Code(ISequentialInStream **inStreams, |
|
295 const UInt64 **inSizes, |
|
296 UInt32 numInStreams, |
|
297 ISequentialOutStream **outStreams, |
|
298 const UInt64 **outSizes, |
|
299 UInt32 numOutStreams, |
|
300 ICompressProgressInfo *progress) |
|
301 { |
|
302 if (numInStreams != (UInt32)_bindInfo.InStreams.Size() || |
|
303 numOutStreams != (UInt32)_bindInfo.OutStreams.Size()) |
|
304 return E_INVALIDARG; |
|
305 |
|
306 Init(inStreams, outStreams); |
|
307 |
|
308 _compressingFinishedEvent.Reset(); // ? |
|
309 |
|
310 CCrossThreadProgress *progressSpec = new CCrossThreadProgress; |
|
311 CMyComPtr<ICompressProgressInfo> crossProgress = progressSpec; |
|
312 progressSpec->Init(); |
|
313 _coderInfoVector[_progressCoderIndex].Progress = crossProgress; |
|
314 |
|
315 _startCompressingEvent.Set(); |
|
316 |
|
317 |
|
318 while (true) |
|
319 { |
|
320 HANDLE events[2] = {_compressingFinishedEvent, progressSpec->ProgressEvent }; |
|
321 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE); |
|
322 if (waitResult == WAIT_OBJECT_0 + 0) |
|
323 break; |
|
324 if (progress != NULL) |
|
325 progressSpec->Result = progress->SetRatioInfo(progressSpec->InSize, |
|
326 progressSpec->OutSize); |
|
327 else |
|
328 progressSpec->Result = S_OK; |
|
329 progressSpec->WaitEvent.Set(); |
|
330 } |
|
331 |
|
332 int i; |
|
333 for(i = 0; i < _coderInfoVector.Size(); i++) |
|
334 { |
|
335 HRESULT result = _coderInfoVector[i].Result; |
|
336 if (result == S_FALSE) |
|
337 return result; |
|
338 } |
|
339 for(i = 0; i < _coderInfoVector.Size(); i++) |
|
340 { |
|
341 HRESULT result = _coderInfoVector[i].Result; |
|
342 if (result != S_OK && result != E_FAIL) |
|
343 return result; |
|
344 } |
|
345 for(i = 0; i < _coderInfoVector.Size(); i++) |
|
346 { |
|
347 HRESULT result = _coderInfoVector[i].Result; |
|
348 if (result != S_OK) |
|
349 return result; |
|
350 } |
|
351 return S_OK; |
|
352 } |
|
353 |
|
354 UInt64 CCoderMixer2MT::GetWriteProcessedSize(UInt32 binderIndex) const |
|
355 { |
|
356 return _streamBinders[binderIndex].ProcessedSize; |
|
357 } |
|
358 |
|
359 } |