tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

ReadableStreamPipeTo.cpp (37018B)


      1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
      2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
      3 /* This Source Code Form is subject to the terms of the Mozilla Public
      4 * License, v. 2.0. If a copy of the MPL was not distributed with this
      5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      6 
      7 #include "ReadableStreamPipeTo.h"
      8 
      9 #include "js/Exception.h"
     10 #include "mozilla/AlreadyAddRefed.h"
     11 #include "mozilla/ErrorResult.h"
     12 #include "mozilla/dom/AbortFollower.h"
     13 #include "mozilla/dom/AbortSignal.h"
     14 #include "mozilla/dom/Promise-inl.h"
     15 #include "mozilla/dom/Promise.h"
     16 #include "mozilla/dom/PromiseNativeHandler.h"
     17 #include "mozilla/dom/ReadableStream.h"
     18 #include "mozilla/dom/ReadableStreamDefaultReader.h"
     19 #include "mozilla/dom/WritableStream.h"
     20 #include "mozilla/dom/WritableStreamDefaultWriter.h"
     21 #include "nsCycleCollectionParticipant.h"
     22 #include "nsISupportsImpl.h"
     23 
     24 namespace mozilla::dom {
     25 
     26 using namespace streams_abstract;
     27 
     28 struct PipeToReadRequest;
     29 class WriteFinishedPromiseHandler;
     30 class ShutdownActionFinishedPromiseHandler;
     31 
     32 // https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.)
     33 //
     34 // This class implements everything that is required to read all chunks from
     35 // the reader (source) and write them to writer (destination), while
     36 // following the constraints given in the spec using our implementation-defined
     37 // behavior.
     38 //
     39 // The cycle-collected references look roughly like this:
     40 // clang-format off
     41 //
     42 // Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream
     43 //         |                  ^              |
     44 //         |(PromiseHandler)  |(mReader)     |(ReadRequest)
     45 //         |                  |              |
     46 //         |-------------> PipeToPump <-------
     47 //                         ^  |   |
     48 //         |---------------|  |   |
     49 //         |                  |   |-------(mLastWrite) -------->
     50 //         |(PromiseHandler)  |   |< ---- (PromiseHandler) ---- Promise
     51 //         |                  |                                   ^
     52 //         |                  |(mWriter)                          |(mWriteRequests)
     53 //         |                  v                                   |
     54 // Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream
     55 //
     56 // clang-format on
     57 class PipeToPump final : public AbortFollower {
     58  NS_DECL_CYCLE_COLLECTING_ISUPPORTS
     59  NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump)
     60 
     61  friend struct PipeToReadRequest;
     62  friend class WriteFinishedPromiseHandler;
     63  friend class ShutdownActionFinishedPromiseHandler;
     64 
     65  PipeToPump(Promise* aPromise, ReadableStreamDefaultReader* aReader,
     66             WritableStreamDefaultWriter* aWriter, bool aPreventClose,
     67             bool aPreventAbort, bool aPreventCancel)
     68      : mPromise(aPromise),
     69        mReader(aReader),
     70        mWriter(aWriter),
     71        mPreventClose(aPreventClose),
     72        mPreventAbort(aPreventAbort),
     73        mPreventCancel(aPreventCancel) {}
     74 
     75  MOZ_CAN_RUN_SCRIPT void Start(JSContext* aCx, AbortSignal* aSignal);
     76 
     77  MOZ_CAN_RUN_SCRIPT_BOUNDARY void RunAbortAlgorithm() override;
     78 
     79 private:
     80  ~PipeToPump() override = default;
     81 
     82  MOZ_CAN_RUN_SCRIPT void PerformAbortAlgorithm(JSContext* aCx,
     83                                                AbortSignalImpl* aSignal);
     84 
     85  MOZ_CAN_RUN_SCRIPT bool SourceOrDestErroredOrClosed(JSContext* aCx);
     86 
     87  using ShutdownAction = already_AddRefed<Promise> (*)(
     88      JSContext*, PipeToPump*, JS::Handle<mozilla::Maybe<JS::Value>>,
     89      ErrorResult&);
     90 
     91  MOZ_CAN_RUN_SCRIPT void ShutdownWithAction(
     92      JSContext* aCx, ShutdownAction aAction,
     93      JS::Handle<mozilla::Maybe<JS::Value>> aError);
     94  MOZ_CAN_RUN_SCRIPT void ShutdownWithActionAfterFinishedWrite(
     95      JSContext* aCx, ShutdownAction aAction,
     96      JS::Handle<mozilla::Maybe<JS::Value>> aError);
     97 
     98  MOZ_CAN_RUN_SCRIPT void Shutdown(
     99      JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);
    100 
    101  void Finalize(JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError);
    102 
    103  MOZ_CAN_RUN_SCRIPT void OnReadFulfilled(JSContext* aCx,
    104                                          JS::Handle<JS::Value> aChunk,
    105                                          ErrorResult& aRv);
    106  MOZ_CAN_RUN_SCRIPT void OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>);
    107  MOZ_CAN_RUN_SCRIPT void Read(JSContext* aCx);
    108 
    109  MOZ_CAN_RUN_SCRIPT void OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>);
    110  MOZ_CAN_RUN_SCRIPT void OnSourceErrored(
    111      JSContext* aCx, JS::Handle<JS::Value> aSourceStoredError);
    112 
    113  MOZ_CAN_RUN_SCRIPT void OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>);
    114  MOZ_CAN_RUN_SCRIPT void OnDestErrored(JSContext* aCx,
    115                                        JS::Handle<JS::Value> aDestStoredError);
    116 
    117  RefPtr<Promise> mPromise;
    118  RefPtr<ReadableStreamDefaultReader> mReader;
    119  RefPtr<WritableStreamDefaultWriter> mWriter;
    120  RefPtr<Promise> mLastWritePromise;
    121  const bool mPreventClose;
    122  const bool mPreventAbort;
    123  const bool mPreventCancel;
    124  bool mShuttingDown = false;
    125 #ifdef DEBUG
    126  bool mReadChunk = false;
    127 #endif
    128 };
    129 
    130 // This is a helper class for PipeToPump that allows it to attach
    131 // member functions as promise handlers.
    132 class PipeToPumpHandler final : public PromiseNativeHandler {
    133  virtual ~PipeToPumpHandler() = default;
    134 
    135  using FunPtr = void (PipeToPump::*)(JSContext*, JS::Handle<JS::Value>);
    136 
    137  RefPtr<PipeToPump> mPipeToPump;
    138  FunPtr mResolved;
    139  FunPtr mRejected;
    140 
    141 public:
    142  NS_DECL_CYCLE_COLLECTING_ISUPPORTS
    143  NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler)
    144 
    145  explicit PipeToPumpHandler(PipeToPump* aPipeToPump, FunPtr aResolved,
    146                             FunPtr aRejected)
    147      : mPipeToPump(aPipeToPump), mResolved(aResolved), mRejected(aRejected) {}
    148 
    149  void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
    150                        ErrorResult&) override {
    151    if (mResolved) {
    152      (mPipeToPump->*mResolved)(aCx, aValue);
    153    }
    154  }
    155 
    156  void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
    157                        ErrorResult&) override {
    158    if (mRejected) {
    159      (mPipeToPump->*mRejected)(aCx, aReason);
    160    }
    161  }
    162 };
    163 
    164 NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler, mPipeToPump)
    165 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler)
    166 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler)
    167 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler)
    168  NS_INTERFACE_MAP_ENTRY(nsISupports)
    169 NS_INTERFACE_MAP_END
    170 
    171 void PipeToPump::RunAbortAlgorithm() {
    172  AutoJSAPI jsapi;
    173  if (!jsapi.Init(mReader->GetStream()->GetParentObject())) {
    174    NS_WARNING(
    175        "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm");
    176    return;
    177  }
    178  JSContext* cx = jsapi.cx();
    179 
    180  RefPtr<AbortSignalImpl> signal = Signal();
    181  PerformAbortAlgorithm(cx, signal);
    182 }
    183 
    184 void PipeToPump::PerformAbortAlgorithm(JSContext* aCx,
    185                                       AbortSignalImpl* aSignal) {
    186  MOZ_ASSERT(aSignal->Aborted());
    187 
    188  // https://streams.spec.whatwg.org/#readable-stream-pipe-to
    189  // Step 14.1. Let abortAlgorithm be the following steps:
    190  // Note: All the following steps are 14.1.xx
    191 
    192  // Step 1. Let error be signal’s abort reason.
    193  JS::Rooted<JS::Value> error(aCx);
    194  aSignal->GetReason(aCx, &error);
    195 
    196  auto action = [](JSContext* aCx, PipeToPump* aPipeToPump,
    197                   JS::Handle<mozilla::Maybe<JS::Value>> aError,
    198                   ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT {
    199    JS::Rooted<JS::Value> error(aCx, *aError);
    200 
    201    // Step 2. Let actions be an empty ordered set.
    202    nsTArray<RefPtr<Promise>> actions;
    203 
    204    // Step 3. If preventAbort is false, append the following action to actions:
    205    if (!aPipeToPump->mPreventAbort) {
    206      RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
    207 
    208      // Step 3.1. If dest.[[state]] is "writable", return !
    209      // WritableStreamAbort(dest, error).
    210      if (dest->State() == WritableStream::WriterState::Writable) {
    211        RefPtr<Promise> p = WritableStreamAbort(aCx, dest, error, aRv);
    212        if (aRv.Failed()) {
    213          return already_AddRefed<Promise>();
    214        }
    215        actions.AppendElement(p);
    216      }
    217 
    218      // Step 3.2. Otherwise, return a promise resolved with undefined.
    219      // Note: This is basically a no-op.
    220    }
    221 
    222    // Step 4. If preventCancel is false, append the following action action to
    223    // actions:
    224    if (!aPipeToPump->mPreventCancel) {
    225      RefPtr<ReadableStream> source = aPipeToPump->mReader->GetStream();
    226 
    227      // Step 4.1. If source.[[state]] is "readable", return !
    228      // ReadableStreamCancel(source, error).
    229      if (source->State() == ReadableStream::ReaderState::Readable) {
    230        RefPtr<Promise> p = ReadableStreamCancel(aCx, source, error, aRv);
    231        if (aRv.Failed()) {
    232          return already_AddRefed<Promise>();
    233        }
    234        actions.AppendElement(p);
    235      }
    236 
    237      // Step 4.2. Otherwise, return a promise resolved with undefined.
    238      // No-op again.
    239    }
    240 
    241    // Step 5. .. action consisting of getting a promise to wait for
    242    // all of the actions in actions ...
    243    return Promise::All(aCx, actions, aRv);
    244  };
    245 
    246  // Step 5. Shutdown with an action consisting of getting a promise to wait for
    247  // all of the actions in actions, and with error.
    248  JS::Rooted<Maybe<JS::Value>> someError(aCx, Some(error.get()));
    249  ShutdownWithAction(aCx, action, someError);
    250 }
    251 
    252 bool PipeToPump::SourceOrDestErroredOrClosed(JSContext* aCx) {
    253  // (Constraint) Error and close states must be propagated:
    254  // the following conditions must be applied in order.
    255  RefPtr<ReadableStream> source = mReader->GetStream();
    256  RefPtr<WritableStream> dest = mWriter->GetStream();
    257 
    258  // Step 1. Errors must be propagated forward: if source.[[state]] is or
    259  // becomes "errored", then
    260  if (source->State() == ReadableStream::ReaderState::Errored) {
    261    JS::Rooted<JS::Value> storedError(aCx, source->StoredError());
    262    OnSourceErrored(aCx, storedError);
    263    return true;
    264  }
    265 
    266  // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
    267  // "errored", then
    268  if (dest->State() == WritableStream::WriterState::Errored) {
    269    JS::Rooted<JS::Value> storedError(aCx, dest->StoredError());
    270    OnDestErrored(aCx, storedError);
    271    return true;
    272  }
    273 
    274  // Step 3. Closing must be propagated forward: if source.[[state]] is or
    275  // becomes "closed", then
    276  if (source->State() == ReadableStream::ReaderState::Closed) {
    277    OnSourceClosed(aCx, JS::UndefinedHandleValue);
    278    return true;
    279  }
    280 
    281  // Step 4. Closing must be propagated backward:
    282  // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
    283  // or dest.[[state]] is "closed", then
    284  if (dest->CloseQueuedOrInFlight() ||
    285      dest->State() == WritableStream::WriterState::Closed) {
    286    OnDestClosed(aCx, JS::UndefinedHandleValue);
    287    return true;
    288  }
    289 
    290  return false;
    291 }
    292 
    293 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
    294 // Steps 14-15.
    295 void PipeToPump::Start(JSContext* aCx, AbortSignal* aSignal) {
    296  // Step 14. If signal is not undefined,
    297  if (aSignal) {
    298    // Step 14.1. Let abortAlgorithm be the following steps:
    299    // ... This is implemented by RunAbortAlgorithm.
    300 
    301    // Step 14.2. If signal is aborted, perform abortAlgorithm and
    302    // return promise.
    303    if (aSignal->Aborted()) {
    304      PerformAbortAlgorithm(aCx, aSignal);
    305      return;
    306    }
    307 
    308    // Step 14.3. Add abortAlgorithm to signal.
    309    Follow(aSignal);
    310  }
    311 
    312  // Step 15. In parallel but not really; see #905, using reader and writer,
    313  // read all chunks from source and write them to dest.
    314  // Due to the locking provided by the reader and writer,
    315  // the exact manner in which this happens is not observable to author code,
    316  // and so there is flexibility in how this is done.
    317 
    318  // (Constraint) Error and close states must be propagated
    319 
    320  // Before piping has started, we have to check for source/destination being
    321  // errored/closed manually.
    322  if (SourceOrDestErroredOrClosed(aCx)) {
    323    return;
    324  }
    325 
    326  // We use the following two promises to propagate error and close states
    327  // during piping.
    328  RefPtr<Promise> readerClosed = mReader->ClosedPromise();
    329  readerClosed->AppendNativeHandler(new PipeToPumpHandler(
    330      this, &PipeToPump::OnSourceClosed, &PipeToPump::OnSourceErrored));
    331 
    332  // Note: Because we control the destination/writer it should never be closed
    333  // after we did the initial check above with SourceOrDestErroredOrClosed.
    334  RefPtr<Promise> writerClosed = mWriter->ClosedPromise();
    335  writerClosed->AppendNativeHandler(new PipeToPumpHandler(
    336      this, &PipeToPump::OnDestClosed, &PipeToPump::OnDestErrored));
    337 
    338  Read(aCx);
    339 }
    340 
    341 class WriteFinishedPromiseHandler final : public PromiseNativeHandler {
    342  RefPtr<PipeToPump> mPipeToPump;
    343  PipeToPump::ShutdownAction mAction;
    344  bool mHasError;
    345  JS::Heap<JS::Value> mError;
    346 
    347  virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); };
    348 
    349 public:
    350  NS_DECL_CYCLE_COLLECTING_ISUPPORTS
    351  NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler)
    352 
    353  explicit WriteFinishedPromiseHandler(
    354      JSContext* aCx, PipeToPump* aPipeToPump,
    355      PipeToPump::ShutdownAction aAction,
    356      JS::Handle<mozilla::Maybe<JS::Value>> aError)
    357      : mPipeToPump(aPipeToPump), mAction(aAction) {
    358    mHasError = aError.isSome();
    359    if (mHasError) {
    360      mError = *aError;
    361    }
    362    mozilla::HoldJSObjects(this);
    363  }
    364 
    365  MOZ_CAN_RUN_SCRIPT void WriteFinished(JSContext* aCx) {
    366    RefPtr<PipeToPump> pipeToPump = mPipeToPump;  // XXX known-live?
    367    JS::Rooted<Maybe<JS::Value>> error(aCx);
    368    if (mHasError) {
    369      error = Some(mError);
    370    }
    371    pipeToPump->ShutdownWithActionAfterFinishedWrite(aCx, mAction, error);
    372  }
    373 
    374  MOZ_CAN_RUN_SCRIPT void ResolvedCallback(JSContext* aCx,
    375                                           JS::Handle<JS::Value> aValue,
    376                                           ErrorResult&) override {
    377    WriteFinished(aCx);
    378  }
    379 
    380  MOZ_CAN_RUN_SCRIPT void RejectedCallback(JSContext* aCx,
    381                                           JS::Handle<JS::Value> aReason,
    382                                           ErrorResult&) override {
    383    WriteFinished(aCx);
    384  }
    385 };
    386 
    387 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler,
    388                                         (mPipeToPump), (mError))
    389 NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler)
    390 NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler)
    391 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler)
    392  NS_INTERFACE_MAP_ENTRY(nsISupports)
    393 NS_INTERFACE_MAP_END
    394 
    395 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
    396 // Shutdown with an action: if any of the above requirements ask to shutdown
    397 // with an action action, optionally with an error originalError, then:
    398 void PipeToPump::ShutdownWithAction(
    399    JSContext* aCx, ShutdownAction aAction,
    400    JS::Handle<mozilla::Maybe<JS::Value>> aError) {
    401  // Step 1. If shuttingDown is true, abort these substeps.
    402  if (mShuttingDown) {
    403    return;
    404  }
    405 
    406  // Step 2. Set shuttingDown to true.
    407  mShuttingDown = true;
    408 
    409  // Step 3. If dest.[[state]] is "writable" and !
    410  // WritableStreamCloseQueuedOrInFlight(dest) is false,
    411  RefPtr<WritableStream> dest = mWriter->GetStream();
    412  if (dest->State() == WritableStream::WriterState::Writable &&
    413      !dest->CloseQueuedOrInFlight()) {
    414    // Step 3.1. If any chunks have been read but not yet written, write them to
    415    // dest.
    416    // Step 3.2. Wait until every chunk that has been read has been
    417    // written (i.e. the corresponding promises have settled).
    418    //
    419    // Note: Write requests are processed in order, so when the promise
    420    // for the last written chunk is settled all previous chunks have been
    421    // written as well.
    422    if (mLastWritePromise) {
    423      mLastWritePromise->AppendNativeHandler(
    424          new WriteFinishedPromiseHandler(aCx, this, aAction, aError));
    425      return;
    426    }
    427  }
    428 
    429  // Don't have to wait for last write, immediately continue.
    430  ShutdownWithActionAfterFinishedWrite(aCx, aAction, aError);
    431 }
    432 
    433 class ShutdownActionFinishedPromiseHandler final : public PromiseNativeHandler {
    434  RefPtr<PipeToPump> mPipeToPump;
    435  bool mHasError;
    436  JS::Heap<JS::Value> mError;
    437 
    438  virtual ~ShutdownActionFinishedPromiseHandler() {
    439    mozilla::DropJSObjects(this);
    440  }
    441 
    442 public:
    443  NS_DECL_CYCLE_COLLECTING_ISUPPORTS
    444  NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(
    445      ShutdownActionFinishedPromiseHandler)
    446 
    447  explicit ShutdownActionFinishedPromiseHandler(
    448      JSContext* aCx, PipeToPump* aPipeToPump,
    449      JS::Handle<mozilla::Maybe<JS::Value>> aError)
    450      : mPipeToPump(aPipeToPump) {
    451    mHasError = aError.isSome();
    452    if (mHasError) {
    453      mError = *aError;
    454    }
    455    mozilla::HoldJSObjects(this);
    456  }
    457 
    458  void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue,
    459                        ErrorResult&) override {
    460    // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
    461    // Step 5. Upon fulfillment of p, finalize, passing along originalError if
    462    // it was given.
    463    JS::Rooted<Maybe<JS::Value>> error(aCx);
    464    if (mHasError) {
    465      error = Some(mError);
    466    }
    467    mPipeToPump->Finalize(aCx, error);
    468  }
    469 
    470  void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason,
    471                        ErrorResult&) override {
    472    // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
    473    // Step 6. Upon rejection of p with reason newError, finalize with
    474    // newError.
    475    JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aReason));
    476    mPipeToPump->Finalize(aCx, error);
    477  }
    478 };
    479 
    480 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler,
    481                                         (mPipeToPump), (mError))
    482 NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler)
    483 NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler)
    484 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler)
    485  NS_INTERFACE_MAP_ENTRY(nsISupports)
    486 NS_INTERFACE_MAP_END
    487 
    488 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action
    489 // Continuation after Step 3. triggered a promise resolution.
    490 void PipeToPump::ShutdownWithActionAfterFinishedWrite(
    491    JSContext* aCx, ShutdownAction aAction,
    492    JS::Handle<mozilla::Maybe<JS::Value>> aError) {
    493  if (!aAction) {
    494    // Used to implement shutdown without action. Finalize immediately.
    495    Finalize(aCx, aError);
    496    return;
    497  }
    498 
    499  // Step 4. Let p be the result of performing action.
    500  RefPtr<PipeToPump> thisRefPtr = this;
    501  ErrorResult rv;
    502  RefPtr<Promise> p = aAction(aCx, thisRefPtr, aError, rv);
    503 
    504  // Error while calling actions above, continue immediately with finalization.
    505  if (rv.MaybeSetPendingException(aCx)) {
    506    JS::Rooted<Maybe<JS::Value>> someError(aCx);
    507 
    508    JS::Rooted<JS::Value> error(aCx);
    509    if (JS_GetPendingException(aCx, &error)) {
    510      someError = Some(error.get());
    511    }
    512 
    513    JS_ClearPendingException(aCx);
    514 
    515    Finalize(aCx, someError);
    516    return;
    517  }
    518 
    519  // Steps 5-6.
    520  p->AppendNativeHandler(
    521      new ShutdownActionFinishedPromiseHandler(aCx, this, aError));
    522 }
    523 
    524 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown
    525 // Shutdown: if any of the above requirements or steps ask to shutdown,
    526 // optionally with an error error, then:
    527 void PipeToPump::Shutdown(JSContext* aCx,
    528                          JS::Handle<mozilla::Maybe<JS::Value>> aError) {
    529  // Note: We implement "shutdown" in terms of "shutdown with action".
    530  // We can observe that when passing along an action that always succeeds
    531  // shutdown with action and shutdown have the same behavior, when
    532  // Ignoring the potential micro task for the promise that we skip anyway.
    533  ShutdownWithAction(aCx, nullptr, aError);
    534 }
    535 
    536 // https://streams.spec.whatwg.org/#rs-pipeTo-finalize
    537 // Finalize: both forms of shutdown will eventually ask to finalize,
    538 // optionally with an error error, which means to perform the following steps:
    539 void PipeToPump::Finalize(JSContext* aCx,
    540                          JS::Handle<mozilla::Maybe<JS::Value>> aError) {
    541  IgnoredErrorResult rv;
    542  // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer).
    543  WritableStreamDefaultWriterRelease(aCx, mWriter);
    544 
    545  // Step 2. If reader implements ReadableStreamBYOBReader,
    546  // perform ! ReadableStreamBYOBReaderRelease(reader).
    547  // Note: We always use a default reader.
    548  MOZ_ASSERT(!mReader->IsBYOB());
    549 
    550  // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader).
    551  ReadableStreamDefaultReaderRelease(aCx, mReader, rv);
    552  NS_WARNING_ASSERTION(!rv.Failed(),
    553                       "ReadableStreamReaderGenericRelease should not fail.");
    554 
    555  // Step 3. If signal is not undefined, remove abortAlgorithm from signal.
    556  if (IsFollowing()) {
    557    Unfollow();
    558  }
    559 
    560  // Step 4. If error was given, reject promise with error.
    561  if (aError.isSome()) {
    562    JS::Rooted<JS::Value> error(aCx, *aError);
    563    mPromise->MaybeReject(error);
    564  } else {
    565    // Step 5. Otherwise, resolve promise with undefined.
    566    mPromise->MaybeResolveWithUndefined();
    567  }
    568 
    569  // Remove all references.
    570  mPromise = nullptr;
    571  mReader = nullptr;
    572  mWriter = nullptr;
    573  mLastWritePromise = nullptr;
    574  Unfollow();
    575 }
    576 
    577 void PipeToPump::OnReadFulfilled(JSContext* aCx, JS::Handle<JS::Value> aChunk,
    578                                 ErrorResult& aRv) {
    579  // (Constraint) Shutdown must stop activity:
    580  // if shuttingDown becomes true, the user agent must not initiate further
    581  // reads from reader, and must only perform writes of already-read chunks ...
    582  //
    583  // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded
    584  // to an out-of-band change.  Per the comment in |OnSourceErrored|, we want to
    585  // allow the implicated shutdown to proceed, and we don't want to interfere
    586  // with or additionally alter its operation.  Particularly, we don't want to
    587  // queue up the successfully-read chunk (if there was one, and this isn't just
    588  // reporting "done") to be written: it wasn't "already-read" when that
    589  // error/closure happened.
    590  //
    591  // All specified reactions to a closure/error invoke either the shutdown, or
    592  // shutdown with an action, algorithms.  Those algorithms each abort if either
    593  // shutdown algorithm has already been invoked.  So we check for shutdown here
    594  // in case of asynchronous closure/error and abort if shutdown has already
    595  // started (and possibly finished).
    596  //
    597  // TODO: Implement the eventual resolution from
    598  // https://github.com/whatwg/streams/issues/1207
    599  if (mShuttingDown) {
    600    return;
    601  }
    602 
    603  // Write asynchronously. Roughly this is like:
    604  // `Promise.resolve().then(() => stream.write(chunk));`
    605  // XXX: The spec currently does not require asynchronicity, but this still
    606  // matches other engines' behavior. See
    607  // https://github.com/whatwg/streams/issues/1243.
    608  RefPtr<Promise> promise =
    609      Promise::CreateInfallible(xpc::CurrentNativeGlobal(aCx));
    610  promise->MaybeResolveWithUndefined();
    611  auto result = promise->ThenWithCycleCollectedArgsJS(
    612      [](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv,
    613         const RefPtr<PipeToPump>& aSelf,
    614         const RefPtr<WritableStreamDefaultWriter>& aWriter,
    615         JS::Handle<JS::Value> aChunk)
    616          MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION -> already_AddRefed<Promise> {
    617            RefPtr<Promise> promise =
    618                WritableStreamDefaultWriterWrite(aCx, aWriter, aChunk, aRv);
    619 
    620            // Last read has finished, so it's time to start reading again.
    621            aSelf->Read(aCx);
    622 
    623            return promise.forget();
    624          },
    625      std::make_tuple(RefPtr{this}, mWriter), std::make_tuple(aChunk));
    626  if (result.isErr()) {
    627    mLastWritePromise = nullptr;
    628    return;
    629  }
    630  mLastWritePromise = result.unwrap();
    631 
    632  mLastWritePromise->AppendNativeHandler(
    633      new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored));
    634 }
    635 
    636 void PipeToPump::OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>) {
    637  // Writer is ready again (i.e. backpressure was resolved), so read.
    638  Read(aCx);
    639 }
    640 
    641 struct PipeToReadRequest : public ReadRequest {
    642 public:
    643  NS_DECL_ISUPPORTS_INHERITED
    644  NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest, ReadRequest)
    645 
    646  RefPtr<PipeToPump> mPipeToPump;
    647 
    648  explicit PipeToReadRequest(PipeToPump* aPipeToPump)
    649      : mPipeToPump(aPipeToPump) {}
    650 
    651  MOZ_CAN_RUN_SCRIPT void ChunkSteps(JSContext* aCx,
    652                                     JS::Handle<JS::Value> aChunk,
    653                                     ErrorResult& aRv) override {
    654    RefPtr<PipeToPump> pipeToPump = mPipeToPump;  // XXX known live?
    655    pipeToPump->OnReadFulfilled(aCx, aChunk, aRv);
    656  }
    657 
    658  // The reader's closed promise handlers will already call OnSourceClosed/
    659  // OnSourceErrored, so these steps can just be ignored.
    660  void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {}
    661  void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError,
    662                  ErrorResult& aRv) override {}
    663 
    664 protected:
    665  virtual ~PipeToReadRequest() = default;
    666 };
    667 
    668 NS_IMPL_CYCLE_COLLECTION_INHERITED(PipeToReadRequest, ReadRequest, mPipeToPump)
    669 
    670 NS_IMPL_ADDREF_INHERITED(PipeToReadRequest, ReadRequest)
    671 NS_IMPL_RELEASE_INHERITED(PipeToReadRequest, ReadRequest)
    672 
    673 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest)
    674 NS_INTERFACE_MAP_END_INHERITING(ReadRequest)
    675 
    676 void PipeToPump::Read(JSContext* aCx) {
    677 #ifdef DEBUG
    678  mReadChunk = true;
    679 #endif
    680 
    681  // (Constraint) Shutdown must stop activity:
    682  // If shuttingDown becomes true, the user agent must not initiate
    683  // further reads from reader
    684  if (mShuttingDown) {
    685    return;
    686  }
    687 
    688  // (Constraint) Backpressure must be enforced:
    689  // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null,
    690  // the user agent must not read from reader.
    691  Nullable<double> desiredSize =
    692      WritableStreamDefaultWriterGetDesiredSize(mWriter);
    693  if (desiredSize.IsNull()) {
    694    // This means the writer has errored. This is going to be handled
    695    // by the writer closed promise.
    696    return;
    697  }
    698 
    699  if (desiredSize.Value() <= 0) {
    700    // Wait for the writer to become ready before reading more data from
    701    // the reader. We don't care about rejections here, because those are
    702    // already handled by the writer closed promise.
    703    RefPtr<Promise> readyPromise = mWriter->Ready();
    704    readyPromise->AppendNativeHandler(
    705        new PipeToPumpHandler(this, &PipeToPump::OnWriterReady, nullptr));
    706    return;
    707  }
    708 
    709  RefPtr<ReadableStreamDefaultReader> reader = mReader;
    710  RefPtr<ReadRequest> request = new PipeToReadRequest(this);
    711  ErrorResult rv;
    712  ReadableStreamDefaultReaderRead(aCx, reader, request, rv);
    713  if (rv.MaybeSetPendingException(aCx)) {
    714    // XXX It's actually not quite obvious what we should do here.
    715    // We've got an error during reading, so on the surface it seems logical
    716    // to invoke `OnSourceErrored`. However in certain cases the required
    717    // condition > source.[[state]] is or becomes "errored" < won't actually
    718    // happen i.e. when `WritableStreamDefaultWriterWrite` called from
    719    // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in
    720    // a synchronous fashion.
    721    JS::Rooted<JS::Value> error(aCx);
    722    JS::Rooted<Maybe<JS::Value>> someError(aCx);
    723 
    724    // The error was moved to the JSContext by MaybeSetPendingException.
    725    if (JS_GetPendingException(aCx, &error)) {
    726      someError = Some(error.get());
    727    }
    728 
    729    JS_ClearPendingException(aCx);
    730 
    731    Shutdown(aCx, someError);
    732  }
    733 }
    734 
    735 // Step 3. Closing must be propagated forward: if source.[[state]] is or
    736 // becomes "closed", then
    737 void PipeToPump::OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>) {
    738  // Step 3.1. If preventClose is false, shutdown with an action of
    739  // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer).
    740  if (!mPreventClose) {
    741    ShutdownWithAction(
    742        aCx,
    743        [](JSContext* aCx, PipeToPump* aPipeToPump,
    744           JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
    745            MOZ_CAN_RUN_SCRIPT {
    746              RefPtr<WritableStreamDefaultWriter> writer = aPipeToPump->mWriter;
    747              return WritableStreamDefaultWriterCloseWithErrorPropagation(
    748                  aCx, writer, aRv);
    749            },
    750        JS::NothingHandleValue);
    751  } else {
    752    // Step 3.2 Otherwise, shutdown.
    753    Shutdown(aCx, JS::NothingHandleValue);
    754  }
    755 }
    756 
    757 // Step 1. Errors must be propagated forward: if source.[[state]] is or
    758 // becomes "errored", then
    759 void PipeToPump::OnSourceErrored(JSContext* aCx,
    760                                 JS::Handle<JS::Value> aSourceStoredError) {
    761  // If |source| becomes errored not during a pending read, it's clear we must
    762  // react immediately.
    763  //
    764  // But what if |source| becomes errored *during* a pending read?  Should this
    765  // first error, or the pending-read second error, predominate?  Two semantics
    766  // are possible when |source|/|dest| become closed or errored while there's a
    767  // pending read:
    768  //
    769  //   1. Wait until the read fulfills or rejects, then respond to the
    770  //      closure/error without regard to the read having fulfilled or rejected.
    771  //      (This will simply not react to the read being rejected, or it will
    772  //      queue up the read chunk to be written during shutdown.)
    773  //   2. React to the closure/error immediately per "Error and close states
    774  //      must be propagated".  Then when the read fulfills or rejects later, do
    775  //      nothing.
    776  //
    777  // The spec doesn't clearly require either semantics.  It requires that
    778  // *already-read* chunks be written (at least if |dest| didn't become errored
    779  // or closed such that no further writes can occur).  But it's silent as to
    780  // not-fully-read chunks.  (These semantic differences may only be observable
    781  // with very carefully constructed readable/writable streams.)
    782  //
    783  // It seems best, generally, to react to the temporally-earliest problem that
    784  // arises, so we implement option #2.  (Blink, in contrast, currently
    785  // implements option #1.)
    786  //
    787  // All specified reactions to a closure/error invoke either the shutdown, or
    788  // shutdown with an action, algorithms.  Those algorithms each abort if either
    789  // shutdown algorithm has already been invoked.  So we don't need to do
    790  // anything special here to deal with a pending read.
    791  //
    792  // TODO: Implement the eventual resolution from
    793  // https://github.com/whatwg/streams/issues/1207
    794 
    795  // Step 1.1 If preventAbort is false, shutdown with an action of
    796  // ! WritableStreamAbort(dest, source.[[storedError]])
    797  // and with source.[[storedError]].
    798  JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aSourceStoredError));
    799  if (!mPreventAbort) {
    800    ShutdownWithAction(
    801        aCx,
    802        [](JSContext* aCx, PipeToPump* aPipeToPump,
    803           JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
    804            MOZ_CAN_RUN_SCRIPT {
    805              JS::Rooted<JS::Value> error(aCx, *aError);
    806              RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream();
    807              return WritableStreamAbort(aCx, dest, error, aRv);
    808            },
    809        error);
    810  } else {
    811    // Step 1.1. Otherwise, shutdown with source.[[storedError]].
    812    Shutdown(aCx, error);
    813  }
    814 }
    815 
    816 // Step 4. Closing must be propagated backward:
    817 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true
    818 // or dest.[[state]] is "closed", then
    819 void PipeToPump::OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>) {
    820  // Step 4.1. Assert: no chunks have been read or written.
    821  // Note: No reading automatically implies no writing.
    822  // In a perfect world OnDestClosed would only be called before we start
    823  // piping, because afterwards the writer has an exclusive lock on the stream.
    824  // In reality the closed promise can still be resolved after we release
    825  // the lock on the writer in Finalize.
    826  if (mShuttingDown) {
    827    return;
    828  }
    829  MOZ_ASSERT(!mReadChunk);
    830 
    831  // Step 4.2. Let destClosed be a new TypeError.
    832  JS::Rooted<Maybe<JS::Value>> destClosed(aCx, Nothing());
    833  {
    834    ErrorResult rv;
    835    rv.ThrowTypeError("Cannot pipe to closed stream");
    836    JS::Rooted<JS::Value> error(aCx);
    837    bool ok = ToJSValue(aCx, std::move(rv), &error);
    838    MOZ_RELEASE_ASSERT(ok, "must be ok");
    839    destClosed = Some(error.get());
    840  }
    841 
    842  // Step 4.3. If preventCancel is false, shutdown with an action of
    843  // ! ReadableStreamCancel(source, destClosed) and with destClosed.
    844  if (!mPreventCancel) {
    845    ShutdownWithAction(
    846        aCx,
    847        [](JSContext* aCx, PipeToPump* aPipeToPump,
    848           JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
    849            MOZ_CAN_RUN_SCRIPT {
    850              JS::Rooted<JS::Value> error(aCx, *aError);
    851              RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
    852              return ReadableStreamCancel(aCx, dest, error, aRv);
    853            },
    854        destClosed);
    855  } else {
    856    // Step 4.4. Otherwise, shutdown with destClosed.
    857    Shutdown(aCx, destClosed);
    858  }
    859 }
    860 
    861 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes
    862 // "errored", then
    863 void PipeToPump::OnDestErrored(JSContext* aCx,
    864                               JS::Handle<JS::Value> aDestStoredError) {
    865  // Step 2.1. If preventCancel is false, shutdown with an action of
    866  // ! ReadableStreamCancel(source, dest.[[storedError]])
    867  // and with dest.[[storedError]].
    868  JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aDestStoredError));
    869  if (!mPreventCancel) {
    870    ShutdownWithAction(
    871        aCx,
    872        [](JSContext* aCx, PipeToPump* aPipeToPump,
    873           JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv)
    874            MOZ_CAN_RUN_SCRIPT {
    875              JS::Rooted<JS::Value> error(aCx, *aError);
    876              RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream();
    877              return ReadableStreamCancel(aCx, dest, error, aRv);
    878            },
    879        error);
    880  } else {
    881    // Step 2.1. Otherwise, shutdown with dest.[[storedError]].
    882    Shutdown(aCx, error);
    883  }
    884 }
    885 
    886 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump)
    887 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump)
    888 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump)
    889 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump)
    890  NS_INTERFACE_MAP_ENTRY(nsISupports)
    891 NS_INTERFACE_MAP_END
    892 
    893 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump)
    894  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise)
    895  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader)
    896  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter)
    897  NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise)
    898 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END
    899 
    900 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump)
    901  NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise)
    902  NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader)
    903  NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter)
    904  NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise)
    905 NS_IMPL_CYCLE_COLLECTION_UNLINK_END
    906 
    907 namespace streams_abstract {
    908 // https://streams.spec.whatwg.org/#readable-stream-pipe-to
    909 already_AddRefed<Promise> ReadableStreamPipeTo(
    910    ReadableStream* aSource, WritableStream* aDest, bool aPreventClose,
    911    bool aPreventAbort, bool aPreventCancel, AbortSignal* aSignal,
    912    mozilla::ErrorResult& aRv) {
    913  // Step 1. Assert: source implements ReadableStream. (Implicit)
    914  // Step 2. Assert: dest implements WritableStream. (Implicit)
    915  // Step 3. Assert: preventClose, preventAbort, and preventCancel are all
    916  //         booleans (Implicit)
    917  // Step 4. If signal was not given, let signal be
    918  //         undefined. (Implicit)
    919  // Step 5. Assert: either signal is undefined, or signal
    920  //         implements AbortSignal. (Implicit)
    921  // Step 6. Assert: !IsReadableStreamLocked(source) is false.
    922  MOZ_ASSERT(!IsReadableStreamLocked(aSource));
    923 
    924  // Step 7. Assert: !IsWritableStreamLocked(dest) is false.
    925  MOZ_ASSERT(!IsWritableStreamLocked(aDest));
    926 
    927  AutoJSAPI jsapi;
    928  if (!jsapi.Init(aSource->GetParentObject())) {
    929    aRv.ThrowUnknownError("Internal error");
    930    return nullptr;
    931  }
    932  JSContext* cx = jsapi.cx();
    933 
    934  // Step 8. If source.[[controller]] implements ReadableByteStreamController,
    935  //         let reader be either !AcquireReadableStreamBYOBReader(source) or
    936  //         !AcquireReadableStreamDefaultReader(source), at the user agent’s
    937  //         discretion.
    938  // Step 9. Otherwise, let reader be
    939  //         !AcquireReadableStreamDefaultReader(source).
    940 
    941  // Note: In the interests of simplicity, we choose here to always acquire
    942  // a default reader.
    943  RefPtr<ReadableStreamDefaultReader> reader =
    944      AcquireReadableStreamDefaultReader(aSource, aRv);
    945  if (aRv.Failed()) {
    946    return nullptr;
    947  }
    948 
    949  // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest).
    950  RefPtr<WritableStreamDefaultWriter> writer =
    951      AcquireWritableStreamDefaultWriter(aDest, aRv);
    952  if (aRv.Failed()) {
    953    return nullptr;
    954  }
    955 
    956  // Step 11. Set source.[[disturbed]] to true.
    957  aSource->SetDisturbed(true);
    958 
    959  // Step 12. Let shuttingDown be false.
    960  // Note: PipeToPump ensures this by construction.
    961 
    962  // Step 13. Let promise be a new promise.
    963  RefPtr<Promise> promise =
    964      Promise::CreateInfallible(aSource->GetParentObject());
    965 
    966  // Steps 14-15.
    967  RefPtr<PipeToPump> pump = new PipeToPump(
    968      promise, reader, writer, aPreventClose, aPreventAbort, aPreventCancel);
    969  pump->Start(cx, aSignal);
    970 
    971  // Step 16. Return promise.
    972  return promise.forget();
    973 }
    974 }  // namespace streams_abstract
    975 
    976 }  // namespace mozilla::dom