tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

UnderlyingSourceCallbackHelpers.cpp (21682B)


      1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h"
      8 
      9 #include "StreamUtils.h"
     10 #include "js/experimental/TypedData.h"
     11 #include "mozilla/dom/ReadableByteStreamController.h"
     12 #include "mozilla/dom/ReadableStream.h"
     13 #include "mozilla/dom/ReadableStreamDefaultController.h"
     14 #include "mozilla/dom/UnderlyingSourceBinding.h"
     15 #include "mozilla/dom/WorkerCommon.h"
     16 #include "mozilla/dom/WorkerPrivate.h"
     17 #include "mozilla/dom/WorkerRunnable.h"
     18 #include "nsStreamUtils.h"
     19 
     20 namespace mozilla::dom {
     21 
     22 using namespace streams_abstract;
     23 
     24 // UnderlyingSourceAlgorithmsBase: cycle-collection, refcounting and
        // interface-map boilerplate. The only interface exposed is nsISupports.
     25 NS_IMPL_CYCLE_COLLECTION(UnderlyingSourceAlgorithmsBase)
     26 NS_IMPL_CYCLE_COLLECTING_ADDREF(UnderlyingSourceAlgorithmsBase)
     27 NS_IMPL_CYCLE_COLLECTING_RELEASE(UnderlyingSourceAlgorithmsBase)
     28 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceAlgorithmsBase)
     29  NS_INTERFACE_MAP_ENTRY(nsISupports)
     30 NS_INTERFACE_MAP_END
     31 
        // UnderlyingSourceAlgorithms: inherits the base's refcounting and
        // interface map. The first parenthesized list names the members handed to
        // the cycle collector; mUnderlyingSource is passed in its own list.
        // NOTE(review): per the macro name the second list is presumably the
        // JS-traced members -- confirm against the macro definition.
     32 NS_IMPL_CYCLE_COLLECTION_INHERITED_WITH_JS_MEMBERS(
     33    UnderlyingSourceAlgorithms, UnderlyingSourceAlgorithmsBase,
     34    (mGlobal, mStartCallback, mPullCallback, mCancelCallback),
     35    (mUnderlyingSource))
     36 NS_IMPL_ADDREF_INHERITED(UnderlyingSourceAlgorithms,
     37                         UnderlyingSourceAlgorithmsBase)
     38 NS_IMPL_RELEASE_INHERITED(UnderlyingSourceAlgorithms,
     39                          UnderlyingSourceAlgorithmsBase)
     40 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(UnderlyingSourceAlgorithms)
     41 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsBase)
     42 
     43 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
     44 void UnderlyingSourceAlgorithms::StartCallback(
     45    JSContext* aCx, ReadableStreamControllerBase& aController,
     46    JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) {
     47  if (!mStartCallback) {
     48    // Step 2: Let startAlgorithm be an algorithm that returns undefined.
     49    aRetVal.setUndefined();
     50    return;
     51  }
     52 
     53  // Step 5: If underlyingSourceDict["start"] exists, then set startAlgorithm to
     54  // an algorithm which returns the result of invoking
     55  // underlyingSourceDict["start"] with argument list « controller » and
     56  // callback this value underlyingSource.
     57  JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSource);
     58  ReadableStreamDefaultControllerOrReadableByteStreamController controller;
     59  if (aController.IsDefault()) {
     60    controller.SetAsReadableStreamDefaultController() = aController.AsDefault();
     61  } else {
     62    controller.SetAsReadableByteStreamController() = aController.AsByte();
     63  }
     64 
     65  return mStartCallback->Call(thisObj, controller, aRetVal, aRv,
     66                              "UnderlyingSource.start",
     67                              CallbackFunction::eRethrowExceptions);
     68 }
     69 
     70 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
     71 already_AddRefed<Promise> UnderlyingSourceAlgorithms::PullCallback(
     72    JSContext* aCx, ReadableStreamControllerBase& aController,
     73    ErrorResult& aRv) {
     74  JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSource);
     75  if (!mPullCallback) {
     76    // Step 3: Let pullAlgorithm be an algorithm that returns a promise resolved
     77    // with undefined.
     78    return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
     79  }
     80 
     81  // Step 6: If underlyingSourceDict["pull"] exists, then set pullAlgorithm to
     82  // an algorithm which returns the result of invoking
     83  // underlyingSourceDict["pull"] with argument list « controller » and callback
     84  // this value underlyingSource.
     85  ReadableStreamDefaultControllerOrReadableByteStreamController controller;
     86  if (aController.IsDefault()) {
     87    controller.SetAsReadableStreamDefaultController() = aController.AsDefault();
     88  } else {
     89    controller.SetAsReadableByteStreamController() = aController.AsByte();
     90  }
     91 
     92  RefPtr<Promise> promise =
     93      mPullCallback->Call(thisObj, controller, aRv, "UnderlyingSource.pull",
     94                          CallbackFunction::eRethrowExceptions);
     95 
     96  return promise.forget();
     97 }
     98 
     99 // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller-from-underlying-source
    100 already_AddRefed<Promise> UnderlyingSourceAlgorithms::CancelCallback(
    101    JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
    102    ErrorResult& aRv) {
    103  if (!mCancelCallback) {
    104    // Step 4: Let cancelAlgorithm be an algorithm that returns a promise
    105    // resolved with undefined.
    106    return Promise::CreateResolvedWithUndefined(mGlobal, aRv);
    107  }
    108 
    109  // Step 7: If underlyingSourceDict["cancel"] exists, then set cancelAlgorithm
    110  // to an algorithm which takes an argument reason and returns the result of
    111  // invoking underlyingSourceDict["cancel"] with argument list « reason » and
    112  // callback this value underlyingSource.
    113  JS::Rooted<JSObject*> thisObj(aCx, mUnderlyingSource);
    114  RefPtr<Promise> promise =
    115      mCancelCallback->Call(thisObj, aReason, aRv, "UnderlyingSource.cancel",
    116                            CallbackFunction::eRethrowExceptions);
    117 
    118  return promise.forget();
    119 }
    120 
    121 // Shared between:
    122 // https://streams.spec.whatwg.org/#readablestream-set-up
    123 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
    124 // Step 1: Let startAlgorithm be an algorithm that returns undefined.
    125 void UnderlyingSourceAlgorithmsWrapper::StartCallback(
    126    JSContext*, ReadableStreamControllerBase&,
    127    JS::MutableHandle<JS::Value> aRetVal, ErrorResult&) {
    128  aRetVal.setUndefined();
    129 }
    130 
    131 // Shared between:
    132 // https://streams.spec.whatwg.org/#readablestream-set-up
    133 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
    134 // Step 2: Let pullAlgorithmWrapper be an algorithm that runs these steps:
    135 already_AddRefed<Promise> UnderlyingSourceAlgorithmsWrapper::PullCallback(
    136    JSContext* aCx, ReadableStreamControllerBase& aController,
    137    ErrorResult& aRv) {
    138  nsCOMPtr<nsIGlobalObject> global = aController.GetParentObject();
    139  return PromisifyAlgorithm(
    140      global,
    141      [&](ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION {
    142        return PullCallbackImpl(aCx, aController, aRv);
    143      },
    144      aRv);
    145 }
    146 
    147 // Shared between:
    148 // https://streams.spec.whatwg.org/#readablestream-set-up
    149 // https://streams.spec.whatwg.org/#readablestream-set-up-with-byte-reading-support
    150 // Step 3: Let cancelAlgorithmWrapper be an algorithm that runs these steps:
    151 already_AddRefed<Promise> UnderlyingSourceAlgorithmsWrapper::CancelCallback(
    152    JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
    153    ErrorResult& aRv) {
    154  nsCOMPtr<nsIGlobalObject> global = xpc::CurrentNativeGlobal(aCx);
    155  return PromisifyAlgorithm(
    156      global,
    157      [&](ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION {
    158        return CancelCallbackImpl(aCx, aReason, aRv);
    159      },
    160      aRv);
    161 }
    162 
    163 NS_IMPL_ISUPPORTS(InputStreamHolder, nsIInputStreamCallback)
    164 
        // Stores the algorithms object to call back (aCallback) and the wrapped
        // async input stream (aInput); aGlobal is handed to the
        // GlobalTeardownObserver base. No worker reference is taken here -- that
        // happens in Init().
    165 InputStreamHolder::InputStreamHolder(nsIGlobalObject* aGlobal,
    166                                     InputToReadableStreamAlgorithms* aCallback,
    167                                     nsIAsyncInputStream* aInput)
    168    : GlobalTeardownObserver(aGlobal), mCallback(aCallback), mInput(aInput) {}
    169 
    170 void InputStreamHolder::Init(JSContext* aCx) {
    171  if (!NS_IsMainThread()) {
    172    // We're in a worker
    173    WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
    174    MOZ_ASSERT(workerPrivate);
    175 
    176    workerPrivate->AssertIsOnWorkerThread();
    177 
    178    // Note, this will create a ref-cycle between the holder and the stream.
    179    // The cycle is broken when the stream is closed or the worker begins
    180    // shutting down.
    181    mWorkerRef = StrongWorkerRef::Create(workerPrivate, "InputStreamHolder",
    182                                         [self = RefPtr{this}]() {});
    183    if (NS_WARN_IF(!mWorkerRef)) {
    184      return;
    185    }
    186  }
    187 }
    188 
    189 InputStreamHolder::~InputStreamHolder() = default;
    190 
        // Tears the holder down via Shutdown() first, then forwards to the
        // GlobalTeardownObserver base implementation.
    191 void InputStreamHolder::DisconnectFromOwner() {
    192  Shutdown();
    193  GlobalTeardownObserver::DisconnectFromOwner();
    194 }
    195 
    196 void InputStreamHolder::Shutdown() {
         // Closes the wrapped stream (if any) and drops the references this holder
         // keeps: the algorithms kept alive for a pending AsyncWait and the worker
         // ref. mInput and mCallback themselves are not cleared here.
    197  if (mInput) {
    198    mInput->Close();
    199  }
    200  // NOTE(krosylight): Dropping mAsyncWaitAlgorithms here means letting cycle
    201  // collection happen on the underlying source, which can cause a dangling
    202  // read promise that never resolves. Doing so shouldn't be a problem at
    203  // shutdown phase.
    204  // Note that this is currently primarily for Fetch which does not explicitly
    205  // close its streams at shutdown. (i.e. to prevent memory leak for cases e.g.
    206  // WPT /fetch/api/basic/stream-response.any.html)
    207  mAsyncWaitAlgorithms = nullptr;
    208  // If we have an AsyncWait running, we'll get a callback and clear
    209  // the mAsyncWaitWorkerRef
    210  mWorkerRef = nullptr;
    211 }
    212 
    213 nsresult InputStreamHolder::AsyncWait(uint32_t aFlags, uint32_t aRequestedCount,
    214                                      nsIEventTarget* aEventTarget) {
    215  nsresult rv = mInput->AsyncWait(this, aFlags, aRequestedCount, aEventTarget);
    216  if (NS_SUCCEEDED(rv)) {
    217    mAsyncWaitWorkerRef = mWorkerRef;
    218    mAsyncWaitAlgorithms = mCallback;
    219  }
    220  return rv;
    221 }
    222 
    223 NS_IMETHODIMP InputStreamHolder::OnInputStreamReady(
    224    nsIAsyncInputStream* aStream) {
    225  mAsyncWaitWorkerRef = nullptr;
    226  mAsyncWaitAlgorithms = nullptr;
    227  // We may get called back after ::Shutdown()
    228  if (mCallback) {
    229    return mCallback->OnInputStreamReady(aStream);
    230  }
    231  return NS_ERROR_FAILURE;
    232 }
    233 
    234 NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED(InputToReadableStreamAlgorithms,
    235                                             UnderlyingSourceAlgorithmsWrapper,
    236                                             nsIInputStreamCallback)
        // mPullPromise and mStream are the members reported to the cycle collector.
    237 NS_IMPL_CYCLE_COLLECTION_WEAK_PTR_INHERITED(InputToReadableStreamAlgorithms,
    238                                            UnderlyingSourceAlgorithmsWrapper,
    239                                            mPullPromise, mStream)
    240 
        // Records the current serial event target (later passed to AsyncWait),
        // wraps aInput in an InputStreamHolder bound to the stream's parent
        // object, and then runs the holder's Init() with aCx.
    241 InputToReadableStreamAlgorithms::InputToReadableStreamAlgorithms(
    242    JSContext* aCx, nsIAsyncInputStream* aInput, ReadableStream* aStream)
    243    : mOwningEventTarget(GetCurrentSerialEventTarget()),
    244      mInput(new InputStreamHolder(aStream->GetParentObject(), this, aInput)),
    245      mStream(aStream) {
    246  mInput->Init(aCx);
    247 }
    248 
    249 already_AddRefed<Promise> InputToReadableStreamAlgorithms::PullCallbackImpl(
    250    JSContext* aCx, ReadableStreamControllerBase& aController,
    251    ErrorResult& aRv) {
    252  MOZ_ASSERT(aController.IsByte());
    253  ReadableStream* stream = aController.Stream();
    254  MOZ_ASSERT(stream);
    255 
    256  MOZ_DIAGNOSTIC_ASSERT(stream->Disturbed());
    257 
    258  MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
    259  MOZ_ASSERT(!mPullPromise);
    260  mPullPromise = Promise::CreateInfallible(aController.GetParentObject());
    261 
    262  MOZ_DIAGNOSTIC_ASSERT(mInput);
    263 
    264  nsresult rv = mInput->AsyncWait(0, 0, mOwningEventTarget);
    265  if (NS_WARN_IF(NS_FAILED(rv))) {
    266    ErrorPropagation(aCx, stream, rv);
    267    return nullptr;
    268  }
    269 
    270  // All good.
    271  return do_AddRef(mPullPromise);
    272 }
    273 
    274 // _BOUNDARY because OnInputStreamReady doesn't have [can-run-script]
    275 MOZ_CAN_RUN_SCRIPT_BOUNDARY NS_IMETHODIMP
    276 InputToReadableStreamAlgorithms::OnInputStreamReady(
    277    nsIAsyncInputStream* aStream) {
    278  MOZ_DIAGNOSTIC_ASSERT(aStream);
    279 
    280  // Already closed. We have nothing else to do here.
    281  if (IsClosed()) {
    282    return NS_OK;
    283  }
    284 
    285  AutoEntryScript aes(mStream->GetParentObject(),
    286                      "InputToReadableStream data available");
    287 
    288  MOZ_DIAGNOSTIC_ASSERT(mInput);
    289 
    290  JSContext* cx = aes.cx();
    291 
    292  uint64_t size = 0;
    293  nsresult rv = mInput->Available(&size);
    294  MOZ_ASSERT_IF(NS_SUCCEEDED(rv), size > 0);
    295 
    296  // No warning for stream closed.
    297  if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
    298    ErrorPropagation(cx, mStream, rv);
    299    return NS_OK;
    300  }
    301 
    302  // Not having a promise means we are pinged by stream closure
    303  // (WAIT_CLOSURE_ONLY below), but here we still have more data to read. Let's
    304  // wait for the next read request in that case.
    305  if (!mPullPromise) {
    306    return NS_OK;
    307  }
    308 
    309  MOZ_DIAGNOSTIC_ASSERT(mPullPromise->State() ==
    310                        Promise::PromiseState::Pending);
    311 
    312  ErrorResult errorResult;
    313  PullFromInputStream(cx, size, errorResult);
    314  errorResult.WouldReportJSException();
    315  if (errorResult.Failed()) {
    316    ErrorPropagation(cx, mStream, errorResult.StealNSResult());
    317    return NS_OK;
    318  }
    319 
    320  // PullFromInputStream can fulfill read request, which can trigger read
    321  // request chunk steps, which again may execute JS. But it should be still
    322  // safe from cycle collection as the caller nsIAsyncInputStream should hold
    323  // the reference of `this`.
    324  //
    325  // That said, it's generally good to be cautious as there's no guarantee that
    326  // the interface is implemented in the safest way.
    327  MOZ_DIAGNOSTIC_ASSERT(mPullPromise);
    328  if (mPullPromise) {
    329    mPullPromise->MaybeResolveWithUndefined();
    330    mPullPromise = nullptr;
    331  }
    332 
    333  MOZ_DIAGNOSTIC_ASSERT(mInput);
    334  if (mInput) {
    335    // Subscribe WAIT_CLOSURE_ONLY so that OnInputStreamReady can be called when
    336    // mInput is closed.
    337    rv = mInput->AsyncWait(nsIAsyncInputStream::WAIT_CLOSURE_ONLY, 0,
    338                           mOwningEventTarget);
    339    if (NS_WARN_IF(NS_FAILED(rv))) {
    340      ErrorPropagation(cx, mStream, errorResult.StealNSResult());
    341      return NS_OK;
    342    }
    343  }
    344 
    345  return NS_OK;
    346 }
    347 
    348 void InputToReadableStreamAlgorithms::WriteIntoReadRequestBuffer(
    349    JSContext* aCx, ReadableStream* aStream, JS::Handle<JSObject*> aBuffer,
    350    uint32_t aLength, uint32_t* aByteWritten, ErrorResult& aRv) {
    351  MOZ_DIAGNOSTIC_ASSERT(aBuffer);
    352  MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
    353  MOZ_DIAGNOSTIC_ASSERT(mInput);
    354  MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
    355  MOZ_DIAGNOSTIC_ASSERT(mPullPromise->State() ==
    356                        Promise::PromiseState::Pending);
    357 
    358  uint32_t written;
    359  nsresult rv;
    360  void* buffer;
    361  {
    362    // Bug 1754513: Hazard suppression.
    363    //
    364    // Because mInput->Read is detected as possibly GCing by the
    365    // current state of our static hazard analysis, we need to do the
    366    // suppression here. This can be removed with future improvements
    367    // to the static analysis.
    368    JS::AutoSuppressGCAnalysis suppress;
    369    JS::AutoCheckCannotGC noGC;
    370    bool isSharedMemory;
    371 
    372    buffer = JS_GetArrayBufferViewData(aBuffer, &isSharedMemory, noGC);
    373    MOZ_ASSERT(!isSharedMemory);
    374 
    375    rv = mInput->Read(static_cast<char*>(buffer), aLength, &written);
    376    if (NS_WARN_IF(NS_FAILED(rv))) {
    377      aRv.Throw(rv);
    378      return;
    379    }
    380  }
    381 
    382  *aByteWritten = written;
    383 
    384  if (written == 0) {
    385    // If bytesWritten is zero, then the stream has been closed; return rather
    386    // than enqueueing a chunk filled with zeros.
    387    aRv.Throw(NS_BASE_STREAM_CLOSED);
    388    return;
    389  }
    390 
    391  // All good.
    392 }
    393 
    394 // https://streams.spec.whatwg.org/#readablestream-pull-from-bytes
    395 // This is a ReadableStream algorithm but will probably be used solely in
    396 // InputToReadableStreamAlgorithms.
    397 void InputToReadableStreamAlgorithms::PullFromInputStream(JSContext* aCx,
    398                                                          uint64_t aAvailable,
    399                                                          ErrorResult& aRv) {
    400  // Step 1. Assert: stream.[[controller]] implements
    401  // ReadableByteStreamController.
    402  MOZ_ASSERT(mStream->Controller()->IsByte());
    403 
    404  // Step 2. Let available be bytes’s length. (aAvailable)
    405  // Step 3. Let desiredSize be available.
    406  uint64_t desiredSize = aAvailable;
    407 
    408  // Step 4. If stream’s current BYOB request view is non-null, then set
    409  // desiredSize to stream’s current BYOB request view's byte length.
    410  JS::Rooted<JSObject*> byobView(aCx);
    411  mStream->GetCurrentBYOBRequestView(aCx, &byobView, aRv);
    412  if (aRv.Failed()) {
    413    return;
    414  }
    415  if (byobView) {
    416    desiredSize = JS_GetArrayBufferViewByteLength(byobView);
    417  }
    418 
    419  // Step 5. Let pullSize be the smaller value of available and desiredSize.
    420  //
    421  // To avoid OOMing up on huge amounts of available data on a 32 bit system,
    422  // as well as potentially overflowing nsIInputStream's Read method's
    423  // parameter, let's limit our maximum chunk size to 256MB.
    424  //
    425  // (Note that nsIInputStream uses uint64_t for Available and uint32_t for
    426  // Read.)
    427  uint64_t pullSize = std::min(static_cast<uint64_t>(256 * 1024 * 1024),
    428                               std::min(aAvailable, desiredSize));
    429 
    430  // Step 6. Let pulled be the first pullSize bytes of bytes.
    431  // Step 7. Remove the first pullSize bytes from bytes.
    432  //
    433  // We do this in step 8 and 9, as we don't have a direct access to the data
    434  // but need to let nsIInputStream to write into the view.
    435 
    436  // Step 8. If stream’s current BYOB request view is non-null, then:
    437  if (byobView) {
    438    // Step 8.1. Write pulled into stream’s current BYOB request view.
    439    uint32_t bytesWritten = 0;
    440    WriteIntoReadRequestBuffer(aCx, mStream, byobView, pullSize, &bytesWritten,
    441                               aRv);
    442    if (aRv.Failed()) {
    443      return;
    444    }
    445 
    446    // Step 8.2. Perform ?
    447    // ReadableByteStreamControllerRespond(stream.[[controller]], pullSize).
    448    //
    449    // But we do not use pullSize but use byteWritten here, since nsIInputStream
    450    // does not guarantee to read as much as it told in Available().
    451    MOZ_DIAGNOSTIC_ASSERT(pullSize == bytesWritten);
    452    ReadableByteStreamControllerRespond(
    453        aCx, MOZ_KnownLive(mStream->Controller()->AsByte()), bytesWritten, aRv);
    454  }
    455  // Step 9. Otherwise,
    456  else {
    457    // Step 9.1. Set view to the result of creating a Uint8Array from pulled in
    458    // stream’s relevant Realm.
    459    UniquePtr<uint8_t[], JS::FreePolicy> buffer(
    460        static_cast<uint8_t*>(JS_malloc(aCx, pullSize)));
    461    if (!buffer) {
    462      aRv.ThrowTypeError("Out of memory");
    463      return;
    464    }
    465 
    466    uint32_t bytesWritten = 0;
    467    nsresult rv = mInput->Read((char*)buffer.get(), pullSize, &bytesWritten);
    468    if (!bytesWritten) {
    469      rv = NS_BASE_STREAM_CLOSED;
    470    }
    471    if (NS_FAILED(rv)) {
    472      aRv.Throw(rv);
    473      return;
    474    }
    475 
    476    MOZ_DIAGNOSTIC_ASSERT(pullSize == bytesWritten);
    477    JS::Rooted<JSObject*> view(aCx, nsJSUtils::MoveBufferAsUint8Array(
    478                                        aCx, bytesWritten, std::move(buffer)));
    479    if (!view) {
    480      JS_ClearPendingException(aCx);
    481      aRv.ThrowTypeError("Out of memory");
    482      return;
    483    }
    484 
    485    // Step 9.2. Perform ?
    486    // ReadableByteStreamControllerEnqueue(stream.[[controller]], view).
    487    ReadableByteStreamControllerEnqueue(
    488        aCx, MOZ_KnownLive(mStream->Controller()->AsByte()), view, aRv);
    489  }
    490 }
    491 
    492 void InputToReadableStreamAlgorithms::CloseAndReleaseObjects(
    493    JSContext* aCx, ReadableStream* aStream) {
    494  MOZ_DIAGNOSTIC_ASSERT(!IsClosed());
    495 
    496  ReleaseObjects();
    497 
    498  if (aStream->State() == ReadableStream::ReaderState::Readable) {
    499    IgnoredErrorResult rv;
    500    aStream->CloseNative(aCx, rv);
    501    NS_WARNING_ASSERTION(!rv.Failed(), "Failed to Close Stream");
    502  }
    503 }
    504 
    505 void InputToReadableStreamAlgorithms::ReleaseObjects() {
         // Closes and shuts down the input stream holder (if still present), drops
         // it, and forgets the pending pull promise without settling it.
    506  if (mInput) {
    507    mInput->CloseWithStatus(NS_BASE_STREAM_CLOSED);
    508    mInput->Shutdown();
    509    mInput = nullptr;
    510  }
    511 
    512  // It's okay to leave a potentially unsettled promise as-is, as it is only
    513  // used to prevent reentrancy into PullCallback. CloseNative() or
    514  // ErrorNative() will settle the read requests for us.
    515  mPullPromise = nullptr;
    516 }
    517 
    518 nsIInputStream* InputToReadableStreamAlgorithms::MaybeGetInputStreamIfUnread() {
    519  MOZ_ASSERT(!mStream->Disturbed(),
    520             "Should be only called on non-disturbed streams");
    521  return mInput->GetInputStream();
    522 }
    523 
    524 void InputToReadableStreamAlgorithms::ErrorPropagation(JSContext* aCx,
    525                                                       ReadableStream* aStream,
    526                                                       nsresult aError) {
    527  // Nothing to do.
    528  if (IsClosed()) {
    529    return;
    530  }
    531 
    532  // Let's close the stream.
    533  if (aError == NS_BASE_STREAM_CLOSED) {
    534    CloseAndReleaseObjects(aCx, aStream);
    535    return;
    536  }
    537 
    538  // Let's use a generic error.
    539  ErrorResult rv;
    540  // XXXbz can we come up with a better error message here to tell the
    541  // consumer what went wrong?
    542  rv.ThrowTypeError("Error in input stream");
    543 
    544  JS::Rooted<JS::Value> errorValue(aCx);
    545  bool ok = ToJSValue(aCx, std::move(rv), &errorValue);
    546  MOZ_RELEASE_ASSERT(ok, "ToJSValue never fails for ErrorResult");
    547 
    548  {
    549    // This will be ignored if it's already errored.
    550    IgnoredErrorResult rv;
    551    aStream->ErrorNative(aCx, errorValue, rv);
    552    NS_WARNING_ASSERTION(!rv.Failed(), "Failed to error InputToReadableStream");
    553  }
    554 
    555  MOZ_ASSERT(IsClosed());
    556 }
    557 
    558 NS_IMPL_ISUPPORTS_CYCLE_COLLECTION_INHERITED_0(
    559    NonAsyncInputToReadableStreamAlgorithms, UnderlyingSourceAlgorithmsWrapper)
        // mAsyncAlgorithms is the only member reported to the cycle collector here.
    560 NS_IMPL_CYCLE_COLLECTION_INHERITED(NonAsyncInputToReadableStreamAlgorithms,
    561                                   UnderlyingSourceAlgorithmsWrapper,
    562                                   mAsyncAlgorithms)
    563 
    564 already_AddRefed<Promise>
    565 NonAsyncInputToReadableStreamAlgorithms::PullCallbackImpl(
    566    JSContext* aCx, ReadableStreamControllerBase& aController,
    567    ErrorResult& aRv) {
    568  if (!mAsyncAlgorithms) {
    569    nsCOMPtr<nsIAsyncInputStream> asyncStream;
    570 
    571    // NS_MakeAsyncNonBlockingInputStream may immediately start a stream read
    572    // via nsInputStreamTransport::OpenInputStream, which is why this should be
    573    // called on a pull callback instead of in the constructor.
    574    nsresult rv = NS_MakeAsyncNonBlockingInputStream(
    575        mInput.forget(), getter_AddRefs(asyncStream));
    576    if (NS_WARN_IF(NS_FAILED(rv))) {
    577      aRv.Throw(rv);
    578      return nullptr;
    579    }
    580 
    581    mAsyncAlgorithms = MakeRefPtr<InputToReadableStreamAlgorithms>(
    582        aCx, asyncStream, aController.Stream());
    583  }
    584 
    585  MOZ_ASSERT(!mInput);
    586  return mAsyncAlgorithms->PullCallbackImpl(aCx, aController, aRv);
    587 }
    588 
    589 }  // namespace mozilla::dom