michael@0: /* michael@0: * Copyright 2012 Google Inc. michael@0: * michael@0: * Use of this source code is governed by a BSD-style license that can be michael@0: * found in the LICENSE file. michael@0: */ michael@0: michael@0: #ifndef SkThreadPool_DEFINED michael@0: #define SkThreadPool_DEFINED michael@0: michael@0: #include "SkCondVar.h" michael@0: #include "SkRunnable.h" michael@0: #include "SkTDArray.h" michael@0: #include "SkTInternalLList.h" michael@0: #include "SkThreadUtils.h" michael@0: #include "SkTypes.h" michael@0: michael@0: #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID) michael@0: # include michael@0: #endif michael@0: michael@0: // Returns the number of cores on this machine. michael@0: static inline int num_cores() { michael@0: #if defined(SK_BUILD_FOR_WIN32) michael@0: SYSTEM_INFO sysinfo; michael@0: GetSystemInfo(&sysinfo); michael@0: return sysinfo.dwNumberOfProcessors; michael@0: #elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID) michael@0: return sysconf(_SC_NPROCESSORS_ONLN); michael@0: #else michael@0: return 1; michael@0: #endif michael@0: } michael@0: michael@0: template michael@0: class SkTThreadPool { michael@0: public: michael@0: /** michael@0: * Create a threadpool with count threads, or one thread per core if kThreadPerCore. michael@0: */ michael@0: static const int kThreadPerCore = -1; michael@0: explicit SkTThreadPool(int count); michael@0: ~SkTThreadPool(); michael@0: michael@0: /** michael@0: * Queues up an SkRunnable to run when a thread is available, or synchronously if count is 0. michael@0: * Does not take ownership. NULL is a safe no-op. If T is not void, the runnable will be passed michael@0: * a reference to a T on the thread's local stack. michael@0: */ michael@0: void add(SkTRunnable*); michael@0: michael@0: /** michael@0: * Block until all added SkRunnables have completed. Once called, calling add() is undefined. michael@0: */ michael@0: void wait(); michael@0: michael@0: private: michael@0: struct LinkedRunnable { michael@0: SkTRunnable* fRunnable; // Unowned. michael@0: SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); michael@0: }; michael@0: michael@0: enum State { michael@0: kRunning_State, // Normal case. We've been constructed and no one has called wait(). michael@0: kWaiting_State, // wait has been called, but there still might be work to do or being done. michael@0: kHalting_State, // There's no work to do and no thread is busy. All threads can shut down. michael@0: }; michael@0: michael@0: SkTInternalLList fQueue; michael@0: SkCondVar fReady; michael@0: SkTDArray fThreads; michael@0: State fState; michael@0: int fBusyThreads; michael@0: michael@0: static void Loop(void*); // Static because we pass in this. michael@0: }; michael@0: michael@0: template michael@0: SkTThreadPool::SkTThreadPool(int count) : fState(kRunning_State), fBusyThreads(0) { michael@0: if (count < 0) { michael@0: count = num_cores(); michael@0: } michael@0: // Create count threads, all running SkTThreadPool::Loop. michael@0: for (int i = 0; i < count; i++) { michael@0: SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this)); michael@0: *fThreads.append() = thread; michael@0: thread->start(); michael@0: } michael@0: } michael@0: michael@0: template michael@0: SkTThreadPool::~SkTThreadPool() { michael@0: if (kRunning_State == fState) { michael@0: this->wait(); michael@0: } michael@0: } michael@0: michael@0: namespace SkThreadPoolPrivate { michael@0: michael@0: template michael@0: struct ThreadLocal { michael@0: void run(SkTRunnable* r) { r->run(data); } michael@0: T data; michael@0: }; michael@0: michael@0: template <> michael@0: struct ThreadLocal { michael@0: void run(SkTRunnable* r) { r->run(); } michael@0: }; michael@0: michael@0: } // namespace SkThreadPoolPrivate michael@0: michael@0: template michael@0: void SkTThreadPool::add(SkTRunnable* r) { michael@0: if (r == NULL) { michael@0: return; michael@0: } michael@0: michael@0: if (fThreads.isEmpty()) { michael@0: SkThreadPoolPrivate::ThreadLocal threadLocal; michael@0: threadLocal.run(r); michael@0: return; michael@0: } michael@0: michael@0: LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); michael@0: linkedRunnable->fRunnable = r; michael@0: fReady.lock(); michael@0: SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when we're halting. michael@0: fQueue.addToHead(linkedRunnable); michael@0: fReady.signal(); michael@0: fReady.unlock(); michael@0: } michael@0: michael@0: michael@0: template michael@0: void SkTThreadPool::wait() { michael@0: fReady.lock(); michael@0: fState = kWaiting_State; michael@0: fReady.broadcast(); michael@0: fReady.unlock(); michael@0: michael@0: // Wait for all threads to stop. michael@0: for (int i = 0; i < fThreads.count(); i++) { michael@0: fThreads[i]->join(); michael@0: SkDELETE(fThreads[i]); michael@0: } michael@0: SkASSERT(fQueue.isEmpty()); michael@0: } michael@0: michael@0: template michael@0: /*static*/ void SkTThreadPool::Loop(void* arg) { michael@0: // The SkTThreadPool passes itself as arg to each thread as they're created. michael@0: SkTThreadPool* pool = static_cast*>(arg); michael@0: SkThreadPoolPrivate::ThreadLocal threadLocal; michael@0: michael@0: while (true) { michael@0: // We have to be holding the lock to read the queue and to call wait. michael@0: pool->fReady.lock(); michael@0: while(pool->fQueue.isEmpty()) { michael@0: // Does the client want to stop and are all the threads ready to stop? michael@0: // If so, we move into the halting state, and whack all the threads so they notice. michael@0: if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { michael@0: pool->fState = kHalting_State; michael@0: pool->fReady.broadcast(); michael@0: } michael@0: // Any time we find ourselves in the halting state, it's quitting time. michael@0: if (kHalting_State == pool->fState) { michael@0: pool->fReady.unlock(); michael@0: return; michael@0: } michael@0: // wait yields the lock while waiting, but will have it again when awoken. michael@0: pool->fReady.wait(); michael@0: } michael@0: // We've got the lock back here, no matter if we ran wait or not. michael@0: michael@0: // The queue is not empty, so we have something to run. Claim it. michael@0: LinkedRunnable* r = pool->fQueue.tail(); michael@0: michael@0: pool->fQueue.remove(r); michael@0: michael@0: // Having claimed our SkRunnable, we now give up the lock while we run it. michael@0: // Otherwise, we'd only ever do work on one thread at a time, which rather michael@0: // defeats the point of this code. michael@0: pool->fBusyThreads++; michael@0: pool->fReady.unlock(); michael@0: michael@0: // OK, now really do the work. michael@0: threadLocal.run(r->fRunnable); michael@0: SkDELETE(r); michael@0: michael@0: // Let everyone know we're not busy. michael@0: pool->fReady.lock(); michael@0: pool->fBusyThreads--; michael@0: pool->fReady.unlock(); michael@0: } michael@0: michael@0: SkASSERT(false); // Unreachable. The only exit happens when pool->fState is kHalting_State. michael@0: } michael@0: michael@0: typedef SkTThreadPool SkThreadPool; michael@0: michael@0: #endif