observable-takeUntil.any.js (12668B)
// Tests for `Observable#takeUntil()`.
//
// Because we test that the global error handler is called at various times.
setup({allow_uncaught_exception: true});

// With a notifier that never emits, the composite Observable must simply
// forward every value from `source` and then complete.
promise_test(async () => {
  const source = new Observable(subscriber => {
    let i = 0;
    const interval = setInterval(() => {
      if (i < 5) {
        subscriber.next(++i);
      } else {
        subscriber.complete();
        clearInterval(interval);
      }
    }, 0);
  });

  const result = await source.takeUntil(new Observable(() => {})).toArray();
  assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil subscribes to source Observable and mirrors it uninterrupted");

// Subscribing to the composite Observable must run the notifier's subscribe
// callback.
promise_test(async () => {
  const source = new Observable(() => {});
  let notifierSubscribedTo = false;
  const notifier = new Observable(() => notifierSubscribedTo = true);

  source.takeUntil(notifier).subscribe();
  assert_true(notifierSubscribedTo);
}, "takeUntil subscribes to notifier");

// This test is important because ordinarily, calling `subscriber.next()` does
// not cancel a subscription associated with `subscriber`. However, for the
// `takeUntil()` operator, the spec responds to `notifier`'s `next()` by
// unsubscribing from `notifier`, which is what this test asserts.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Calling `next()` causes `takeUntil()` to unsubscribe from `notifier`.
    results.push(`notifer active before next(): ${subscriber.active}`);
    subscriber.next('value');
    results.push(`notifer active after next(): ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    // Interpolate the callback's own argument so that an unexpected error is
    // recorded in `results` (and shows up in the assertion diff below).
    error: e => results.push(`takeUntil() error callback: ${e}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before next(): true',
    'notifier teardown',
    'takeUntil() complete callback',
    'notifer active after next(): false',
  ]);
}, "takeUntil: notifier next() unsubscribes from notifier");

// This test is identical to the one above, with the exception being that the
// `notifier` calls `subscriber.error()` instead of calling
// `subscriber.next()`.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Calling `error()` causes `takeUntil()` to unsubscribe from `notifier`.
    results.push(`notifer active before error(): ${subscriber.active}`);
    subscriber.error('error');
    results.push(`notifer active after error(): ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    // Interpolate the callback's own argument so that an unexpected error is
    // recorded in `results` (and shows up in the assertion diff below).
    error: e => results.push(`takeUntil() error callback: ${e}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before error(): true',
    'notifier teardown',
    'takeUntil() complete callback',
    'notifer active after error(): false',
  ]);
}, "takeUntil: notifier error() unsubscribes from notifier");

// This test is identical to the above except it `throw`s instead of calling
// `Subscriber#error()`.
promise_test(async () => {
  const results = [];
  const source = new Observable(subscriber => {
    results.push('source subscribe callback');
    subscriber.addTeardown(() => results.push('source teardown'));
  });

  const notifier = new Observable(subscriber => {
    subscriber.addTeardown(() => results.push('notifier teardown'));

    results.push('notifier subscribe callback');
    // Throwing from the subscribe callback causes `takeUntil()` to unsubscribe
    // from `notifier`.
    results.push(`notifer active before throw: ${subscriber.active}`);
    throw new Error('custom error');
    // Won't run:
    results.push(`notifer active after throw: ${subscriber.active}`);
  });

  source.takeUntil(notifier).subscribe({
    next: () => results.push('takeUntil() next callback'),
    // Interpolate the callback's own argument so that an unexpected error is
    // recorded in `results` (and shows up in the assertion diff below).
    error: e => results.push(`takeUntil() error callback: ${e}`),
    complete: () => results.push('takeUntil() complete callback'),
  });

  assert_array_equals(results, [
    'notifier subscribe callback',
    'notifer active before throw: true',
    'notifier teardown',
    'takeUntil() complete callback',
  ]);
}, "takeUntil: notifier throw Error unsubscribes from notifier");

// Test that `notifier` unsubscribes from source Observable.
promise_test(async t => {
  const results = [];

  const source = new Observable(subscriber => {
    results.push('source subscribed');
    subscriber.addTeardown(() => results.push('source teardown'));
    subscriber.signal.addEventListener('abort',
        e => results.push('source signal abort'));
  });

  let notifierTeardownCalled = false;
  const notifier = new Observable(subscriber => {
    results.push('notifier subscribed');
    subscriber.addTeardown(() => {
      results.push('notifier teardown');
      notifierTeardownCalled = true;
    });
    subscriber.signal.addEventListener('abort',
        e => results.push('notifier signal abort'));

    // Asynchronously shut everything down.
    t.step_timeout(() => subscriber.next('value'));
  });

  let nextOrErrorCalled = false;
  let notifierTeardownCalledBeforeCompleteCallback;
  await new Promise(resolve => {
    source.takeUntil(notifier).subscribe({
      next: () => {nextOrErrorCalled = true; results.push('next callback');},
      error: () => {nextOrErrorCalled = true; results.push('error callback');},
      complete: () => {
        results.push('complete callback');
        // Snapshot the flag at the moment `complete()` runs, so the ordering
        // relative to the notifier teardown can be asserted afterwards.
        notifierTeardownCalledBeforeCompleteCallback = notifierTeardownCalled;
        resolve();
      },
    });
  });

  // The only outer Observer callback that may fire is `complete()`; neither
  // `next()` nor `error()` is expected.
  assert_false(nextOrErrorCalled);
  // The notifier teardown is expected to have already run by the time the
  // outer `Observer#complete()` callback is invoked; the full expected ordering
  // (abort events and teardowns first, `complete()` last) is spelled out below.
  assert_true(notifierTeardownCalledBeforeCompleteCallback);
  assert_true(notifierTeardownCalled);
  assert_array_equals(results, [
    "notifier subscribed",
    "source subscribed",
    "notifier signal abort",
    "notifier teardown",
    "source signal abort",
    "source teardown",
    "complete callback",
  ]);
}, "takeUntil: notifier next() unsubscribes from notifier & source observable");

// This test is almost identical to the above test, however instead of the
// `notifier` Observable being the thing that causes the unsubscription from
// `notifier` and `source`, it is the outer composite Observable's
// `SubscribeOptions#signal` being aborted that does this.
promise_test(async t => {
  const results = [];
  // This will get populated later with a function that resolves a promise.
  let resolver;

  const source = new Observable(subscriber => {
    results.push('source subscribed');
    subscriber.addTeardown(() => results.push('source teardown'));
    subscriber.signal.addEventListener('abort', e => {
      results.push('source signal abort');
      // This is the last abort listener expected to fire. After this, we can
      // resolve the promise that this test is waiting on, via `resolver`.
      // That'll wrap things up and move us on to the assertions, which (per
      // the expected ordering below) still observe the source teardown.
      resolver();
    });
  });

  const notifier = new Observable(subscriber => {
    results.push('notifier subscribed');
    subscriber.addTeardown(() => {
      results.push('notifier teardown');
    });
    subscriber.signal.addEventListener('abort',
        e => results.push('notifier signal abort'));
  });

  let observerCallbackCalled = false;
  await new Promise(resolve => {
    resolver = resolve;
    const controller = new AbortController();
    source.takeUntil(notifier).subscribe({
      next: () => observerCallbackCalled = true,
      error: () => observerCallbackCalled = true,
      complete: () => observerCallbackCalled = true,
    }, {signal: controller.signal});

    // Asynchronously shut everything down.
    t.step_timeout(() => controller.abort());
  });

  // Aborting the subscription must not invoke any Observer callback.
  assert_false(observerCallbackCalled);
  assert_array_equals(results, [
    "notifier subscribed",
    "source subscribed",
    "notifier signal abort",
    "notifier teardown",
    "source signal abort",
    "source teardown",
  ]);
}, "takeUntil()'s AbortSignal unsubscribes from notifier & source observable");

// A notifier that emits a value synchronously during subscription completes
// the composite Observable before `source` is ever subscribed to.
promise_test(async () => {
  let sourceSubscribedTo = false;
  const source = new Observable(subscriber => {
    sourceSubscribedTo = true;
  });

  const notifier = new Observable(subscriber => subscriber.next('value'));

  let nextOrErrorCalled = false;
  let completeCalled = false;
  source.takeUntil(notifier).subscribe({
    next: v => nextOrErrorCalled = true,
    error: e => nextOrErrorCalled = true,
    complete: () => completeCalled = true,
  });

  assert_false(sourceSubscribedTo);
  assert_true(completeCalled);
  assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits a value");

// Same as above, but the notifier synchronously errors instead of emitting a
// value.
promise_test(async () => {
  let sourceSubscribedTo = false;
  const source = new Observable(subscriber => {
    sourceSubscribedTo = true;
  });

  const notifier = new Observable(subscriber => subscriber.error('error'));

  let nextOrErrorCalled = false;
  let completeCalled = false;
  source.takeUntil(notifier).subscribe({
    next: v => nextOrErrorCalled = true,
    error: e => nextOrErrorCalled = true,
    complete: () => completeCalled = true,
  });

  assert_false(sourceSubscribedTo);
  assert_true(completeCalled);
  assert_false(nextOrErrorCalled);
}, "takeUntil: source never subscribed to when notifier synchronously emits error");

// A notifier that merely completes must not cut the source short: every
// source value is still expected in the output.
promise_test(async () => {
  const source = new Observable(subscriber => {
    let i = 0;
    const interval = setInterval(() => {
      if (i < 5) {
        subscriber.next(++i);
      } else {
        subscriber.complete();
        clearInterval(interval);
      }
    }, 500);
  });

  const notifier = new Observable(subscriber => subscriber.complete());

  const result = await source.takeUntil(notifier).toArray();
  assert_array_equals(result, [1, 2, 3, 4, 5]);
}, "takeUntil: source is uninterrupted when notifier completes, even synchronously");

// Drives `source` and `notifier` by hand: values emitted by the source before
// the notifier's first `next()` are forwarded, values emitted after are not.
promise_test(async () => {
  const results = [];

  let sourceSubscriber;
  let notifierSubscriber;
  const source = new Observable(subscriber => sourceSubscriber = subscriber);
  const notifier = new Observable(subscriber => notifierSubscriber = subscriber);

  source.takeUntil(notifier).subscribe({
    next: v => results.push(v),
    complete: () => results.push("complete"),
  });

  sourceSubscriber.next(1);
  sourceSubscriber.next(2);
  notifierSubscriber.next('notifier value');
  sourceSubscriber.next(3);

  assert_array_equals(results, [1, 2, 'complete']);
}, "takeUntil() mirrors the source Observable until its first next() value");

// The notifier's first `error()` completes the composite Observable; the
// second one has nowhere to go and is expected to be reported to the global
// error handler.
promise_test(async t => {
  let errorReported = null;

  self.addEventListener("error", e => errorReported = e, { once: true });

  const source = new Observable(() => {});
  const notifier = new Observable(subscriber => {
    t.step_timeout(() => {
      subscriber.error('error 1');
      subscriber.error('error 2');
    });
  });

  let errorCallbackCalled = false;
  await new Promise(resolve => {
    source.takeUntil(notifier).subscribe({
      error: e => errorCallbackCalled = true,
      complete: () => resolve(),
    });
  });

  assert_false(errorCallbackCalled);
  assert_true(errorReported !== null, "Exception was reported to global");
  assert_true(errorReported.message.includes("error 2"), "Error message matches");
  assert_greater_than(errorReported.lineno, 0, "Error lineno is greater than 0");
  assert_greater_than(errorReported.colno, 0, "Error colno is greater than 0");
  assert_equals(errorReported.error, 'error 2', "Error object is equivalent (just a string)");
}, "takeUntil: notifier calls `Subscriber#error()` twice; second goes to global error handler");