tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

stream.js (16157B)


      1 var expect = require('chai').expect;
      2 var util = require('./util');
      3 
      4 var stream = require('../lib/protocol/stream');
      5 var Stream = stream.Stream;
      6 
      7 function createStream() {
      8  var stream = new Stream(util.log, null);
      9  stream.upstream._window = Infinity;
     10  return stream;
     11 }
     12 
     13 // Execute a list of commands and assertions
     14 var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise'];
     15 function execute_sequence(stream, sequence, done) {
     16  if (!done) {
     17    done = sequence;
     18    sequence = stream;
     19    stream = createStream();
     20  }
     21 
     22  var outgoing_frames = [];
     23 
     24  var emit = stream.emit, events = [];
     25  stream.emit = function(name) {
     26    if (recorded_events.indexOf(name) !== -1) {
     27      events.push({ name: name, data: Array.prototype.slice.call(arguments, 1) });
     28    }
     29    return emit.apply(this, arguments);
     30  };
     31 
     32  var commands = [], checks = [];
     33  sequence.forEach(function(step) {
     34    if ('method' in step || 'incoming' in step || 'outgoing' in step || 'wait' in step || 'set_state' in step) {
     35      commands.push(step);
     36    }
     37 
     38    if ('outgoing' in step || 'event' in step || 'active' in step) {
     39      checks.push(step);
     40    }
     41  });
     42 
     43  var activeCount = 0;
     44  function count_change(change) {
     45    activeCount += change;
     46  }
     47 
     48  function execute(callback) {
     49    var command = commands.shift();
     50    if (command) {
     51      if ('method' in command) {
     52        var value = stream[command.method.name].apply(stream, command.method.arguments);
     53        if (command.method.ret) {
     54          command.method.ret(value);
     55        }
     56        execute(callback);
     57      } else if ('incoming' in command) {
     58        command.incoming.count_change = count_change;
     59        stream.upstream.write(command.incoming);
     60        execute(callback);
     61      } else if ('outgoing' in command) {
     62        outgoing_frames.push(stream.upstream.read());
     63        execute(callback);
     64      } else if ('set_state' in command) {
     65        stream.state = command.set_state;
     66        execute(callback);
     67      } else if ('wait' in command) {
     68        setTimeout(execute.bind(null, callback), command.wait);
     69      } else {
     70        throw new Error('Invalid command', command);
     71      }
     72    } else {
     73      setTimeout(callback, 5);
     74    }
     75  }
     76 
     77  function check() {
     78    checks.forEach(function(check) {
     79      if ('outgoing' in check) {
     80        var frame = outgoing_frames.shift();
     81        for (var key in check.outgoing) {
     82          expect(frame).to.have.property(key).that.deep.equals(check.outgoing[key]);
     83        }
     84        count_change(frame.count_change);
     85      } else if ('event' in check) {
     86        var event = events.shift();
     87        expect(event.name).to.be.equal(check.event.name);
     88        check.event.data.forEach(function(data, index) {
     89          expect(event.data[index]).to.deep.equal(data);
     90        });
     91      } else if ('active' in check) {
     92        expect(activeCount).to.be.equal(check.active);
     93      } else {
     94        throw new Error('Invalid check', check);
     95      }
     96    });
     97    done();
     98  }
     99 
    100  setImmediate(execute.bind(null, check));
    101 }
    102 
    103 var example_frames = [
    104  { type: 'PRIORITY', flags: {}, priority: 1 },
    105  { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    106  { type: 'RST_STREAM', flags: {}, error: 'CANCEL' },
    107  { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    108  { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    109  { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log, null) }
    110 ];
    111 
    112 var invalid_incoming_frames = {
    113  IDLE: [
    114    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    115    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    116    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    117    { type: 'RST_STREAM', flags: {}, error: 'CANCEL' }
    118  ],
    119  RESERVED_LOCAL: [
    120    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    121    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    122    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    123    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
    124  ],
    125  RESERVED_REMOTE: [
    126    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    127    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    128    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
    129  ],
    130  OPEN: [
    131  ],
    132  HALF_CLOSED_LOCAL: [
    133  ],
    134  HALF_CLOSED_REMOTE: [
    135    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    136    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    137    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
    138  ]
    139 };
    140 
    141 var invalid_outgoing_frames = {
    142  IDLE: [
    143    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    144    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    145    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
    146  ],
    147  RESERVED_LOCAL: [
    148    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    149    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    150    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
    151  ],
    152  RESERVED_REMOTE: [
    153    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    154    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    155    { type: 'PUSH_PROMISE', flags: {}, headers: {} },
    156    { type: 'WINDOW_UPDATE', flags: {}, settings: {} }
    157  ],
    158  OPEN: [
    159  ],
    160  HALF_CLOSED_LOCAL: [
    161    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    162    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    163    { type: 'PUSH_PROMISE', flags: {}, headers: {} }
    164  ],
    165  HALF_CLOSED_REMOTE: [
    166  ],
    167  CLOSED: [
    168    { type: 'WINDOW_UPDATE', flags: {}, settings: {} },
    169    { type: 'HEADERS', flags: {}, headers: {}, priority: undefined },
    170    { type: 'DATA', flags: {}, data: Buffer.alloc(5) },
    171    { type: 'PUSH_PROMISE', flags: {}, headers: {}, promised_stream: new Stream(util.log, null) }
    172  ]
    173 };
    174 
    175 describe('stream.js', function() {
    176  describe('Stream class', function() {
    177    describe('._transition(sending, frame) method', function() {
    178      it('should emit error, and answer RST_STREAM for invalid incoming frames', function() {
    179        Object.keys(invalid_incoming_frames).forEach(function(state) {
    180          invalid_incoming_frames[state].forEach(function(invalid_frame) {
    181            var stream = createStream();
    182            var connectionErrorHappened = false;
    183            stream.state = state;
    184            stream.once('connectionError', function() { connectionErrorHappened = true; });
    185            stream._transition(false, invalid_frame);
    186            expect(connectionErrorHappened);
    187          });
    188        });
    189 
    190        // CLOSED state as a result of incoming END_STREAM (or RST_STREAM)
    191        var stream = createStream();
    192        stream.headers({});
    193        stream.end();
    194        stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop });
    195        example_frames.slice(2).forEach(function(invalid_frame) {
    196          invalid_frame.count_change = util.noop;
    197          expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
    198        });
    199 
    200        // CLOSED state as a result of outgoing END_STREAM
    201        stream = createStream();
    202        stream.upstream.write({ type: 'HEADERS', headers:{}, flags: { END_STREAM: true }, count_change: util.noop });
    203        stream.headers({});
    204        stream.end();
    205        example_frames.slice(3).forEach(function(invalid_frame) {
    206          invalid_frame.count_change = util.noop;
    207          expect(stream._transition.bind(stream, false, invalid_frame)).to.throw('Uncaught, unspecified "error" event.');
    208        });
    209      });
    210      it('should throw exception for invalid outgoing frames', function() {
    211        Object.keys(invalid_outgoing_frames).forEach(function(state) {
    212          invalid_outgoing_frames[state].forEach(function(invalid_frame) {
    213            var stream = createStream();
    214            stream.state = state;
    215            expect(stream._transition.bind(stream, true, invalid_frame)).to.throw(Error);
    216          });
    217        });
    218      });
    219      it('should close the stream when there\'s an incoming or outgoing RST_STREAM', function() {
    220        [
    221          'RESERVED_LOCAL',
    222          'RESERVED_REMOTE',
    223          'OPEN',
    224          'HALF_CLOSED_LOCAL',
    225          'HALF_CLOSED_REMOTE'
    226        ].forEach(function(state) {
    227            [true, false].forEach(function(sending) {
    228              var stream = createStream();
    229              stream.state = state;
    230              stream._transition(sending, { type: 'RST_STREAM', flags: {} });
    231              expect(stream.state).to.be.equal('CLOSED');
    232            });
    233          });
    234      });
    235      it('should ignore any incoming frame after sending reset', function() {
    236        var stream = createStream();
    237        stream.reset();
    238        example_frames.forEach(stream._transition.bind(stream, false));
    239      });
    240      it('should ignore certain incoming frames after closing the stream with END_STREAM', function() {
    241        var stream = createStream();
    242        stream.upstream.write({ type: 'HEADERS', flags: { END_STREAM: true }, headers:{} });
    243        stream.headers({});
    244        stream.end();
    245        example_frames.slice(0,3).forEach(function(frame) {
    246          frame.count_change = util.noop;
    247          stream._transition(false, frame);
    248        });
    249      });
    250    });
    251  });
    252  describe('test scenario', function() {
    253    describe('sending request', function() {
    254      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
    255        execute_sequence([
    256          { method  : { name: 'headers', arguments: [{ ':path': '/' }] } },
    257          { outgoing: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } },
    258          { event   : { name: 'state', data: ['OPEN'] } },
    259 
    260          { wait    : 5 },
    261          { method  : { name: 'end', arguments: [] } },
    262          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
    263          { outgoing: { type: 'DATA', flags: { END_STREAM: true  }, data: Buffer.alloc(0) } },
    264 
    265          { wait    : 10 },
    266          { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
    267          { incoming: { type: 'DATA'   , flags: { END_STREAM: true  }, data: Buffer.alloc(5) } },
    268          { event   : { name: 'headers', data: [{ ':status': 200 }] } },
    269          { event   : { name: 'state', data: ['CLOSED'] } },
    270 
    271          { active  : 0 }
    272        ], done);
    273      });
    274    });
    275    describe('answering request', function() {
    276      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
    277        var payload = Buffer.alloc(5);
    278        execute_sequence([
    279          { incoming: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } },
    280          { event   : { name: 'state', data: ['OPEN'] } },
    281          { event   : { name: 'headers', data: [{ ':path': '/' }] } },
    282 
    283          { wait    : 5 },
    284          { incoming: { type: 'DATA', flags: { }, data: Buffer.alloc(5) } },
    285          { incoming: { type: 'DATA', flags: { END_STREAM: true  }, data: Buffer.alloc(10) } },
    286          { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
    287 
    288          { wait    : 5 },
    289          { method  : { name: 'headers', arguments: [{ ':status': 200 }] } },
    290          { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
    291 
    292          { wait    : 5 },
    293          { method  : { name: 'end', arguments: [payload] } },
    294          { outgoing: { type: 'DATA', flags: { END_STREAM: true  }, data: payload } },
    295          { event   : { name: 'state', data: ['CLOSED'] } },
    296 
    297          { active  : 0 }
    298        ], done);
    299      });
    300    });
    301    describe('sending push stream', function() {
    302      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
    303        var payload = Buffer.alloc(5);
    304        var pushStream;
    305 
    306        execute_sequence([
    307          // receiving request
    308          { incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
    309          { event   : { name: 'state', data: ['OPEN'] } },
    310          { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
    311          { event   : { name: 'headers', data: [{ ':path': '/' }] } },
    312 
    313          // sending response headers
    314          { wait    : 5 },
    315          { method  : { name: 'headers', arguments: [{ ':status': '200' }] } },
    316          { outgoing: { type: 'HEADERS', flags: {  }, headers: { ':status': '200' } } },
    317 
    318          // sending push promise
    319          { method  : { name: 'promise', arguments: [{ ':path': '/' }], ret: function(str) { pushStream = str; } } },
    320          { outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' } } },
    321 
    322          // sending response data
    323          { method  : { name: 'end', arguments: [payload] } },
    324          { outgoing: { type: 'DATA', flags: { END_STREAM: true  }, data: payload } },
    325          { event   : { name: 'state', data: ['CLOSED'] } },
    326 
    327          { active  : 0 }
    328        ], function() {
    329          // initial state of the promised stream
    330          expect(pushStream.state).to.equal('RESERVED_LOCAL');
    331 
    332          execute_sequence(pushStream, [
    333            // push headers
    334            { wait    : 5 },
    335            { method  : { name: 'headers', arguments: [{ ':status': '200' }] } },
    336            { outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' } } },
    337            { event   : { name: 'state', data: ['HALF_CLOSED_REMOTE'] } },
    338 
    339            // push data
    340            { method  : { name: 'end', arguments: [payload] } },
    341            { outgoing: { type: 'DATA', flags: { END_STREAM: true  }, data: payload } },
    342            { event   : { name: 'state', data: ['CLOSED'] } },
    343 
    344            { active  : 1 }
    345          ], done);
    346        });
    347      });
    348    });
    349    describe('receiving push stream', function() {
    350      it('should trigger the appropriate state transitions and outgoing frames', function(done) {
    351        var payload = Buffer.alloc(5);
    352        var original_stream = createStream();
    353        var promised_stream = createStream();
    354 
    355        done = util.callNTimes(2, done);
    356 
    357        execute_sequence(original_stream, [
    358          // sending request headers
    359          { method  : { name: 'headers', arguments: [{ ':path': '/' }] } },
    360          { method  : { name: 'end', arguments: [] } },
    361          { outgoing: { type: 'HEADERS', flags: { END_STREAM: true  }, headers: { ':path': '/' } } },
    362          { event   : { name: 'state', data: ['OPEN'] } },
    363          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
    364 
    365          // receiving response headers
    366          { wait    : 10 },
    367          { incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
    368          { event   : { name: 'headers', data: [{ ':status': 200 }] } },
    369 
    370          // receiving push promise
    371          { incoming: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/2.html' }, promised_stream: promised_stream } },
    372          { event   : { name: 'promise', data: [promised_stream, { ':path': '/2.html' }] } },
    373 
    374          // receiving response data
    375          { incoming: { type: 'DATA'   , flags: { END_STREAM: true  }, data: payload } },
    376          { event   : { name: 'state', data: ['CLOSED'] } },
    377 
    378          { active  : 0 }
    379        ], done);
    380 
    381        execute_sequence(promised_stream, [
    382          // initial state of the promised stream
    383          { event   : { name: 'state', data: ['RESERVED_REMOTE'] } },
    384 
    385          // push headers
    386          { wait    : 10 },
    387          { incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } },
    388          { event   : { name: 'state', data: ['HALF_CLOSED_LOCAL'] } },
    389          { event   : { name: 'headers', data: [{ ':status': 200 }] } },
    390 
    391          // push data
    392          { incoming: { type: 'DATA', flags: { END_STREAM: true  }, data: payload } },
    393          { event   : { name: 'state', data: ['CLOSED'] } },
    394 
    395          { active  : 0 }
    396        ], done);
    397      });
    398    });
    399  });
    400 
    401  describe('bunyan formatter', function() {
    402    describe('`s`', function() {
    403      var format = stream.serializers.s;
    404      it('should assign a unique ID to each frame', function() {
    405        var stream1 = createStream();
    406        var stream2 = createStream();
    407        expect(format(stream1)).to.be.equal(format(stream1));
    408        expect(format(stream2)).to.be.equal(format(stream2));
    409        expect(format(stream1)).to.not.be.equal(format(stream2));
    410      });
    411    });
    412  });
    413 });