michael@0: /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ michael@0: /* vim:set ts=2 sw=2 sts=2 et cindent: */ michael@0: /* This Source Code Form is subject to the terms of the Mozilla Public michael@0: * License, v. 2.0. If a copy of the MPL was not distributed with this michael@0: * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ michael@0: michael@0: #include "EventTokenBucket.h" michael@0: michael@0: #include "nsICancelable.h" michael@0: #include "nsNetUtil.h" michael@0: #include "nsSocketTransportService2.h" michael@0: michael@0: #ifdef DEBUG michael@0: #include "MainThreadUtils.h" michael@0: #endif michael@0: michael@0: #ifdef XP_WIN michael@0: #include michael@0: #include michael@0: #endif michael@0: michael@0: extern PRThread *gSocketThread; michael@0: michael@0: namespace mozilla { michael@0: namespace net { michael@0: michael@0: //////////////////////////////////////////// michael@0: // EventTokenBucketCancelable michael@0: //////////////////////////////////////////// michael@0: michael@0: class TokenBucketCancelable : public nsICancelable michael@0: { michael@0: public: michael@0: NS_DECL_THREADSAFE_ISUPPORTS michael@0: NS_DECL_NSICANCELABLE michael@0: michael@0: TokenBucketCancelable(class ATokenBucketEvent *event); michael@0: virtual ~TokenBucketCancelable() {} michael@0: void Fire(); michael@0: michael@0: private: michael@0: friend class EventTokenBucket; michael@0: ATokenBucketEvent *mEvent; michael@0: }; michael@0: michael@0: NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable) michael@0: michael@0: TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event) michael@0: : mEvent(event) michael@0: { michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: TokenBucketCancelable::Cancel(nsresult reason) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: mEvent = nullptr; michael@0: return NS_OK; michael@0: } michael@0: michael@0: void michael@0: TokenBucketCancelable::Fire() michael@0: { michael@0: if (!mEvent) michael@0: return; michael@0: michael@0: ATokenBucketEvent *event = mEvent; michael@0: mEvent = nullptr; michael@0: event->OnTokenBucketAdmitted(); michael@0: } michael@0: michael@0: //////////////////////////////////////////// michael@0: // EventTokenBucket michael@0: //////////////////////////////////////////// michael@0: michael@0: NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback) michael@0: michael@0: // by default 1hz with no burst michael@0: EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond, michael@0: uint32_t burstSize) michael@0: : mUnitCost(kUsecPerSec) michael@0: , mMaxCredit(kUsecPerSec) michael@0: , mCredit(kUsecPerSec) michael@0: , mPaused(false) michael@0: , mStopped(false) michael@0: , mTimerArmed(false) michael@0: #ifdef XP_WIN michael@0: , mFineGrainTimerInUse(false) michael@0: , mFineGrainResetTimerArmed(false) michael@0: #endif michael@0: { michael@0: MOZ_COUNT_CTOR(EventTokenBucket); michael@0: mLastUpdate = TimeStamp::Now(); michael@0: michael@0: MOZ_ASSERT(NS_IsMainThread()); michael@0: michael@0: nsresult rv; michael@0: nsCOMPtr sts; michael@0: nsCOMPtr ioService = do_GetIOService(&rv); michael@0: if (NS_SUCCEEDED(rv)) michael@0: sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); michael@0: if (NS_SUCCEEDED(rv)) michael@0: mTimer = do_CreateInstance("@mozilla.org/timer;1"); michael@0: if (mTimer) michael@0: mTimer->SetTarget(sts); michael@0: SetRate(eventsPerSecond, burstSize); michael@0: } michael@0: michael@0: EventTokenBucket::~EventTokenBucket() michael@0: { michael@0: SOCKET_LOG(("EventTokenBucket::dtor %p events=%d\n", michael@0: this, mEvents.GetSize())); michael@0: michael@0: MOZ_COUNT_DTOR(EventTokenBucket); michael@0: if (mTimer && mTimerArmed) michael@0: mTimer->Cancel(); michael@0: michael@0: #ifdef XP_WIN michael@0: NormalTimers(); michael@0: if (mFineGrainResetTimerArmed) { michael@0: mFineGrainResetTimerArmed = false; michael@0: mFineGrainResetTimer->Cancel(); michael@0: } michael@0: #endif michael@0: michael@0: // Complete any queued events to prevent hangs michael@0: while (mEvents.GetSize()) { michael@0: nsRefPtr cancelable = michael@0: dont_AddRef(static_cast(mEvents.PopFront())); michael@0: cancelable->Fire(); michael@0: } michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::SetRate(uint32_t eventsPerSecond, michael@0: uint32_t burstSize) michael@0: { michael@0: SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", michael@0: this, eventsPerSecond, burstSize)); michael@0: michael@0: if (eventsPerSecond > kMaxHz) { michael@0: eventsPerSecond = kMaxHz; michael@0: SOCKET_LOG((" eventsPerSecond out of range\n")); michael@0: } michael@0: michael@0: if (!eventsPerSecond) { michael@0: eventsPerSecond = 1; michael@0: SOCKET_LOG((" eventsPerSecond out of range\n")); michael@0: } michael@0: michael@0: mUnitCost = kUsecPerSec / eventsPerSecond; michael@0: mMaxCredit = mUnitCost * burstSize; michael@0: if (mMaxCredit > kUsecPerSec * 60 * 15) { michael@0: SOCKET_LOG((" burstSize out of range\n")); michael@0: mMaxCredit = kUsecPerSec * 60 * 15; michael@0: } michael@0: mCredit = mMaxCredit; michael@0: mLastUpdate = TimeStamp::Now(); michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::ClearCredits() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this)); michael@0: mCredit = 0; michael@0: } michael@0: michael@0: uint32_t michael@0: EventTokenBucket::BurstEventsAvailable() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: return static_cast(mCredit / mUnitCost); michael@0: } michael@0: michael@0: uint32_t michael@0: EventTokenBucket::QueuedEvents() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: return mEvents.GetSize(); michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::Pause() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: SOCKET_LOG(("EventTokenBucket::Pause %p\n", this)); michael@0: if (mPaused || mStopped) michael@0: return; michael@0: michael@0: mPaused = true; michael@0: if (mTimerArmed) { michael@0: mTimer->Cancel(); michael@0: mTimerArmed = false; michael@0: } michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::UnPause() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this)); michael@0: if (!mPaused || mStopped) michael@0: return; michael@0: michael@0: mPaused = false; michael@0: DispatchEvents(); michael@0: UpdateTimer(); michael@0: } michael@0: michael@0: nsresult michael@0: EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this)); michael@0: michael@0: if (mStopped || !mTimer) michael@0: return NS_ERROR_FAILURE; michael@0: michael@0: UpdateCredits(); michael@0: michael@0: nsRefPtr cancelEvent = new TokenBucketCancelable(event); michael@0: // When this function exits the cancelEvent needs 2 references, one for the michael@0: // mEvents queue and one for the caller of SubmitEvent() michael@0: michael@0: NS_ADDREF(*cancelable = cancelEvent.get()); michael@0: michael@0: if (mPaused || !TryImmediateDispatch(cancelEvent.get())) { michael@0: // queue it michael@0: SOCKET_LOG((" queued\n")); michael@0: mEvents.Push(cancelEvent.forget().take()); michael@0: UpdateTimer(); michael@0: } michael@0: else { michael@0: SOCKET_LOG((" dispatched synchronously\n")); michael@0: } michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: bool michael@0: EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable) michael@0: { michael@0: if (mCredit < mUnitCost) michael@0: return false; michael@0: michael@0: mCredit -= mUnitCost; michael@0: cancelable->Fire(); michael@0: return true; michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::DispatchEvents() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused)); michael@0: if (mPaused || mStopped) michael@0: return; michael@0: michael@0: while (mEvents.GetSize() && mUnitCost <= mCredit) { michael@0: nsRefPtr cancelable = michael@0: dont_AddRef(static_cast(mEvents.PopFront())); michael@0: if (cancelable->mEvent) { michael@0: SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] " michael@0: "Dispatching queue token bucket event cost=%lu credit=%lu\n", michael@0: this, mUnitCost, mCredit)); michael@0: mCredit -= mUnitCost; michael@0: cancelable->Fire(); michael@0: } michael@0: } michael@0: michael@0: #ifdef XP_WIN michael@0: if (!mEvents.GetSize()) michael@0: WantNormalTimers(); michael@0: #endif michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::UpdateTimer() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer) michael@0: return; michael@0: michael@0: if (mCredit >= mUnitCost) michael@0: return; michael@0: michael@0: // determine the time needed to wait to accumulate enough credits to admit michael@0: // one more event and set the timer for that point. Always round it michael@0: // up because firing early doesn't help. michael@0: // michael@0: uint64_t deficit = mUnitCost - mCredit; michael@0: uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec; michael@0: michael@0: if (msecWait < 4) // minimum wait michael@0: msecWait = 4; michael@0: else if (msecWait > 60000) // maximum wait michael@0: msecWait = 60000; michael@0: michael@0: #ifdef XP_WIN michael@0: FineGrainTimers(); michael@0: #endif michael@0: michael@0: SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %dms\n", michael@0: this, msecWait)); michael@0: nsresult rv = mTimer->InitWithCallback(this, static_cast(msecWait), michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: mTimerArmed = NS_SUCCEEDED(rv); michael@0: } michael@0: michael@0: NS_IMETHODIMP michael@0: EventTokenBucket::Notify(nsITimer *timer) michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: #ifdef XP_WIN michael@0: if (timer == mFineGrainResetTimer) { michael@0: FineGrainResetTimerNotify(); michael@0: return NS_OK; michael@0: } michael@0: #endif michael@0: michael@0: SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this)); michael@0: mTimerArmed = false; michael@0: if (mStopped) michael@0: return NS_OK; michael@0: michael@0: UpdateCredits(); michael@0: DispatchEvents(); michael@0: UpdateTimer(); michael@0: michael@0: return NS_OK; michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::UpdateCredits() michael@0: { michael@0: MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); michael@0: michael@0: TimeStamp now = TimeStamp::Now(); michael@0: TimeDuration elapsed = now - mLastUpdate; michael@0: mLastUpdate = now; michael@0: michael@0: mCredit += static_cast(elapsed.ToMicroseconds()); michael@0: if (mCredit > mMaxCredit) michael@0: mCredit = mMaxCredit; michael@0: SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %lu (%lu each.. %3.2f)\n", michael@0: this, mCredit, mUnitCost, (double)mCredit / mUnitCost)); michael@0: } michael@0: michael@0: #ifdef XP_WIN michael@0: void michael@0: EventTokenBucket::FineGrainTimers() michael@0: { michael@0: SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n", michael@0: this, mFineGrainTimerInUse)); michael@0: michael@0: mLastFineGrainTimerUse = TimeStamp::Now(); michael@0: michael@0: if (mFineGrainTimerInUse) michael@0: return; michael@0: michael@0: if (mUnitCost > kCostFineGrainThreshold) michael@0: return; michael@0: michael@0: SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", michael@0: this)); michael@0: michael@0: mFineGrainTimerInUse = true; michael@0: timeBeginPeriod(1); michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::NormalTimers() michael@0: { michael@0: if (!mFineGrainTimerInUse) michael@0: return; michael@0: mFineGrainTimerInUse = false; michael@0: michael@0: SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this)); michael@0: timeEndPeriod(1); michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::WantNormalTimers() michael@0: { michael@0: if (!mFineGrainTimerInUse) michael@0: return; michael@0: if (mFineGrainResetTimerArmed) michael@0: return; michael@0: michael@0: TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse); michael@0: static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5); michael@0: michael@0: if (elapsed >= fiveSeconds) { michael@0: NormalTimers(); michael@0: return; michael@0: } michael@0: michael@0: if (!mFineGrainResetTimer) michael@0: mFineGrainResetTimer = do_CreateInstance("@mozilla.org/timer;1"); michael@0: michael@0: // if we can't delay the reset, just do it now michael@0: if (!mFineGrainResetTimer) { michael@0: NormalTimers(); michael@0: return; michael@0: } michael@0: michael@0: // pad the callback out 100ms to avoid having to round trip this again if the michael@0: // timer calls back just a tad early. michael@0: SOCKET_LOG(("EventTokenBucket::WantNormalTimers %p " michael@0: "Will reset timer granularity after delay", this)); michael@0: michael@0: mFineGrainResetTimer->InitWithCallback( michael@0: this, michael@0: static_cast((fiveSeconds - elapsed).ToMilliseconds()) + 100, michael@0: nsITimer::TYPE_ONE_SHOT); michael@0: mFineGrainResetTimerArmed = true; michael@0: } michael@0: michael@0: void michael@0: EventTokenBucket::FineGrainResetTimerNotify() michael@0: { michael@0: SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n", michael@0: this, mEvents.GetSize())); michael@0: mFineGrainResetTimerArmed = false; michael@0: michael@0: // If we are currently processing events then wait for the queue to drain michael@0: // before trying to reset back to normal timers again michael@0: if (!mEvents.GetSize()) michael@0: WantNormalTimers(); michael@0: } michael@0: michael@0: #endif michael@0: michael@0: } // mozilla::net michael@0: } // mozilla