diff -r 000000000000 -r 6474c204b198 dom/indexedDB/TransactionThreadPool.cpp --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dom/indexedDB/TransactionThreadPool.cpp Wed Dec 31 06:09:35 2014 +0100 @@ -0,0 +1,685 @@ +/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/* vim: set ts=2 et sw=2 tw=80: */ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +#include "TransactionThreadPool.h" + +#include "nsIObserverService.h" +#include "nsIThreadPool.h" + +#include "nsComponentManagerUtils.h" +#include "nsThreadUtils.h" +#include "nsServiceManagerUtils.h" +#include "nsXPCOMCIDInternal.h" + +#include "ProfilerHelpers.h" + +using mozilla::MonitorAutoLock; + +USING_INDEXEDDB_NAMESPACE + +namespace { + +const uint32_t kThreadLimit = 20; +const uint32_t kIdleThreadLimit = 5; +const uint32_t kIdleThreadTimeoutMs = 30000; + +TransactionThreadPool* gThreadPool = nullptr; +bool gShutdown = false; + +#ifdef MOZ_ENABLE_PROFILER_SPS + +class TransactionThreadPoolListener : public nsIThreadPoolListener +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSITHREADPOOLLISTENER + +private: + virtual ~TransactionThreadPoolListener() + { } +}; + +#endif // MOZ_ENABLE_PROFILER_SPS + +} // anonymous namespace + +BEGIN_INDEXEDDB_NAMESPACE + +class FinishTransactionRunnable MOZ_FINAL : public nsIRunnable +{ +public: + NS_DECL_THREADSAFE_ISUPPORTS + NS_DECL_NSIRUNNABLE + + inline FinishTransactionRunnable(IDBTransaction* aTransaction, + nsCOMPtr& aFinishRunnable); + +private: + IDBTransaction* mTransaction; + nsCOMPtr mFinishRunnable; +}; + +END_INDEXEDDB_NAMESPACE + +TransactionThreadPool::TransactionThreadPool() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(!gThreadPool, "More than one instance!"); +} + +TransactionThreadPool::~TransactionThreadPool() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(gThreadPool == this, "Different instances!"); + gThreadPool = nullptr; +} + +// static +TransactionThreadPool* +TransactionThreadPool::GetOrCreate() +{ + if (!gThreadPool && !gShutdown) { + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + nsAutoPtr pool(new TransactionThreadPool()); + + nsresult rv = pool->Init(); + NS_ENSURE_SUCCESS(rv, nullptr); + + gThreadPool = pool.forget(); + } + return gThreadPool; +} + +// static +TransactionThreadPool* +TransactionThreadPool::Get() +{ + return gThreadPool; +} + +// static +void +TransactionThreadPool::Shutdown() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + gShutdown = true; + + if (gThreadPool) { + if (NS_FAILED(gThreadPool->Cleanup())) { + NS_WARNING("Failed to shutdown thread pool!"); + } + delete gThreadPool; + gThreadPool = nullptr; + } +} + +nsresult +TransactionThreadPool::Init() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + nsresult rv; + mThreadPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID, &rv); + NS_ENSURE_SUCCESS(rv, rv); + + rv = mThreadPool->SetName(NS_LITERAL_CSTRING("IndexedDB Trans")); + NS_ENSURE_SUCCESS(rv, rv); + + rv = mThreadPool->SetThreadLimit(kThreadLimit); + NS_ENSURE_SUCCESS(rv, rv); + + rv = mThreadPool->SetIdleThreadLimit(kIdleThreadLimit); + NS_ENSURE_SUCCESS(rv, rv); + + rv = mThreadPool->SetIdleThreadTimeout(kIdleThreadTimeoutMs); + NS_ENSURE_SUCCESS(rv, rv); + +#ifdef MOZ_ENABLE_PROFILER_SPS + nsCOMPtr listener = + new TransactionThreadPoolListener(); + + rv = mThreadPool->SetListener(listener); + NS_ENSURE_SUCCESS(rv, rv); +#endif + + return NS_OK; +} + +nsresult +TransactionThreadPool::Cleanup() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + PROFILER_MAIN_THREAD_LABEL("IndexedDB", "TransactionThreadPool::Cleanup"); + + nsresult rv = mThreadPool->Shutdown(); + NS_ENSURE_SUCCESS(rv, rv); + + // Make sure the pool is still accessible while any callbacks generated from + // the other threads are processed. + rv = NS_ProcessPendingEvents(nullptr); + NS_ENSURE_SUCCESS(rv, rv); + + if (!mCompleteCallbacks.IsEmpty()) { + // Run all callbacks manually now. + for (uint32_t index = 0; index < mCompleteCallbacks.Length(); index++) { + mCompleteCallbacks[index].mCallback->Run(); + } + mCompleteCallbacks.Clear(); + + // And make sure they get processed. + rv = NS_ProcessPendingEvents(nullptr); + NS_ENSURE_SUCCESS(rv, rv); + } + + return NS_OK; +} + +// static +PLDHashOperator +TransactionThreadPool::MaybeUnblockTransaction(nsPtrHashKey* aKey, + void* aUserArg) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + TransactionInfo* maybeUnblockedInfo = aKey->GetKey(); + TransactionInfo* finishedInfo = static_cast(aUserArg); + + NS_ASSERTION(maybeUnblockedInfo->blockedOn.Contains(finishedInfo), + "Huh?"); + maybeUnblockedInfo->blockedOn.RemoveEntry(finishedInfo); + if (!maybeUnblockedInfo->blockedOn.Count()) { + // Let this transaction run. + maybeUnblockedInfo->queue->Unblock(); + } + + return PL_DHASH_NEXT; +} + +void +TransactionThreadPool::FinishTransaction(IDBTransaction* aTransaction) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aTransaction, "Null pointer!"); + + PROFILER_MAIN_THREAD_LABEL("IndexedDB", + "TransactionThreadPool::FinishTransaction"); + + // AddRef here because removing from the hash will call Release. + nsRefPtr transaction(aTransaction); + + const nsACString& databaseId = aTransaction->mDatabase->Id(); + + DatabaseTransactionInfo* dbTransactionInfo; + if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) { + NS_ERROR("We don't know anyting about this database?!"); + return; + } + + DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress = + dbTransactionInfo->transactions; + + uint32_t transactionCount = transactionsInProgress.Count(); + +#ifdef DEBUG + if (aTransaction->mMode == IDBTransaction::VERSION_CHANGE) { + NS_ASSERTION(transactionCount == 1, + "More transactions running than should be!"); + } +#endif + + if (transactionCount == 1) { +#ifdef DEBUG + { + const TransactionInfo* info = transactionsInProgress.Get(aTransaction); + NS_ASSERTION(info->transaction == aTransaction, "Transaction mismatch!"); + } +#endif + mTransactionsInProgress.Remove(databaseId); + + // See if we need to fire any complete callbacks. + uint32_t index = 0; + while (index < mCompleteCallbacks.Length()) { + if (MaybeFireCallback(mCompleteCallbacks[index])) { + mCompleteCallbacks.RemoveElementAt(index); + } + else { + index++; + } + } + + return; + } + TransactionInfo* info = transactionsInProgress.Get(aTransaction); + NS_ASSERTION(info, "We've never heard of this transaction?!?"); + + const nsTArray& objectStoreNames = aTransaction->mObjectStoreNames; + for (uint32_t index = 0, count = objectStoreNames.Length(); index < count; + index++) { + TransactionInfoPair* blockInfo = + dbTransactionInfo->blockingTransactions.Get(objectStoreNames[index]); + NS_ASSERTION(blockInfo, "Huh?"); + + if (aTransaction->mMode == IDBTransaction::READ_WRITE && + blockInfo->lastBlockingReads == info) { + blockInfo->lastBlockingReads = nullptr; + } + + uint32_t i = blockInfo->lastBlockingWrites.IndexOf(info); + if (i != blockInfo->lastBlockingWrites.NoIndex) { + blockInfo->lastBlockingWrites.RemoveElementAt(i); + } + } + + info->blocking.EnumerateEntries(MaybeUnblockTransaction, info); + + transactionsInProgress.Remove(aTransaction); +} + +TransactionThreadPool::TransactionQueue& +TransactionThreadPool::GetQueueForTransaction(IDBTransaction* aTransaction) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aTransaction, "Null pointer!"); + + const nsACString& databaseId = aTransaction->mDatabase->Id(); + + const nsTArray& objectStoreNames = aTransaction->mObjectStoreNames; + const uint16_t mode = aTransaction->mMode; + + // See if we can run this transaction now. + DatabaseTransactionInfo* dbTransactionInfo; + if (!mTransactionsInProgress.Get(databaseId, &dbTransactionInfo)) { + // First transaction for this database. + dbTransactionInfo = new DatabaseTransactionInfo(); + mTransactionsInProgress.Put(databaseId, dbTransactionInfo); + } + + DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress = + dbTransactionInfo->transactions; + TransactionInfo* info = transactionsInProgress.Get(aTransaction); + if (info) { + // We recognize this one. + return *info->queue; + } + + TransactionInfo* transactionInfo = new TransactionInfo(aTransaction); + + dbTransactionInfo->transactions.Put(aTransaction, transactionInfo);; + + for (uint32_t index = 0, count = objectStoreNames.Length(); index < count; + index++) { + TransactionInfoPair* blockInfo = + dbTransactionInfo->blockingTransactions.Get(objectStoreNames[index]); + if (!blockInfo) { + blockInfo = new TransactionInfoPair(); + blockInfo->lastBlockingReads = nullptr; + dbTransactionInfo->blockingTransactions.Put(objectStoreNames[index], + blockInfo); + } + + // Mark what we are blocking on. + if (blockInfo->lastBlockingReads) { + TransactionInfo* blockingInfo = blockInfo->lastBlockingReads; + transactionInfo->blockedOn.PutEntry(blockingInfo); + blockingInfo->blocking.PutEntry(transactionInfo); + } + + if (mode == IDBTransaction::READ_WRITE && + blockInfo->lastBlockingWrites.Length()) { + for (uint32_t index = 0, + count = blockInfo->lastBlockingWrites.Length(); index < count; + index++) { + TransactionInfo* blockingInfo = blockInfo->lastBlockingWrites[index]; + transactionInfo->blockedOn.PutEntry(blockingInfo); + blockingInfo->blocking.PutEntry(transactionInfo); + } + } + + if (mode == IDBTransaction::READ_WRITE) { + blockInfo->lastBlockingReads = transactionInfo; + blockInfo->lastBlockingWrites.Clear(); + } + else { + blockInfo->lastBlockingWrites.AppendElement(transactionInfo); + } + } + + if (!transactionInfo->blockedOn.Count()) { + transactionInfo->queue->Unblock(); + } + + return *transactionInfo->queue; +} + +nsresult +TransactionThreadPool::Dispatch(IDBTransaction* aTransaction, + nsIRunnable* aRunnable, + bool aFinish, + nsIRunnable* aFinishRunnable) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aTransaction, "Null pointer!"); + NS_ASSERTION(aRunnable, "Null pointer!"); + + if (aTransaction->mDatabase->IsInvalidated() && !aFinish) { + return NS_ERROR_NOT_AVAILABLE; + } + + TransactionQueue& queue = GetQueueForTransaction(aTransaction); + + queue.Dispatch(aRunnable); + if (aFinish) { + queue.Finish(aFinishRunnable); + } + return NS_OK; +} + +void +TransactionThreadPool::WaitForDatabasesToComplete( + nsTArray >& aDatabases, + nsIRunnable* aCallback) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(!aDatabases.IsEmpty(), "No databases to wait on!"); + NS_ASSERTION(aCallback, "Null pointer!"); + + DatabasesCompleteCallback* callback = mCompleteCallbacks.AppendElement(); + + callback->mCallback = aCallback; + callback->mDatabases.SwapElements(aDatabases); + + if (MaybeFireCallback(*callback)) { + mCompleteCallbacks.RemoveElementAt(mCompleteCallbacks.Length() - 1); + } +} + +// static +PLDHashOperator +TransactionThreadPool::CollectTransactions(IDBTransaction* aKey, + TransactionInfo* aValue, + void* aUserArg) +{ + nsAutoTArray, 50>* transactionArray = + static_cast, 50>*>(aUserArg); + transactionArray->AppendElement(aKey); + + return PL_DHASH_NEXT; +} + +void +TransactionThreadPool::AbortTransactionsForDatabase(IDBDatabase* aDatabase) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aDatabase, "Null pointer!"); + + PROFILER_MAIN_THREAD_LABEL("IndexedDB", + "TransactionThreadPool::" + "AbortTransactionsForDatabase"); + + // Get list of transactions for this database id + DatabaseTransactionInfo* dbTransactionInfo; + if (!mTransactionsInProgress.Get(aDatabase->Id(), &dbTransactionInfo)) { + // If there are no transactions, we're done. + return; + } + + // Collect any running transactions + DatabaseTransactionInfo::TransactionHashtable& transactionsInProgress = + dbTransactionInfo->transactions; + + NS_ASSERTION(transactionsInProgress.Count(), "Should never be 0!"); + + nsAutoTArray, 50> transactions; + transactionsInProgress.EnumerateRead(CollectTransactions, &transactions); + + // Abort transactions. Do this after collecting the transactions in case + // calling Abort() modifies the data structures we're iterating above. + for (uint32_t index = 0; index < transactions.Length(); index++) { + if (transactions[index]->Database() != aDatabase) { + continue; + } + + // This can fail, for example if the transaction is in the process of + // being comitted. That is expected and fine, so we ignore any returned + // errors. + ErrorResult rv; + transactions[index]->Abort(rv); + } +} + +struct MOZ_STACK_CLASS TransactionSearchInfo +{ + TransactionSearchInfo(nsIOfflineStorage* aDatabase) + : db(aDatabase), found(false) + { + } + + nsIOfflineStorage* db; + bool found; +}; + +// static +PLDHashOperator +TransactionThreadPool::FindTransaction(IDBTransaction* aKey, + TransactionInfo* aValue, + void* aUserArg) +{ + TransactionSearchInfo* info = static_cast(aUserArg); + + if (aKey->Database() == info->db) { + info->found = true; + return PL_DHASH_STOP; + } + + return PL_DHASH_NEXT; +} +bool +TransactionThreadPool::HasTransactionsForDatabase(IDBDatabase* aDatabase) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aDatabase, "Null pointer!"); + + DatabaseTransactionInfo* dbTransactionInfo = nullptr; + dbTransactionInfo = mTransactionsInProgress.Get(aDatabase->Id()); + if (!dbTransactionInfo) { + return false; + } + + TransactionSearchInfo info(aDatabase); + dbTransactionInfo->transactions.EnumerateRead(FindTransaction, &info); + + return info.found; +} + +bool +TransactionThreadPool::MaybeFireCallback(DatabasesCompleteCallback aCallback) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + PROFILER_MAIN_THREAD_LABEL("IndexedDB", + "TransactionThreadPool::MaybeFireCallback"); + + for (uint32_t index = 0; index < aCallback.mDatabases.Length(); index++) { + IDBDatabase* database = aCallback.mDatabases[index]; + if (!database) { + MOZ_CRASH(); + } + + if (mTransactionsInProgress.Get(database->Id(), nullptr)) { + return false; + } + } + + aCallback.mCallback->Run(); + return true; +} + +TransactionThreadPool:: +TransactionQueue::TransactionQueue(IDBTransaction* aTransaction) +: mMonitor("TransactionQueue::mMonitor"), + mTransaction(aTransaction), + mShouldFinish(false) +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aTransaction, "Null pointer!"); +} + +void +TransactionThreadPool::TransactionQueue::Unblock() +{ + MonitorAutoLock lock(mMonitor); + + // NB: Finish may be called before Unblock. + + TransactionThreadPool::Get()->mThreadPool-> + Dispatch(this, NS_DISPATCH_NORMAL); +} + +void +TransactionThreadPool::TransactionQueue::Dispatch(nsIRunnable* aRunnable) +{ + MonitorAutoLock lock(mMonitor); + + NS_ASSERTION(!mShouldFinish, "Dispatch called after Finish!"); + + mQueue.AppendElement(aRunnable); + + mMonitor.Notify(); +} + +void +TransactionThreadPool::TransactionQueue::Finish(nsIRunnable* aFinishRunnable) +{ + MonitorAutoLock lock(mMonitor); + + NS_ASSERTION(!mShouldFinish, "Finish called more than once!"); + + mShouldFinish = true; + mFinishRunnable = aFinishRunnable; + + mMonitor.Notify(); +} + +NS_IMPL_ISUPPORTS(TransactionThreadPool::TransactionQueue, nsIRunnable) + +NS_IMETHODIMP +TransactionThreadPool::TransactionQueue::Run() +{ + NS_ASSERTION(!NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(IndexedDatabaseManager::IsMainProcess(), "Wrong process!"); + + PROFILER_LABEL("IndexedDB", "TransactionQueue::Run"); + + IDB_PROFILER_MARK("IndexedDB Transaction %llu: Beginning database work", + "IDBTransaction[%llu] DT Start", + mTransaction->GetSerialNumber()); + + nsAutoTArray, 10> queue; + nsCOMPtr finishRunnable; + bool shouldFinish = false; + + do { + NS_ASSERTION(queue.IsEmpty(), "Should have cleared this!"); + + { + MonitorAutoLock lock(mMonitor); + while (!mShouldFinish && mQueue.IsEmpty()) { + if (NS_FAILED(mMonitor.Wait())) { + NS_ERROR("Failed to wait!"); + } + } + + mQueue.SwapElements(queue); + if (mShouldFinish) { + mFinishRunnable.swap(finishRunnable); + shouldFinish = true; + } + } + + uint32_t count = queue.Length(); + for (uint32_t index = 0; index < count; index++) { + nsCOMPtr& runnable = queue[index]; + runnable->Run(); + runnable = nullptr; + } + + if (count) { + queue.Clear(); + } + } while (!shouldFinish); + + IDB_PROFILER_MARK("IndexedDB Transaction %llu: Finished database work", + "IDBTransaction[%llu] DT Done", + mTransaction->GetSerialNumber()); + + nsCOMPtr finishTransactionRunnable = + new FinishTransactionRunnable(mTransaction, finishRunnable); + if (NS_FAILED(NS_DispatchToMainThread(finishTransactionRunnable, + NS_DISPATCH_NORMAL))) { + NS_WARNING("Failed to dispatch finishTransactionRunnable!"); + } + + return NS_OK; +} + +FinishTransactionRunnable::FinishTransactionRunnable( + IDBTransaction* aTransaction, + nsCOMPtr& aFinishRunnable) +: mTransaction(aTransaction) +{ + NS_ASSERTION(!NS_IsMainThread(), "Wrong thread!"); + NS_ASSERTION(aTransaction, "Null pointer!"); + mFinishRunnable.swap(aFinishRunnable); +} + +NS_IMPL_ISUPPORTS(FinishTransactionRunnable, nsIRunnable) + +NS_IMETHODIMP +FinishTransactionRunnable::Run() +{ + NS_ASSERTION(NS_IsMainThread(), "Wrong thread!"); + + PROFILER_MAIN_THREAD_LABEL("IndexedDB", "FinishTransactionRunnable::Run"); + + if (!gThreadPool) { + NS_ERROR("Running after shutdown!"); + return NS_ERROR_FAILURE; + } + + gThreadPool->FinishTransaction(mTransaction); + + if (mFinishRunnable) { + mFinishRunnable->Run(); + mFinishRunnable = nullptr; + } + + return NS_OK; +} + +#ifdef MOZ_ENABLE_PROFILER_SPS + +NS_IMPL_ISUPPORTS(TransactionThreadPoolListener, nsIThreadPoolListener) + +NS_IMETHODIMP +TransactionThreadPoolListener::OnThreadCreated() +{ + MOZ_ASSERT(!NS_IsMainThread()); + char aLocal; + profiler_register_thread("IndexedDB Transaction", &aLocal); + return NS_OK; +} + +NS_IMETHODIMP +TransactionThreadPoolListener::OnThreadShuttingDown() +{ + MOZ_ASSERT(!NS_IsMainThread()); + profiler_unregister_thread(); + return NS_OK; +} + +#endif // MOZ_ENABLE_PROFILER_SPS