/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
 * vim: set ts=8 sts=4 et sw=4 tw=99:
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "vm/ThreadPool.h"

#include "mozilla/Atomics.h"

#include "jslock.h"

#include "vm/ForkJoin.h"
#include "vm/Monitor.h"
#include "vm/Runtime.h"

using namespace js;

const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024;

static inline uint32_t
ComposeSliceBounds(uint16_t from, uint16_t to)
{
    MOZ_ASSERT(from <= to);
    return (uint32_t(from) << 16) | to;
}

static inline void
DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to)
{
    *from = bounds >> 16;
    *to = bounds & uint16_t(~0);
    MOZ_ASSERT(*from <= *to);
}

ThreadPoolWorker::ThreadPoolWorker(uint32_t workerId, uint32_t rngSeed, ThreadPool *pool)
  : workerId_(workerId),
    pool_(pool),
    sliceBounds_(0),
    state_(CREATED),
    schedulerRNGState_(rngSeed)
{ }

bool
ThreadPoolWorker::hasWork() const
{
    uint16_t from, to;
    DecomposeSliceBounds(sliceBounds_, &from, &to);
    return from != to;
}

bool
ThreadPoolWorker::popSliceFront(uint16_t *sliceId)
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
        if (from == to)
            return false;
    } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from + 1, to)));

    *sliceId = from;
    pool_->pendingSlices_--;
    return true;
}

bool
ThreadPoolWorker::popSliceBack(uint16_t *sliceId)
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
        if (from == to)
            return false;
    } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from, to - 1)));

    *sliceId = to - 1;
    pool_->pendingSlices_--;
    return true;
}

void
ThreadPoolWorker::discardSlices()
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
    } while (!sliceBounds_.compareExchange(bounds, 0));

    pool_->pendingSlices_ -= to - from;
}

bool
ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId)
{
    // Instead of popping the slice from the front by incrementing sliceStart_,
    // decrement sliceEnd_. Usually this gives us better locality.
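    //
    // A failed pop here simply means the victim's slice range was already
    // empty (or drained while we were racing on the CAS); getSlice() will
    // then try a different victim as long as the pool still has pending
    // slices.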
    if (!victim->popSliceBack(sliceId))
        return false;
#ifdef DEBUG
    pool_->stolenSlices_++;
#endif
    return true;
}

ThreadPoolWorker *
ThreadPoolWorker::randomWorker()
{
    // Perform 32-bit xorshift.
    uint32_t x = schedulerRNGState_;
    x ^= x << XORSHIFT_A;
    x ^= x >> XORSHIFT_B;
    x ^= x << XORSHIFT_C;
    schedulerRNGState_ = x;
    return pool_->workers_[x % pool_->numWorkers()];
}

bool
ThreadPoolWorker::start()
{
#ifndef JS_THREADSAFE
    return false;
#else
    if (isMainThread())
        return true;

    MOZ_ASSERT(state_ == CREATED);

    // Set state to active now, *before* the thread starts:
    state_ = ACTIVE;

    if (!PR_CreateThread(PR_USER_THREAD,
                         HelperThreadMain, this,
                         PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
                         PR_UNJOINABLE_THREAD,
                         WORKER_THREAD_STACK_SIZE))
    {
        // If the thread failed to start, call it TERMINATED.
        state_ = TERMINATED;
        return false;
    }

    return true;
#endif
}

void
ThreadPoolWorker::HelperThreadMain(void *arg)
{
    ThreadPoolWorker *worker = (ThreadPoolWorker*) arg;
    worker->helperLoop();
}

void
ThreadPoolWorker::helperLoop()
{
    MOZ_ASSERT(!isMainThread());

    // This is hokey in the extreme. To compute the stack limit,
    // subtract the size of the stack from the address of a local
    // variable and give a 100k buffer. Is there a better way?
    // (Note: 2k proved to be fine on Mac, but too little on Linux)
    uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 100*1024;
    uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) +
                            stackLimitOffset * JS_STACK_GROWTH_DIRECTION);

    for (;;) {
        // Wait for work to arrive or for us to terminate.
        {
            AutoLockMonitor lock(*pool_);
            while (state_ == ACTIVE && !pool_->hasWork())
                lock.wait();

            if (state_ == TERMINATED) {
                pool_->join(lock);
                return;
            }

            pool_->activeWorkers_++;
        }

        if (!pool_->job()->executeFromWorker(this, stackLimit))
            pool_->abortJob();

        // Join the pool.
        {
            AutoLockMonitor lock(*pool_);
            pool_->join(lock);
        }
    }
}

void
ThreadPoolWorker::submitSlices(uint16_t sliceStart, uint16_t sliceEnd)
{
    MOZ_ASSERT(!hasWork());
    sliceBounds_ = ComposeSliceBounds(sliceStart, sliceEnd);
}

bool
ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId)
{
    // First see whether we have any work ourself.
    if (popSliceFront(sliceId))
        return true;

    // Try to steal work.
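    //
    // Stealing is normally enabled; in DEBUG builds it can be toggled via
    // the JS_THREADPOOL_STEAL environment variable (see workStealing()).
    // Keep trying random victims until a steal succeeds or no pending
    // slices remain anywhere in the pool.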
    if (!pool_->workStealing())
        return false;

    do {
        if (!pool_->hasWork())
            return false;
    } while (!stealFrom(randomWorker(), sliceId));

    return true;
}

void
ThreadPoolWorker::terminate(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*pool_));
    MOZ_ASSERT(state_ != TERMINATED);
    state_ = TERMINATED;
}

/////////////////////////////////////////////////////////////////////////////
// ThreadPool
//
// The |ThreadPool| starts up workers, submits work to them, and shuts
// them down when requested.

ThreadPool::ThreadPool(JSRuntime *rt)
  : activeWorkers_(0),
    joinBarrier_(nullptr),
    job_(nullptr),
#ifdef DEBUG
    runtime_(rt),
    stolenSlices_(0),
#endif
    pendingSlices_(0),
    isMainThreadActive_(false)
{ }

ThreadPool::~ThreadPool()
{
    terminateWorkers();
#ifdef JS_THREADSAFE
    if (joinBarrier_)
        PR_DestroyCondVar(joinBarrier_);
#endif
}

bool
ThreadPool::init()
{
#ifdef JS_THREADSAFE
    if (!Monitor::init())
        return false;
    joinBarrier_ = PR_NewCondVar(lock_);
    return !!joinBarrier_;
#else
    return true;
#endif
}

uint32_t
ThreadPool::numWorkers() const
{
#ifdef JS_THREADSAFE
    return WorkerThreadState().cpuCount;
#else
    return 1;
#endif
}

bool
ThreadPool::workStealing() const
{
#ifdef DEBUG
    if (char *stealEnv = getenv("JS_THREADPOOL_STEAL"))
        return !!strtol(stealEnv, nullptr, 10);
#endif

    return true;
}

extern uint64_t random_next(uint64_t *, int);

bool
ThreadPool::lazyStartWorkers(JSContext *cx)
{
    // Starts the workers if they have not already been started. If
    // something goes wrong, reports an error and ensures that all
    // partially started threads are terminated. Therefore, upon exit
    // from this function, the workers array is either full (upon
    // success) or empty (upon failure).

#ifdef JS_THREADSAFE
    if (!workers_.empty()) {
        MOZ_ASSERT(workers_.length() == numWorkers());
        return true;
    }

    // Allocate workers array and then start the worker threads.
    // Note that numWorkers() is the number of *desired* workers,
    // but workers_.length() is the number of *successfully
    // initialized* workers.
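    //
    // Each worker is seeded with 32 bits drawn from random_next() so that
    // its xorshift scheduler RNG (see randomWorker()) picks steal victims
    // independently of the other workers.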
    uint64_t rngState = 0;
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        uint32_t rngSeed = uint32_t(random_next(&rngState, 32));
        ThreadPoolWorker *worker = cx->new_<ThreadPoolWorker>(workerId, rngSeed, this);
        if (!worker || !workers_.append(worker)) {
            terminateWorkersAndReportOOM(cx);
            return false;
        }
    }

    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        if (!workers_[workerId]->start()) {
            // Note: do not delete worker here because it has been
            // added to the array and hence will be deleted by
            // |terminateWorkersAndReportOOM()|.
            terminateWorkersAndReportOOM(cx);
            return false;
        }
    }
#endif

    return true;
}

void
ThreadPool::terminateWorkersAndReportOOM(JSContext *cx)
{
    terminateWorkers();
    MOZ_ASSERT(workers_.empty());
    js_ReportOutOfMemory(cx);
}

void
ThreadPool::terminateWorkers()
{
    if (workers_.length() > 0) {
        AutoLockMonitor lock(*this);

        // Signal to the workers they should quit.
        for (uint32_t i = 0; i < workers_.length(); i++)
            workers_[i]->terminate(lock);

        // Wake up all the workers. Set the number of active workers to the
        // current number of workers so we can make sure they all join.
        activeWorkers_ = workers_.length() - 1;
        lock.notifyAll();

        // Wait for all workers to join.
        waitForWorkers(lock);

        while (workers_.length() > 0)
            js_delete(workers_.popCopy());
    }
}

void
ThreadPool::terminate()
{
    terminateWorkers();
}

void
ThreadPool::join(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*this));
    if (--activeWorkers_ == 0)
        lock.notify(joinBarrier_);
}

void
ThreadPool::waitForWorkers(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*this));
    while (activeWorkers_ > 0)
        lock.wait(joinBarrier_);
    job_ = nullptr;
}

ParallelResult
ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart, uint16_t sliceMax)
{
    MOZ_ASSERT(sliceStart < sliceMax);
    MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_));
    MOZ_ASSERT(activeWorkers_ == 0);
    MOZ_ASSERT(!hasWork());

    if (!lazyStartWorkers(cx))
        return TP_FATAL;

    // Evenly distribute slices to the workers.
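    //
    // Each worker gets numSlices / numWorkers() slices, and the first
    // numSlices % numWorkers() workers take one extra. For example, 10
    // slices across 4 workers are handed out as 3, 3, 2, 2.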
    uint16_t numSlices = sliceMax - sliceStart;
    uint16_t slicesPerWorker = numSlices / numWorkers();
    uint16_t leftover = numSlices % numWorkers();
    uint16_t sliceEnd = sliceStart;
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        if (leftover > 0) {
            sliceEnd += slicesPerWorker + 1;
            leftover--;
        } else {
            sliceEnd += slicesPerWorker;
        }
        workers_[workerId]->submitSlices(sliceStart, sliceEnd);
        sliceStart = sliceEnd;
    }
    MOZ_ASSERT(leftover == 0);

    // Notify the worker threads that there's work now.
    {
        job_ = job;
        pendingSlices_ = numSlices;
#ifdef DEBUG
        stolenSlices_ = 0;
#endif
        AutoLockMonitor lock(*this);
        lock.notifyAll();
    }

    // Do work on the main thread.
    isMainThreadActive_ = true;
    if (!job->executeFromMainThread(mainThreadWorker()))
        abortJob();
    isMainThreadActive_ = false;

    // Wait for all threads to join. While there are no pending slices at this
    // point, the slices themselves may not be finished processing.
    {
        AutoLockMonitor lock(*this);
        waitForWorkers(lock);
    }

    // Guard against errors in the self-hosted slice processing function. If
    // we still have work at this point, it is the user function's fault.
    MOZ_ASSERT(!hasWork(), "User function did not process all the slices!");

    // Everything went swimmingly. Give yourself a pat on the back.
    return TP_SUCCESS;
}

void
ThreadPool::abortJob()
{
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++)
        workers_[workerId]->discardSlices();

    // Spin until pendingSlices_ reaches 0.
    //
    // The reason for this is that while calling discardSlices() clears all
    // workers' bounds, the pendingSlices_ cache might still be > 0 due to
    // still-executing calls to popSliceBack or popSliceFront in other
    // threads. When those finish, we will be sure that !hasWork(), which is
    // important to ensure that an aborted worker does not start again due to
    // the thread pool having more work.
    while (hasWork());
}