tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

connection.js (21598B)


      1 var assert = require('assert');
      2 
      3 // The Connection class
      4 // ====================
      5 
      6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
      7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a
      8 // [Flow](flow.html) subclass.
      9 
     10 var Flow = require('./flow').Flow;
     11 
     12 exports.Connection = Connection;
     13 
     14 // Public API
     15 // ----------
     16 
     17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection
     18 //
     19 // * **Event: 'error' (type)**: signals a connection level error made by the other end
     20 //
     21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error
     22 //   code other than NO_ERROR
     23 //
     24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream
     25 //
     26 // * **createStream(): stream**: initiate a new stream
     27 //
     28 // * **set(settings, callback)**: change the value of one or more settings according to the
     29 //   key-value pairs of `settings`. The callback is called after the peer acknowledged the changes.
     30 //
     31 // * **ping([callback])**: send a ping and call callback when the answer arrives
     32 //
     33 // * **close([error])**: close the stream with an error code
     34 
     35 // Constructor
     36 // -----------
     37 
     38 // The main aspects of managing the connection are:
     39 function Connection(log, firstStreamId, settings) {
     40  // * initializing the base class
     41  Flow.call(this, 0);
     42 
     43  // * logging: every method uses the common logger object
     44  this._log = log.child({ component: 'connection' });
     45 
     46  // * stream management
     47  this._initializeStreamManagement(firstStreamId);
     48 
     49  // * lifecycle management
     50  this._initializeLifecycleManagement();
     51 
     52  // * flow control
     53  this._initializeFlowControl();
     54 
     55  // * settings management
     56  this._initializeSettingsManagement(settings);
     57 
     58  // * multiplexing
     59  this._initializeMultiplexing();
     60 }
     61 Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } });
     62 
     63 // Overview
     64 // --------
     65 
     66 //              |    ^             |    ^
     67 //              v    |             v    |
     68 //         +--------------+   +--------------+
     69 //     +---|   stream1    |---|   stream2    |----      ....      ---+
     70 //     |   | +----------+ |   | +----------+ |                       |
     71 //     |   | | stream1. | |   | | stream2. | |                       |
     72 //     |   +-| upstream |-+   +-| upstream |-+                       |
     73 //     |     +----------+       +----------+                         |
     74 //     |       |     ^             |     ^                           |
     75 //     |       v     |             v     |                           |
     76 //     |       +-----+-------------+-----+--------      ....         |
     77 //     |       ^     |             |     |                           |
     78 //     |       |     v             |     |                           |
     79 //     |   +--------------+        |     |                           |
     80 //     |   |   stream0    |        |     |                           |
     81 //     |   |  connection  |        |     |                           |
     82 //     |   |  management  |     multiplexing                         |
     83 //     |   +--------------+     flow control                         |
     84 //     |                           |     ^                           |
     85 //     |                   _read() |     | _write()                  |
     86 //     |                           v     |                           |
     87 //     |                +------------+ +-----------+                 |
     88 //     |                |output queue| |input queue|                 |
     89 //     +----------------+------------+-+-----------+-----------------+
     90 //                                 |     ^
     91 //                          read() |     | write()
     92 //                                 v     |
     93 
     94 // Stream management
     95 // -----------------
     96 
     97 var Stream  = require('./stream').Stream;
     98 
     99 // Initialization:
    100 Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
    101  // * streams are stored in two data structures:
    102  //   * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames.
    103  //   * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames.
    104  this._streamIds = [];
    105  this._streamPriorities = [];
    106 
    107  // * The next outbound stream ID and the last inbound stream id
    108  this._nextStreamId = firstStreamId;
    109  this._lastIncomingStream = 0;
    110 
    111  // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID
    112  this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } };
    113 
    114  // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
    115  //   be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
    116  this._streamSlotsFree = Infinity;
    117  this._streamLimit = Infinity;
    118  this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit);
    119 };
    120 
    121 // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
    122 // broadcasts the message by creating an event on it.
    123 Connection.prototype._writeControlFrame = function _writeControlFrame(frame) {
    124  if ((frame.type === 'SETTINGS') || (frame.type === 'PING') ||
    125      (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') ||
    126      (frame.type === 'ALTSVC') || (frame.type == 'ORIGIN')) {
    127    this._log.debug({ frame: frame }, 'Receiving connection level frame');
    128    this.emit(frame.type, frame);
    129  } else {
    130    this._log.error({ frame: frame }, 'Invalid connection level frame');
    131    this.emit('error', 'PROTOCOL_ERROR');
    132  }
    133 };
    134 
    135 // Methods to manage the stream slot pool:
    136 Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
    137  var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit);
    138  this._streamSlotsFree += newStreamLimit - this._streamLimit;
    139  this._streamLimit = newStreamLimit;
    140  if (wakeup) {
    141    this.emit('wakeup');
    142  }
    143 };
    144 
    145 Connection.prototype._changeStreamCount = function _changeStreamCount(change) {
    146  if (change) {
    147    this._log.trace({ free: this._streamSlotsFree, change: change },
    148                    'Changing active stream count.');
    149    var wakeup = (this._streamSlotsFree === 0) && (change < 0);
    150    this._streamSlotsFree -= change;
    151    if (wakeup) {
    152      this.emit('wakeup');
    153    }
    154  }
    155 };
    156 
    157 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of
    158 // an outbound stream) consists of three steps:
    159 //
    160 // 1. var stream = new Stream(this._log, this);
    161 // 2. this._allocateId(stream, id);
    162 // 2. this._allocatePriority(stream);
    163 
    164 // Allocating an ID to a stream
    165 Connection.prototype._allocateId = function _allocateId(stream, id) {
    166  // * initiated stream without definite ID
    167  if (id === undefined) {
    168    id = this._nextStreamId;
    169    this._nextStreamId += 2;
    170  }
    171 
    172  // * incoming stream with a legitim ID (larger than any previous and different parity than ours)
    173  else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) {
    174    this._lastIncomingStream = id;
    175  }
    176 
    177  // * incoming stream with invalid ID
    178  else {
    179    this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream },
    180                    'Invalid incoming stream ID.');
    181    this.emit('error', 'PROTOCOL_ERROR');
    182    return undefined;
    183  }
    184 
    185  assert(!(id in this._streamIds));
    186 
    187  // * adding to `this._streamIds`
    188  this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.');
    189  this._streamIds[id] = stream;
    190  stream.id = id;
    191  this.emit('new_stream', stream, id);
    192 
    193  // * forwarding connection errors from streams
    194  stream.on('connectionError', this.emit.bind(this, 'error'));
    195 
    196  return id;
    197 };
    198 
    199 // Allocating a priority to a stream, and managing priority changes
    200 Connection.prototype._allocatePriority = function _allocatePriority(stream) {
    201  this._log.trace({ s: stream }, 'Allocating priority for stream.');
    202  this._insert(stream, stream._priority);
    203  stream.on('priority', this._reprioritize.bind(this, stream));
    204  stream.upstream.on('readable', this.emit.bind(this, 'wakeup'));
    205  this.emit('wakeup');
    206 };
    207 
    208 Connection.prototype._insert = function _insert(stream, priority) {
    209  if (priority in this._streamPriorities) {
    210    this._streamPriorities[priority].push(stream);
    211  } else {
    212    this._streamPriorities[priority] = [stream];
    213  }
    214 };
    215 
    216 Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
    217  var bucket = this._streamPriorities[stream._priority];
    218  var index = bucket.indexOf(stream);
    219  assert(index !== -1);
    220  bucket.splice(index, 1);
    221  if (bucket.length === 0) {
    222    delete this._streamPriorities[stream._priority];
    223  }
    224 
    225  this._insert(stream, priority);
    226 };
    227 
    228 // Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
    229 // a previously nonexistent stream.
    230 Connection.prototype._createIncomingStream = function _createIncomingStream(id) {
    231  this._log.debug({ stream_id: id }, 'New incoming stream.');
    232 
    233  var stream = new Stream(this._log, this);
    234  this._allocateId(stream, id);
    235  this._allocatePriority(stream);
    236  this.emit('stream', stream, id);
    237 
    238  return stream;
    239 };
    240 
    241 // Creating an *outbound* stream
    242 Connection.prototype.createStream = function createStream() {
    243  this._log.trace('Creating new outbound stream.');
    244 
    245  // * Receiving is enabled immediately, and an ID gets assigned to the stream
    246  var stream = new Stream(this._log, this);
    247  this._allocatePriority(stream);
    248 
    249  return stream;
    250 };
    251 
    252 // Multiplexing
    253 // ------------
    254 
    255 Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() {
    256  this.on('window_update', this.emit.bind(this, 'wakeup'));
    257  this._sendScheduled = false;
    258  this._firstFrameReceived = false;
    259 };
    260 
    261 // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented
    262 // by child classes. It reads frames from streams and pushes them to the output buffer.
    263 Connection.prototype._send = function _send(immediate) {
    264  // * Do not do anything if the connection is already closed
    265  if (this._closed) {
    266    return;
    267  }
    268 
    269  // * Collapsing multiple calls in a turn into a single deferred call
    270  if (immediate) {
    271    this._sendScheduled = false;
    272  } else {
    273    if (!this._sendScheduled) {
    274      this._sendScheduled = true;
    275      setImmediate(this._send.bind(this, true));
    276    }
    277    return;
    278  }
    279 
    280  this._log.trace('Starting forwarding frames from streams.');
    281 
    282  // * Looping through priority `bucket`s in priority order.
    283 priority_loop:
    284  for (var priority in this._streamPriorities) {
    285    var bucket = this._streamPriorities[priority];
    286    var nextBucket = [];
    287 
    288    // * Forwarding frames from buckets with round-robin scheduling.
    289    //   1. pulling out frame
    290    //   2. if there's no frame, skip this stream
    291    //   3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
    292    //      this stream
    293    //   4. adding stream to the bucket of the next round
    294    //   5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
    295    //   6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
    296    //   7. forwarding the frame, changing `streamCount` as appropriate
    297    //   8. stepping to the next stream if there's still more frame needed in the output buffer
    298    //   9. switching to the bucket of the next round
    299    while (bucket.length > 0) {
    300      for (var index = 0; index < bucket.length; index++) {
    301        var stream = bucket[index];
    302        var frame = stream.upstream.read((this._window > 0) ? this._window : -1);
    303 
    304        if (!frame) {
    305          continue;
    306        } else if (frame.count_change > this._streamSlotsFree) {
    307          stream.upstream.unshift(frame);
    308          continue;
    309        }
    310 
    311        nextBucket.push(stream);
    312 
    313        if (frame.stream === undefined) {
    314          frame.stream = stream.id || this._allocateId(stream);
    315        }
    316 
    317        if (frame.type === 'PUSH_PROMISE') {
    318          this._allocatePriority(frame.promised_stream);
    319          frame.promised_stream = this._allocateId(frame.promised_stream);
    320        }
    321 
    322        this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame');
    323        var moreNeeded = this.push(frame);
    324        this._changeStreamCount(frame.count_change);
    325 
    326        assert(moreNeeded !== null); // The frame shouldn't be unforwarded
    327        if (moreNeeded === false) {
    328          break priority_loop;
    329        }
    330      }
    331 
    332      bucket = nextBucket;
    333      nextBucket = [];
    334    }
    335  }
    336 
    337  // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event
    338  if (moreNeeded === undefined) {
    339    this.once('wakeup', this._send.bind(this));
    340  }
    341 
    342  this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.');
    343 };
    344 
    345 // The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be
    346 // implemented by child classes. It forwards the given frame to the appropriate stream:
    347 Connection.prototype._receive = function _receive(frame, done) {
    348  this._log.trace({ frame: frame }, 'Forwarding incoming frame');
    349 
    350  // * first frame needs to be checked by the `_onFirstFrameReceived` method
    351  if (!this._firstFrameReceived) {
    352    this._firstFrameReceived = true;
    353    this._onFirstFrameReceived(frame);
    354  }
    355 
    356  // Do some sanity checking here before we create a stream
    357  if ((frame.type == 'SETTINGS' ||
    358       frame.type == 'PING' ||
    359       frame.type == 'GOAWAY') &&
    360      frame.stream != 0) {
    361    // Got connection-level frame on a stream - EEP!
    362    this.close('PROTOCOL_ERROR');
    363    return;
    364  } else if ((frame.type == 'DATA' ||
    365              frame.type == 'HEADERS' ||
    366              frame.type == 'PRIORITY' ||
    367              frame.type == 'RST_STREAM' ||
    368              frame.type == 'PUSH_PROMISE' ||
    369              frame.type == 'CONTINUATION') &&
    370             frame.stream == 0) {
    371    // Got stream-level frame on connection - EEP!
    372    this.close('PROTOCOL_ERROR');
    373    return;
    374  }
    375  // WINDOW_UPDATE can be on either stream or connection
    376 
    377  // * gets the appropriate stream from the stream registry
    378  var stream = this._streamIds[frame.stream];
    379 
    380  // * or creates one if it's not in `this.streams`
    381  if (!stream) {
    382    stream = this._createIncomingStream(frame.stream);
    383  }
    384 
    385  // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
    386  if (frame.type === 'PUSH_PROMISE') {
    387    frame.promised_stream = this._createIncomingStream(frame.promised_stream);
    388  }
    389 
    390  frame.count_change = this._changeStreamCount.bind(this);
    391 
    392  // * and writes it to the `stream`'s `upstream`
    393  stream.upstream.write(frame);
    394 
    395  done();
    396 };
    397 
    398 // Settings management
    399 // -------------------
    400 
    401 var defaultSettings = {
    402 };
    403 
    404 // Settings management initialization:
    405 Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
    406  // * Setting up the callback queue for setting acknowledgements
    407  this._settingsAckCallbacks = [];
    408 
    409  // * Sending the initial settings.
    410  this._log.debug({ settings: settings },
    411                  'Sending the first SETTINGS frame as part of the connection header.');
    412  this.set(settings || defaultSettings);
    413 
    414  // * Forwarding SETTINGS frames to the `_receiveSettings` method
    415  this.on('SETTINGS', this._receiveSettings);
    416  this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize);
    417 };
    418 
    419 // * Checking that the first frame the other endpoint sends is SETTINGS
    420 Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) {
    421  if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
    422    this._log.debug('Receiving the first SETTINGS frame as part of the connection header.');
    423  } else {
    424    this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
    425    this.emit('error', 'PROTOCOL_ERROR');
    426  }
    427 };
    428 
    429 // Handling of incoming SETTINGS frames.
    430 Connection.prototype._receiveSettings = function _receiveSettings(frame) {
    431  // * If it's an ACK, call the appropriate callback
    432  if (frame.flags.ACK) {
    433    var callback = this._settingsAckCallbacks.shift();
    434    if (callback) {
    435      callback();
    436    }
    437  }
    438 
    439  // * If it's a setting change request, then send an ACK and change the appropriate settings
    440  else {
    441    if (!this._closed) {
    442      this.push({
    443        type: 'SETTINGS',
    444        flags: { ACK: true },
    445        stream: 0,
    446        settings: {}
    447      });
    448    }
    449    for (var name in frame.settings) {
    450      this.emit('RECEIVING_' + name, frame.settings[name]);
    451    }
    452  }
    453 };
    454 
    455 Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) {
    456  if ((value < 0x4000) || (value >= 0x01000000)) {
    457    this._log.fatal('Received invalid value for max frame size: ' + value);
    458    this.emit('error');
    459  }
    460 };
    461 
    462 // Changing one or more settings value and sending out a SETTINGS frame
    463 Connection.prototype.set = function set(settings, callback) {
    464  // * Calling the callback and emitting event when the change is acknowledges
    465  var self = this;
    466  this._settingsAckCallbacks.push(function() {
    467    for (var name in settings) {
    468      self.emit('ACKNOWLEDGED_' + name, settings[name]);
    469    }
    470    if (callback) {
    471      callback();
    472    }
    473  });
    474 
    475  // * Sending out the SETTINGS frame
    476  this.push({
    477    type: 'SETTINGS',
    478    flags: { ACK: false },
    479    stream: 0,
    480    settings: settings
    481  });
    482  for (var name in settings) {
    483    this.emit('SENDING_' + name, settings[name]);
    484  }
    485 };
    486 
    487 // Lifecycle management
    488 // --------------------
    489 
    490 // The main responsibilities of lifecycle management code:
    491 //
    492 // * keeping the connection alive by
    493 //   * sending PINGs when the connection is idle
    494 //   * answering PINGs
    495 // * ending the connection
    496 
    497 Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {
    498  this._pings = {};
    499  this.on('PING', this._receivePing);
    500  this.on('GOAWAY', this._receiveGoaway);
    501  this._closed = false;
    502 };
    503 
    504 // Generating a string of length 16 with random hexadecimal digits
    505 Connection.prototype._generatePingId = function _generatePingId() {
    506  do {
    507    var id = '';
    508    for (var i = 0; i < 16; i++) {
    509      id += Math.floor(Math.random()*16).toString(16);
    510    }
    511  } while(id in this._pings);
    512  return id;
    513 };
    514 
    515 // Sending a ping and calling `callback` when the answer arrives
    516 Connection.prototype.ping = function ping(callback) {
    517  var id = this._generatePingId();
    518  var data = Buffer.from(id, 'hex');
    519  this._pings[id] = callback;
    520 
    521  this._log.debug({ data: data }, 'Sending PING.');
    522  this.push({
    523    type: 'PING',
    524    flags: {
    525      ACK: false
    526    },
    527    stream: 0,
    528    data: data
    529  });
    530 };
    531 
    532 // Answering pings
    533 Connection.prototype._receivePing = function _receivePing(frame) {
    534  if (frame.flags.ACK) {
    535    var id = frame.data.toString('hex');
    536    if (id in this._pings) {
    537      this._log.debug({ data: frame.data }, 'Receiving answer for a PING.');
    538      var callback = this._pings[id];
    539      if (callback) {
    540        callback();
    541      }
    542      delete this._pings[id];
    543    } else {
    544      this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
    545    }
    546 
    547  } else {
    548    this._log.debug({ data: frame.data }, 'Answering PING.');
    549    this.push({
    550      type: 'PING',
    551      flags: {
    552        ACK: true
    553      },
    554      stream: 0,
    555      data: frame.data
    556    });
    557  }
    558 };
    559 
    560 Connection.prototype.originFrame = function originFrame(originList) {
    561  this._log.debug(originList, 'emitting origin frame');
    562 
    563  this.push({
    564    type: 'ORIGIN',
    565    flags: {},
    566    stream: 0,
    567    originList : originList,
    568  });
    569 };
    570 
    571 // Terminating the connection
    572 Connection.prototype.close = function close(error) {
    573  if (this._closed) {
    574    this._log.warn('Trying to close an already closed connection');
    575    return;
    576  }
    577 
    578  this._log.debug({ error: error }, 'Closing the connection');
    579  this.push({
    580    type: 'GOAWAY',
    581    flags: {},
    582    stream: 0,
    583    last_stream: this._lastIncomingStream,
    584    error: error || 'NO_ERROR'
    585  });
    586  this.push(null);
    587  this._closed = true;
    588 };
    589 
    590 Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
    591  this._log.debug({ error: frame.error }, 'Other end closed the connection');
    592  this.push(null);
    593  this._closed = true;
    594  if (frame.error !== 'NO_ERROR') {
    595    this.emit('peerError', frame.error);
    596  }
    597 };
    598 
    599 // Flow control
    600 // ------------
    601 
    602 Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
    603  // Handling of initial window size of individual streams.
    604  this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE;
    605  this.on('new_stream', function(stream) {
    606    stream.upstream.setInitialWindow(this._initialStreamWindowSize);
    607  });
    608  this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize);
    609  this._streamIds[0].upstream.setInitialWindow = function noop() {};
    610 };
    611 
    612 // The initial connection flow control window is 65535 bytes.
    613 var INITIAL_STREAM_WINDOW_SIZE = 65535;
    614 
    615 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the
    616 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all
    617 // stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by
    618 // the difference between the new value and the old value.
    619 Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) {
    620  if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) {
    621    this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.');
    622    this.emit('error', 'FLOW_CONTROL_ERROR');
    623  } else {
    624    this._log.debug({ size: size }, 'Changing stream initial window size.');
    625    this._initialStreamWindowSize = size;
    626    this._streamIds.forEach(function(stream) {
    627      stream.upstream.setInitialWindow(size);
    628    });
    629  }
    630 };