EventTokenBucket.cpp (11135B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "EventTokenBucket.h" 8 9 #include "nsICancelable.h" 10 #include "nsIIOService.h" 11 #include "nsNetCID.h" 12 #include "nsNetUtil.h" 13 #include "nsServiceManagerUtils.h" 14 #include "nsSocketTransportService2.h" 15 16 #include "mozilla/Components.h" 17 18 #ifdef DEBUG 19 # include "MainThreadUtils.h" 20 #endif 21 22 #ifdef XP_WIN 23 # include <windows.h> 24 # include <mmsystem.h> 25 #endif 26 27 namespace mozilla { 28 namespace net { 29 30 //////////////////////////////////////////// 31 // EventTokenBucketCancelable 32 //////////////////////////////////////////// 33 34 class TokenBucketCancelable : public nsICancelable { 35 public: 36 NS_DECL_THREADSAFE_ISUPPORTS 37 NS_DECL_NSICANCELABLE 38 39 explicit TokenBucketCancelable(class ATokenBucketEvent* event); 40 void Fire(); 41 42 private: 43 virtual ~TokenBucketCancelable() = default; 44 45 friend class EventTokenBucket; 46 ATokenBucketEvent* mEvent; 47 }; 48 49 NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable) 50 51 TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent* event) 52 : mEvent(event) {} 53 54 NS_IMETHODIMP 55 TokenBucketCancelable::Cancel(nsresult reason) { 56 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 57 mEvent = nullptr; 58 return NS_OK; 59 } 60 61 void TokenBucketCancelable::Fire() { 62 if (!mEvent) return; 63 64 ATokenBucketEvent* event = mEvent; 65 mEvent = nullptr; 66 event->OnTokenBucketAdmitted(); 67 } 68 69 //////////////////////////////////////////// 70 // EventTokenBucket 71 //////////////////////////////////////////// 72 73 NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback, nsINamed) 74 75 // by default 1hz with no burst 76 EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond, uint32_t burstSize) 77 : mUnitCost(kUsecPerSec), 78 mMaxCredit(kUsecPerSec), 79 mCredit(kUsecPerSec), 80 mPaused(false), 81 mStopped(false), 82 mTimerArmed(false) 83 #ifdef XP_WIN 84 , 85 mFineGrainTimerInUse(false), 86 mFineGrainResetTimerArmed(false) 87 #endif 88 { 89 mLastUpdate = TimeStamp::Now(); 90 91 MOZ_ASSERT(NS_IsMainThread()); 92 93 nsresult rv; 94 nsCOMPtr<nsIEventTarget> sts; 95 nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv); 96 if (NS_SUCCEEDED(rv)) { 97 sts = mozilla::components::SocketTransport::Service(&rv); 98 } 99 if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts); 100 SetRate(eventsPerSecond, burstSize); 101 } 102 103 EventTokenBucket::~EventTokenBucket() { 104 SOCKET_LOG( 105 ("EventTokenBucket::dtor %p events=%zu\n", this, mEvents.GetSize())); 106 107 CleanupTimers(); 108 109 // Complete any queued events to prevent hangs 110 while (mEvents.GetSize()) { 111 RefPtr<TokenBucketCancelable> cancelable = mEvents.PopFront(); 112 cancelable->Fire(); 113 } 114 } 115 116 void EventTokenBucket::CleanupTimers() { 117 if (mTimer && mTimerArmed) { 118 mTimer->Cancel(); 119 } 120 mTimer = nullptr; 121 mTimerArmed = false; 122 123 #ifdef XP_WIN 124 NormalTimers(); 125 if (mFineGrainResetTimer && mFineGrainResetTimerArmed) { 126 mFineGrainResetTimer->Cancel(); 127 } 128 mFineGrainResetTimer = nullptr; 129 mFineGrainResetTimerArmed = false; 130 #endif 131 } 132 133 void EventTokenBucket::SetRate(uint32_t eventsPerSecond, uint32_t burstSize) { 134 SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n", this, eventsPerSecond, 135 burstSize)); 136 137 if (eventsPerSecond > kMaxHz) { 138 eventsPerSecond = kMaxHz; 139 SOCKET_LOG((" eventsPerSecond out of range\n")); 140 } 141 142 if (!eventsPerSecond) { 143 eventsPerSecond = 1; 144 SOCKET_LOG((" eventsPerSecond out of range\n")); 145 } 146 147 mUnitCost = kUsecPerSec / eventsPerSecond; 148 mMaxCredit = mUnitCost * burstSize; 149 if (mMaxCredit > kUsecPerSec * 60 * 15) { 150 SOCKET_LOG((" burstSize out of range\n")); 151 mMaxCredit = kUsecPerSec * 60 * 15; 152 } 153 mCredit = mMaxCredit; 154 mLastUpdate = TimeStamp::Now(); 155 } 156 157 void EventTokenBucket::ClearCredits() { 158 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 159 SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this)); 160 mCredit = 0; 161 } 162 163 uint32_t EventTokenBucket::BurstEventsAvailable() { 164 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 165 return static_cast<uint32_t>(mCredit / mUnitCost); 166 } 167 168 uint32_t EventTokenBucket::QueuedEvents() { 169 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 170 return mEvents.GetSize(); 171 } 172 173 void EventTokenBucket::Pause() { 174 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 175 SOCKET_LOG(("EventTokenBucket::Pause %p\n", this)); 176 if (mPaused || mStopped) return; 177 178 mPaused = true; 179 if (mTimerArmed) { 180 mTimer->Cancel(); 181 mTimerArmed = false; 182 } 183 } 184 185 void EventTokenBucket::UnPause() { 186 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 187 SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this)); 188 if (!mPaused || mStopped) return; 189 190 mPaused = false; 191 DispatchEvents(); 192 UpdateTimer(); 193 } 194 195 void EventTokenBucket::Stop() { 196 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 197 SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed)); 198 mStopped = true; 199 CleanupTimers(); 200 201 // Complete any queued events to prevent hangs 202 while (mEvents.GetSize()) { 203 RefPtr<TokenBucketCancelable> cancelable = mEvents.PopFront(); 204 cancelable->Fire(); 205 } 206 } 207 208 nsresult EventTokenBucket::SubmitEvent(ATokenBucketEvent* event, 209 nsICancelable** cancelable) { 210 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 211 SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this)); 212 213 if (mStopped || !mTimer) return NS_ERROR_FAILURE; 214 215 UpdateCredits(); 216 217 RefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event); 218 // When this function exits the cancelEvent needs 2 references, one for the 219 // mEvents queue and one for the caller of SubmitEvent() 220 221 *cancelable = do_AddRef(cancelEvent).take(); 222 223 if (mPaused || !TryImmediateDispatch(cancelEvent.get())) { 224 // queue it 225 SOCKET_LOG((" queued\n")); 226 mEvents.Push(cancelEvent.forget()); 227 UpdateTimer(); 228 } else { 229 SOCKET_LOG((" dispatched synchronously\n")); 230 } 231 232 return NS_OK; 233 } 234 235 bool EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable* cancelable) { 236 if (mCredit < mUnitCost) return false; 237 238 mCredit -= mUnitCost; 239 cancelable->Fire(); 240 return true; 241 } 242 243 void EventTokenBucket::DispatchEvents() { 244 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 245 SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused)); 246 if (mPaused || mStopped) return; 247 248 while (mEvents.GetSize() && mUnitCost <= mCredit) { 249 RefPtr<TokenBucketCancelable> cancelable = mEvents.PopFront(); 250 if (cancelable->mEvent) { 251 SOCKET_LOG( 252 ("EventTokenBucket::DispachEvents [%p] " 253 "Dispatching queue token bucket event cost=%" PRIu64 254 " credit=%" PRIu64 "\n", 255 this, mUnitCost, mCredit)); 256 mCredit -= mUnitCost; 257 cancelable->Fire(); 258 } 259 } 260 261 #ifdef XP_WIN 262 if (!mEvents.GetSize()) WantNormalTimers(); 263 #endif 264 } 265 266 void EventTokenBucket::UpdateTimer() { 267 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 268 if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer) { 269 return; 270 } 271 272 if (mCredit >= mUnitCost) return; 273 274 // determine the time needed to wait to accumulate enough credits to admit 275 // one more event and set the timer for that point. Always round it 276 // up because firing early doesn't help. 277 // 278 uint64_t deficit = mUnitCost - mCredit; 279 uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec; 280 281 if (msecWait < 4) { // minimum wait 282 msecWait = 4; 283 } else if (msecWait > 60000) { // maximum wait 284 msecWait = 60000; 285 } 286 287 #ifdef XP_WIN 288 FineGrainTimers(); 289 #endif 290 291 SOCKET_LOG( 292 ("EventTokenBucket::UpdateTimer %p for %" PRIu64 "ms\n", this, msecWait)); 293 nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait), 294 nsITimer::TYPE_ONE_SHOT); 295 mTimerArmed = NS_SUCCEEDED(rv); 296 } 297 298 NS_IMETHODIMP 299 EventTokenBucket::Notify(nsITimer* timer) { 300 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 301 302 #ifdef XP_WIN 303 if (timer == mFineGrainResetTimer) { 304 FineGrainResetTimerNotify(); 305 return NS_OK; 306 } 307 #endif 308 309 SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this)); 310 mTimerArmed = false; 311 if (mStopped) return NS_OK; 312 313 UpdateCredits(); 314 DispatchEvents(); 315 UpdateTimer(); 316 317 return NS_OK; 318 } 319 320 NS_IMETHODIMP 321 EventTokenBucket::GetName(nsACString& aName) { 322 aName.AssignLiteral("EventTokenBucket"); 323 return NS_OK; 324 } 325 326 void EventTokenBucket::UpdateCredits() { 327 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 328 329 TimeStamp now = TimeStamp::Now(); 330 TimeDuration elapsed = now - mLastUpdate; 331 mLastUpdate = now; 332 333 mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds()); 334 if (mCredit > mMaxCredit) mCredit = mMaxCredit; 335 SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64 " (%" PRIu64 336 " each.. %3.2f)\n", 337 this, mCredit, mUnitCost, (double)mCredit / mUnitCost)); 338 } 339 340 #ifdef XP_WIN 341 void EventTokenBucket::FineGrainTimers() { 342 SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n", 343 this, mFineGrainTimerInUse)); 344 345 mLastFineGrainTimerUse = TimeStamp::Now(); 346 347 if (mFineGrainTimerInUse) return; 348 349 if (mUnitCost > kCostFineGrainThreshold) return; 350 351 SOCKET_LOG( 352 ("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n", this)); 353 354 mFineGrainTimerInUse = true; 355 timeBeginPeriod(1); 356 } 357 358 void EventTokenBucket::NormalTimers() { 359 if (!mFineGrainTimerInUse) return; 360 mFineGrainTimerInUse = false; 361 362 SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this)); 363 timeEndPeriod(1); 364 } 365 366 void EventTokenBucket::WantNormalTimers() { 367 if (!mFineGrainTimerInUse) return; 368 if (mFineGrainResetTimerArmed) return; 369 370 TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse); 371 static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5); 372 373 if (elapsed >= fiveSeconds) { 374 NormalTimers(); 375 return; 376 } 377 378 if (!mFineGrainResetTimer) mFineGrainResetTimer = NS_NewTimer(); 379 380 // if we can't delay the reset, just do it now 381 if (!mFineGrainResetTimer) { 382 NormalTimers(); 383 return; 384 } 385 386 // pad the callback out 100ms to avoid having to round trip this again if the 387 // timer calls back just a tad early. 388 SOCKET_LOG( 389 ("EventTokenBucket::WantNormalTimers %p " 390 "Will reset timer granularity after delay", 391 this)); 392 393 mFineGrainResetTimer->InitWithCallback( 394 this, 395 static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100, 396 nsITimer::TYPE_ONE_SHOT); 397 mFineGrainResetTimerArmed = true; 398 } 399 400 void EventTokenBucket::FineGrainResetTimerNotify() { 401 SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify(%p) events = %zd\n", 402 this, mEvents.GetSize())); 403 mFineGrainResetTimerArmed = false; 404 405 // If we are currently processing events then wait for the queue to drain 406 // before trying to reset back to normal timers again 407 if (!mEvents.GetSize()) WantNormalTimers(); 408 } 409 410 #endif 411 412 } // namespace net 413 } // namespace mozilla