tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

flow.js (9314B)


      1 var expect = require('chai').expect;
      2 var util = require('./util');
      3 
      4 var Flow = require('../lib/protocol/flow').Flow;
      5 
      6 var MAX_PAYLOAD_SIZE = 16384;
      7 
      8 function createFlow(log) {
      9  var flowControlId = util.random(10, 100);
     10  var flow = new Flow(flowControlId);
     11  flow._log = util.log.child(log || {});
     12  return flow;
     13 }
     14 
     15 describe('flow.js', function() {
     16  describe('Flow class', function() {
     17    var flow;
     18    beforeEach(function() {
     19      flow = createFlow();
     20    });
     21 
     22    describe('._receive(frame, callback) method', function() {
     // Writing a frame into the flow must hand that very object to `_receive`;
     // the test completes when the write callback (`done`) fires.
     it('is called when there\'s a frame in the input buffer to be consumed', function(done) {
       var writtenFrame = { type: 'PRIORITY', flags: {}, priority: 1 };

       function _receive(incoming, next) {
         expect(incoming).to.equal(writtenFrame);
         next();
       }
       flow._receive = _receive;

       flow.write(writtenFrame, done);
     });
     it('has to be overridden by the child class, otherwise it throws', function() {
       // Call the untouched `_receive` with `flow` as receiver and no arguments.
       var callDefaultReceive = flow._receive.bind(flow);
       expect(callDefaultReceive).to.throw(Error);
     });
     34    });
     35    describe('._send() method', function() {
     // `_send` is stubbed to push a PRIORITY frame. The first read must return
     // that frame. Then, with a zero window and a DATA frame waiting in the flow
     // control queue, the first frame that is not of the stub's type must be
     // BLOCKED, and the read after it must return null.
     it('is called when the output buffer should be filled with more frames and the flow ' +
        'control queue is empty', function() {
       var notFlowControlledFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
       flow._send = function _send() {
         this.push(notFlowControlledFrame);
       };
       expect(flow.read()).to.equal(notFlowControlledFrame);

       flow._window = 0;
       flow._queue.push({ type: 'DATA', flags: {}, data: { length: 1 } });
       var frame = flow.read();
       // Skip the frames produced by the stub; stop on null so that a missing
       // BLOCKED frame is reported as an assertion failure, not a TypeError.
       while (frame !== null && frame.type === notFlowControlledFrame.type) {
         frame = flow.read();
       }
       expect(frame).to.not.equal(null);
       expect(frame.type).to.equal('BLOCKED');
       expect(flow.read()).to.equal(null);
     });
     it('has to be overridden by the child class, otherwise it throws', function() {
       // Call the untouched `_send` with `flow` as receiver and no arguments.
       var callDefaultSend = flow._send.bind(flow);
       expect(callDefaultSend).to.throw(Error);
     });
     54    });
     55    describe('._increaseWindow(size) method', function() {
     // Two random finite increments must add up in `_window`; adding Infinity
     // afterwards must leave the window infinite.
     it('should increase `this._window` by `size`', function() {
       flow._send = util.noop;
       flow._window = 0;

       var increments = [util.random(0,100), util.random(0,100)];
       var expectedWindow = 0;
       increments.forEach(function(step) {
         flow._increaseWindow(step);
         expectedWindow += step;
       });
       expect(flow._window).to.equal(expectedWindow);

       flow._increaseWindow(Infinity);
       expect(flow._window).to.equal(Infinity);
     });
     // Once the window is infinite, a further finite increase is expected to
     // raise an error.
     it('should emit error when increasing with a finite `size` when `_window` is infinite', function() {
       flow._send = util.noop;
       flow._increaseWindow(Infinity);
       var increase = util.random(1,100);

       // Assert on the exception type only: the message Node.js attaches to the
       // error thrown for an unhandled 'error' event is worded differently
       // across Node.js releases, so matching its text is not portable.
       expect(flow._increaseWindow.bind(flow, increase)).to.throw(Error);
     });
     // Filling the window up to 2^31 - 1 must succeed; one more unit is
     // expected to raise an error.
     it('should emit error when `_window` grows over the window limit', function() {
       var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
       flow._send = util.noop;
       flow._window = 0;

       flow._increaseWindow(WINDOW_SIZE_LIMIT);
       // Assert on the exception type only: the message Node.js attaches to the
       // error thrown for an unhandled 'error' event is worded differently
       // across Node.js releases, so matching its text is not portable.
       expect(flow._increaseWindow.bind(flow, 1)).to.throw(Error);
     });
     85    });
     86    describe('.read() method', function() {
     87      describe('when the flow control queue is not empty', function() {
     88        it('should return the first item in the queue if the window is enough', function() {
     89          var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
     90          var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
     91          flow._send = util.noop;
     92          flow._window = 10;
     93          flow._queue = [priorityFrame, dataFrame];
     94 
     95          expect(flow.read()).to.equal(priorityFrame);
     96          expect(flow.read()).to.equal(dataFrame);
     97        });
     98        it('should also split DATA frames when needed', function() {
     99          var buffer = Buffer.alloc(10);
    100          var dataFrame = { type: 'DATA', flags: {}, stream: util.random(0, 100), data: buffer };
    101          flow._send = util.noop;
    102          flow._window = 5;
    103          flow._queue = [dataFrame];
    104 
    105          var expectedFragment = { flags: {}, type: 'DATA', stream: dataFrame.stream, data: buffer.slice(0,5) };
    106          expect(flow.read()).to.deep.equal(expectedFragment);
    107          expect(dataFrame.data).to.deep.equal(buffer.slice(5));
    108        });
    109      });
    110    });
    111    describe('.push(frame) method', function() {
    112      it('should push `frame` into the output queue or the flow control queue', function() {
    113        var priorityFrame = { type: 'PRIORITY', flags: {}, priority: 1 };
    114        var dataFrame = { type: 'DATA', flags: {}, data: { length: 10 } };
    115        flow._window = 10;
    116 
    117        flow.push(dataFrame);     // output queue
    118        flow.push(dataFrame);     // flow control queue, because of depleted window
    119        flow.push(priorityFrame); // flow control queue, because it's not empty
    120 
    121        expect(flow.read()).to.be.equal(dataFrame);
    122        expect(flow._queue[0]).to.be.equal(dataFrame);
    123        expect(flow._queue[1]).to.be.equal(priorityFrame);
    124      });
    125    });
    126    describe('.write() method', function() {
    127      it('call with a DATA frame should trigger sending WINDOW_UPDATE if remote flow control is not' +
    128         'disabled', function(done) {
    129        flow._window = 100;
    130        flow._send = util.noop;
    131        flow._receive = function(frame, callback) {
    132          callback();
    133        };
    134 
    135        var buffer = Buffer.alloc(util.random(10, 100));
    136        flow.write({ type: 'DATA', flags: {}, data: buffer });
    137        flow.once('readable', function() {
    138          expect(flow.read()).to.be.deep.equal({
    139            type: 'WINDOW_UPDATE',
    140            flags: {},
    141            stream: flow._flowControlId,
    142            window_size: buffer.length
    143          });
    144          done();
    145        });
    146      });
    147    });
    148  });
    149  describe('test scenario', function() {
    150    var flow1, flow2;
    151    beforeEach(function() {
    152      flow1 = createFlow({ flow: 1 });
    153      flow2 = createFlow({ flow: 2 });
    154      flow1._flowControlId = flow2._flowControlId;
    155      flow1._send = flow2._send = util.noop;
    156      flow1._receive = flow2._receive = function(frame, callback) { callback(); };
    157    });
    158 
    159    describe('sending a large data stream', function() {
    160      it('should work as expected', function(done) {
    161        // Sender side
    162        var frameNumber = util.random(5, 8);
    163        var input = [];
    164        flow1._send = function _send() {
    165          if (input.length >= frameNumber) {
    166            this.push({ type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(0) });
    167            this.push(null);
    168          } else {
    169            var buffer = Buffer.allocUnsafe(util.random(1000, 100000));
    170            input.push(buffer);
    171            this.push({ type: 'DATA', flags: {}, data: buffer });
    172          }
    173        };
    174 
    175        // Receiver side
    176        var output = [];
    177        flow2._receive = function _receive(frame, callback) {
    178          if (frame.type === 'DATA') {
    179            expect(frame.data.length).to.be.lte(MAX_PAYLOAD_SIZE);
    180            output.push(frame.data);
    181          }
    182          if (frame.flags.END_STREAM) {
    183            this.emit('end_stream');
    184          }
    185          callback();
    186        };
    187 
    188        // Checking results
    189        flow2.on('end_stream', function() {
    190          input = util.concat(input);
    191          output = util.concat(output);
    192 
    193          expect(input).to.deep.equal(output);
    194 
    195          done();
    196        });
    197 
    198        // Start piping
    199        flow1.pipe(flow2).pipe(flow1);
    200      });
    201    });
    202 
    203    describe('when running out of window', function() {
    204      it('should send a BLOCKED frame', function(done) {
    205        // Sender side
    206        var frameNumber = util.random(5, 8);
    207        var input = [];
    208        flow1._send = function _send() {
    209          if (input.length >= frameNumber) {
    210            this.push({ type: 'DATA', flags: { END_STREAM: true }, data: Buffer.alloc(0) });
    211            this.push(null);
    212          } else {
    213            var buffer = Buffer.allocUnsafe(util.random(1000, 100000));
    214            input.push(buffer);
    215            this.push({ type: 'DATA', flags: {}, data: buffer });
    216          }
    217        };
    218 
    219        // Receiver side
    220        // Do not send WINDOW_UPDATESs except when the other side sends BLOCKED
    221        var output = [];
    222        flow2._restoreWindow = util.noop;
    223        flow2._receive = function _receive(frame, callback) {
    224          if (frame.type === 'DATA') {
    225            expect(frame.data.length).to.be.lte(MAX_PAYLOAD_SIZE);
    226            output.push(frame.data);
    227          }
    228          if (frame.flags.END_STREAM) {
    229            this.emit('end_stream');
    230          }
    231          if (frame.type === 'BLOCKED') {
    232            setTimeout(function() {
    233              this._push({
    234                type: 'WINDOW_UPDATE',
    235                flags: {},
    236                stream: this._flowControlId,
    237                window_size: this._received
    238              });
    239              this._received = 0;
    240            }.bind(this), 20);
    241          }
    242          callback();
    243        };
    244 
    245        // Checking results
    246        flow2.on('end_stream', function() {
    247          input = util.concat(input);
    248          output = util.concat(output);
    249 
    250          expect(input).to.deep.equal(output);
    251 
    252          done();
    253        });
    254 
    255        // Start piping
    256        flow1.pipe(flow2).pipe(flow1);
    257      });
    258    });
    259  });
    260 });