stream.js (25913B)
1 var assert = require('assert'); 2 3 // The Stream class 4 // ================ 5 6 // Stream is a [Duplex stream](https://nodejs.org/api/stream.html#stream_class_stream_duplex) 7 // subclass that implements the [HTTP/2 Stream](https://tools.ietf.org/html/rfc7540#section-5) 8 // concept. It has two 'sides': one that is used by the user to send/receive data (the `stream` 9 // object itself) and one that is used by a Connection to read/write frames to/from the other peer 10 // (`stream.upstream`). 11 12 var Duplex = require('stream').Duplex; 13 14 exports.Stream = Stream; 15 16 // Public API 17 // ---------- 18 19 // * **new Stream(log, connection)**: create a new Stream 20 // 21 // * **Event: 'headers' (headers)**: signals incoming headers 22 // 23 // * **Event: 'promise' (stream, headers)**: signals an incoming push promise 24 // 25 // * **Event: 'priority' (priority)**: signals a priority change. `priority` is a number between 0 26 // (highest priority) and 2^31-1 (lowest priority). Default value is 2^30. 27 // 28 // * **Event: 'error' (type)**: signals an error 29 // 30 // * **headers(headers)**: send headers 31 // 32 // * **promise(headers): Stream**: promise a stream 33 // 34 // * **priority(priority)**: set the priority of the stream. Priority can be changed by the peer 35 // too, but once it is set locally, it can not be changed remotely. 36 // 37 // * **reset(error)**: reset the stream with an error code 38 // 39 // * **upstream**: a [Flow](flow.js) that is used by the parent connection to write/read frames 40 // that are to be sent/arrived to/from the peer and are related to this stream. 41 // 42 // Headers are always in the [regular node.js header format][1]. 
// [1]: https://nodejs.org/api/http.html#http_message_headers

// Constructor
// -----------

// Creates a stream on top of node's `Duplex`.
//
// * `log`: a logger exposing `child(fields)`; the child logger is stored in `this._log`
// * `connection`: stored as-is in `this.connection` (it is handed to promised streams later)
//
// The main aspects of managing the stream are set up in this order:
function Stream(log, connection) {
  Duplex.call(this);

  // * logging (the stream itself is passed as the `s` field of the child logger)
  var loggerFields = { component: 'stream', s: this };
  this._log = log.child(loggerFields);

  // * receiving and sending stream management commands
  this._initializeManagement();

  // * sending and receiving frames to/from the upstream connection
  this._initializeDataFlow();

  // * maintaining the state of the stream (idle, open, closed, etc.) and error detection
  this._initializeState();

  // Bookkeeping that is not owned by any of the initializers above. `sentEndStream` flips to
  // true once a frame carrying END_STREAM has been queued by this side.
  this.connection = connection;
  this.sentEndStream = false;
}

// Classic prototype inheritance from Duplex; `constructor` is restored as a non-enumerable
// property so that it does not show up when enumerating the prototype.
Stream.prototype = Object.create(Duplex.prototype, {
  constructor: { value: Stream }
});

// Managing the stream
// -------------------

// The default stream priority is 2^30, the largest (i.e. lowest) priority value is 2^31 - 1.
var DEFAULT_PRIORITY = Math.pow(2, 30);
var MAX_PRIORITY = (2 * DEFAULT_PRIORITY) - 1;

// PUSH_PROMISE and HEADERS are forwarded to the user through events.
// Sets up the management related fields: no RST_STREAM was sent yet, the priority is the default
// one, and the peer is allowed to reprioritize the stream until we do so locally.
Stream.prototype._initializeManagement = function _initializeManagement() {
  this._resetSent = false;
  this._priority = DEFAULT_PRIORITY;
  this._letPeerPrioritize = true;
};

// Promises a new stream to the peer. The promised stream shares this stream's logger and
// connection, gets a priority one step lower than this stream (capped at MAX_PRIORITY), and is
// announced with a PUSH_PROMISE frame queued on this stream. Returns the promised stream.
Stream.prototype.promise = function promise(headers) {
  var promised = new Stream(this._log, this.connection);
  promised._priority = Math.min(this._priority + 1, MAX_PRIORITY);

  var announcement = {
    type: 'PUSH_PROMISE',
    flags: {},
    stream: this.id,
    promised_stream: promised,
    headers: headers
  };
  this._pushUpstream(announcement);

  return promised;
};

// Incoming PUSH_PROMISE: surfaced to the user as a 'promise' event (promised stream, headers).
Stream.prototype._onPromise = function _onPromise(frame) {
  var promised = frame.promised_stream;
  this.emit('promise', promised, frame.headers);
};

// Queues a HEADERS frame (without END_STREAM) carrying `headers`.
Stream.prototype.headers = function headers(headers) {
  var headersFrame = {
    type: 'HEADERS',
    flags: {},
    stream: this.id,
    headers: headers
  };
  this._pushUpstream(headersFrame);
};

// Queues a HEADERS frame that also carries END_STREAM. `sentEndStream` is raised first, which
// makes `_finishing` skip sending another END_STREAM later.
Stream.prototype.trailers = function trailers(trailers) {
  this.sentEndStream = true;

  var trailersFrame = {
    type: 'HEADERS',
    flags: {'END_STREAM': true},
    stream: this.id,
    headers: trailers
  };
  this._pushUpstream(trailersFrame);
};

// Incoming HEADERS: apply the priority it carries (as a peer initiated change), if any, then
// surface the headers through the 'headers' event.
Stream.prototype._onHeaders = function _onHeaders(frame) {
  var carriesPriority = (frame.priority !== undefined);
  if (carriesPriority) {
    this.priority(frame.priority, true);
  }
  this.emit('headers', frame.headers);
};

// Changes the priority of the stream.
//
// * `priority`: the new priority value
// * `peer`: truthy when the change was requested by the peer
//
// A local change disables peer initiated changes from then on, and is communicated to the peer:
// if the last queued frame is a HEADERS or PRIORITY frame then the value is attached to that
// frame, otherwise a new PRIORITY frame is queued. A peer initiated change is silently dropped
// once the priority was set locally.
Stream.prototype.priority = function priority(priority, peer) {
  if (peer && !this._letPeerPrioritize) {
    return;
  }

  if (!peer) {
    this._letPeerPrioritize = false;

    var queued = this.upstream.getLastQueuedFrame();
    var canPiggyback = queued && ((queued.type === 'HEADERS') || (queued.type === 'PRIORITY'));
    if (canPiggyback) {
      queued.priority = priority;
    } else {
      this._pushUpstream({
        type: 'PRIORITY',
        flags: {},
        stream: this.id,
        priority: priority
      });
    }
  }

  // Note the order: listeners of 'priority' run before `this._priority` is updated.
  this._log.debug({ priority: priority }, 'Changing priority');
  this.emit('priority', priority);
  this._priority = priority;
};

// Incoming PRIORITY: handled as a peer initiated priority change.
Stream.prototype._onPriority = function _onPriority(frame) {
  this.priority(frame.priority, true);
};

// Resetting the stream. Normally, an endpoint SHOULD NOT send more than one RST_STREAM frame for
// any stream, so only the first call queues a frame; later calls are no-ops.
Stream.prototype.reset = function reset(error) {
  if (this._resetSent) {
    return;
  }

  this._resetSent = true;
  this._pushUpstream({
    type: 'RST_STREAM',
    flags: {},
    stream: this.id,
    error: error
  });
};

// Specify an alternate service for the origin of this stream. When an explicit `origin` is
// given the frame is addressed to stream 0, otherwise to this stream.
Stream.prototype.altsvc = function altsvc(host, port, protocolID, maxAge, origin) {
  var stream = origin ? 0 : this.id;

  this._pushUpstream({
    type: 'ALTSVC',
    flags: {},
    stream: stream,
    host: host,
    port: port,
    protocolID: protocolID,
    origin: origin,
    maxAge: maxAge
  });
};

// Data flow
// ---------

// The incoming and the generated outgoing frames are received/transmitted on the `this.upstream`
// [Flow](flow.html). The [Connection](connection.html) object instantiating the stream will read
// and write frames to/from it. The stream itself is a regular [Duplex stream][1], and is used by
// the user to write or read the body of the request.
196 // [1]: https://nodejs.org/api/stream.html#stream_class_stream_duplex 197 198 // upstream side stream user side 199 // 200 // +------------------------------------+ 201 // | | 202 // +------------------+ | 203 // | upstream | | 204 // | | | 205 // +--+ | +--| 206 // read() | | _send() | _write() | | write(buf) 207 // <--------------|B |<--------------|--------------| B|<------------ 208 // | | | | | 209 // frames +--+ | +--| buffers 210 // | | | | | 211 // -------------->|B |---------------|------------->| B|------------> 212 // write(frame) | | _receive() | _read() | | read() 213 // +--+ | +--| 214 // | | | 215 // | | | 216 // +------------------+ | 217 // | | 218 // +------------------------------------+ 219 // 220 // B: input or output buffer 221 222 var Flow = require('./flow').Flow; 223 224 Stream.prototype._initializeDataFlow = function _initializeDataFlow() { 225 this.id = undefined; 226 227 this._ended = false; 228 229 this.upstream = new Flow(); 230 this.upstream._log = this._log; 231 this.upstream._send = this._send.bind(this); 232 this.upstream._receive = this._receive.bind(this); 233 this.upstream.write = this._writeUpstream.bind(this); 234 this.upstream.on('error', this.emit.bind(this, 'error')); 235 236 this.on('finish', this._finishing); 237 }; 238 239 Stream.prototype._pushUpstream = function _pushUpstream(frame) { 240 this.upstream.push(frame); 241 this._transition(true, frame); 242 }; 243 244 // Overriding the upstream's `write` allows us to act immediately instead of waiting for the input 245 // queue to empty. This is important in case of control frames. 246 Stream.prototype._writeUpstream = function _writeUpstream(frame) { 247 this._log.debug({ frame: frame }, 'Receiving frame'); 248 249 var moreNeeded = Flow.prototype.write.call(this.upstream, frame); 250 251 // * Transition to a new state if that's the effect of receiving the frame 252 this._transition(false, frame); 253 254 // * If it's a control frame. 
Call the appropriate handler method. 255 if (frame.type === 'HEADERS') { 256 if (this._processedHeaders && !frame.flags['END_STREAM']) { 257 this.emit('error', 'PROTOCOL_ERROR'); 258 } 259 this._processedHeaders = true; 260 this._onHeaders(frame); 261 } else if (frame.type === 'PUSH_PROMISE') { 262 this._onPromise(frame); 263 } else if (frame.type === 'PRIORITY') { 264 this._onPriority(frame); 265 } else if (frame.type === 'ALTSVC') { 266 // TODO 267 } else if (frame.type === 'ORIGIN') { 268 // TODO 269 } 270 271 // * If it's an invalid stream level frame, emit error 272 else if ((frame.type !== 'DATA') && 273 (frame.type !== 'WINDOW_UPDATE') && 274 (frame.type !== 'RST_STREAM')) { 275 this._log.error({ frame: frame }, 'Invalid stream level frame'); 276 this.emit('error', 'PROTOCOL_ERROR'); 277 } 278 279 return moreNeeded; 280 }; 281 282 // The `_receive` method (= `upstream._receive`) gets called when there's an incoming frame. 283 Stream.prototype._receive = function _receive(frame, ready) { 284 // * If it's a DATA frame, then push the payload into the output buffer on the other side. 285 // Call ready when the other side is ready to receive more. 286 if (!this._ended && (frame.type === 'DATA')) { 287 var moreNeeded = this.push(frame.data); 288 if (!moreNeeded) { 289 this._receiveMore = ready; 290 } 291 } 292 293 // * Any frame may signal the end of the stream with the END_STREAM flag 294 if (!this._ended && (frame.flags.END_STREAM || (frame.type === 'RST_STREAM'))) { 295 this.push(null); 296 this._ended = true; 297 } 298 299 // * Postpone calling `ready` if `push()` returned a falsy value 300 if (this._receiveMore !== ready) { 301 ready(); 302 } 303 }; 304 305 // The `_read` method is called when the user side is ready to receive more data. If there's a 306 // pending write on the upstream, then call its pending ready callback to receive more frames. 
Stream.prototype._read = function _read() {
  var resumeReceiving = this._receiveMore;
  if (!resumeReceiving) {
    return;
  }

  // Forget the callback before invoking it, so that it can be registered again while it runs.
  delete this._receiveMore;
  resumeReceiving();
};

// The `_write` method gets called when there's a write request from the user. The buffer is
// wrapped into a single DATA frame and queued on the upstream.
Stream.prototype._write = function _write(buffer, encoding, ready) {
  // * Chunking is done by the upstream Flow.
  var dataFrame = {
    type: 'DATA',
    flags: {},
    stream: this.id,
    data: buffer
  };
  var moreNeeded = this._pushUpstream(dataFrame);

  // * Call ready when upstream is ready to receive more frames. If it is not, then `ready` is
  //   parked in `_sendMore` until `_send` gets called.
  //   NOTE(review): `_pushUpstream` as defined in this file has no return value, so `moreNeeded`
  //   is always undefined here and `ready` is always parked - confirm whether that is intended.
  if (!moreNeeded) {
    this._sendMore = ready;
    return;
  }
  ready();
};

// The `_send` (= `upstream._send`) method is called when upstream is ready to receive more frames.
// If there's a pending write on the user side, then call its pending ready callback to receive more
// writes.
Stream.prototype._send = function _send() {
  var resumeSending = this._sendMore;
  if (!resumeSending) {
    return;
  }

  delete this._sendMore;
  resumeSending();
};

// When the stream is finishing (the user calls `end()` on it), then we have to set the `END_STREAM`
// flag on the last frame. If there's no frame in the queue, or if it doesn't support this flag,
// then we create a 0 length DATA frame. We could do this all the time, but putting the flag on an
// existing frame is a nice optimization.
var emptyBuffer = Buffer.alloc(0);

// 'finish' handler: makes sure that END_STREAM is sent exactly once.
Stream.prototype._finishing = function _finishing() {
  // END_STREAM may have been sent already (e.g. by `trailers()`).
  if (this.sentEndStream) {
    this._log.debug('Already sent END_STREAM, not sending again.');
    return;
  }
  this.sentEndStream = true;

  // The explicit end-of-stream frame. It is either queued as-is, or only used to drive the
  // state machine when the flag can be attached to a frame that is already queued.
  var endFrame = {
    type: 'DATA',
    flags: { END_STREAM: true },
    stream: this.id,
    data: emptyBuffer
  };

  var queued = this.upstream.getLastQueuedFrame();
  var canCarryFlag = queued && ((queued.type === 'DATA') || (queued.type === 'HEADERS'));
  if (!canCarryFlag) {
    this._pushUpstream(endFrame);
    return;
  }

  this._log.debug({ frame: queued }, 'Marking last frame with END_STREAM flag.');
  queued.flags.END_STREAM = true;
  this._transition(true, endFrame);
};

// [Stream States](https://tools.ietf.org/html/rfc7540#section-5.1)
// ----------------
//
//                       +--------+
//                PP     |        |     PP
//             ,---------|  idle  |---------.
//            /          |        |          \
//           v           +--------+           v
//     +----------+          |           +----------+
//     |          |          | H         |          |
// ,---| reserved |          |           | reserved |---.
// |   | (local)  |          v           | (remote) |   |
// |   +----------+      +--------+      +----------+   |
// |       |         ES  |        |  ES         |       |
// |       | H    ,------|  open  |------.      | H     |
// |       |     /       |        |       \     |       |
// |       v    v        +--------+        v    v       |
// |   +----------+          |           +----------+   |
// |   |   half   |          |           |   half   |   |
// |   |  closed  |          | R         |  closed  |   |
// |   | (remote) |          |           | (local)  |   |
// |   +----------+          |           +----------+   |
// |        |                v                 |        |
// |        |  ES / R    +--------+  ES / R    |        |
// |        `----------->|        |<-----------'        |
// |  R                  | closed |                  R  |
// `-------------------->|        |<--------------------'
//                       +--------+

// Streams begin in the IDLE state and transitions happen when there's an incoming or outgoing frame
Stream.prototype._initializeState = function _initializeState() {
  this.state = 'IDLE';

  // Filled in by `_transition` as the stream gets opened and closed.
  this._initiated = undefined;
  this._closedByUs = undefined;
  this._closedWithRst = undefined;

  // Set by `_writeUpstream` once the first incoming HEADERS frame was handled.
  this._processedHeaders = false;
};

// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
// interested parties using the 'state' event. Setting the current state again is a programming
// error and trips an assertion.
Stream.prototype._setState = function transition(state) {
  var previous = this.state;
  assert(previous !== state);

  this._log.debug({ from: previous, to: state }, 'State transition');
  this.state = state;
  this.emit('state', state);
};

// A state is 'active' if the stream in that state counts towards the concurrency limit. Streams
// that are in the "open" state, or either of the "half closed" states count toward this limit.
function activeState(state) {
  switch (state) {
    case 'OPEN':
    case 'HALF_CLOSED_LOCAL':
    case 'HALF_CLOSED_REMOTE':
      return true;
    default:
      return false;
  }
}

// `_transition` is called every time there's an incoming or outgoing frame. It manages state
// transitions, and detects stream errors. A stream error is always caused by a frame that is not
// allowed in the current state.
429 Stream.prototype._transition = function transition(sending, frame) { 430 var receiving = !sending; 431 var connectionError; 432 var streamError; 433 434 var DATA = false, HEADERS = false, PRIORITY = false, ALTSVC = false, ORIGIN = false; 435 var RST_STREAM = false, PUSH_PROMISE = false, WINDOW_UPDATE = false; 436 switch(frame.type) { 437 case 'DATA' : DATA = true; break; 438 case 'HEADERS' : HEADERS = true; break; 439 case 'PRIORITY' : PRIORITY = true; break; 440 case 'RST_STREAM' : RST_STREAM = true; break; 441 case 'PUSH_PROMISE' : PUSH_PROMISE = true; break; 442 case 'WINDOW_UPDATE': WINDOW_UPDATE = true; break; 443 case 'ALTSVC' : ALTSVC = true; break; 444 case 'ORIGIN' : ORIGIN = true; break; 445 } 446 447 var previousState = this.state; 448 449 switch (this.state) { 450 // All streams start in the **idle** state. In this state, no frames have been exchanged. 451 // 452 // * Sending or receiving a HEADERS frame causes the stream to become "open". 453 // 454 // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen. 455 case 'IDLE': 456 if (HEADERS) { 457 this._setState('OPEN'); 458 if (frame.flags.END_STREAM) { 459 this._setState(sending ? 'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); 460 } 461 this._initiated = sending; 462 } else if (sending && RST_STREAM) { 463 this._setState('CLOSED'); 464 } else if (PRIORITY) { 465 /* No state change */ 466 } else { 467 connectionError = 'PROTOCOL_ERROR'; 468 } 469 break; 470 471 // A stream in the **reserved (local)** state is one that has been promised by sending a 472 // PUSH_PROMISE frame. 473 // 474 // * The endpoint can send a HEADERS frame. This causes the stream to open in a "half closed 475 // (remote)" state. 476 // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This 477 // releases the stream reservation. 478 // * An endpoint may receive PRIORITY frame in this state. 
479 // * An endpoint MUST NOT send any other type of frame in this state. 480 case 'RESERVED_LOCAL': 481 if (sending && HEADERS) { 482 this._setState('HALF_CLOSED_REMOTE'); 483 } else if (RST_STREAM) { 484 this._setState('CLOSED'); 485 } else if (PRIORITY) { 486 /* No state change */ 487 } else { 488 connectionError = 'PROTOCOL_ERROR'; 489 } 490 break; 491 492 // A stream in the **reserved (remote)** state has been reserved by a remote peer. 493 // 494 // * Either endpoint can send a RST_STREAM frame to cause the stream to become "closed". This 495 // releases the stream reservation. 496 // * Receiving a HEADERS frame causes the stream to transition to "half closed (local)". 497 // * An endpoint MAY send PRIORITY frames in this state to reprioritize the stream. 498 // * Receiving any other type of frame MUST be treated as a stream error of type PROTOCOL_ERROR. 499 case 'RESERVED_REMOTE': 500 if (RST_STREAM) { 501 this._setState('CLOSED'); 502 } else if (receiving && HEADERS) { 503 this._setState('HALF_CLOSED_LOCAL'); 504 } else if (PRIORITY || ORIGIN) { 505 /* No state change */ 506 } else { 507 connectionError = 'PROTOCOL_ERROR'; 508 } 509 break; 510 511 // The **open** state is where both peers can send frames. In this state, sending peers observe 512 // advertised stream level flow control limits. 513 // 514 // * From this state either endpoint can send a frame with a END_STREAM flag set, which causes 515 // the stream to transition into one of the "half closed" states: an endpoint sending a 516 // END_STREAM flag causes the stream state to become "half closed (local)"; an endpoint 517 // receiving a END_STREAM flag causes the stream state to become "half closed (remote)". 518 // * Either endpoint can send a RST_STREAM frame from this state, causing it to transition 519 // immediately to "closed". 520 case 'OPEN': 521 if (frame.flags.END_STREAM) { 522 this._setState(sending ? 
'HALF_CLOSED_LOCAL' : 'HALF_CLOSED_REMOTE'); 523 } else if (RST_STREAM) { 524 this._setState('CLOSED'); 525 } else { 526 /* No state change */ 527 } 528 break; 529 530 // A stream that is **half closed (local)** cannot be used for sending frames. 531 // 532 // * A stream transitions from this state to "closed" when a frame that contains a END_STREAM 533 // flag is received, or when either peer sends a RST_STREAM frame. 534 // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. 535 // * WINDOW_UPDATE can be sent by a peer that has sent a frame bearing the END_STREAM flag. 536 case 'HALF_CLOSED_LOCAL': 537 if (RST_STREAM || (receiving && frame.flags.END_STREAM)) { 538 this._setState('CLOSED'); 539 } else if (ORIGIN || ALTSVC || receiving || PRIORITY || (sending && WINDOW_UPDATE)) { 540 /* No state change */ 541 } else { 542 connectionError = 'PROTOCOL_ERROR'; 543 } 544 break; 545 546 // A stream that is **half closed (remote)** is no longer being used by the peer to send frames. 547 // In this state, an endpoint is no longer obligated to maintain a receiver flow control window 548 // if it performs flow control. 549 // 550 // * If an endpoint receives additional frames for a stream that is in this state it MUST 551 // respond with a stream error of type STREAM_CLOSED. 552 // * A stream can transition from this state to "closed" by sending a frame that contains a 553 // END_STREAM flag, or when either peer sends a RST_STREAM frame. 554 // * An endpoint MAY send or receive PRIORITY frames in this state to reprioritize the stream. 555 // * A receiver MAY receive a WINDOW_UPDATE frame on a "half closed (remote)" stream. 
556 case 'HALF_CLOSED_REMOTE': 557 if (RST_STREAM || (sending && frame.flags.END_STREAM)) { 558 this._setState('CLOSED'); 559 } else if (ORIGIN || ALTSVC || sending || PRIORITY || (receiving && WINDOW_UPDATE)) { 560 /* No state change */ 561 } else { 562 connectionError = 'PROTOCOL_ERROR'; 563 } 564 break; 565 566 // The **closed** state is the terminal state. 567 // 568 // * An endpoint MUST NOT send frames on a closed stream. An endpoint that receives a frame 569 // after receiving a RST_STREAM or a frame containing a END_STREAM flag on that stream MUST 570 // treat that as a stream error of type STREAM_CLOSED. 571 // * WINDOW_UPDATE, PRIORITY or RST_STREAM frames can be received in this state for a short 572 // period after a frame containing an END_STREAM flag is sent. Until the remote peer receives 573 // and processes the frame bearing the END_STREAM flag, it might send either frame type. 574 // Endpoints MUST ignore WINDOW_UPDATE frames received in this state, though endpoints MAY 575 // choose to treat WINDOW_UPDATE frames that arrive a significant time after sending 576 // END_STREAM as a connection error of type PROTOCOL_ERROR. 577 // * If this state is reached as a result of sending a RST_STREAM frame, the peer that receives 578 // the RST_STREAM might have already sent - or enqueued for sending - frames on the stream 579 // that cannot be withdrawn. An endpoint that sends a RST_STREAM frame MUST ignore frames that 580 // it receives on closed streams after it has sent a RST_STREAM frame. An endpoint MAY choose 581 // to limit the period over which it ignores frames and treat frames that arrive after this 582 // time as being in error. 583 // * An endpoint might receive a PUSH_PROMISE frame after it sends RST_STREAM. PUSH_PROMISE 584 // causes a stream to become "reserved". If promised streams are not desired, a RST_STREAM 585 // can be used to close any of those streams. 
586 case 'CLOSED': 587 if (PRIORITY || (sending && RST_STREAM) || 588 (receiving && WINDOW_UPDATE) || 589 (receiving && this._closedByUs && 590 (this._closedWithRst || RST_STREAM || ALTSVC || ORIGIN))) { 591 /* No state change */ 592 } else { 593 streamError = 'STREAM_CLOSED'; 594 } 595 break; 596 } 597 598 // Noting that the connection was closed by the other endpoint. It may be important in edge cases. 599 // For example, when the peer tries to cancel a promised stream, but we already sent every data 600 // on it, then the stream is in CLOSED state, yet we want to ignore the incoming RST_STREAM. 601 if ((this.state === 'CLOSED') && (previousState !== 'CLOSED')) { 602 this._closedByUs = sending; 603 this._closedWithRst = RST_STREAM; 604 } 605 606 // Sending/receiving a PUSH_PROMISE 607 // 608 // * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state 609 // for the reserved stream transitions to "reserved (local)". 610 // * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer. 611 // The state of the stream becomes "reserved (remote)". 612 if (PUSH_PROMISE && !connectionError && !streamError) { 613 /* This assertion must hold, because _transition is called immediately when a frame is written 614 to the stream. If it would be called when a frame gets out of the input queue, the state 615 of the reserved could have been changed by then. */ 616 assert(frame.promised_stream.state === 'IDLE', frame.promised_stream.state); 617 frame.promised_stream._setState(sending ? 
'RESERVED_LOCAL' : 'RESERVED_REMOTE'); 618 frame.promised_stream._initiated = sending; 619 } 620 621 // Signaling how sending/receiving this frame changes the active stream count (-1, 0 or +1) 622 if (this._initiated) { 623 var change = (activeState(this.state) - activeState(previousState)); 624 if (sending) { 625 frame.count_change = change; 626 } else { 627 frame.count_change(change); 628 } 629 } else if (sending) { 630 frame.count_change = 0; 631 } 632 633 // Common error handling. 634 if (connectionError || streamError) { 635 var info = { 636 error: connectionError, 637 frame: frame, 638 state: this.state, 639 closedByUs: this._closedByUs, 640 closedWithRst: this._closedWithRst 641 }; 642 643 // * When sending something invalid, throwing an exception, since it is probably a bug. 644 if (sending) { 645 this._log.error(info, 'Sending illegal frame.'); 646 return this.emit('error', new Error('Sending illegal frame (' + frame.type + ') in ' + this.state + ' state.')); 647 } 648 649 // * In case of a serious problem, emitting and error and letting someone else handle it 650 // (e.g. closing the connection) 651 // * When receiving something invalid, sending an RST_STREAM using the `reset` method. 652 // This will automatically cause a transition to the CLOSED state. 653 else { 654 this._log.error(info, 'Received illegal frame.'); 655 if (connectionError) { 656 this.emit('connectionError', connectionError); 657 } else { 658 this.reset(streamError); 659 this.emit('error', streamError); 660 } 661 } 662 } 663 }; 664 665 // Bunyan serializers 666 // ------------------ 667 668 exports.serializers = {}; 669 670 var nextId = 0; 671 exports.serializers.s = function(stream) { 672 if (!('_id' in stream)) { 673 stream._id = nextId; 674 nextId += 1; 675 } 676 return stream._id; 677 };