ReadableStreamTee.cpp (37577B)
1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ 2 /* vim:set ts=2 sw=2 sts=2 et cindent: */ 3 /* This Source Code Form is subject to the terms of the Mozilla Public 4 * License, v. 2.0. If a copy of the MPL was not distributed with this 5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 6 7 #include "ReadableStreamTee.h" 8 9 #include "ReadIntoRequest.h" 10 #include "TeeState.h" 11 #include "js/Exception.h" 12 #include "js/TypeDecls.h" 13 #include "js/experimental/TypedData.h" 14 #include "mozilla/CycleCollectedJSContext.h" 15 #include "mozilla/dom/ByteStreamHelpers.h" 16 #include "mozilla/dom/Promise-inl.h" 17 #include "mozilla/dom/ReadableByteStreamController.h" 18 #include "mozilla/dom/ReadableStream.h" 19 #include "mozilla/dom/ReadableStreamBYOBReader.h" 20 #include "mozilla/dom/ReadableStreamDefaultController.h" 21 #include "mozilla/dom/ReadableStreamDefaultReader.h" 22 #include "mozilla/dom/ReadableStreamGenericReader.h" 23 #include "mozilla/dom/UnderlyingSourceBinding.h" 24 #include "mozilla/dom/UnderlyingSourceCallbackHelpers.h" 25 #include "nsCycleCollectionParticipant.h" 26 27 namespace mozilla::dom { 28 29 using namespace streams_abstract; 30 31 NS_IMPL_CYCLE_COLLECTION_INHERITED(ReadableStreamDefaultTeeSourceAlgorithms, 32 UnderlyingSourceAlgorithmsBase, mTeeState) 33 NS_IMPL_ADDREF_INHERITED(ReadableStreamDefaultTeeSourceAlgorithms, 34 UnderlyingSourceAlgorithmsBase) 35 NS_IMPL_RELEASE_INHERITED(ReadableStreamDefaultTeeSourceAlgorithms, 36 UnderlyingSourceAlgorithmsBase) 37 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION( 38 ReadableStreamDefaultTeeSourceAlgorithms) 39 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsBase) 40 41 already_AddRefed<Promise> 42 ReadableStreamDefaultTeeSourceAlgorithms::PullCallback( 43 JSContext* aCx, ReadableStreamControllerBase& aController, 44 ErrorResult& aRv) { 45 nsCOMPtr<nsIGlobalObject> global = aController.GetParentObject(); 46 mTeeState->PullCallback(aCx, global, aRv); 47 if (!aRv.Failed()) { 48 return Promise::CreateResolvedWithUndefined(global, aRv); 49 } 50 return nullptr; 51 } 52 53 NS_IMPL_CYCLE_COLLECTION_CLASS(ReadableStreamDefaultTeeReadRequest) 54 55 NS_IMPL_CYCLE_COLLECTION_UNLINK_BEGIN_INHERITED( 56 ReadableStreamDefaultTeeReadRequest, ReadRequest) 57 NS_IMPL_CYCLE_COLLECTION_UNLINK(mTeeState) 58 NS_IMPL_CYCLE_COLLECTION_UNLINK_END 59 60 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_BEGIN_INHERITED( 61 ReadableStreamDefaultTeeReadRequest, ReadRequest) 62 NS_IMPL_CYCLE_COLLECTION_TRAVERSE(mTeeState) 63 NS_IMPL_CYCLE_COLLECTION_TRAVERSE_END 64 65 NS_IMPL_ADDREF_INHERITED(ReadableStreamDefaultTeeReadRequest, ReadRequest) 66 NS_IMPL_RELEASE_INHERITED(ReadableStreamDefaultTeeReadRequest, ReadRequest) 67 68 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ReadableStreamDefaultTeeReadRequest) 69 NS_INTERFACE_MAP_END_INHERITING(ReadRequest) 70 71 void ReadableStreamDefaultTeeReadRequest::ChunkSteps( 72 JSContext* aCx, JS::Handle<JS::Value> aChunk, ErrorResult& aRv) { 73 // Step 1. 74 class ReadableStreamDefaultTeeReadRequestChunkSteps 75 : public MicroTaskRunnable { 76 // Virtually const, but is cycle collected 77 MOZ_KNOWN_LIVE RefPtr<TeeState> mTeeState; 78 JS::PersistentRooted<JS::Value> mChunk; 79 80 public: 81 ReadableStreamDefaultTeeReadRequestChunkSteps(JSContext* aCx, 82 TeeState* aTeeState, 83 JS::Handle<JS::Value> aChunk) 84 : mTeeState(aTeeState), mChunk(aCx, aChunk) {} 85 86 MOZ_CAN_RUN_SCRIPT 87 void Run(AutoSlowOperation& aAso) override { 88 AutoJSAPI jsapi; 89 if (NS_WARN_IF(!jsapi.Init(mTeeState->GetStream()->GetParentObject()))) { 90 return; 91 } 92 JSContext* cx = jsapi.cx(); 93 // Step Numbering below is relative to Chunk steps Microtask: 94 // 95 // Step 1. 96 mTeeState->SetReadAgain(false); 97 98 // Step 2. 99 JS::Rooted<JS::Value> chunk1(cx, mChunk); 100 JS::Rooted<JS::Value> chunk2(cx, mChunk); 101 102 // Step 3. Skipped until we implement cloneForBranch2 path. 103 MOZ_RELEASE_ASSERT(!mTeeState->CloneForBranch2()); 104 105 // Step 4. 106 if (!mTeeState->Canceled1()) { 107 IgnoredErrorResult rv; 108 // Since we controlled the creation of the two stream branches, we know 109 // they both have default controllers. 110 RefPtr<ReadableStreamDefaultController> controller( 111 mTeeState->Branch1()->DefaultController()); 112 ReadableStreamDefaultControllerEnqueue(cx, controller, chunk1, rv); 113 (void)NS_WARN_IF(rv.Failed()); 114 } 115 116 // Step 5. 117 if (!mTeeState->Canceled2()) { 118 IgnoredErrorResult rv; 119 RefPtr<ReadableStreamDefaultController> controller( 120 mTeeState->Branch2()->DefaultController()); 121 ReadableStreamDefaultControllerEnqueue(cx, controller, chunk2, rv); 122 (void)NS_WARN_IF(rv.Failed()); 123 } 124 125 // Step 6. 126 mTeeState->SetReading(false); 127 128 // Step 7. If |readAgain| is true, perform |pullAlgorithm|. 129 if (mTeeState->ReadAgain()) { 130 IgnoredErrorResult rv; 131 nsCOMPtr<nsIGlobalObject> global( 132 mTeeState->GetStream()->GetParentObject()); 133 mTeeState->PullCallback(cx, global, rv); 134 (void)NS_WARN_IF(rv.Failed()); 135 } 136 } 137 138 bool Suppressed() override { 139 nsIGlobalObject* global = mTeeState->GetStream()->GetParentObject(); 140 return global && global->IsInSyncOperation(); 141 } 142 }; 143 144 RefPtr<ReadableStreamDefaultTeeReadRequestChunkSteps> task = 145 MakeRefPtr<ReadableStreamDefaultTeeReadRequestChunkSteps>(aCx, mTeeState, 146 aChunk); 147 CycleCollectedJSContext::Get()->DispatchToMicroTask(task.forget()); 148 } 149 150 void ReadableStreamDefaultTeeReadRequest::CloseSteps(JSContext* aCx, 151 ErrorResult& aRv) { 152 // Step Numbering below is relative to 'close steps' of 153 // https://streams.spec.whatwg.org/#abstract-opdef-readablestreamdefaulttee 154 // 155 // Step 1. 156 mTeeState->SetReading(false); 157 158 // Step 2. 159 if (!mTeeState->Canceled1()) { 160 RefPtr<ReadableStreamDefaultController> controller( 161 mTeeState->Branch1()->DefaultController()); 162 ReadableStreamDefaultControllerClose(aCx, controller, aRv); 163 if (aRv.Failed()) { 164 return; 165 } 166 } 167 168 // Step 3. 169 if (!mTeeState->Canceled2()) { 170 RefPtr<ReadableStreamDefaultController> controller( 171 mTeeState->Branch2()->DefaultController()); 172 ReadableStreamDefaultControllerClose(aCx, controller, aRv); 173 if (aRv.Failed()) { 174 return; 175 } 176 } 177 178 // Step 4. 179 if (!mTeeState->Canceled1() || !mTeeState->Canceled2()) { 180 mTeeState->CancelPromise()->MaybeResolveWithUndefined(); 181 } 182 } 183 184 void ReadableStreamDefaultTeeReadRequest::ErrorSteps( 185 JSContext* aCx, JS::Handle<JS::Value> aError, ErrorResult& aRv) { 186 mTeeState->SetReading(false); 187 } 188 189 MOZ_CAN_RUN_SCRIPT void PullWithDefaultReader(JSContext* aCx, 190 TeeState* aTeeState, 191 ErrorResult& aRv); 192 MOZ_CAN_RUN_SCRIPT void PullWithBYOBReader(JSContext* aCx, TeeState* aTeeState, 193 JS::Handle<JSObject*> aView, 194 TeeBranch aForBranch, 195 ErrorResult& aRv); 196 197 // Algorithm described in 198 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee, Steps 199 // 17 and Steps 18, genericized across branch numbers: 200 // 201 // Note: As specified this algorithm always returns a promise resolved with 202 // undefined, however as some places immediately discard said promise, we 203 // provide this version which doesn't return a promise. 204 // 205 // NativeByteStreamTeePullAlgorithm, which implements 206 // UnderlyingSourcePullCallbackHelper is the version which provies the return 207 // promise. 208 MOZ_CAN_RUN_SCRIPT void ByteStreamTeePullAlgorithm(JSContext* aCx, 209 TeeBranch aForBranch, 210 TeeState* aTeeState, 211 ErrorResult& aRv) { 212 // Step {17,18}.1: If reading is true, 213 if (aTeeState->Reading()) { 214 // Step {17,18}.1.1: Set readAgainForBranch{1,2} to true. 215 aTeeState->SetReadAgainForBranch(aForBranch, true); 216 217 // Step {17,18}.1.1: Return a promise resolved with undefined. 218 return; 219 } 220 221 // Step {17,18}.2: Set reading to true. 222 aTeeState->SetReading(true); 223 224 // Step {17,18}.3: Let byobRequest be 225 // !ReadableByteStreamControllerGetBYOBRequest(branch{1,2}.[[controller]]). 226 RefPtr<ReadableStreamBYOBRequest> byobRequest = 227 ReadableByteStreamControllerGetBYOBRequest( 228 aCx, aTeeState->Branch(aForBranch)->Controller()->AsByte(), aRv); 229 if (aRv.Failed()) { 230 return; 231 } 232 233 // Step {17,18}.4: If byobRequest is null, perform pullWithDefaultReader. 234 if (!byobRequest) { 235 PullWithDefaultReader(aCx, aTeeState, aRv); 236 } else { 237 // Step {17,18}.5: Otherwise, perform pullWithBYOBReader, given 238 // byobRequest.[[view]] and {false, true}. 239 JS::Rooted<JSObject*> view(aCx, byobRequest->View()); 240 PullWithBYOBReader(aCx, aTeeState, view, aForBranch, aRv); 241 } 242 243 // Step {17,18}.6: Return a promise resolved with undefined. 244 } 245 246 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 247 class ByteStreamTeeSourceAlgorithms final 248 : public UnderlyingSourceAlgorithmsBase { 249 public: 250 NS_DECL_ISUPPORTS_INHERITED 251 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(ByteStreamTeeSourceAlgorithms, 252 UnderlyingSourceAlgorithmsBase) 253 254 ByteStreamTeeSourceAlgorithms(TeeState* aTeeState, TeeBranch aBranch) 255 : mTeeState(aTeeState), mBranch(aBranch) {} 256 257 MOZ_CAN_RUN_SCRIPT void StartCallback( 258 JSContext* aCx, ReadableStreamControllerBase& aController, 259 JS::MutableHandle<JS::Value> aRetVal, ErrorResult& aRv) override { 260 // Step 21: Let startAlgorithm be an algorithm that returns undefined. 261 aRetVal.setUndefined(); 262 } 263 264 // Step 17, 18 265 MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> PullCallback( 266 JSContext* aCx, ReadableStreamControllerBase& aController, 267 ErrorResult& aRv) override { 268 // Step 1 - 5 269 ByteStreamTeePullAlgorithm(aCx, mBranch, MOZ_KnownLive(mTeeState), aRv); 270 271 // Step 6: Return a promise resolved with undefined. 272 return Promise::CreateResolvedWithUndefined( 273 mTeeState->GetStream()->GetParentObject(), aRv); 274 } 275 276 // Step 19, 20 277 MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> CancelCallback( 278 JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason, 279 ErrorResult& aRv) override { 280 // Step 1. 281 mTeeState->SetCanceled(mBranch, true); 282 283 // Step 2. 284 mTeeState->SetReason(mBranch, aReason.Value()); 285 286 // Step 3. 287 if (mTeeState->Canceled(otherStream())) { 288 // Step 3.1 289 JS::Rooted<JSObject*> compositeReason(aCx, JS::NewArrayObject(aCx, 2)); 290 if (!compositeReason) { 291 aRv.StealExceptionFromJSContext(aCx); 292 return nullptr; 293 } 294 295 JS::Rooted<JS::Value> reason1(aCx, mTeeState->Reason1()); 296 if (!JS_SetElement(aCx, compositeReason, 0, reason1)) { 297 aRv.StealExceptionFromJSContext(aCx); 298 return nullptr; 299 } 300 301 JS::Rooted<JS::Value> reason2(aCx, mTeeState->Reason2()); 302 if (!JS_SetElement(aCx, compositeReason, 1, reason2)) { 303 aRv.StealExceptionFromJSContext(aCx); 304 return nullptr; 305 } 306 307 // Step 3.2 308 JS::Rooted<JS::Value> compositeReasonValue( 309 aCx, JS::ObjectValue(*compositeReason)); 310 RefPtr<ReadableStream> stream(mTeeState->GetStream()); 311 RefPtr<Promise> cancelResult = 312 ReadableStreamCancel(aCx, stream, compositeReasonValue, aRv); 313 if (aRv.Failed()) { 314 return nullptr; 315 } 316 317 // Step 3.3 318 mTeeState->CancelPromise()->MaybeResolve(cancelResult); 319 } 320 321 // Step 4. 322 return do_AddRef(mTeeState->CancelPromise()); 323 }; 324 325 protected: 326 ~ByteStreamTeeSourceAlgorithms() override = default; 327 328 private: 329 TeeBranch otherStream() { return OtherTeeBranch(mBranch); } 330 331 RefPtr<TeeState> mTeeState; 332 TeeBranch mBranch; 333 }; 334 335 NS_IMPL_CYCLE_COLLECTION_INHERITED(ByteStreamTeeSourceAlgorithms, 336 UnderlyingSourceAlgorithmsBase, mTeeState) 337 NS_IMPL_ADDREF_INHERITED(ByteStreamTeeSourceAlgorithms, 338 UnderlyingSourceAlgorithmsBase) 339 NS_IMPL_RELEASE_INHERITED(ByteStreamTeeSourceAlgorithms, 340 UnderlyingSourceAlgorithmsBase) 341 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(ByteStreamTeeSourceAlgorithms) 342 NS_INTERFACE_MAP_END_INHERITING(UnderlyingSourceAlgorithmsBase) 343 344 struct PullWithDefaultReaderReadRequest final : public ReadRequest { 345 RefPtr<TeeState> mTeeState; 346 347 public: 348 NS_DECL_ISUPPORTS_INHERITED 349 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PullWithDefaultReaderReadRequest, 350 ReadRequest) 351 352 explicit PullWithDefaultReaderReadRequest(TeeState* aTeeState) 353 : mTeeState(aTeeState) {} 354 355 void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk, 356 ErrorResult& aRv) override { 357 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 358 // Step 15.2.1 359 class PullWithDefaultReaderChunkStepMicrotask : public MicroTaskRunnable { 360 RefPtr<TeeState> mTeeState; 361 JS::PersistentRooted<JSObject*> mChunk; 362 363 public: 364 PullWithDefaultReaderChunkStepMicrotask(JSContext* aCx, 365 TeeState* aTeeState, 366 JS::Handle<JSObject*> aChunk) 367 : mTeeState(aTeeState), mChunk(aCx, aChunk) {} 368 369 MOZ_CAN_RUN_SCRIPT 370 void Run(AutoSlowOperation& aAso) override { 371 // Step Numbering in this function is relative to the Queue a microtask 372 // of the Chunk steps of 15.2.1 of 373 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 374 AutoJSAPI jsapi; 375 if (NS_WARN_IF( 376 !jsapi.Init(mTeeState->GetStream()->GetParentObject()))) { 377 return; 378 } 379 JSContext* cx = jsapi.cx(); 380 381 // Step 1. Set readAgainForBranch1 to false. 382 mTeeState->SetReadAgainForBranch1(false); 383 384 // Step 2. Set readAgainForBranch2 to false. 385 mTeeState->SetReadAgainForBranch2(false); 386 387 // Step 3. Let chunk1 and chunk2 be chunk. 388 JS::Rooted<JSObject*> chunk1(cx, mChunk); 389 JS::Rooted<JSObject*> chunk2(cx, mChunk); 390 391 // Step 4. If canceled1 is false and canceled2 is false, 392 ErrorResult rv; 393 if (!mTeeState->Canceled1() && !mTeeState->Canceled2()) { 394 // Step 4.1. Let cloneResult be CloneAsUint8Array(chunk). 395 JS::Rooted<JSObject*> cloneResult(cx, CloneAsUint8Array(cx, mChunk)); 396 397 // Step 4.2. If cloneResult is an abrupt completion, 398 if (!cloneResult) { 399 // Step 4.2.1 Perform 400 // !ReadableByteStreamControllerError(branch1.[[controller]], 401 // cloneResult.[[Value]]). 402 JS::Rooted<JS::Value> exceptionValue(cx); 403 if (!JS_GetPendingException(cx, &exceptionValue)) { 404 // Uncatchable exception, simply return. 405 return; 406 } 407 JS_ClearPendingException(cx); 408 409 ErrorResult rv; 410 ReadableByteStreamControllerError( 411 mTeeState->Branch1()->Controller()->AsByte(), exceptionValue, 412 rv); 413 if (rv.MaybeSetPendingException( 414 cx, "Error during ReadableByteStreamControllerError")) { 415 return; 416 } 417 418 // Step 4.2.2. Perform ! 419 // ReadableByteStreamControllerError(branch2.[[controller]], 420 // cloneResult.[[Value]]). 421 ReadableByteStreamControllerError( 422 mTeeState->Branch2()->Controller()->AsByte(), exceptionValue, 423 rv); 424 if (rv.MaybeSetPendingException( 425 cx, "Error during ReadableByteStreamControllerError")) { 426 return; 427 } 428 429 // Step 4.2.3. Resolve cancelPromise with ! 430 // ReadableStreamCancel(stream, cloneResult.[[Value]]). 431 RefPtr<ReadableStream> stream(mTeeState->GetStream()); 432 RefPtr<Promise> promise = 433 ReadableStreamCancel(cx, stream, exceptionValue, rv); 434 if (rv.MaybeSetPendingException( 435 cx, "Error during ReadableByteStreamControllerError")) { 436 return; 437 } 438 mTeeState->CancelPromise()->MaybeResolve(promise); 439 440 // Step 4.2.4. Return. 441 return; 442 } 443 444 // Step 4.3. Otherwise, set chunk2 to cloneResult.[[Value]]. 445 chunk2 = cloneResult; 446 } 447 448 // Step 5. If canceled1 is false, 449 // perform ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], 450 // chunk1). 451 if (!mTeeState->Canceled1()) { 452 ErrorResult rv; 453 RefPtr<ReadableByteStreamController> controller( 454 mTeeState->Branch1()->Controller()->AsByte()); 455 ReadableByteStreamControllerEnqueue(cx, controller, chunk1, rv); 456 if (rv.MaybeSetPendingException( 457 cx, "Error during ReadableByteStreamControllerEnqueue")) { 458 return; 459 } 460 } 461 462 // Step 6. If canceled2 is false, 463 // perform ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], 464 // chunk2). 465 if (!mTeeState->Canceled2()) { 466 ErrorResult rv; 467 RefPtr<ReadableByteStreamController> controller( 468 mTeeState->Branch2()->Controller()->AsByte()); 469 ReadableByteStreamControllerEnqueue(cx, controller, chunk2, rv); 470 if (rv.MaybeSetPendingException( 471 cx, "Error during ReadableByteStreamControllerEnqueue")) { 472 return; 473 } 474 } 475 476 // Step 7. Set reading to false. 477 mTeeState->SetReading(false); 478 479 // Step 8. If readAgainForBranch1 is true, perform pull1Algorithm. 480 if (mTeeState->ReadAgainForBranch1()) { 481 ByteStreamTeePullAlgorithm(cx, TeeBranch::Branch1, 482 MOZ_KnownLive(mTeeState), rv); 483 } else if (mTeeState->ReadAgainForBranch2()) { 484 // Step 9. Otherwise, if readAgainForBranch2 is true, perform 485 // pull2Algorithm. 486 ByteStreamTeePullAlgorithm(cx, TeeBranch::Branch2, 487 MOZ_KnownLive(mTeeState), rv); 488 } 489 } 490 491 bool Suppressed() override { 492 nsIGlobalObject* global = mTeeState->GetStream()->GetParentObject(); 493 return global && global->IsInSyncOperation(); 494 } 495 }; 496 497 MOZ_ASSERT(aChunk.isObjectOrNull()); 498 MOZ_ASSERT(aChunk.toObjectOrNull() != nullptr); 499 JS::Rooted<JSObject*> chunk(aCx, &aChunk.toObject()); 500 RefPtr<PullWithDefaultReaderChunkStepMicrotask> task = 501 MakeRefPtr<PullWithDefaultReaderChunkStepMicrotask>(aCx, mTeeState, 502 chunk); 503 CycleCollectedJSContext::Get()->DispatchToMicroTask(task.forget()); 504 } 505 506 MOZ_CAN_RUN_SCRIPT void CloseSteps(JSContext* aCx, 507 ErrorResult& aRv) override { 508 // Step numbering below is relative to Step 15.2. 'close steps' of 509 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 510 511 // Step 1. Set reading to false. 512 mTeeState->SetReading(false); 513 514 // Step 2. If canceled1 is false, perform ! 515 // ReadableByteStreamControllerClose(branch1.[[controller]]). 516 RefPtr<ReadableByteStreamController> branch1Controller = 517 mTeeState->Branch1()->Controller()->AsByte(); 518 if (!mTeeState->Canceled1()) { 519 ReadableByteStreamControllerClose(aCx, branch1Controller, aRv); 520 if (aRv.Failed()) { 521 return; 522 } 523 } 524 525 // Step 3. If canceled2 is false, perform ! 526 // ReadableByteStreamControllerClose(branch2.[[controller]]). 527 RefPtr<ReadableByteStreamController> branch2Controller = 528 mTeeState->Branch2()->Controller()->AsByte(); 529 if (!mTeeState->Canceled2()) { 530 ReadableByteStreamControllerClose(aCx, branch2Controller, aRv); 531 if (aRv.Failed()) { 532 return; 533 } 534 } 535 536 // Step 4. If branch1.[[controller]].[[pendingPullIntos]] is not empty, 537 // perform ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0). 538 if (!branch1Controller->PendingPullIntos().isEmpty()) { 539 ReadableByteStreamControllerRespond(aCx, branch1Controller, 0, aRv); 540 if (aRv.Failed()) { 541 return; 542 } 543 } 544 545 // Step 5. If branch2.[[controller]].[[pendingPullIntos]] is not empty, 546 // perform ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0). 547 if (!branch2Controller->PendingPullIntos().isEmpty()) { 548 ReadableByteStreamControllerRespond(aCx, branch2Controller, 0, aRv); 549 if (aRv.Failed()) { 550 return; 551 } 552 } 553 554 // Step 6. If canceled1 is false or canceled2 is false, resolve 555 // cancelPromise with undefined. 556 if (!mTeeState->Canceled1() || !mTeeState->Canceled2()) { 557 mTeeState->CancelPromise()->MaybeResolveWithUndefined(); 558 } 559 } 560 561 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> aError, 562 ErrorResult& aRv) override { 563 mTeeState->SetReading(false); 564 } 565 566 protected: 567 ~PullWithDefaultReaderReadRequest() override = default; 568 }; 569 570 NS_IMPL_CYCLE_COLLECTION_INHERITED(PullWithDefaultReaderReadRequest, 571 ReadRequest, mTeeState) 572 NS_IMPL_ADDREF_INHERITED(PullWithDefaultReaderReadRequest, ReadRequest) 573 NS_IMPL_RELEASE_INHERITED(PullWithDefaultReaderReadRequest, ReadRequest) 574 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PullWithDefaultReaderReadRequest) 575 NS_INTERFACE_MAP_END_INHERITING(ReadRequest) 576 577 void ForwardReaderError(TeeState* aTeeState, 578 ReadableStreamGenericReader* aThisReader); 579 580 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee: 581 // Step 15. 582 void PullWithDefaultReader(JSContext* aCx, TeeState* aTeeState, 583 ErrorResult& aRv) { 584 RefPtr<ReadableStreamGenericReader> reader = aTeeState->GetReader(); 585 586 // Step 15.1. If reader implements ReadableStreamBYOBReader, 587 if (reader->IsBYOB()) { 588 // Step 15.1.1. Assert: reader.[[readIntoRequests]] is empty. 589 MOZ_ASSERT(reader->AsBYOB()->ReadIntoRequests().length() == 0); 590 591 // Step 15.1.2. Perform ! ReadableStreamBYOBReaderRelease(reader). 592 ReadableStreamBYOBReaderRelease(aCx, reader->AsBYOB(), aRv); 593 if (aRv.Failed()) { 594 return; 595 } 596 597 // Step 15.1.3. Set reader to ! AcquireReadableStreamDefaultReader(stream). 598 reader = AcquireReadableStreamDefaultReader(aTeeState->GetStream(), aRv); 599 if (aRv.Failed()) { 600 return; 601 } 602 aTeeState->SetReader(reader); 603 604 // Step 16.1.4. Perform forwardReaderError, given reader. 605 ForwardReaderError(aTeeState, reader); 606 } 607 608 // Step 15.2 609 RefPtr<ReadRequest> readRequest = 610 new PullWithDefaultReaderReadRequest(aTeeState); 611 612 // Step 15.3 613 ReadableStreamDefaultReaderRead(aCx, reader, readRequest, aRv); 614 } 615 616 class PullWithBYOBReader_ReadIntoRequest final : public ReadIntoRequest { 617 RefPtr<TeeState> mTeeState; 618 const TeeBranch mForBranch; 619 ~PullWithBYOBReader_ReadIntoRequest() override = default; 620 621 public: 622 NS_DECL_ISUPPORTS_INHERITED 623 NS_DECL_CYCLE_COLLECTION_CLASS_INHERITED(PullWithBYOBReader_ReadIntoRequest, 624 ReadIntoRequest) 625 626 explicit PullWithBYOBReader_ReadIntoRequest(TeeState* aTeeState, 627 TeeBranch aForBranch) 628 : mTeeState(aTeeState), mForBranch(aForBranch) {} 629 630 void ChunkSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk, 631 ErrorResult& aRv) override { 632 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 633 // Step 16.4 chunk steps, Step 1. 634 class PullWithBYOBReaderChunkMicrotask : public MicroTaskRunnable { 635 RefPtr<TeeState> mTeeState; 636 JS::PersistentRooted<JSObject*> mChunk; 637 const TeeBranch mForBranch; 638 639 public: 640 PullWithBYOBReaderChunkMicrotask(JSContext* aCx, TeeState* aTeeState, 641 JS::Handle<JSObject*> aChunk, 642 TeeBranch aForBranch) 643 : mTeeState(aTeeState), mChunk(aCx, aChunk), mForBranch(aForBranch) {} 644 645 MOZ_CAN_RUN_SCRIPT 646 void Run(AutoSlowOperation& aAso) override { 647 AutoJSAPI jsapi; 648 if (NS_WARN_IF( 649 !jsapi.Init(mTeeState->GetStream()->GetParentObject()))) { 650 return; 651 } 652 JSContext* cx = jsapi.cx(); 653 ErrorResult rv; 654 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 655 // 656 // Step Numbering below is relative to Chunk steps Microtask at 657 // Step 16.4 chunk steps, Step 1. 658 659 // Step 1. 660 mTeeState->SetReadAgainForBranch1(false); 661 662 // Step 2. 663 mTeeState->SetReadAgainForBranch2(false); 664 665 // Step 3. 666 bool byobCanceled = mTeeState->Canceled(mForBranch); 667 // Step 4. 668 bool otherCanceled = mTeeState->Canceled(OtherTeeBranch(mForBranch)); 669 670 // Rather than store byobBranch / otherBranch, we re-derive the pointers 671 // below, as borrowed from steps 16.2/16.3 672 ReadableStream* byobBranch = mTeeState->Branch(mForBranch); 673 ReadableStream* otherBranch = 674 mTeeState->Branch(OtherTeeBranch(mForBranch)); 675 676 // Step 5. 677 if (!otherCanceled) { 678 // Step 5.1 (using the name clonedChunk because we don't want to name 679 // the completion record explicitly) 680 JS::Rooted<JSObject*> clonedChunk(cx, CloneAsUint8Array(cx, mChunk)); 681 682 // Step 5.2. If cloneResult is an abrupt completion, 683 if (!clonedChunk) { 684 JS::Rooted<JS::Value> exception(cx); 685 if (!JS_GetPendingException(cx, &exception)) { 686 // Uncatchable exception. Return with pending 687 // exception still on context. 688 return; 689 } 690 691 // It's not expliclitly stated, but I assume the intention here is 692 // that we perform a normal completion here, so we clear the 693 // exception. 694 JS_ClearPendingException(cx); 695 696 // Step 5.2.1 697 698 ReadableByteStreamControllerError( 699 byobBranch->Controller()->AsByte(), exception, rv); 700 if (rv.MaybeSetPendingException(cx)) { 701 return; 702 } 703 704 // Step 5.2.2. 705 ReadableByteStreamControllerError( 706 otherBranch->Controller()->AsByte(), exception, rv); 707 if (rv.MaybeSetPendingException(cx)) { 708 return; 709 } 710 711 // Step 5.2.3. 712 RefPtr<ReadableStream> stream = mTeeState->GetStream(); 713 RefPtr<Promise> cancelPromise = 714 ReadableStreamCancel(cx, stream, exception, rv); 715 if (rv.MaybeSetPendingException(cx)) { 716 return; 717 } 718 mTeeState->CancelPromise()->MaybeResolve(cancelPromise); 719 720 // Step 5.2.4. 721 return; 722 } 723 724 // Step 5.3 (implicitly handled above by name selection) 725 // Step 5.4. 726 if (!byobCanceled) { 727 RefPtr<ReadableByteStreamController> controller( 728 byobBranch->Controller()->AsByte()); 729 ReadableByteStreamControllerRespondWithNewView(cx, controller, 730 mChunk, rv); 731 if (rv.MaybeSetPendingException(cx)) { 732 return; 733 } 734 } 735 736 // Step 5.4. 737 RefPtr<ReadableByteStreamController> otherController = 738 otherBranch->Controller()->AsByte(); 739 ReadableByteStreamControllerEnqueue(cx, otherController, clonedChunk, 740 rv); 741 if (rv.MaybeSetPendingException(cx)) { 742 return; 743 } 744 // Step 6. 745 } else if (!byobCanceled) { 746 RefPtr<ReadableByteStreamController> byobController = 747 byobBranch->Controller()->AsByte(); 748 ReadableByteStreamControllerRespondWithNewView(cx, byobController, 749 mChunk, rv); 750 if (rv.MaybeSetPendingException(cx)) { 751 return; 752 } 753 } 754 755 // Step 7. 756 mTeeState->SetReading(false); 757 758 // Step 8. 759 if (mTeeState->ReadAgainForBranch1()) { 760 ByteStreamTeePullAlgorithm(cx, TeeBranch::Branch1, 761 MOZ_KnownLive(mTeeState), rv); 762 if (rv.MaybeSetPendingException(cx)) { 763 return; 764 } 765 } else if (mTeeState->ReadAgainForBranch2()) { 766 ByteStreamTeePullAlgorithm(cx, TeeBranch::Branch2, 767 MOZ_KnownLive(mTeeState), rv); 768 if (rv.MaybeSetPendingException(cx)) { 769 return; 770 } 771 } 772 } 773 774 bool Suppressed() override { 775 nsIGlobalObject* global = mTeeState->GetStream()->GetParentObject(); 776 return global && global->IsInSyncOperation(); 777 } 778 }; 779 780 MOZ_ASSERT(aChunk.isObjectOrNull()); 781 MOZ_ASSERT(aChunk.toObjectOrNull()); 782 JS::Rooted<JSObject*> chunk(aCx, aChunk.toObjectOrNull()); 783 RefPtr<PullWithBYOBReaderChunkMicrotask> task = 784 MakeRefPtr<PullWithBYOBReaderChunkMicrotask>(aCx, mTeeState, chunk, 785 mForBranch); 786 CycleCollectedJSContext::Get()->DispatchToMicroTask(task.forget()); 787 } 788 789 MOZ_CAN_RUN_SCRIPT 790 void CloseSteps(JSContext* aCx, JS::Handle<JS::Value> aChunk, 791 ErrorResult& aRv) override { 792 // Step 1. 793 mTeeState->SetReading(false); 794 795 // Step 2. 796 bool byobCanceled = mTeeState->Canceled(mForBranch); 797 798 // Step 3. 799 bool otherCanceled = mTeeState->Canceled(OtherTeeBranch(mForBranch)); 800 801 // Rather than store byobBranch / otherBranch, we re-derive the pointers 802 // below, as borrowed from steps 16.2/16.3 803 ReadableStream* byobBranch = mTeeState->Branch(mForBranch); 804 ReadableStream* otherBranch = mTeeState->Branch(OtherTeeBranch(mForBranch)); 805 806 // Step 4. 807 if (!byobCanceled) { 808 RefPtr<ReadableByteStreamController> controller = 809 byobBranch->Controller()->AsByte(); 810 ReadableByteStreamControllerClose(aCx, controller, aRv); 811 if (aRv.Failed()) { 812 return; 813 } 814 } 815 // Step 5. 816 if (!otherCanceled) { 817 RefPtr<ReadableByteStreamController> controller = 818 otherBranch->Controller()->AsByte(); 819 ReadableByteStreamControllerClose(aCx, controller, aRv); 820 if (aRv.Failed()) { 821 return; 822 } 823 } 824 825 // Step 6. 826 if (!aChunk.isUndefined()) { 827 MOZ_ASSERT(aChunk.isObject()); 828 MOZ_ASSERT(aChunk.toObjectOrNull()); 829 830 JS::Rooted<JSObject*> chunkObject(aCx, &aChunk.toObject()); 831 MOZ_ASSERT(JS_IsArrayBufferViewObject(chunkObject)); 832 // Step 6.1. 833 MOZ_ASSERT(JS_GetArrayBufferViewByteLength(chunkObject) == 0); 834 835 // Step 6.2. 836 if (!byobCanceled) { 837 RefPtr<ReadableByteStreamController> byobController( 838 byobBranch->Controller()->AsByte()); 839 ReadableByteStreamControllerRespondWithNewView(aCx, byobController, 840 chunkObject, aRv); 841 if (aRv.Failed()) { 842 return; 843 } 844 } 845 846 // Step 6.3 847 if (!otherCanceled && 848 !otherBranch->Controller()->AsByte()->PendingPullIntos().isEmpty()) { 849 RefPtr<ReadableByteStreamController> otherController( 850 otherBranch->Controller()->AsByte()); 851 ReadableByteStreamControllerRespond(aCx, otherController, 0, aRv); 852 if (aRv.Failed()) { 853 return; 854 } 855 } 856 } 857 858 // Step 7. 859 if (!byobCanceled || !otherCanceled) { 860 mTeeState->CancelPromise()->MaybeResolveWithUndefined(); 861 } 862 } 863 864 void ErrorSteps(JSContext* aCx, JS::Handle<JS::Value> e, 865 ErrorResult& aRv) override { 866 // Step 1. 867 mTeeState->SetReading(false); 868 } 869 }; 870 871 NS_IMPL_CYCLE_COLLECTION_INHERITED(PullWithBYOBReader_ReadIntoRequest, 872 ReadIntoRequest, mTeeState) 873 NS_IMPL_ADDREF_INHERITED(PullWithBYOBReader_ReadIntoRequest, ReadIntoRequest) 874 NS_IMPL_RELEASE_INHERITED(PullWithBYOBReader_ReadIntoRequest, ReadIntoRequest) 875 876 NS_INTERFACE_MAP_BEGIN_CYCLE_COLLECTION(PullWithBYOBReader_ReadIntoRequest) 877 NS_INTERFACE_MAP_END_INHERITING(ReadIntoRequest) 878 879 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 880 // Step 16. 881 void PullWithBYOBReader(JSContext* aCx, TeeState* aTeeState, 882 JS::Handle<JSObject*> aView, TeeBranch aForBranch, 883 ErrorResult& aRv) { 884 // Step 16.1 885 if (aTeeState->GetReader()->IsDefault()) { 886 // Step 16.1.1 887 MOZ_ASSERT(aTeeState->GetDefaultReader()->ReadRequests().isEmpty()); 888 889 // Step 16.1.2. Perform ! ReadableStreamDefaultReaderRelease(reader). 890 ReadableStreamDefaultReaderRelease(aCx, aTeeState->GetDefaultReader(), aRv); 891 if (aRv.Failed()) { 892 return; 893 } 894 895 // Step 16.1.3. Set reader to !AcquireReadableStreamBYOBReader(stream). 896 RefPtr<ReadableStreamBYOBReader> reader = 897 AcquireReadableStreamBYOBReader(aTeeState->GetStream(), aRv); 898 if (aRv.Failed()) { 899 return; 900 } 901 aTeeState->SetReader(reader); 902 903 // Step 16.1.4. Perform forwardReaderError, given reader. 904 ForwardReaderError(aTeeState, reader); 905 } 906 907 // Step 16.2. Unused in this function, moved to consumers. 908 // Step 16.3. Unused in this function, moved to consumers. 909 910 // Step 16.4. 911 RefPtr<ReadIntoRequest> readIntoRequest = 912 new PullWithBYOBReader_ReadIntoRequest(aTeeState, aForBranch); 913 914 // Step 16.5. Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, 915 // readIntoRequest). 916 RefPtr<ReadableStreamBYOBReader> byobReader = 917 aTeeState->GetReader()->AsBYOB(); 918 ReadableStreamBYOBReaderRead(aCx, byobReader, aView, 1, readIntoRequest, aRv); 919 } 920 921 // See https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 922 // Step 14. 923 void ForwardReaderError(TeeState* aTeeState, 924 ReadableStreamGenericReader* aThisReader) { 925 aThisReader->ClosedPromise()->AddCallbacksWithCycleCollectedArgs( 926 [](JSContext* aCx, JS::Handle<JS::Value> aValue, ErrorResult& aRv, 927 TeeState* aTeeState, ReadableStreamGenericReader* aThisReader) {}, 928 [](JSContext* aCx, JS::Handle<JS::Value> aValue, ErrorResult& aRv, 929 TeeState* aTeeState, ReadableStreamGenericReader* aReader) { 930 // Step 14.1.1 931 if (aTeeState->GetReader() != aReader) { 932 return; 933 } 934 935 ErrorResult rv; 936 // Step 14.1.2: Perform 937 // !ReadableByteStreamControllerError(branch1.[[controller]], r). 938 MOZ_ASSERT(aTeeState->Branch1()->Controller()->IsByte()); 939 ReadableByteStreamControllerError( 940 aTeeState->Branch1()->Controller()->AsByte(), aValue, aRv); 941 if (aRv.Failed()) { 942 return; 943 } 944 945 // Step 14.1.3: Perform 946 // !ReadableByteStreamControllerError(branch2.[[controller]], r). 947 MOZ_ASSERT(aTeeState->Branch2()->Controller()->IsByte()); 948 ReadableByteStreamControllerError( 949 aTeeState->Branch2()->Controller()->AsByte(), aValue, aRv); 950 if (aRv.Failed()) { 951 return; 952 } 953 954 // Step 14.1.4: If canceled1 is false or canceled2 is false, resolve 955 // cancelPromise with undefined. 956 if (!aTeeState->Canceled1() || !aTeeState->Canceled2()) { 957 aTeeState->CancelPromise()->MaybeResolveWithUndefined(); 958 } 959 }, 960 RefPtr(aTeeState), RefPtr(aThisReader)); 961 } 962 963 namespace streams_abstract { 964 // https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamtee 965 void ReadableByteStreamTee(JSContext* aCx, ReadableStream* aStream, 966 nsTArray<RefPtr<ReadableStream>>& aResult, 967 ErrorResult& aRv) { 968 // Step 1. Implicit 969 // Step 2. 970 MOZ_ASSERT(aStream->Controller()->IsByte()); 971 972 // Step 3-13 captured as part of TeeState allocation 973 RefPtr<TeeState> teeState = TeeState::Create(aStream, false, aRv); 974 if (aRv.Failed()) { 975 return; 976 } 977 978 // Step 14: See ForwardReaderError 979 // Step 15. See PullWithDefaultReader 980 // Step 16. See PullWithBYOBReader 981 // Step 17,18. See {Native,}ByteStreamTeePullAlgorithm 982 // Step 19,20. See ReadableByteStreamTeeCancelAlgorithm 983 // Step 21. Elided because consumers know how to handle nullptr correctly. 984 // Step 22. 985 nsCOMPtr<nsIGlobalObject> global = aStream->GetParentObject(); 986 auto branch1Algorithms = 987 MakeRefPtr<ByteStreamTeeSourceAlgorithms>(teeState, TeeBranch::Branch1); 988 teeState->SetBranch1( 989 ReadableStream::CreateByteAbstract(aCx, global, branch1Algorithms, aRv)); 990 if (aRv.Failed()) { 991 return; 992 } 993 994 // Step 23. 995 auto branch2Algorithms = 996 MakeRefPtr<ByteStreamTeeSourceAlgorithms>(teeState, TeeBranch::Branch2); 997 teeState->SetBranch2( 998 ReadableStream::CreateByteAbstract(aCx, global, branch2Algorithms, aRv)); 999 if (aRv.Failed()) { 1000 return; 1001 } 1002 1003 // Step 24. 1004 ForwardReaderError(teeState, teeState->GetReader()); 1005 1006 // Step 25. 1007 aResult.AppendElement(teeState->Branch1()); 1008 aResult.AppendElement(teeState->Branch2()); 1009 } 1010 } // namespace streams_abstract 1011 1012 } // namespace mozilla::dom