tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

endpoint.js (9633B)


      1 var assert = require('assert');
      2 
      3 var Serializer   = require('./framer').Serializer;
      4 var Deserializer = require('./framer').Deserializer;
      5 var Compressor   = require('./compressor').Compressor;
      6 var Decompressor = require('./compressor').Decompressor;
      7 var Connection   = require('./connection').Connection;
      8 var Duplex       = require('stream').Duplex;
      9 var Transform    = require('stream').Transform;
     10 
     11 exports.Endpoint = Endpoint;
     12 
     13 // The Endpoint class
     14 // ==================
     15 
     16 // Public API
     17 // ----------
     18 
     19 // - **new Endpoint(log, role, settings, filters)**: create a new Endpoint.
     20 //
     21 //   - `log`: bunyan logger of the parent
     22 //   - `role`: 'CLIENT' or 'SERVER'
     23 //   - `settings`: initial HTTP/2 settings
     24 //   - `filters`: a map of functions that filter the traffic between components (for debugging or
     25 //     intentional failure injection).
     26 //
     27 //     Filter functions get three arguments:
     28 //     1. `frame`: the current frame
     29 //     2. `forward(frame)`: function that can be used to forward a frame to the next component
     30 //     3. `done()`: callback to signal the end of the filter process
     31 //
     32 //     Valid filter names and their position in the stack:
     33 //     - `beforeSerialization`: after compression, before serialization
     34 //     - `beforeCompression`: after multiplexing, before compression
     35 //     - `afterDeserialization`: after deserialization, before decompression
     36 //     - `afterDecompression`: after decompression, before multiplexing
     37 //
     38 // * **Event: 'stream' (Stream)**: 'stream' event forwarded from the underlying Connection
     39 //
     40 // * **Event: 'error' (type)**: signals an error
     41 //
     42 // * **createStream(): Stream**: initiate a new stream (forwarded to the underlying Connection)
     43 //
     44 // * **close([error])**: close the connection with an error code
     45 
     46 // Constructor
     47 // -----------
     48 
     49 // The process of initialization:
     50 function Endpoint(log, role, settings, filters) {
     51  Duplex.call(this);
     52 
     53  // * Initializing logging infrastructure
     54  this._log = log.child({ component: 'endpoint', e: this });
     55 
     56  // * First part of the handshake process: sending and receiving the client connection header
     57  //   prelude.
     58  assert((role === 'CLIENT') || role === 'SERVER');
     59  if (role === 'CLIENT') {
     60    this._writePrelude();
     61  } else {
     62    this._readPrelude();
     63  }
     64 
     65  // * Initialization of component. This includes the second part of the handshake process:
     66  //   sending the first SETTINGS frame. This is done by the connection class right after
     67  //   initialization.
     68  this._initializeDataFlow(role, settings, filters || {});
     69 
     70  // * Initialization of management code.
     71  this._initializeManagement();
     72 
     73  // * Initializing error handling.
     74  this._initializeErrorHandling();
     75 }
     76 Endpoint.prototype = Object.create(Duplex.prototype, { constructor: { value: Endpoint } });
     77 
     78 // Handshake
     79 // ---------
     80 
     81 var CLIENT_PRELUDE = Buffer.from('PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n');
     82 
     83 // Writing the client header is simple and synchronous.
     84 Endpoint.prototype._writePrelude = function _writePrelude() {
     85  this._log.debug('Sending the client connection header prelude.');
     86  this.push(CLIENT_PRELUDE);
     87 };
     88 
     89 // The asynchronous process of reading the client header:
     90 Endpoint.prototype._readPrelude = function _readPrelude() {
     91  // * progress in the header is tracker using a `cursor`
     92  var cursor = 0;
     93 
     94  // * `_write` is temporarily replaced by the comparator function
     95  this._write = function _temporalWrite(chunk, encoding, done) {
     96    // * which compares the stored header with the current `chunk` byte by byte and emits the
     97    //   'error' event if there's a byte that doesn't match
     98    var offset = cursor;
     99    while(cursor < CLIENT_PRELUDE.length && (cursor - offset) < chunk.length) {
    100      if (CLIENT_PRELUDE[cursor] !== chunk[cursor - offset]) {
    101        this._log.fatal({ cursor: cursor, offset: offset, chunk: chunk },
    102                        'Client connection header prelude does not match.');
    103        this._error('handshake', 'PROTOCOL_ERROR');
    104        return;
    105      }
    106      cursor += 1;
    107    }
    108 
    109    // * if the whole header is over, and there were no error then restore the original `_write`
    110    //   and call it with the remaining part of the current chunk
    111    if (cursor === CLIENT_PRELUDE.length) {
    112      this._log.debug('Successfully received the client connection header prelude.');
    113      delete this._write;
    114      chunk = chunk.slice(cursor - offset);
    115      this._write(chunk, encoding, done);
    116    }
    117  };
    118 };
    119 
    120 // Data flow
    121 // ---------
    122 
    123 //     +---------------------------------------------+
    124 //     |                                             |
    125 //     |   +-------------------------------------+   |
    126 //     |   | +---------+ +---------+ +---------+ |   |
    127 //     |   | | stream1 | | stream2 | |   ...   | |   |
    128 //     |   | +---------+ +---------+ +---------+ |   |
    129 //     |   |             connection              |   |
    130 //     |   +-------------------------------------+   |
    131 //     |             |                 ^             |
    132 //     |        pipe |                 | pipe        |
    133 //     |             v                 |             |
    134 //     |   +------------------+------------------+   |
    135 //     |   |    compressor    |   decompressor   |   |
    136 //     |   +------------------+------------------+   |
    137 //     |             |                 ^             |
    138 //     |        pipe |                 | pipe        |
    139 //     |             v                 |             |
    140 //     |   +------------------+------------------+   |
    141 //     |   |    serializer    |   deserializer   |   |
    142 //     |   +------------------+------------------+   |
    143 //     |             |                 ^             |
    144 //     |     _read() |                 | _write()    |
    145 //     |             v                 |             |
    146 //     |      +------------+     +-----------+       |
    147 //     |      |output queue|     |input queue|       |
    148 //     +------+------------+-----+-----------+-------+
    149 //                   |                 ^
    150 //            read() |                 | write()
    151 //                   v                 |
    152 
    153 function createTransformStream(filter) {
    154  var transform = new Transform({ objectMode: true });
    155  var push = transform.push.bind(transform);
    156  transform._transform = function(frame, encoding, done) {
    157    filter(frame, push, done);
    158  };
    159  return transform;
    160 }
    161 
    162 function pipeAndFilter(stream1, stream2, filter) {
    163  if (filter) {
    164    stream1.pipe(createTransformStream(filter)).pipe(stream2);
    165  } else {
    166    stream1.pipe(stream2);
    167  }
    168 }
    169 
    170 Endpoint.prototype._initializeDataFlow = function _initializeDataFlow(role, settings, filters) {
    171  var firstStreamId, compressorRole, decompressorRole;
    172  if (role === 'CLIENT') {
    173    firstStreamId = 1;
    174    compressorRole = 'REQUEST';
    175    decompressorRole = 'RESPONSE';
    176  } else {
    177    firstStreamId = 2;
    178    compressorRole = 'RESPONSE';
    179    decompressorRole = 'REQUEST';
    180  }
    181 
    182  this._serializer   = new Serializer(this._log);
    183  this._deserializer = new Deserializer(this._log);
    184  this._compressor   = new Compressor(this._log, compressorRole);
    185  this._decompressor = new Decompressor(this._log, decompressorRole);
    186  this._connection   = new Connection(this._log, firstStreamId, settings);
    187 
    188  pipeAndFilter(this._connection, this._compressor, filters.beforeCompression);
    189  pipeAndFilter(this._compressor, this._serializer, filters.beforeSerialization);
    190  pipeAndFilter(this._deserializer, this._decompressor, filters.afterDeserialization);
    191  pipeAndFilter(this._decompressor, this._connection, filters.afterDecompression);
    192 
    193  this._connection.on('ACKNOWLEDGED_SETTINGS_HEADER_TABLE_SIZE',
    194                      this._decompressor.setTableSizeLimit.bind(this._decompressor));
    195  this._connection.on('RECEIVING_SETTINGS_HEADER_TABLE_SIZE',
    196                      this._compressor.setTableSizeLimit.bind(this._compressor));
    197 };
    198 
    199 var noread = {};
    200 Endpoint.prototype._read = function _read() {
    201  this._readableState.sync = true;
    202  var moreNeeded = noread, chunk;
    203  while (moreNeeded && (chunk = this._serializer.read())) {
    204    moreNeeded = this.push(chunk);
    205  }
    206  if (moreNeeded === noread) {
    207    this._serializer.once('readable', this._read.bind(this));
    208  }
    209  this._readableState.sync = false;
    210 };
    211 
    212 Endpoint.prototype._write = function _write(chunk, encoding, done) {
    213  this._deserializer.write(chunk, encoding, done);
    214 };
    215 
    216 // Management
    217 // --------------
    218 
    219 Endpoint.prototype._initializeManagement = function _initializeManagement() {
    220  this._connection.on('stream', this.emit.bind(this, 'stream'));
    221 };
    222 
    223 Endpoint.prototype.createStream = function createStream() {
    224  return this._connection.createStream();
    225 };
    226 
    227 // Error handling
    228 // --------------
    229 
    230 Endpoint.prototype._initializeErrorHandling = function _initializeErrorHandling() {
    231  this._serializer.on('error', this._error.bind(this, 'serializer'));
    232  this._deserializer.on('error', this._error.bind(this, 'deserializer'));
    233  this._compressor.on('error', this._error.bind(this, 'compressor'));
    234  this._decompressor.on('error', this._error.bind(this, 'decompressor'));
    235  this._connection.on('error', this._error.bind(this, 'connection'));
    236 
    237  this._connection.on('peerError', this.emit.bind(this, 'peerError'));
    238 };
    239 
    240 Endpoint.prototype._error = function _error(component, error) {
    241  this._log.fatal({ source: component, message: error }, 'Fatal error, closing connection');
    242  this.close(error);
    243  setImmediate(this.emit.bind(this, 'error', error));
    244 };
    245 
    246 Endpoint.prototype.close = function close(error) {
    247  this._connection.close(error);
    248 };
    249 
    250 // Bunyan serializers
    251 // ------------------
    252 
    253 exports.serializers = {};
    254 
    255 var nextId = 0;
    256 exports.serializers.e = function(endpoint) {
    257  if (!('id' in endpoint)) {
    258    endpoint.id = nextId;
    259    nextId += 1;
    260  }
    261  return endpoint.id;
    262 };