ThrottleQueue.cpp (10456B)
1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "ThrottleQueue.h" 8 #include "mozilla/Components.h" 9 #include "mozilla/net/InputChannelThrottleQueueParent.h" 10 #include "nsISeekableStream.h" 11 #include "nsIAsyncInputStream.h" 12 #include "nsIOService.h" 13 #include "nsSocketTransportService2.h" 14 #include "nsStreamUtils.h" 15 #include "nsNetUtil.h" 16 17 namespace mozilla { 18 namespace net { 19 20 //----------------------------------------------------------------------------- 21 22 class ThrottleInputStream final : public nsIAsyncInputStream, 23 public nsISeekableStream { 24 public: 25 ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue); 26 27 NS_DECL_THREADSAFE_ISUPPORTS 28 NS_DECL_NSIINPUTSTREAM 29 NS_DECL_NSISEEKABLESTREAM 30 NS_DECL_NSITELLABLESTREAM 31 NS_DECL_NSIASYNCINPUTSTREAM 32 33 void AllowInput(); 34 35 private: 36 ~ThrottleInputStream(); 37 38 nsCOMPtr<nsIInputStream> mStream; 39 RefPtr<ThrottleQueue> mQueue; 40 nsresult mClosedStatus; 41 42 nsCOMPtr<nsIInputStreamCallback> mCallback; 43 nsCOMPtr<nsIEventTarget> mEventTarget; 44 }; 45 46 NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, 47 nsITellableStream, nsISeekableStream) 48 49 ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream, 50 ThrottleQueue* aQueue) 51 : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) { 52 MOZ_ASSERT(aQueue != nullptr); 53 } 54 55 ThrottleInputStream::~ThrottleInputStream() { Close(); } 56 57 NS_IMETHODIMP 58 ThrottleInputStream::Close() { 59 if (NS_FAILED(mClosedStatus)) { 60 return mClosedStatus; 61 } 62 63 if (mQueue) { 64 mQueue->DequeueStream(this); 65 mQueue = nullptr; 66 mClosedStatus = NS_BASE_STREAM_CLOSED; 67 } 68 return mStream->Close(); 69 } 70 71 NS_IMETHODIMP 72 ThrottleInputStream::Available(uint64_t* aResult) { 73 if (NS_FAILED(mClosedStatus)) { 74 return mClosedStatus; 75 } 76 77 return mStream->Available(aResult); 78 } 79 80 NS_IMETHODIMP 81 ThrottleInputStream::StreamStatus() { 82 if (NS_FAILED(mClosedStatus)) { 83 return mClosedStatus; 84 } 85 86 return mStream->StreamStatus(); 87 } 88 89 NS_IMETHODIMP 90 ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) { 91 if (NS_FAILED(mClosedStatus)) { 92 return mClosedStatus; 93 } 94 95 uint32_t realCount; 96 nsresult rv = mQueue->Available(aCount, &realCount); 97 if (NS_FAILED(rv)) { 98 return rv; 99 } 100 101 if (realCount == 0) { 102 return NS_BASE_STREAM_WOULD_BLOCK; 103 } 104 105 rv = mStream->Read(aBuf, realCount, aResult); 106 if (NS_SUCCEEDED(rv) && *aResult > 0) { 107 mQueue->RecordRead(*aResult); 108 } 109 return rv; 110 } 111 112 NS_IMETHODIMP 113 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, 114 uint32_t aCount, uint32_t* aResult) { 115 if (NS_FAILED(mClosedStatus)) { 116 return mClosedStatus; 117 } 118 119 uint32_t realCount; 120 nsresult rv = mQueue->Available(aCount, &realCount); 121 if (NS_FAILED(rv)) { 122 return rv; 123 } 124 MOZ_ASSERT(realCount <= aCount); 125 126 if (realCount == 0) { 127 return NS_BASE_STREAM_WOULD_BLOCK; 128 } 129 130 rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult); 131 if (NS_SUCCEEDED(rv) && *aResult > 0) { 132 mQueue->RecordRead(*aResult); 133 } 134 return rv; 135 } 136 137 NS_IMETHODIMP 138 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) { 139 *aNonBlocking = true; 140 return NS_OK; 141 } 142 143 NS_IMETHODIMP 144 ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) { 145 if (NS_FAILED(mClosedStatus)) { 146 return mClosedStatus; 147 } 148 149 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream); 150 if (!sstream) { 151 return NS_ERROR_FAILURE; 152 } 153 154 return sstream->Seek(aWhence, aOffset); 155 } 156 157 NS_IMETHODIMP 158 ThrottleInputStream::Tell(int64_t* aResult) { 159 if (NS_FAILED(mClosedStatus)) { 160 return mClosedStatus; 161 } 162 163 nsCOMPtr<nsITellableStream> sstream = do_QueryInterface(mStream); 164 if (!sstream) { 165 return NS_ERROR_FAILURE; 166 } 167 168 return sstream->Tell(aResult); 169 } 170 171 NS_IMETHODIMP 172 ThrottleInputStream::SetEOF() { 173 if (NS_FAILED(mClosedStatus)) { 174 return mClosedStatus; 175 } 176 177 nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream); 178 if (!sstream) { 179 return NS_ERROR_FAILURE; 180 } 181 182 return sstream->SetEOF(); 183 } 184 185 NS_IMETHODIMP 186 ThrottleInputStream::CloseWithStatus(nsresult aStatus) { 187 if (NS_FAILED(mClosedStatus)) { 188 // Already closed, ignore. 189 return NS_OK; 190 } 191 if (NS_SUCCEEDED(aStatus)) { 192 aStatus = NS_BASE_STREAM_CLOSED; 193 } 194 195 mClosedStatus = Close(); 196 if (NS_SUCCEEDED(mClosedStatus)) { 197 mClosedStatus = aStatus; 198 } 199 return NS_OK; 200 } 201 202 NS_IMETHODIMP 203 ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback, 204 uint32_t aFlags, uint32_t aRequestedCount, 205 nsIEventTarget* aEventTarget) { 206 if (aFlags != 0) { 207 return NS_ERROR_ILLEGAL_VALUE; 208 } 209 210 mCallback = aCallback; 211 mEventTarget = aEventTarget; 212 if (mCallback) { 213 mQueue->QueueStream(this); 214 } else { 215 mQueue->DequeueStream(this); 216 } 217 return NS_OK; 218 } 219 220 void ThrottleInputStream::AllowInput() { 221 MOZ_ASSERT(mCallback); 222 nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent( 223 "ThrottleInputStream::AllowInput", mCallback, mEventTarget); 224 mCallback = nullptr; 225 mEventTarget = nullptr; 226 callbackEvent->OnInputStreamReady(this); 227 } 228 229 //----------------------------------------------------------------------------- 230 231 // static 232 already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() { 233 MOZ_ASSERT(XRE_IsParentProcess()); 234 235 nsCOMPtr<nsIInputChannelThrottleQueue> tq; 236 if (nsIOService::UseSocketProcess()) { 237 tq = new InputChannelThrottleQueueParent(); 238 } else { 239 tq = new ThrottleQueue(); 240 } 241 242 return tq.forget(); 243 } 244 245 NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback, 246 nsINamed) 247 248 ThrottleQueue::ThrottleQueue() 249 250 { 251 nsresult rv; 252 nsCOMPtr<nsIEventTarget> sts; 253 nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv); 254 if (NS_SUCCEEDED(rv)) { 255 sts = mozilla::components::SocketTransport::Service(&rv); 256 } 257 if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts); 258 } 259 260 ThrottleQueue::~ThrottleQueue() { 261 if (mTimer && mTimerArmed) { 262 mTimer->Cancel(); 263 } 264 mTimer = nullptr; 265 } 266 267 NS_IMETHODIMP 268 ThrottleQueue::RecordRead(uint32_t aBytesRead) { 269 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 270 ThrottleEntry entry; 271 entry.mTime = TimeStamp::Now(); 272 entry.mBytesRead = aBytesRead; 273 mReadEvents.AppendElement(entry); 274 mBytesProcessed += aBytesRead; 275 return NS_OK; 276 } 277 278 NS_IMETHODIMP 279 ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) { 280 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 281 TimeStamp now = TimeStamp::Now(); 282 TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1); 283 size_t i; 284 285 // Remove all stale events. 286 for (i = 0; i < mReadEvents.Length(); ++i) { 287 if (mReadEvents[i].mTime >= oneSecondAgo) { 288 break; 289 } 290 } 291 mReadEvents.RemoveElementsAt(0, i); 292 293 uint32_t totalBytes = 0; 294 for (i = 0; i < mReadEvents.Length(); ++i) { 295 totalBytes += mReadEvents[i].mBytesRead; 296 } 297 298 uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond; 299 double prob = static_cast<double>(rand()) / RAND_MAX; 300 uint32_t thisSliceBytes = 301 mMeanBytesPerSecond - spread + static_cast<uint32_t>(2 * spread * prob); 302 303 if (totalBytes >= thisSliceBytes) { 304 *aAvailable = 0; 305 } else { 306 *aAvailable = std::min(thisSliceBytes, aRemaining); 307 } 308 return NS_OK; 309 } 310 311 NS_IMETHODIMP 312 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) { 313 // Can be called on any thread. 314 if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || 315 aMaxBytesPerSecond < aMeanBytesPerSecond) { 316 return NS_ERROR_ILLEGAL_VALUE; 317 } 318 319 mMeanBytesPerSecond = aMeanBytesPerSecond; 320 mMaxBytesPerSecond = aMaxBytesPerSecond; 321 return NS_OK; 322 } 323 324 NS_IMETHODIMP 325 ThrottleQueue::BytesProcessed(uint64_t* aResult) { 326 *aResult = mBytesProcessed; 327 return NS_OK; 328 } 329 330 NS_IMETHODIMP 331 ThrottleQueue::WrapStream(nsIInputStream* aInputStream, 332 nsIAsyncInputStream** aResult) { 333 nsCOMPtr<nsIAsyncInputStream> result = 334 new ThrottleInputStream(aInputStream, this); 335 result.forget(aResult); 336 return NS_OK; 337 } 338 339 NS_IMETHODIMP 340 ThrottleQueue::Notify(nsITimer* aTimer) { 341 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 342 // A notified reader may need to push itself back on the queue. 343 // Swap out the list of readers so that this works properly. 344 nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents); 345 346 // Optimistically notify all the waiting readers, and then let them 347 // requeue if there isn't enough bandwidth. 348 for (size_t i = 0; i < events.Length(); ++i) { 349 events[i]->AllowInput(); 350 } 351 352 mTimerArmed = false; 353 return NS_OK; 354 } 355 356 NS_IMETHODIMP 357 ThrottleQueue::GetName(nsACString& aName) { 358 aName.AssignLiteral("net::ThrottleQueue"); 359 return NS_OK; 360 } 361 362 void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) { 363 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 364 if (mAsyncEvents.IndexOf(aStream) == 365 nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) { 366 mAsyncEvents.AppendElement(aStream); 367 368 if (!mTimerArmed) { 369 uint32_t ms = 1000; 370 if (mReadEvents.Length() > 0) { 371 TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1); 372 TimeStamp now = TimeStamp::Now(); 373 374 if (t > now) { 375 ms = static_cast<uint32_t>((t - now).ToMilliseconds()); 376 } else { 377 ms = 1; 378 } 379 } 380 381 if (NS_SUCCEEDED( 382 mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) { 383 mTimerArmed = true; 384 } 385 } 386 } 387 } 388 389 void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) { 390 MOZ_ASSERT(OnSocketThread(), "not on socket thread"); 391 mAsyncEvents.RemoveElement(aStream); 392 } 393 394 NS_IMETHODIMP 395 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) { 396 NS_ENSURE_ARG(aMeanBytesPerSecond); 397 398 *aMeanBytesPerSecond = mMeanBytesPerSecond; 399 return NS_OK; 400 } 401 402 NS_IMETHODIMP 403 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) { 404 NS_ENSURE_ARG(aMaxBytesPerSecond); 405 406 *aMaxBytesPerSecond = mMaxBytesPerSecond; 407 return NS_OK; 408 } 409 410 } // namespace net 411 } // namespace mozilla