connection.js (21598B)
1 var assert = require('assert'); 2 3 // The Connection class 4 // ==================== 5 6 // The Connection class manages HTTP/2 connections. Each instance corresponds to one transport 7 // stream (TCP stream). It operates by sending and receiving frames and is implemented as a 8 // [Flow](flow.html) subclass. 9 10 var Flow = require('./flow').Flow; 11 12 exports.Connection = Connection; 13 14 // Public API 15 // ---------- 16 17 // * **new Connection(log, firstStreamId, settings)**: create a new Connection 18 // 19 // * **Event: 'error' (type)**: signals a connection level error made by the other end 20 // 21 // * **Event: 'peerError' (type)**: signals the receipt of a GOAWAY frame that contains an error 22 // code other than NO_ERROR 23 // 24 // * **Event: 'stream' (stream)**: signals that there's an incoming stream 25 // 26 // * **createStream(): stream**: initiate a new stream 27 // 28 // * **set(settings, callback)**: change the value of one or more settings according to the 29 // key-value pairs of `settings`. The callback is called after the peer acknowledged the changes. 30 // 31 // * **ping([callback])**: send a ping and call callback when the answer arrives 32 // 33 // * **close([error])**: close the stream with an error code 34 35 // Constructor 36 // ----------- 37 38 // The main aspects of managing the connection are: 39 function Connection(log, firstStreamId, settings) { 40 // * initializing the base class 41 Flow.call(this, 0); 42 43 // * logging: every method uses the common logger object 44 this._log = log.child({ component: 'connection' }); 45 46 // * stream management 47 this._initializeStreamManagement(firstStreamId); 48 49 // * lifecycle management 50 this._initializeLifecycleManagement(); 51 52 // * flow control 53 this._initializeFlowControl(); 54 55 // * settings management 56 this._initializeSettingsManagement(settings); 57 58 // * multiplexing 59 this._initializeMultiplexing(); 60 } 61 Connection.prototype = Object.create(Flow.prototype, { constructor: { value: Connection } }); 62 63 // Overview 64 // -------- 65 66 // | ^ | ^ 67 // v | v | 68 // +--------------+ +--------------+ 69 // +---| stream1 |---| stream2 |---- .... ---+ 70 // | | +----------+ | | +----------+ | | 71 // | | | stream1. | | | | stream2. | | | 72 // | +-| upstream |-+ +-| upstream |-+ | 73 // | +----------+ +----------+ | 74 // | | ^ | ^ | 75 // | v | v | | 76 // | +-----+-------------+-----+-------- .... | 77 // | ^ | | | | 78 // | | v | | | 79 // | +--------------+ | | | 80 // | | stream0 | | | | 81 // | | connection | | | | 82 // | | management | multiplexing | 83 // | +--------------+ flow control | 84 // | | ^ | 85 // | _read() | | _write() | 86 // | v | | 87 // | +------------+ +-----------+ | 88 // | |output queue| |input queue| | 89 // +----------------+------------+-+-----------+-----------------+ 90 // | ^ 91 // read() | | write() 92 // v | 93 94 // Stream management 95 // ----------------- 96 97 var Stream = require('./stream').Stream; 98 99 // Initialization: 100 Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) { 101 // * streams are stored in two data structures: 102 // * `_streamIds` is an id -> stream map of the streams that are allowed to receive frames. 103 // * `_streamPriorities` is a priority -> [stream] map of stream that allowed to send frames. 104 this._streamIds = []; 105 this._streamPriorities = []; 106 107 // * The next outbound stream ID and the last inbound stream id 108 this._nextStreamId = firstStreamId; 109 this._lastIncomingStream = 0; 110 111 // * Calling `_writeControlFrame` when there's an incoming stream with 0 as stream ID 112 this._streamIds[0] = { upstream: { write: this._writeControlFrame.bind(this) } }; 113 114 // * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can 115 // be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting. 116 this._streamSlotsFree = Infinity; 117 this._streamLimit = Infinity; 118 this.on('RECEIVING_SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit); 119 }; 120 121 // `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It 122 // broadcasts the message by creating an event on it. 123 Connection.prototype._writeControlFrame = function _writeControlFrame(frame) { 124 if ((frame.type === 'SETTINGS') || (frame.type === 'PING') || 125 (frame.type === 'GOAWAY') || (frame.type === 'WINDOW_UPDATE') || 126 (frame.type === 'ALTSVC') || (frame.type == 'ORIGIN')) { 127 this._log.debug({ frame: frame }, 'Receiving connection level frame'); 128 this.emit(frame.type, frame); 129 } else { 130 this._log.error({ frame: frame }, 'Invalid connection level frame'); 131 this.emit('error', 'PROTOCOL_ERROR'); 132 } 133 }; 134 135 // Methods to manage the stream slot pool: 136 Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) { 137 var wakeup = (this._streamSlotsFree === 0) && (newStreamLimit > this._streamLimit); 138 this._streamSlotsFree += newStreamLimit - this._streamLimit; 139 this._streamLimit = newStreamLimit; 140 if (wakeup) { 141 this.emit('wakeup'); 142 } 143 }; 144 145 Connection.prototype._changeStreamCount = function _changeStreamCount(change) { 146 if (change) { 147 this._log.trace({ free: this._streamSlotsFree, change: change }, 148 'Changing active stream count.'); 149 var wakeup = (this._streamSlotsFree === 0) && (change < 0); 150 this._streamSlotsFree -= change; 151 if (wakeup) { 152 this.emit('wakeup'); 153 } 154 } 155 }; 156 157 // Creating a new *inbound or outbound* stream with the given `id` (which is undefined in case of 158 // an outbound stream) consists of three steps: 159 // 160 // 1. var stream = new Stream(this._log, this); 161 // 2. this._allocateId(stream, id); 162 // 2. this._allocatePriority(stream); 163 164 // Allocating an ID to a stream 165 Connection.prototype._allocateId = function _allocateId(stream, id) { 166 // * initiated stream without definite ID 167 if (id === undefined) { 168 id = this._nextStreamId; 169 this._nextStreamId += 2; 170 } 171 172 // * incoming stream with a legitim ID (larger than any previous and different parity than ours) 173 else if ((id > this._lastIncomingStream) && ((id - this._nextStreamId) % 2 !== 0)) { 174 this._lastIncomingStream = id; 175 } 176 177 // * incoming stream with invalid ID 178 else { 179 this._log.error({ stream_id: id, lastIncomingStream: this._lastIncomingStream }, 180 'Invalid incoming stream ID.'); 181 this.emit('error', 'PROTOCOL_ERROR'); 182 return undefined; 183 } 184 185 assert(!(id in this._streamIds)); 186 187 // * adding to `this._streamIds` 188 this._log.trace({ s: stream, stream_id: id }, 'Allocating ID for stream.'); 189 this._streamIds[id] = stream; 190 stream.id = id; 191 this.emit('new_stream', stream, id); 192 193 // * forwarding connection errors from streams 194 stream.on('connectionError', this.emit.bind(this, 'error')); 195 196 return id; 197 }; 198 199 // Allocating a priority to a stream, and managing priority changes 200 Connection.prototype._allocatePriority = function _allocatePriority(stream) { 201 this._log.trace({ s: stream }, 'Allocating priority for stream.'); 202 this._insert(stream, stream._priority); 203 stream.on('priority', this._reprioritize.bind(this, stream)); 204 stream.upstream.on('readable', this.emit.bind(this, 'wakeup')); 205 this.emit('wakeup'); 206 }; 207 208 Connection.prototype._insert = function _insert(stream, priority) { 209 if (priority in this._streamPriorities) { 210 this._streamPriorities[priority].push(stream); 211 } else { 212 this._streamPriorities[priority] = [stream]; 213 } 214 }; 215 216 Connection.prototype._reprioritize = function _reprioritize(stream, priority) { 217 var bucket = this._streamPriorities[stream._priority]; 218 var index = bucket.indexOf(stream); 219 assert(index !== -1); 220 bucket.splice(index, 1); 221 if (bucket.length === 0) { 222 delete this._streamPriorities[stream._priority]; 223 } 224 225 this._insert(stream, priority); 226 }; 227 228 // Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to 229 // a previously nonexistent stream. 230 Connection.prototype._createIncomingStream = function _createIncomingStream(id) { 231 this._log.debug({ stream_id: id }, 'New incoming stream.'); 232 233 var stream = new Stream(this._log, this); 234 this._allocateId(stream, id); 235 this._allocatePriority(stream); 236 this.emit('stream', stream, id); 237 238 return stream; 239 }; 240 241 // Creating an *outbound* stream 242 Connection.prototype.createStream = function createStream() { 243 this._log.trace('Creating new outbound stream.'); 244 245 // * Receiving is enabled immediately, and an ID gets assigned to the stream 246 var stream = new Stream(this._log, this); 247 this._allocatePriority(stream); 248 249 return stream; 250 }; 251 252 // Multiplexing 253 // ------------ 254 255 Connection.prototype._initializeMultiplexing = function _initializeMultiplexing() { 256 this.on('window_update', this.emit.bind(this, 'wakeup')); 257 this._sendScheduled = false; 258 this._firstFrameReceived = false; 259 }; 260 261 // The `_send` method is a virtual method of the [Flow class](flow.html) that has to be implemented 262 // by child classes. It reads frames from streams and pushes them to the output buffer. 263 Connection.prototype._send = function _send(immediate) { 264 // * Do not do anything if the connection is already closed 265 if (this._closed) { 266 return; 267 } 268 269 // * Collapsing multiple calls in a turn into a single deferred call 270 if (immediate) { 271 this._sendScheduled = false; 272 } else { 273 if (!this._sendScheduled) { 274 this._sendScheduled = true; 275 setImmediate(this._send.bind(this, true)); 276 } 277 return; 278 } 279 280 this._log.trace('Starting forwarding frames from streams.'); 281 282 // * Looping through priority `bucket`s in priority order. 283 priority_loop: 284 for (var priority in this._streamPriorities) { 285 var bucket = this._streamPriorities[priority]; 286 var nextBucket = []; 287 288 // * Forwarding frames from buckets with round-robin scheduling. 289 // 1. pulling out frame 290 // 2. if there's no frame, skip this stream 291 // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip 292 // this stream 293 // 4. adding stream to the bucket of the next round 294 // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already) 295 // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream 296 // 7. forwarding the frame, changing `streamCount` as appropriate 297 // 8. stepping to the next stream if there's still more frame needed in the output buffer 298 // 9. switching to the bucket of the next round 299 while (bucket.length > 0) { 300 for (var index = 0; index < bucket.length; index++) { 301 var stream = bucket[index]; 302 var frame = stream.upstream.read((this._window > 0) ? this._window : -1); 303 304 if (!frame) { 305 continue; 306 } else if (frame.count_change > this._streamSlotsFree) { 307 stream.upstream.unshift(frame); 308 continue; 309 } 310 311 nextBucket.push(stream); 312 313 if (frame.stream === undefined) { 314 frame.stream = stream.id || this._allocateId(stream); 315 } 316 317 if (frame.type === 'PUSH_PROMISE') { 318 this._allocatePriority(frame.promised_stream); 319 frame.promised_stream = this._allocateId(frame.promised_stream); 320 } 321 322 this._log.trace({ s: stream, frame: frame }, 'Forwarding outgoing frame'); 323 var moreNeeded = this.push(frame); 324 this._changeStreamCount(frame.count_change); 325 326 assert(moreNeeded !== null); // The frame shouldn't be unforwarded 327 if (moreNeeded === false) { 328 break priority_loop; 329 } 330 } 331 332 bucket = nextBucket; 333 nextBucket = []; 334 } 335 } 336 337 // * if we couldn't forward any frame, then sleep until window update, or some other wakeup event 338 if (moreNeeded === undefined) { 339 this.once('wakeup', this._send.bind(this)); 340 } 341 342 this._log.trace({ moreNeeded: moreNeeded }, 'Stopping forwarding frames from streams.'); 343 }; 344 345 // The `_receive` method is another virtual method of the [Flow class](flow.html) that has to be 346 // implemented by child classes. It forwards the given frame to the appropriate stream: 347 Connection.prototype._receive = function _receive(frame, done) { 348 this._log.trace({ frame: frame }, 'Forwarding incoming frame'); 349 350 // * first frame needs to be checked by the `_onFirstFrameReceived` method 351 if (!this._firstFrameReceived) { 352 this._firstFrameReceived = true; 353 this._onFirstFrameReceived(frame); 354 } 355 356 // Do some sanity checking here before we create a stream 357 if ((frame.type == 'SETTINGS' || 358 frame.type == 'PING' || 359 frame.type == 'GOAWAY') && 360 frame.stream != 0) { 361 // Got connection-level frame on a stream - EEP! 362 this.close('PROTOCOL_ERROR'); 363 return; 364 } else if ((frame.type == 'DATA' || 365 frame.type == 'HEADERS' || 366 frame.type == 'PRIORITY' || 367 frame.type == 'RST_STREAM' || 368 frame.type == 'PUSH_PROMISE' || 369 frame.type == 'CONTINUATION') && 370 frame.stream == 0) { 371 // Got stream-level frame on connection - EEP! 372 this.close('PROTOCOL_ERROR'); 373 return; 374 } 375 // WINDOW_UPDATE can be on either stream or connection 376 377 // * gets the appropriate stream from the stream registry 378 var stream = this._streamIds[frame.stream]; 379 380 // * or creates one if it's not in `this.streams` 381 if (!stream) { 382 stream = this._createIncomingStream(frame.stream); 383 } 384 385 // * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream 386 if (frame.type === 'PUSH_PROMISE') { 387 frame.promised_stream = this._createIncomingStream(frame.promised_stream); 388 } 389 390 frame.count_change = this._changeStreamCount.bind(this); 391 392 // * and writes it to the `stream`'s `upstream` 393 stream.upstream.write(frame); 394 395 done(); 396 }; 397 398 // Settings management 399 // ------------------- 400 401 var defaultSettings = { 402 }; 403 404 // Settings management initialization: 405 Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) { 406 // * Setting up the callback queue for setting acknowledgements 407 this._settingsAckCallbacks = []; 408 409 // * Sending the initial settings. 410 this._log.debug({ settings: settings }, 411 'Sending the first SETTINGS frame as part of the connection header.'); 412 this.set(settings || defaultSettings); 413 414 // * Forwarding SETTINGS frames to the `_receiveSettings` method 415 this.on('SETTINGS', this._receiveSettings); 416 this.on('RECEIVING_SETTINGS_MAX_FRAME_SIZE', this._sanityCheckMaxFrameSize); 417 }; 418 419 // * Checking that the first frame the other endpoint sends is SETTINGS 420 Connection.prototype._onFirstFrameReceived = function _onFirstFrameReceived(frame) { 421 if ((frame.stream === 0) && (frame.type === 'SETTINGS')) { 422 this._log.debug('Receiving the first SETTINGS frame as part of the connection header.'); 423 } else { 424 this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.'); 425 this.emit('error', 'PROTOCOL_ERROR'); 426 } 427 }; 428 429 // Handling of incoming SETTINGS frames. 430 Connection.prototype._receiveSettings = function _receiveSettings(frame) { 431 // * If it's an ACK, call the appropriate callback 432 if (frame.flags.ACK) { 433 var callback = this._settingsAckCallbacks.shift(); 434 if (callback) { 435 callback(); 436 } 437 } 438 439 // * If it's a setting change request, then send an ACK and change the appropriate settings 440 else { 441 if (!this._closed) { 442 this.push({ 443 type: 'SETTINGS', 444 flags: { ACK: true }, 445 stream: 0, 446 settings: {} 447 }); 448 } 449 for (var name in frame.settings) { 450 this.emit('RECEIVING_' + name, frame.settings[name]); 451 } 452 } 453 }; 454 455 Connection.prototype._sanityCheckMaxFrameSize = function _sanityCheckMaxFrameSize(value) { 456 if ((value < 0x4000) || (value >= 0x01000000)) { 457 this._log.fatal('Received invalid value for max frame size: ' + value); 458 this.emit('error'); 459 } 460 }; 461 462 // Changing one or more settings value and sending out a SETTINGS frame 463 Connection.prototype.set = function set(settings, callback) { 464 // * Calling the callback and emitting event when the change is acknowledges 465 var self = this; 466 this._settingsAckCallbacks.push(function() { 467 for (var name in settings) { 468 self.emit('ACKNOWLEDGED_' + name, settings[name]); 469 } 470 if (callback) { 471 callback(); 472 } 473 }); 474 475 // * Sending out the SETTINGS frame 476 this.push({ 477 type: 'SETTINGS', 478 flags: { ACK: false }, 479 stream: 0, 480 settings: settings 481 }); 482 for (var name in settings) { 483 this.emit('SENDING_' + name, settings[name]); 484 } 485 }; 486 487 // Lifecycle management 488 // -------------------- 489 490 // The main responsibilities of lifecycle management code: 491 // 492 // * keeping the connection alive by 493 // * sending PINGs when the connection is idle 494 // * answering PINGs 495 // * ending the connection 496 497 Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() { 498 this._pings = {}; 499 this.on('PING', this._receivePing); 500 this.on('GOAWAY', this._receiveGoaway); 501 this._closed = false; 502 }; 503 504 // Generating a string of length 16 with random hexadecimal digits 505 Connection.prototype._generatePingId = function _generatePingId() { 506 do { 507 var id = ''; 508 for (var i = 0; i < 16; i++) { 509 id += Math.floor(Math.random()*16).toString(16); 510 } 511 } while(id in this._pings); 512 return id; 513 }; 514 515 // Sending a ping and calling `callback` when the answer arrives 516 Connection.prototype.ping = function ping(callback) { 517 var id = this._generatePingId(); 518 var data = Buffer.from(id, 'hex'); 519 this._pings[id] = callback; 520 521 this._log.debug({ data: data }, 'Sending PING.'); 522 this.push({ 523 type: 'PING', 524 flags: { 525 ACK: false 526 }, 527 stream: 0, 528 data: data 529 }); 530 }; 531 532 // Answering pings 533 Connection.prototype._receivePing = function _receivePing(frame) { 534 if (frame.flags.ACK) { 535 var id = frame.data.toString('hex'); 536 if (id in this._pings) { 537 this._log.debug({ data: frame.data }, 'Receiving answer for a PING.'); 538 var callback = this._pings[id]; 539 if (callback) { 540 callback(); 541 } 542 delete this._pings[id]; 543 } else { 544 this._log.warn({ data: frame.data }, 'Unsolicited PING answer.'); 545 } 546 547 } else { 548 this._log.debug({ data: frame.data }, 'Answering PING.'); 549 this.push({ 550 type: 'PING', 551 flags: { 552 ACK: true 553 }, 554 stream: 0, 555 data: frame.data 556 }); 557 } 558 }; 559 560 Connection.prototype.originFrame = function originFrame(originList) { 561 this._log.debug(originList, 'emitting origin frame'); 562 563 this.push({ 564 type: 'ORIGIN', 565 flags: {}, 566 stream: 0, 567 originList : originList, 568 }); 569 }; 570 571 // Terminating the connection 572 Connection.prototype.close = function close(error) { 573 if (this._closed) { 574 this._log.warn('Trying to close an already closed connection'); 575 return; 576 } 577 578 this._log.debug({ error: error }, 'Closing the connection'); 579 this.push({ 580 type: 'GOAWAY', 581 flags: {}, 582 stream: 0, 583 last_stream: this._lastIncomingStream, 584 error: error || 'NO_ERROR' 585 }); 586 this.push(null); 587 this._closed = true; 588 }; 589 590 Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { 591 this._log.debug({ error: frame.error }, 'Other end closed the connection'); 592 this.push(null); 593 this._closed = true; 594 if (frame.error !== 'NO_ERROR') { 595 this.emit('peerError', frame.error); 596 } 597 }; 598 599 // Flow control 600 // ------------ 601 602 Connection.prototype._initializeFlowControl = function _initializeFlowControl() { 603 // Handling of initial window size of individual streams. 604 this._initialStreamWindowSize = INITIAL_STREAM_WINDOW_SIZE; 605 this.on('new_stream', function(stream) { 606 stream.upstream.setInitialWindow(this._initialStreamWindowSize); 607 }); 608 this.on('RECEIVING_SETTINGS_INITIAL_WINDOW_SIZE', this._setInitialStreamWindowSize); 609 this._streamIds[0].upstream.setInitialWindow = function noop() {}; 610 }; 611 612 // The initial connection flow control window is 65535 bytes. 613 var INITIAL_STREAM_WINDOW_SIZE = 65535; 614 615 // A SETTINGS frame can alter the initial flow control window size for all current streams. When the 616 // value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the window size of all 617 // stream by calling the `setInitialStreamWindowSize` method. The window size has to be modified by 618 // the difference between the new value and the old value. 619 Connection.prototype._setInitialStreamWindowSize = function _setInitialStreamWindowSize(size) { 620 if ((this._initialStreamWindowSize === Infinity) && (size !== Infinity)) { 621 this._log.error('Trying to manipulate initial flow control window size after flow control was turned off.'); 622 this.emit('error', 'FLOW_CONTROL_ERROR'); 623 } else { 624 this._log.debug({ size: size }, 'Changing stream initial window size.'); 625 this._initialStreamWindowSize = size; 626 this._streamIds.forEach(function(stream) { 627 stream.upstream.setInitialWindow(size); 628 }); 629 } 630 };