ReadableStreamPipeTo.cpp (37018B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "ReadableStreamPipeTo.h" 8 9 #include "js/Exception.h" 10 #include "mozilla/AlreadyAddRefed.h" 11 #include "mozilla/ErrorResult.h" 12 #include "mozilla/dom/AbortFollower.h" 13 #include "mozilla/dom/AbortSignal.h" 14 #include "mozilla/dom/Promise-inl.h" 15 #include "mozilla/dom/Promise.h" 16 #include "mozilla/dom/PromiseNativeHandler.h" 17 #include "mozilla/dom/ReadableStream.h" 18 #include "mozilla/dom/ReadableStreamDefaultReader.h" 19 #include "mozilla/dom/WritableStream.h" 20 #include "mozilla/dom/WritableStreamDefaultWriter.h" 21 #include "nsCycleCollectionParticipant.h" 22 #include "nsISupportsImpl.h" 23 24 namespace mozilla::dom { 25 26 using namespace streams_abstract; 27 28 struct PipeToReadRequest; 29 class WriteFinishedPromiseHandler; 30 class ShutdownActionFinishedPromiseHandler; 31 32 // https://streams.spec.whatwg.org/#readable-stream-pipe-to (Steps 14-15.) 33 // 34 // This class implements everything that is required to read all chunks from 35 // the reader (source) and write them to writer (destination), while 36 // following the constraints given in the spec using our implementation-defined 37 // behavior. 38 // 39 // The cycle-collected references look roughly like this: 40 // clang-format off 41 // 42 // Closed promise <-- ReadableStreamDefaultReader <--> ReadableStream 43 // | ^ | 44 // |(PromiseHandler) |(mReader) |(ReadRequest) 45 // | | | 46 // |-------------> PipeToPump <------- 47 // ^ | | 48 // |---------------| | | 49 // | | |-------(mLastWrite) --------> 50 // |(PromiseHandler) | |< ---- (PromiseHandler) ---- Promise 51 // | | ^ 52 // | |(mWriter) |(mWriteRequests) 53 // | v | 54 // Closed promise <-- WritableStreamDefaultWriter <--------> WritableStream 55 // 56 // clang-format on 57 class PipeToPump final : public AbortFollower { 58 NS_DECL_CYCLE_COLLECTING_ISUPPORTS 59 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPump) 60 61 friend struct PipeToReadRequest; 62 friend class WriteFinishedPromiseHandler; 63 friend class ShutdownActionFinishedPromiseHandler; 64 65 PipeToPump(Promise* aPromise, ReadableStreamDefaultReader* aReader, 66 WritableStreamDefaultWriter* aWriter, bool aPreventClose, 67 bool aPreventAbort, bool aPreventCancel) 68 : mPromise(aPromise), 69 mReader(aReader), 70 mWriter(aWriter), 71 mPreventClose(aPreventClose), 72 mPreventAbort(aPreventAbort), 73 mPreventCancel(aPreventCancel) {} 74 75 MOZ_CAN_RUN_SCRIPT void Start(JSContext* aCx, AbortSignal* aSignal); 76 77 MOZ_CAN_RUN_SCRIPT_BOUNDARY void RunAbortAlgorithm() override; 78 79 private: 80 ~PipeToPump() override = default; 81 82 MOZ_CAN_RUN_SCRIPT void PerformAbortAlgorithm(JSContext* aCx, 83 AbortSignalImpl* aSignal); 84 85 MOZ_CAN_RUN_SCRIPT bool SourceOrDestErroredOrClosed(JSContext* aCx); 86 87 using ShutdownAction = already_AddRefed<Promise> (*)( 88 JSContext*, PipeToPump*, JS::Handle<mozilla::Maybe<JS::Value>>, 89 ErrorResult&); 90 91 MOZ_CAN_RUN_SCRIPT void ShutdownWithAction( 92 JSContext* aCx, ShutdownAction aAction, 93 JS::Handle<mozilla::Maybe<JS::Value>> aError); 94 MOZ_CAN_RUN_SCRIPT void ShutdownWithActionAfterFinishedWrite( 95 JSContext* aCx, ShutdownAction aAction, 96 JS::Handle<mozilla::Maybe<JS::Value>> aError); 97 98 MOZ_CAN_RUN_SCRIPT void Shutdown( 99 JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError); 100 101 void Finalize(JSContext* aCx, JS::Handle<mozilla::Maybe<JS::Value>> aError); 102 103 MOZ_CAN_RUN_SCRIPT void OnReadFulfilled(JSContext* aCx, 104 JS::Handle<JS::Value> aChunk, 105 ErrorResult& aRv); 106 MOZ_CAN_RUN_SCRIPT void OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>); 107 MOZ_CAN_RUN_SCRIPT void Read(JSContext* aCx); 108 109 MOZ_CAN_RUN_SCRIPT void OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>); 110 MOZ_CAN_RUN_SCRIPT void OnSourceErrored( 111 JSContext* aCx, JS::Handle<JS::Value> aSourceStoredError); 112 113 MOZ_CAN_RUN_SCRIPT void OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>); 114 MOZ_CAN_RUN_SCRIPT void OnDestErrored(JSContext* aCx, 115 JS::Handle<JS::Value> aDestStoredError); 116 117 RefPtr<Promise> mPromise; 118 RefPtr<ReadableStreamDefaultReader> mReader; 119 RefPtr<WritableStreamDefaultWriter> mWriter; 120 RefPtr<Promise> mLastWritePromise; 121 const bool mPreventClose; 122 const bool mPreventAbort; 123 const bool mPreventCancel; 124 bool mShuttingDown = false; 125 #ifdef DEBUG 126 bool mReadChunk = false; 127 #endif 128 }; 129 130 // This is a helper class for PipeToPump that allows it to attach 131 // member functions as promise handlers. 132 class PipeToPumpHandler final : public PromiseNativeHandler { 133 virtual ~PipeToPumpHandler() = default; 134 135 using FunPtr = void (PipeToPump::*)(JSContext*, JS::Handle<JS::Value>); 136 137 RefPtr<PipeToPump> mPipeToPump; 138 FunPtr mResolved; 139 FunPtr mRejected; 140 141 public: 142 NS_DECL_CYCLE_COLLECTING_ISUPPORTS 143 NS_DECL_CYCLE_COLLECTION_CLASS(PipeToPumpHandler) 144 145 explicit PipeToPumpHandler(PipeToPump* aPipeToPump, FunPtr aResolved, 146 FunPtr aRejected) 147 : mPipeToPump(aPipeToPump), mResolved(aResolved), mRejected(aRejected) {} 148 149 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue, 150 ErrorResult&) override { 151 if (mResolved) { 152 (mPipeToPump->*mResolved)(aCx, aValue); 153 } 154 } 155 156 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason, 157 ErrorResult&) override { 158 if (mRejected) { 159 (mPipeToPump->*mRejected)(aCx, aReason); 160 } 161 } 162 }; 163 164 NS_IMPL_CYCLE_COLLECTION(PipeToPumpHandler, mPipeToPump) 165 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPumpHandler) 166 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPumpHandler) 167 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPumpHandler) 168 NS_INTERFACE_MAP_ENTRY(nsISupports) 169 NS_INTERFACE_MAP_END 170 171 void PipeToPump::RunAbortAlgorithm() { 172 AutoJSAPI jsapi; 173 if (!jsapi.Init(mReader->GetStream()->GetParentObject())) { 174 NS_WARNING( 175 "Failed to initialize AutoJSAPI in PipeToPump::RunAbortAlgorithm"); 176 return; 177 } 178 JSContext* cx = jsapi.cx(); 179 180 RefPtr<AbortSignalImpl> signal = Signal(); 181 PerformAbortAlgorithm(cx, signal); 182 } 183 184 void PipeToPump::PerformAbortAlgorithm(JSContext* aCx, 185 AbortSignalImpl* aSignal) { 186 MOZ_ASSERT(aSignal->Aborted()); 187 188 // https://streams.spec.whatwg.org/#readable-stream-pipe-to 189 // Step 14.1. Let abortAlgorithm be the following steps: 190 // Note: All the following steps are 14.1.xx 191 192 // Step 1. Let error be signal’s abort reason. 193 JS::Rooted<JS::Value> error(aCx); 194 aSignal->GetReason(aCx, &error); 195 196 auto action = [](JSContext* aCx, PipeToPump* aPipeToPump, 197 JS::Handle<mozilla::Maybe<JS::Value>> aError, 198 ErrorResult& aRv) MOZ_CAN_RUN_SCRIPT { 199 JS::Rooted<JS::Value> error(aCx, *aError); 200 201 // Step 2. Let actions be an empty ordered set. 202 nsTArray<RefPtr<Promise>> actions; 203 204 // Step 3. If preventAbort is false, append the following action to actions: 205 if (!aPipeToPump->mPreventAbort) { 206 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream(); 207 208 // Step 3.1. If dest.[[state]] is "writable", return ! 209 // WritableStreamAbort(dest, error). 210 if (dest->State() == WritableStream::WriterState::Writable) { 211 RefPtr<Promise> p = WritableStreamAbort(aCx, dest, error, aRv); 212 if (aRv.Failed()) { 213 return already_AddRefed<Promise>(); 214 } 215 actions.AppendElement(p); 216 } 217 218 // Step 3.2. Otherwise, return a promise resolved with undefined. 219 // Note: This is basically a no-op. 220 } 221 222 // Step 4. If preventCancel is false, append the following action action to 223 // actions: 224 if (!aPipeToPump->mPreventCancel) { 225 RefPtr<ReadableStream> source = aPipeToPump->mReader->GetStream(); 226 227 // Step 4.1. If source.[[state]] is "readable", return ! 228 // ReadableStreamCancel(source, error). 229 if (source->State() == ReadableStream::ReaderState::Readable) { 230 RefPtr<Promise> p = ReadableStreamCancel(aCx, source, error, aRv); 231 if (aRv.Failed()) { 232 return already_AddRefed<Promise>(); 233 } 234 actions.AppendElement(p); 235 } 236 237 // Step 4.2. Otherwise, return a promise resolved with undefined. 238 // No-op again. 239 } 240 241 // Step 5. .. action consisting of getting a promise to wait for 242 // all of the actions in actions ... 243 return Promise::All(aCx, actions, aRv); 244 }; 245 246 // Step 5. Shutdown with an action consisting of getting a promise to wait for 247 // all of the actions in actions, and with error. 248 JS::Rooted<Maybe<JS::Value>> someError(aCx, Some(error.get())); 249 ShutdownWithAction(aCx, action, someError); 250 } 251 252 bool PipeToPump::SourceOrDestErroredOrClosed(JSContext* aCx) { 253 // (Constraint) Error and close states must be propagated: 254 // the following conditions must be applied in order. 255 RefPtr<ReadableStream> source = mReader->GetStream(); 256 RefPtr<WritableStream> dest = mWriter->GetStream(); 257 258 // Step 1. Errors must be propagated forward: if source.[[state]] is or 259 // becomes "errored", then 260 if (source->State() == ReadableStream::ReaderState::Errored) { 261 JS::Rooted<JS::Value> storedError(aCx, source->StoredError()); 262 OnSourceErrored(aCx, storedError); 263 return true; 264 } 265 266 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes 267 // "errored", then 268 if (dest->State() == WritableStream::WriterState::Errored) { 269 JS::Rooted<JS::Value> storedError(aCx, dest->StoredError()); 270 OnDestErrored(aCx, storedError); 271 return true; 272 } 273 274 // Step 3. Closing must be propagated forward: if source.[[state]] is or 275 // becomes "closed", then 276 if (source->State() == ReadableStream::ReaderState::Closed) { 277 OnSourceClosed(aCx, JS::UndefinedHandleValue); 278 return true; 279 } 280 281 // Step 4. Closing must be propagated backward: 282 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true 283 // or dest.[[state]] is "closed", then 284 if (dest->CloseQueuedOrInFlight() || 285 dest->State() == WritableStream::WriterState::Closed) { 286 OnDestClosed(aCx, JS::UndefinedHandleValue); 287 return true; 288 } 289 290 return false; 291 } 292 293 // https://streams.spec.whatwg.org/#readable-stream-pipe-to 294 // Steps 14-15. 295 void PipeToPump::Start(JSContext* aCx, AbortSignal* aSignal) { 296 // Step 14. If signal is not undefined, 297 if (aSignal) { 298 // Step 14.1. Let abortAlgorithm be the following steps: 299 // ... This is implemented by RunAbortAlgorithm. 300 301 // Step 14.2. If signal is aborted, perform abortAlgorithm and 302 // return promise. 303 if (aSignal->Aborted()) { 304 PerformAbortAlgorithm(aCx, aSignal); 305 return; 306 } 307 308 // Step 14.3. Add abortAlgorithm to signal. 309 Follow(aSignal); 310 } 311 312 // Step 15. In parallel but not really; see #905, using reader and writer, 313 // read all chunks from source and write them to dest. 314 // Due to the locking provided by the reader and writer, 315 // the exact manner in which this happens is not observable to author code, 316 // and so there is flexibility in how this is done. 317 318 // (Constraint) Error and close states must be propagated 319 320 // Before piping has started, we have to check for source/destination being 321 // errored/closed manually. 322 if (SourceOrDestErroredOrClosed(aCx)) { 323 return; 324 } 325 326 // We use the following two promises to propagate error and close states 327 // during piping. 328 RefPtr<Promise> readerClosed = mReader->ClosedPromise(); 329 readerClosed->AppendNativeHandler(new PipeToPumpHandler( 330 this, &PipeToPump::OnSourceClosed, &PipeToPump::OnSourceErrored)); 331 332 // Note: Because we control the destination/writer it should never be closed 333 // after we did the initial check above with SourceOrDestErroredOrClosed. 334 RefPtr<Promise> writerClosed = mWriter->ClosedPromise(); 335 writerClosed->AppendNativeHandler(new PipeToPumpHandler( 336 this, &PipeToPump::OnDestClosed, &PipeToPump::OnDestErrored)); 337 338 Read(aCx); 339 } 340 341 class WriteFinishedPromiseHandler final : public PromiseNativeHandler { 342 RefPtr<PipeToPump> mPipeToPump; 343 PipeToPump::ShutdownAction mAction; 344 bool mHasError; 345 JS::Heap<JS::Value> mError; 346 347 virtual ~WriteFinishedPromiseHandler() { mozilla::DropJSObjects(this); }; 348 349 public: 350 NS_DECL_CYCLE_COLLECTING_ISUPPORTS 351 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS(WriteFinishedPromiseHandler) 352 353 explicit WriteFinishedPromiseHandler( 354 JSContext* aCx, PipeToPump* aPipeToPump, 355 PipeToPump::ShutdownAction aAction, 356 JS::Handle<mozilla::Maybe<JS::Value>> aError) 357 : mPipeToPump(aPipeToPump), mAction(aAction) { 358 mHasError = aError.isSome(); 359 if (mHasError) { 360 mError = *aError; 361 } 362 mozilla::HoldJSObjects(this); 363 } 364 365 MOZ_CAN_RUN_SCRIPT void WriteFinished(JSContext* aCx) { 366 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known-live? 367 JS::Rooted<Maybe<JS::Value>> error(aCx); 368 if (mHasError) { 369 error = Some(mError); 370 } 371 pipeToPump->ShutdownWithActionAfterFinishedWrite(aCx, mAction, error); 372 } 373 374 MOZ_CAN_RUN_SCRIPT void ResolvedCallback(JSContext* aCx, 375 JS::Handle<JS::Value> aValue, 376 ErrorResult&) override { 377 WriteFinished(aCx); 378 } 379 380 MOZ_CAN_RUN_SCRIPT void RejectedCallback(JSContext* aCx, 381 JS::Handle<JS::Value> aReason, 382 ErrorResult&) override { 383 WriteFinished(aCx); 384 } 385 }; 386 387 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(WriteFinishedPromiseHandler, 388 (mPipeToPump), (mError)) 389 NS_IMPL_CYCLE_COLLECTING_ADDREF(WriteFinishedPromiseHandler) 390 NS_IMPL_CYCLE_COLLECTING_RELEASE(WriteFinishedPromiseHandler) 391 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(WriteFinishedPromiseHandler) 392 NS_INTERFACE_MAP_ENTRY(nsISupports) 393 NS_INTERFACE_MAP_END 394 395 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action 396 // Shutdown with an action: if any of the above requirements ask to shutdown 397 // with an action action, optionally with an error originalError, then: 398 void PipeToPump::ShutdownWithAction( 399 JSContext* aCx, ShutdownAction aAction, 400 JS::Handle<mozilla::Maybe<JS::Value>> aError) { 401 // Step 1. If shuttingDown is true, abort these substeps. 402 if (mShuttingDown) { 403 return; 404 } 405 406 // Step 2. Set shuttingDown to true. 407 mShuttingDown = true; 408 409 // Step 3. If dest.[[state]] is "writable" and ! 410 // WritableStreamCloseQueuedOrInFlight(dest) is false, 411 RefPtr<WritableStream> dest = mWriter->GetStream(); 412 if (dest->State() == WritableStream::WriterState::Writable && 413 !dest->CloseQueuedOrInFlight()) { 414 // Step 3.1. If any chunks have been read but not yet written, write them to 415 // dest. 416 // Step 3.2. Wait until every chunk that has been read has been 417 // written (i.e. the corresponding promises have settled). 418 // 419 // Note: Write requests are processed in order, so when the promise 420 // for the last written chunk is settled all previous chunks have been 421 // written as well. 422 if (mLastWritePromise) { 423 mLastWritePromise->AppendNativeHandler( 424 new WriteFinishedPromiseHandler(aCx, this, aAction, aError)); 425 return; 426 } 427 } 428 429 // Don't have to wait for last write, immediately continue. 430 ShutdownWithActionAfterFinishedWrite(aCx, aAction, aError); 431 } 432 433 class ShutdownActionFinishedPromiseHandler final : public PromiseNativeHandler { 434 RefPtr<PipeToPump> mPipeToPump; 435 bool mHasError; 436 JS::Heap<JS::Value> mError; 437 438 virtual ~ShutdownActionFinishedPromiseHandler() { 439 mozilla::DropJSObjects(this); 440 } 441 442 public: 443 NS_DECL_CYCLE_COLLECTING_ISUPPORTS 444 NS_DECL_CYCLE_COLLECTION_SCRIPT_HOLDER_CLASS( 445 ShutdownActionFinishedPromiseHandler) 446 447 explicit ShutdownActionFinishedPromiseHandler( 448 JSContext* aCx, PipeToPump* aPipeToPump, 449 JS::Handle<mozilla::Maybe<JS::Value>> aError) 450 : mPipeToPump(aPipeToPump) { 451 mHasError = aError.isSome(); 452 if (mHasError) { 453 mError = *aError; 454 } 455 mozilla::HoldJSObjects(this); 456 } 457 458 void ResolvedCallback(JSContext* aCx, JS::Handle<JS::Value> aValue, 459 ErrorResult&) override { 460 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action 461 // Step 5. Upon fulfillment of p, finalize, passing along originalError if 462 // it was given. 463 JS::Rooted<Maybe<JS::Value>> error(aCx); 464 if (mHasError) { 465 error = Some(mError); 466 } 467 mPipeToPump->Finalize(aCx, error); 468 } 469 470 void RejectedCallback(JSContext* aCx, JS::Handle<JS::Value> aReason, 471 ErrorResult&) override { 472 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action 473 // Step 6. Upon rejection of p with reason newError, finalize with 474 // newError. 475 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aReason)); 476 mPipeToPump->Finalize(aCx, error); 477 } 478 }; 479 480 NS_IMPL_CYCLE_COLLECTION_WITH_JS_MEMBERS(ShutdownActionFinishedPromiseHandler, 481 (mPipeToPump), (mError)) 482 NS_IMPL_CYCLE_COLLECTING_ADDREF(ShutdownActionFinishedPromiseHandler) 483 NS_IMPL_CYCLE_COLLECTING_RELEASE(ShutdownActionFinishedPromiseHandler) 484 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ShutdownActionFinishedPromiseHandler) 485 NS_INTERFACE_MAP_ENTRY(nsISupports) 486 NS_INTERFACE_MAP_END 487 488 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown-with-action 489 // Continuation after Step 3. triggered a promise resolution. 490 void PipeToPump::ShutdownWithActionAfterFinishedWrite( 491 JSContext* aCx, ShutdownAction aAction, 492 JS::Handle<mozilla::Maybe<JS::Value>> aError) { 493 if (!aAction) { 494 // Used to implement shutdown without action. Finalize immediately. 495 Finalize(aCx, aError); 496 return; 497 } 498 499 // Step 4. Let p be the result of performing action. 500 RefPtr<PipeToPump> thisRefPtr = this; 501 ErrorResult rv; 502 RefPtr<Promise> p = aAction(aCx, thisRefPtr, aError, rv); 503 504 // Error while calling actions above, continue immediately with finalization. 505 if (rv.MaybeSetPendingException(aCx)) { 506 JS::Rooted<Maybe<JS::Value>> someError(aCx); 507 508 JS::Rooted<JS::Value> error(aCx); 509 if (JS_GetPendingException(aCx, &error)) { 510 someError = Some(error.get()); 511 } 512 513 JS_ClearPendingException(aCx); 514 515 Finalize(aCx, someError); 516 return; 517 } 518 519 // Steps 5-6. 520 p->AppendNativeHandler( 521 new ShutdownActionFinishedPromiseHandler(aCx, this, aError)); 522 } 523 524 // https://streams.spec.whatwg.org/#rs-pipeTo-shutdown 525 // Shutdown: if any of the above requirements or steps ask to shutdown, 526 // optionally with an error error, then: 527 void PipeToPump::Shutdown(JSContext* aCx, 528 JS::Handle<mozilla::Maybe<JS::Value>> aError) { 529 // Note: We implement "shutdown" in terms of "shutdown with action". 530 // We can observe that when passing along an action that always succeeds 531 // shutdown with action and shutdown have the same behavior, when 532 // Ignoring the potential micro task for the promise that we skip anyway. 533 ShutdownWithAction(aCx, nullptr, aError); 534 } 535 536 // https://streams.spec.whatwg.org/#rs-pipeTo-finalize 537 // Finalize: both forms of shutdown will eventually ask to finalize, 538 // optionally with an error error, which means to perform the following steps: 539 void PipeToPump::Finalize(JSContext* aCx, 540 JS::Handle<mozilla::Maybe<JS::Value>> aError) { 541 IgnoredErrorResult rv; 542 // Step 1. Perform ! WritableStreamDefaultWriterRelease(writer). 543 WritableStreamDefaultWriterRelease(aCx, mWriter); 544 545 // Step 2. If reader implements ReadableStreamBYOBReader, 546 // perform ! ReadableStreamBYOBReaderRelease(reader). 547 // Note: We always use a default reader. 548 MOZ_ASSERT(!mReader->IsBYOB()); 549 550 // Step 3. Otherwise, perform ! ReadableStreamDefaultReaderRelease(reader). 551 ReadableStreamDefaultReaderRelease(aCx, mReader, rv); 552 NS_WARNING_ASSERTION(!rv.Failed(), 553 "ReadableStreamReaderGenericRelease should not fail."); 554 555 // Step 3. If signal is not undefined, remove abortAlgorithm from signal. 556 if (IsFollowing()) { 557 Unfollow(); 558 } 559 560 // Step 4. If error was given, reject promise with error. 561 if (aError.isSome()) { 562 JS::Rooted<JS::Value> error(aCx, *aError); 563 mPromise->MaybeReject(error); 564 } else { 565 // Step 5. Otherwise, resolve promise with undefined. 566 mPromise->MaybeResolveWithUndefined(); 567 } 568 569 // Remove all references. 570 mPromise = nullptr; 571 mReader = nullptr; 572 mWriter = nullptr; 573 mLastWritePromise = nullptr; 574 Unfollow(); 575 } 576 577 void PipeToPump::OnReadFulfilled(JSContext* aCx, JS::Handle<JS::Value> aChunk, 578 ErrorResult& aRv) { 579 // (Constraint) Shutdown must stop activity: 580 // if shuttingDown becomes true, the user agent must not initiate further 581 // reads from reader, and must only perform writes of already-read chunks ... 582 // 583 // We may reach this point after |On{Source,Dest}{Clos,Error}ed| has responded 584 // to an out-of-band change. Per the comment in |OnSourceErrored|, we want to 585 // allow the implicated shutdown to proceed, and we don't want to interfere 586 // with or additionally alter its operation. Particularly, we don't want to 587 // queue up the successfully-read chunk (if there was one, and this isn't just 588 // reporting "done") to be written: it wasn't "already-read" when that 589 // error/closure happened. 590 // 591 // All specified reactions to a closure/error invoke either the shutdown, or 592 // shutdown with an action, algorithms. Those algorithms each abort if either 593 // shutdown algorithm has already been invoked. So we check for shutdown here 594 // in case of asynchronous closure/error and abort if shutdown has already 595 // started (and possibly finished). 596 // 597 // TODO: Implement the eventual resolution from 598 // https://github.com/whatwg/streams/issues/1207 599 if (mShuttingDown) { 600 return; 601 } 602 603 // Write asynchronously. Roughly this is like: 604 // `Promise.resolve().then(() => stream.write(chunk));` 605 // XXX: The spec currently does not require asynchronicity, but this still 606 // matches other engines' behavior. See 607 // https://github.com/whatwg/streams/issues/1243. 608 RefPtr<Promise> promise = 609 Promise::CreateInfallible(xpc::CurrentNativeGlobal(aCx)); 610 promise->MaybeResolveWithUndefined(); 611 auto result = promise->ThenWithCycleCollectedArgsJS( 612 [](JSContext* aCx, JS::Handle<JS::Value>, ErrorResult& aRv, 613 const RefPtr<PipeToPump>& aSelf, 614 const RefPtr<WritableStreamDefaultWriter>& aWriter, 615 JS::Handle<JS::Value> aChunk) 616 MOZ_CAN_RUN_SCRIPT_FOR_DEFINITION -> already_AddRefed<Promise> { 617 RefPtr<Promise> promise = 618 WritableStreamDefaultWriterWrite(aCx, aWriter, aChunk, aRv); 619 620 // Last read has finished, so it's time to start reading again. 621 aSelf->Read(aCx); 622 623 return promise.forget(); 624 }, 625 std::make_tuple(RefPtr{this}, mWriter), std::make_tuple(aChunk)); 626 if (result.isErr()) { 627 mLastWritePromise = nullptr; 628 return; 629 } 630 mLastWritePromise = result.unwrap(); 631 632 mLastWritePromise->AppendNativeHandler( 633 new PipeToPumpHandler(this, nullptr, &PipeToPump::OnDestErrored)); 634 } 635 636 void PipeToPump::OnWriterReady(JSContext* aCx, JS::Handle<JS::Value>) { 637 // Writer is ready again (i.e. backpressure was resolved), so read. 638 Read(aCx); 639 } 640 641 struct PipeToReadRequest : public ReadRequest { 642 public: 643 NS_DECL_ISUPPORTS_INHERITED 644 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PipeToReadRequest, ReadRequest) 645 646 RefPtr<PipeToPump> mPipeToPump; 647 648 explicit PipeToReadRequest(PipeToPump* aPipeToPump) 649 : mPipeToPump(aPipeToPump) {} 650 651 MOZ_CAN_RUN_SCRIPT void ChunkSteps(JSContext* aCx, 652 JS::Handle<JS::Value> aChunk, 653 ErrorResult& aRv) override { 654 RefPtr<PipeToPump> pipeToPump = mPipeToPump; // XXX known live? 655 pipeToPump->OnReadFulfilled(aCx, aChunk, aRv); 656 } 657 658 // The reader's closed promise handlers will already call OnSourceClosed/ 659 // OnSourceErrored, so these steps can just be ignored. 660 void CloseSteps(JSContext* aCx, ErrorResult& aRv) override {} 661 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError, 662 ErrorResult& aRv) override {} 663 664 protected: 665 virtual ~PipeToReadRequest() = default; 666 }; 667 668 NS_IMPL_CYCLE_COLLECTION_INHERITED(PipeToReadRequest, ReadRequest, mPipeToPump) 669 670 NS_IMPL_ADDREF_INHERITED(PipeToReadRequest, ReadRequest) 671 NS_IMPL_RELEASE_INHERITED(PipeToReadRequest, ReadRequest) 672 673 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToReadRequest) 674 NS_INTERFACE_MAP_END_INHERITING(ReadRequest) 675 676 void PipeToPump::Read(JSContext* aCx) { 677 #ifdef DEBUG 678 mReadChunk = true; 679 #endif 680 681 // (Constraint) Shutdown must stop activity: 682 // If shuttingDown becomes true, the user agent must not initiate 683 // further reads from reader 684 if (mShuttingDown) { 685 return; 686 } 687 688 // (Constraint) Backpressure must be enforced: 689 // While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, 690 // the user agent must not read from reader. 691 Nullable<double> desiredSize = 692 WritableStreamDefaultWriterGetDesiredSize(mWriter); 693 if (desiredSize.IsNull()) { 694 // This means the writer has errored. This is going to be handled 695 // by the writer closed promise. 696 return; 697 } 698 699 if (desiredSize.Value() <= 0) { 700 // Wait for the writer to become ready before reading more data from 701 // the reader. We don't care about rejections here, because those are 702 // already handled by the writer closed promise. 703 RefPtr<Promise> readyPromise = mWriter->Ready(); 704 readyPromise->AppendNativeHandler( 705 new PipeToPumpHandler(this, &PipeToPump::OnWriterReady, nullptr)); 706 return; 707 } 708 709 RefPtr<ReadableStreamDefaultReader> reader = mReader; 710 RefPtr<ReadRequest> request = new PipeToReadRequest(this); 711 ErrorResult rv; 712 ReadableStreamDefaultReaderRead(aCx, reader, request, rv); 713 if (rv.MaybeSetPendingException(aCx)) { 714 // XXX It's actually not quite obvious what we should do here. 715 // We've got an error during reading, so on the surface it seems logical 716 // to invoke `OnSourceErrored`. However in certain cases the required 717 // condition > source.[[state]] is or becomes "errored" < won't actually 718 // happen i.e. when `WritableStreamDefaultWriterWrite` called from 719 // `OnReadFulfilled` (via PipeToReadRequest::ChunkSteps) fails in 720 // a synchronous fashion. 721 JS::Rooted<JS::Value> error(aCx); 722 JS::Rooted<Maybe<JS::Value>> someError(aCx); 723 724 // The error was moved to the JSContext by MaybeSetPendingException. 725 if (JS_GetPendingException(aCx, &error)) { 726 someError = Some(error.get()); 727 } 728 729 JS_ClearPendingException(aCx); 730 731 Shutdown(aCx, someError); 732 } 733 } 734 735 // Step 3. Closing must be propagated forward: if source.[[state]] is or 736 // becomes "closed", then 737 void PipeToPump::OnSourceClosed(JSContext* aCx, JS::Handle<JS::Value>) { 738 // Step 3.1. If preventClose is false, shutdown with an action of 739 // ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer). 740 if (!mPreventClose) { 741 ShutdownWithAction( 742 aCx, 743 [](JSContext* aCx, PipeToPump* aPipeToPump, 744 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) 745 MOZ_CAN_RUN_SCRIPT { 746 RefPtr<WritableStreamDefaultWriter> writer = aPipeToPump->mWriter; 747 return WritableStreamDefaultWriterCloseWithErrorPropagation( 748 aCx, writer, aRv); 749 }, 750 JS::NothingHandleValue); 751 } else { 752 // Step 3.2 Otherwise, shutdown. 753 Shutdown(aCx, JS::NothingHandleValue); 754 } 755 } 756 757 // Step 1. Errors must be propagated forward: if source.[[state]] is or 758 // becomes "errored", then 759 void PipeToPump::OnSourceErrored(JSContext* aCx, 760 JS::Handle<JS::Value> aSourceStoredError) { 761 // If |source| becomes errored not during a pending read, it's clear we must 762 // react immediately. 763 // 764 // But what if |source| becomes errored *during* a pending read? Should this 765 // first error, or the pending-read second error, predominate? Two semantics 766 // are possible when |source|/|dest| become closed or errored while there's a 767 // pending read: 768 // 769 // 1. Wait until the read fulfills or rejects, then respond to the 770 // closure/error without regard to the read having fulfilled or rejected. 771 // (This will simply not react to the read being rejected, or it will 772 // queue up the read chunk to be written during shutdown.) 773 // 2. React to the closure/error immediately per "Error and close states 774 // must be propagated". Then when the read fulfills or rejects later, do 775 // nothing. 776 // 777 // The spec doesn't clearly require either semantics. It requires that 778 // *already-read* chunks be written (at least if |dest| didn't become errored 779 // or closed such that no further writes can occur). But it's silent as to 780 // not-fully-read chunks. (These semantic differences may only be observable 781 // with very carefully constructed readable/writable streams.) 782 // 783 // It seems best, generally, to react to the temporally-earliest problem that 784 // arises, so we implement option #2. (Blink, in contrast, currently 785 // implements option #1.) 786 // 787 // All specified reactions to a closure/error invoke either the shutdown, or 788 // shutdown with an action, algorithms. Those algorithms each abort if either 789 // shutdown algorithm has already been invoked. So we don't need to do 790 // anything special here to deal with a pending read. 791 // 792 // TODO: Implement the eventual resolution from 793 // https://github.com/whatwg/streams/issues/1207 794 795 // Step 1.1 If preventAbort is false, shutdown with an action of 796 // ! WritableStreamAbort(dest, source.[[storedError]]) 797 // and with source.[[storedError]]. 798 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aSourceStoredError)); 799 if (!mPreventAbort) { 800 ShutdownWithAction( 801 aCx, 802 [](JSContext* aCx, PipeToPump* aPipeToPump, 803 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) 804 MOZ_CAN_RUN_SCRIPT { 805 JS::Rooted<JS::Value> error(aCx, *aError); 806 RefPtr<WritableStream> dest = aPipeToPump->mWriter->GetStream(); 807 return WritableStreamAbort(aCx, dest, error, aRv); 808 }, 809 error); 810 } else { 811 // Step 1.1. Otherwise, shutdown with source.[[storedError]]. 812 Shutdown(aCx, error); 813 } 814 } 815 816 // Step 4. Closing must be propagated backward: 817 // if ! WritableStreamCloseQueuedOrInFlight(dest) is true 818 // or dest.[[state]] is "closed", then 819 void PipeToPump::OnDestClosed(JSContext* aCx, JS::Handle<JS::Value>) { 820 // Step 4.1. Assert: no chunks have been read or written. 821 // Note: No reading automatically implies no writing. 822 // In a perfect world OnDestClosed would only be called before we start 823 // piping, because afterwards the writer has an exclusive lock on the stream. 824 // In reality the closed promise can still be resolved after we release 825 // the lock on the writer in Finalize. 826 if (mShuttingDown) { 827 return; 828 } 829 MOZ_ASSERT(!mReadChunk); 830 831 // Step 4.2. Let destClosed be a new TypeError. 832 JS::Rooted<Maybe<JS::Value>> destClosed(aCx, Nothing()); 833 { 834 ErrorResult rv; 835 rv.ThrowTypeError("Cannot pipe to closed stream"); 836 JS::Rooted<JS::Value> error(aCx); 837 bool ok = ToJSValue(aCx, std::move(rv), &error); 838 MOZ_RELEASE_ASSERT(ok, "must be ok"); 839 destClosed = Some(error.get()); 840 } 841 842 // Step 4.3. If preventCancel is false, shutdown with an action of 843 // ! ReadableStreamCancel(source, destClosed) and with destClosed. 844 if (!mPreventCancel) { 845 ShutdownWithAction( 846 aCx, 847 [](JSContext* aCx, PipeToPump* aPipeToPump, 848 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) 849 MOZ_CAN_RUN_SCRIPT { 850 JS::Rooted<JS::Value> error(aCx, *aError); 851 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream(); 852 return ReadableStreamCancel(aCx, dest, error, aRv); 853 }, 854 destClosed); 855 } else { 856 // Step 4.4. Otherwise, shutdown with destClosed. 857 Shutdown(aCx, destClosed); 858 } 859 } 860 861 // Step 2. Errors must be propagated backward: if dest.[[state]] is or becomes 862 // "errored", then 863 void PipeToPump::OnDestErrored(JSContext* aCx, 864 JS::Handle<JS::Value> aDestStoredError) { 865 // Step 2.1. If preventCancel is false, shutdown with an action of 866 // ! ReadableStreamCancel(source, dest.[[storedError]]) 867 // and with dest.[[storedError]]. 868 JS::Rooted<Maybe<JS::Value>> error(aCx, Some(aDestStoredError)); 869 if (!mPreventCancel) { 870 ShutdownWithAction( 871 aCx, 872 [](JSContext* aCx, PipeToPump* aPipeToPump, 873 JS::Handle<mozilla::Maybe<JS::Value>> aError, ErrorResult& aRv) 874 MOZ_CAN_RUN_SCRIPT { 875 JS::Rooted<JS::Value> error(aCx, *aError); 876 RefPtr<ReadableStream> dest = aPipeToPump->mReader->GetStream(); 877 return ReadableStreamCancel(aCx, dest, error, aRv); 878 }, 879 error); 880 } else { 881 // Step 2.1. Otherwise, shutdown with dest.[[storedError]]. 882 Shutdown(aCx, error); 883 } 884 } 885 886 NS_IMPL_CYCLE_COLLECTION_CLASS(PipeToPump) 887 NS_IMPL_CYCLE_COLLECTING_ADDREF(PipeToPump) 888 NS_IMPL_CYCLE_COLLECTING_RELEASE(PipeToPump) 889 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PipeToPump) 890 NS_INTERFACE_MAP_ENTRY(nsISupports) 891 NS_INTERFACE_MAP_END 892 893 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN(PipeToPump) 894 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mPromise) 895 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mReader) 896 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mWriter) 897 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mLastWritePromise) 898 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END 899 900 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN(PipeToPump) 901 NS_IMPL_CYCLE_COLLECTION_UNLINK(mPromise) 902 NS_IMPL_CYCLE_COLLECTION_UNLINK(mReader) 903 NS_IMPL_CYCLE_COLLECTION_UNLINK(mWriter) 904 NS_IMPL_CYCLE_COLLECTION_UNLINK(mLastWritePromise) 905 NS_IMPL_CYCLE_COLLECTION_UNLINK_END 906 907 namespace streams_abstract { 908 // https://streams.spec.whatwg.org/#readable-stream-pipe-to 909 already_AddRefed<Promise> ReadableStreamPipeTo( 910 ReadableStream* aSource, WritableStream* aDest, bool aPreventClose, 911 bool aPreventAbort, bool aPreventCancel, AbortSignal* aSignal, 912 mozilla::ErrorResult& aRv) { 913 // Step 1. Assert: source implements ReadableStream. (Implicit) 914 // Step 2. Assert: dest implements WritableStream. (Implicit) 915 // Step 3. Assert: preventClose, preventAbort, and preventCancel are all 916 // booleans (Implicit) 917 // Step 4. If signal was not given, let signal be 918 // undefined. (Implicit) 919 // Step 5. Assert: either signal is undefined, or signal 920 // implements AbortSignal. (Implicit) 921 // Step 6. Assert: !IsReadableStreamLocked(source) is false. 922 MOZ_ASSERT(!IsReadableStreamLocked(aSource)); 923 924 // Step 7. Assert: !IsWritableStreamLocked(dest) is false. 925 MOZ_ASSERT(!IsWritableStreamLocked(aDest)); 926 927 AutoJSAPI jsapi; 928 if (!jsapi.Init(aSource->GetParentObject())) { 929 aRv.ThrowUnknownError("Internal error"); 930 return nullptr; 931 } 932 JSContext* cx = jsapi.cx(); 933 934 // Step 8. If source.[[controller]] implements ReadableByteStreamController, 935 // let reader be either !AcquireReadableStreamBYOBReader(source) or 936 // !AcquireReadableStreamDefaultReader(source), at the user agent’s 937 // discretion. 938 // Step 9. Otherwise, let reader be 939 // !AcquireReadableStreamDefaultReader(source). 940 941 // Note: In the interests of simplicity, we choose here to always acquire 942 // a default reader. 943 RefPtr<ReadableStreamDefaultReader> reader = 944 AcquireReadableStreamDefaultReader(aSource, aRv); 945 if (aRv.Failed()) { 946 return nullptr; 947 } 948 949 // Step 10. Let writer be ! AcquireWritableStreamDefaultWriter(dest). 950 RefPtr<WritableStreamDefaultWriter> writer = 951 AcquireWritableStreamDefaultWriter(aDest, aRv); 952 if (aRv.Failed()) { 953 return nullptr; 954 } 955 956 // Step 11. Set source.[[disturbed]] to true. 957 aSource->SetDisturbed(true); 958 959 // Step 12. Let shuttingDown be false. 960 // Note: PipeToPump ensures this by construction. 961 962 // Step 13. Let promise be a new promise. 963 RefPtr<Promise> promise = 964 Promise::CreateInfallible(aSource->GetParentObject()); 965 966 // Steps 14-15. 967 RefPtr<PipeToPump> pump = new PipeToPump( 968 promise, reader, writer, aPreventClose, aPreventAbort, aPreventCancel); 969 pump->Start(cx, aSignal); 970 971 // Step 16. Return promise. 972 return promise.forget(); 973 } 974 } // namespace streams_abstract 975 976 } // namespace mozilla::dom