1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/js/src/vm/ThreadPool.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,469 @@ 1.4 +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*- 1.5 + * vim: set ts=8 sts=4 et sw=4 tw=99: 1.6 + * This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.8 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +#include "vm/ThreadPool.h" 1.11 + 1.12 +#include "mozilla/Atomics.h" 1.13 + 1.14 +#include "jslock.h" 1.15 + 1.16 +#include "vm/ForkJoin.h" 1.17 +#include "vm/Monitor.h" 1.18 +#include "vm/Runtime.h" 1.19 + 1.20 +using namespace js; 1.21 + 1.22 +const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024; 1.23 + 1.24 +static inline uint32_t 1.25 +ComposeSliceBounds(uint16_t from, uint16_t to) 1.26 +{ 1.27 + MOZ_ASSERT(from <= to); 1.28 + return (uint32_t(from) << 16) | to; 1.29 +} 1.30 + 1.31 +static inline void 1.32 +DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to) 1.33 +{ 1.34 + *from = bounds >> 16; 1.35 + *to = bounds & uint16_t(~0); 1.36 + MOZ_ASSERT(*from <= *to); 1.37 +} 1.38 + 1.39 +ThreadPoolWorker::ThreadPoolWorker(uint32_t workerId, uint32_t rngSeed, ThreadPool *pool) 1.40 + : workerId_(workerId), 1.41 + pool_(pool), 1.42 + sliceBounds_(0), 1.43 + state_(CREATED), 1.44 + schedulerRNGState_(rngSeed) 1.45 +{ } 1.46 + 1.47 +bool 1.48 +ThreadPoolWorker::hasWork() const 1.49 +{ 1.50 + uint16_t from, to; 1.51 + DecomposeSliceBounds(sliceBounds_, &from, &to); 1.52 + return from != to; 1.53 +} 1.54 + 1.55 +bool 1.56 +ThreadPoolWorker::popSliceFront(uint16_t *sliceId) 1.57 +{ 1.58 + uint32_t bounds; 1.59 + uint16_t from, to; 1.60 + do { 1.61 + bounds = sliceBounds_; 1.62 + DecomposeSliceBounds(bounds, &from, &to); 1.63 + if (from == to) 1.64 + return false; 1.65 + } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from + 1, to))); 1.66 + 1.67 + *sliceId = from; 1.68 + pool_->pendingSlices_--; 1.69 + return true; 1.70 +} 1.71 + 1.72 +bool 1.73 +ThreadPoolWorker::popSliceBack(uint16_t *sliceId) 1.74 +{ 1.75 + uint32_t bounds; 1.76 + uint16_t from, to; 1.77 + do { 1.78 + bounds = sliceBounds_; 1.79 + DecomposeSliceBounds(bounds, &from, &to); 1.80 + if (from == to) 1.81 + return false; 1.82 + } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from, to - 1))); 1.83 + 1.84 + *sliceId = to - 1; 1.85 + pool_->pendingSlices_--; 1.86 + return true; 1.87 +} 1.88 + 1.89 +void 1.90 +ThreadPoolWorker::discardSlices() 1.91 +{ 1.92 + uint32_t bounds; 1.93 + uint16_t from, to; 1.94 + do { 1.95 + bounds = sliceBounds_; 1.96 + DecomposeSliceBounds(bounds, &from, &to); 1.97 + } while (!sliceBounds_.compareExchange(bounds, 0)); 1.98 + 1.99 + pool_->pendingSlices_ -= to - from; 1.100 +} 1.101 + 1.102 +bool 1.103 +ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId) 1.104 +{ 1.105 + // Instead of popping the slice from the front by incrementing sliceStart_, 1.106 + // decrement sliceEnd_. Usually this gives us better locality. 1.107 + if (!victim->popSliceBack(sliceId)) 1.108 + return false; 1.109 +#ifdef DEBUG 1.110 + pool_->stolenSlices_++; 1.111 +#endif 1.112 + return true; 1.113 +} 1.114 + 1.115 +ThreadPoolWorker * 1.116 +ThreadPoolWorker::randomWorker() 1.117 +{ 1.118 + // Perform 32-bit xorshift. 1.119 + uint32_t x = schedulerRNGState_; 1.120 + x ^= x << XORSHIFT_A; 1.121 + x ^= x >> XORSHIFT_B; 1.122 + x ^= x << XORSHIFT_C; 1.123 + schedulerRNGState_ = x; 1.124 + return pool_->workers_[x % pool_->numWorkers()]; 1.125 +} 1.126 + 1.127 +bool 1.128 +ThreadPoolWorker::start() 1.129 +{ 1.130 +#ifndef JS_THREADSAFE 1.131 + return false; 1.132 +#else 1.133 + if (isMainThread()) 1.134 + return true; 1.135 + 1.136 + MOZ_ASSERT(state_ == CREATED); 1.137 + 1.138 + // Set state to active now, *before* the thread starts: 1.139 + state_ = ACTIVE; 1.140 + 1.141 + if (!PR_CreateThread(PR_USER_THREAD, 1.142 + HelperThreadMain, this, 1.143 + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, 1.144 + PR_UNJOINABLE_THREAD, 1.145 + WORKER_THREAD_STACK_SIZE)) 1.146 + { 1.147 + // If the thread failed to start, call it TERMINATED. 1.148 + state_ = TERMINATED; 1.149 + return false; 1.150 + } 1.151 + 1.152 + return true; 1.153 +#endif 1.154 +} 1.155 + 1.156 +void 1.157 +ThreadPoolWorker::HelperThreadMain(void *arg) 1.158 +{ 1.159 + ThreadPoolWorker *worker = (ThreadPoolWorker*) arg; 1.160 + worker->helperLoop(); 1.161 +} 1.162 + 1.163 +void 1.164 +ThreadPoolWorker::helperLoop() 1.165 +{ 1.166 + MOZ_ASSERT(!isMainThread()); 1.167 + 1.168 + // This is hokey in the extreme. To compute the stack limit, 1.169 + // subtract the size of the stack from the address of a local 1.170 + // variable and give a 100k buffer. Is there a better way? 1.171 + // (Note: 2k proved to be fine on Mac, but too little on Linux) 1.172 + uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 100*1024; 1.173 + uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) + 1.174 + stackLimitOffset * JS_STACK_GROWTH_DIRECTION); 1.175 + 1.176 + 1.177 + for (;;) { 1.178 + // Wait for work to arrive or for us to terminate. 1.179 + { 1.180 + AutoLockMonitor lock(*pool_); 1.181 + while (state_ == ACTIVE && !pool_->hasWork()) 1.182 + lock.wait(); 1.183 + 1.184 + if (state_ == TERMINATED) { 1.185 + pool_->join(lock); 1.186 + return; 1.187 + } 1.188 + 1.189 + pool_->activeWorkers_++; 1.190 + } 1.191 + 1.192 + if (!pool_->job()->executeFromWorker(this, stackLimit)) 1.193 + pool_->abortJob(); 1.194 + 1.195 + // Join the pool. 1.196 + { 1.197 + AutoLockMonitor lock(*pool_); 1.198 + pool_->join(lock); 1.199 + } 1.200 + } 1.201 +} 1.202 + 1.203 +void 1.204 +ThreadPoolWorker::submitSlices(uint16_t sliceStart, uint16_t sliceEnd) 1.205 +{ 1.206 + MOZ_ASSERT(!hasWork()); 1.207 + sliceBounds_ = ComposeSliceBounds(sliceStart, sliceEnd); 1.208 +} 1.209 + 1.210 +bool 1.211 +ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId) 1.212 +{ 1.213 + // First see whether we have any work ourself. 1.214 + if (popSliceFront(sliceId)) 1.215 + return true; 1.216 + 1.217 + // Try to steal work. 1.218 + if (!pool_->workStealing()) 1.219 + return false; 1.220 + 1.221 + do { 1.222 + if (!pool_->hasWork()) 1.223 + return false; 1.224 + } while (!stealFrom(randomWorker(), sliceId)); 1.225 + 1.226 + return true; 1.227 +} 1.228 + 1.229 +void 1.230 +ThreadPoolWorker::terminate(AutoLockMonitor &lock) 1.231 +{ 1.232 + MOZ_ASSERT(lock.isFor(*pool_)); 1.233 + MOZ_ASSERT(state_ != TERMINATED); 1.234 + state_ = TERMINATED; 1.235 +} 1.236 + 1.237 +///////////////////////////////////////////////////////////////////////////// 1.238 +// ThreadPool 1.239 +// 1.240 +// The |ThreadPool| starts up workers, submits work to them, and shuts 1.241 +// them down when requested. 1.242 + 1.243 +ThreadPool::ThreadPool(JSRuntime *rt) 1.244 + : activeWorkers_(0), 1.245 + joinBarrier_(nullptr), 1.246 + job_(nullptr), 1.247 +#ifdef DEBUG 1.248 + runtime_(rt), 1.249 + stolenSlices_(0), 1.250 +#endif 1.251 + pendingSlices_(0), 1.252 + isMainThreadActive_(false) 1.253 +{ } 1.254 + 1.255 +ThreadPool::~ThreadPool() 1.256 +{ 1.257 + terminateWorkers(); 1.258 +#ifdef JS_THREADSAFE 1.259 + if (joinBarrier_) 1.260 + PR_DestroyCondVar(joinBarrier_); 1.261 +#endif 1.262 +} 1.263 + 1.264 +bool 1.265 +ThreadPool::init() 1.266 +{ 1.267 +#ifdef JS_THREADSAFE 1.268 + if (!Monitor::init()) 1.269 + return false; 1.270 + joinBarrier_ = PR_NewCondVar(lock_); 1.271 + return !!joinBarrier_; 1.272 +#else 1.273 + return true; 1.274 +#endif 1.275 +} 1.276 + 1.277 +uint32_t 1.278 +ThreadPool::numWorkers() const 1.279 +{ 1.280 +#ifdef JS_THREADSAFE 1.281 + return WorkerThreadState().cpuCount; 1.282 +#else 1.283 + return 1; 1.284 +#endif 1.285 +} 1.286 + 1.287 +bool 1.288 +ThreadPool::workStealing() const 1.289 +{ 1.290 +#ifdef DEBUG 1.291 + if (char *stealEnv = getenv("JS_THREADPOOL_STEAL")) 1.292 + return !!strtol(stealEnv, nullptr, 10); 1.293 +#endif 1.294 + 1.295 + return true; 1.296 +} 1.297 + 1.298 +extern uint64_t random_next(uint64_t *, int); 1.299 + 1.300 +bool 1.301 +ThreadPool::lazyStartWorkers(JSContext *cx) 1.302 +{ 1.303 + // Starts the workers if they have not already been started. If 1.304 + // something goes wrong, reports an error and ensures that all 1.305 + // partially started threads are terminated. Therefore, upon exit 1.306 + // from this function, the workers array is either full (upon 1.307 + // success) or empty (upon failure). 1.308 + 1.309 +#ifdef JS_THREADSAFE 1.310 + if (!workers_.empty()) { 1.311 + MOZ_ASSERT(workers_.length() == numWorkers()); 1.312 + return true; 1.313 + } 1.314 + 1.315 + // Allocate workers array and then start the worker threads. 1.316 + // Note that numWorkers() is the number of *desired* workers, 1.317 + // but workers_.length() is the number of *successfully 1.318 + // initialized* workers. 1.319 + uint64_t rngState = 0; 1.320 + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { 1.321 + uint32_t rngSeed = uint32_t(random_next(&rngState, 32)); 1.322 + ThreadPoolWorker *worker = cx->new_<ThreadPoolWorker>(workerId, rngSeed, this); 1.323 + if (!worker || !workers_.append(worker)) { 1.324 + terminateWorkersAndReportOOM(cx); 1.325 + return false; 1.326 + } 1.327 + } 1.328 + 1.329 + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { 1.330 + if (!workers_[workerId]->start()) { 1.331 + // Note: do not delete worker here because it has been 1.332 + // added to the array and hence will be deleted by 1.333 + // |terminateWorkersAndReportOOM()|. 1.334 + terminateWorkersAndReportOOM(cx); 1.335 + return false; 1.336 + } 1.337 + } 1.338 +#endif 1.339 + 1.340 + return true; 1.341 +} 1.342 + 1.343 +void 1.344 +ThreadPool::terminateWorkersAndReportOOM(JSContext *cx) 1.345 +{ 1.346 + terminateWorkers(); 1.347 + MOZ_ASSERT(workers_.empty()); 1.348 + js_ReportOutOfMemory(cx); 1.349 +} 1.350 + 1.351 +void 1.352 +ThreadPool::terminateWorkers() 1.353 +{ 1.354 + if (workers_.length() > 0) { 1.355 + AutoLockMonitor lock(*this); 1.356 + 1.357 + // Signal to the workers they should quit. 1.358 + for (uint32_t i = 0; i < workers_.length(); i++) 1.359 + workers_[i]->terminate(lock); 1.360 + 1.361 + // Wake up all the workers. Set the number of active workers to the 1.362 + // current number of workers so we can make sure they all join. 1.363 + activeWorkers_ = workers_.length() - 1; 1.364 + lock.notifyAll(); 1.365 + 1.366 + // Wait for all workers to join. 1.367 + waitForWorkers(lock); 1.368 + 1.369 + while (workers_.length() > 0) 1.370 + js_delete(workers_.popCopy()); 1.371 + } 1.372 +} 1.373 + 1.374 +void 1.375 +ThreadPool::terminate() 1.376 +{ 1.377 + terminateWorkers(); 1.378 +} 1.379 + 1.380 +void 1.381 +ThreadPool::join(AutoLockMonitor &lock) 1.382 +{ 1.383 + MOZ_ASSERT(lock.isFor(*this)); 1.384 + if (--activeWorkers_ == 0) 1.385 + lock.notify(joinBarrier_); 1.386 +} 1.387 + 1.388 +void 1.389 +ThreadPool::waitForWorkers(AutoLockMonitor &lock) 1.390 +{ 1.391 + MOZ_ASSERT(lock.isFor(*this)); 1.392 + while (activeWorkers_ > 0) 1.393 + lock.wait(joinBarrier_); 1.394 + job_ = nullptr; 1.395 +} 1.396 + 1.397 +ParallelResult 1.398 +ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart, uint16_t sliceMax) 1.399 +{ 1.400 + MOZ_ASSERT(sliceStart < sliceMax); 1.401 + MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_)); 1.402 + MOZ_ASSERT(activeWorkers_ == 0); 1.403 + MOZ_ASSERT(!hasWork()); 1.404 + 1.405 + if (!lazyStartWorkers(cx)) 1.406 + return TP_FATAL; 1.407 + 1.408 + // Evenly distribute slices to the workers. 1.409 + uint16_t numSlices = sliceMax - sliceStart; 1.410 + uint16_t slicesPerWorker = numSlices / numWorkers(); 1.411 + uint16_t leftover = numSlices % numWorkers(); 1.412 + uint16_t sliceEnd = sliceStart; 1.413 + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) { 1.414 + if (leftover > 0) { 1.415 + sliceEnd += slicesPerWorker + 1; 1.416 + leftover--; 1.417 + } else { 1.418 + sliceEnd += slicesPerWorker; 1.419 + } 1.420 + workers_[workerId]->submitSlices(sliceStart, sliceEnd); 1.421 + sliceStart = sliceEnd; 1.422 + } 1.423 + MOZ_ASSERT(leftover == 0); 1.424 + 1.425 + // Notify the worker threads that there's work now. 1.426 + { 1.427 + job_ = job; 1.428 + pendingSlices_ = numSlices; 1.429 +#ifdef DEBUG 1.430 + stolenSlices_ = 0; 1.431 +#endif 1.432 + AutoLockMonitor lock(*this); 1.433 + lock.notifyAll(); 1.434 + } 1.435 + 1.436 + // Do work on the main thread. 1.437 + isMainThreadActive_ = true; 1.438 + if (!job->executeFromMainThread(mainThreadWorker())) 1.439 + abortJob(); 1.440 + isMainThreadActive_ = false; 1.441 + 1.442 + // Wait for all threads to join. While there are no pending slices at this 1.443 + // point, the slices themselves may not be finished processing. 1.444 + { 1.445 + AutoLockMonitor lock(*this); 1.446 + waitForWorkers(lock); 1.447 + } 1.448 + 1.449 + // Guard against errors in the self-hosted slice processing function. If 1.450 + // we still have work at this point, it is the user function's fault. 1.451 + MOZ_ASSERT(!hasWork(), "User function did not process all the slices!"); 1.452 + 1.453 + // Everything went swimmingly. Give yourself a pat on the back. 1.454 + return TP_SUCCESS; 1.455 +} 1.456 + 1.457 +void 1.458 +ThreadPool::abortJob() 1.459 +{ 1.460 + for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) 1.461 + workers_[workerId]->discardSlices(); 1.462 + 1.463 + // Spin until pendingSlices_ reaches 0. 1.464 + // 1.465 + // The reason for this is that while calling discardSlices() clears all 1.466 + // workers' bounds, the pendingSlices_ cache might still be > 0 due to 1.467 + // still-executing calls to popSliceBack or popSliceFront in other 1.468 + // threads. When those finish, we will be sure that !hasWork(), which is 1.469 + // important to ensure that an aborted worker does not start again due to 1.470 + // the thread pool having more work. 1.471 + while (hasWork()); 1.472 +}