tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

observable-flatMap.any.js (9201B)


      1 test(() => {
      2  const source = new Observable(subscriber => {
      3    subscriber.next(1);
      4    subscriber.next(2);
      5    subscriber.next(3);
      6    subscriber.complete();
      7  });
      8 
      9  let projectionCalls = 0;
     10 
     11  const results = [];
     12 
     13  const flattened = source.flatMap(value => {
     14    projectionCalls++;
     15    return new Observable((subscriber) => {
     16      subscriber.next(value * 10);
     17      subscriber.next(value * 100);
     18      subscriber.complete();
     19    });
     20  });
     21 
     22  assert_true(flattened instanceof Observable, "flatMap() returns an Observable");
     23  assert_equals(projectionCalls, 0,
     24      "Projection is not called until subscription starts");
     25 
     26  flattened.subscribe({
     27    next: v => results.push(v),
     28    error: () => results.push("error"),
     29    complete: () => results.push("complete"),
     30  });
     31 
     32  assert_equals(projectionCalls, 3,
     33      "Mapper is called three times, once for each source Observable value");
     34  assert_array_equals(results, [10, 100, 20, 200, 30, 300, "complete"],
     35      "flatMap() results are correct");
     36 }, "flatMap(): Flattens simple source Observable properly");
     37 
     38 test(() => {
     39  const error = new Error("error");
     40  const source = new Observable(subscriber => {
     41    subscriber.next(1);
     42    subscriber.next(2);
     43    subscriber.error(error);
     44    subscriber.next(3);
     45  });
     46 
     47  const flattened = source.flatMap(value => {
     48    return new Observable(subscriber => {
     49      subscriber.next(value * 10);
     50      subscriber.next(value * 100);
     51      subscriber.complete();
     52    });
     53  });
     54 
     55  const results = [];
     56 
     57  flattened.subscribe({
     58    next: v => results.push(v),
     59    error: e => results.push(e),
     60    complete: () => results.push("complete"),
     61  });
     62 
     63  assert_array_equals(results, [10, 100, 20, 200, error],
     64      "Source error is passed through to the flatMap() Observable");
     65 }, "flatMap(): Returned Observable passes through source Observable errors");
     66 
     67 test(() => {
     68  const results = [];
     69  const error = new Error("error");
     70  const source = new Observable(subscriber => {
     71    subscriber.next(1);
     72    results.push(subscriber.active ? "active" : "inactive");
     73    subscriber.next(2);
     74    results.push(subscriber.active ? "active" : "inactive");
     75    subscriber.next(3);
     76    subscriber.complete();
     77  });
     78 
     79  const flattened = source.flatMap((value) => {
     80    return new Observable((subscriber) => {
     81      subscriber.next(value * 10);
     82      subscriber.next(value * 100);
     83      if (value === 2) {
     84        subscriber.error(error);
     85      } else {
     86        subscriber.complete();
     87      }
     88    });
     89  });
     90 
     91  flattened.subscribe({
     92    next: v => results.push(v),
     93    error: e => results.push(e),
     94    complete: () => results.push("complete"),
     95  });
     96 
     97  assert_array_equals(results, [10, 100, "active", 20, 200, error, "inactive"],
     98      "Inner subscription error gets surfaced");
     99 }, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
    100   "'inner' Observable emits an error");
    101 
    102 test(() => {
    103  const results = [];
    104  const error = new Error("error");
    105  const source = new Observable(subscriber => {
    106    subscriber.next(1);
    107    subscriber.next(2);
    108    subscriber.next(3);
    109    results.push(subscriber.active ? "active" : "inactive");
    110    subscriber.complete();
    111  });
    112 
    113  const flattened = source.flatMap(value => {
    114    if (value === 3) {
    115      throw error;
    116    }
    117    return new Observable(subscriber => {
    118      subscriber.next(value * 10);
    119      subscriber.next(value * 100);
    120      subscriber.complete();
    121    });
    122  });
    123 
    124  flattened.subscribe({
    125    next: v => results.push(v),
    126    error: e => results.push(e),
    127    complete: () => results.push("complete"),
    128  });
    129 
    130  assert_array_equals(results, [10, 100, 20, 200, error, "inactive"],
    131      "Inner subscriber thrown error gets surfaced");
    132 }, "flatMap(): Outer Subscription synchronously becomes inactive when an " +
    133   "'inner' Observable throws an error");
    134 
    135 test(() => {
    136  const source = createTestSubject();
    137  const inner1 = createTestSubject();
    138  const inner2 = createTestSubject();
    139 
    140  const flattened = source.flatMap(value => {
    141    if (value === 1) {
    142      return inner1;
    143    }
    144 
    145    return inner2;
    146  });
    147 
    148  const results = [];
    149 
    150  flattened.subscribe({
    151    next: v => results.push(v),
    152    error: e => results.push(e),
    153    complete: () => results.push("complete"),
    154  });
    155 
    156  assert_array_equals(results, []);
    157 
    158  source.next(1);
    159  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
    160 
    161  source.next(2);
    162  assert_equals(inner2.subscriberCount(), 0,
    163      "inner2 is queued, not subscribed to until inner1 completes");
    164 
    165  assert_array_equals(results, []);
    166 
    167  inner1.next(100);
    168  inner1.next(101);
    169 
    170  assert_array_equals(results, [100, 101]);
    171 
    172  inner1.complete();
    173  assert_equals(inner1.subscriberCount(), 0,
    174      "inner1 becomes inactive once it completes");
    175  assert_equals(inner2.subscriberCount(), 1,
    176      "inner2 gets un-queued and subscribed to once inner1 completes");
    177 
    178  inner2.next(200);
    179  inner2.next(201);
    180  assert_array_equals(results, [100, 101, 200, 201]);
    181 
    182  inner2.complete();
    183  assert_equals(inner2.subscriberCount(), 0,
    184      "inner2 becomes inactive once it completes");
    185  assert_equals(source.subscriberCount(), 1,
    186      "source is not unsubscribed from yet, since it has not completed");
    187  assert_array_equals(results, [100, 101, 200, 201]);
    188 
    189  source.complete();
    190  assert_equals(source.subscriberCount(), 0,
    191      "source unsubscribed from after it completes");
    192 
    193  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
    194 }, "flatMap(): result Observable does not complete until source and inner " +
    195   "Observables all complete");
    196 
    197 test(() => {
    198  const source = createTestSubject();
    199  const inner1 = createTestSubject();
    200  const inner2 = createTestSubject();
    201 
    202  const flattened = source.flatMap(value => {
    203    if (value === 1) {
    204      return inner1;
    205    }
    206 
    207    return inner2;
    208  });
    209 
    210  const results = [];
    211 
    212  flattened.subscribe({
    213    next: v => results.push(v),
    214    error: e => results.push(e),
    215    complete: () => results.push("complete"),
    216  });
    217 
    218  assert_array_equals(results, []);
    219 
    220  source.next(1);
    221  source.next(2);
    222  assert_equals(inner1.subscriberCount(), 1, "inner1 gets subscribed to");
    223  assert_equals(inner2.subscriberCount(), 0,
    224      "inner2 is queued, not subscribed to until inner1 completes");
    225 
    226  assert_array_equals(results, []);
    227 
    228  // Before `inner1` pushes any values, we first complete the source Observable.
    229  // This will not fire completion of the Observable returned from `flatMap()`,
    230  // because there are two values (corresponding to inner Observables) that are
    231  // queued to the inner queue that need to be processed first. Once the last
    232  // one of *those* completes (i.e., `inner2.complete()` further down), then the
    233  // returned Observable can finally complete.
    234  source.complete();
    235  assert_equals(source.subscriberCount(), 0,
    236      "source becomes inactive once it completes");
    237 
    238  inner1.next(100);
    239  inner1.next(101);
    240 
    241  assert_array_equals(results, [100, 101]);
    242 
    243  inner1.complete();
    244  assert_array_equals(results, [100, 101],
    245      "Outer completion not triggered after inner1 completes");
    246  assert_equals(inner2.subscriberCount(), 1,
    247      "inner2 gets un-queued and subscribed after inner1 completes");
    248 
    249  inner2.next(200);
    250  inner2.next(201);
    251  assert_array_equals(results, [100, 101, 200, 201]);
    252 
    253  inner2.complete();
    254  assert_equals(inner2.subscriberCount(), 0,
    255      "inner2 becomes inactive once it completes");
    256  assert_array_equals(results, [100, 101, 200, 201, "complete"]);
    257 }, "flatMap(): result Observable does not complete after source Observable " +
    258   "completes while there are still queued inner Observables to process " +
    259   "Observables all complete");
    260 
    261 test(() => {
    262  const source = createTestSubject();
    263  const inner = createTestSubject();
    264  const result = source.flatMap(() => inner);
    265 
    266  const ac = new AbortController();
    267 
    268  result.subscribe({}, { signal: ac.signal, });
    269 
    270  source.next(1);
    271 
    272  assert_equals(inner.subscriberCount(), 1,
    273      "inner Observable subscribed to once source emits it");
    274 
    275  ac.abort();
    276 
    277  assert_equals(source.subscriberCount(), 0,
    278      "source unsubscribed from, once outer signal is aborted");
    279 
    280  assert_equals(inner.subscriberCount(), 0,
    281      "inner Observable unsubscribed from once the outer Observable is " +
    282      "subscribed from, as a result of the outer signal being aborted");
    283 }, "flatMap(): source and inner active Observables are both unsubscribed " +
    284   "from once the outer subscription signal is aborted");
    285 
    286 // A helper function to create an Observable that can be externally controlled
    287 // and examined for testing purposes.
    288 function createTestSubject() {
    289  const subscribers = new Set();
    290  const subject = new Observable(subscriber => {
    291    subscribers.add(subscriber);
    292    subscriber.addTeardown(() => subscribers.delete(subscriber));
    293  });
    294 
    295  subject.next = value => {
    296    for (const subscriber of Array.from(subscribers)) {
    297      subscriber.next(value);
    298    }
    299  };
    300  subject.error = error => {
    301    for (const subscriber of Array.from(subscribers)) {
    302      subscriber.error(error);
    303    }
    304  };
    305  subject.complete = () => {
    306    for (const subscriber of Array.from(subscribers)) {
    307      subscriber.complete();
    308    }
    309  };
    310  subject.subscriberCount = () => {
    311    return subscribers.size;
    312  };
    313 
    314  return subject;
    315 }