Tue, 06 Jan 2015 21:39:09 +0100
Conditionally force memory storage according to privacy.thirdparty.isolate.
This solves Tor bug #9701 by complying with the disk-avoidance requirements
documented at
https://www.torproject.org/projects/torbrowser/design/#disk-avoidance.
1 // CoderMixer2MT.cpp
3 #include "StdAfx.h"
5 #include "CoderMixer2MT.h"
6 #include "CrossThreadProgress.h"
8 using namespace NWindows;
9 using namespace NSynchronization;
11 namespace NCoderMixer2 {
13 CThreadCoderInfo::CThreadCoderInfo(UInt32 numInStreams, UInt32 numOutStreams):
14 ExitEvent(NULL),
15 CompressEvent(NULL),
16 CompressionCompletedEvent(NULL),
17 CCoderInfo(numInStreams, numOutStreams)
18 {
19 InStreams.Reserve(NumInStreams);
20 InStreamPointers.Reserve(NumInStreams);
21 OutStreams.Reserve(NumOutStreams);
22 OutStreamPointers.Reserve(NumOutStreams);
23 }
25 void CThreadCoderInfo::CreateEvents()
26 {
27 CompressEvent = new CAutoResetEvent(false);
28 CompressionCompletedEvent = new CAutoResetEvent(false);
29 }
31 CThreadCoderInfo::~CThreadCoderInfo()
32 {
33 if (CompressEvent != NULL)
34 delete CompressEvent;
35 if (CompressionCompletedEvent != NULL)
36 delete CompressionCompletedEvent;
37 }
39 class CCoderInfoFlusher2
40 {
41 CThreadCoderInfo *m_CoderInfo;
42 public:
43 CCoderInfoFlusher2(CThreadCoderInfo *coderInfo): m_CoderInfo(coderInfo) {}
44 ~CCoderInfoFlusher2()
45 {
46 int i;
47 for (i = 0; i < m_CoderInfo->InStreams.Size(); i++)
48 m_CoderInfo->InStreams[i].Release();
49 for (i = 0; i < m_CoderInfo->OutStreams.Size(); i++)
50 m_CoderInfo->OutStreams[i].Release();
51 m_CoderInfo->CompressionCompletedEvent->Set();
52 }
53 };
55 bool CThreadCoderInfo::WaitAndCode()
56 {
57 HANDLE events[2] = { ExitEvent, *CompressEvent };
58 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
59 if (waitResult == WAIT_OBJECT_0 + 0)
60 return false;
62 {
63 InStreamPointers.Clear();
64 OutStreamPointers.Clear();
65 UInt32 i;
66 for (i = 0; i < NumInStreams; i++)
67 {
68 if (InSizePointers[i] != NULL)
69 InSizePointers[i] = &InSizes[i];
70 InStreamPointers.Add(InStreams[i]);
71 }
72 for (i = 0; i < NumOutStreams; i++)
73 {
74 if (OutSizePointers[i] != NULL)
75 OutSizePointers[i] = &OutSizes[i];
76 OutStreamPointers.Add(OutStreams[i]);
77 }
78 CCoderInfoFlusher2 coderInfoFlusher(this);
79 if (Coder)
80 Result = Coder->Code(InStreamPointers[0],
81 OutStreamPointers[0],
82 InSizePointers[0],
83 OutSizePointers[0],
84 Progress);
85 else
86 Result = Coder2->Code(&InStreamPointers.Front(),
87 &InSizePointers.Front(),
88 NumInStreams,
89 &OutStreamPointers.Front(),
90 &OutSizePointers.Front(),
91 NumOutStreams,
92 Progress);
93 }
94 return true;
95 }
97 static void SetSizes(const UInt64 **srcSizes, CRecordVector<UInt64> &sizes,
98 CRecordVector<const UInt64 *> &sizePointers, UInt32 numItems)
99 {
100 sizes.Clear();
101 sizePointers.Clear();
102 for(UInt32 i = 0; i < numItems; i++)
103 {
104 if (srcSizes == 0 || srcSizes[i] == NULL)
105 {
106 sizes.Add(0);
107 sizePointers.Add(NULL);
108 }
109 else
110 {
111 sizes.Add(*srcSizes[i]);
112 sizePointers.Add(&sizes.Back());
113 }
114 }
115 }
118 void CThreadCoderInfo::SetCoderInfo(const UInt64 **inSizes,
119 const UInt64 **outSizes, ICompressProgressInfo *progress)
120 {
121 Progress = progress;
122 SetSizes(inSizes, InSizes, InSizePointers, NumInStreams);
123 SetSizes(outSizes, OutSizes, OutSizePointers, NumOutStreams);
124 }
126 static DWORD WINAPI CoderThread(void *threadCoderInfo)
127 {
128 while(true)
129 {
130 if (!((CThreadCoderInfo *)threadCoderInfo)->WaitAndCode())
131 return 0;
132 }
133 }
135 //////////////////////////////////////
136 // CCoderMixer2MT
138 static DWORD WINAPI MainCoderThread(void *threadCoderInfo)
139 {
140 while(true)
141 {
142 if (!((CCoderMixer2MT *)threadCoderInfo)->MyCode())
143 return 0;
144 }
145 }
147 CCoderMixer2MT::CCoderMixer2MT()
148 {
149 if (!_mainThread.Create(MainCoderThread, this))
150 throw 271825;
151 }
153 CCoderMixer2MT::~CCoderMixer2MT()
154 {
155 _exitEvent.Set();
156 _mainThread.Wait();
157 for(int i = 0; i < _threads.Size(); i++)
158 {
159 _threads[i].Wait();
160 _threads[i].Close();
161 }
162 }
164 void CCoderMixer2MT::SetBindInfo(const CBindInfo &bindInfo)
165 {
166 _bindInfo = bindInfo;
167 _streamBinders.Clear();
168 for(int i = 0; i < _bindInfo.BindPairs.Size(); i++)
169 {
170 _streamBinders.Add(CStreamBinder());
171 _streamBinders.Back().CreateEvents();
172 }
173 }
175 void CCoderMixer2MT::AddCoderCommon()
176 {
177 int index = _coderInfoVector.Size();
178 const CCoderStreamsInfo &CoderStreamsInfo = _bindInfo.Coders[index];
180 CThreadCoderInfo threadCoderInfo(CoderStreamsInfo.NumInStreams,
181 CoderStreamsInfo.NumOutStreams);
182 _coderInfoVector.Add(threadCoderInfo);
183 _coderInfoVector.Back().CreateEvents();
184 _coderInfoVector.Back().ExitEvent = _exitEvent;
185 _compressingCompletedEvents.Add(*_coderInfoVector.Back().CompressionCompletedEvent);
187 NWindows::CThread newThread;
188 _threads.Add(newThread);
189 if (!_threads.Back().Create(CoderThread, &_coderInfoVector.Back()))
190 throw 271824;
191 }
193 void CCoderMixer2MT::AddCoder(ICompressCoder *coder)
194 {
195 AddCoderCommon();
196 _coderInfoVector.Back().Coder = coder;
197 }
199 void CCoderMixer2MT::AddCoder2(ICompressCoder2 *coder)
200 {
201 AddCoderCommon();
202 _coderInfoVector.Back().Coder2 = coder;
203 }
205 /*
206 void CCoderMixer2MT::FinishAddingCoders()
207 {
208 for(int i = 0; i < _coderInfoVector.Size(); i++)
209 {
210 DWORD id;
211 HANDLE newThread = ::CreateThread(NULL, 0, CoderThread,
212 &_coderInfoVector[i], 0, &id);
213 if (newThread == 0)
214 throw 271824;
215 _threads.Add(newThread);
216 }
217 }
218 */
220 void CCoderMixer2MT::ReInit()
221 {
222 for(int i = 0; i < _streamBinders.Size(); i++)
223 _streamBinders[i].ReInit();
224 }
227 STDMETHODIMP CCoderMixer2MT::Init(ISequentialInStream **inStreams,
228 ISequentialOutStream **outStreams)
229 {
230 if (_coderInfoVector.Size() != _bindInfo.Coders.Size())
231 throw 0;
232 int i;
233 for(i = 0; i < _coderInfoVector.Size(); i++)
234 {
235 CThreadCoderInfo &coderInfo = _coderInfoVector[i];
236 const CCoderStreamsInfo &coderStreamsInfo = _bindInfo.Coders[i];
237 coderInfo.InStreams.Clear();
238 UInt32 j;
239 for(j = 0; j < coderStreamsInfo.NumInStreams; j++)
240 coderInfo.InStreams.Add(NULL);
241 coderInfo.OutStreams.Clear();
242 for(j = 0; j < coderStreamsInfo.NumOutStreams; j++)
243 coderInfo.OutStreams.Add(NULL);
244 }
246 for(i = 0; i < _bindInfo.BindPairs.Size(); i++)
247 {
248 const CBindPair &bindPair = _bindInfo.BindPairs[i];
249 UInt32 inCoderIndex, inCoderStreamIndex;
250 UInt32 outCoderIndex, outCoderStreamIndex;
251 _bindInfo.FindInStream(bindPair.InIndex, inCoderIndex, inCoderStreamIndex);
252 _bindInfo.FindOutStream(bindPair.OutIndex, outCoderIndex, outCoderStreamIndex);
254 _streamBinders[i].CreateStreams(
255 &_coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex],
256 &_coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex]);
257 }
259 for(i = 0; i < _bindInfo.InStreams.Size(); i++)
260 {
261 UInt32 inCoderIndex, inCoderStreamIndex;
262 _bindInfo.FindInStream(_bindInfo.InStreams[i], inCoderIndex, inCoderStreamIndex);
263 _coderInfoVector[inCoderIndex].InStreams[inCoderStreamIndex] = inStreams[i];
264 }
266 for(i = 0; i < _bindInfo.OutStreams.Size(); i++)
267 {
268 UInt32 outCoderIndex, outCoderStreamIndex;
269 _bindInfo.FindOutStream(_bindInfo.OutStreams[i], outCoderIndex, outCoderStreamIndex);
270 _coderInfoVector[outCoderIndex].OutStreams[outCoderStreamIndex] = outStreams[i];
271 }
272 return S_OK;
273 }
276 bool CCoderMixer2MT::MyCode()
277 {
278 HANDLE events[2] = { _exitEvent, _startCompressingEvent };
279 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
280 if (waitResult == WAIT_OBJECT_0 + 0)
281 return false;
283 for(int i = 0; i < _coderInfoVector.Size(); i++)
284 _coderInfoVector[i].CompressEvent->Set();
285 DWORD result = ::WaitForMultipleObjects(_compressingCompletedEvents.Size(),
286 &_compressingCompletedEvents.Front(), TRUE, INFINITE);
288 _compressingFinishedEvent.Set();
290 return true;
291 }
294 STDMETHODIMP CCoderMixer2MT::Code(ISequentialInStream **inStreams,
295 const UInt64 **inSizes,
296 UInt32 numInStreams,
297 ISequentialOutStream **outStreams,
298 const UInt64 **outSizes,
299 UInt32 numOutStreams,
300 ICompressProgressInfo *progress)
301 {
302 if (numInStreams != (UInt32)_bindInfo.InStreams.Size() ||
303 numOutStreams != (UInt32)_bindInfo.OutStreams.Size())
304 return E_INVALIDARG;
306 Init(inStreams, outStreams);
308 _compressingFinishedEvent.Reset(); // ?
310 CCrossThreadProgress *progressSpec = new CCrossThreadProgress;
311 CMyComPtr<ICompressProgressInfo> crossProgress = progressSpec;
312 progressSpec->Init();
313 _coderInfoVector[_progressCoderIndex].Progress = crossProgress;
315 _startCompressingEvent.Set();
318 while (true)
319 {
320 HANDLE events[2] = {_compressingFinishedEvent, progressSpec->ProgressEvent };
321 DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
322 if (waitResult == WAIT_OBJECT_0 + 0)
323 break;
324 if (progress != NULL)
325 progressSpec->Result = progress->SetRatioInfo(progressSpec->InSize,
326 progressSpec->OutSize);
327 else
328 progressSpec->Result = S_OK;
329 progressSpec->WaitEvent.Set();
330 }
332 int i;
333 for(i = 0; i < _coderInfoVector.Size(); i++)
334 {
335 HRESULT result = _coderInfoVector[i].Result;
336 if (result == S_FALSE)
337 return result;
338 }
339 for(i = 0; i < _coderInfoVector.Size(); i++)
340 {
341 HRESULT result = _coderInfoVector[i].Result;
342 if (result != S_OK && result != E_FAIL)
343 return result;
344 }
345 for(i = 0; i < _coderInfoVector.Size(); i++)
346 {
347 HRESULT result = _coderInfoVector[i].Result;
348 if (result != S_OK)
349 return result;
350 }
351 return S_OK;
352 }
354 UInt64 CCoderMixer2MT::GetWriteProcessedSize(UInt32 binderIndex) const
355 {
356 return _streamBinders[binderIndex].ProcessedSize;
357 }
359 }