1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000 1.2 +++ b/netwerk/base/src/EventTokenBucket.cpp Wed Dec 31 06:09:35 2014 +0100 1.3 @@ -0,0 +1,437 @@ 1.4 +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 1.5 +/* vim:set ts=2 sw=2 sts=2 et cindent: */ 1.6 +/* This Source Code Form is subject to the terms of the Mozilla Public 1.7 + * License, v. 2.0. If a copy of the MPL was not distributed with this 1.8 + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 1.9 + 1.10 +#include "EventTokenBucket.h" 1.11 + 1.12 +#include "nsICancelable.h" 1.13 +#include "nsNetUtil.h" 1.14 +#include "nsSocketTransportService2.h" 1.15 + 1.16 +#ifdef DEBUG 1.17 +#include "MainThreadUtils.h" 1.18 +#endif 1.19 + 1.20 +#ifdef XP_WIN 1.21 +#include <windows.h> 1.22 +#include <mmsystem.h> 1.23 +#endif 1.24 + 1.25 +extern PRThread *gSocketThread; 1.26 + 1.27 +namespace mozilla { 1.28 +namespace net { 1.29 + 1.30 +//////////////////////////////////////////// 1.31 +// EventTokenBucketCancelable 1.32 +//////////////////////////////////////////// 1.33 + 1.34 +class TokenBucketCancelable : public nsICancelable 1.35 +{ 1.36 +public: 1.37 + NS_DECL_THREADSAFE_ISUPPORTS 1.38 + NS_DECL_NSICANCELABLE 1.39 + 1.40 + TokenBucketCancelable(class ATokenBucketEvent *event); 1.41 + virtual ~TokenBucketCancelable() {} 1.42 + void Fire(); 1.43 + 1.44 +private: 1.45 + friend class EventTokenBucket; 1.46 + ATokenBucketEvent *mEvent; 1.47 +}; 1.48 + 1.49 +NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable) 1.50 + 1.51 +TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event) 1.52 + : mEvent(event) 1.53 +{ 1.54 +} 1.55 + 1.56 +NS_IMETHODIMP 1.57 +TokenBucketCancelable::Cancel(nsresult reason) 1.58 +{ 1.59 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.60 + mEvent = nullptr; 1.61 + return NS_OK; 1.62 +} 1.63 + 1.64 +void 1.65 +TokenBucketCancelable::Fire() 1.66 +{ 1.67 + if (!mEvent) 1.68 + return; 1.69 + 1.70 + ATokenBucketEvent *event = mEvent; 1.71 + mEvent = nullptr; 1.72 + event->OnTokenBucketAdmitted(); 1.73 +} 1.74 + 1.75 +//////////////////////////////////////////// 1.76 +// EventTokenBucket 1.77 +//////////////////////////////////////////// 1.78 + 1.79 +NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback) 1.80 + 1.81 +// by default 1hz with no burst 1.82 +EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond, 1.83 + uint32_t burstSize) 1.84 + : mUnitCost(kUsecPerSec) 1.85 + , mMaxCredit(kUsecPerSec) 1.86 + , mCredit(kUsecPerSec) 1.87 + , mPaused(false) 1.88 + , mStopped(false) 1.89 + , mTimerArmed(false) 1.90 +#ifdef XP_WIN 1.91 + , mFineGrainTimerInUse(false) 1.92 + , mFineGrainResetTimerArmed(false) 1.93 +#endif 1.94 +{ 1.95 + MOZ_COUNT_CTOR(EventTokenBucket); 1.96 + mLastUpdate = TimeStamp::Now(); 1.97 + 1.98 + MOZ_ASSERT(NS_IsMainThread()); 1.99 + 1.100 + nsresult rv; 1.101 + nsCOMPtr<nsIEventTarget> sts; 1.102 + nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv); 1.103 + if (NS_SUCCEEDED(rv)) 1.104 + sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv); 1.105 + if (NS_SUCCEEDED(rv)) 1.106 + mTimer = do_CreateInstance("@mozilla.org/timer;1"); 1.107 + if (mTimer) 1.108 + mTimer->SetTarget(sts); 1.109 + SetRate(eventsPerSecond, burstSize); 1.110 +} 1.111 + 1.112 +EventTokenBucket::~EventTokenBucket() 1.113 +{ 1.114 + SOCKET_LOG(("EventTokenBucket::dtor %p events=%d\n", 1.115 + this, mEvents.GetSize())); 1.116 + 1.117 + MOZ_COUNT_DTOR(EventTokenBucket); 1.118 + if (mTimer && mTimerArmed) 1.119 + mTimer->Cancel(); 1.120 + 1.121 +#ifdef XP_WIN 1.122 + NormalTimers(); 1.123 + if (mFineGrainResetTimerArmed) { 1.124 + mFineGrainResetTimerArmed = false; 1.125 + mFineGrainResetTimer->Cancel(); 1.126 + } 1.127 +#endif 1.128 + 1.129 + // Complete any queued events to prevent hangs 1.130 + while (mEvents.GetSize()) { 1.131 + nsRefPtr<TokenBucketCancelable> cancelable = 1.132 + dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront())); 1.133 + cancelable->Fire(); 1.134 + } 1.135 +} 1.136 + 1.137 +void 1.138 +EventTokenBucket::SetRate(uint32_t eventsPerSecond, 1.139 + uint32_t burstSize) 1.140 +{ 1.141 + SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", 1.142 + this, eventsPerSecond, burstSize)); 1.143 + 1.144 + if (eventsPerSecond > kMaxHz) { 1.145 + eventsPerSecond = kMaxHz; 1.146 + SOCKET_LOG((" eventsPerSecond out of range\n")); 1.147 + } 1.148 + 1.149 + if (!eventsPerSecond) { 1.150 + eventsPerSecond = 1; 1.151 + SOCKET_LOG((" eventsPerSecond out of range\n")); 1.152 + } 1.153 + 1.154 + mUnitCost = kUsecPerSec / eventsPerSecond; 1.155 + mMaxCredit = mUnitCost * burstSize; 1.156 + if (mMaxCredit > kUsecPerSec * 60 * 15) { 1.157 + SOCKET_LOG((" burstSize out of range\n")); 1.158 + mMaxCredit = kUsecPerSec * 60 * 15; 1.159 + } 1.160 + mCredit = mMaxCredit; 1.161 + mLastUpdate = TimeStamp::Now(); 1.162 +} 1.163 + 1.164 +void 1.165 +EventTokenBucket::ClearCredits() 1.166 +{ 1.167 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.168 + SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this)); 1.169 + mCredit = 0; 1.170 +} 1.171 + 1.172 +uint32_t 1.173 +EventTokenBucket::BurstEventsAvailable() 1.174 +{ 1.175 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.176 + return static_cast<uint32_t>(mCredit / mUnitCost); 1.177 +} 1.178 + 1.179 +uint32_t 1.180 +EventTokenBucket::QueuedEvents() 1.181 +{ 1.182 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.183 + return mEvents.GetSize(); 1.184 +} 1.185 + 1.186 +void 1.187 +EventTokenBucket::Pause() 1.188 +{ 1.189 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.190 + SOCKET_LOG(("EventTokenBucket::Pause %p\n", this)); 1.191 + if (mPaused || mStopped) 1.192 + return; 1.193 + 1.194 + mPaused = true; 1.195 + if (mTimerArmed) { 1.196 + mTimer->Cancel(); 1.197 + mTimerArmed = false; 1.198 + } 1.199 +} 1.200 + 1.201 +void 1.202 +EventTokenBucket::UnPause() 1.203 +{ 1.204 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.205 + SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this)); 1.206 + if (!mPaused || mStopped) 1.207 + return; 1.208 + 1.209 + mPaused = false; 1.210 + DispatchEvents(); 1.211 + UpdateTimer(); 1.212 +} 1.213 + 1.214 +nsresult 1.215 +EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable) 1.216 +{ 1.217 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.218 + SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this)); 1.219 + 1.220 + if (mStopped || !mTimer) 1.221 + return NS_ERROR_FAILURE; 1.222 + 1.223 + UpdateCredits(); 1.224 + 1.225 + nsRefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event); 1.226 + // When this function exits the cancelEvent needs 2 references, one for the 1.227 + // mEvents queue and one for the caller of SubmitEvent() 1.228 + 1.229 + NS_ADDREF(*cancelable = cancelEvent.get()); 1.230 + 1.231 + if (mPaused || !TryImmediateDispatch(cancelEvent.get())) { 1.232 + // queue it 1.233 + SOCKET_LOG((" queued\n")); 1.234 + mEvents.Push(cancelEvent.forget().take()); 1.235 + UpdateTimer(); 1.236 + } 1.237 + else { 1.238 + SOCKET_LOG((" dispatched synchronously\n")); 1.239 + } 1.240 + 1.241 + return NS_OK; 1.242 +} 1.243 + 1.244 +bool 1.245 +EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable) 1.246 +{ 1.247 + if (mCredit < mUnitCost) 1.248 + return false; 1.249 + 1.250 + mCredit -= mUnitCost; 1.251 + cancelable->Fire(); 1.252 + return true; 1.253 +} 1.254 + 1.255 +void 1.256 +EventTokenBucket::DispatchEvents() 1.257 +{ 1.258 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.259 + SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused)); 1.260 + if (mPaused || mStopped) 1.261 + return; 1.262 + 1.263 + while (mEvents.GetSize() && mUnitCost <= mCredit) { 1.264 + nsRefPtr<TokenBucketCancelable> cancelable = 1.265 + dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront())); 1.266 + if (cancelable->mEvent) { 1.267 + SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] " 1.268 + "Dispatching queue token bucket event cost=%lu credit=%lu\n", 1.269 + this, mUnitCost, mCredit)); 1.270 + mCredit -= mUnitCost; 1.271 + cancelable->Fire(); 1.272 + } 1.273 + } 1.274 + 1.275 +#ifdef XP_WIN 1.276 + if (!mEvents.GetSize()) 1.277 + WantNormalTimers(); 1.278 +#endif 1.279 +} 1.280 + 1.281 +void 1.282 +EventTokenBucket::UpdateTimer() 1.283 +{ 1.284 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.285 + if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer) 1.286 + return; 1.287 + 1.288 + if (mCredit >= mUnitCost) 1.289 + return; 1.290 + 1.291 + // determine the time needed to wait to accumulate enough credits to admit 1.292 + // one more event and set the timer for that point. Always round it 1.293 + // up because firing early doesn't help. 1.294 + // 1.295 + uint64_t deficit = mUnitCost - mCredit; 1.296 + uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec; 1.297 + 1.298 + if (msecWait < 4) // minimum wait 1.299 + msecWait = 4; 1.300 + else if (msecWait > 60000) // maximum wait 1.301 + msecWait = 60000; 1.302 + 1.303 +#ifdef XP_WIN 1.304 + FineGrainTimers(); 1.305 +#endif 1.306 + 1.307 + SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %dms\n", 1.308 + this, msecWait)); 1.309 + nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait), 1.310 + nsITimer::TYPE_ONE_SHOT); 1.311 + mTimerArmed = NS_SUCCEEDED(rv); 1.312 +} 1.313 + 1.314 +NS_IMETHODIMP 1.315 +EventTokenBucket::Notify(nsITimer *timer) 1.316 +{ 1.317 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.318 + 1.319 +#ifdef XP_WIN 1.320 + if (timer == mFineGrainResetTimer) { 1.321 + FineGrainResetTimerNotify(); 1.322 + return NS_OK; 1.323 + } 1.324 +#endif 1.325 + 1.326 + SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this)); 1.327 + mTimerArmed = false; 1.328 + if (mStopped) 1.329 + return NS_OK; 1.330 + 1.331 + UpdateCredits(); 1.332 + DispatchEvents(); 1.333 + UpdateTimer(); 1.334 + 1.335 + return NS_OK; 1.336 +} 1.337 + 1.338 +void 1.339 +EventTokenBucket::UpdateCredits() 1.340 +{ 1.341 + MOZ_ASSERT(PR_GetCurrentThread() == gSocketThread); 1.342 + 1.343 + TimeStamp now = TimeStamp::Now(); 1.344 + TimeDuration elapsed = now - mLastUpdate; 1.345 + mLastUpdate = now; 1.346 + 1.347 + mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds()); 1.348 + if (mCredit > mMaxCredit) 1.349 + mCredit = mMaxCredit; 1.350 + SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %lu (%lu each.. %3.2f)\n", 1.351 + this, mCredit, mUnitCost, (double)mCredit / mUnitCost)); 1.352 +} 1.353 + 1.354 +#ifdef XP_WIN 1.355 +void 1.356 +EventTokenBucket::FineGrainTimers() 1.357 +{ 1.358 + SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n", 1.359 + this, mFineGrainTimerInUse)); 1.360 + 1.361 + mLastFineGrainTimerUse = TimeStamp::Now(); 1.362 + 1.363 + if (mFineGrainTimerInUse) 1.364 + return; 1.365 + 1.366 + if (mUnitCost > kCostFineGrainThreshold) 1.367 + return; 1.368 + 1.369 + SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", 1.370 + this)); 1.371 + 1.372 + mFineGrainTimerInUse = true; 1.373 + timeBeginPeriod(1); 1.374 +} 1.375 + 1.376 +void 1.377 +EventTokenBucket::NormalTimers() 1.378 +{ 1.379 + if (!mFineGrainTimerInUse) 1.380 + return; 1.381 + mFineGrainTimerInUse = false; 1.382 + 1.383 + SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this)); 1.384 + timeEndPeriod(1); 1.385 +} 1.386 + 1.387 +void 1.388 +EventTokenBucket::WantNormalTimers() 1.389 +{ 1.390 + if (!mFineGrainTimerInUse) 1.391 + return; 1.392 + if (mFineGrainResetTimerArmed) 1.393 + return; 1.394 + 1.395 + TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse); 1.396 + static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5); 1.397 + 1.398 + if (elapsed >= fiveSeconds) { 1.399 + NormalTimers(); 1.400 + return; 1.401 + } 1.402 + 1.403 + if (!mFineGrainResetTimer) 1.404 + mFineGrainResetTimer = do_CreateInstance("@mozilla.org/timer;1"); 1.405 + 1.406 + // if we can't delay the reset, just do it now 1.407 + if (!mFineGrainResetTimer) { 1.408 + NormalTimers(); 1.409 + return; 1.410 + } 1.411 + 1.412 + // pad the callback out 100ms to avoid having to round trip this again if the 1.413 + // timer calls back just a tad early. 1.414 + SOCKET_LOG(("EventTokenBucket::WantNormalTimers %p " 1.415 + "Will reset timer granularity after delay", this)); 1.416 + 1.417 + mFineGrainResetTimer->InitWithCallback( 1.418 + this, 1.419 + static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100, 1.420 + nsITimer::TYPE_ONE_SHOT); 1.421 + mFineGrainResetTimerArmed = true; 1.422 +} 1.423 + 1.424 +void 1.425 +EventTokenBucket::FineGrainResetTimerNotify() 1.426 +{ 1.427 + SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n", 1.428 + this, mEvents.GetSize())); 1.429 + mFineGrainResetTimerArmed = false; 1.430 + 1.431 + // If we are currently processing events then wait for the queue to drain 1.432 + // before trying to reset back to normal timers again 1.433 + if (!mEvents.GetSize()) 1.434 + WantNormalTimers(); 1.435 +} 1.436 + 1.437 +#endif 1.438 + 1.439 +} // mozilla::net 1.440 +} // mozilla