Thu, 22 Jan 2015 13:21:57 +0100
Incorporate requested changes from Mozilla in review:
https://bugzilla.mozilla.org/show_bug.cgi?id=1123480#c6
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
2 * vim: set ts=8 sts=4 et sw=4 tw=99:
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
7 #include "vm/ThreadPool.h"
9 #include "mozilla/Atomics.h"
11 #include "jslock.h"
13 #include "vm/ForkJoin.h"
14 #include "vm/Monitor.h"
15 #include "vm/Runtime.h"
17 using namespace js;
// Stack size (in bytes) requested for every helper thread created by
// ThreadPoolWorker::start(). helperLoop() also derives each helper's JS
// stack limit from this value.
const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024;
21 static inline uint32_t
22 ComposeSliceBounds(uint16_t from, uint16_t to)
23 {
24 MOZ_ASSERT(from <= to);
25 return (uint32_t(from) << 16) | to;
26 }
28 static inline void
29 DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to)
30 {
31 *from = bounds >> 16;
32 *to = bounds & uint16_t(~0);
33 MOZ_ASSERT(*from <= *to);
34 }
36 ThreadPoolWorker::ThreadPoolWorker(uint32_t workerId, uint32_t rngSeed, ThreadPool *pool)
37 : workerId_(workerId),
38 pool_(pool),
39 sliceBounds_(0),
40 state_(CREATED),
41 schedulerRNGState_(rngSeed)
42 { }
44 bool
45 ThreadPoolWorker::hasWork() const
46 {
47 uint16_t from, to;
48 DecomposeSliceBounds(sliceBounds_, &from, &to);
49 return from != to;
50 }
52 bool
53 ThreadPoolWorker::popSliceFront(uint16_t *sliceId)
54 {
55 uint32_t bounds;
56 uint16_t from, to;
57 do {
58 bounds = sliceBounds_;
59 DecomposeSliceBounds(bounds, &from, &to);
60 if (from == to)
61 return false;
62 } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from + 1, to)));
64 *sliceId = from;
65 pool_->pendingSlices_--;
66 return true;
67 }
69 bool
70 ThreadPoolWorker::popSliceBack(uint16_t *sliceId)
71 {
72 uint32_t bounds;
73 uint16_t from, to;
74 do {
75 bounds = sliceBounds_;
76 DecomposeSliceBounds(bounds, &from, &to);
77 if (from == to)
78 return false;
79 } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from, to - 1)));
81 *sliceId = to - 1;
82 pool_->pendingSlices_--;
83 return true;
84 }
86 void
87 ThreadPoolWorker::discardSlices()
88 {
89 uint32_t bounds;
90 uint16_t from, to;
91 do {
92 bounds = sliceBounds_;
93 DecomposeSliceBounds(bounds, &from, &to);
94 } while (!sliceBounds_.compareExchange(bounds, 0));
96 pool_->pendingSlices_ -= to - from;
97 }
99 bool
100 ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId)
101 {
102 // Instead of popping the slice from the front by incrementing sliceStart_,
103 // decrement sliceEnd_. Usually this gives us better locality.
104 if (!victim->popSliceBack(sliceId))
105 return false;
106 #ifdef DEBUG
107 pool_->stolenSlices_++;
108 #endif
109 return true;
110 }
112 ThreadPoolWorker *
113 ThreadPoolWorker::randomWorker()
114 {
115 // Perform 32-bit xorshift.
116 uint32_t x = schedulerRNGState_;
117 x ^= x << XORSHIFT_A;
118 x ^= x >> XORSHIFT_B;
119 x ^= x << XORSHIFT_C;
120 schedulerRNGState_ = x;
121 return pool_->workers_[x % pool_->numWorkers()];
122 }
124 bool
125 ThreadPoolWorker::start()
126 {
127 #ifndef JS_THREADSAFE
128 return false;
129 #else
130 if (isMainThread())
131 return true;
133 MOZ_ASSERT(state_ == CREATED);
135 // Set state to active now, *before* the thread starts:
136 state_ = ACTIVE;
138 if (!PR_CreateThread(PR_USER_THREAD,
139 HelperThreadMain, this,
140 PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
141 PR_UNJOINABLE_THREAD,
142 WORKER_THREAD_STACK_SIZE))
143 {
144 // If the thread failed to start, call it TERMINATED.
145 state_ = TERMINATED;
146 return false;
147 }
149 return true;
150 #endif
151 }
153 void
154 ThreadPoolWorker::HelperThreadMain(void *arg)
155 {
156 ThreadPoolWorker *worker = (ThreadPoolWorker*) arg;
157 worker->helperLoop();
158 }
// Body of every helper thread (entered through HelperThreadMain()). It
// repeats the following until the worker is terminated:
// - sleep on the pool's monitor until the pool has pending slices or this
//   worker is terminated;
// - run the current job via executeFromWorker() (aborting the job on
//   failure);
// - join the pool, so the main thread's waitForWorkers() can return.
void
ThreadPoolWorker::helperLoop()
{
    MOZ_ASSERT(!isMainThread());

    // This is hokey in the extreme. To compute the stack limit,
    // subtract the size of the stack from the address of a local
    // variable and give a 100k buffer. Is there a better way?
    // (Note: 2k proved to be fine on Mac, but too little on Linux)
    // NOTE(review): assumes JS_STACK_GROWTH_DIRECTION is +1 or -1, so the
    // limit lands on the side of this local toward which the stack grows
    // (for -1 the unsigned product wraps, which performs the subtraction).
    uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 100*1024;
    uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) +
                             stackLimitOffset * JS_STACK_GROWTH_DIRECTION);

    for (;;) {
        // Wait for work to arrive or for us to terminate.
        {
            AutoLockMonitor lock(*pool_);
            while (state_ == ACTIVE && !pool_->hasWork())
                lock.wait();

            // Set by ThreadPool::terminateWorkers(): report our exit through
            // join() (which decrements activeWorkers_) and leave the thread.
            if (state_ == TERMINATED) {
                pool_->join(lock);
                return;
            }

            // Count ourselves as active while still holding the lock, so the
            // main thread's waitForWorkers() (which waits for activeWorkers_
            // to reach 0 and then clears job_) cannot finish while we run.
            pool_->activeWorkers_++;
        }

        // On failure, abortJob() discards every worker's remaining slices.
        if (!pool_->job()->executeFromWorker(this, stackLimit))
            pool_->abortJob();

        // Join the pool.
        {
            AutoLockMonitor lock(*pool_);
            pool_->join(lock);
        }
    }
}
200 void
201 ThreadPoolWorker::submitSlices(uint16_t sliceStart, uint16_t sliceEnd)
202 {
203 MOZ_ASSERT(!hasWork());
204 sliceBounds_ = ComposeSliceBounds(sliceStart, sliceEnd);
205 }
207 bool
208 ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId)
209 {
210 // First see whether we have any work ourself.
211 if (popSliceFront(sliceId))
212 return true;
214 // Try to steal work.
215 if (!pool_->workStealing())
216 return false;
218 do {
219 if (!pool_->hasWork())
220 return false;
221 } while (!stealFrom(randomWorker(), sliceId));
223 return true;
224 }
226 void
227 ThreadPoolWorker::terminate(AutoLockMonitor &lock)
228 {
229 MOZ_ASSERT(lock.isFor(*pool_));
230 MOZ_ASSERT(state_ != TERMINATED);
231 state_ = TERMINATED;
232 }
234 /////////////////////////////////////////////////////////////////////////////
235 // ThreadPool
236 //
237 // The |ThreadPool| starts up workers, submits work to them, and shuts
238 // them down when requested.
240 ThreadPool::ThreadPool(JSRuntime *rt)
241 : activeWorkers_(0),
242 joinBarrier_(nullptr),
243 job_(nullptr),
244 #ifdef DEBUG
245 runtime_(rt),
246 stolenSlices_(0),
247 #endif
248 pendingSlices_(0),
249 isMainThreadActive_(false)
250 { }
252 ThreadPool::~ThreadPool()
253 {
254 terminateWorkers();
255 #ifdef JS_THREADSAFE
256 if (joinBarrier_)
257 PR_DestroyCondVar(joinBarrier_);
258 #endif
259 }
261 bool
262 ThreadPool::init()
263 {
264 #ifdef JS_THREADSAFE
265 if (!Monitor::init())
266 return false;
267 joinBarrier_ = PR_NewCondVar(lock_);
268 return !!joinBarrier_;
269 #else
270 return true;
271 #endif
272 }
274 uint32_t
275 ThreadPool::numWorkers() const
276 {
277 #ifdef JS_THREADSAFE
278 return WorkerThreadState().cpuCount;
279 #else
280 return 1;
281 #endif
282 }
284 bool
285 ThreadPool::workStealing() const
286 {
287 #ifdef DEBUG
288 if (char *stealEnv = getenv("JS_THREADPOOL_STEAL"))
289 return !!strtol(stealEnv, nullptr, 10);
290 #endif
292 return true;
293 }
// Pseudo-random generator defined elsewhere in the engine. lazyStartWorkers()
// calls it with a caller-owned state word and a bit count (32) to derive each
// worker's scheduler seed.
extern uint64_t random_next(uint64_t *, int);
297 bool
298 ThreadPool::lazyStartWorkers(JSContext *cx)
299 {
300 // Starts the workers if they have not already been started. If
301 // something goes wrong, reports an error and ensures that all
302 // partially started threads are terminated. Therefore, upon exit
303 // from this function, the workers array is either full (upon
304 // success) or empty (upon failure).
306 #ifdef JS_THREADSAFE
307 if (!workers_.empty()) {
308 MOZ_ASSERT(workers_.length() == numWorkers());
309 return true;
310 }
312 // Allocate workers array and then start the worker threads.
313 // Note that numWorkers() is the number of *desired* workers,
314 // but workers_.length() is the number of *successfully
315 // initialized* workers.
316 uint64_t rngState = 0;
317 for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
318 uint32_t rngSeed = uint32_t(random_next(&rngState, 32));
319 ThreadPoolWorker *worker = cx->new_<ThreadPoolWorker>(workerId, rngSeed, this);
320 if (!worker || !workers_.append(worker)) {
321 terminateWorkersAndReportOOM(cx);
322 return false;
323 }
324 }
326 for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
327 if (!workers_[workerId]->start()) {
328 // Note: do not delete worker here because it has been
329 // added to the array and hence will be deleted by
330 // |terminateWorkersAndReportOOM()|.
331 terminateWorkersAndReportOOM(cx);
332 return false;
333 }
334 }
335 #endif
337 return true;
338 }
340 void
341 ThreadPool::terminateWorkersAndReportOOM(JSContext *cx)
342 {
343 terminateWorkers();
344 MOZ_ASSERT(workers_.empty());
345 js_ReportOutOfMemory(cx);
346 }
// Shuts down every worker in workers_. It marks each one TERMINATED, wakes
// the helper threads, waits for them to join, and then frees all the worker
// objects.
//
// Precondition: every worker in workers_ except the main-thread worker has a
// running helper thread. The join count below is workers_.length() - 1, so a
// worker whose thread was never started would make waitForWorkers() block
// forever.
void
ThreadPool::terminateWorkers()
{
    if (workers_.length() > 0) {
        AutoLockMonitor lock(*this);

        // Signal to the workers they should quit.
        for (uint32_t i = 0; i < workers_.length(); i++)
            workers_[i]->terminate(lock);

        // Wake up all the workers. Expect one join per helper thread, i.e.
        // every worker except the main-thread worker, which has no thread of
        // its own (see ThreadPoolWorker::start()).
        activeWorkers_ = workers_.length() - 1;
        lock.notifyAll();

        // Wait for all workers to join.
        waitForWorkers(lock);

        while (workers_.length() > 0)
            js_delete(workers_.popCopy());
    }
}
371 void
372 ThreadPool::terminate()
373 {
374 terminateWorkers();
375 }
377 void
378 ThreadPool::join(AutoLockMonitor &lock)
379 {
380 MOZ_ASSERT(lock.isFor(*this));
381 if (--activeWorkers_ == 0)
382 lock.notify(joinBarrier_);
383 }
385 void
386 ThreadPool::waitForWorkers(AutoLockMonitor &lock)
387 {
388 MOZ_ASSERT(lock.isFor(*this));
389 while (activeWorkers_ > 0)
390 lock.wait(joinBarrier_);
391 job_ = nullptr;
392 }
// Runs |job| over the slices [sliceStart, sliceMax). The steps are:
// 1. Lazily start the workers.
// 2. Give each worker a contiguous sub-range of slices.
// 3. Wake the helper threads.
// 4. Process slices on the calling (main) thread.
// 5. Wait for every helper to join before returning.
//
// Returns TP_FATAL if the workers could not be started (lazyStartWorkers()
// has already reported the error), and TP_SUCCESS otherwise.
//
// NOTE(review): in non-JS_THREADSAFE builds lazyStartWorkers() leaves
// workers_ empty while numWorkers() returns 1, so the distribution loop
// below would index past the end. Presumably callers never reach this
// function in that configuration; confirm.
ParallelResult
ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart, uint16_t sliceMax)
{
    MOZ_ASSERT(sliceStart < sliceMax);
    MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_));
    MOZ_ASSERT(activeWorkers_ == 0);
    MOZ_ASSERT(!hasWork());

    if (!lazyStartWorkers(cx))
        return TP_FATAL;

    // Evenly distribute slices to the workers. The first |leftover| workers
    // each receive one extra slice, so range sizes differ by at most one.
    uint16_t numSlices = sliceMax - sliceStart;
    uint16_t slicesPerWorker = numSlices / numWorkers();
    uint16_t leftover = numSlices % numWorkers();
    uint16_t sliceEnd = sliceStart;
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        if (leftover > 0) {
            sliceEnd += slicesPerWorker + 1;
            leftover--;
        } else {
            sliceEnd += slicesPerWorker;
        }
        workers_[workerId]->submitSlices(sliceStart, sliceEnd);
        sliceStart = sliceEnd;
    }
    MOZ_ASSERT(leftover == 0);

    // Notify the worker threads that there's work now. job_ and
    // pendingSlices_ are set before the notification, so the helpers' wait
    // condition (pool hasWork()) is satisfied once they wake.
    {
        job_ = job;
        pendingSlices_ = numSlices;
#ifdef DEBUG
        stolenSlices_ = 0;
#endif
        AutoLockMonitor lock(*this);
        lock.notifyAll();
    }

    // Do work on the main thread. If it fails, abortJob() discards all
    // remaining slices.
    isMainThreadActive_ = true;
    if (!job->executeFromMainThread(mainThreadWorker()))
        abortJob();
    isMainThreadActive_ = false;

    // Wait for all threads to join. While there are no pending slices at this
    // point, the slices themselves may not be finished processing.
    // waitForWorkers() also clears job_.
    {
        AutoLockMonitor lock(*this);
        waitForWorkers(lock);
    }

    // Guard against errors in the self-hosted slice processing function. If
    // we still have work at this point, it is the user function's fault.
    MOZ_ASSERT(!hasWork(), "User function did not process all the slices!");

    // Everything went swimmingly. Give yourself a pat on the back.
    return TP_SUCCESS;
}
454 void
455 ThreadPool::abortJob()
456 {
457 for (uint32_t workerId = 0; workerId < numWorkers(); workerId++)
458 workers_[workerId]->discardSlices();
460 // Spin until pendingSlices_ reaches 0.
461 //
462 // The reason for this is that while calling discardSlices() clears all
463 // workers' bounds, the pendingSlices_ cache might still be > 0 due to
464 // still-executing calls to popSliceBack or popSliceFront in other
465 // threads. When those finish, we will be sure that !hasWork(), which is
466 // important to ensure that an aborted worker does not start again due to
467 // the thread pool having more work.
468 while (hasWork());
469 }