observable-flatMap.any.js (9201B)
// Basic flattening: each source value is mapped to an inner Observable, and
// the inner values are emitted in source order, followed by completion.
test(() => {
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    subscriber.complete();
  });

  let projectionCalls = 0;

  const results = [];

  const flattened = source.flatMap(value => {
    projectionCalls++;
    return new Observable((subscriber) => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  assert_true(flattened instanceof Observable, "flatMap() returns an Observable");
  assert_equals(projectionCalls, 0,
      "Projection is not called until subscription starts");

  flattened.subscribe({
    next: v => results.push(v),
    error: () => results.push("error"),
    complete: () => results.push("complete"),
  });

  assert_equals(projectionCalls, 3,
      "Mapper is called three times, once for each source Observable value");
  assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"],
      "flatMap() results are correct");
}, "flatMap(): Flattens simple source Observable properly");

// An error emitted by the source reaches the flatMap() observer; the
// `next(3)` issued after the error must not produce any further results.
test(() => {
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.error(error);
    subscriber.next(3);
  });

  const flattened = source.flatMap(value => {
    return new Observable(subscriber => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, 20, 200, error],
      "Source error is passed through to the flatMap() Observable");
}, "flatMap(): Returned Observable passes through source Observable errors");

// The inner Observable produced for value 2 errors. The source records its own
// `subscriber.active` state after each `next()` so the test can observe the
// outer subscription turning inactive synchronously.
test(() => {
  const results = [];
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.next(2);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.next(3);
    subscriber.complete();
  });

  const flattened = source.flatMap((value) => {
    return new Observable((subscriber) => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      if (value === 2) {
        subscriber.error(error);
      } else {
        subscriber.complete();
      }
    });
  });

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"],
      "Inner subscription error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
   "'inner' Observable emits an error");

// Here the mapper callback itself throws (for value 3) instead of returning an
// Observable; the thrown error is expected in the results, and the source
// subscriber is expected to be inactive right after that `next(3)` call.
test(() => {
  const results = [];
  const error = new Error("error");
  const source = new Observable(subscriber => {
    subscriber.next(1);
    subscriber.next(2);
    subscriber.next(3);
    results.push(subscriber.active ? "active" : "inactive");
    subscriber.complete();
  });

  const flattened = source.flatMap(value => {
    if (value === 3) {
      throw error;
    }
    return new Observable(subscriber => {
      subscriber.next(value * 10);
      subscriber.next(value * 100);
      subscriber.complete();
    });
  });

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, [10, 100, 20, 200, error, "inactive"],
      "Inner subscriber thrown error gets surfaced");
}, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
   "'inner' Observable throws an error");

// Uses externally driven subjects to check queueing: inner2 is only subscribed
// to after inner1 completes, and the result only completes after the source
// completes as well.
test(() => {
  const source = createTestSubject();
  const inner1 = createTestSubject();
  const inner2 = createTestSubject();

  const flattened = source.flatMap(value => {
    if (value === 1) {
      return inner1;
    }

    return inner2;
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, []);

  source.next(1);
  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");

  source.next(2);
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 is queued, not subscribed to until inner1 completes");

  assert_array_equals(results, []);

  inner1.next(100);
  inner1.next(101);

  assert_array_equals(results, [100, 101]);

  inner1.complete();
  assert_equals(inner1.subscriberCount(), 0,
      "inner1 becomes inactive once it completes");
  assert_equals(inner2.subscriberCount(), 1,
      "inner2 gets un-queued and subscribed to once inner1 completes");

  inner2.next(200);
  inner2.next(201);
  assert_array_equals(results, [100, 101, 200, 201]);

  inner2.complete();
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 becomes inactive once it completes");
  assert_equals(source.subscriberCount(), 1,
      "source is not unsubscribed from yet, since it has not completed");
  assert_array_equals(results, [100, 101, 200, 201]);

  source.complete();
  assert_equals(source.subscriberCount(), 0,
      "source unsubscribed from after it completes");

  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete until source and inner " +
   "Observables all complete");

// NOTE(review): the trailing "Observables all complete" in this test's title
// looks like copy-paste residue from the previous test's title. It is left
// unchanged here because the title presumably identifies the test in results;
// confirm before renaming.
test(() => {
  const source = createTestSubject();
  const inner1 = createTestSubject();
  const inner2 = createTestSubject();

  const flattened = source.flatMap(value => {
    if (value === 1) {
      return inner1;
    }

    return inner2;
  });

  const results = [];

  flattened.subscribe({
    next: v => results.push(v),
    error: e => results.push(e),
    complete: () => results.push("complete"),
  });

  assert_array_equals(results, []);

  source.next(1);
  source.next(2);
  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 is queued, not subscribed to until inner1 completes");

  assert_array_equals(results, []);

  // Before `inner1` pushes any values, we first complete the source Observable.
  // This will not fire completion of the Observable returned from `flatMap()`,
  // because there are two values (corresponding to inner Observables) that are
  // queued to the inner queue that need to be processed first. Once the last
  // one of *those* completes (i.e., `inner2.complete()` further down), then the
  // returned Observable can finally complete.
  source.complete();
  assert_equals(source.subscriberCount(), 0,
      "source becomes inactive once it completes");

  inner1.next(100);
  inner1.next(101);

  assert_array_equals(results, [100, 101]);

  inner1.complete();
  assert_array_equals(results, [100, 101],
      "Outer completion not triggered after inner1 completes");
  assert_equals(inner2.subscriberCount(), 1,
      "inner2 gets un-queued and subscribed after inner1 completes");

  inner2.next(200);
  inner2.next(201);
  assert_array_equals(results, [100, 101, 200, 201]);

  inner2.complete();
  assert_equals(inner2.subscriberCount(), 0,
      "inner2 becomes inactive once it completes");
  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
}, "flatMap(): result Observable does not complete after source Observable " +
   "completes while there are still queued inner Observables to process " +
   "Observables all complete");

// Aborting the signal passed to subscribe() must unsubscribe from both the
// source and the currently subscribed inner Observable.
test(() => {
  const source = createTestSubject();
  const inner = createTestSubject();
  const result = source.flatMap(() => inner);

  const ac = new AbortController();

  result.subscribe({}, { signal: ac.signal, });

  source.next(1);

  assert_equals(inner.subscriberCount(), 1,
      "inner Observable subscribed to once source emits it");

  ac.abort();

  assert_equals(source.subscriberCount(), 0,
      "source unsubscribed from, once outer signal is aborted");

  assert_equals(inner.subscriberCount(), 0,
      "inner Observable unsubscribed from once the outer Observable is " +
      "unsubscribed from, as a result of the outer signal being aborted");
}, "flatMap(): source and inner active Observables are both unsubscribed " +
   "from once the outer subscription signal is aborted");

// A helper function to create an Observable that can be externally controlled
// and examined for testing purposes.
/**
 * Builds an Observable whose emissions are driven by the caller.
 *
 * Each subscriber passed to the Observable's subscribe callback is tracked in
 * a Set, and a teardown registered on that subscriber removes it from the Set
 * again. Four helpers are attached directly to the returned Observable:
 *
 *  - `next(value)`       forwards `value` to every tracked subscriber.
 *  - `error(error)`      forwards `error` to every tracked subscriber.
 *  - `complete()`        completes every tracked subscriber.
 *  - `subscriberCount()` returns how many subscribers are currently tracked.
 *
 * @returns {Observable} The Observable, extended with the helpers above.
 */
function createTestSubject() {
  const subscribers = new Set();
  const subject = new Observable(subscriber => {
    subscribers.add(subscriber);
    subscriber.addTeardown(() => subscribers.delete(subscriber));
  });

  // Deliver to a snapshot of the Set rather than the live Set: the teardown
  // above deletes entries, and new subscribers may be added, while a delivery
  // is still in progress.
  const forEachSubscriber = deliver => {
    for (const subscriber of [...subscribers]) {
      deliver(subscriber);
    }
  };

  subject.next = value => forEachSubscriber(subscriber => subscriber.next(value));
  subject.error = error => forEachSubscriber(subscriber => subscriber.error(error));
  subject.complete = () => forEachSubscriber(subscriber => subscriber.complete());
  subject.subscriberCount = () => subscribers.size;

  return subject;
}