tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

flow-control.any.js (10878B)


      1 // META: global=window,worker,shadowrealm
      2 // META: script=../resources/test-utils.js
      3 // META: script=../resources/rs-utils.js
      4 // META: script=../resources/recording-streams.js
      5 'use strict';
      6 
      7 const error1 = new Error('error1!');
      8 error1.name = 'error1';
      9 
     10 promise_test(t => {
     11 
     12  const rs = recordingReadableStream({
     13    start(controller) {
     14      controller.enqueue('a');
     15      controller.enqueue('b');
     16      controller.close();
     17    }
     18  });
     19 
     20  const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));
     21 
     22  const pipePromise = rs.pipeTo(ws, { preventCancel: true });
     23 
     24  // Wait and make sure it doesn't do any reading.
     25  return flushAsyncEvents().then(() => {
     26    ws.controller.error(error1);
     27  })
     28  .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
     29  .then(() => {
     30    assert_array_equals(rs.eventsWithoutPulls, []);
     31    assert_array_equals(ws.events, []);
     32  })
     33  .then(() => readableStreamToArray(rs))
     34  .then(chunksNotPreviouslyRead => {
     35    assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
     36  });
     37 
     38 }, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');
     39 
     40 promise_test(() => {
     41 
     42  const rs = recordingReadableStream({
     43    start(controller) {
     44      controller.enqueue('b');
     45      controller.close();
     46    }
     47  });
     48 
     49  let resolveWritePromise;
     50  const ws = recordingWritableStream({
     51    write() {
     52      if (!resolveWritePromise) {
     53        // first write
     54        return new Promise(resolve => {
     55          resolveWritePromise = resolve;
     56        });
     57      }
     58      return undefined;
     59    }
     60  });
     61 
     62  const writer = ws.getWriter();
     63  const firstWritePromise = writer.write('a');
     64  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
     65  writer.releaseLock();
     66 
     67  // firstWritePromise won't settle until we call resolveWritePromise.
     68 
     69  const pipePromise = rs.pipeTo(ws);
     70 
     71  return flushAsyncEvents().then(() => resolveWritePromise())
     72    .then(() => Promise.all([firstWritePromise, pipePromise]))
     73    .then(() => {
     74      assert_array_equals(rs.eventsWithoutPulls, []);
     75      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
     76    });
     77 
     78 }, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');
     79 
     80 promise_test(() => {
     81 
     82  const rs = recordingReadableStream();
     83 
     84  let resolveWritePromise;
     85  const ws = recordingWritableStream({
     86    write() {
     87      if (!resolveWritePromise) {
     88        // first write
     89        return new Promise(resolve => {
     90          resolveWritePromise = resolve;
     91        });
     92      }
     93      return undefined;
     94    }
     95  });
     96 
     97  const writer = ws.getWriter();
     98  writer.write('a');
     99 
    100  return flushAsyncEvents().then(() => {
    101    assert_array_equals(ws.events, ['write', 'a']);
    102    assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
    103    writer.releaseLock();
    104 
    105    const pipePromise = rs.pipeTo(ws);
    106 
    107    rs.controller.enqueue('b');
    108    resolveWritePromise();
    109    rs.controller.close();
    110 
    111    return pipePromise.then(() => {
    112      assert_array_equals(rs.eventsWithoutPulls, []);
    113      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    114    });
    115  });
    116 
    117 }, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
    118   'stream becomes non-empty and the writable stream starts desiring chunks');
    119 
    120 promise_test(() => {
    121  const unreadChunks = ['b', 'c', 'd'];
    122 
    123  const rs = recordingReadableStream({
    124    pull(controller) {
    125      controller.enqueue(unreadChunks.shift());
    126      if (unreadChunks.length === 0) {
    127        controller.close();
    128      }
    129    }
    130  }, new CountQueuingStrategy({ highWaterMark: 0 }));
    131 
    132  let resolveWritePromise;
    133  const ws = recordingWritableStream({
    134    write() {
    135      if (!resolveWritePromise) {
    136        // first write
    137        return new Promise(resolve => {
    138          resolveWritePromise = resolve;
    139        });
    140      }
    141      return undefined;
    142    }
    143  }, new CountQueuingStrategy({ highWaterMark: 3 }));
    144 
    145  const writer = ws.getWriter();
    146  const firstWritePromise = writer.write('a');
    147  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
    148  writer.releaseLock();
    149 
    150  // firstWritePromise won't settle until we call resolveWritePromise.
    151 
    152  const pipePromise = rs.pipeTo(ws);
    153 
    154  return flushAsyncEvents().then(() => {
    155    assert_array_equals(ws.events, ['write', 'a']);
    156    assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
    157  }).then(() => resolveWritePromise())
    158    .then(() => Promise.all([firstWritePromise, pipePromise]))
    159    .then(() => {
    160      assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
    161      assert_array_equals(ws.events, ['write', 'a', 'write', 'b','write', 'c','write', 'd', 'close']);
    162    });
    163 
    164 }, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');
    165 
    166 class StepTracker {
    167  constructor() {
    168    this.waiters = [];
    169    this.wakers = [];
    170  }
    171 
    172  // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
    173  // promise is resolved.
    174  waitThenAdvance(n) {
    175    if (this.waiters[n] === undefined) {
    176      this.waiters[n] = new Promise(resolve => {
    177        this.wakers[n] = resolve;
    178      });
    179      this.waiters[n]
    180          .then(() => flushAsyncEvents())
    181          .then(() => {
    182            if (this.wakers[n + 1] !== undefined) {
    183              this.wakers[n + 1]();
    184            }
    185          });
    186    }
    187    if (n == 0) {
    188      this.wakers[0]();
    189    }
    190    return this.waiters[n];
    191  }
    192 }
    193 
    194 promise_test(() => {
    195  const steps = new StepTracker();
    196  const desiredSizes = [];
    197  const rs = recordingReadableStream({
    198    start(controller) {
    199      steps.waitThenAdvance(1).then(() => enqueue('a'));
    200      steps.waitThenAdvance(3).then(() => enqueue('b'));
    201      steps.waitThenAdvance(5).then(() => enqueue('c'));
    202      steps.waitThenAdvance(7).then(() => enqueue('d'));
    203      steps.waitThenAdvance(11).then(() => controller.close());
    204 
    205      function enqueue(chunk) {
    206        controller.enqueue(chunk);
    207        desiredSizes.push(controller.desiredSize);
    208      }
    209    }
    210  });
    211 
    212  const chunksFinishedWriting = [];
    213  const writableStartPromise = Promise.resolve();
    214  let writeCalled = false;
    215  const ws = recordingWritableStream({
    216    start() {
    217      return writableStartPromise;
    218    },
    219    write(chunk) {
    220      const waitForStep = writeCalled ? 12 : 9;
    221      writeCalled = true;
    222      return steps.waitThenAdvance(waitForStep).then(() => {
    223        chunksFinishedWriting.push(chunk);
    224      });
    225    }
    226  });
    227 
    228  return writableStartPromise.then(() => {
    229    const pipePromise = rs.pipeTo(ws);
    230    steps.waitThenAdvance(0);
    231 
    232    return Promise.all([
    233      steps.waitThenAdvance(2).then(() => {
    234        assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
    235        assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');
    236 
    237        // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
    238        // promise, leaving the queue empty.
    239        assert_array_equals(desiredSizes, [1],
    240                            'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
    241        assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
    242      }),
    243 
    244      steps.waitThenAdvance(4).then(() => {
    245        assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
    246        assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');
    247 
    248        // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
    249        // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
    250        // had size 1 (thus desiredSize of 0).
    251        assert_array_equals(desiredSizes, [1, 0],
    252                            'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
    253        assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
    254      }),
    255 
    256      steps.waitThenAdvance(6).then(() => {
    257        assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
    258        assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');
    259 
    260        // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
    261        // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
    262        // -1.
    263        assert_array_equals(desiredSizes, [1, 0, -1],
    264                            'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
    265        assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
    266      }),
    267 
    268      steps.waitThenAdvance(8).then(() => {
    269        assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
    270        assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');
    271 
    272        // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
    273        // and 'd'.
    274        assert_array_equals(desiredSizes, [1, 0, -1, -2],
    275                            'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
    276        assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
    277      }),
    278 
    279      steps.waitThenAdvance(10).then(() => {
    280        assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
    281        assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
    282                            'at step 10, two chunks must have been written');
    283 
    284        assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
    285      }),
    286 
    287      pipePromise.then(() => {
    288        assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
    289        assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');
    290 
    291        assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
    292        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
    293                            'all chunks were written (and the WritableStream closed)');
    294      })
    295    ]);
    296  });
    297 }, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');