michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "CacheIOThread.h" michael@0: #include "CacheFileIOManager.h" michael@0: michael@0: #include "nsIRunnable.h" michael@0: #include "nsISupportsImpl.h" michael@0: #include "nsPrintfCString.h" michael@0: #include "nsThreadUtils.h" michael@0: #include "mozilla/IOInterposer.h" michael@0: #include "mozilla/VisualEventTracer.h" michael@0: michael@0: namespace mozilla { michael@0: namespace net { michael@0: michael@0: CacheIOThread* CacheIOThread::sSelf = nullptr; michael@0: michael@0: NS_IMPL_ISUPPORTS(CacheIOThread, nsIThreadObserver) michael@0: michael@0: CacheIOThread::CacheIOThread() michael@0: : mMonitor("CacheIOThread") michael@0: , mThread(nullptr) michael@0: , mLowestLevelWaiting(LAST_LEVEL) michael@0: , mCurrentlyExecutingLevel(0) michael@0: , mHasXPCOMEvents(false) michael@0: , mRerunCurrentEvent(false) michael@0: , mShutdown(false) michael@0: { michael@0: sSelf = this; michael@0: } michael@0: michael@0: CacheIOThread::~CacheIOThread() michael@0: { michael@0: sSelf = nullptr; michael@0: #ifdef DEBUG michael@0: for (uint32_t level = 0; level < LAST_LEVEL; ++level) { michael@0: MOZ_ASSERT(!mEventQueue[level].Length()); michael@0: } michael@0: #endif michael@0: } michael@0: michael@0: nsresult CacheIOThread::Init() michael@0: { michael@0: mThread = PR_CreateThread(PR_USER_THREAD, ThreadFunc, this, michael@0: PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, michael@0: PR_JOINABLE_THREAD, 128 * 1024); michael@0: if (!mThread) michael@0: return NS_ERROR_FAILURE; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: nsresult CacheIOThread::Dispatch(nsIRunnable* aRunnable, uint32_t aLevel) michael@0: { michael@0: NS_ENSURE_ARG(aLevel < LAST_LEVEL); michael@0: michael@0: MonitorAutoLock lock(mMonitor); michael@0: michael@0: if (mShutdown && (PR_GetCurrentThread() != mThread)) michael@0: return NS_ERROR_UNEXPECTED; michael@0: michael@0: return DispatchInternal(aRunnable, aLevel); michael@0: } michael@0: michael@0: nsresult CacheIOThread::DispatchAfterPendingOpens(nsIRunnable* aRunnable) michael@0: { michael@0: MonitorAutoLock lock(mMonitor); michael@0: michael@0: if (mShutdown && (PR_GetCurrentThread() != mThread)) michael@0: return NS_ERROR_UNEXPECTED; michael@0: michael@0: // Move everything from later executed OPEN level to the OPEN_PRIORITY level michael@0: // where we post the (eviction) runnable. michael@0: mEventQueue[OPEN_PRIORITY].AppendElements(mEventQueue[OPEN]); michael@0: mEventQueue[OPEN].Clear(); michael@0: michael@0: return DispatchInternal(aRunnable, OPEN_PRIORITY); michael@0: } michael@0: michael@0: nsresult CacheIOThread::DispatchInternal(nsIRunnable* aRunnable, uint32_t aLevel) michael@0: { michael@0: mMonitor.AssertCurrentThreadOwns(); michael@0: michael@0: mEventQueue[aLevel].AppendElement(aRunnable); michael@0: if (mLowestLevelWaiting > aLevel) michael@0: mLowestLevelWaiting = aLevel; michael@0: michael@0: mMonitor.NotifyAll(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: bool CacheIOThread::IsCurrentThread() michael@0: { michael@0: return mThread == PR_GetCurrentThread(); michael@0: } michael@0: michael@0: bool CacheIOThread::YieldInternal() michael@0: { michael@0: if (!IsCurrentThread()) { michael@0: NS_WARNING("Trying to yield to priority events on non-cache2 I/O thread? " michael@0: "You probably do something wrong."); michael@0: return false; michael@0: } michael@0: michael@0: if (mCurrentlyExecutingLevel == XPCOM_LEVEL) { michael@0: // Doesn't make any sense, since this handler is the one michael@0: // that would be executed as the next one. michael@0: return false; michael@0: } michael@0: michael@0: if (!EventsPending(mCurrentlyExecutingLevel)) michael@0: return false; michael@0: michael@0: mRerunCurrentEvent = true; michael@0: return true; michael@0: } michael@0: michael@0: nsresult CacheIOThread::Shutdown() michael@0: { michael@0: { michael@0: MonitorAutoLock lock(mMonitor); michael@0: mShutdown = true; michael@0: mMonitor.NotifyAll(); michael@0: } michael@0: michael@0: PR_JoinThread(mThread); michael@0: mThread = nullptr; michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: already_AddRefed CacheIOThread::Target() michael@0: { michael@0: nsCOMPtr target; michael@0: michael@0: target = mXPCOMThread; michael@0: if (!target && mThread) michael@0: { michael@0: MonitorAutoLock lock(mMonitor); michael@0: if (!mXPCOMThread) michael@0: lock.Wait(); michael@0: michael@0: target = mXPCOMThread; michael@0: } michael@0: michael@0: return target.forget(); michael@0: } michael@0: michael@0: // static michael@0: void CacheIOThread::ThreadFunc(void* aClosure) michael@0: { michael@0: PR_SetCurrentThreadName("Cache2 I/O"); michael@0: mozilla::IOInterposer::RegisterCurrentThread(); michael@0: CacheIOThread* thread = static_cast(aClosure); michael@0: thread->ThreadFunc(); michael@0: mozilla::IOInterposer::UnregisterCurrentThread(); michael@0: } michael@0: michael@0: void CacheIOThread::ThreadFunc() michael@0: { michael@0: nsCOMPtr threadInternal; michael@0: michael@0: { michael@0: MonitorAutoLock lock(mMonitor); michael@0: michael@0: // This creates nsThread for this PRThread michael@0: nsCOMPtr xpcomThread = NS_GetCurrentThread(); michael@0: michael@0: threadInternal = do_QueryInterface(xpcomThread); michael@0: if (threadInternal) michael@0: threadInternal->SetObserver(this); michael@0: michael@0: mXPCOMThread.swap(xpcomThread); michael@0: michael@0: lock.NotifyAll(); michael@0: michael@0: do { michael@0: loopStart: michael@0: // Reset the lowest level now, so that we can detect a new event on michael@0: // a lower level (i.e. higher priority) has been scheduled while michael@0: // executing any previously scheduled event. michael@0: mLowestLevelWaiting = LAST_LEVEL; michael@0: michael@0: // Process xpcom events first michael@0: while (mHasXPCOMEvents) { michael@0: eventtracer::AutoEventTracer tracer(this, eventtracer::eExec, eventtracer::eDone, michael@0: "net::cache::io::level(xpcom)"); michael@0: michael@0: mHasXPCOMEvents = false; michael@0: mCurrentlyExecutingLevel = XPCOM_LEVEL; michael@0: michael@0: MonitorAutoUnlock unlock(mMonitor); michael@0: michael@0: bool processedEvent; michael@0: nsresult rv; michael@0: do { michael@0: rv = mXPCOMThread->ProcessNextEvent(false, &processedEvent); michael@0: } while (NS_SUCCEEDED(rv) && processedEvent); michael@0: } michael@0: michael@0: uint32_t level; michael@0: for (level = 0; level < LAST_LEVEL; ++level) { michael@0: if (!mEventQueue[level].Length()) { michael@0: // no events on this level, go to the next level michael@0: continue; michael@0: } michael@0: michael@0: LoopOneLevel(level); michael@0: michael@0: // Go to the first (lowest) level again michael@0: goto loopStart; michael@0: } michael@0: michael@0: if (EventsPending()) michael@0: continue; michael@0: michael@0: if (mShutdown) michael@0: break; michael@0: michael@0: lock.Wait(PR_INTERVAL_NO_TIMEOUT); michael@0: michael@0: if (EventsPending()) michael@0: continue; michael@0: michael@0: } while (true); michael@0: michael@0: MOZ_ASSERT(!EventsPending()); michael@0: } // lock michael@0: michael@0: if (threadInternal) michael@0: threadInternal->SetObserver(nullptr); michael@0: } michael@0: michael@0: static const char* const sLevelTraceName[] = { michael@0: "net::cache::io::level(0)", michael@0: "net::cache::io::level(1)", michael@0: "net::cache::io::level(2)", michael@0: "net::cache::io::level(3)", michael@0: "net::cache::io::level(4)", michael@0: "net::cache::io::level(5)", michael@0: "net::cache::io::level(6)", michael@0: "net::cache::io::level(7)", michael@0: "net::cache::io::level(8)", michael@0: "net::cache::io::level(9)", michael@0: "net::cache::io::level(10)", michael@0: "net::cache::io::level(11)", michael@0: "net::cache::io::level(12)" michael@0: }; michael@0: michael@0: void CacheIOThread::LoopOneLevel(uint32_t aLevel) michael@0: { michael@0: eventtracer::AutoEventTracer tracer(this, eventtracer::eExec, eventtracer::eDone, michael@0: sLevelTraceName[aLevel]); michael@0: michael@0: nsTArray > events; michael@0: events.SwapElements(mEventQueue[aLevel]); michael@0: uint32_t length = events.Length(); michael@0: michael@0: mCurrentlyExecutingLevel = aLevel; michael@0: michael@0: bool returnEvents = false; michael@0: uint32_t index; michael@0: { michael@0: MonitorAutoUnlock unlock(mMonitor); michael@0: michael@0: for (index = 0; index < length; ++index) { michael@0: if (EventsPending(aLevel)) { michael@0: // Somebody scheduled a new event on a lower level, break and harry michael@0: // to execute it! Don't forget to return what we haven't exec. michael@0: returnEvents = true; michael@0: break; michael@0: } michael@0: michael@0: // Drop any previous flagging, only an event on the current level may set michael@0: // this flag. michael@0: mRerunCurrentEvent = false; michael@0: michael@0: events[index]->Run(); michael@0: michael@0: if (mRerunCurrentEvent) { michael@0: // The event handler yields to higher priority events and wants to rerun. michael@0: returnEvents = true; michael@0: break; michael@0: } michael@0: michael@0: // Release outside the lock. michael@0: events[index] = nullptr; michael@0: } michael@0: } michael@0: michael@0: if (returnEvents) michael@0: mEventQueue[aLevel].InsertElementsAt(0, events.Elements() + index, length - index); michael@0: } michael@0: michael@0: bool CacheIOThread::EventsPending(uint32_t aLastLevel) michael@0: { michael@0: return mLowestLevelWaiting < aLastLevel || mHasXPCOMEvents; michael@0: } michael@0: michael@0: NS_IMETHODIMP CacheIOThread::OnDispatchedEvent(nsIThreadInternal *thread) michael@0: { michael@0: MonitorAutoLock lock(mMonitor); michael@0: mHasXPCOMEvents = true; michael@0: MOZ_ASSERT(!mShutdown || (PR_GetCurrentThread() == mThread)); michael@0: lock.Notify(); michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP CacheIOThread::OnProcessNextEvent(nsIThreadInternal *thread, bool mayWait, uint32_t recursionDepth) michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: NS_IMETHODIMP CacheIOThread::AfterProcessNextEvent(nsIThreadInternal *thread, uint32_t recursionDepth, michael@0: bool eventWasProcessed) michael@0: { michael@0: return NS_OK; michael@0: } michael@0: michael@0: // Memory reporting michael@0: michael@0: size_t CacheIOThread::SizeOfExcludingThis(mozilla::MallocSizeOf mallocSizeOf) const michael@0: { michael@0: MonitorAutoLock lock(const_cast(this)->mMonitor); michael@0: michael@0: size_t n = 0; michael@0: n += mallocSizeOf(mThread); michael@0: for (uint32_t level = 0; level < LAST_LEVEL; ++level) { michael@0: n += mEventQueue[level].SizeOfExcludingThis(mallocSizeOf); michael@0: // Events referenced by the queues are arbitrary objects we cannot be sure michael@0: // are reported elsewhere as well as probably not implementing nsISizeOf michael@0: // interface. Deliberatly omitting them from reporting here. michael@0: } michael@0: michael@0: return n; michael@0: } michael@0: michael@0: size_t CacheIOThread::SizeOfIncludingThis(mozilla::MallocSizeOf mallocSizeOf) const michael@0: { michael@0: return mallocSizeOf(this) + SizeOfExcludingThis(mallocSizeOf); michael@0: } michael@0: michael@0: } // net michael@0: } // mozilla