tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

flow.js (13762B)


      1 var assert = require('assert');
      2 
      3 // The Flow class
      4 // ==============
      5 
      6 // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be
      7 // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html).
      8 // [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex
      9 
     10 var Duplex  = require('stream').Duplex;
     11 
     12 exports.Flow = Flow;
     13 
     14 // Public API
     15 // ----------
     16 
     17 // * **Event: 'error' (type)**: signals an error
     18 //
     19 // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time*
     20 //   ([as described in the standard][1]) using this method
     21 //
     22 // [1]: https://tools.ietf.org/html/rfc7540#section-6.9.2
     23 
     24 // API for child classes
     25 // ---------------------
     26 
     27 // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames
     28 //   with the given `flowControlId` (or every update frame if not given)
     29 //
     30 // * **_send()**: called when more frames should be pushed. The child class is expected to override
     31 //   this (instead of the `_read` method of the Duplex class).
     32 //
     33 // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is
     34 //   expected to override this (instead of the `_write` method of the Duplex class).
     35 //
     36 // * **push(frame): bool**: schedules `frame` for sending.
     37 //
     38 //   Returns `true` if it needs more frames in the output queue, `false` if the output queue is
     39 //   full, and `null` if did not push the frame into the output queue (instead, it pushed it into
     40 //   the flow control queue).
     41 //
     42 // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA
     43 //   frames, length of the payload for DATA frames) of the returned frame will be under `limit`.
     44 //   Small exception: pass -1 as `limit` if the max. flow control size is 0. `read(0)` means the
     45 //   same thing as [in the original API](https://nodejs.org/api/stream.html#stream_stream_read_0).
     46 //
     47 // * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
     48 //
     49 // * **_log**: the Flow class uses the `_log` object of the parent
     50 
     51 // Constructor
     52 // -----------
     53 
     54 // When a HTTP/2.0 connection is first established, new streams are created with an initial flow
     55 // control window size of 65535 bytes.
     56 var INITIAL_WINDOW_SIZE = 65535;
     57 
     58 // `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched.
     59 function Flow(flowControlId) {
     60  Duplex.call(this, { objectMode: true });
     61 
     62  this._window = this._initialWindow = INITIAL_WINDOW_SIZE;
     63  this._flowControlId = flowControlId;
     64  this._queue = [];
     65  this._ended = false;
     66  this._received = 0;
     67 }
     68 Flow.prototype = Object.create(Duplex.prototype, { constructor: { value: Flow } });
     69 
     70 // Incoming frames
     71 // ---------------
     72 
     73 // `_receive` is called when there's an incoming frame.
     74 Flow.prototype._receive = function _receive(frame, callback) {
     75  throw new Error('The _receive(frame, callback) method has to be overridden by the child class!');
     76 };
     77 
     78 // `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s
     79 // to the flow. It emits the 'receiving' event and notifies the window size tracking code if the
     80 // incoming frame is a WINDOW_UPDATE.
     81 // [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
     82 Flow.prototype._write = function _write(frame, encoding, callback) {
     83  var sentToUs = (this._flowControlId === undefined) || (frame.stream === this._flowControlId);
     84 
     85  if (sentToUs && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) {
     86    this._ended = true;
     87  }
     88 
     89  if ((frame.type === 'DATA') && (frame.data.length > 0)) {
     90    this._receive(frame, function() {
     91      this._received += frame.data.length;
     92      if (!this._restoreWindowTimer) {
     93        this._restoreWindowTimer = setImmediate(this._restoreWindow.bind(this));
     94      }
     95      callback();
     96    }.bind(this));
     97  }
     98 
     99  else {
    100    this._receive(frame, callback);
    101  }
    102 
    103  if (sentToUs && (frame.type === 'WINDOW_UPDATE')) {
    104    this._updateWindow(frame);
    105  }
    106 };
    107 
    108 // `_restoreWindow` basically acknowledges the DATA frames received since it's last call. It sends
    109 // a WINDOW_UPDATE that restores the flow control window of the remote end.
    110 // TODO: push this directly into the output queue. No need to wait for DATA frames in the queue.
    111 Flow.prototype._restoreWindow = function _restoreWindow() {
    112  delete this._restoreWindowTimer;
    113  if (!this._ended && (this._received > 0)) {
    114    this.push({
    115      type: 'WINDOW_UPDATE',
    116      flags: {},
    117      stream: this._flowControlId,
    118      window_size: this._received
    119    });
    120    this._received = 0;
    121  }
    122 };
    123 
    124 // Outgoing frames - sending procedure
    125 // -----------------------------------
    126 
    127 //                                         flow
    128 //                +-------------------------------------------------+
    129 //                |                                                 |
    130 //                +--------+           +---------+                  |
    131 //        read()  | output |  _read()  | flow    |  _send()         |
    132 //     <----------|        |<----------| control |<-------------    |
    133 //                | buffer |           | buffer  |                  |
    134 //                +--------+           +---------+                  |
    135 //                | input  |                                        |
    136 //     ---------->|        |----------------------------------->    |
    137 //       write()  | buffer |  _write()              _receive()      |
    138 //                +--------+                                        |
    139 //                |                                                 |
    140 //                +-------------------------------------------------+
    141 
    142 // `_send` is called when more frames should be pushed to the output buffer.
    143 Flow.prototype._send = function _send() {
    144  throw new Error('The _send() method has to be overridden by the child class!');
    145 };
    146 
    147 // `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more
    148 // items in the output queue.
    149 // [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
    150 Flow.prototype._read = function _read() {
    151  // * if the flow control queue is empty, then let the user push more frames
    152  if (this._queue.length === 0) {
    153    this._send();
    154  }
    155 
    156  // * if there are items in the flow control queue, then let's put them into the output queue (to
    157  //   the extent it is possible with respect to the window size and output queue feedback)
    158  else if (this._window > 0) {
    159    this._readableState.sync = true; // to avoid reentrant calls
    160    do {
    161      var moreNeeded = this._push(this._queue[0]);
    162      if (moreNeeded !== null) {
    163        this._queue.shift();
    164      }
    165    } while (moreNeeded && (this._queue.length > 0));
    166    this._readableState.sync = false;
    167 
    168    assert((!moreNeeded) ||                              // * output queue is full
    169           (this._queue.length === 0) ||                         // * flow control queue is empty
    170           (!this._window && (this._queue[0].type === 'DATA'))); // * waiting for window update
    171  }
    172 
    173  // * otherwise, come back when the flow control window is positive
    174  else {
    175    if (!this.listenerCount('window_update')) {
    176      this.once('window_update', this._read);
    177    }
    178  }
    179 };
    180 
    181 var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383
    182 
    183 // `read(limit)` is like the `read` of the Readable class, but it guarantess that the 'flow control
    184 // size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will
    185 // be under `limit`.
    186 Flow.prototype.read = function read(limit) {
    187  if (limit === 0) {
    188    return Duplex.prototype.read.call(this, 0);
    189  } else if (limit === -1) {
    190    limit = 0;
    191  } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
    192    limit = MAX_PAYLOAD_SIZE;
    193  }
    194 
    195  // * Looking at the first frame in the queue without pulling it out if possible.
    196  var frame = this._readableState.buffer[0];
    197  if (!frame && !this._readableState.ended) {
    198    this._read();
    199    frame = this._readableState.buffer[0];
    200  }
    201 
    202  if (frame && (frame.type === 'DATA')) {
    203    // * If the frame is DATA, then there's two special cases:
    204    //   * if the limit is 0, we shouldn't return anything
    205    //   * if the size of the frame is larger than limit, then the frame should be split
    206    if (limit === 0) {
    207      return Duplex.prototype.read.call(this, 0);
    208    }
    209 
    210    else if (frame.data.length > limit) {
    211      this._log.trace({ frame: frame, size: frame.data.length, forwardable: limit },
    212        'Splitting out forwardable part of a DATA frame.');
    213      this.unshift({
    214        type: 'DATA',
    215        flags: {},
    216        stream: frame.stream,
    217        data: frame.data.slice(0, limit)
    218      });
    219      frame.data = frame.data.slice(limit);
    220    }
    221  }
    222 
    223  return Duplex.prototype.read.call(this);
    224 };
    225 
    226 // `_parentPush` pushes the given `frame` into the output queue
    227 Flow.prototype._parentPush = function _parentPush(frame) {
    228  this._log.trace({ frame: frame }, 'Pushing frame into the output queue');
    229 
    230  if (frame && (frame.type === 'DATA') && (this._window !== Infinity)) {
    231    this._log.trace({ window: this._window, by: frame.data.length },
    232                    'Decreasing flow control window size.');
    233    this._window -= frame.data.length;
    234    assert(this._window >= 0);
    235  }
    236 
    237  return Duplex.prototype.push.call(this, frame);
    238 };
    239 
    240 // `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size.
    241 // It is capable of splitting DATA frames into smaller parts, if the window size is not enough to
    242 // push the whole frame. The return value is similar to `push` except that it returns `null` if it
    243 // did not push the whole frame to the output queue (but maybe it did push part of the frame).
    244 Flow.prototype._push = function _push(frame) {
    245  var data = frame && (frame.type === 'DATA') && frame.data;
    246  var maxFrameLength = (this._window < 16384) ? this._window : 16384;
    247 
    248  if (!data || (data.length <= maxFrameLength)) {
    249    return this._parentPush(frame);
    250  }
    251 
    252  else if (this._window <= 0) {
    253    return null;
    254  }
    255 
    256  else {
    257    this._log.trace({ frame: frame, size: frame.data.length, forwardable: this._window },
    258                    'Splitting out forwardable part of a DATA frame.');
    259    frame.data = data.slice(maxFrameLength);
    260    this._parentPush({
    261      type: 'DATA',
    262      flags: {},
    263      stream: frame.stream,
    264      data: data.slice(0, maxFrameLength)
    265    });
    266    return null;
    267  }
    268 };
    269 
    270 // Push `frame` into the flow control queue, or if it's empty, then directly into the output queue
    271 Flow.prototype.push = function push(frame) {
    272  if (frame === null) {
    273    this._log.debug('Enqueueing outgoing End Of Stream');
    274  } else {
    275    this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
    276  }
    277 
    278  var moreNeeded = null;
    279  if (this._queue.length === 0) {
    280    moreNeeded = this._push(frame);
    281  }
    282 
    283  if (moreNeeded === null) {
    284    this._queue.push(frame);
    285  }
    286 
    287  return moreNeeded;
    288 };
    289 
    290 // `getLastQueuedFrame` returns the last frame in output buffers. This is primarily used by the
    291 // [Stream](stream.html) class to mark the last frame with END_STREAM flag.
    292 Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
    293  var readableQueue = this._readableState.buffer;
    294  return this._queue[this._queue.length - 1] || readableQueue[readableQueue.length - 1];
    295 };
    296 
    297 // Outgoing frames - managing the window size
    298 // ------------------------------------------
    299 
    300 // Flow control window size is manipulated using the `_increaseWindow` method.
    301 //
    302 // * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled
    303 //   again once disabled. Any attempt to re-enable flow control MUST be rejected with a
    304 //   FLOW_CONTROL_ERROR error code.
    305 // * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken
    306 //   depends on it being a stream or the connection itself.
    307 
    308 var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;
    309 
    310 Flow.prototype._increaseWindow = function _increaseWindow(size) {
    311  if ((this._window === Infinity) && (size !== Infinity)) {
    312    this._log.error('Trying to increase flow control window after flow control was turned off.');
    313    this.emit('error', 'FLOW_CONTROL_ERROR');
    314  } else {
    315    this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
    316    this._window += size;
    317    if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) {
    318      this._log.error('Flow control window grew too large.');
    319      this.emit('error', 'FLOW_CONTROL_ERROR');
    320    } else {
    321      if (size != 0) {
    322        this.emit('window_update');
    323      }
    324    }
    325  }
    326 };
    327 
    328 // The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It
    329 // modifies the flow control window:
    330 //
    331 // * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the
    332 //   END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL
    333 //   flag set is ignored.
    334 // * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount
    335 //   specified in the frame.
    336 Flow.prototype._updateWindow = function _updateWindow(frame) {
    337  this._increaseWindow(frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size);
    338 };
    339 
    340 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the
    341 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by
    342 // calling the `setInitialWindow` method. The window size has to be modified by the difference
    343 // between the new value and the old value.
    344 Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
    345  this._increaseWindow(initialWindow - this._initialWindow);
    346  this._initialWindow = initialWindow;
    347 };