tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

ThrottleQueue.cpp (10456B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "ThrottleQueue.h"
      8 #include "mozilla/Components.h"
      9 #include "mozilla/net/InputChannelThrottleQueueParent.h"
     10 #include "nsISeekableStream.h"
     11 #include "nsIAsyncInputStream.h"
     12 #include "nsIOService.h"
     13 #include "nsSocketTransportService2.h"
     14 #include "nsStreamUtils.h"
     15 #include "nsNetUtil.h"
     16 
     17 namespace mozilla {
     18 namespace net {
     19 
     20 //-----------------------------------------------------------------------------
     21 
     22 class ThrottleInputStream final : public nsIAsyncInputStream,
     23                                  public nsISeekableStream {
     24 public:
     25  ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
     26 
     27  NS_DECL_THREADSAFE_ISUPPORTS
     28  NS_DECL_NSIINPUTSTREAM
     29  NS_DECL_NSISEEKABLESTREAM
     30  NS_DECL_NSITELLABLESTREAM
     31  NS_DECL_NSIASYNCINPUTSTREAM
     32 
     33  void AllowInput();
     34 
     35 private:
     36  ~ThrottleInputStream();
     37 
     38  nsCOMPtr<nsIInputStream> mStream;
     39  RefPtr<ThrottleQueue> mQueue;
     40  nsresult mClosedStatus;
     41 
     42  nsCOMPtr<nsIInputStreamCallback> mCallback;
     43  nsCOMPtr<nsIEventTarget> mEventTarget;
     44 };
     45 
     46 NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream,
     47                  nsITellableStream, nsISeekableStream)
     48 
     49 ThrottleInputStream::ThrottleInputStream(nsIInputStream* aStream,
     50                                         ThrottleQueue* aQueue)
     51    : mStream(aStream), mQueue(aQueue), mClosedStatus(NS_OK) {
     52  MOZ_ASSERT(aQueue != nullptr);
     53 }
     54 
     55 ThrottleInputStream::~ThrottleInputStream() { Close(); }
     56 
     57 NS_IMETHODIMP
     58 ThrottleInputStream::Close() {
     59  if (NS_FAILED(mClosedStatus)) {
     60    return mClosedStatus;
     61  }
     62 
     63  if (mQueue) {
     64    mQueue->DequeueStream(this);
     65    mQueue = nullptr;
     66    mClosedStatus = NS_BASE_STREAM_CLOSED;
     67  }
     68  return mStream->Close();
     69 }
     70 
     71 NS_IMETHODIMP
     72 ThrottleInputStream::Available(uint64_t* aResult) {
     73  if (NS_FAILED(mClosedStatus)) {
     74    return mClosedStatus;
     75  }
     76 
     77  return mStream->Available(aResult);
     78 }
     79 
     80 NS_IMETHODIMP
     81 ThrottleInputStream::StreamStatus() {
     82  if (NS_FAILED(mClosedStatus)) {
     83    return mClosedStatus;
     84  }
     85 
     86  return mStream->StreamStatus();
     87 }
     88 
     89 NS_IMETHODIMP
     90 ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult) {
     91  if (NS_FAILED(mClosedStatus)) {
     92    return mClosedStatus;
     93  }
     94 
     95  uint32_t realCount;
     96  nsresult rv = mQueue->Available(aCount, &realCount);
     97  if (NS_FAILED(rv)) {
     98    return rv;
     99  }
    100 
    101  if (realCount == 0) {
    102    return NS_BASE_STREAM_WOULD_BLOCK;
    103  }
    104 
    105  rv = mStream->Read(aBuf, realCount, aResult);
    106  if (NS_SUCCEEDED(rv) && *aResult > 0) {
    107    mQueue->RecordRead(*aResult);
    108  }
    109  return rv;
    110 }
    111 
    112 NS_IMETHODIMP
    113 ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
    114                                  uint32_t aCount, uint32_t* aResult) {
    115  if (NS_FAILED(mClosedStatus)) {
    116    return mClosedStatus;
    117  }
    118 
    119  uint32_t realCount;
    120  nsresult rv = mQueue->Available(aCount, &realCount);
    121  if (NS_FAILED(rv)) {
    122    return rv;
    123  }
    124  MOZ_ASSERT(realCount <= aCount);
    125 
    126  if (realCount == 0) {
    127    return NS_BASE_STREAM_WOULD_BLOCK;
    128  }
    129 
    130  rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
    131  if (NS_SUCCEEDED(rv) && *aResult > 0) {
    132    mQueue->RecordRead(*aResult);
    133  }
    134  return rv;
    135 }
    136 
    137 NS_IMETHODIMP
    138 ThrottleInputStream::IsNonBlocking(bool* aNonBlocking) {
    139  *aNonBlocking = true;
    140  return NS_OK;
    141 }
    142 
    143 NS_IMETHODIMP
    144 ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset) {
    145  if (NS_FAILED(mClosedStatus)) {
    146    return mClosedStatus;
    147  }
    148 
    149  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
    150  if (!sstream) {
    151    return NS_ERROR_FAILURE;
    152  }
    153 
    154  return sstream->Seek(aWhence, aOffset);
    155 }
    156 
    157 NS_IMETHODIMP
    158 ThrottleInputStream::Tell(int64_t* aResult) {
    159  if (NS_FAILED(mClosedStatus)) {
    160    return mClosedStatus;
    161  }
    162 
    163  nsCOMPtr<nsITellableStream> sstream = do_QueryInterface(mStream);
    164  if (!sstream) {
    165    return NS_ERROR_FAILURE;
    166  }
    167 
    168  return sstream->Tell(aResult);
    169 }
    170 
    171 NS_IMETHODIMP
    172 ThrottleInputStream::SetEOF() {
    173  if (NS_FAILED(mClosedStatus)) {
    174    return mClosedStatus;
    175  }
    176 
    177  nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
    178  if (!sstream) {
    179    return NS_ERROR_FAILURE;
    180  }
    181 
    182  return sstream->SetEOF();
    183 }
    184 
    185 NS_IMETHODIMP
    186 ThrottleInputStream::CloseWithStatus(nsresult aStatus) {
    187  if (NS_FAILED(mClosedStatus)) {
    188    // Already closed, ignore.
    189    return NS_OK;
    190  }
    191  if (NS_SUCCEEDED(aStatus)) {
    192    aStatus = NS_BASE_STREAM_CLOSED;
    193  }
    194 
    195  mClosedStatus = Close();
    196  if (NS_SUCCEEDED(mClosedStatus)) {
    197    mClosedStatus = aStatus;
    198  }
    199  return NS_OK;
    200 }
    201 
    202 NS_IMETHODIMP
    203 ThrottleInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
    204                               uint32_t aFlags, uint32_t aRequestedCount,
    205                               nsIEventTarget* aEventTarget) {
    206  if (aFlags != 0) {
    207    return NS_ERROR_ILLEGAL_VALUE;
    208  }
    209 
    210  mCallback = aCallback;
    211  mEventTarget = aEventTarget;
    212  if (mCallback) {
    213    mQueue->QueueStream(this);
    214  } else {
    215    mQueue->DequeueStream(this);
    216  }
    217  return NS_OK;
    218 }
    219 
    220 void ThrottleInputStream::AllowInput() {
    221  MOZ_ASSERT(mCallback);
    222  nsCOMPtr<nsIInputStreamCallback> callbackEvent = NS_NewInputStreamReadyEvent(
    223      "ThrottleInputStream::AllowInput", mCallback, mEventTarget);
    224  mCallback = nullptr;
    225  mEventTarget = nullptr;
    226  callbackEvent->OnInputStreamReady(this);
    227 }
    228 
    229 //-----------------------------------------------------------------------------
    230 
    231 // static
    232 already_AddRefed<nsIInputChannelThrottleQueue> ThrottleQueue::Create() {
    233  MOZ_ASSERT(XRE_IsParentProcess());
    234 
    235  nsCOMPtr<nsIInputChannelThrottleQueue> tq;
    236  if (nsIOService::UseSocketProcess()) {
    237    tq = new InputChannelThrottleQueueParent();
    238  } else {
    239    tq = new ThrottleQueue();
    240  }
    241 
    242  return tq.forget();
    243 }
    244 
    245 NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback,
    246                  nsINamed)
    247 
    248 ThrottleQueue::ThrottleQueue()
    249 
    250 {
    251  nsresult rv;
    252  nsCOMPtr<nsIEventTarget> sts;
    253  nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
    254  if (NS_SUCCEEDED(rv)) {
    255    sts = mozilla::components::SocketTransport::Service(&rv);
    256  }
    257  if (NS_SUCCEEDED(rv)) mTimer = NS_NewTimer(sts);
    258 }
    259 
    260 ThrottleQueue::~ThrottleQueue() {
    261  if (mTimer && mTimerArmed) {
    262    mTimer->Cancel();
    263  }
    264  mTimer = nullptr;
    265 }
    266 
    267 NS_IMETHODIMP
    268 ThrottleQueue::RecordRead(uint32_t aBytesRead) {
    269  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    270  ThrottleEntry entry;
    271  entry.mTime = TimeStamp::Now();
    272  entry.mBytesRead = aBytesRead;
    273  mReadEvents.AppendElement(entry);
    274  mBytesProcessed += aBytesRead;
    275  return NS_OK;
    276 }
    277 
    278 NS_IMETHODIMP
    279 ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable) {
    280  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    281  TimeStamp now = TimeStamp::Now();
    282  TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
    283  size_t i;
    284 
    285  // Remove all stale events.
    286  for (i = 0; i < mReadEvents.Length(); ++i) {
    287    if (mReadEvents[i].mTime >= oneSecondAgo) {
    288      break;
    289    }
    290  }
    291  mReadEvents.RemoveElementsAt(0, i);
    292 
    293  uint32_t totalBytes = 0;
    294  for (i = 0; i < mReadEvents.Length(); ++i) {
    295    totalBytes += mReadEvents[i].mBytesRead;
    296  }
    297 
    298  uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
    299  double prob = static_cast<double>(rand()) / RAND_MAX;
    300  uint32_t thisSliceBytes =
    301      mMeanBytesPerSecond - spread + static_cast<uint32_t>(2 * spread * prob);
    302 
    303  if (totalBytes >= thisSliceBytes) {
    304    *aAvailable = 0;
    305  } else {
    306    *aAvailable = std::min(thisSliceBytes, aRemaining);
    307  }
    308  return NS_OK;
    309 }
    310 
    311 NS_IMETHODIMP
    312 ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond) {
    313  // Can be called on any thread.
    314  if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 ||
    315      aMaxBytesPerSecond < aMeanBytesPerSecond) {
    316    return NS_ERROR_ILLEGAL_VALUE;
    317  }
    318 
    319  mMeanBytesPerSecond = aMeanBytesPerSecond;
    320  mMaxBytesPerSecond = aMaxBytesPerSecond;
    321  return NS_OK;
    322 }
    323 
    324 NS_IMETHODIMP
    325 ThrottleQueue::BytesProcessed(uint64_t* aResult) {
    326  *aResult = mBytesProcessed;
    327  return NS_OK;
    328 }
    329 
    330 NS_IMETHODIMP
    331 ThrottleQueue::WrapStream(nsIInputStream* aInputStream,
    332                          nsIAsyncInputStream** aResult) {
    333  nsCOMPtr<nsIAsyncInputStream> result =
    334      new ThrottleInputStream(aInputStream, this);
    335  result.forget(aResult);
    336  return NS_OK;
    337 }
    338 
    339 NS_IMETHODIMP
    340 ThrottleQueue::Notify(nsITimer* aTimer) {
    341  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    342  // A notified reader may need to push itself back on the queue.
    343  // Swap out the list of readers so that this works properly.
    344  nsTArray<RefPtr<ThrottleInputStream>> events = std::move(mAsyncEvents);
    345 
    346  // Optimistically notify all the waiting readers, and then let them
    347  // requeue if there isn't enough bandwidth.
    348  for (size_t i = 0; i < events.Length(); ++i) {
    349    events[i]->AllowInput();
    350  }
    351 
    352  mTimerArmed = false;
    353  return NS_OK;
    354 }
    355 
    356 NS_IMETHODIMP
    357 ThrottleQueue::GetName(nsACString& aName) {
    358  aName.AssignLiteral("net::ThrottleQueue");
    359  return NS_OK;
    360 }
    361 
    362 void ThrottleQueue::QueueStream(ThrottleInputStream* aStream) {
    363  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    364  if (mAsyncEvents.IndexOf(aStream) ==
    365      nsTArray<RefPtr<mozilla::net::ThrottleInputStream>>::NoIndex) {
    366    mAsyncEvents.AppendElement(aStream);
    367 
    368    if (!mTimerArmed) {
    369      uint32_t ms = 1000;
    370      if (mReadEvents.Length() > 0) {
    371        TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
    372        TimeStamp now = TimeStamp::Now();
    373 
    374        if (t > now) {
    375          ms = static_cast<uint32_t>((t - now).ToMilliseconds());
    376        } else {
    377          ms = 1;
    378        }
    379      }
    380 
    381      if (NS_SUCCEEDED(
    382              mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
    383        mTimerArmed = true;
    384      }
    385    }
    386  }
    387 }
    388 
    389 void ThrottleQueue::DequeueStream(ThrottleInputStream* aStream) {
    390  MOZ_ASSERT(OnSocketThread(), "not on socket thread");
    391  mAsyncEvents.RemoveElement(aStream);
    392 }
    393 
    394 NS_IMETHODIMP
    395 ThrottleQueue::GetMeanBytesPerSecond(uint32_t* aMeanBytesPerSecond) {
    396  NS_ENSURE_ARG(aMeanBytesPerSecond);
    397 
    398  *aMeanBytesPerSecond = mMeanBytesPerSecond;
    399  return NS_OK;
    400 }
    401 
    402 NS_IMETHODIMP
    403 ThrottleQueue::GetMaxBytesPerSecond(uint32_t* aMaxBytesPerSecond) {
    404  NS_ENSURE_ARG(aMaxBytesPerSecond);
    405 
    406  *aMaxBytesPerSecond = mMaxBytesPerSecond;
    407  return NS_OK;
    408 }
    409 
    410 }  // namespace net
    411 }  // namespace mozilla