async-iterator.any.js (21224B)
// META: global=window,worker,shadowrealm
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';

// Tests for async iteration over ReadableStream: `for await (... of stream)`,
// `stream.values()`, `stream[Symbol.asyncIterator]()`, and the iterator's
// `next()` / `return()` methods.
//
// `test`, `promise_test` and the `assert_*` / `promise_rejects_exactly`
// functions are not defined in this file; neither are `flushAsyncEvents` and
// `recordingReadableStream`, which are presumably supplied by the resource
// scripts named in the META lines above.

// Shared sentinel error, compared by identity in the rejection assertions.
const error1 = new Error('error1');

/**
 * Asserts that `iterResult` is a plain iterator-result object: its prototype
 * is `Object.prototype`, its own property names are exactly `done` and
 * `value`, and both properties hold the expected contents.
 *
 * @param {*} iterResult - The value to check.
 * @param {*} value - Expected `iterResult.value`.
 * @param {boolean} done - Expected `iterResult.done`.
 * @param {string} [message] - Optional label prepended (followed by a space)
 *     to every assertion description.
 */
function assert_iter_result(iterResult, value, done, message) {
  const prefix = message === undefined ? '' : `${message} `;
  assert_equals(typeof iterResult, 'object', `${prefix}type is object`);
  assert_equals(Object.getPrototypeOf(iterResult), Object.prototype, `${prefix}[[Prototype]]`);
  assert_array_equals(Object.getOwnPropertyNames(iterResult).sort(), ['done', 'value'], `${prefix}property names`);
  assert_equals(iterResult.value, value, `${prefix}value`);
  assert_equals(iterResult.done, done, `${prefix}done`);
}

test(() => {
  const s = new ReadableStream();
  const it = s.values();
  const proto = Object.getPrototypeOf(it);

  // Reach %AsyncIteratorPrototype% through an async generator function, since it has no global binding.
  const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
  assert_equals(Object.getPrototypeOf(proto), AsyncIteratorPrototype, 'prototype should extend AsyncIteratorPrototype');

  const methods = ['next', 'return'].sort();
  assert_array_equals(Object.getOwnPropertyNames(proto).sort(), methods, 'should have all the correct methods');

  for (const m of methods) {
    const propDesc = Object.getOwnPropertyDescriptor(proto, m);
    assert_true(propDesc.enumerable, 'method should be enumerable');
    assert_true(propDesc.configurable, 'method should be configurable');
    assert_true(propDesc.writable, 'method should be writable');
    assert_equals(typeof it[m], 'function', 'method should be a function');
    assert_equals(it[m].name, m, 'method should have the correct name');
  }

  assert_equals(it.next.length, 0, 'next should have no parameters');
  assert_equals(it.return.length, 1, 'return should have 1 parameter');
  assert_equals(typeof it.throw, 'undefined', 'throw should not exist');
}, 'Async iterator instances should have the correct list of properties');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);
}, 'Async-iterating a push source');

promise_test(async () => {
  let i = 1;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(i);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);
}, 'Async-iterating a pull source');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(undefined);
      c.enqueue(undefined);
      c.enqueue(undefined);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [undefined, undefined, undefined]);
}, 'Async-iterating a push source with undefined values');

promise_test(async () => {
  let i = 1;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(undefined);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [undefined, undefined, undefined]);
}, 'Async-iterating a pull source with undefined values');

promise_test(async () => {
  let i = 1;
  // highWaterMark: 0 so that each pull is expected to be triggered by a next() call (see the event assertions).
  const s = recordingReadableStream({
    pull(c) {
      c.enqueue(i);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    },
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  const it = s.values();
  assert_array_equals(s.events, []);

  const read1 = await it.next();
  assert_iter_result(read1, 1, false);
  assert_array_equals(s.events, ['pull']);

  const read2 = await it.next();
  assert_iter_result(read2, 2, false);
  assert_array_equals(s.events, ['pull', 'pull']);

  const read3 = await it.next();
  assert_iter_result(read3, 3, false);
  assert_array_equals(s.events, ['pull', 'pull', 'pull']);

  const read4 = await it.next();
  assert_iter_result(read4, undefined, true);
  assert_array_equals(s.events, ['pull', 'pull', 'pull']);
}, 'Async-iterating a pull source manually');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.error('e');
    },
  });

  try {
    for await (const chunk of s) {}
    assert_unreached();
  } catch (e) {
    assert_equals(e, 'e');
  }
}, 'Async-iterating an errored stream throws');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.close();
    }
  });

  for await (const chunk of s) {
    assert_unreached();
  }
}, 'Async-iterating a closed stream never executes the loop body, but works fine');

promise_test(async () => {
  const s = new ReadableStream();

  const loop = async () => {
    for await (const chunk of s) {
      assert_unreached();
    }
    assert_unreached();
  };

  // loop() is expected never to settle; the race lets the test finish once flushAsyncEvents() resolves.
  await Promise.race([
    loop(),
    flushAsyncEvents()
  ]);
}, 'Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  const reader = s.getReader();
  const readResult = await reader.read();
  assert_iter_result(readResult, 1, false);
  reader.releaseLock();

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [2, 3]);
}, 'Async-iterating a partially consumed stream');

for (const type of ['throw', 'break', 'return']) {
  for (const preventCancel of [false, true]) {
    promise_test(async () => {
      const s = recordingReadableStream({
        start(c) {
          c.enqueue(0);
        }
      });

      // use a separate function for the loop body so return does not stop the test
      const loop = async () => {
        for await (const c of s.values({ preventCancel })) {
          if (type === 'throw') {
            throw new Error();
          } else if (type === 'break') {
            break;
          } else if (type === 'return') {
            return;
          }
        }
      };

      // The 'throw' variant rejects on purpose; only the recorded stream events matter here.
      try {
        await loop();
      } catch (e) {}

      if (preventCancel) {
        assert_array_equals(s.events, ['pull'], `cancel() should not be called`);
      } else {
        assert_array_equals(s.events, ['pull', 'cancel', undefined], `cancel() should be called`);
      }
    }, `Cancellation behavior when ${type}ing inside loop body; preventCancel = ${preventCancel}`);
  }
}

for (const preventCancel of [false, true]) {
  promise_test(async () => {
    const s = recordingReadableStream({
      start(c) {
        c.enqueue(0);
      }
    });

    const it = s.values({ preventCancel });
    await it.return();

    if (preventCancel) {
      assert_array_equals(s.events, [], `cancel() should not be called`);
    } else {
      assert_array_equals(s.events, ['cancel', undefined], `cancel() should be called`);
    }
  }, `Cancellation behavior when manually calling return(); preventCancel = ${preventCancel}`);
}

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');
}, 'next() rejects if the stream errors');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult = await it.return('return value');
  assert_iter_result(iterResult, 'return value', true);
}, 'return() does not rejects if the stream has not errored yet');

promise_test(async t => {
  const s = new ReadableStream({
    pull(c) {
      // Do not error in start() because doing so would prevent acquiring a reader/async iterator.
      c.error(error1);
    }
  });

  const it = s[Symbol.asyncIterator]();

  await flushAsyncEvents();
  await promise_rejects_exactly(t, error1, it.return('return value'));
}, 'return() rejects if the stream has errored');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult3 = await it.next();
  assert_iter_result(iterResult3, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error; next()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.next(), it.next()]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, '1st next()');

  assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status');
  assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason');

  assert_equals(iterResults[2].status, 'fulfilled', '3rd next() promise status');
  assert_iter_result(iterResults[2].value, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error(); next() [no awaiting]');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult3 = await it.return('return value');
  assert_iter_result(iterResult3, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.next(), it.return('return value')]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, '1st next()');

  assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status');
  assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason');

  assert_equals(iterResults[2].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[2].value, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return() [no awaiting]');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(timesPulled);
      ++timesPulled;
    }
  });
  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, 'next()');

  const iterResult2 = await it.return('return value');
  assert_iter_result(iterResult2, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(timesPulled);
      ++timesPulled;
    }
  });
  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.return('return value')]);

  assert_equals(iterResults[0].status, 'fulfilled', 'next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, 'next()');

  assert_equals(iterResults[1].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[1].value, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return() [no awaiting]');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResult1 = await it.return('return value');
  assert_iter_result(iterResult1, 'return value', true, 'return()');

  const iterResult2 = await it.next();
  assert_iter_result(iterResult2, undefined, true, 'next()');
}, 'return(); next()');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  // Record the order in which the two promises settle, not just their results.
  const resolveOrder = [];
  const iterResults = await Promise.allSettled([
    it.return('return value').then(result => {
      resolveOrder.push('return');
      return result;
    }),
    it.next().then(result => {
      resolveOrder.push('next');
      return result;
    })
  ]);

  assert_equals(iterResults[0].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[0].value, 'return value', true, 'return()');

  assert_equals(iterResults[1].status, 'fulfilled', 'next() promise status');
  assert_iter_result(iterResults[1].value, undefined, true, 'next()');

  assert_array_equals(resolveOrder, ['return', 'next'], 'next() resolves after return()');
}, 'return(); next() [no awaiting]');

promise_test(async () => {
  // cancel() returns a promise that stays pending until the test resolves it by hand.
  let resolveCancelPromise;
  const rs = recordingReadableStream({
    cancel(reason) {
      return new Promise(r => resolveCancelPromise = r);
    }
  });
  const it = rs.values();

  let returnResolved = false;
  const returnPromise = it.return('return value').then(result => {
    returnResolved = true;
    return result;
  });
  await flushAsyncEvents();
  assert_false(returnResolved, 'return() should not resolve while cancel() promise is pending');

  resolveCancelPromise();
  const iterResult1 = await returnPromise;
  assert_iter_result(iterResult1, 'return value', true, 'return()');

  const iterResult2 = await it.next();
  assert_iter_result(iterResult2, undefined, true, 'next()');
}, 'return(); next() with delayed cancel()');

promise_test(async () => {
  // cancel() returns a promise that stays pending until the test resolves it by hand.
  let resolveCancelPromise;
  const rs = recordingReadableStream({
    cancel(reason) {
      return new Promise(r => resolveCancelPromise = r);
    }
  });
  const it = rs.values();

  const resolveOrder = [];
  const returnPromise = it.return('return value').then(result => {
    resolveOrder.push('return');
    return result;
  });
  const nextPromise = it.next().then(result => {
    resolveOrder.push('next');
    return result;
  });

  assert_array_equals(rs.events, ['cancel', 'return value'], 'return() should call cancel()');
  assert_array_equals(resolveOrder, [], 'return() should not resolve before cancel() resolves');

  resolveCancelPromise();
  const iterResult1 = await returnPromise;
  assert_iter_result(iterResult1, 'return value', true, 'return() should resolve with original reason');
  const iterResult2 = await nextPromise;
  assert_iter_result(iterResult2, undefined, true, 'next() should resolve with done result');

  assert_array_equals(rs.events, ['cancel', 'return value'], 'no pull() after cancel()');
  assert_array_equals(resolveOrder, ['return', 'next'], 'next() should resolve after return() resolves');
}, 'return(); next() with delayed cancel() [no awaiting]');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResult1 = await it.return('return value 1');
  assert_iter_result(iterResult1, 'return value 1', true, '1st return()');

  const iterResult2 = await it.return('return value 2');
  assert_iter_result(iterResult2, 'return value 2', true, '2nd return()');
}, 'return(); return()');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const resolveOrder = [];
  const iterResults = await Promise.allSettled([
    it.return('return value 1').then(result => {
      resolveOrder.push('return 1');
      return result;
    }),
    it.return('return value 2').then(result => {
      resolveOrder.push('return 2');
      return result;
    })
  ]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st return() promise status');
  assert_iter_result(iterResults[0].value, 'return value 1', true, '1st return()');

  assert_equals(iterResults[1].status, 'fulfilled', '2nd return() promise status');
  assert_iter_result(iterResults[1].value, 'return value 2', true, '2nd return()');

  assert_array_equals(resolveOrder, ['return 1', 'return 2'], '2nd return() resolves after 1st return()');
}, 'return(); return() [no awaiting]');

test(() => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(0);
      c.close();
    },
  });
  s.values();
  assert_throws_js(TypeError, () => s.values(), 'values() should throw');
}, 'values() throws if there\'s already a lock');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);

  const reader = s.getReader();
  await reader.closed;
}, 'Acquiring a reader after exhaustively async-iterating a stream');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]({ preventCancel: true });

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult2 = await it.return('return value');
  assert_iter_result(iterResult2, 'return value', true, 'return()');

  // i.e. it should not reject with a generic "this stream is locked" TypeError.
  const reader = s.getReader();
  await promise_rejects_exactly(t, error1, reader.closed, 'closed on the new reader should reject with the error');
}, 'Acquiring a reader after return()ing from a stream that errors');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  // read the first two chunks, then cancel
  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
    if (chunk >= 2) {
      break;
    }
  }
  assert_array_equals(chunks, [1, 2]);

  const reader = s.getReader();
  await reader.closed;
}, 'Acquiring a reader after partially async-iterating a stream');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  // read the first two chunks, then release lock
  const chunks = [];
  for await (const chunk of s.values({preventCancel: true})) {
    chunks.push(chunk);
    if (chunk >= 2) {
      break;
    }
  }
  assert_array_equals(chunks, [1, 2]);

  const reader = s.getReader();
  const readResult = await reader.read();
  assert_iter_result(readResult, 3, false);
  await reader.closed;
}, 'Acquiring a reader and reading the remaining chunks after partially async-iterating a stream with preventCancel = true');

for (const preventCancel of [false, true]) {
  test(() => {
    const rs = new ReadableStream();
    rs.values({ preventCancel }).return();
    // The test passes if this line doesn't throw.
    rs.getReader();
  }, `return() should unlock the stream synchronously when preventCancel = ${preventCancel}`);
}

promise_test(async () => {
  const rs = new ReadableStream({
    async start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.enqueue('c');
      await flushAsyncEvents();
      // At this point, the async iterator has a read request in the stream's queue for its pending next() promise.
      // Closing the stream now causes two things to happen *synchronously*:
      //  1. ReadableStreamClose resolves reader.[[closedPromise]] with undefined.
      //  2. ReadableStreamClose calls the read request's close steps, which calls ReadableStreamReaderGenericRelease,
      //     which replaces reader.[[closedPromise]] with a rejected promise.
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of rs) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, ['a', 'b', 'c']);
}, 'close() while next() is pending');