observable-constructor.any.js (41606B)
1 // Because we test that the global error handler is called at various times. 2 setup({allow_uncaught_exception: true}); 3 4 test(() => { 5 assert_implements(self.Observable, "The Observable interface is not implemented"); 6 7 assert_true( 8 typeof Observable === "function", 9 "Observable constructor is defined" 10 ); 11 12 assert_throws_js(TypeError, () => { new Observable(); }); 13 }, "Observable constructor"); 14 15 test(() => { 16 let initializerCalled = false; 17 const source = new Observable(() => { 18 initializerCalled = true; 19 }); 20 21 assert_false( 22 initializerCalled, 23 "initializer should not be called by construction" 24 ); 25 source.subscribe(); 26 assert_true(initializerCalled, "initializer should be called by subscribe"); 27 }, "subscribe() can be called with no arguments"); 28 29 test(() => { 30 assert_implements(self.Subscriber, "The Subscriber interface is not implemented"); 31 assert_true( 32 typeof Subscriber === "function", 33 "Subscriber interface is defined as a function" 34 ); 35 36 assert_throws_js(TypeError, () => { new Subscriber(); }); 37 38 let initializerCalled = false; 39 new Observable(subscriber => { 40 assert_not_equals(subscriber, undefined, "A Subscriber must be passed into the subscribe callback"); 41 assert_implements(subscriber.next, "A Subscriber object must have a next() method"); 42 assert_implements(subscriber.complete, "A Subscriber object must have a complete() method"); 43 assert_implements(subscriber.error, "A Subscriber object must have an error() method"); 44 initializerCalled = true; 45 }).subscribe(); 46 assert_true(initializerCalled, "initializer should be called by subscribe"); 47 }, "Subscriber interface is not constructible"); 48 49 test(() => { 50 let initializerCalled = false; 51 const results = []; 52 53 const source = new Observable((subscriber) => { 54 initializerCalled = true; 55 subscriber.next(1); 56 subscriber.next(2); 57 subscriber.next(3); 58 }); 59 60 assert_false( 61 initializerCalled, 62 "initializer should not be called by construction" 63 ); 64 65 source.subscribe(x => results.push(x)); 66 67 assert_true(initializerCalled, "initializer should be called by subscribe"); 68 assert_array_equals( 69 results, 70 [1, 2, 3], 71 "should emit values synchronously, but not complete" 72 ); 73 }, "Subscribe with just a function as the next handler"); 74 75 test(() => { 76 let initializerCalled = false; 77 const results = []; 78 79 const source = new Observable((subscriber) => { 80 initializerCalled = true; 81 subscriber.next(1); 82 subscriber.next(2); 83 subscriber.next(3); 84 subscriber.complete(); 85 }); 86 87 assert_false( 88 initializerCalled, 89 "initializer should not be called by construction" 90 ); 91 92 source.subscribe({ 93 next: (x) => results.push(x), 94 error: () => assert_unreached("error should not be called"), 95 complete: () => results.push("complete"), 96 }); 97 98 assert_true(initializerCalled, "initializer should be called by subscribe"); 99 assert_array_equals( 100 results, 101 [1, 2, 3, "complete"], 102 "should emit values synchronously" 103 ); 104 }, "Observable constructor calls initializer on subscribe"); 105 106 test(() => { 107 const error = new Error("error"); 108 const results = []; 109 110 const source = new Observable((subscriber) => { 111 subscriber.next(1); 112 subscriber.next(2); 113 subscriber.error(error); 114 }); 115 116 source.subscribe({ 117 next: (x) => results.push(x), 118 error: (e) => results.push(e), 119 complete: () => assert_unreached("complete should not be called"), 120 }); 121 122 assert_array_equals( 123 results, 124 [1, 2, error], 125 "should emit error synchronously" 126 ); 127 }, "Observable error path called synchronously"); 128 129 test(() => { 130 let subscriber; 131 new Observable(s => { subscriber = s }).subscribe(); 132 const {next, complete, error} = subscriber; 133 assert_throws_js(TypeError, () => next(1)); 134 assert_throws_js(TypeError, () => complete()); 135 assert_throws_js(TypeError, () => error(1)); 136 }, "Subscriber must have receiver"); 137 138 test(() => { 139 let subscriber; 140 new Observable(s => { subscriber = s }).subscribe(); 141 assert_throws_js(TypeError, () => subscriber.next()); 142 assert_throws_js(TypeError, () => subscriber.error()); 143 }, "Subscriber next & error must recieve argument"); 144 145 test(() => { 146 let subscriber; 147 new Observable(s => { subscriber = s }).subscribe(); 148 assert_true(subscriber.active); 149 assert_false(subscriber.signal.aborted); 150 subscriber.complete(); 151 assert_false(subscriber.active); 152 assert_true(subscriber.signal.aborted); 153 }, "Subscriber complete() will set active to false, and abort signal"); 154 155 test(() => { 156 let subscriber; 157 new Observable(s => { subscriber = s }).subscribe(); 158 assert_true(subscriber.active); 159 subscriber.active = false; 160 assert_true(subscriber.active); 161 }, "Subscriber active is readonly"); 162 163 test(() => { 164 let subscriber; 165 new Observable(s => { subscriber = s }).subscribe(); 166 assert_false(subscriber.signal.aborted); 167 const oldSignal = subscriber.signal; 168 const newSignal = AbortSignal.abort(); 169 subscriber.signal = newSignal; 170 assert_false(subscriber.signal.aborted); 171 assert_equals(subscriber.signal, oldSignal, "signal did not change"); 172 }, "Subscriber signal is readonly"); 173 174 test(() => { 175 const error = new Error("error"); 176 const results = []; 177 let errorReported = null; 178 let innerSubscriber = null; 179 let subscriptionActivityInFinallyAfterThrow; 180 let subscriptionActivityInErrorHandlerAfterThrow; 181 182 self.addEventListener("error", e => errorReported = e, {once: true}); 183 184 const source = new Observable((subscriber) => { 185 innerSubscriber = subscriber; 186 subscriber.next(1); 187 try { 188 throw error; 189 } finally { 190 subscriptionActivityInFinallyAfterThrow = subscriber.active; 191 } 192 }); 193 194 source.subscribe({ 195 next: (x) => results.push(x), 196 error: (e) => { 197 subscriptionActivityInErrorHandlerAfterThrow = innerSubscriber.active; 198 results.push(e); 199 }, 200 complete: () => assert_unreached("complete should not be called"), 201 }); 202 203 assert_equals(errorReported, null, "The global error handler should not be " + 204 "invoked when the subscribe callback throws an error and the " + 205 "subscriber has given an error handler"); 206 assert_true(subscriptionActivityInFinallyAfterThrow, "Subscriber is " + 207 "considered active in finally block before error handler is invoked"); 208 assert_false(subscriptionActivityInErrorHandlerAfterThrow, "Subscriber is " + 209 "considered inactive in error handler block after thrown error"); 210 assert_array_equals( 211 results, 212 [1, error], 213 "should emit values and the thrown error synchronously" 214 ); 215 }, "Observable should error if initializer throws"); 216 217 test(t => { 218 let innerSubscriber = null; 219 let activeBeforeComplete = false; 220 let activeAfterComplete = false; 221 let activeDuringComplete = false; 222 let abortedBeforeComplete = false; 223 let abortedDuringComplete = false; 224 let abortedAfterComplete = false; 225 226 const source = new Observable((subscriber) => { 227 innerSubscriber = subscriber; 228 activeBeforeComplete = subscriber.active; 229 abortedBeforeComplete = subscriber.signal.aborted; 230 231 subscriber.complete(); 232 activeAfterComplete = subscriber.active; 233 abortedAfterComplete = subscriber.signal.aborted; 234 }); 235 236 source.subscribe({ 237 complete: () => { 238 activeDuringComplete = innerSubscriber.active; 239 abortedDuringComplete = innerSubscriber.signal.aborted; 240 } 241 }); 242 assert_true(activeBeforeComplete, "Subscription is active before complete"); 243 assert_false(abortedBeforeComplete, "Subscription is not aborted before complete"); 244 assert_false(activeDuringComplete, 245 "Subscription becomes inactive during Subscriber#complete(), just " + 246 "before Observer#complete() callback is invoked"); 247 assert_true(abortedDuringComplete, 248 "Subscription's signal is aborted during Subscriber#complete(), just " + 249 "before Observer#complete() callback is invoked"); 250 assert_false(activeAfterComplete, "Subscription is not active after complete"); 251 assert_true(abortedAfterComplete, "Subscription is aborted after complete"); 252 }, "Subscription is inactive after complete()"); 253 254 test(t => { 255 let innerSubscriber = null; 256 let activeBeforeError = false; 257 let activeAfterError = false; 258 let activeDuringError = false; 259 let abortedBeforeError = false; 260 let abortedDuringError = false; 261 let abortedAfterError = false; 262 263 const error = new Error("error"); 264 const source = new Observable((subscriber) => { 265 innerSubscriber = subscriber; 266 activeBeforeError = subscriber.active; 267 abortedBeforeError = subscriber.signal.aborted; 268 269 subscriber.error(error); 270 activeAfterError = subscriber.active; 271 abortedAfterError = subscriber.signal.aborted; 272 }); 273 274 source.subscribe({ 275 error: () => { 276 activeDuringError = innerSubscriber.active; 277 abortedDuringError = innerSubscriber.signal.aborted; 278 } 279 }); 280 assert_true(activeBeforeError, "Subscription is active before error"); 281 assert_false(abortedBeforeError, "Subscription is not aborted before error"); 282 assert_false(activeDuringError, 283 "Subscription becomes inactive during Subscriber#error(), just " + 284 "before Observer#error() callback is invoked"); 285 assert_true(abortedDuringError, 286 "Subscription's signal is aborted during Subscriber#error(), just " + 287 "before Observer#error() callback is invoked"); 288 assert_false(activeAfterError, "Subscription is not active after error"); 289 assert_true(abortedAfterError, "Subscription is not aborted after error"); 290 }, "Subscription is inactive after error()"); 291 292 test(t => { 293 let innerSubscriber; 294 let initialActivity; 295 let initialSignalAborted; 296 297 const source = new Observable((subscriber) => { 298 innerSubscriber = subscriber; 299 initialActivity = subscriber.active; 300 initialSignalAborted = subscriber.signal.aborted; 301 }); 302 303 source.subscribe({}, {signal: AbortSignal.abort('Initially aborted')}); 304 assert_false(initialActivity); 305 assert_true(initialSignalAborted); 306 assert_equals(innerSubscriber.signal.reason, 'Initially aborted'); 307 }, "Subscription is inactive when aborted signal is passed in"); 308 309 test(() => { 310 let outerSubscriber = null; 311 312 const source = new Observable(subscriber => outerSubscriber = subscriber); 313 314 const controller = new AbortController(); 315 source.subscribe({}, {signal: controller.signal}); 316 317 assert_not_equals(controller.signal, outerSubscriber.signal); 318 }, "Subscriber#signal is not the same AbortSignal as the one passed into `subscribe()`"); 319 320 test(() => { 321 const results = []; 322 323 const source = new Observable((subscriber) => { 324 subscriber.next(1); 325 subscriber.next(2); 326 subscriber.complete(); 327 subscriber.next(3); 328 }); 329 330 source.subscribe({ 331 next: (x) => results.push(x), 332 error: () => assert_unreached("error should not be called"), 333 complete: () => results.push("complete"), 334 }); 335 336 assert_array_equals( 337 results, 338 [1, 2, "complete"], 339 "should emit values synchronously, but not nexted values after complete" 340 ); 341 }, "Subscription does not emit values after completion"); 342 343 test(() => { 344 const error = new Error("error"); 345 const results = []; 346 347 const source = new Observable((subscriber) => { 348 subscriber.next(1); 349 subscriber.next(2); 350 subscriber.error(error); 351 subscriber.next(3); 352 }); 353 354 source.subscribe({ 355 next: (x) => results.push(x), 356 error: (e) => results.push(e), 357 complete: () => assert_unreached("complete should not be called"), 358 }); 359 360 assert_array_equals( 361 results, 362 [1, 2, error], 363 "should emit values synchronously, but not nexted values after error" 364 ); 365 }, "Subscription does not emit values after error"); 366 367 test(() => { 368 const error = new Error("error"); 369 const results = []; 370 371 const source = new Observable((subscriber) => { 372 subscriber.next(1); 373 subscriber.next(2); 374 subscriber.error(error); 375 assert_false(subscriber.active, "subscriber is closed after error"); 376 subscriber.next(3); 377 subscriber.complete(); 378 }); 379 380 source.subscribe({ 381 next: (x) => results.push(x), 382 error: (error) => results.push(error), 383 complete: () => assert_unreached("complete should not be called"), 384 }); 385 386 assert_array_equals(results, [1, 2, error], "should emit synchronously"); 387 }, "Completing or nexting a subscriber after an error does nothing"); 388 389 test(() => { 390 const error = new Error("custom error"); 391 let errorReported = null; 392 393 self.addEventListener("error", e => errorReported = e, { once: true }); 394 395 const source = new Observable((subscriber) => { 396 subscriber.error(error); 397 }); 398 399 // No error handler provided... 400 source.subscribe({ 401 next: () => assert_unreached("next should not be called"), 402 complete: () => assert_unreached("complete should not be called"), 403 }); 404 405 // ... still the exception is reported to the global. 406 assert_true(errorReported !== null, "Exception was reported to global"); 407 assert_true(errorReported.message.includes("custom error"), "Error message matches"); 408 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 409 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 410 assert_equals(errorReported.error, error, "Error object is equivalent"); 411 }, "Errors pushed to the subscriber that are not handled by the subscription " + 412 "are reported to the global"); 413 414 test(() => { 415 const error = new Error("custom error"); 416 let errorReported = null; 417 418 self.addEventListener("error", e => errorReported = e, { once: true }); 419 420 const source = new Observable((subscriber) => { 421 throw error; 422 }); 423 424 // No error handler provided... 425 source.subscribe({ 426 next: () => assert_unreached("next should not be called"), 427 complete: () => assert_unreached("complete should not be called"), 428 }); 429 430 // ... still the exception is reported to the global. 431 assert_true(errorReported !== null, "Exception was reported to global"); 432 assert_true(errorReported.message.includes("custom error"), "Error message matches"); 433 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 434 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 435 assert_equals(errorReported.error, error, "Error object is equivalent"); 436 }, "Errors thrown in the initializer that are not handled by the " + 437 "subscription are reported to the global"); 438 439 test(() => { 440 const error = new Error("custom error"); 441 const results = []; 442 let errorReported = null; 443 444 self.addEventListener("error", e => errorReported = e, { once: true }); 445 446 const source = new Observable((subscriber) => { 447 subscriber.next(1); 448 subscriber.next(2); 449 subscriber.complete(); 450 subscriber.error(error); 451 }); 452 453 source.subscribe({ 454 next: (x) => results.push(x), 455 error: () => assert_unreached("error should not be called"), 456 complete: () => results.push("complete"), 457 }); 458 459 assert_array_equals( 460 results, 461 [1, 2, "complete"], 462 "should emit values synchronously, but not error values after complete" 463 ); 464 465 // Error reporting still happens even after the subscription is closed. 466 assert_true(errorReported !== null, "Exception was reported to global"); 467 assert_true(errorReported.message.includes("custom error"),"Error message matches"); 468 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 469 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 470 assert_equals(errorReported.error, error, "Error object is equivalent"); 471 }, "Subscription reports errors that are pushed after subscriber is closed " + 472 "by completion"); 473 474 test(t => { 475 const error = new Error("custom error"); 476 const results = []; 477 let errorReported = null; 478 479 self.addEventListener("error", e => errorReported = e, { once: true }); 480 481 const source = new Observable((subscriber) => { 482 subscriber.next(1); 483 subscriber.next(2); 484 subscriber.complete(); 485 throw error; 486 }); 487 488 source.subscribe({ 489 next: (x) => results.push(x), 490 error: () => assert_unreached("error should not be called"), 491 complete: () => results.push("complete"), 492 }); 493 494 assert_array_equals(results, [1, 2, "complete"], 495 "should emit values synchronously, but not error after complete" 496 ); 497 498 assert_true(errorReported !== null, "Exception was reported to global"); 499 assert_true(errorReported.message.includes("custom error"), "Error message matches"); 500 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 501 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 502 assert_equals(errorReported.error, error, "Error object is equivalent"); 503 }, "Errors thrown by initializer function after subscriber is closed by " + 504 "completion are reported"); 505 506 test(() => { 507 const error1 = new Error("error 1"); 508 const error2 = new Error("error 2"); 509 const results = []; 510 let errorReported = null; 511 512 self.addEventListener("error", e => errorReported = e, { once: true }); 513 514 const source = new Observable((subscriber) => { 515 subscriber.next(1); 516 subscriber.next(2); 517 subscriber.error(error1); 518 throw error2; 519 }); 520 521 source.subscribe({ 522 next: (x) => results.push(x), 523 error: (error) => results.push(error), 524 complete: () => assert_unreached("complete should not be called"), 525 }); 526 527 assert_array_equals( 528 results, 529 [1, 2, error1], 530 "should emit values synchronously, but not nexted values after error" 531 ); 532 533 assert_true(errorReported !== null, "Exception was reported to global"); 534 assert_true(errorReported.message.includes("error 2"), "Error message matches"); 535 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 536 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 537 assert_equals(errorReported.error, error2, "Error object is equivalent"); 538 }, "Errors thrown by initializer function after subscriber is closed by " + 539 "error are reported"); 540 541 test(() => { 542 const error1 = new Error("error 1"); 543 const error2 = new Error("error 2"); 544 const results = []; 545 let errorReported = null; 546 547 self.addEventListener("error", e => errorReported = e, { once: true }); 548 549 const source = new Observable((subscriber) => { 550 subscriber.next(1); 551 subscriber.next(2); 552 subscriber.error(error1); 553 subscriber.error(error2); 554 }); 555 556 source.subscribe({ 557 next: (x) => results.push(x), 558 error: (error) => results.push(error), 559 complete: () => assert_unreached("complete should not be called"), 560 }); 561 562 assert_array_equals( 563 results, 564 [1, 2, error1], 565 "should emit values synchronously, but not nexted values after error" 566 ); 567 568 assert_true(errorReported !== null, "Exception was reported to global"); 569 assert_true(errorReported.message.includes("error 2"), "Error message matches"); 570 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 571 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 572 assert_equals(errorReported.error, error2, "Error object is equivalent"); 573 }, "Errors pushed by initializer function after subscriber is closed by " + 574 "error are reported"); 575 576 test(() => { 577 const results = []; 578 const target = new EventTarget(); 579 580 const source = new Observable((subscriber) => { 581 target.addEventListener('custom event', e => { 582 subscriber.next(1); 583 subscriber.complete(); 584 subscriber.error('not a real error'); 585 }); 586 }); 587 588 source.subscribe({ 589 next: (x) => results.push(x), 590 error: (error) => results.push(error), 591 complete: () => { 592 results.push('complete'), 593 // Re-entrantly tries to invoke `complete()`. However, this function must 594 // only ever run once. 595 target.dispatchEvent(new Event('custom event')); 596 }, 597 }); 598 599 target.dispatchEvent(new Event('custom event')); 600 601 assert_array_equals( 602 results, 603 [1, 'complete'], 604 "complete() can only be called once, and cannot invoke other Observer methods" 605 ); 606 }, "Subscriber#complete() cannot re-entrantly invoke itself"); 607 608 test(() => { 609 const results = []; 610 const target = new EventTarget(); 611 612 const source = new Observable((subscriber) => { 613 target.addEventListener('custom event', e => { 614 subscriber.next(1); 615 subscriber.error('not a real error'); 616 subscriber.complete(); 617 }); 618 }); 619 620 source.subscribe({ 621 next: (x) => results.push(x), 622 error: (error) => { 623 results.push('error'), 624 // Re-entrantly tries to invoke `error()`. However, this function must 625 // only ever run once. 626 target.dispatchEvent(new Event('custom event')); 627 }, 628 complete: () => results.push('complete'), 629 }); 630 631 target.dispatchEvent(new Event('custom event')); 632 633 assert_array_equals( 634 results, 635 [1, 'error'], 636 "error() can only be called once, and cannot invoke other Observer methods" 637 ); 638 }, "Subscriber#error() cannot re-entrantly invoke itself"); 639 640 test(() => { 641 const results = []; 642 let innerSubscriber = null; 643 let activeDuringTeardown1 = null; 644 let abortedDuringTeardown1 = null; 645 let activeDuringTeardown2 = null; 646 let abortedDuringTeardown2 = null; 647 648 const source = new Observable((subscriber) => { 649 assert_true(subscriber.active); 650 assert_false(subscriber.signal.aborted); 651 results.push('subscribe() callback'); 652 innerSubscriber = subscriber; 653 654 subscriber.signal.addEventListener('abort', () => { 655 assert_false(subscriber.active); 656 assert_true(subscriber.signal.aborted); 657 results.push('inner abort handler'); 658 subscriber.next('next from inner abort handler'); 659 subscriber.complete(); 660 }); 661 662 subscriber.addTeardown(() => { 663 activeDuringTeardown1 = subscriber.active; 664 abortedDuringTeardown1 = subscriber.signal.aborted; 665 results.push('teardown 1'); 666 }); 667 668 subscriber.addTeardown(() => { 669 activeDuringTeardown2 = subscriber.active; 670 abortedDuringTeardown2 = subscriber.signal.aborted; 671 results.push('teardown 2'); 672 }); 673 }); 674 675 const ac = new AbortController(); 676 source.subscribe({ 677 // This should never get called. If it is, the array assertion below will fail. 678 next: (x) => results.push(x), 679 complete: () => results.push('complete()') 680 }, {signal: ac.signal}); 681 682 ac.signal.addEventListener('abort', () => { 683 results.push('outer abort handler'); 684 assert_true(ac.signal.aborted); 685 assert_false(innerSubscriber.signal.aborted); 686 }); 687 688 assert_array_equals(results, ['subscribe() callback']); 689 ac.abort(); 690 results.push('abort() returned'); 691 // The reason the "inner" abort event handler is invoked first is because the 692 // "inner" AbortSignal is not a dependent signal (that would ordinarily get 693 // aborted after the parent, aka "outer" signal, is completely finished being 694 // aborted). Instead, the order of operations looks like this: 695 // 1. "Outer" signal begins to be aborted 696 // 2. Its abort algorithms [1] run [2]; the internal abort algorithm here is 697 // the "inner" Subscriber's "Close a subscription" [0]. 698 // a. This signals abort on the "inner" Subscriber's signal, firing the 699 // abort event 700 // b. Then, the "inner" Subscriber's teardowns run. 701 // 3. Once the "outer" signal's abort algorithms are finished, the abort 702 // event is fired [3], triggering the outer abort handler. 703 // 704 // [0]: https://wicg.github.io/observable/#close-a-subscription 705 // [1]: https://dom.spec.whatwg.org/#abortsignal-abort-algorithms 706 // [2]: https://dom.spec.whatwg.org/#ref-for-abortsignal-abort-algorithms%E2%91%A2:~:text=For%20each%20algorithm%20of%20signal%E2%80%99s%20abort%20algorithms%3A%20run%20algorithm 707 // [3]: https://dom.spec.whatwg.org/#abortsignal-signal-abort:~:text=Fire%20an%20event%20named%20abort%20at%20signal 708 assert_array_equals(results, [ 709 'subscribe() callback', 'inner abort handler', 'teardown 2', 'teardown 1', 710 'outer abort handler', 'abort() returned', 711 ]); 712 assert_false(activeDuringTeardown1, 'should not be active during teardown callback 1'); 713 assert_false(activeDuringTeardown2, 'should not be active during teardown callback 2'); 714 assert_true(abortedDuringTeardown1, 'should be aborted during teardown callback 1'); 715 assert_true(abortedDuringTeardown2, 'should be aborted during teardown callback 2'); 716 }, "Unsubscription lifecycle"); 717 718 // In the usual consumer-initiated unsubscription case, when the AbortController 719 // is aborted after subscription, teardowns run from upstream->downstream. This 720 // is because for a given Subscriber, when a downstream signal is aborted 721 // (`ac.signal` in this case), the "Close" algorithm prompts the Subscriber to 722 // first abort *its* own signal (the one accessible via `Subscriber#signal`) and 723 // then run its teardowns. 724 // 725 // This means upstream Subscribers get the first opportunity their teardowns 726 // before the control flow is returned to downstream Subscribers to run *their* 727 // teardowns (after they abort their internal signal). 728 test(() => { 729 const results = []; 730 const upstream = new Observable(subscriber => { 731 subscriber.signal.addEventListener('abort', 732 e => results.push('upstream abort handler'), {once: true}); 733 subscriber.addTeardown( 734 () => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`)); 735 }); 736 const middle = new Observable(subscriber => { 737 subscriber.signal.addEventListener('abort', 738 e => results.push('middle abort handler'), {once: true}); 739 subscriber.addTeardown( 740 () => results.push(`middle teardown. reason: ${subscriber.signal.reason}`)); 741 upstream.subscribe({}, {signal: subscriber.signal}); 742 }); 743 const downstream = new Observable(subscriber => { 744 subscriber.signal.addEventListener('abort', 745 e => results.push('downstream abort handler'), {once: true}); 746 subscriber.addTeardown( 747 () => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`)); 748 middle.subscribe({}, {signal: subscriber.signal}); 749 }); 750 751 const ac = new AbortController(); 752 downstream.subscribe({}, {signal: ac.signal}); 753 ac.abort('Abort!'); 754 assert_array_equals(results, [ 755 'upstream abort handler', 756 'upstream teardown. reason: Abort!', 757 'middle abort handler', 758 'middle teardown. reason: Abort!', 759 'downstream abort handler', 760 'downstream teardown. reason: Abort!', 761 ]); 762 }, "Teardowns are called in upstream->downstream order on " + 763 "consumer-initiated unsubscription"); 764 765 // This test is like the above, but asserts the exact opposite order of 766 // teardowns. This is because, since the Subscriber's signal is aborted 767 // immediately upon construction, `addTeardown()` runs teardowns synchronously 768 // in subscriber-order, which goes from downstream->upstream. 769 test(() => { 770 const results = []; 771 const upstream = new Observable(subscriber => { 772 subscriber.addTeardown( 773 () => results.push(`upstream teardown. reason: ${subscriber.signal.reason}`)); 774 }); 775 const middle = new Observable(subscriber => { 776 subscriber.addTeardown( 777 () => results.push(`middle teardown. reason: ${subscriber.signal.reason}`)); 778 upstream.subscribe({}, {signal: subscriber.signal}); 779 }); 780 const downstream = new Observable(subscriber => { 781 subscriber.addTeardown( 782 () => results.push(`downstream teardown. reason: ${subscriber.signal.reason}`)); 783 middle.subscribe({}, {signal: subscriber.signal}); 784 }); 785 786 downstream.subscribe({}, {signal: AbortSignal.abort('Initial abort')}); 787 assert_array_equals(results, [ 788 "downstream teardown. reason: Initial abort", 789 "middle teardown. reason: Initial abort", 790 "upstream teardown. reason: Initial abort", 791 ]); 792 }, "Teardowns are called in downstream->upstream order on " + 793 "consumer-initiated unsubscription with pre-aborted Signal"); 794 795 // Producer-initiated unsubscription test, capturing the ordering of abort events and teardowns. 796 test(() => { 797 const results = []; 798 799 const source = new Observable(subscriber => { 800 subscriber.addTeardown(() => results.push('source teardown')); 801 subscriber.signal.addEventListener('abort', 802 e => results.push('source abort event')); 803 }); 804 805 const middle = new Observable(subscriber => { 806 subscriber.addTeardown(() => results.push('middle teardown')); 807 subscriber.signal.addEventListener('abort', 808 e => results.push('middle abort event')); 809 810 source.subscribe(() => {}, {signal: subscriber.signal}); 811 }); 812 813 let innerSubscriber = null; 814 const downstream = new Observable(subscriber => { 815 innerSubscriber = subscriber; 816 subscriber.addTeardown(() => results.push('downstream teardown')); 817 subscriber.signal.addEventListener('abort', 818 e => results.push('downstream abort event')); 819 820 middle.subscribe(() => {}, {signal: subscriber.signal}); 821 }); 822 823 downstream.subscribe(); 824 825 // Trigger a producer-initiated unsubscription from the most-downstream Observable. 826 innerSubscriber.complete(); 827 828 assert_array_equals(results, [ 829 'source abort event', 830 'source teardown', 831 'middle abort event', 832 'middle teardown', 833 'downstream abort event', 834 'downstream teardown', 835 ]); 836 }, "Producer-initiated unsubscription in a downstream Observable fires abort " + 837 "events before each teardown, in downstream->upstream order"); 838 839 test(t => { 840 let innerSubscriber = null; 841 const source = new Observable(subscriber => { 842 innerSubscriber = subscriber; 843 subscriber.error('calling error()'); 844 }); 845 846 source.subscribe(); 847 assert_equals(innerSubscriber.signal.reason, "calling error()", 848 "Reason is set correctly"); 849 }, "Subscriber#error() value is stored as Subscriber's AbortSignal's reason"); 850 851 test(t => { 852 const source = new Observable((subscriber) => { 853 let n = 0; 854 while (!subscriber.signal.aborted) { 855 assert_true(subscriber.active); 856 subscriber.next(n++); 857 if (n > 3) { 858 assert_unreached("The subscriber should be closed by now"); 859 } 860 } 861 assert_false(subscriber.active); 862 }); 863 864 const ac = new AbortController(); 865 const results = []; 866 867 source.subscribe({ 868 next: (x) => { 869 results.push(x); 870 if (x === 2) { 871 ac.abort(); 872 } 873 }, 874 error: () => results.push('error'), 875 complete: () => results.push('complete') 876 }, {signal: ac.signal}); 877 878 assert_array_equals( 879 results, 880 [0, 1, 2], 881 "should emit values synchronously before abort" 882 ); 883 }, "Aborting a subscription should stop emitting values"); 884 885 test(() => { 886 const error = new Error("custom error"); 887 let errorReported = null; 888 889 self.addEventListener("error", e => errorReported = e, { once: true }); 890 891 const source = new Observable(() => { 892 throw error; 893 }); 894 895 try { 896 source.subscribe(); 897 } catch { 898 assert_unreached("subscriber() never throws an error"); 899 } 900 901 assert_true(errorReported !== null, "Exception was reported to global"); 902 assert_true(errorReported.message.includes("custom error"), "Error message matches"); 903 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 904 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 905 assert_equals(errorReported.error, error, "Error object is equivalent"); 906 }, "Calling subscribe should never throw an error synchronously, initializer throws error"); 907 908 test(() => { 909 const error = new Error("custom error"); 910 let errorReported = null; 911 912 self.addEventListener("error", e => errorReported = e, { once: true }); 913 914 const source = new Observable((subscriber) => { 915 subscriber.error(error); 916 }); 917 918 try { 919 source.subscribe(); 920 } catch { 921 assert_unreached("subscriber() never throws an error"); 922 } 923 924 assert_true(errorReported !== null, "Exception was reported to global"); 925 assert_true(errorReported.message.includes("custom error"), "Error message matches"); 926 assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0"); 927 assert_greater_than(errorReported.colno, 0, "Error lineno is greater than 0"); 928 assert_equals(errorReported.error, error, "Error object is equivalent"); 929 }, "Calling subscribe should never throw an error synchronously, subscriber pushes error"); 930 931 test(() => { 932 let addTeardownCalled = false; 933 let activeDuringTeardown; 934 935 const source = new Observable((subscriber) => { 936 subscriber.addTeardown(() => { 937 addTeardownCalled = true; 938 activeDuringTeardown = subscriber.active; 939 }); 940 }); 941 942 const ac = new AbortController(); 943 source.subscribe({}, {signal: ac.signal}); 944 945 assert_false(addTeardownCalled, "Teardown is not be called upon subscription"); 946 ac.abort(); 947 assert_true(addTeardownCalled, "Teardown is called when subscription is aborted"); 948 assert_false(activeDuringTeardown, "Teardown observers inactive subscription"); 949 }, "Teardown should be called when subscription is aborted"); 950 951 test(() => { 952 const addTeardownsCalled = []; 953 // This is used to snapshot `addTeardownsCalled` from within the subscribe 954 // callback, for assertion/comparison later. 955 let teardownsSnapshot = []; 956 const results = []; 957 958 const source = new Observable((subscriber) => { 959 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); 960 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); 961 962 subscriber.next(1); 963 subscriber.next(2); 964 subscriber.next(3); 965 subscriber.complete(); 966 967 // We don't run the actual `assert_array_equals` here because if it fails, 968 // it won't be properly caught. This is because assertion failures throw an 969 // error, and in subscriber callback, thrown errors result in 970 // `window.onerror` handlers being called, which this test file doesn't 971 // record as an error (see the first line of this file). 972 teardownsSnapshot = addTeardownsCalled; 973 }); 974 975 source.subscribe({ 976 next: (x) => results.push(x), 977 error: () => results.push("unreached"), 978 complete: () => results.push("complete"), 979 }); 980 981 assert_array_equals( 982 results, 983 [1, 2, 3, "complete"], 984 "should emit values and complete synchronously" 985 ); 986 987 assert_array_equals(teardownsSnapshot, addTeardownsCalled); 988 assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], 989 "Teardowns called in LIFO order synchronously after complete()"); 990 }, "Teardowns should be called when subscription is closed by completion"); 991 992 test(() => { 993 const addTeardownsCalled = []; 994 let teardownsSnapshot = []; 995 const error = new Error("error"); 996 const results = []; 997 998 const source = new Observable((subscriber) => { 999 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); 1000 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); 1001 1002 subscriber.next(1); 1003 subscriber.next(2); 1004 subscriber.next(3); 1005 subscriber.error(error); 1006 1007 teardownsSnapshot = addTeardownsCalled; 1008 }); 1009 1010 source.subscribe({ 1011 next: (x) => results.push(x), 1012 error: (error) => results.push(error), 1013 complete: () => assert_unreached("complete should not be called"), 1014 }); 1015 1016 assert_array_equals( 1017 results, 1018 [1, 2, 3, error], 1019 "should emit values and error synchronously" 1020 ); 1021 1022 assert_array_equals(teardownsSnapshot, addTeardownsCalled); 1023 assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], 1024 "Teardowns called in LIFO order synchronously after error()"); 1025 }, "Teardowns should be called when subscription is closed by subscriber pushing an error"); 1026 1027 test(() => { 1028 const addTeardownsCalled = []; 1029 const error = new Error("error"); 1030 const results = []; 1031 1032 const source = new Observable((subscriber) => { 1033 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); 1034 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); 1035 1036 subscriber.next(1); 1037 subscriber.next(2); 1038 subscriber.next(3); 1039 throw error; 1040 }); 1041 1042 source.subscribe({ 1043 next: (x) => results.push(x), 1044 error: (error) => results.push(error), 1045 complete: () => assert_unreached("complete should not be called"), 1046 }); 1047 1048 assert_array_equals( 1049 results, 1050 [1, 2, 3, error], 1051 "should emit values and error synchronously" 1052 ); 1053 1054 assert_array_equals(addTeardownsCalled, ["teardown 2", "teardown 1"], 1055 "Teardowns called in LIFO order synchronously after thrown error"); 1056 }, "Teardowns should be called when subscription is closed by subscriber throwing error"); 1057 1058 test(() => { 1059 const addTeardownsCalled = []; 1060 const results = []; 1061 let firstTeardownInvokedSynchronously = false; 1062 let secondTeardownInvokedSynchronously = false; 1063 1064 const source = new Observable((subscriber) => { 1065 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 1")); 1066 if (addTeardownsCalled.length === 1) { 1067 firstTeardownInvokedSynchronously = true; 1068 } 1069 subscriber.addTeardown(() => addTeardownsCalled.push("teardown 2")); 1070 if (addTeardownsCalled.length === 2) { 1071 secondTeardownInvokedSynchronously = true; 1072 } 1073 1074 subscriber.next(1); 1075 subscriber.next(2); 1076 subscriber.next(3); 1077 subscriber.complete(); 1078 }); 1079 1080 const ac = new AbortController(); 1081 ac.abort(); 1082 source.subscribe({ 1083 next: (x) => results.push(x), 1084 error: (error) => results.push(error), 1085 complete: () => results.push('complete') 1086 }, {signal: ac.signal}); 1087 1088 assert_array_equals(results, []); 1089 assert_true(firstTeardownInvokedSynchronously, "First teardown callback is invoked during addTeardown()"); 1090 assert_true(secondTeardownInvokedSynchronously, "Second teardown callback is invoked during addTeardown()"); 1091 assert_array_equals(addTeardownsCalled, ["teardown 1", "teardown 2"], 1092 "Teardowns called synchronously upon addition end up in FIFO order"); 1093 }, "Teardowns should be called synchronously during addTeardown() if the subscription is inactive"); 1094 1095 test(() => { 1096 const results = []; 1097 let producerInvocations = 0; 1098 let teardownInvocations = 0; 1099 1100 const source = new Observable((subscriber) => { 1101 producerInvocations++; 1102 results.push('producer invoked'); 1103 subscriber.addTeardown(() => { 1104 teardownInvocations++; 1105 results.push('teardown invoked'); 1106 }); 1107 }); 1108 1109 const ac1 = new AbortController(); 1110 const ac2 = new AbortController(); 1111 1112 // First subscription. 1113 source.subscribe({}, {signal: ac1.signal}); 1114 assert_equals(producerInvocations, 1, 1115 "Producer is invoked once for first subscription"); 1116 1117 // Second subscription should reuse the same producer. 1118 source.subscribe({}, {signal: ac2.signal}); 1119 assert_equals(producerInvocations, 1, 1120 "Producer should not be invoked again for second subscription"); 1121 1122 // First unsubscribe. 1123 ac1.abort(); 1124 assert_equals(teardownInvocations, 0, 1125 "Teardown not run when first subscriber unsubscribes"); 1126 1127 // Second unsubscribe. 1128 ac2.abort(); 1129 assert_equals(teardownInvocations, 1, 1130 "Teardown should run after last subscriber unsubscribes"); 1131 1132 assert_array_equals(results, ['producer invoked', 'teardown invoked']); 1133 }, "Multiple subscriptions share the same producer and teardown runs only " + 1134 "after last subscription abort"); 1135 1136 test(() => { 1137 const results = []; 1138 let activeSubscriber = null; 1139 1140 const source = new Observable(subscriber => { 1141 activeSubscriber = subscriber; 1142 results.push('producer start'); 1143 subscriber.addTeardown(() => results.push('teardown')); 1144 }); 1145 1146 // First subscription. 1147 const ac1 = new AbortController(); 1148 source.subscribe({}, {signal: ac1.signal}); 1149 assert_array_equals(results, ['producer start']); 1150 1151 // Second subscription. 1152 const ac2 = new AbortController(); 1153 source.subscribe({}, {signal: ac2.signal}); 1154 1155 // Complete the subscription. 1156 activeSubscriber.complete(); 1157 assert_array_equals(results, ['producer start', 'teardown']); 1158 1159 // Additional subscription after complete. 1160 const ac3 = new AbortController(); 1161 source.subscribe({}, {signal: ac3.signal}); 1162 1163 assert_array_equals(results, ['producer start', 'teardown', 'producer start']); 1164 }, "New subscription after complete creates new producer"); 1165 1166 test(() => { 1167 const results = []; 1168 let producerInvocations = 0; 1169 1170 const source = new Observable(subscriber => { 1171 producerInvocations++; 1172 results.push('producer start'); 1173 subscriber.addTeardown(() => results.push('teardown')); 1174 }); 1175 1176 // Create 3 subscriptions. 1177 const ac1 = new AbortController(); 1178 const ac2 = new AbortController(); 1179 const ac3 = new AbortController(); 1180 source.subscribe({}, {signal: ac1.signal}); 1181 source.subscribe({}, {signal: ac2.signal}); 1182 source.subscribe({}, {signal: ac3.signal}); 1183 1184 assert_equals(producerInvocations, 1, "Producer should be invoked once"); 1185 1186 // Unsubscribe in a different order. 1187 ac2.abort(); 1188 results.push('after first abort'); 1189 ac1.abort(); 1190 results.push('after second abort'); 1191 ac3.abort(); 1192 results.push('after final abort'); 1193 1194 assert_array_equals(results, [ 1195 'producer start', 1196 'after first abort', 1197 'after second abort', 1198 'teardown', 1199 'after final abort' 1200 ]); 1201 }, "Teardown runs after last unsubscribe regardless of unsubscription order"); 1202 1203 test(() => { 1204 const results = []; 1205 const source = new Observable(subscriber => { 1206 subscriber.next(1); 1207 subscriber.next(2); 1208 subscriber.complete(); 1209 }); 1210 1211 source.subscribe(v => { 1212 results.push(`${v}-first-sub`); 1213 if (v === 1) { 1214 // This new subscription adds a new internal observer to the subscriber's 1215 // internal observer list, but it does not get the value `1` that the 1216 // subscriber is currently pushing. This is because the Subscriber 1217 // iterates over a snapshot of its internal observers to push values to. 1218 // The first value that this new subscription will see is `2`. 1219 // 1220 // See https://github.com/WICG/observable/pull/214. 1221 source.subscribe(v => results.push(`${v}-second-sub`)); 1222 } 1223 }); 1224 1225 assert_array_equals(results, [ 1226 "1-first-sub", 1227 1228 "2-first-sub", 1229 "2-second-sub", 1230 ]); 1231 }, "Subscriber iterates over a snapshot of its internal observers");