michael@0: // Copyright (c) 2012 The Chromium Authors. All rights reserved. michael@0: // Use of this source code is governed by a BSD-style license that can be michael@0: // found in the LICENSE file. michael@0: michael@0: #ifndef BASE_THREADING_SEQUENCED_WORKER_POOL_H_ michael@0: #define BASE_THREADING_SEQUENCED_WORKER_POOL_H_ michael@0: michael@0: #include michael@0: #include michael@0: michael@0: #include "base/base_export.h" michael@0: #include "base/basictypes.h" michael@0: #include "base/callback_forward.h" michael@0: #include "base/memory/ref_counted.h" michael@0: #include "base/memory/scoped_ptr.h" michael@0: #include "base/task_runner.h" michael@0: michael@0: namespace tracked_objects { michael@0: class Location; michael@0: } // namespace tracked_objects michael@0: michael@0: namespace base { michael@0: michael@0: class MessageLoopProxy; michael@0: michael@0: template class DeleteHelper; michael@0: michael@0: class SequencedTaskRunner; michael@0: michael@0: // A worker thread pool that enforces ordering between sets of tasks. It also michael@0: // allows you to specify what should happen to your tasks on shutdown. michael@0: // michael@0: // To enforce ordering, get a unique sequence token from the pool and post all michael@0: // tasks you want to order with the token. All tasks with the same token are michael@0: // guaranteed to execute serially, though not necessarily on the same thread. michael@0: // This means that: michael@0: // michael@0: // - No two tasks with the same token will run at the same time. michael@0: // michael@0: // - Given two tasks T1 and T2 with the same token such that T2 will michael@0: // run after T1, then T2 will start after T1 is destroyed. michael@0: // michael@0: // - If T2 will run after T1, then all memory changes in T1 and T1's michael@0: // destruction will be visible to T2. michael@0: // michael@0: // Example: michael@0: // SequencedWorkerPool::SequenceToken token = pool.GetSequenceToken(); michael@0: // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, michael@0: // FROM_HERE, base::Bind(...)); michael@0: // pool.PostSequencedWorkerTask(token, SequencedWorkerPool::SKIP_ON_SHUTDOWN, michael@0: // FROM_HERE, base::Bind(...)); michael@0: // michael@0: // You can make named sequence tokens to make it easier to share a token michael@0: // across different components. michael@0: // michael@0: // You can also post tasks to the pool without ordering using PostWorkerTask. michael@0: // These will be executed in an unspecified order. The order of execution michael@0: // between tasks with different sequence tokens is also unspecified. michael@0: // michael@0: // This class may be leaked on shutdown to facilitate fast shutdown. The michael@0: // expected usage, however, is to call Shutdown(), which correctly accounts michael@0: // for CONTINUE_ON_SHUTDOWN behavior and is required for BLOCK_SHUTDOWN michael@0: // behavior. michael@0: // michael@0: // Implementation note: This does not use a base::WorkerPool since that does michael@0: // not enforce shutdown semantics or allow us to specify how many worker michael@0: // threads to run. For the typical use case of random background work, we don't michael@0: // necessarily want to be super aggressive about creating threads. michael@0: // michael@0: // Note that SequencedWorkerPool is RefCountedThreadSafe (inherited michael@0: // from TaskRunner). michael@0: class BASE_EXPORT SequencedWorkerPool : public TaskRunner { michael@0: public: michael@0: // Defines what should happen to a task posted to the worker pool on michael@0: // shutdown. michael@0: enum WorkerShutdown { michael@0: // Tasks posted with this mode which have not run at shutdown will be michael@0: // deleted rather than run, and any tasks with this mode running at michael@0: // shutdown will be ignored (the worker thread will not be joined). michael@0: // michael@0: // This option provides a nice way to post stuff you don't want blocking michael@0: // shutdown. For example, you might be doing a slow DNS lookup and if it's michael@0: // blocked on the OS, you may not want to stop shutdown, since the result michael@0: // doesn't really matter at that point. michael@0: // michael@0: // However, you need to be very careful what you do in your callback when michael@0: // you use this option. Since the thread will continue to run until the OS michael@0: // terminates the process, the app can be in the process of tearing down michael@0: // when you're running. This means any singletons or global objects you michael@0: // use may suddenly become invalid out from under you. For this reason, michael@0: // it's best to use this only for slow but simple operations like the DNS michael@0: // example. michael@0: CONTINUE_ON_SHUTDOWN, michael@0: michael@0: // Tasks posted with this mode that have not started executing at michael@0: // shutdown will be deleted rather than executed. However, any tasks that michael@0: // have already begun executing when shutdown is called will be allowed michael@0: // to continue, and will block shutdown until completion. michael@0: // michael@0: // Note: Because Shutdown() may block while these tasks are executing, michael@0: // care must be taken to ensure that they do not block on the thread that michael@0: // called Shutdown(), as this may lead to deadlock. michael@0: SKIP_ON_SHUTDOWN, michael@0: michael@0: // Tasks posted with this mode will block shutdown until they're michael@0: // executed. Since this can have significant performance implications, michael@0: // use sparingly. michael@0: // michael@0: // Generally, this should be used only for user data, for example, a task michael@0: // writing a preference file. michael@0: // michael@0: // If a task is posted during shutdown, it will not get run since the michael@0: // workers may already be stopped. In this case, the post operation will michael@0: // fail (return false) and the task will be deleted. michael@0: BLOCK_SHUTDOWN, michael@0: }; michael@0: michael@0: // Opaque identifier that defines sequencing of tasks posted to the worker michael@0: // pool. michael@0: class SequenceToken { michael@0: public: michael@0: SequenceToken() : id_(0) {} michael@0: ~SequenceToken() {} michael@0: michael@0: bool Equals(const SequenceToken& other) const { michael@0: return id_ == other.id_; michael@0: } michael@0: michael@0: // Returns false if current thread is executing an unsequenced task. michael@0: bool IsValid() const { michael@0: return id_ != 0; michael@0: } michael@0: michael@0: private: michael@0: friend class SequencedWorkerPool; michael@0: michael@0: explicit SequenceToken(int id) : id_(id) {} michael@0: michael@0: int id_; michael@0: }; michael@0: michael@0: // Allows tests to perform certain actions. michael@0: class TestingObserver { michael@0: public: michael@0: virtual ~TestingObserver() {} michael@0: virtual void OnHasWork() = 0; michael@0: virtual void WillWaitForShutdown() = 0; michael@0: virtual void OnDestruct() = 0; michael@0: }; michael@0: michael@0: // Gets the SequencedToken of the current thread. michael@0: // If current thread is not a SequencedWorkerPool worker thread or is running michael@0: // an unsequenced task, returns an invalid SequenceToken. michael@0: static SequenceToken GetSequenceTokenForCurrentThread(); michael@0: michael@0: // When constructing a SequencedWorkerPool, there must be a michael@0: // MessageLoop on the current thread unless you plan to deliberately michael@0: // leak it. michael@0: michael@0: // Pass the maximum number of threads (they will be lazily created as needed) michael@0: // and a prefix for the thread name to aid in debugging. michael@0: SequencedWorkerPool(size_t max_threads, michael@0: const std::string& thread_name_prefix); michael@0: michael@0: // Like above, but with |observer| for testing. Does not take michael@0: // ownership of |observer|. michael@0: SequencedWorkerPool(size_t max_threads, michael@0: const std::string& thread_name_prefix, michael@0: TestingObserver* observer); michael@0: michael@0: // Returns a unique token that can be used to sequence tasks posted to michael@0: // PostSequencedWorkerTask(). Valid tokens are always nonzero. michael@0: SequenceToken GetSequenceToken(); michael@0: michael@0: // Returns the sequence token associated with the given name. Calling this michael@0: // function multiple times with the same string will always produce the michael@0: // same sequence token. If the name has not been used before, a new token michael@0: // will be created. michael@0: SequenceToken GetNamedSequenceToken(const std::string& name); michael@0: michael@0: // Returns a SequencedTaskRunner wrapper which posts to this michael@0: // SequencedWorkerPool using the given sequence token. Tasks with nonzero michael@0: // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay michael@0: // are posted with BLOCK_SHUTDOWN behavior. michael@0: scoped_refptr GetSequencedTaskRunner( michael@0: SequenceToken token); michael@0: michael@0: // Returns a SequencedTaskRunner wrapper which posts to this michael@0: // SequencedWorkerPool using the given sequence token. Tasks with nonzero michael@0: // delay are posted with SKIP_ON_SHUTDOWN behavior and tasks with zero delay michael@0: // are posted with the given shutdown behavior. michael@0: scoped_refptr GetSequencedTaskRunnerWithShutdownBehavior( michael@0: SequenceToken token, michael@0: WorkerShutdown shutdown_behavior); michael@0: michael@0: // Returns a TaskRunner wrapper which posts to this SequencedWorkerPool using michael@0: // the given shutdown behavior. Tasks with nonzero delay are posted with michael@0: // SKIP_ON_SHUTDOWN behavior and tasks with zero delay are posted with the michael@0: // given shutdown behavior. michael@0: scoped_refptr GetTaskRunnerWithShutdownBehavior( michael@0: WorkerShutdown shutdown_behavior); michael@0: michael@0: // Posts the given task for execution in the worker pool. Tasks posted with michael@0: // this function will execute in an unspecified order on a background thread. michael@0: // Returns true if the task was posted. If your tasks have ordering michael@0: // requirements, see PostSequencedWorkerTask(). michael@0: // michael@0: // This class will attempt to delete tasks that aren't run michael@0: // (non-block-shutdown semantics) but can't guarantee that this happens. If michael@0: // all worker threads are busy running CONTINUE_ON_SHUTDOWN tasks, there michael@0: // will be no workers available to delete these tasks. And there may be michael@0: // tasks with the same sequence token behind those CONTINUE_ON_SHUTDOWN michael@0: // tasks. Deleting those tasks before the previous one has completed could michael@0: // cause nondeterministic crashes because the task could be keeping some michael@0: // objects alive which do work in their destructor, which could voilate the michael@0: // assumptions of the running task. michael@0: // michael@0: // The task will be guaranteed to run to completion before shutdown michael@0: // (BLOCK_SHUTDOWN semantics). michael@0: // michael@0: // Returns true if the task was posted successfully. This may fail during michael@0: // shutdown regardless of the specified ShutdownBehavior. michael@0: bool PostWorkerTask(const tracked_objects::Location& from_here, michael@0: const Closure& task); michael@0: michael@0: // Same as PostWorkerTask but allows a delay to be specified (although doing michael@0: // so changes the shutdown behavior). The task will be run after the given michael@0: // delay has elapsed. michael@0: // michael@0: // If the delay is nonzero, the task won't be guaranteed to run to completion michael@0: // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. michael@0: // If the delay is zero, this behaves exactly like PostWorkerTask, i.e. the michael@0: // task will be guaranteed to run to completion before shutdown michael@0: // (BLOCK_SHUTDOWN semantics). michael@0: bool PostDelayedWorkerTask(const tracked_objects::Location& from_here, michael@0: const Closure& task, michael@0: TimeDelta delay); michael@0: michael@0: // Same as PostWorkerTask but allows specification of the shutdown behavior. michael@0: bool PostWorkerTaskWithShutdownBehavior( michael@0: const tracked_objects::Location& from_here, michael@0: const Closure& task, michael@0: WorkerShutdown shutdown_behavior); michael@0: michael@0: // Like PostWorkerTask above, but provides sequencing semantics. This means michael@0: // that tasks posted with the same sequence token (see GetSequenceToken()) michael@0: // are guaranteed to execute in order. This is useful in cases where you're michael@0: // doing operations that may depend on previous ones, like appending to a michael@0: // file. michael@0: // michael@0: // The task will be guaranteed to run to completion before shutdown michael@0: // (BLOCK_SHUTDOWN semantics). michael@0: // michael@0: // Returns true if the task was posted successfully. This may fail during michael@0: // shutdown regardless of the specified ShutdownBehavior. michael@0: bool PostSequencedWorkerTask(SequenceToken sequence_token, michael@0: const tracked_objects::Location& from_here, michael@0: const Closure& task); michael@0: michael@0: // Like PostSequencedWorkerTask above, but allows you to specify a named michael@0: // token, which saves an extra call to GetNamedSequenceToken. michael@0: bool PostNamedSequencedWorkerTask(const std::string& token_name, michael@0: const tracked_objects::Location& from_here, michael@0: const Closure& task); michael@0: michael@0: // Same as PostSequencedWorkerTask but allows a delay to be specified michael@0: // (although doing so changes the shutdown behavior). The task will be run michael@0: // after the given delay has elapsed. michael@0: // michael@0: // If the delay is nonzero, the task won't be guaranteed to run to completion michael@0: // before shutdown (SKIP_ON_SHUTDOWN semantics) to avoid shutdown hangs. michael@0: // If the delay is zero, this behaves exactly like PostSequencedWorkerTask, michael@0: // i.e. the task will be guaranteed to run to completion before shutdown michael@0: // (BLOCK_SHUTDOWN semantics). michael@0: bool PostDelayedSequencedWorkerTask( michael@0: SequenceToken sequence_token, michael@0: const tracked_objects::Location& from_here, michael@0: const Closure& task, michael@0: TimeDelta delay); michael@0: michael@0: // Same as PostSequencedWorkerTask but allows specification of the shutdown michael@0: // behavior. michael@0: bool PostSequencedWorkerTaskWithShutdownBehavior( michael@0: SequenceToken sequence_token, michael@0: const tracked_objects::Location& from_here, michael@0: const Closure& task, michael@0: WorkerShutdown shutdown_behavior); michael@0: michael@0: // TaskRunner implementation. Forwards to PostDelayedWorkerTask(). michael@0: virtual bool PostDelayedTask(const tracked_objects::Location& from_here, michael@0: const Closure& task, michael@0: TimeDelta delay) OVERRIDE; michael@0: virtual bool RunsTasksOnCurrentThread() const OVERRIDE; michael@0: michael@0: // Returns true if the current thread is processing a task with the given michael@0: // sequence_token. michael@0: bool IsRunningSequenceOnCurrentThread(SequenceToken sequence_token) const; michael@0: michael@0: // Blocks until all pending tasks are complete. This should only be called in michael@0: // unit tests when you want to validate something that should have happened. michael@0: // This will not flush delayed tasks; delayed tasks get deleted. michael@0: // michael@0: // Note that calling this will not prevent other threads from posting work to michael@0: // the queue while the calling thread is waiting on Flush(). In this case, michael@0: // Flush will return only when there's no more work in the queue. Normally, michael@0: // this doesn't come up since in a test, all the work is being posted from michael@0: // the main thread. michael@0: void FlushForTesting(); michael@0: michael@0: // Spuriously signal that there is work to be done. michael@0: void SignalHasWorkForTesting(); michael@0: michael@0: // Implements the worker pool shutdown. This should be called during app michael@0: // shutdown, and will discard/join with appropriate tasks before returning. michael@0: // After this call, subsequent calls to post tasks will fail. michael@0: // michael@0: // Must be called from the same thread this object was constructed on. michael@0: void Shutdown() { Shutdown(0); } michael@0: michael@0: // A variant that allows an arbitrary number of new blocking tasks to michael@0: // be posted during shutdown from within tasks that execute during shutdown. michael@0: // Only tasks designated as BLOCKING_SHUTDOWN will be allowed, and only if michael@0: // posted by tasks that are not designated as CONTINUE_ON_SHUTDOWN. Once michael@0: // the limit is reached, subsequent calls to post task fail in all cases. michael@0: // michael@0: // Must be called from the same thread this object was constructed on. michael@0: void Shutdown(int max_new_blocking_tasks_after_shutdown); michael@0: michael@0: // Check if Shutdown was called for given threading pool. This method is used michael@0: // for aborting time consuming operation to avoid blocking shutdown. michael@0: // michael@0: // Can be called from any thread. michael@0: bool IsShutdownInProgress(); michael@0: michael@0: protected: michael@0: virtual ~SequencedWorkerPool(); michael@0: michael@0: virtual void OnDestruct() const OVERRIDE; michael@0: michael@0: private: michael@0: friend class RefCountedThreadSafe; michael@0: friend class DeleteHelper; michael@0: michael@0: class Inner; michael@0: class Worker; michael@0: michael@0: const scoped_refptr constructor_message_loop_; michael@0: michael@0: // Avoid pulling in too many headers by putting (almost) everything michael@0: // into |inner_|. michael@0: const scoped_ptr inner_; michael@0: michael@0: DISALLOW_COPY_AND_ASSIGN(SequencedWorkerPool); michael@0: }; michael@0: michael@0: } // namespace base michael@0: michael@0: #endif // BASE_THREADING_SEQUENCED_WORKER_POOL_H_