flow.js (13762B)
1 var assert = require('assert'); 2 3 // The Flow class 4 // ============== 5 6 // Flow is a [Duplex stream][1] subclass which implements HTTP/2 flow control. It is designed to be 7 // subclassed by [Connection](connection.html) and the `upstream` component of [Stream](stream.html). 8 // [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex 9 10 var Duplex = require('stream').Duplex; 11 12 exports.Flow = Flow; 13 14 // Public API 15 // ---------- 16 17 // * **Event: 'error' (type)**: signals an error 18 // 19 // * **setInitialWindow(size)**: the initial flow control window size can be changed *any time* 20 // ([as described in the standard][1]) using this method 21 // 22 // [1]: https://tools.ietf.org/html/rfc7540#section-6.9.2 23 24 // API for child classes 25 // --------------------- 26 27 // * **new Flow([flowControlId])**: creating a new flow that will listen for WINDOW_UPDATES frames 28 // with the given `flowControlId` (or every update frame if not given) 29 // 30 // * **_send()**: called when more frames should be pushed. The child class is expected to override 31 // this (instead of the `_read` method of the Duplex class). 32 // 33 // * **_receive(frame, readyCallback)**: called when there's an incoming frame. The child class is 34 // expected to override this (instead of the `_write` method of the Duplex class). 35 // 36 // * **push(frame): bool**: schedules `frame` for sending. 37 // 38 // Returns `true` if it needs more frames in the output queue, `false` if the output queue is 39 // full, and `null` if did not push the frame into the output queue (instead, it pushed it into 40 // the flow control queue). 41 // 42 // * **read(limit): frame**: like the regular `read`, but the 'flow control size' (0 for non-DATA 43 // frames, length of the payload for DATA frames) of the returned frame will be under `limit`. 44 // Small exception: pass -1 as `limit` if the max. flow control size is 0. 
//   `read(0)` means the same thing as
//   [in the original API](https://nodejs.org/api/stream.html#stream_stream_read_0).
//
// * **getLastQueuedFrame(): frame**: returns the last frame in output buffers
//
// * **_log**: the Flow class uses the `_log` object of the parent

// Constructor
// -----------

// When a HTTP/2.0 connection is first established, new streams are created with an initial flow
// control window size of 65535 bytes.
var INITIAL_WINDOW_SIZE = 65535;

// Creates an object-mode Duplex that tracks a flow control window.
//
// `flowControlId` is needed if only specific WINDOW_UPDATEs should be watched. When it is left
// `undefined`, every incoming frame is treated as addressed to this flow.
function Flow(flowControlId) {
  Duplex.call(this, { objectMode: true });

  // Reference value that `setInitialWindow` diffs against.
  this._initialWindow = INITIAL_WINDOW_SIZE;
  // Window currently available for outgoing DATA frames.
  this._window = INITIAL_WINDOW_SIZE;
  this._flowControlId = flowControlId;
  // Flow control queue: frames that could not be moved into the output queue yet.
  this._queue = [];
  // Set by `_write` once an END_STREAM flag or an RST_STREAM frame addressed to this flow is seen.
  this._ended = false;
  // Payload bytes of incoming DATA frames not yet acknowledged with a WINDOW_UPDATE.
  this._received = 0;
}

// Inherit from Duplex and restore a (non-enumerable) `constructor` reference.
Flow.prototype = Object.create(Duplex.prototype);
Object.defineProperty(Flow.prototype, 'constructor', { value: Flow });

// Incoming frames
// ---------------

// `_receive` is called when there's an incoming frame. This base implementation always throws:
// child classes have to supply their own version.
Flow.prototype._receive = function _receive(frame, callback) {
  var message = 'The _receive(frame, callback) method has to be overridden by the child class!';
  throw new Error(message);
};

// `_receive` is called by `_write` which in turn is [called by Duplex][1] when someone `write()`s
// to the flow. `_write` also notifies the window size tracking code if the incoming frame is a
// WINDOW_UPDATE.
// [1]: https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1
//
// Processing of one incoming `frame`, in order:
//
// * a frame addressed to this flow that carries END_STREAM or is an RST_STREAM marks the flow as
//   ended
// * the frame is handed over to `_receive`; for DATA frames with a non-empty payload the payload
//   size is added to `_received` and a `_restoreWindow` run is scheduled once `_receive` reports
//   that it is done
// * a WINDOW_UPDATE addressed to this flow is forwarded to `_updateWindow`
//
// `encoding` is unused.
Flow.prototype._write = function _write(frame, encoding, callback) {
  var self = this;

  // Without a `flowControlId` every frame counts as addressed to this flow.
  var addressedToUs = (self._flowControlId === undefined) ||
                      (frame.stream === self._flowControlId);

  if (addressedToUs) {
    if (frame.flags.END_STREAM || (frame.type === 'RST_STREAM')) {
      self._ended = true;
    }
  }

  // Non-DATA frames and empty DATA frames do not consume window: pass `callback` through as is.
  var done = callback;
  if ((frame.type === 'DATA') && (frame.data.length > 0)) {
    done = function accountForPayload() {
      self._received += frame.data.length;
      // One pending timer is enough, it acknowledges everything received until it fires.
      if (!self._restoreWindowTimer) {
        self._restoreWindowTimer = setImmediate(self._restoreWindow.bind(self));
      }
      callback();
    };
  }
  self._receive(frame, done);

  if (addressedToUs && (frame.type === 'WINDOW_UPDATE')) {
    self._updateWindow(frame);
  }
};

// `_restoreWindow` basically acknowledges the DATA frames received since its last call. It sends
// a WINDOW_UPDATE that restores the flow control window of the remote end.
// TODO: push this directly into the output queue. No need to wait for DATA frames in the queue.
// Clears the pending timer marker and, unless the flow has ended or nothing was received, pushes
// a WINDOW_UPDATE for the `_received` byte count and resets that counter.
Flow.prototype._restoreWindow = function _restoreWindow() {
  delete this._restoreWindowTimer;

  // Nothing to acknowledge (or nobody to acknowledge it to).
  if (this._ended || !(this._received > 0)) {
    return;
  }

  var acknowledgement = {
    type: 'WINDOW_UPDATE',
    flags: {},
    stream: this._flowControlId,
    window_size: this._received
  };
  this.push(acknowledgement);
  this._received = 0;
};

// Outgoing frames - sending procedure
// -----------------------------------

//                                         flow
//                +-------------------------------------------------+
//                |                                                 |
//                +--------+           +---------+                  |
//        read()  | output |  _read()  | flow    |  _send()         |
//     <----------|        |<----------| control |<-------------    |
//                | buffer |           | buffer  |                  |
//                +--------+           +---------+                  |
//                | input  |                                        |
//     ---------->|        |----------------------------------->    |
//       write()  | buffer |  _write()              _receive()      |
//                +--------+                                        |
//                |                                                 |
//                +-------------------------------------------------+

// `_send` is called when more frames should be pushed to the output buffer. This base
// implementation always throws: child classes have to supply their own version.
Flow.prototype._send = function _send() {
  var message = 'The _send() method has to be overridden by the child class!';
  throw new Error(message);
};

// `_send` is called by `_read` which is in turn [called by Duplex][1] when it wants to have more
// items in the output queue.
// [1]: https://nodejs.org/api/stream.html#stream_readable_read_size_1
Flow.prototype._read = function _read() {
  // * if the flow control queue is empty, then let the user push more frames
  if (this._queue.length === 0) {
    this._send();
    return;
  }

  // * if the flow control window is not positive, come back when it is updated (only registering
  //   when nothing is listening for 'window_update' yet)
  if (!(this._window > 0)) {
    if (!this.listenerCount('window_update')) {
      this.once('window_update', this._read);
    }
    return;
  }

  // * otherwise put the queued items into the output queue (to the extent it is possible with
  //   respect to the window size and output queue feedback). `_push` answers `null` when the
  //   frame at the head of the queue could not be moved completely, so it stays queued.
  var moreNeeded;
  this._readableState.sync = true; // to avoid reentrant calls
  do {
    moreNeeded = this._push(this._queue[0]);
    if (moreNeeded !== null) {
      this._queue.shift();
    }
  } while (moreNeeded && (this._queue.length > 0));
  this._readableState.sync = false;

  // Sanity check of the reason the loop stopped.
  var outputQueueIsFull = !moreNeeded;
  assert(outputQueueIsFull ||                                   // * output queue is full
         (this._queue.length === 0) ||                          // * flow control queue is empty
         (!this._window && (this._queue[0].type === 'DATA')));  // * waiting for window update
};

var MAX_PAYLOAD_SIZE = 4096; // Must not be greater than MAX_HTTP_PAYLOAD_SIZE which is 16383

// `read(limit)` is like the `read` of the Readable class, but it guarantees that the 'flow control
// size' (0 for non-DATA frames, length of the payload for DATA frames) of the returned frame will
// be under `limit`.
//
// * `read(0)` is forwarded untouched to the Duplex implementation
// * `read(-1)` stands for a maximum flow control size of 0
// * a missing `limit`, or one above `MAX_PAYLOAD_SIZE`, is replaced by `MAX_PAYLOAD_SIZE`
Flow.prototype.read = function read(limit) {
  if (limit === 0) {
    return Duplex.prototype.read.call(this, 0);
  }

  var maxSize = limit;
  if (limit === -1) {
    maxSize = 0;
  } else if ((limit === undefined) || (limit > MAX_PAYLOAD_SIZE)) {
    maxSize = MAX_PAYLOAD_SIZE;
  }

  // * Looking at the first frame in the queue without pulling it out if possible. If nothing is
  //   buffered and the readable side has not ended, `_read` gets a chance to fill the buffer.
  var head = this._readableState.buffer[0];
  if (!head && !this._readableState.ended) {
    this._read();
    head = this._readableState.buffer[0];
  }

  var headIsData = head && (head.type === 'DATA');

  // * A DATA frame must not be handed out when the limit is 0.
  if (headIsData && (maxSize === 0)) {
    return Duplex.prototype.read.call(this, 0);
  }

  // * A DATA frame larger than the limit is split: a new frame with the first `maxSize` bytes is
  //   put in front of the buffer, and the buffered frame keeps the rest.
  if (headIsData && (head.data.length > maxSize)) {
    this._log.trace({ frame: head, size: head.data.length, forwardable: maxSize },
                    'Splitting out forwardable part of a DATA frame.');
    this.unshift({
      type: 'DATA',
      flags: {},
      stream: head.stream,
      data: head.data.slice(0, maxSize)
    });
    head.data = head.data.slice(maxSize);
  }

  return Duplex.prototype.read.call(this);
};

// `_parentPush` pushes the given `frame` into the output queue. For DATA frames the flow control
// window is decreased by the payload length first (unless flow control is turned off, i.e. the
// window is `Infinity`). Returns what the `push` of Duplex returns.
Flow.prototype._parentPush = function _parentPush(frame) {
  this._log.trace({ frame: frame }, 'Pushing frame into the output queue');

  var consumesWindow = frame && (frame.type === 'DATA') && (this._window !== Infinity);
  if (consumesWindow) {
    this._log.trace({ window: this._window, by: frame.data.length },
                    'Decreasing flow control window size.');
    this._window -= frame.data.length;
    assert(this._window >= 0);
  }

  return Duplex.prototype.push.call(this, frame);
};

// `_push(frame)` pushes `frame` into the output queue and decreases the flow control window size.
// It is capable of splitting DATA frames into smaller parts, if the window size is not enough to
// push the whole frame. The return value is similar to `push` except that it returns `null` if it
// did not push the whole frame to the output queue (but maybe it did push part of the frame).
// Largest payload `_push` puts into a single DATA frame, whatever the size of the window.
var MAX_DATA_FRAME_LENGTH = 16384;

// Moves `frame` (or the part of it that fits) into the output queue using `_parentPush`.
//
// * non-DATA frames, the `null` end-of-stream marker and DATA frames that fit into
//   min(window, MAX_DATA_FRAME_LENGTH) are pushed as they are; the return value is the one of
//   `_parentPush`
// * a DATA frame that does not fit is not pushed at all if the window is not positive
// * otherwise the forwardable head of the payload is pushed as a new, flagless DATA frame, and
//   `frame` itself is modified to carry the rest of the payload
//
// In the last two cases the return value is `null`: the (rest of the) frame still has to be sent.
Flow.prototype._push = function _push(frame) {
  var data = frame && (frame.type === 'DATA') && frame.data;
  var maxFrameLength = (this._window < MAX_DATA_FRAME_LENGTH) ? this._window
                                                              : MAX_DATA_FRAME_LENGTH;

  if (!data || (data.length <= maxFrameLength)) {
    return this._parentPush(frame);
  }

  if (this._window <= 0) {
    return null;
  }

  // Report the size that is really split off. It differs from the window size when the window
  // is larger than MAX_DATA_FRAME_LENGTH.
  this._log.trace({ frame: frame, size: frame.data.length, forwardable: maxFrameLength },
                  'Splitting out forwardable part of a DATA frame.');
  frame.data = data.slice(maxFrameLength);
  this._parentPush({
    type: 'DATA',
    flags: {},
    stream: frame.stream,
    data: data.slice(0, maxFrameLength)
  });
  return null;
};

// Push `frame` into the flow control queue, or if it's empty, then directly into the output queue.
// `null` as `frame` stands for End Of Stream.
//
// Returns the answer of `_push` if the frame was pushed directly and completely into the output
// queue, and `null` if the frame (or the rest of it) ended up in the flow control queue.
Flow.prototype.push = function push(frame) {
  if (frame === null) {
    this._log.debug('Enqueueing outgoing End Of Stream');
  } else {
    this._log.debug({ frame: frame }, 'Enqueueing outgoing frame');
  }

  // Frames must not overtake the ones already waiting in the flow control queue.
  var moreNeeded = (this._queue.length === 0) ? this._push(frame) : null;

  if (moreNeeded === null) {
    this._queue.push(frame);
  }

  return moreNeeded;
};

// `getLastQueuedFrame` returns the last frame in output buffers: the last item of the flow control
// queue, or if there is none, the last item of the readable buffer. This is primarily used by the
// [Stream](stream.html) class to mark the last frame with END_STREAM flag.
Flow.prototype.getLastQueuedFrame = function getLastQueuedFrame() {
  var lastInFlowControlQueue = this._queue[this._queue.length - 1];
  if (lastInFlowControlQueue) {
    return lastInFlowControlQueue;
  }

  var readableQueue = this._readableState.buffer;
  return readableQueue[readableQueue.length - 1];
};

// Outgoing frames - managing the window size
// ------------------------------------------

// Flow control window size is manipulated using the `_increaseWindow` method.
//
// * Invoking it with `Infinite` means turning off flow control. Flow control cannot be enabled
//   again once disabled.
//   Any attempt to re-enable flow control MUST be rejected with a
//   FLOW_CONTROL_ERROR error code.
// * A sender MUST NOT allow a flow control window to exceed 2^31 - 1 bytes. The action taken
//   depends on it being a stream or the connection itself.

var WINDOW_SIZE_LIMIT = Math.pow(2, 31) - 1;

// Changes the flow control window by `size` (which may be negative, or `Infinity` to turn flow
// control off).
//
// * emits 'error' with 'FLOW_CONTROL_ERROR' when a finite `size` arrives after flow control was
//   turned off; the window is left untouched in this case
// * emits 'error' with 'FLOW_CONTROL_ERROR' when the resulting finite window is above
//   `WINDOW_SIZE_LIMIT`
// * otherwise emits 'window_update', unless `size` is 0
Flow.prototype._increaseWindow = function _increaseWindow(size) {
  if ((this._window === Infinity) && (size !== Infinity)) {
    this._log.error('Trying to increase flow control window after flow control was turned off.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  this._log.trace({ window: this._window, by: size }, 'Increasing flow control window size.');
  this._window += size;

  if ((this._window !== Infinity) && (this._window > WINDOW_SIZE_LIMIT)) {
    this._log.error('Flow control window grew too large.');
    this.emit('error', 'FLOW_CONTROL_ERROR');
    return;
  }

  // A zero-sized change is no news for those waiting for window space.
  if (size !== 0) {
    this.emit('window_update');
  }
};

// The `_updateWindow` method gets called every time there's an incoming WINDOW_UPDATE frame. It
// modifies the flow control window:
//
// * Flow control can be disabled for an individual stream by sending a WINDOW_UPDATE with the
//   END_FLOW_CONTROL flag set. The payload of a WINDOW_UPDATE frame that has the END_FLOW_CONTROL
//   flag set is ignored.
// * A sender that receives a WINDOW_UPDATE frame updates the corresponding window by the amount
//   specified in the frame.
Flow.prototype._updateWindow = function _updateWindow(frame) {
  var increment = frame.flags.END_FLOW_CONTROL ? Infinity : frame.window_size;
  this._increaseWindow(increment);
};

// A SETTINGS frame can alter the initial flow control window size for all current streams. When the
// value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream by
// calling the `setInitialWindow` method.
// The window size has to be modified by the difference between the new value and the old value.
//
// `initialWindow` is the new initial window size. The current window is adjusted through
// `_increaseWindow` first, then the new value is remembered for the next change.
Flow.prototype.setInitialWindow = function setInitialWindow(initialWindow) {
  var difference = initialWindow - this._initialWindow;
  this._increaseWindow(difference);
  this._initialWindow = initialWindow;
};