tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

nsInputStreamPump.cpp (27026B)


      1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim:set ts=4 sts=2 sw=2 et cin: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "nsIOService.h"
      8 #include "nsInputStreamPump.h"
      9 #include "nsIStreamTransportService.h"
     10 #include "nsIThreadRetargetableStreamListener.h"
     11 #include "nsThreadUtils.h"
     12 #include "nsCOMPtr.h"
     13 #include "mozilla/Logging.h"
     14 #include "mozilla/NonBlockingAsyncInputStream.h"
     15 #include "mozilla/ProfilerLabels.h"
     16 #include "mozilla/SlicedInputStream.h"
     17 #include "mozilla/StaticPrefs_network.h"
     18 #include "nsIStreamListener.h"
     19 #include "nsILoadGroup.h"
     20 #include "nsNetCID.h"
     21 #include "nsNetUtil.h"
     22 #include "nsStreamUtils.h"
     23 #include <algorithm>
     24 
     25 //
     26 // MOZ_LOG=nsStreamPump:5
     27 //
     28 static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
     29 #undef LOG
     30 #define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
     31 
     32 //-----------------------------------------------------------------------------
     33 // nsInputStreamPump methods
     34 //-----------------------------------------------------------------------------
     35 
     36 nsInputStreamPump::nsInputStreamPump() : mOffMainThread(!NS_IsMainThread()) {}
     37 
     38 nsresult nsInputStreamPump::Create(nsInputStreamPump** result,
     39                                   nsIInputStream* stream, uint32_t segsize,
     40                                   uint32_t segcount, bool closeWhenDone,
     41                                   nsISerialEventTarget* mainThreadTarget) {
     42  nsresult rv = NS_ERROR_OUT_OF_MEMORY;
     43  RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
     44  if (pump) {
     45    rv = pump->Init(stream, segsize, segcount, closeWhenDone, mainThreadTarget);
     46    if (NS_SUCCEEDED(rv)) {
     47      pump.forget(result);
     48    }
     49  }
     50  return rv;
     51 }
     52 
     53 struct PeekData {
     54  PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
     55      : mFunc(fun), mClosure(closure) {}
     56 
     57  nsInputStreamPump::PeekSegmentFun mFunc;
     58  void* mClosure;
     59 };
     60 
     61 static nsresult CallPeekFunc(nsIInputStream* aInStream, void* aClosure,
     62                             const char* aFromSegment, uint32_t aToOffset,
     63                             uint32_t aCount, uint32_t* aWriteCount) {
     64  NS_ASSERTION(aToOffset == 0, "Called more than once?");
     65  NS_ASSERTION(aCount > 0, "Called without data?");
     66 
     67  PeekData* data = static_cast<PeekData*>(aClosure);
     68  data->mFunc(data->mClosure, reinterpret_cast<const uint8_t*>(aFromSegment),
     69              aCount);
     70  return NS_BINDING_ABORTED;
     71 }
     72 
     73 nsresult nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure) {
     74  RecursiveMutexAutoLock lock(mMutex);
     75 
     76  if (!mAsyncStream) {
     77    MOZ_DIAGNOSTIC_ASSERT(false, "PeekStream called without stream");
     78    return NS_ERROR_NOT_AVAILABLE;
     79  }
     80 
     81  nsresult rv = CreateBufferedStreamIfNeeded();
     82  NS_ENSURE_SUCCESS(rv, rv);
     83 
     84  // See if the pipe is closed by checking the return of Available.
     85  uint64_t dummy64;
     86  rv = mAsyncStream->Available(&dummy64);
     87  if (NS_FAILED(rv)) return rv;
     88  uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
     89 
     90  PeekData data(callback, closure);
     91  return mAsyncStream->ReadSegments(
     92      CallPeekFunc, &data, mozilla::net::nsIOService::gDefaultSegmentSize,
     93      &dummy);
     94 }
     95 
     96 nsresult nsInputStreamPump::EnsureWaiting() {
     97  mMutex.AssertCurrentThreadIn();
     98 
     99  // no need to worry about multiple threads... an input stream pump lives
    100  // on only one thread at a time.
    101  MOZ_ASSERT(mAsyncStream);
    102  if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
    103    // Ensure OnStateStop is called on the main thread only when this pump is
    104    // created on main thread.
    105    if (mState == STATE_STOP && !mOffMainThread) {
    106      nsCOMPtr<nsISerialEventTarget> mainThread =
    107          mLabeledMainThreadTarget
    108              ? mLabeledMainThreadTarget
    109              : do_AddRef(mozilla::GetMainThreadSerialEventTarget());
    110      if (mTargetThread != mainThread) {
    111        mTargetThread = mainThread;
    112      }
    113    }
    114    MOZ_ASSERT(mTargetThread);
    115    nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
    116    if (NS_FAILED(rv)) {
    117      NS_ERROR("AsyncWait failed");
    118      return rv;
    119    }
    120    // Any retargeting during STATE_START or START_TRANSFER is complete
    121    // after the call to AsyncWait; next callback will be on mTargetThread.
    122    mRetargeting = false;
    123    mWaitingForInputStreamReady = true;
    124  }
    125  return NS_OK;
    126 }
    127 
    128 //-----------------------------------------------------------------------------
    129 // nsInputStreamPump::nsISupports
    130 //-----------------------------------------------------------------------------
    131 
    132 // although this class can only be accessed from one thread at a time, we do
    133 // allow its ownership to move from thread to thread, assuming the consumer
    134 // understands the limitations of this.
    135 NS_IMPL_ADDREF(nsInputStreamPump)
    136 NS_IMPL_RELEASE(nsInputStreamPump)
    137 NS_INTERFACE_MAP_BEGIN(nsInputStreamPump)
    138  NS_INTERFACE_MAP_ENTRY(nsIRequest)
    139  NS_INTERFACE_MAP_ENTRY(nsIThreadRetargetableRequest)
    140  NS_INTERFACE_MAP_ENTRY(nsIInputStreamCallback)
    141  NS_INTERFACE_MAP_ENTRY(nsIInputStreamPump)
    142  NS_INTERFACE_MAP_ENTRY_CONCRETE(nsInputStreamPump)
    143  NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStreamPump)
    144 NS_INTERFACE_MAP_END
    145 
    146 //-----------------------------------------------------------------------------
    147 // nsInputStreamPump::nsIRequest
    148 //-----------------------------------------------------------------------------
    149 
    150 NS_IMETHODIMP
    151 nsInputStreamPump::GetName(nsACString& result) {
    152  RecursiveMutexAutoLock lock(mMutex);
    153 
    154  result.Truncate();
    155  return NS_OK;
    156 }
    157 
    158 NS_IMETHODIMP
    159 nsInputStreamPump::IsPending(bool* result) {
    160  RecursiveMutexAutoLock lock(mMutex);
    161 
    162  *result = (mState != STATE_IDLE && mState != STATE_DEAD);
    163  return NS_OK;
    164 }
    165 
    166 NS_IMETHODIMP
    167 nsInputStreamPump::GetStatus(nsresult* status) {
    168  RecursiveMutexAutoLock lock(mMutex);
    169 
    170  *status = mStatus;
    171  return NS_OK;
    172 }
    173 
    174 NS_IMETHODIMP nsInputStreamPump::SetCanceledReason(const nsACString& aReason) {
    175  return SetCanceledReasonImpl(aReason);
    176 }
    177 
    178 NS_IMETHODIMP nsInputStreamPump::GetCanceledReason(nsACString& aReason) {
    179  return GetCanceledReasonImpl(aReason);
    180 }
    181 
    182 NS_IMETHODIMP nsInputStreamPump::CancelWithReason(nsresult aStatus,
    183                                                  const nsACString& aReason) {
    184  return CancelWithReasonImpl(aStatus, aReason);
    185 }
    186 
    187 NS_IMETHODIMP
    188 nsInputStreamPump::Cancel(nsresult status) {
    189  RecursiveMutexAutoLock lock(mMutex);
    190 
    191  AssertOnThread();
    192 
    193  LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n", this,
    194       static_cast<uint32_t>(status)));
    195 
    196  if (NS_FAILED(mStatus)) {
    197    LOG(("  already canceled\n"));
    198    return NS_OK;
    199  }
    200 
    201  NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
    202  mStatus = status;
    203 
    204  // close input stream
    205  if (mAsyncStream) {
    206    // If mSuspendCount != 0, EnsureWaiting will be called by Resume().
    207    // Note that while suspended, OnInputStreamReady will
    208    // not do anything, and also note that calling asyncWait
    209    // on a closed stream works and will dispatch an event immediately.
    210 
    211    nsCOMPtr<nsIEventTarget> currentTarget = NS_GetCurrentThread();
    212    if (mTargetThread && currentTarget != mTargetThread) {
    213      nsresult rv = mTargetThread->Dispatch(NS_NewRunnableFunction(
    214          "nsInputStreamPump::Cancel", [self = RefPtr{this}, status] {
    215            RecursiveMutexAutoLock lock(self->mMutex);
    216            if (!self->mAsyncStream) {
    217              return;
    218            }
    219            self->mAsyncStream->CloseWithStatus(status);
    220            if (self->mSuspendCount == 0) {
    221              self->EnsureWaiting();
    222            }
    223          }));
    224      NS_ENSURE_SUCCESS(rv, rv);
    225    } else {
    226      mAsyncStream->CloseWithStatus(status);
    227      if (mSuspendCount == 0) {
    228        EnsureWaiting();
    229      }
    230    }
    231  }
    232  return NS_OK;
    233 }
    234 
    235 NS_IMETHODIMP
    236 nsInputStreamPump::Suspend() {
    237  RecursiveMutexAutoLock lock(mMutex);
    238 
    239  LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
    240  NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD,
    241                 NS_ERROR_UNEXPECTED);
    242  ++mSuspendCount;
    243  return NS_OK;
    244 }
    245 
    246 NS_IMETHODIMP
    247 nsInputStreamPump::Resume() {
    248  RecursiveMutexAutoLock lock(mMutex);
    249 
    250  LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
    251  NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
    252  NS_ENSURE_TRUE(mState != STATE_IDLE && mState != STATE_DEAD,
    253                 NS_ERROR_UNEXPECTED);
    254 
    255  // There is a brief in-between state when we null out mAsyncStream in
    256  // OnStateStop() before calling OnStopRequest, and only afterwards set
    257  // STATE_DEAD, which we need to handle gracefully.
    258  if (--mSuspendCount == 0 && mAsyncStream) {
    259    EnsureWaiting();
    260  }
    261  return NS_OK;
    262 }
    263 
    264 NS_IMETHODIMP
    265 nsInputStreamPump::GetLoadFlags(nsLoadFlags* aLoadFlags) {
    266  RecursiveMutexAutoLock lock(mMutex);
    267 
    268  *aLoadFlags = mLoadFlags;
    269  return NS_OK;
    270 }
    271 
    272 NS_IMETHODIMP
    273 nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags) {
    274  RecursiveMutexAutoLock lock(mMutex);
    275 
    276  mLoadFlags = aLoadFlags;
    277  return NS_OK;
    278 }
    279 
    280 NS_IMETHODIMP
    281 nsInputStreamPump::GetTRRMode(nsIRequest::TRRMode* aTRRMode) {
    282  return GetTRRModeImpl(aTRRMode);
    283 }
    284 
    285 NS_IMETHODIMP
    286 nsInputStreamPump::SetTRRMode(nsIRequest::TRRMode aTRRMode) {
    287  return SetTRRModeImpl(aTRRMode);
    288 }
    289 
    290 NS_IMETHODIMP
    291 nsInputStreamPump::GetLoadGroup(nsILoadGroup** aLoadGroup) {
    292  RecursiveMutexAutoLock lock(mMutex);
    293 
    294  *aLoadGroup = do_AddRef(mLoadGroup).take();
    295  return NS_OK;
    296 }
    297 
    298 NS_IMETHODIMP
    299 nsInputStreamPump::SetLoadGroup(nsILoadGroup* aLoadGroup) {
    300  RecursiveMutexAutoLock lock(mMutex);
    301 
    302  mLoadGroup = aLoadGroup;
    303  return NS_OK;
    304 }
    305 
    306 //-----------------------------------------------------------------------------
    307 // nsInputStreamPump::nsIInputStreamPump implementation
    308 //-----------------------------------------------------------------------------
    309 
    310 NS_IMETHODIMP
    311 nsInputStreamPump::Init(nsIInputStream* stream, uint32_t segsize,
    312                        uint32_t segcount, bool closeWhenDone,
    313                        nsISerialEventTarget* mainThreadTarget) {
    314  // probably we can't be multithread-accessed yet
    315  RecursiveMutexAutoLock lock(mMutex);
    316  NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
    317 
    318  mStream = stream;
    319  mSegSize = segsize;
    320  mSegCount = segcount;
    321  mCloseWhenDone = closeWhenDone;
    322  mLabeledMainThreadTarget = mainThreadTarget;
    323  if (mOffMainThread && mLabeledMainThreadTarget) {
    324    MOZ_ASSERT(
    325        false,
    326        "Init stream pump off main thread with a main thread event target.");
    327    return NS_ERROR_FAILURE;
    328  }
    329 
    330  return NS_OK;
    331 }
    332 
    333 NS_IMETHODIMP
    334 nsInputStreamPump::Reset() {
    335  RecursiveMutexAutoLock lock(mMutex);
    336  LOG(("nsInputStreamPump::Reset [this=%p]\n", this));
    337  mListener = nullptr;
    338 
    339  if (mAsyncStream && NS_SUCCEEDED(mAsyncStream->StreamStatus())) {
    340    mAsyncStream->Close();
    341    mAsyncStream->AsyncWait(nullptr, 0, 0, nullptr);
    342  }
    343 
    344  // release the reference, input stream must be closed by the transaction
    345  mStream = nullptr;
    346 
    347  return NS_OK;
    348 }
    349 
    350 NS_IMETHODIMP
    351 nsInputStreamPump::AsyncRead(nsIStreamListener* listener) {
    352  RecursiveMutexAutoLock lock(mMutex);
    353 
    354  // This ensures only one thread can interact with a pump at a time
    355  NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
    356  NS_ENSURE_ARG_POINTER(listener);
    357  MOZ_ASSERT(NS_IsMainThread() || mOffMainThread,
    358             "nsInputStreamPump should be read from the "
    359             "main thread only.");
    360 
    361  nsresult rv = NS_MakeAsyncNonBlockingInputStream(
    362      mStream.forget(), getter_AddRefs(mAsyncStream), mCloseWhenDone, mSegSize,
    363      mSegCount);
    364  if (NS_WARN_IF(NS_FAILED(rv))) {
    365    return rv;
    366  }
    367 
    368  MOZ_ASSERT(mAsyncStream);
    369 
    370  // mStreamOffset now holds the number of bytes currently read.
    371  mStreamOffset = 0;
    372 
    373  // grab event queue (we must do this here by contract, since all notifications
    374  // must go to the thread which called AsyncRead)
    375  if (NS_IsMainThread() && mLabeledMainThreadTarget) {
    376    mTargetThread = mLabeledMainThreadTarget;
    377  } else {
    378    mTargetThread = mozilla::GetCurrentSerialEventTarget();
    379  }
    380  NS_ENSURE_STATE(mTargetThread);
    381 
    382  rv = EnsureWaiting();
    383  if (NS_FAILED(rv)) return rv;
    384 
    385  if (mLoadGroup) mLoadGroup->AddRequest(this, nullptr);
    386 
    387  mState = STATE_START;
    388  mListener = listener;
    389  return NS_OK;
    390 }
    391 
    392 //-----------------------------------------------------------------------------
    393 // nsInputStreamPump::nsIInputStreamCallback implementation
    394 //-----------------------------------------------------------------------------
    395 
    396 NS_IMETHODIMP
    397 nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream* stream) {
    398  LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
    399 
    400  AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK);
    401 
    402  // this function has been called from a PLEvent, so we can safely call
    403  // any listener or progress sink methods directly from here.
    404 
    405  for (;;) {
    406    // There should only be one iteration of this loop happening at a time.
    407    // To prevent AsyncWait() (called during callbacks or on other threads)
    408    // from creating a parallel OnInputStreamReady(), we use:
    409    // -- a mutex; and
    410    // -- a boolean mProcessingCallbacks to detect parallel loops
    411    //    when exiting the mutex for callbacks.
    412    RecursiveMutexAutoLock lock(mMutex);
    413 
    414    // Prevent parallel execution during callbacks, while out of mutex.
    415    if (mProcessingCallbacks) {
    416      MOZ_ASSERT(!mProcessingCallbacks);
    417      break;
    418    }
    419    mProcessingCallbacks = true;
    420    if (mSuspendCount || mState == STATE_IDLE || mState == STATE_DEAD) {
    421      mWaitingForInputStreamReady = false;
    422      mProcessingCallbacks = false;
    423      break;
    424    }
    425 
    426    uint32_t nextState;
    427    switch (mState) {
    428      case STATE_START:
    429        nextState = OnStateStart();
    430        break;
    431      case STATE_TRANSFER:
    432        nextState = OnStateTransfer();
    433        break;
    434      case STATE_STOP:
    435        mRetargeting = false;
    436        nextState = OnStateStop();
    437        break;
    438      default:
    439        nextState = 0;
    440        MOZ_ASSERT_UNREACHABLE("Unknown enum value.");
    441        return NS_ERROR_UNEXPECTED;
    442    }
    443 
    444    bool stillTransferring =
    445        (mState == STATE_TRANSFER && nextState == STATE_TRANSFER);
    446    if (stillTransferring) {
    447      NS_ASSERTION(NS_SUCCEEDED(mStatus),
    448                   "Should not have failed status for ongoing transfer");
    449    } else {
    450      NS_ASSERTION(mState != nextState,
    451                   "Only OnStateTransfer can be called more than once.");
    452    }
    453    if (mRetargeting) {
    454      NS_ASSERTION(mState != STATE_STOP,
    455                   "Retargeting should not happen during OnStateStop.");
    456    }
    457 
    458    // Set mRetargeting so EnsureWaiting will be called. It ensures that
    459    // OnStateStop is called on the main thread.
    460    if (nextState == STATE_STOP && !NS_IsMainThread() && !mOffMainThread) {
    461      mRetargeting = true;
    462    }
    463 
    464    // Unset mProcessingCallbacks here (while we have lock) so our own call to
    465    // EnsureWaiting isn't blocked by it.
    466    mProcessingCallbacks = false;
    467 
    468    // We must break the loop if suspended during one of the previous
    469    // operation.
    470    if (mSuspendCount) {
    471      mState = nextState;
    472      mWaitingForInputStreamReady = false;
    473      break;
    474    }
    475 
    476    // Wait asynchronously if there is still data to transfer, or we're
    477    // switching event delivery to another thread.
    478    if (stillTransferring || mRetargeting) {
    479      mState = nextState;
    480      mWaitingForInputStreamReady = false;
    481      nsresult rv = EnsureWaiting();
    482      if (NS_SUCCEEDED(rv)) break;
    483 
    484      // Failure to start asynchronous wait: stop transfer.
    485      // Do not set mStatus if it was previously set to report a failure.
    486      if (NS_SUCCEEDED(mStatus)) {
    487        mStatus = rv;
    488      }
    489      nextState = STATE_STOP;
    490    }
    491 
    492    mState = nextState;
    493  }
    494  return NS_OK;
    495 }
    496 
    497 uint32_t nsInputStreamPump::OnStateStart() MOZ_REQUIRES(mMutex) {
    498  mMutex.AssertCurrentThreadIn();
    499 
    500  AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK);
    501 
    502  LOG(("  OnStateStart [this=%p]\n", this));
    503 
    504  nsresult rv;
    505 
    506  // need to check the reason why the stream is ready.  this is required
    507  // so our listener can check our status from OnStartRequest.
    508  // XXX async streams should have a GetStatus method!
    509  if (NS_SUCCEEDED(mStatus)) {
    510    uint64_t avail;
    511    rv = mAsyncStream->Available(&avail);
    512    if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
    513  }
    514 
    515  {
    516    nsCOMPtr<nsIStreamListener> listener = mListener;
    517    if (!listener) {
    518      return STATE_DEAD;
    519    }
    520    // We're on the writing thread
    521    AssertOnThread();
    522 
    523    // Note: Must exit mutex for call to OnStartRequest to avoid
    524    // deadlocks when calls to RetargetDeliveryTo for multiple
    525    // nsInputStreamPumps are needed (e.g. nsHttpChannel).
    526    RecursiveMutexAutoUnlock unlock(mMutex);
    527    rv = listener->OnStartRequest(this);
    528  }
    529 
    530  // an error returned from OnStartRequest should cause us to abort; however,
    531  // we must not stomp on mStatus if already canceled.
    532  if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus)) mStatus = rv;
    533 
    534  return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
    535 }
    536 
    537 uint32_t nsInputStreamPump::OnStateTransfer() MOZ_REQUIRES(mMutex) {
    538  mMutex.AssertCurrentThreadIn();
    539 
    540  AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK);
    541 
    542  LOG(("  OnStateTransfer [this=%p]\n", this));
    543 
    544  // if canceled, go directly to STATE_STOP...
    545  if (NS_FAILED(mStatus)) return STATE_STOP;
    546 
    547  nsresult rv = CreateBufferedStreamIfNeeded();
    548  if (NS_WARN_IF(NS_FAILED(rv))) {
    549    return STATE_STOP;
    550  }
    551 
    552  uint64_t avail;
    553  rv = mAsyncStream->Available(&avail);
    554  LOG(("  Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n",
    555       mAsyncStream.get(), static_cast<uint32_t>(rv), avail));
    556 
    557  if (rv == NS_BASE_STREAM_CLOSED) {
    558    rv = NS_OK;
    559    avail = 0;
    560  } else if (NS_SUCCEEDED(rv) && avail) {
    561    // we used to limit avail to 16K - we were afraid some ODA handlers
    562    // might assume they wouldn't get more than 16K at once
    563    // we're removing that limit since it speeds up local file access.
    564    // Now there's an implicit 64K limit of 4 16K segments
    565    // NOTE: ok, so the story is as follows.  OnDataAvailable impls
    566    //       are by contract supposed to consume exactly |avail| bytes.
    567    //       however, many do not... mailnews... stream converters...
    568    //       cough, cough.  the input stream pump is fairly tolerant
    569    //       in this regard; however, if an ODA does not consume any
    570    //       data from the stream, then we could potentially end up in
    571    //       an infinite loop.  we do our best here to try to catch
    572    //       such an error.  (see bug 189672)
    573 
    574    // in most cases this QI will succeed (mAsyncStream is almost always
    575    // a nsPipeInputStream, which implements nsITellableStream::Tell).
    576    int64_t offsetBefore;
    577    nsCOMPtr<nsITellableStream> tellable = do_QueryInterface(mAsyncStream);
    578    if (tellable && NS_FAILED(tellable->Tell(&offsetBefore))) {
    579      MOZ_ASSERT_UNREACHABLE("Tell failed on readable stream");
    580      offsetBefore = 0;
    581    }
    582 
    583    uint32_t odaAvail = avail > UINT32_MAX ? UINT32_MAX : uint32_t(avail);
    584 
    585    LOG(("  calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64
    586         "(%u)]\n",
    587         mStreamOffset, avail, odaAvail));
    588 
    589    {
    590      // We may be called on non-MainThread even if mOffMainThread is
    591      // false, due to RetargetDeliveryTo(), so don't use AssertOnThread()
    592      if (mTargetThread) {
    593        MOZ_ASSERT(mTargetThread->IsOnCurrentThread());
    594      } else {
    595        MOZ_ASSERT(NS_IsMainThread());
    596      }
    597 
    598      nsCOMPtr<nsIStreamListener> listener = mListener;
    599      if (!listener) {
    600        return STATE_DEAD;
    601      }
    602      // Note: Must exit mutex for call to OnStartRequest to avoid
    603      // deadlocks when calls to RetargetDeliveryTo for multiple
    604      // nsInputStreamPumps are needed (e.g. nsHttpChannel).
    605      RecursiveMutexAutoUnlock unlock(mMutex);
    606      // We're on the writing thread for mListener and mAsyncStream.
    607      // mStreamOffset is only touched in OnStateTransfer, and AsyncRead
    608      // shouldn't be called during OnDataAvailable()
    609 
    610      MOZ_PUSH_IGNORE_THREAD_SAFETY
    611      rv = listener->OnDataAvailable(this, mAsyncStream, mStreamOffset,
    612                                     odaAvail);
    613      MOZ_POP_THREAD_SAFETY
    614    }
    615 
    616    // don't enter this code if ODA failed or called Cancel
    617    if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
    618      // test to see if this ODA failed to consume data
    619      if (tellable) {
    620        // NOTE: if Tell fails, which can happen if the stream is
    621        // now closed, then we assume that everything was read.
    622        int64_t offsetAfter;
    623        if (NS_FAILED(tellable->Tell(&offsetAfter))) {
    624          offsetAfter = offsetBefore + odaAvail;
    625        }
    626        if (offsetAfter > offsetBefore) {
    627          mStreamOffset += (offsetAfter - offsetBefore);
    628        } else if (mSuspendCount == 0) {
    629          //
    630          // possible infinite loop if we continue pumping data!
    631          //
    632          // NOTE: although not allowed by nsIStreamListener, we
    633          // will allow the ODA impl to Suspend the pump.  IMAP
    634          // does this :-(
    635          //
    636          NS_ERROR("OnDataAvailable implementation consumed no data");
    637          mStatus = NS_ERROR_UNEXPECTED;
    638        }
    639      } else {
    640        mStreamOffset += odaAvail;  // assume ODA behaved well
    641      }
    642    }
    643  }
    644 
    645  // an error returned from Available or OnDataAvailable should cause us to
    646  // abort; however, we must not stop on mStatus if already canceled.
    647 
    648  if (NS_SUCCEEDED(mStatus)) {
    649    if (NS_FAILED(rv)) {
    650      mStatus = rv;
    651    } else if (avail) {
    652      // if stream is now closed, advance to STATE_STOP right away.
    653      // Available may return 0 bytes available at the moment; that
    654      // would not mean that we are done.
    655      // XXX async streams should have a GetStatus method!
    656      rv = mAsyncStream->Available(&avail);
    657      if (NS_SUCCEEDED(rv)) return STATE_TRANSFER;
    658      if (rv != NS_BASE_STREAM_CLOSED) mStatus = rv;
    659    }
    660  }
    661  return STATE_STOP;
    662 }
    663 
    664 nsresult nsInputStreamPump::CallOnStateStop() {
    665  RecursiveMutexAutoLock lock(mMutex);
    666 
    667  MOZ_ASSERT(NS_IsMainThread(),
    668             "CallOnStateStop should only be called on the main thread.");
    669 
    670  mState = OnStateStop();
    671  return NS_OK;
    672 }
    673 
    674 uint32_t nsInputStreamPump::OnStateStop() MOZ_REQUIRES(mMutex) {
    675  mMutex.AssertCurrentThreadIn();
    676 
    677  if (!NS_IsMainThread() && !mOffMainThread) {
    678    // This method can be called on a different thread if nsInputStreamPump
    679    // is used off the main-thread.
    680    if (NS_SUCCEEDED(mStatus) && mListener &&
    681        mozilla::StaticPrefs::network_send_OnDataFinished_nsInputStreamPump()) {
    682      nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
    683          do_QueryInterface(mListener);
    684      if (retargetableListener) {
    685        retargetableListener->OnDataFinished(mStatus);
    686      }
    687    }
    688    nsresult rv = mLabeledMainThreadTarget->Dispatch(
    689        mozilla::NewRunnableMethod("nsInputStreamPump::CallOnStateStop", this,
    690                                   &nsInputStreamPump::CallOnStateStop));
    691    NS_ENSURE_SUCCESS(rv, STATE_DEAD);
    692    return STATE_DEAD;
    693  }
    694 
    695  AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK);
    696 
    697  LOG(("  OnStateStop [this=%p status=%" PRIx32 "]\n", this,
    698       static_cast<uint32_t>(mStatus)));
    699 
    700  // if an error occurred, we must be sure to pass the error onto the async
    701  // stream.  in some cases, this is redundant, but since close is idempotent,
    702  // this is OK.  otherwise, be sure to honor the "close-when-done" option.
    703 
    704  if (!mAsyncStream) {
    705    MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
    706    return STATE_DEAD;
    707  }
    708 
    709  if (NS_FAILED(mStatus)) {
    710    mAsyncStream->CloseWithStatus(mStatus);
    711  } else if (mCloseWhenDone) {
    712    mAsyncStream->Close();
    713  }
    714 
    715  mAsyncStream = nullptr;
    716  mIsPending = false;
    717  {
    718    // We're on the writing thread.
    719    // We believe that mStatus can't be changed on us here.
    720    AssertOnThread();
    721 
    722    nsCOMPtr<nsIStreamListener> listener = mListener;
    723    nsresult status = mStatus;
    724    // Note: Must exit mutex for call to OnStartRequest to avoid
    725    // deadlocks when calls to RetargetDeliveryTo for multiple
    726    // nsInputStreamPumps are needed (e.g. nsHttpChannel).
    727    RecursiveMutexAutoUnlock unlock(mMutex);
    728 
    729    listener->OnStopRequest(this, status);
    730  }
    731  mTargetThread = nullptr;
    732  mListener = nullptr;
    733 
    734  if (mLoadGroup) mLoadGroup->RemoveRequest(this, nullptr, mStatus);
    735 
    736  return STATE_DEAD;
    737 }
    738 
    739 nsresult nsInputStreamPump::CreateBufferedStreamIfNeeded() {
    740  if (mAsyncStreamIsBuffered) {
    741    return NS_OK;
    742  }
    743 
    744  // ReadSegments is not available for any nsIAsyncInputStream. In order to use
    745  // it, we wrap a nsIBufferedInputStream around it, if needed.
    746 
    747  if (NS_InputStreamIsBuffered(mAsyncStream)) {
    748    mAsyncStreamIsBuffered = true;
    749    return NS_OK;
    750  }
    751 
    752  nsCOMPtr<nsIInputStream> stream;
    753  nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(stream),
    754                                          mAsyncStream.forget(), 4096);
    755  NS_ENSURE_SUCCESS(rv, rv);
    756 
    757  // A buffered inputStream must implement nsIAsyncInputStream.
    758  mAsyncStream = do_QueryInterface(stream);
    759  MOZ_DIAGNOSTIC_ASSERT(mAsyncStream);
    760  mAsyncStreamIsBuffered = true;
    761 
    762  return NS_OK;
    763 }
    764 
    765 //-----------------------------------------------------------------------------
    766 // nsIThreadRetargetableRequest
    767 //-----------------------------------------------------------------------------
    768 
    769 NS_IMETHODIMP
    770 nsInputStreamPump::RetargetDeliveryTo(nsISerialEventTarget* aNewTarget) {
    771  RecursiveMutexAutoLock lock(mMutex);
    772 
    773  NS_ENSURE_ARG(aNewTarget);
    774  NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
    775                 NS_ERROR_UNEXPECTED);
    776 
    777  // If canceled, do not retarget. Return with canceled status.
    778  if (NS_FAILED(mStatus)) {
    779    return mStatus;
    780  }
    781 
    782  if (aNewTarget == mTargetThread) {
    783    NS_WARNING("Retargeting delivery to same thread");
    784    return NS_OK;
    785  }
    786 
    787  if (mOffMainThread) {
    788    // Don't support retargeting if this pump is already used off the main
    789    // thread.
    790    return NS_ERROR_FAILURE;
    791  }
    792 
    793  // Ensure that |mListener| and any subsequent listeners can be retargeted
    794  // to another thread.
    795  nsresult rv = NS_OK;
    796  nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
    797      do_QueryInterface(mListener, &rv);
    798  if (NS_SUCCEEDED(rv) && retargetableListener) {
    799    rv = retargetableListener->CheckListenerChain();
    800    if (NS_SUCCEEDED(rv)) {
    801      mTargetThread = aNewTarget;
    802      mRetargeting = true;
    803    }
    804  }
    805  LOG(
    806      ("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
    807       "%s listener [%p] rv[%" PRIx32 "]",
    808       this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
    809       (nsIStreamListener*)mListener, static_cast<uint32_t>(rv)));
    810  return rv;
    811 }
    812 
    813 NS_IMETHODIMP
    814 nsInputStreamPump::GetDeliveryTarget(nsISerialEventTarget** aNewTarget) {
    815  RecursiveMutexAutoLock lock(mMutex);
    816 
    817  nsCOMPtr<nsISerialEventTarget> target = mTargetThread;
    818  target.forget(aNewTarget);
    819  return NS_OK;
    820 }