|
1 /* |
|
2 * Copyright 2012 Google Inc. |
|
3 * |
|
4 * Use of this source code is governed by a BSD-style license that can be |
|
5 * found in the LICENSE file. |
|
6 */ |
|
7 |
|
8 #ifndef SkThreadPool_DEFINED |
|
9 #define SkThreadPool_DEFINED |
|
10 |
|
11 #include "SkCondVar.h" |
|
12 #include "SkRunnable.h" |
|
13 #include "SkTDArray.h" |
|
14 #include "SkTInternalLList.h" |
|
15 #include "SkThreadUtils.h" |
|
16 #include "SkTypes.h" |
|
17 |
|
18 #if defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID) |
|
19 # include <unistd.h> |
|
20 #endif |
|
21 |
|
// Returns the number of cores on this machine, and never less than 1.
//
// The platform query is only compiled in for the build targets named below; every other
// target reports a single core.
static inline int num_cores() {
    int cores = 1;
#if defined(SK_BUILD_FOR_WIN32)
    SYSTEM_INFO sysinfo;
    GetSystemInfo(&sysinfo);
    // dwNumberOfProcessors is an unsigned DWORD; convert explicitly rather than implicitly.
    cores = static_cast<int>(sysinfo.dwNumberOfProcessors);
#elif defined(SK_BUILD_FOR_UNIX) || defined(SK_BUILD_FOR_MAC) || defined(SK_BUILD_FOR_ANDROID)
    // sysconf() returns a long, and -1 when the value cannot be determined.
    cores = static_cast<int>(sysconf(_SC_NPROCESSORS_ONLN));
#endif
    // A failed or nonsensical query must not be reported as a zero or negative core count.
    return cores < 1 ? 1 : cores;
}
|
34 |
|
35 template <typename T> |
|
36 class SkTThreadPool { |
|
37 public: |
|
38 /** |
|
39 * Create a threadpool with count threads, or one thread per core if kThreadPerCore. |
|
40 */ |
|
41 static const int kThreadPerCore = -1; |
|
42 explicit SkTThreadPool(int count); |
|
43 ~SkTThreadPool(); |
|
44 |
|
45 /** |
|
46 * Queues up an SkRunnable to run when a thread is available, or synchronously if count is 0. |
|
47 * Does not take ownership. NULL is a safe no-op. If T is not void, the runnable will be passed |
|
48 * a reference to a T on the thread's local stack. |
|
49 */ |
|
50 void add(SkTRunnable<T>*); |
|
51 |
|
52 /** |
|
53 * Block until all added SkRunnables have completed. Once called, calling add() is undefined. |
|
54 */ |
|
55 void wait(); |
|
56 |
|
57 private: |
|
58 struct LinkedRunnable { |
|
59 SkTRunnable<T>* fRunnable; // Unowned. |
|
60 SK_DECLARE_INTERNAL_LLIST_INTERFACE(LinkedRunnable); |
|
61 }; |
|
62 |
|
63 enum State { |
|
64 kRunning_State, // Normal case. We've been constructed and no one has called wait(). |
|
65 kWaiting_State, // wait has been called, but there still might be work to do or being done. |
|
66 kHalting_State, // There's no work to do and no thread is busy. All threads can shut down. |
|
67 }; |
|
68 |
|
69 SkTInternalLList<LinkedRunnable> fQueue; |
|
70 SkCondVar fReady; |
|
71 SkTDArray<SkThread*> fThreads; |
|
72 State fState; |
|
73 int fBusyThreads; |
|
74 |
|
75 static void Loop(void*); // Static because we pass in this. |
|
76 }; |
|
77 |
|
78 template <typename T> |
|
79 SkTThreadPool<T>::SkTThreadPool(int count) : fState(kRunning_State), fBusyThreads(0) { |
|
80 if (count < 0) { |
|
81 count = num_cores(); |
|
82 } |
|
83 // Create count threads, all running SkTThreadPool::Loop. |
|
84 for (int i = 0; i < count; i++) { |
|
85 SkThread* thread = SkNEW_ARGS(SkThread, (&SkTThreadPool::Loop, this)); |
|
86 *fThreads.append() = thread; |
|
87 thread->start(); |
|
88 } |
|
89 } |
|
90 |
|
91 template <typename T> |
|
92 SkTThreadPool<T>::~SkTThreadPool() { |
|
93 if (kRunning_State == fState) { |
|
94 this->wait(); |
|
95 } |
|
96 } |
|
97 |
|
98 namespace SkThreadPoolPrivate { |
|
99 |
|
100 template <typename T> |
|
101 struct ThreadLocal { |
|
102 void run(SkTRunnable<T>* r) { r->run(data); } |
|
103 T data; |
|
104 }; |
|
105 |
|
106 template <> |
|
107 struct ThreadLocal<void> { |
|
108 void run(SkTRunnable<void>* r) { r->run(); } |
|
109 }; |
|
110 |
|
111 } // namespace SkThreadPoolPrivate |
|
112 |
|
113 template <typename T> |
|
114 void SkTThreadPool<T>::add(SkTRunnable<T>* r) { |
|
115 if (r == NULL) { |
|
116 return; |
|
117 } |
|
118 |
|
119 if (fThreads.isEmpty()) { |
|
120 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; |
|
121 threadLocal.run(r); |
|
122 return; |
|
123 } |
|
124 |
|
125 LinkedRunnable* linkedRunnable = SkNEW(LinkedRunnable); |
|
126 linkedRunnable->fRunnable = r; |
|
127 fReady.lock(); |
|
128 SkASSERT(fState != kHalting_State); // Shouldn't be able to add work when we're halting. |
|
129 fQueue.addToHead(linkedRunnable); |
|
130 fReady.signal(); |
|
131 fReady.unlock(); |
|
132 } |
|
133 |
|
134 |
|
135 template <typename T> |
|
136 void SkTThreadPool<T>::wait() { |
|
137 fReady.lock(); |
|
138 fState = kWaiting_State; |
|
139 fReady.broadcast(); |
|
140 fReady.unlock(); |
|
141 |
|
142 // Wait for all threads to stop. |
|
143 for (int i = 0; i < fThreads.count(); i++) { |
|
144 fThreads[i]->join(); |
|
145 SkDELETE(fThreads[i]); |
|
146 } |
|
147 SkASSERT(fQueue.isEmpty()); |
|
148 } |
|
149 |
|
150 template <typename T> |
|
151 /*static*/ void SkTThreadPool<T>::Loop(void* arg) { |
|
152 // The SkTThreadPool passes itself as arg to each thread as they're created. |
|
153 SkTThreadPool<T>* pool = static_cast<SkTThreadPool<T>*>(arg); |
|
154 SkThreadPoolPrivate::ThreadLocal<T> threadLocal; |
|
155 |
|
156 while (true) { |
|
157 // We have to be holding the lock to read the queue and to call wait. |
|
158 pool->fReady.lock(); |
|
159 while(pool->fQueue.isEmpty()) { |
|
160 // Does the client want to stop and are all the threads ready to stop? |
|
161 // If so, we move into the halting state, and whack all the threads so they notice. |
|
162 if (kWaiting_State == pool->fState && pool->fBusyThreads == 0) { |
|
163 pool->fState = kHalting_State; |
|
164 pool->fReady.broadcast(); |
|
165 } |
|
166 // Any time we find ourselves in the halting state, it's quitting time. |
|
167 if (kHalting_State == pool->fState) { |
|
168 pool->fReady.unlock(); |
|
169 return; |
|
170 } |
|
171 // wait yields the lock while waiting, but will have it again when awoken. |
|
172 pool->fReady.wait(); |
|
173 } |
|
174 // We've got the lock back here, no matter if we ran wait or not. |
|
175 |
|
176 // The queue is not empty, so we have something to run. Claim it. |
|
177 LinkedRunnable* r = pool->fQueue.tail(); |
|
178 |
|
179 pool->fQueue.remove(r); |
|
180 |
|
181 // Having claimed our SkRunnable, we now give up the lock while we run it. |
|
182 // Otherwise, we'd only ever do work on one thread at a time, which rather |
|
183 // defeats the point of this code. |
|
184 pool->fBusyThreads++; |
|
185 pool->fReady.unlock(); |
|
186 |
|
187 // OK, now really do the work. |
|
188 threadLocal.run(r->fRunnable); |
|
189 SkDELETE(r); |
|
190 |
|
191 // Let everyone know we're not busy. |
|
192 pool->fReady.lock(); |
|
193 pool->fBusyThreads--; |
|
194 pool->fReady.unlock(); |
|
195 } |
|
196 |
|
197 SkASSERT(false); // Unreachable. The only exit happens when pool->fState is kHalting_State. |
|
198 } |
|
199 |
|
// Convenience name for the pool instantiated with T = void.
200 typedef SkTThreadPool<void> SkThreadPool; |
|
201 |
|
202 #endif |