receiver.js (14511B)
'use strict';

const { Writable } = require('stream');

const PerMessageDeflate = require('./permessage-deflate');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  kStatusCode,
  kWebSocket
} = require('./constants');
const { concat, toArrayBuffer, unmask } = require('./buffer-util');
const { isValidStatusCode, isValidUTF8 } = require('./validation');

//
// Parser states. `startLoop()` dispatches on `this._state` using these values.
//
const GET_INFO = 0;
const GET_PAYLOAD_LENGTH_16 = 1;
const GET_PAYLOAD_LENGTH_64 = 2;
const GET_MASK = 3;
const GET_DATA = 4;
const INFLATING = 5;

/**
 * HyBi Receiver implementation.
 *
 * Incoming chunks are buffered and parsed frame by frame. Complete messages
 * are reported through the `'message'`, `'ping'`, `'pong'` and `'conclude'`
 * events; protocol violations are passed to the `_write()` callback as errors
 * built by `error()`.
 *
 * @extends Writable
 */
class Receiver extends Writable {
  /**
   * Creates a Receiver instance.
   *
   * @param {Object} [options] Options object
   * @param {String} [options.binaryType=nodebuffer] The type for binary data
   * @param {Object} [options.extensions] An object containing the negotiated
   *     extensions
   * @param {Boolean} [options.isServer=false] Specifies whether to operate in
   *     client or server mode
   * @param {Number} [options.maxPayload=0] The maximum allowed message length
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   */
  constructor(options = {}) {
    super();

    this._binaryType = options.binaryType || BINARY_TYPES[0];
    this._extensions = options.extensions || {};
    this._isServer = !!options.isServer;
    // `| 0` coerces a missing or non-numeric value to 0, which disables the
    // limit (every check below also requires `this._maxPayload > 0`).
    this._maxPayload = options.maxPayload | 0;
    this._skipUTF8Validation = !!options.skipUTF8Validation;
    this[kWebSocket] = undefined;

    // Chunks received but not yet consumed, and their combined length.
    this._bufferedBytes = 0;
    this._buffers = [];

    // Fields describing the frame currently being parsed.
    this._compressed = false;
    this._payloadLength = 0;
    this._mask = undefined;
    // Opcode of the first frame of an unfinished fragmented message, or 0.
    this._fragmented = 0;
    this._masked = false;
    this._fin = false;
    this._opcode = 0;

    // Fields describing the data message currently being assembled.
    this._totalPayloadLength = 0;
    this._messageLength = 0;
    this._fragments = [];

    this._state = GET_INFO;
    this._loop = false;
  }

  /**
   * Implements `Writable.prototype._write()`.
   *
   * @param {Buffer} chunk The chunk of data to write
   * @param {String} encoding The character encoding of `chunk`
   * @param {Function} cb Callback
   * @private
   */
  _write(chunk, encoding, cb) {
    //
    // A close frame (opcode 0x08) has been fully handled: discard anything
    // that arrives after it.
    //
    if (this._opcode === 0x08 && this._state === GET_INFO) return cb();

    this._bufferedBytes += chunk.length;
    this._buffers.push(chunk);
    this.startLoop(cb);
  }

  /**
   * Consumes `n` bytes from the buffered data.
   *
   * Callers must make sure that at least `n` bytes are buffered.
   *
   * @param {Number} n The number of bytes to consume
   * @return {Buffer} The consumed bytes
   * @private
   */
  consume(n) {
    this._bufferedBytes -= n;

    // Exactly the first chunk: hand it over without copying.
    if (n === this._buffers[0].length) return this._buffers.shift();

    // A prefix of the first chunk: return a view and keep the remainder.
    if (n < this._buffers[0].length) {
      const buf = this._buffers[0];
      this._buffers[0] = buf.subarray(n);
      return buf.subarray(0, n);
    }

    // The requested bytes span several chunks: copy them into a new buffer.
    const dst = Buffer.allocUnsafe(n);

    do {
      const buf = this._buffers[0];
      const offset = dst.length - n;

      if (n >= buf.length) {
        dst.set(this._buffers.shift(), offset);
      } else {
        dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
        this._buffers[0] = buf.subarray(n);
      }

      n -= buf.length;
    } while (n > 0);

    return dst;
  }

  /**
   * Starts the parsing loop.
   *
   * The loop runs until one of the state handlers clears `this._loop`, which
   * they do when more data is needed or when an error is returned.
   *
   * @param {Function} cb Callback
   * @private
   */
  startLoop(cb) {
    let err;
    this._loop = true;

    do {
      switch (this._state) {
        case GET_INFO:
          err = this.getInfo();
          break;
        case GET_PAYLOAD_LENGTH_16:
          err = this.getPayloadLength16();
          break;
        case GET_PAYLOAD_LENGTH_64:
          err = this.getPayloadLength64();
          break;
        case GET_MASK:
          this.getMask();
          break;
        case GET_DATA:
          err = this.getData(cb);
          break;
        default:
          // `INFLATING`: `decompress()` owns `cb` and calls it (or restarts
          // the loop) when decompression completes.
          this._loop = false;
          return;
      }
    } while (this._loop);

    cb(err);
  }

  /**
   * Reads the first two bytes of a frame.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getInfo() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    const buf = this.consume(2);

    if ((buf[0] & 0x30) !== 0x00) {
      this._loop = false;
      return error(
        RangeError,
        'RSV2 and RSV3 must be clear',
        true,
        1002,
        'WS_ERR_UNEXPECTED_RSV_2_3'
      );
    }

    // RSV1 marks a compressed message; it is only acceptable when the
    // permessage-deflate extension has been negotiated.
    const compressed = (buf[0] & 0x40) === 0x40;

    if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
      this._loop = false;
      return error(
        RangeError,
        'RSV1 must be clear',
        true,
        1002,
        'WS_ERR_UNEXPECTED_RSV_1'
      );
    }

    this._fin = (buf[0] & 0x80) === 0x80;
    this._opcode = buf[0] & 0x0f;
    this._payloadLength = buf[1] & 0x7f;

    if (this._opcode === 0x00) {
      // Continuation frame: RSV1 may only be set on the first frame.
      if (compressed) {
        this._loop = false;
        return error(
          RangeError,
          'RSV1 must be clear',
          true,
          1002,
          'WS_ERR_UNEXPECTED_RSV_1'
        );
      }

      if (!this._fragmented) {
        this._loop = false;
        return error(
          RangeError,
          'invalid opcode 0',
          true,
          1002,
          'WS_ERR_INVALID_OPCODE'
        );
      }

      // Treat the continuation as a frame of the message it belongs to.
      this._opcode = this._fragmented;
    } else if (this._opcode === 0x01 || this._opcode === 0x02) {
      // A new data message may not start while another one is unfinished.
      if (this._fragmented) {
        this._loop = false;
        return error(
          RangeError,
          `invalid opcode ${this._opcode}`,
          true,
          1002,
          'WS_ERR_INVALID_OPCODE'
        );
      }

      this._compressed = compressed;
    } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
      // Control frames: not fragmented, not compressed, at most 125 bytes.
      if (!this._fin) {
        this._loop = false;
        return error(
          RangeError,
          'FIN must be set',
          true,
          1002,
          'WS_ERR_EXPECTED_FIN'
        );
      }

      if (compressed) {
        this._loop = false;
        return error(
          RangeError,
          'RSV1 must be clear',
          true,
          1002,
          'WS_ERR_UNEXPECTED_RSV_1'
        );
      }

      if (this._payloadLength > 0x7d) {
        this._loop = false;
        return error(
          RangeError,
          `invalid payload length ${this._payloadLength}`,
          true,
          1002,
          'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
        );
      }
    } else {
      this._loop = false;
      return error(
        RangeError,
        `invalid opcode ${this._opcode}`,
        true,
        1002,
        'WS_ERR_INVALID_OPCODE'
      );
    }

    // Remember the opcode of the first fragment for later continuations.
    if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
    this._masked = (buf[1] & 0x80) === 0x80;

    // In server mode frames must be masked, in client mode they must not be.
    if (this._isServer) {
      if (!this._masked) {
        this._loop = false;
        return error(
          RangeError,
          'MASK must be set',
          true,
          1002,
          'WS_ERR_EXPECTED_MASK'
        );
      }
    } else if (this._masked) {
      this._loop = false;
      return error(
        RangeError,
        'MASK must be clear',
        true,
        1002,
        'WS_ERR_UNEXPECTED_MASK'
      );
    }

    if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
    else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
    else return this.haveLength();
  }

  /**
   * Gets extended payload length (7+16).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength16() {
    if (this._bufferedBytes < 2) {
      this._loop = false;
      return;
    }

    this._payloadLength = this.consume(2).readUInt16BE(0);
    return this.haveLength();
  }

  /**
   * Gets extended payload length (7+64).
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  getPayloadLength64() {
    if (this._bufferedBytes < 8) {
      this._loop = false;
      return;
    }

    const buf = this.consume(8);
    // Upper 32 bits of the 64-bit length.
    const num = buf.readUInt32BE(0);

    //
    // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
    // if payload length is greater than this number.
    //
    if (num > Math.pow(2, 53 - 32) - 1) {
      this._loop = false;
      return error(
        RangeError,
        'Unsupported WebSocket frame: payload length > 2^53 - 1',
        false,
        1009,
        'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
      );
    }

    this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
    return this.haveLength();
  }

  /**
   * Payload length has been read.
   *
   * @return {(RangeError|undefined)} A possible error
   * @private
   */
  haveLength() {
    // Only data frames (opcode < 0x08) count towards the message size limit.
    if (this._payloadLength && this._opcode < 0x08) {
      this._totalPayloadLength += this._payloadLength;
      if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
        this._loop = false;
        return error(
          RangeError,
          'Max payload size exceeded',
          false,
          1009,
          'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
        );
      }
    }

    if (this._masked) this._state = GET_MASK;
    else this._state = GET_DATA;
  }

  /**
   * Reads mask bytes.
   *
   * @private
   */
  getMask() {
    if (this._bufferedBytes < 4) {
      this._loop = false;
      return;
    }

    this._mask = this.consume(4);
    this._state = GET_DATA;
  }

  /**
   * Reads data bytes.
   *
   * @param {Function} cb Callback
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  getData(cb) {
    let data = EMPTY_BUFFER;

    if (this._payloadLength) {
      if (this._bufferedBytes < this._payloadLength) {
        this._loop = false;
        return;
      }

      data = this.consume(this._payloadLength);

      // An all-zero mask leaves the payload unchanged, so skip unmasking.
      if (
        this._masked &&
        (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
      ) {
        unmask(data, this._mask);
      }
    }

    if (this._opcode > 0x07) return this.controlMessage(data);

    if (this._compressed) {
      this._state = INFLATING;
      this.decompress(data, cb);
      return;
    }

    if (data.length) {
      //
      // This message is not compressed so its length is the sum of the payload
      // length of all fragments.
      //
      this._messageLength = this._totalPayloadLength;
      this._fragments.push(data);
    }

    return this.dataMessage();
  }

  /**
   * Decompresses data.
   *
   * On success the parsing loop is restarted with the same callback; on
   * failure the callback receives the error.
   *
   * @param {Buffer} data Compressed data
   * @param {Function} cb Callback
   * @private
   */
  decompress(data, cb) {
    const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];

    perMessageDeflate.decompress(data, this._fin, (err, buf) => {
      if (err) return cb(err);

      if (buf.length) {
        // For compressed messages the limit applies to the inflated size.
        this._messageLength += buf.length;
        if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
          return cb(
            error(
              RangeError,
              'Max payload size exceeded',
              false,
              1009,
              'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
            )
          );
        }

        this._fragments.push(buf);
      }

      const er = this.dataMessage();
      if (er) return cb(er);

      this.startLoop(cb);
    });
  }

  /**
   * Handles a data message.
   *
   * When the current frame is final, the collected fragments are emitted as a
   * `'message'` event and the per-message state is reset.
   *
   * @return {(Error|undefined)} A possible error
   * @private
   */
  dataMessage() {
    if (this._fin) {
      const messageLength = this._messageLength;
      const fragments = this._fragments;

      this._totalPayloadLength = 0;
      this._messageLength = 0;
      this._fragmented = 0;
      this._fragments = [];

      if (this._opcode === 2) {
        let data;

        if (this._binaryType === 'nodebuffer') {
          data = concat(fragments, messageLength);
        } else if (this._binaryType === 'arraybuffer') {
          data = toArrayBuffer(concat(fragments, messageLength));
        } else {
          // Any other binary type receives the raw list of fragments.
          data = fragments;
        }

        this.emit('message', data, true);
      } else {
        const buf = concat(fragments, messageLength);

        if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
          this._loop = false;
          return error(
            Error,
            'invalid UTF-8 sequence',
            true,
            1007,
            'WS_ERR_INVALID_UTF8'
          );
        }

        this.emit('message', buf, false);
      }
    }

    this._state = GET_INFO;
  }

  /**
   * Handles a control message.
   *
   * @param {Buffer} data Data to handle
   * @return {(Error|RangeError|undefined)} A possible error
   * @private
   */
  controlMessage(data) {
    if (this._opcode === 0x08) {
      // Close frame: stop parsing whether or not the frame is valid.
      this._loop = false;

      if (data.length === 0) {
        this.emit('conclude', 1005, EMPTY_BUFFER);
        this.end();
      } else if (data.length === 1) {
        // A status code needs two bytes, so a single byte is malformed.
        return error(
          RangeError,
          'invalid payload length 1',
          true,
          1002,
          'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
        );
      } else {
        const code = data.readUInt16BE(0);

        if (!isValidStatusCode(code)) {
          return error(
            RangeError,
            `invalid status code ${code}`,
            true,
            1002,
            'WS_ERR_INVALID_CLOSE_CODE'
          );
        }

        // Everything after the two-byte status code is the close reason.
        const buf = data.subarray(2);

        if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
          return error(
            Error,
            'invalid UTF-8 sequence',
            true,
            1007,
            'WS_ERR_INVALID_UTF8'
          );
        }

        this.emit('conclude', code, buf);
        this.end();
      }
    } else if (this._opcode === 0x09) {
      this.emit('ping', data);
    } else {
      this.emit('pong', data);
    }

    this._state = GET_INFO;
  }
}

module.exports = Receiver;

/**
 * Builds an error object.
 *
 * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
 * @param {String} message The error message
 * @param {Boolean} prefix Specifies whether or not to add a default prefix to
 *     `message`
 * @param {Number} statusCode The status code
 * @param {String} errorCode The exposed error code
 * @return {(Error|RangeError)} The error
 * @private
 */
function error(ErrorCtor, message, prefix, statusCode, errorCode) {
  const err = new ErrorCtor(
    prefix ? `Invalid WebSocket frame: ${message}` : message
  );

  // Omit this helper from the captured stack trace.
  Error.captureStackTrace(err, error);
  err.code = errorCode;
  err[kStatusCode] = statusCode;
  return err;
}