js/src/vm/ThreadPool.cpp

author      Michael Schloh von Bennewitz <michael@schloh.com>
date        Wed, 31 Dec 2014 06:09:35 +0100
changeset   0:6474c204b198
permissions -rw-r--r--

Cloned upstream origin tor-browser at tor-browser-31.3.0esr-4.5-1-build1,
revision ID fc1c9ff7c1b2defdbc039f12214767608f46423f, for hacking purposes.

/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 4 -*-
 * vim: set ts=8 sts=4 et sw=4 tw=99:
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

#include "vm/ThreadPool.h"

#include "mozilla/Atomics.h"

#include "jslock.h"

#include "vm/ForkJoin.h"
#include "vm/Monitor.h"
#include "vm/Runtime.h"

using namespace js;

const size_t WORKER_THREAD_STACK_SIZE = 1*1024*1024;
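
// Each worker's slice bounds are packed into a single 32-bit word so they can
// be read and updated atomically: the high 16 bits hold the index of the next
// slice to run ("from"), the low 16 bits the exclusive end ("to"). For
// example, ComposeSliceBounds(3, 7) yields 0x00030007; the range is empty
// once from == to.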

static inline uint32_t
ComposeSliceBounds(uint16_t from, uint16_t to)
{
    MOZ_ASSERT(from <= to);
    return (uint32_t(from) << 16) | to;
}

static inline void
DecomposeSliceBounds(uint32_t bounds, uint16_t *from, uint16_t *to)
{
    *from = bounds >> 16;
    *to = bounds & uint16_t(~0);
    MOZ_ASSERT(*from <= *to);
}

ThreadPoolWorker::ThreadPoolWorker(uint32_t workerId, uint32_t rngSeed, ThreadPool *pool)
  : workerId_(workerId),
    pool_(pool),
    sliceBounds_(0),
    state_(CREATED),
    schedulerRNGState_(rngSeed)
{ }

bool
ThreadPoolWorker::hasWork() const
{
    uint16_t from, to;
    DecomposeSliceBounds(sliceBounds_, &from, &to);
    return from != to;
}
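
// Claim the frontmost slice of this worker's own range. The compare-exchange
// loop retries when a concurrent update (e.g. a steal via popSliceBack on
// another thread) changes the bounds between our read and our write.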
bool
ThreadPoolWorker::popSliceFront(uint16_t *sliceId)
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
        if (from == to)
            return false;
    } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from + 1, to)));

    *sliceId = from;
    pool_->pendingSlices_--;
    return true;
}

bool
ThreadPoolWorker::popSliceBack(uint16_t *sliceId)
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
        if (from == to)
            return false;
    } while (!sliceBounds_.compareExchange(bounds, ComposeSliceBounds(from, to - 1)));

    *sliceId = to - 1;
    pool_->pendingSlices_--;
    return true;
}
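
// Atomically zero this worker's slice bounds and deduct the discarded slices
// from the pool's pending count; used when a job aborts (see abortJob below).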
void
ThreadPoolWorker::discardSlices()
{
    uint32_t bounds;
    uint16_t from, to;
    do {
        bounds = sliceBounds_;
        DecomposeSliceBounds(bounds, &from, &to);
    } while (!sliceBounds_.compareExchange(bounds, 0));

    pool_->pendingSlices_ -= to - from;
}

bool
ThreadPoolWorker::stealFrom(ThreadPoolWorker *victim, uint16_t *sliceId)
{
    // Instead of popping a slice from the front of the victim's range (as the
    // victim itself does), pop from the back. Usually this gives better
    // locality: the victim keeps working near the front of its range while
    // thieves consume it from the far end.
    if (!victim->popSliceBack(sliceId))
        return false;
#ifdef DEBUG
    pool_->stolenSlices_++;
#endif
    return true;
}
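
// Choose a victim uniformly at pseudo-random using a per-worker Marsaglia
// xorshift generator; the shift constants XORSHIFT_A/B/C are presumably
// defined alongside the class in vm/ThreadPool.h.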
ThreadPoolWorker *
ThreadPoolWorker::randomWorker()
{
    // Perform 32-bit xorshift.
    uint32_t x = schedulerRNGState_;
    x ^= x << XORSHIFT_A;
    x ^= x >> XORSHIFT_B;
    x ^= x << XORSHIFT_C;
    schedulerRNGState_ = x;
    return pool_->workers_[x % pool_->numWorkers()];
}

bool
ThreadPoolWorker::start()
{
#ifndef JS_THREADSAFE
    return false;
#else
    if (isMainThread())
        return true;

    MOZ_ASSERT(state_ == CREATED);

    // Set state to active now, *before* the thread starts:
    state_ = ACTIVE;

    if (!PR_CreateThread(PR_USER_THREAD,
                         HelperThreadMain, this,
                         PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
                         PR_UNJOINABLE_THREAD,
                         WORKER_THREAD_STACK_SIZE))
    {
        // If the thread failed to start, call it TERMINATED.
        state_ = TERMINATED;
        return false;
    }

    return true;
#endif
}

void
ThreadPoolWorker::HelperThreadMain(void *arg)
{
    ThreadPoolWorker *worker = (ThreadPoolWorker*) arg;
    worker->helperLoop();
}

void
ThreadPoolWorker::helperLoop()
{
    MOZ_ASSERT(!isMainThread());

    // This is hokey in the extreme.  To compute the stack limit,
    // subtract the size of the stack from the address of a local
    // variable and give a 100k buffer.  Is there a better way?
    // (Note: 2k proved to be fine on Mac, but too little on Linux)
    uintptr_t stackLimitOffset = WORKER_THREAD_STACK_SIZE - 100*1024;
    uintptr_t stackLimit = (((uintptr_t)&stackLimitOffset) +
                             stackLimitOffset * JS_STACK_GROWTH_DIRECTION);
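
    // JS_STACK_GROWTH_DIRECTION is negative on platforms whose stack grows
    // downward (nearly all of them) and positive where it grows upward, so
    // the limit lands the right distance from the current frame either way.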

    for (;;) {
        // Wait for work to arrive or for us to terminate.
        {
            AutoLockMonitor lock(*pool_);
            while (state_ == ACTIVE && !pool_->hasWork())
                lock.wait();

            if (state_ == TERMINATED) {
                pool_->join(lock);
                return;
            }

            pool_->activeWorkers_++;
        }

        if (!pool_->job()->executeFromWorker(this, stackLimit))
            pool_->abortJob();

        // Join the pool.
        {
            AutoLockMonitor lock(*pool_);
            pool_->join(lock);
        }
    }
}

void
ThreadPoolWorker::submitSlices(uint16_t sliceStart, uint16_t sliceEnd)
{
    MOZ_ASSERT(!hasWork());
    sliceBounds_ = ComposeSliceBounds(sliceStart, sliceEnd);
}
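
// Acquire the next slice for this worker: first drain our own range from the
// front; then, if work stealing is enabled, keep picking random victims until
// either a steal succeeds or the pool has no pending slices left.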
bool
ThreadPoolWorker::getSlice(ForkJoinContext *cx, uint16_t *sliceId)
{
    // First see whether we have any work ourselves.
    if (popSliceFront(sliceId))
        return true;

    // Try to steal work.
    if (!pool_->workStealing())
        return false;

    do {
        if (!pool_->hasWork())
            return false;
    } while (!stealFrom(randomWorker(), sliceId));

    return true;
}

void
ThreadPoolWorker::terminate(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*pool_));
    MOZ_ASSERT(state_ != TERMINATED);
    state_ = TERMINATED;
}

/////////////////////////////////////////////////////////////////////////////
// ThreadPool
//
// The |ThreadPool| starts up workers, submits work to them, and shuts
// them down when requested.

ThreadPool::ThreadPool(JSRuntime *rt)
  : activeWorkers_(0),
    joinBarrier_(nullptr),
    job_(nullptr),
#ifdef DEBUG
    runtime_(rt),
    stolenSlices_(0),
#endif
    pendingSlices_(0),
    isMainThreadActive_(false)
{ }

ThreadPool::~ThreadPool()
{
    terminateWorkers();
#ifdef JS_THREADSAFE
    if (joinBarrier_)
        PR_DestroyCondVar(joinBarrier_);
#endif
}

bool
ThreadPool::init()
{
#ifdef JS_THREADSAFE
    if (!Monitor::init())
        return false;
    joinBarrier_ = PR_NewCondVar(lock_);
    return !!joinBarrier_;
#else
    return true;
#endif
}

uint32_t
ThreadPool::numWorkers() const
{
#ifdef JS_THREADSAFE
    return WorkerThreadState().cpuCount;
#else
    return 1;
#endif
}
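
// Work stealing is enabled by default; in DEBUG builds it can be toggled via
// the JS_THREADPOOL_STEAL environment variable (0 disables, nonzero enables).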
bool
ThreadPool::workStealing() const
{
#ifdef DEBUG
    if (char *stealEnv = getenv("JS_THREADPOOL_STEAL"))
        return !!strtol(stealEnv, nullptr, 10);
#endif

    return true;
}

extern uint64_t random_next(uint64_t *, int);

bool
ThreadPool::lazyStartWorkers(JSContext *cx)
{
    // Starts the workers if they have not already been started.  If
    // something goes wrong, reports an error and ensures that all
    // partially started threads are terminated.  Therefore, upon exit
    // from this function, the workers array is either full (upon
    // success) or empty (upon failure).

#ifdef JS_THREADSAFE
    if (!workers_.empty()) {
        MOZ_ASSERT(workers_.length() == numWorkers());
        return true;
    }

    // Allocate workers array and then start the worker threads.
    // Note that numWorkers() is the number of *desired* workers,
    // but workers_.length() is the number of *successfully
    // initialized* workers.
    uint64_t rngState = 0;
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        uint32_t rngSeed = uint32_t(random_next(&rngState, 32));
        ThreadPoolWorker *worker = cx->new_<ThreadPoolWorker>(workerId, rngSeed, this);
        if (!worker || !workers_.append(worker)) {
            terminateWorkersAndReportOOM(cx);
            return false;
        }
    }

    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        if (!workers_[workerId]->start()) {
            // Note: do not delete worker here because it has been
            // added to the array and hence will be deleted by
            // |terminateWorkersAndReportOOM()|.
            terminateWorkersAndReportOOM(cx);
            return false;
        }
    }
#endif

    return true;
}

void
ThreadPool::terminateWorkersAndReportOOM(JSContext *cx)
{
    terminateWorkers();
    MOZ_ASSERT(workers_.empty());
    js_ReportOutOfMemory(cx);
}

void
ThreadPool::terminateWorkers()
{
    if (workers_.length() > 0) {
        AutoLockMonitor lock(*this);

        // Signal to the workers they should quit.
        for (uint32_t i = 0; i < workers_.length(); i++)
            workers_[i]->terminate(lock);

        // Wake up all the workers. Set the number of active workers to the
        // number of helper threads (one fewer than workers_.length(), since
        // the main thread's worker never runs helperLoop) so we can make
        // sure they all join.
        activeWorkers_ = workers_.length() - 1;
        lock.notifyAll();

        // Wait for all workers to join.
        waitForWorkers(lock);

        while (workers_.length() > 0)
            js_delete(workers_.popCopy());
    }
}

void
ThreadPool::terminate()
{
    terminateWorkers();
}

void
ThreadPool::join(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*this));
    if (--activeWorkers_ == 0)
        lock.notify(joinBarrier_);
}

void
ThreadPool::waitForWorkers(AutoLockMonitor &lock)
{
    MOZ_ASSERT(lock.isFor(*this));
    while (activeWorkers_ > 0)
        lock.wait(joinBarrier_);
    job_ = nullptr;
}
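
// Execute |job| over the slice range [sliceStart, sliceMax): the slices are
// divided evenly among the workers, the helper threads are woken, and the
// main thread participates as a worker itself before waiting for the others
// to join. A hypothetical caller (not part of this file) might look like:
//
//   MyParallelJob job(...);  // some ParallelJob subclass
//   ParallelResult rv = pool->executeJob(cx, &job, 0, numSlices);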
ParallelResult
ThreadPool::executeJob(JSContext *cx, ParallelJob *job, uint16_t sliceStart, uint16_t sliceMax)
{
    MOZ_ASSERT(sliceStart < sliceMax);
    MOZ_ASSERT(CurrentThreadCanAccessRuntime(runtime_));
    MOZ_ASSERT(activeWorkers_ == 0);
    MOZ_ASSERT(!hasWork());

    if (!lazyStartWorkers(cx))
        return TP_FATAL;

    // Evenly distribute slices to the workers.
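    // The first |leftover| workers take one extra slice each; for example,
    // 10 slices across 4 workers are handed out as runs of 3, 3, 2, and 2.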
    uint16_t numSlices = sliceMax - sliceStart;
    uint16_t slicesPerWorker = numSlices / numWorkers();
    uint16_t leftover = numSlices % numWorkers();
    uint16_t sliceEnd = sliceStart;
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++) {
        if (leftover > 0) {
            sliceEnd += slicesPerWorker + 1;
            leftover--;
        } else {
            sliceEnd += slicesPerWorker;
        }
        workers_[workerId]->submitSlices(sliceStart, sliceEnd);
        sliceStart = sliceEnd;
    }
    MOZ_ASSERT(leftover == 0);

    // Notify the worker threads that there's work now.
    {
        job_ = job;
        pendingSlices_ = numSlices;
#ifdef DEBUG
        stolenSlices_ = 0;
#endif
        AutoLockMonitor lock(*this);
        lock.notifyAll();
    }

    // Do work on the main thread.
    isMainThreadActive_ = true;
    if (!job->executeFromMainThread(mainThreadWorker()))
        abortJob();
    isMainThreadActive_ = false;

    // Wait for all threads to join. While there are no pending slices at this
    // point, the slices themselves may not be finished processing.
    {
        AutoLockMonitor lock(*this);
        waitForWorkers(lock);
    }

    // Guard against errors in the self-hosted slice processing function. If
    // we still have work at this point, it is the user function's fault.
    MOZ_ASSERT(!hasWork(), "User function did not process all the slices!");

    // Everything went swimmingly. Give yourself a pat on the back.
    return TP_SUCCESS;
}

void
ThreadPool::abortJob()
{
    for (uint32_t workerId = 0; workerId < numWorkers(); workerId++)
        workers_[workerId]->discardSlices();

    // Spin until pendingSlices_ reaches 0.
    //
    // The reason for this is that while calling discardSlices() clears all
    // workers' bounds, the pendingSlices_ cache might still be > 0 due to
    // still-executing calls to popSliceBack or popSliceFront in other
    // threads. When those finish, we will be sure that !hasWork(), which is
    // important to ensure that an aborted worker does not start again due to
    // the thread pool having more work.
    while (hasWork());
}
