tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

FetchStreamReader.cpp (15395B)


      1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim: set ts=8 sts=2 et sw=2 tw=80: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "FetchStreamReader.h"
      8 
      9 #include "InternalResponse.h"
     10 #include "jsapi.h"
     11 #include "mozilla/ConsoleReportCollector.h"
     12 #include "mozilla/ErrorResult.h"
     13 #include "mozilla/HoldDropJSObjects.h"
     14 #include "mozilla/StaticAnalysisFunctions.h"
     15 #include "mozilla/dom/AutoEntryScript.h"
     16 #include "mozilla/dom/Promise.h"
     17 #include "mozilla/dom/PromiseBinding.h"
     18 #include "mozilla/dom/ReadableStream.h"
     19 #include "mozilla/dom/ReadableStreamDefaultController.h"
     20 #include "mozilla/dom/ReadableStreamDefaultReader.h"
     21 #include "mozilla/dom/WorkerPrivate.h"
     22 #include "mozilla/dom/WorkerRef.h"
     23 #include "nsContentUtils.h"
     24 #include "nsDebug.h"
     25 #include "nsIAsyncInputStream.h"
     26 #include "nsIPipe.h"
     27 #include "nsIScriptError.h"
     28 #include "nsPIDOMWindow.h"
     29 
     30 namespace mozilla::dom {
     31 
     32 NS_IMPL_ISUPPORTS(OutputStreamHolder, nsIOutputStreamCallback)
     33 
     34 OutputStreamHolder::OutputStreamHolder(FetchStreamReader* aReader,
     35                                       nsIAsyncOutputStream* aOutput)
     36    : mReader(aReader), mOutput(aOutput) {}
     37 
     38 nsresult OutputStreamHolder::Init(JSContext* aCx) {
     39  if (NS_IsMainThread()) {
     40    return NS_OK;
     41  }
     42 
     43  // We're in a worker
     44  WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
     45  MOZ_ASSERT(workerPrivate);
     46 
     47  workerPrivate->AssertIsOnWorkerThread();
     48 
     49  // Note, this will create a ref-cycle between the holder and the stream.
     50  // The cycle is broken when the stream is closed or the worker begins
     51  // shutting down.
     52  mWorkerRef =
     53      StrongWorkerRef::Create(workerPrivate, "OutputStreamHolder",
     54                              [self = RefPtr{this}]() { self->Shutdown(); });
     55  if (NS_WARN_IF(!mWorkerRef)) {
     56    return NS_ERROR_FAILURE;
     57  }
     58  return NS_OK;
     59 }
     60 
     61 OutputStreamHolder::~OutputStreamHolder() = default;
     62 
     63 void OutputStreamHolder::Shutdown() {
     64  if (mOutput) {
     65    mOutput->Close();
     66  }
     67  // If we have an AsyncWait running, we'll get a callback and clear
     68  // the mAsyncWaitWorkerRef
     69  mWorkerRef = nullptr;
     70 }
     71 
     72 nsresult OutputStreamHolder::AsyncWait(uint32_t aFlags,
     73                                       uint32_t aRequestedCount,
     74                                       nsIEventTarget* aEventTarget) {
     75  mAsyncWaitWorkerRef = mWorkerRef;
     76  // Grab the strong reference for the reader but only when we are waiting for
     77  // the output stream, because it means we still have things to write.
     78  // (WAIT_CLOSURE_ONLY happens when waiting for ReadableStream to respond, at
     79  // which point the pull callback should get an indirect strong reference via
     80  // the controller argument.)
     81  mAsyncWaitReader =
     82      aFlags == nsIAsyncOutputStream::WAIT_CLOSURE_ONLY ? nullptr : mReader;
     83  nsresult rv = mOutput->AsyncWait(this, aFlags, aRequestedCount, aEventTarget);
     84  if (NS_WARN_IF(NS_FAILED(rv))) {
     85    mAsyncWaitWorkerRef = nullptr;
     86    mAsyncWaitReader = nullptr;
     87  }
     88  return rv;
     89 }
     90 
     91 NS_IMETHODIMP OutputStreamHolder::OnOutputStreamReady(
     92    nsIAsyncOutputStream* aStream) {
     93  // We may get called back after ::Shutdown()
     94  if (!mReader) {
     95    mAsyncWaitWorkerRef = nullptr;
     96    MOZ_ASSERT(!mAsyncWaitReader);
     97    return NS_OK;
     98  }
     99 
    100  // mAsyncWaitReader may be reset during OnOutputStreamReady, make sure to let
    101  // it live during the call
    102  RefPtr<FetchStreamReader> reader = mReader.get();
    103  if (!reader->OnOutputStreamReady()) {
    104    mAsyncWaitWorkerRef = nullptr;
    105    mAsyncWaitReader = nullptr;
    106    return NS_OK;
    107  }
    108  return NS_OK;
    109 }
    110 
    111 NS_IMPL_CYCLE_COLLECTING_ADDREF(FetchStreamReader)
    112 NS_IMPL_CYCLE_COLLECTING_RELEASE(FetchStreamReader)
    113 
    114 NS_IMPL_CYCLE_COLLECTION_WEAK_PTR(FetchStreamReader, mGlobal, mReader)
    115 
    116 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchStreamReader)
    117  NS_INTERFACE_MAP_ENTRY(nsISupports)
    118 NS_INTERFACE_MAP_END
    119 
    120 /* static */
    121 nsresult FetchStreamReader::Create(JSContext* aCx, nsIGlobalObject* aGlobal,
    122                                   FetchStreamReader** aStreamReader,
    123                                   nsIInputStream** aInputStream) {
    124  MOZ_ASSERT(aCx);
    125  MOZ_ASSERT(aGlobal);
    126  MOZ_ASSERT(aStreamReader);
    127  MOZ_ASSERT(aInputStream);
    128 
    129  RefPtr<FetchStreamReader> streamReader = new FetchStreamReader(aGlobal);
    130 
    131  nsCOMPtr<nsIAsyncInputStream> pipeIn;
    132  nsCOMPtr<nsIAsyncOutputStream> pipeOut;
    133 
    134  NS_NewPipe2(getter_AddRefs(pipeIn), getter_AddRefs(pipeOut), true, true, 0,
    135              0);
    136 
    137  streamReader->mOutput = new OutputStreamHolder(streamReader, pipeOut);
    138 
    139  pipeIn.forget(aInputStream);
    140  streamReader.forget(aStreamReader);
    141  return NS_OK;
    142 }
    143 
    144 FetchStreamReader::FetchStreamReader(nsIGlobalObject* aGlobal)
    145    : mGlobal(aGlobal), mOwningEventTarget(mGlobal->SerialEventTarget()) {
    146  MOZ_ASSERT(aGlobal);
    147 }
    148 
    149 FetchStreamReader::~FetchStreamReader() {
    150  CloseAndRelease(nullptr, NS_BASE_STREAM_CLOSED);
    151 }
    152 
    153 // If a context is provided, an attempt will be made to cancel the reader.  The
    154 // only situation where we don't expect to have a context is when closure is
    155 // being triggered from the destructor or the WorkerRef is notifying.  If
    156 // we're at the destructor, it's far too late to cancel anything.  And if the
    157 // WorkerRef is being notified, the global is going away, so there's also
    158 // no need to do further JS work.
    159 void FetchStreamReader::CloseAndRelease(JSContext* aCx, nsresult aStatus) {
    160  NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
    161 
    162  if (mStreamClosed) {
    163    // Already closed.
    164    return;
    165  }
    166 
    167  RefPtr<FetchStreamReader> kungFuDeathGrip = this;
    168  if (aCx && mReader) {
    169    ErrorResult rv;
    170    if (aStatus == NS_ERROR_DOM_WRONG_TYPE_ERR) {
    171      rv.ThrowTypeError<MSG_FETCH_BODY_WRONG_TYPE>();
    172    } else {
    173      rv = aStatus;
    174    }
    175    JS::Rooted<JS::Value> errorValue(aCx);
    176    if (ToJSValue(aCx, std::move(rv), &errorValue)) {
    177      IgnoredErrorResult ignoredError;
    178      // It's currently safe to cancel an already closed reader because, per the
    179      // comments in ReadableStream::cancel() conveying the spec, step 2 of
    180      // 3.4.3 that specified ReadableStreamCancel is: If stream.[[state]] is
    181      // "closed", return a new promise resolved with undefined.
    182      RefPtr<Promise> cancelResultPromise =
    183          MOZ_KnownLive(mReader)->Cancel(aCx, errorValue, ignoredError);
    184      NS_WARNING_ASSERTION(!ignoredError.Failed(),
    185                           "Failed to cancel stream during close and release");
    186      if (cancelResultPromise) {
    187        bool setHandled = cancelResultPromise->SetAnyPromiseIsHandled();
    188        NS_WARNING_ASSERTION(setHandled,
    189                             "Failed to mark cancel promise as handled.");
    190        (void)setHandled;
    191      }
    192    }
    193 
    194    // We don't want to propagate exceptions during the cleanup.
    195    JS_ClearPendingException(aCx);
    196  }
    197 
    198  mStreamClosed = true;
    199 
    200  mGlobal = nullptr;
    201 
    202  if (mOutput) {
    203    mOutput->CloseWithStatus(aStatus);
    204    mOutput->Shutdown();
    205    mOutput = nullptr;
    206  }
    207 
    208  mReader = nullptr;
    209  mBuffer.Clear();
    210 }
    211 
    212 // https://fetch.spec.whatwg.org/#body-incrementally-read
    213 void FetchStreamReader::StartConsuming(JSContext* aCx, ReadableStream* aStream,
    214                                       ErrorResult& aRv) {
    215  MOZ_DIAGNOSTIC_ASSERT(!mReader);
    216  MOZ_DIAGNOSTIC_ASSERT(aStream);
    217  MOZ_ASSERT(!aStream->MaybeGetInputStreamIfUnread(),
    218             "FetchStreamReader is for JS streams but we got a stream based on "
    219             "nsIInputStream here. Extract nsIInputStream and read it instead "
    220             "to reduce overhead.");
    221 
    222  aRv = mOutput->Init(aCx);
    223  if (aRv.Failed()) {
    224    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    225    return;
    226  }
    227 
    228  // Step 2: Let reader be the result of getting a reader for body’s stream.
    229  RefPtr<ReadableStreamDefaultReader> reader = aStream->GetReader(aRv);
    230  if (aRv.Failed()) {
    231    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    232    return;
    233  }
    234 
    235  mReader = reader;
    236 
    237  aRv = mOutput->AsyncWait(0, 0, mOwningEventTarget);
    238  if (NS_WARN_IF(aRv.Failed())) {
    239    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    240  }
    241 }
    242 
    243 struct FetchReadRequest : public ReadRequest {
    244 public:
    245  NS_DECL_ISUPPORTS_INHERITED
    246  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(FetchReadRequest, ReadRequest)
    247 
    248  explicit FetchReadRequest(FetchStreamReader* aReader)
    249      : mFetchStreamReader(aReader) {}
    250 
    251  MOZ_CAN_RUN_SCRIPT_BOUNDARY
    252  void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
    253                  ErrorResult& aRv) override {
    254    mFetchStreamReader->ChunkSteps(aCx, aChunk, aRv);
    255  }
    256 
    257  MOZ_CAN_RUN_SCRIPT_BOUNDARY
    258  void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {
    259    mFetchStreamReader->CloseSteps(aCx, aRv);
    260  }
    261 
    262  MOZ_CAN_RUN_SCRIPT_BOUNDARY
    263  void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
    264                  ErrorResult& aRv) override {
    265    mFetchStreamReader->ErrorSteps(aCx, aError, aRv);
    266  }
    267 
    268 protected:
    269  virtual ~FetchReadRequest() = default;
    270 
    271  MOZ_KNOWN_LIVE RefPtr<FetchStreamReader> mFetchStreamReader;
    272 };
    273 
    274 NS_IMPL_CYCLE_COLLECTION_INHERITED(FetchReadRequest, ReadRequest,
    275                                   mFetchStreamReader)
    276 NS_IMPL_ADDREF_INHERITED(FetchReadRequest, ReadRequest)
    277 NS_IMPL_RELEASE_INHERITED(FetchReadRequest, ReadRequest)
    278 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(FetchReadRequest)
    279 NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
    280 
    281 // nsIOutputStreamCallback interface
    282 MOZ_CAN_RUN_SCRIPT_BOUNDARY
    283 bool FetchStreamReader::OnOutputStreamReady() {
    284  NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
    285  if (mStreamClosed) {
    286    return false;
    287  }
    288 
    289  AutoEntryScript aes(mGlobal, "ReadableStreamReader.read");
    290  return Process(aes.cx());
    291 }
    292 
    293 bool FetchStreamReader::Process(JSContext* aCx) {
    294  NS_ASSERT_OWNINGTHREAD(FetchStreamReader);
    295  MOZ_ASSERT(mReader);
    296 
    297  if (!mBuffer.IsEmpty()) {
    298    nsresult rv = WriteBuffer();
    299    if (NS_WARN_IF(NS_FAILED(rv))) {
    300      CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
    301      return false;
    302    }
    303    return true;
    304  }
    305 
    306  // Check if the output stream has already been closed. This lets us propagate
    307  // errors eagerly, and detect output stream closures even when we have no data
    308  // to write.
    309  if (NS_WARN_IF(NS_FAILED(mOutput->StreamStatus()))) {
    310    CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
    311    return false;
    312  }
    313 
    314  // We're waiting on new data - set up a WAIT_CLOSURE_ONLY callback so we
    315  // notice if the reader closes.
    316  nsresult rv = mOutput->AsyncWait(nsIAsyncOutputStream::WAIT_CLOSURE_ONLY, 0,
    317                                   mOwningEventTarget);
    318  if (NS_WARN_IF(NS_FAILED(rv))) {
    319    CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    320    return false;
    321  }
    322 
    323  // If we already have an outstanding read request, don't start another one
    324  // concurrently.
    325  if (!mHasOutstandingReadRequest) {
    326    // https://fetch.spec.whatwg.org/#incrementally-read-loop
    327    // The below very loosely tries to implement the incrementally-read-loop
    328    // from the fetch spec.
    329    // Step 2: Read a chunk from reader given readRequest.
    330    RefPtr<ReadRequest> readRequest = new FetchReadRequest(this);
    331    RefPtr<ReadableStreamDefaultReader> reader = mReader;
    332    mHasOutstandingReadRequest = true;
    333 
    334    IgnoredErrorResult err;
    335    reader->ReadChunk(aCx, *readRequest, err);
    336    if (NS_WARN_IF(err.Failed())) {
    337      // Let's close the stream.
    338      mHasOutstandingReadRequest = false;
    339      CloseAndRelease(aCx, NS_ERROR_DOM_INVALID_STATE_ERR);
    340      // Don't return false, as we've already called `AsyncWait`.
    341    }
    342  }
    343  return true;
    344 }
    345 
    346 void FetchStreamReader::ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk,
    347                                   ErrorResult& aRv) {
    348  // This roughly implements the chunk steps from
    349  // https://fetch.spec.whatwg.org/#incrementally-read-loop.
    350 
    351  mHasOutstandingReadRequest = false;
    352 
    353  // Step 2. If chunk is not a Uint8Array object, then set continueAlgorithm to
    354  // this step: run processBodyError given a TypeError.
    355  RootedSpiderMonkeyInterface<Uint8Array> chunk(aCx);
    356  if (!aChunk.isObject() || !chunk.Init(&aChunk.toObject())) {
    357    CloseAndRelease(aCx, NS_ERROR_DOM_WRONG_TYPE_ERR);
    358    return;
    359  }
    360 
    361  MOZ_DIAGNOSTIC_ASSERT(mBuffer.IsEmpty());
    362 
    363  // Let's take a copy of the data.
    364  // FIXME: We could sometimes avoid this copy by trying to write `chunk`
    365  // directly into `mPipeOut` eagerly, and only filling `mBuffer` if there isn't
    366  // enough space in the pipe's buffer.
    367  if (!chunk.AppendDataTo(mBuffer)) {
    368    CloseAndRelease(aCx, NS_ERROR_OUT_OF_MEMORY);
    369    return;
    370  }
    371 
    372  mBufferOffset = 0;
    373  mBufferRemaining = mBuffer.Length();
    374 
    375  nsresult rv = WriteBuffer();
    376  if (NS_WARN_IF(NS_FAILED(rv))) {
    377    CloseAndRelease(aCx, NS_ERROR_DOM_ABORT_ERR);
    378  }
    379 }
    380 
    381 void FetchStreamReader::CloseSteps(JSContext* aCx, ErrorResult& aRv) {
    382  mHasOutstandingReadRequest = false;
    383  CloseAndRelease(aCx, NS_BASE_STREAM_CLOSED);
    384 }
    385 
    386 void FetchStreamReader::ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
    387                                   ErrorResult& aRv) {
    388  mHasOutstandingReadRequest = false;
    389  ReportErrorToConsole(aCx, aError);
    390  CloseAndRelease(aCx, NS_ERROR_FAILURE);
    391 }
    392 
    393 nsresult FetchStreamReader::WriteBuffer() {
    394  MOZ_ASSERT(mBuffer.Length() == (mBufferOffset + mBufferRemaining));
    395 
    396  char* data = reinterpret_cast<char*>(mBuffer.Elements());
    397 
    398  while (mBufferRemaining > 0) {
    399    uint32_t written = 0;
    400    nsresult rv =
    401        mOutput->Write(data + mBufferOffset, mBufferRemaining, &written);
    402 
    403    if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
    404      break;
    405    }
    406 
    407    if (NS_WARN_IF(NS_FAILED(rv))) {
    408      return rv;
    409    }
    410 
    411    MOZ_ASSERT(written <= mBufferRemaining);
    412    mBufferRemaining -= written;
    413    mBufferOffset += written;
    414 
    415    if (mBufferRemaining == 0) {
    416      mBuffer.Clear();
    417      break;
    418    }
    419  }
    420 
    421  nsresult rv = mOutput->AsyncWait(0, 0, mOwningEventTarget);
    422  if (NS_WARN_IF(NS_FAILED(rv))) {
    423    return rv;
    424  }
    425 
    426  return NS_OK;
    427 }
    428 
    429 void FetchStreamReader::ReportErrorToConsole(JSContext* aCx,
    430                                             JS::Handle<JS::Value> aValue) {
    431  nsCString sourceSpec;
    432  uint32_t line = 0;
    433  uint32_t column = 0;
    434  nsString valueString;
    435 
    436  nsContentUtils::ExtractErrorValues(aCx, aValue, sourceSpec, &line, &column,
    437                                     valueString);
    438 
    439  nsTArray<nsString> params;
    440  params.AppendElement(valueString);
    441 
    442  RefPtr<ConsoleReportCollector> reporter = new ConsoleReportCollector();
    443  reporter->AddConsoleReport(nsIScriptError::errorFlag,
    444                             "ReadableStreamReader.read"_ns,
    445                             nsContentUtils::eDOM_PROPERTIES, sourceSpec, line,
    446                             column, "ReadableStreamReadingFailed"_ns, params);
    447 
    448  uint64_t innerWindowId = 0;
    449 
    450  if (NS_IsMainThread()) {
    451    nsCOMPtr<nsPIDOMWindowInner> window = do_QueryInterface(mGlobal);
    452    if (window) {
    453      innerWindowId = window->WindowID();
    454    }
    455    reporter->FlushReportsToConsole(innerWindowId);
    456    return;
    457  }
    458 
    459  WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
    460  if (workerPrivate) {
    461    innerWindowId = workerPrivate->WindowID();
    462  }
    463 
    464  RefPtr<Runnable> r = NS_NewRunnableFunction(
    465      "FetchStreamReader::ReportErrorToConsole", [reporter, innerWindowId]() {
    466        reporter->FlushReportsToConsole(innerWindowId);
    467      });
    468 
    469  workerPrivate->DispatchToMainThread(r.forget());
    470 }
    471 
    472 }  // namespace mozilla::dom