// websocket.js (33914B)
/* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^Readable$" }] */

'use strict';

const EventEmitter = require('events');
const https = require('https');
const http = require('http');
const net = require('net');
const tls = require('tls');
const { randomBytes, createHash } = require('crypto');
const { Readable } = require('stream');
const { URL } = require('url');

const PerMessageDeflate = require('./permessage-deflate');
const Receiver = require('./receiver');
const Sender = require('./sender');
const {
  BINARY_TYPES,
  EMPTY_BUFFER,
  GUID,
  kForOnEventAttribute,
  kListener,
  kStatusCode,
  kWebSocket,
  NOOP
} = require('./constants');
const {
  EventTarget: { addEventListener, removeEventListener }
} = require('./event-target');
const { format, parse } = require('./extension');
const { toBuffer } = require('./buffer-util');

// Milliseconds to wait for the closing handshake before destroying the socket.
const closeTimeout = 30 * 1000;
// Marks a request that was aborted on purpose so its `'error'` is ignored.
const kAborted = Symbol('kAborted');
const protocolVersions = [8, 13];
// Index in this array === numeric `readyState` value.
const readyStates = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
const subprotocolRegex = /^[!#$%&'*+\-.0-9A-Z^_`|a-z~]+$/;

/**
 * Class representing a WebSocket.
 *
 * @extends EventEmitter
 */
class WebSocket extends EventEmitter {
  /**
   * Create a new `WebSocket`.
   *
   * When `address` is `null` no handshake is started and the instance is
   * flagged as a server-side socket (`_isServer = true`); otherwise the client
   * handshake is started through `initAsClient()`.
   *
   * @param {(String|URL)} address The URL to which to connect
   * @param {(String|String[])} [protocols] The subprotocols
   * @param {Object} [options] Connection options
   */
  constructor(address, protocols, options) {
    super();

    this._binaryType = BINARY_TYPES[0];
    this._closeCode = 1006;
    this._closeFrameReceived = false;
    this._closeFrameSent = false;
    this._closeMessage = EMPTY_BUFFER;
    this._closeTimer = null;
    this._extensions = {};
    this._paused = false;
    this._protocol = '';
    this._readyState = WebSocket.CONNECTING;
    this._receiver = null;
    this._sender = null;
    this._socket = null;

    if (address !== null) {
      this._bufferedAmount = 0;
      this._isServer = false;
      this._redirects = 0;

      //
      // Normalize the overloaded second argument: it may be omitted, a single
      // subprotocol, a list of subprotocols, or the options object itself.
      //
      if (protocols === undefined) {
        protocols = [];
      } else if (!Array.isArray(protocols)) {
        if (typeof protocols === 'object' && protocols !== null) {
          options = protocols;
          protocols = [];
        } else {
          protocols = [protocols];
        }
      }

      initAsClient(this, address, protocols, options);
    } else {
      this._isServer = true;
    }
  }

  /**
   * This deviates from the WHATWG interface since ws doesn't support the
   * required default "blob" type (instead we define a custom "nodebuffer"
   * type).
   *
   * @type {String}
   */
  get binaryType() {
    return this._binaryType;
  }

  set binaryType(type) {
    // Values that are not listed in `BINARY_TYPES` are silently ignored.
    if (!BINARY_TYPES.includes(type)) return;

    this._binaryType = type;

    //
    // Allow to change `binaryType` on the fly.
    //
    if (this._receiver) this._receiver._binaryType = type;
  }

  /**
   * @type {Number}
   */
  get bufferedAmount() {
    // Without a socket only the bytes counted by `sendAfterClose()` exist.
    if (!this._socket) return this._bufferedAmount;

    return this._socket._writableState.length + this._sender._bufferedBytes;
  }

  /**
   * @type {String}
   */
  get extensions() {
    return Object.keys(this._extensions).join();
  }

  /**
   * @type {Boolean}
   */
  get isPaused() {
    return this._paused;
  }

  //
  // The four `on*` getters below are placeholders: the accessors that are
  // actually used are installed on the prototype after the class body.
  //

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onclose() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onerror() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onopen() {
    return null;
  }

  /**
   * @type {Function}
   */
  /* istanbul ignore next */
  get onmessage() {
    return null;
  }

  /**
   * @type {String}
   */
  get protocol() {
    return this._protocol;
  }

  /**
   * @type {Number}
   */
  get readyState() {
    return this._readyState;
  }

  /**
   * @type {String}
   */
  get url() {
    return this._url;
  }

  /**
   * Set up the socket and the internal resources.
   *
   * @param {(net.Socket|tls.Socket)} socket The network socket between the
   *     server and client
   * @param {Buffer} head The first packet of the upgraded stream
   * @param {Object} options Options object
   * @param {Function} [options.generateMask] The function used to generate the
   *     masking key
   * @param {Number} [options.maxPayload=0] The maximum allowed message size
   * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
   *     not to skip UTF-8 validation for text and close messages
   * @private
   */
  setSocket(socket, head, options) {
    const receiver = new Receiver({
      binaryType: this.binaryType,
      extensions: this._extensions,
      isServer: this._isServer,
      maxPayload: options.maxPayload,
      skipUTF8Validation: options.skipUTF8Validation
    });

    this._sender = new Sender(socket, this._extensions, options.generateMask);
    this._receiver = receiver;
    this._socket = socket;

    // Back-references used by the module-level listeners to find `this`.
    receiver[kWebSocket] = this;
    socket[kWebSocket] = this;

    receiver.on('conclude', receiverOnConclude);
    receiver.on('drain', receiverOnDrain);
    receiver.on('error', receiverOnError);
    receiver.on('message', receiverOnMessage);
    receiver.on('ping', receiverOnPing);
    receiver.on('pong', receiverOnPong);

    socket.setTimeout(0);
    socket.setNoDelay();

    // Push back the bytes that arrived together with the upgrade response.
    if (head.length > 0) socket.unshift(head);

    socket.on('close', socketOnClose);
    socket.on('data', socketOnData);
    socket.on('end', socketOnEnd);
    socket.on('error', socketOnError);

    this._readyState = WebSocket.OPEN;
    this.emit('open');
  }

  /**
   * Emit the `'close'` event.
   *
   * @private
   */
  emitClose() {
    // No socket means the opening handshake never completed.
    if (!this._socket) {
      this._readyState = WebSocket.CLOSED;
      this.emit('close', this._closeCode, this._closeMessage);
      return;
    }

    if (this._extensions[PerMessageDeflate.extensionName]) {
      this._extensions[PerMessageDeflate.extensionName].cleanup();
    }

    this._receiver.removeAllListeners();
    this._readyState = WebSocket.CLOSED;
    this.emit('close', this._closeCode, this._closeMessage);
  }

  /**
   * Start a closing handshake.
   *
   *          +----------+   +-----------+   +----------+
   *     - - -|ws.close()|-->|close frame|-->|ws.close()|- - -
   *    |     +----------+   +-----------+   +----------+     |
   *          +----------+   +-----------+         |
   * CLOSING  |ws.close()|<--|close frame|<--+-----+       CLOSING
   *          +----------+   +-----------+   |
   *    |           |                        |   +---+        |
   *                +------------------------+-->|fin| - - - -
   *    |         +---+                      |   +---+
   *     - - - - -|fin|<---------------------+
   *              +---+
   *
   * @param {Number} [code] Status code explaining why the connection is closing
   * @param {(String|Buffer)} [data] The reason why the connection is
   *     closing
   * @public
   */
  close(code, data) {
    if (this.readyState === WebSocket.CLOSED) return;
    if (this.readyState === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    if (this.readyState === WebSocket.CLOSING) {
      if (
        this._closeFrameSent &&
        (this._closeFrameReceived || this._receiver._writableState.errorEmitted)
      ) {
        this._socket.end();
      }

      return;
    }

    this._readyState = WebSocket.CLOSING;
    this._sender.close(code, data, !this._isServer, (err) => {
      //
      // This error is handled by the `'error'` listener on the socket. We only
      // want to know if the close frame has been sent here.
      //
      if (err) return;

      this._closeFrameSent = true;

      if (
        this._closeFrameReceived ||
        this._receiver._writableState.errorEmitted
      ) {
        this._socket.end();
      }
    });

    //
    // Specify a timeout for the closing handshake to complete.
    //
    this._closeTimer = setTimeout(
      this._socket.destroy.bind(this._socket),
      closeTimeout
    );
  }

  /**
   * Pause the socket.
   *
   * @public
   */
  pause() {
    if (
      this.readyState === WebSocket.CONNECTING ||
      this.readyState === WebSocket.CLOSED
    ) {
      return;
    }

    this._paused = true;
    this._socket.pause();
  }

  /**
   * Send a ping.
   *
   * @param {*} [data] The data to send
   * @param {Boolean} [mask] Indicates whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when the ping is sent
   * @throws {Error} If the connection is still in the `CONNECTING` state
   * @public
   */
  ping(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    if (typeof data === 'function') {
      cb = data;
      data = mask = undefined;
    } else if (typeof mask === 'function') {
      cb = mask;
      mask = undefined;
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    if (mask === undefined) mask = !this._isServer;
    this._sender.ping(data || EMPTY_BUFFER, mask, cb);
  }

  /**
   * Send a pong.
   *
   * @param {*} [data] The data to send
   * @param {Boolean} [mask] Indicates whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when the pong is sent
   * @throws {Error} If the connection is still in the `CONNECTING` state
   * @public
   */
  pong(data, mask, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    if (typeof data === 'function') {
      cb = data;
      data = mask = undefined;
    } else if (typeof mask === 'function') {
      cb = mask;
      mask = undefined;
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    if (mask === undefined) mask = !this._isServer;
    this._sender.pong(data || EMPTY_BUFFER, mask, cb);
  }

  /**
   * Resume the socket.
   *
   * @public
   */
  resume() {
    if (
      this.readyState === WebSocket.CONNECTING ||
      this.readyState === WebSocket.CLOSED
    ) {
      return;
    }

    this._paused = false;
    // While the receiver needs draining, its `'drain'` listener resumes.
    if (!this._receiver._writableState.needDrain) this._socket.resume();
  }

  /**
   * Send a data message.
   *
   * @param {*} data The message to send
   * @param {Object} [options] Options object
   * @param {Boolean} [options.binary] Specifies whether `data` is binary or
   *     text
   * @param {Boolean} [options.compress] Specifies whether or not to compress
   *     `data`
   * @param {Boolean} [options.fin=true] Specifies whether the fragment is the
   *     last one
   * @param {Boolean} [options.mask] Specifies whether or not to mask `data`
   * @param {Function} [cb] Callback which is executed when data is written out
   * @throws {Error} If the connection is still in the `CONNECTING` state
   * @public
   */
  send(data, options, cb) {
    if (this.readyState === WebSocket.CONNECTING) {
      throw new Error('WebSocket is not open: readyState 0 (CONNECTING)');
    }

    if (typeof options === 'function') {
      cb = options;
      options = {};
    }

    if (typeof data === 'number') data = data.toString();

    if (this.readyState !== WebSocket.OPEN) {
      sendAfterClose(this, data, cb);
      return;
    }

    const opts = {
      binary: typeof data !== 'string',
      mask: !this._isServer,
      compress: true,
      fin: true,
      ...options
    };

    // Compression is only possible when permessage-deflate was negotiated.
    if (!this._extensions[PerMessageDeflate.extensionName]) {
      opts.compress = false;
    }

    this._sender.send(data || EMPTY_BUFFER, opts, cb);
  }

  /**
   * Forcibly close the connection.
   *
   * @public
   */
  terminate() {
    if (this.readyState === WebSocket.CLOSED) return;
    if (this.readyState === WebSocket.CONNECTING) {
      const msg = 'WebSocket was closed before the connection was established';
      return abortHandshake(this, this._req, msg);
    }

    if (this._socket) {
      this._readyState = WebSocket.CLOSING;
      this._socket.destroy();
    }
  }
}

/**
 * @constant {Number} CONNECTING
 * @memberof WebSocket
 */
Object.defineProperty(WebSocket, 'CONNECTING', {
  enumerable: true,
  value: readyStates.indexOf('CONNECTING')
});

/**
 * @constant {Number} CONNECTING
 * @memberof WebSocket.prototype
 */
Object.defineProperty(WebSocket.prototype, 'CONNECTING', {
  enumerable: true,
  value: readyStates.indexOf('CONNECTING')
});

/**
 * @constant {Number} OPEN
 * @memberof WebSocket
 */
Object.defineProperty(WebSocket, 'OPEN', {
  enumerable: true,
  value: readyStates.indexOf('OPEN')
});

/**
 * @constant {Number} OPEN
 * @memberof WebSocket.prototype
 */
Object.defineProperty(WebSocket.prototype, 'OPEN', {
  enumerable: true,
  value: readyStates.indexOf('OPEN')
});

/**
 * @constant {Number} CLOSING
 * @memberof WebSocket
 */
Object.defineProperty(WebSocket, 'CLOSING', {
  enumerable: true,
  value: readyStates.indexOf('CLOSING')
});

/**
 * @constant {Number} CLOSING
 * @memberof WebSocket.prototype
 */
Object.defineProperty(WebSocket.prototype, 'CLOSING', {
  enumerable: true,
  value: readyStates.indexOf('CLOSING')
});

/**
 * @constant {Number} CLOSED
 * @memberof WebSocket
 */
Object.defineProperty(WebSocket, 'CLOSED', {
  enumerable: true,
  value: readyStates.indexOf('CLOSED')
});

/**
 * @constant {Number} CLOSED
 * @memberof WebSocket.prototype
 */
Object.defineProperty(WebSocket.prototype, 'CLOSED', {
  enumerable: true,
  value: readyStates.indexOf('CLOSED')
});

// Make the accessor properties declared in the class body enumerable.
[
  'binaryType',
  'bufferedAmount',
  'extensions',
  'isPaused',
  'protocol',
  'readyState',
  'url'
].forEach((property) => {
  Object.defineProperty(WebSocket.prototype, property, { enumerable: true });
});

//
// Add the `onopen`, `onerror`, `onclose`, and `onmessage` attributes.
// See https://html.spec.whatwg.org/multipage/comms.html#the-websocket-interface
//
['open', 'error', 'close', 'message'].forEach((method) => {
  Object.defineProperty(WebSocket.prototype, `on${method}`, {
    enumerable: true,
    get() {
      for (const listener of this.listeners(method)) {
        if (listener[kForOnEventAttribute]) return listener[kListener];
      }

      return null;
    },
    set(handler) {
      // Drop the listener installed by a previous assignment, if any.
      for (const listener of this.listeners(method)) {
        if (listener[kForOnEventAttribute]) {
          this.removeListener(method, listener);
          break;
        }
      }

      if (typeof handler !== 'function') return;

      this.addEventListener(method, handler, {
        [kForOnEventAttribute]: true
      });
    }
  });
});

WebSocket.prototype.addEventListener = addEventListener;
WebSocket.prototype.removeEventListener = removeEventListener;

module.exports = WebSocket;
/**
 * Initialize a WebSocket client.
 *
 * Validates the options and the URL, builds the HTTP(S) upgrade request, and
 * wires the request listeners that either complete the handshake (calling
 * `websocket.setSocket()`), follow a redirect, or abort with an error.
 *
 * @param {WebSocket} websocket The client to initialize
 * @param {(String|URL)} address The URL to which to connect
 * @param {Array} protocols The subprotocols
 * @param {Object} [options] Connection options
 * @param {Boolean} [options.followRedirects=false] Whether or not to follow
 *     redirects
 * @param {Function} [options.generateMask] The function used to generate the
 *     masking key
 * @param {Number} [options.handshakeTimeout] Timeout in milliseconds for the
 *     handshake request
 * @param {Number} [options.maxPayload=104857600] The maximum allowed message
 *     size
 * @param {Number} [options.maxRedirects=10] The maximum number of redirects
 *     allowed
 * @param {String} [options.origin] Value of the `Origin` or
 *     `Sec-WebSocket-Origin` header
 * @param {(Boolean|Object)} [options.perMessageDeflate=true] Enable/disable
 *     permessage-deflate
 * @param {Number} [options.protocolVersion=13] Value of the
 *     `Sec-WebSocket-Version` header
 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
 *     not to skip UTF-8 validation for text and close messages
 * @throws {RangeError} If `options.protocolVersion` is not supported
 * @throws {SyntaxError} If the URL or a subprotocol is invalid (URL errors
 *     found while following a redirect are emitted instead of thrown)
 * @private
 */
function initAsClient(websocket, address, protocols, options) {
  //
  // Keys listed after `...options` cannot be overridden by the caller; they
  // are derived from the URL further down.
  //
  const opts = {
    protocolVersion: protocolVersions[1],
    maxPayload: 100 * 1024 * 1024,
    skipUTF8Validation: false,
    perMessageDeflate: true,
    followRedirects: false,
    maxRedirects: 10,
    ...options,
    createConnection: undefined,
    socketPath: undefined,
    hostname: undefined,
    protocol: undefined,
    timeout: undefined,
    method: 'GET',
    host: undefined,
    path: undefined,
    port: undefined
  };

  if (!protocolVersions.includes(opts.protocolVersion)) {
    throw new RangeError(
      `Unsupported protocol version: ${opts.protocolVersion} ` +
        `(supported versions: ${protocolVersions.join(', ')})`
    );
  }

  let parsedUrl;

  if (address instanceof URL) {
    parsedUrl = address;
    websocket._url = address.href;
  } else {
    try {
      parsedUrl = new URL(address);
    } catch (e) {
      throw new SyntaxError(`Invalid URL: ${address}`);
    }

    websocket._url = address;
  }

  const isSecure = parsedUrl.protocol === 'wss:';
  const isUnixSocket = parsedUrl.protocol === 'ws+unix:';
  let invalidURLMessage;

  if (parsedUrl.protocol !== 'ws:' && !isSecure && !isUnixSocket) {
    invalidURLMessage =
      'The URL\'s protocol must be one of "ws:", "wss:", or "ws+unix:"';
  } else if (isUnixSocket && !parsedUrl.pathname) {
    invalidURLMessage = "The URL's pathname is empty";
  } else if (parsedUrl.hash) {
    invalidURLMessage = 'The URL contains a fragment identifier';
  }

  if (invalidURLMessage) {
    const err = new SyntaxError(invalidURLMessage);

    //
    // Throw synchronously for the URL passed by the caller; a bad redirect
    // target is reported through the `'error'` and `'close'` events instead.
    //
    if (websocket._redirects === 0) {
      throw err;
    } else {
      emitErrorAndClose(websocket, err);
      return;
    }
  }

  const defaultPort = isSecure ? 443 : 80;
  const key = randomBytes(16).toString('base64');
  const request = isSecure ? https.request : http.request;
  const protocolSet = new Set();
  let perMessageDeflate;

  opts.createConnection = isSecure ? tlsConnect : netConnect;
  opts.defaultPort = opts.defaultPort || defaultPort;
  opts.port = parsedUrl.port || defaultPort;
  // Strip the brackets that `URL` keeps around IPv6 literals.
  opts.host = parsedUrl.hostname.startsWith('[')
    ? parsedUrl.hostname.slice(1, -1)
    : parsedUrl.hostname;
  opts.headers = {
    ...opts.headers,
    'Sec-WebSocket-Version': opts.protocolVersion,
    'Sec-WebSocket-Key': key,
    Connection: 'Upgrade',
    Upgrade: 'websocket'
  };
  opts.path = parsedUrl.pathname + parsedUrl.search;
  opts.timeout = opts.handshakeTimeout;

  if (opts.perMessageDeflate) {
    perMessageDeflate = new PerMessageDeflate(
      opts.perMessageDeflate !== true ? opts.perMessageDeflate : {},
      false,
      opts.maxPayload
    );
    opts.headers['Sec-WebSocket-Extensions'] = format({
      [PerMessageDeflate.extensionName]: perMessageDeflate.offer()
    });
  }
  if (protocols.length) {
    for (const protocol of protocols) {
      if (
        typeof protocol !== 'string' ||
        !subprotocolRegex.test(protocol) ||
        protocolSet.has(protocol)
      ) {
        throw new SyntaxError(
          'An invalid or duplicated subprotocol was specified'
        );
      }

      protocolSet.add(protocol);
    }

    opts.headers['Sec-WebSocket-Protocol'] = protocols.join(',');
  }
  if (opts.origin) {
    if (opts.protocolVersion < 13) {
      opts.headers['Sec-WebSocket-Origin'] = opts.origin;
    } else {
      opts.headers.Origin = opts.origin;
    }
  }
  if (parsedUrl.username || parsedUrl.password) {
    opts.auth = `${parsedUrl.username}:${parsedUrl.password}`;
  }

  if (isUnixSocket) {
    // `ws+unix:` URLs have the form `<socket path>:<request path>`.
    const parts = opts.path.split(':');

    opts.socketPath = parts[0];
    opts.path = parts[1];
  }

  let req;

  if (opts.followRedirects) {
    if (websocket._redirects === 0) {
      // Remember the first target so later hops can be compared against it.
      websocket._originalUnixSocket = isUnixSocket;
      websocket._originalSecure = isSecure;
      websocket._originalHostOrSocketPath = isUnixSocket
        ? opts.socketPath
        : parsedUrl.host;

      const headers = options && options.headers;

      //
      // Shallow copy the user provided options so that headers can be changed
      // without mutating the original object.
      //
      options = { ...options, headers: {} };

      if (headers) {
        for (const [name, value] of Object.entries(headers)) {
          options.headers[name.toLowerCase()] = value;
        }
      }
    } else if (websocket.listenerCount('redirect') === 0) {
      const isSameHost = isUnixSocket
        ? websocket._originalUnixSocket
          ? opts.socketPath === websocket._originalHostOrSocketPath
          : false
        : websocket._originalUnixSocket
        ? false
        : parsedUrl.host === websocket._originalHostOrSocketPath;

      if (!isSameHost || (websocket._originalSecure && !isSecure)) {
        //
        // Match curl 7.77.0 behavior and drop the following headers. These
        // headers are also dropped when following a redirect to a subdomain.
        //
        delete opts.headers.authorization;
        delete opts.headers.cookie;

        if (!isSameHost) delete opts.headers.host;

        opts.auth = undefined;
      }
    }

    //
    // Match curl 7.77.0 behavior and make the first `Authorization` header win.
    // If the `Authorization` header is set, then there is nothing to do as it
    // will take precedence.
    //
    if (opts.auth && !options.headers.authorization) {
      options.headers.authorization =
        'Basic ' + Buffer.from(opts.auth).toString('base64');
    }

    req = websocket._req = request(opts);

    if (websocket._redirects) {
      //
      // Unlike what is done for the `'upgrade'` event, no early exit is
      // triggered here if the user calls `websocket.close()` or
      // `websocket.terminate()` from a listener of the `'redirect'` event. This
      // is because the user can also call `request.destroy()` with an error
      // before calling `websocket.close()` or `websocket.terminate()` and this
      // would result in an error being emitted on the `request` object with no
      // `'error'` event listeners attached.
      //
      websocket.emit('redirect', websocket.url, req);
    }
  } else {
    req = websocket._req = request(opts);
  }

  if (opts.timeout) {
    req.on('timeout', () => {
      abortHandshake(websocket, req, 'Opening handshake has timed out');
    });
  }

  req.on('error', (err) => {
    // Ignore errors of requests that were replaced or aborted on purpose.
    if (req === null || req[kAborted]) return;

    req = websocket._req = null;
    emitErrorAndClose(websocket, err);
  });

  req.on('response', (res) => {
    const location = res.headers.location;
    const statusCode = res.statusCode;

    if (
      location &&
      opts.followRedirects &&
      statusCode >= 300 &&
      statusCode < 400
    ) {
      if (++websocket._redirects > opts.maxRedirects) {
        abortHandshake(websocket, req, 'Maximum redirects exceeded');
        return;
      }

      req.abort();

      let addr;

      try {
        addr = new URL(location, address);
      } catch (e) {
        const err = new SyntaxError(`Invalid URL: ${location}`);
        emitErrorAndClose(websocket, err);
        return;
      }

      initAsClient(websocket, addr, protocols, options);
    } else if (!websocket.emit('unexpected-response', req, res)) {
      abortHandshake(
        websocket,
        req,
        `Unexpected server response: ${res.statusCode}`
      );
    }
  });

  req.on('upgrade', (res, socket, head) => {
    websocket.emit('upgrade', res);

    //
    // The user may have closed the connection from a listener of the
    // `'upgrade'` event.
    //
    if (websocket.readyState !== WebSocket.CONNECTING) return;

    req = websocket._req = null;

    const upgrade = res.headers.upgrade;

    //
    // A missing `Upgrade` header must abort the handshake like any other
    // invalid value instead of throwing a `TypeError` from this listener.
    //
    if (upgrade === undefined || upgrade.toLowerCase() !== 'websocket') {
      abortHandshake(websocket, socket, 'Invalid Upgrade header');
      return;
    }

    const digest = createHash('sha1')
      .update(key + GUID)
      .digest('base64');

    if (res.headers['sec-websocket-accept'] !== digest) {
      abortHandshake(websocket, socket, 'Invalid Sec-WebSocket-Accept header');
      return;
    }

    const serverProt = res.headers['sec-websocket-protocol'];
    let protError;

    if (serverProt !== undefined) {
      if (!protocolSet.size) {
        protError = 'Server sent a subprotocol but none was requested';
      } else if (!protocolSet.has(serverProt)) {
        protError = 'Server sent an invalid subprotocol';
      }
    } else if (protocolSet.size) {
      protError = 'Server sent no subprotocol';
    }

    if (protError) {
      abortHandshake(websocket, socket, protError);
      return;
    }

    if (serverProt) websocket._protocol = serverProt;

    const secWebSocketExtensions = res.headers['sec-websocket-extensions'];

    if (secWebSocketExtensions !== undefined) {
      if (!perMessageDeflate) {
        const message =
          'Server sent a Sec-WebSocket-Extensions header but no extension ' +
          'was requested';
        abortHandshake(websocket, socket, message);
        return;
      }

      let extensions;

      try {
        extensions = parse(secWebSocketExtensions);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Extensions header';
        abortHandshake(websocket, socket, message);
        return;
      }

      const extensionNames = Object.keys(extensions);

      if (
        extensionNames.length !== 1 ||
        extensionNames[0] !== PerMessageDeflate.extensionName
      ) {
        const message = 'Server indicated an extension that was not requested';
        abortHandshake(websocket, socket, message);
        return;
      }

      try {
        perMessageDeflate.accept(extensions[PerMessageDeflate.extensionName]);
      } catch (err) {
        const message = 'Invalid Sec-WebSocket-Extensions header';
        abortHandshake(websocket, socket, message);
        return;
      }

      websocket._extensions[PerMessageDeflate.extensionName] =
        perMessageDeflate;
    }

    websocket.setSocket(socket, head, {
      generateMask: opts.generateMask,
      maxPayload: opts.maxPayload,
      skipUTF8Validation: opts.skipUTF8Validation
    });
  });

  req.end();
}

/**
 * Emit the `'error'` and `'close'` events.
 *
 * The ready state is moved to `CLOSING` before `'error'` is emitted.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {Error} err The error to emit
 * @private
 */
function emitErrorAndClose(websocket, err) {
  websocket._readyState = WebSocket.CLOSING;
  websocket.emit('error', err);
  websocket.emitClose();
}

/**
 * Create a `net.Socket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {net.Socket} The newly created socket used to start the connection
 * @private
 */
function netConnect(options) {
  // `options.path` holds the request path here; `net.connect()` expects the
  // socket path (or `undefined` for TCP) under that key.
  options.path = options.socketPath;
  return net.connect(options);
}

/**
 * Create a `tls.TLSSocket` and initiate a connection.
 *
 * @param {Object} options Connection options
 * @return {tls.TLSSocket} The newly created socket used to start the connection
 * @private
 */
function tlsConnect(options) {
  options.path = undefined;

  // Default the SNI name to the host unless the host is an IP address; an
  // explicit empty string set by the caller is left untouched.
  if (!options.servername && options.servername !== '') {
    options.servername = net.isIP(options.host) ? '' : options.host;
  }

  return tls.connect(options);
}
/**
 * Abort the handshake and emit an error.
 *
 * If `stream` is an HTTP request (detected by the presence of `setHeader`) it
 * is flagged as aborted and aborted, and the `'error'`/`'close'` events are
 * emitted on the next tick. Otherwise `stream` is treated as a socket: it is
 * destroyed with the error and the events are forwarded from the socket's own
 * `'error'` and `'close'` events.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {(http.ClientRequest|net.Socket|tls.Socket)} stream The request to
 *     abort or the socket to destroy
 * @param {String} message The error message
 * @private
 */
function abortHandshake(websocket, stream, message) {
  websocket._readyState = WebSocket.CLOSING;

  const err = new Error(message);
  // Hide this helper from the stack trace of the emitted error.
  Error.captureStackTrace(err, abortHandshake);

  if (stream.setHeader) {
    // The flag lets the request `'error'` listener ignore this abort.
    stream[kAborted] = true;
    stream.abort();

    if (stream.socket && !stream.socket.destroyed) {
      //
      // On Node.js >= 14.3.0 `request.abort()` does not destroy the socket if
      // called after the request completed. See
      // https://github.com/websockets/ws/issues/1869.
      //
      stream.socket.destroy();
    }

    process.nextTick(emitErrorAndClose, websocket, err);
  } else {
    stream.destroy(err);
    stream.once('error', websocket.emit.bind(websocket, 'error'));
    stream.once('close', websocket.emitClose.bind(websocket));
  }
}

/**
 * Handle cases where the `ping()`, `pong()`, or `send()` methods are called
 * when the `readyState` attribute is `CLOSING` or `CLOSED`.
 *
 * The size of `data` is added to the buffered byte count and, if given, `cb`
 * is called with an error describing the current ready state.
 *
 * @param {WebSocket} websocket The WebSocket instance
 * @param {*} [data] The data to send
 * @param {Function} [cb] Callback
 * @private
 */
function sendAfterClose(websocket, data, cb) {
  if (data) {
    const length = toBuffer(data).length;

    //
    // The `_bufferedAmount` property is used only when the peer is a client and
    // the opening handshake fails. Under these circumstances, in fact, the
    // `setSocket()` method is not called, so the `_socket` and `_sender`
    // properties are set to `null`.
    //
    if (websocket._socket) websocket._sender._bufferedBytes += length;
    else websocket._bufferedAmount += length;
  }

  if (cb) {
    const err = new Error(
      `WebSocket is not open: readyState ${websocket.readyState} ` +
        `(${readyStates[websocket.readyState]})`
    );
    cb(err);
  }
}

/**
 * The listener of the `Receiver` `'conclude'` event.
 *
 * Records the received close frame and answers it by calling
 * `websocket.close()`.
 *
 * @param {Number} code The status code
 * @param {Buffer} reason The reason for closing
 * @private
 */
function receiverOnConclude(code, reason) {
  const websocket = this[kWebSocket];

  websocket._closeFrameReceived = true;
  websocket._closeMessage = reason;
  websocket._closeCode = code;

  // The back-reference is cleared by the socket `'close'` listener.
  if (websocket._socket[kWebSocket] === undefined) return;

  // Stop feeding the receiver but keep the socket flowing.
  websocket._socket.removeListener('data', socketOnData);
  process.nextTick(resume, websocket._socket);

  // 1005 is not echoed back: `close()` is called without arguments.
  if (code === 1005) websocket.close();
  else websocket.close(code, reason);
}

/**
 * The listener of the `Receiver` `'drain'` event.
 *
 * Resumes the socket unless the user paused the WebSocket.
 *
 * @private
 */
function receiverOnDrain() {
  const websocket = this[kWebSocket];

  if (!websocket.isPaused) websocket._socket.resume();
}

/**
 * The listener of the `Receiver` `'error'` event.
 *
 * Starts the closing handshake with the status code attached to the error (if
 * the socket is still attached) and then re-emits the error on the WebSocket.
 *
 * @param {(RangeError|Error)} err The emitted error
 * @private
 */
function receiverOnError(err) {
  const websocket = this[kWebSocket];

  if (websocket._socket[kWebSocket] !== undefined) {
    websocket._socket.removeListener('data', socketOnData);

    //
    // On Node.js < 14.0.0 the `'error'` event is emitted synchronously. See
    // https://github.com/websockets/ws/issues/1940.
    //
    process.nextTick(resume, websocket._socket);

    websocket.close(err[kStatusCode]);
  }

  websocket.emit('error', err);
}
/**
 * The listener of the `Receiver` `'finish'` event.
 *
 * @private
 */
function receiverOnFinish() {
  this[kWebSocket].emitClose();
}

/**
 * The listener of the `Receiver` `'message'` event.
 *
 * @param {(Buffer|ArrayBuffer|Buffer[])} data The message
 * @param {Boolean} isBinary Specifies whether the message is binary or not
 * @private
 */
function receiverOnMessage(data, isBinary) {
  this[kWebSocket].emit('message', data, isBinary);
}

/**
 * The listener of the `Receiver` `'ping'` event.
 *
 * Answers with a pong carrying the same data before emitting `'ping'`.
 *
 * @param {Buffer} data The data included in the ping frame
 * @private
 */
function receiverOnPing(data) {
  const websocket = this[kWebSocket];

  websocket.pong(data, !websocket._isServer, NOOP);
  websocket.emit('ping', data);
}

/**
 * The listener of the `Receiver` `'pong'` event.
 *
 * @param {Buffer} data The data included in the pong frame
 * @private
 */
function receiverOnPong(data) {
  this[kWebSocket].emit('pong', data);
}

/**
 * Resume a readable stream
 *
 * @param {Readable} stream The readable stream
 * @private
 */
function resume(stream) {
  stream.resume();
}

/**
 * The listener of the `net.Socket` `'close'` event.
 *
 * Flushes any data still buffered in the socket into the receiver, ends the
 * receiver, detaches the socket from the WebSocket and emits `'close'` once
 * the receiver has finished or errored.
 *
 * @private
 */
function socketOnClose() {
  const websocket = this[kWebSocket];

  this.removeListener('close', socketOnClose);
  this.removeListener('data', socketOnData);
  this.removeListener('end', socketOnEnd);

  websocket._readyState = WebSocket.CLOSING;

  let chunk;

  //
  // The close frame might not have been received or the `'end'` event emitted,
  // for example, if the socket was destroyed due to an error. Ensure that the
  // `receiver` stream is closed after writing any remaining buffered data to
  // it. If the readable side of the socket is in flowing mode then there is no
  // buffered data as everything has been already written and `readable.read()`
  // will return `null`. If instead, the socket is paused, any possible buffered
  // data will be read as a single chunk.
  //
  if (
    !this._readableState.endEmitted &&
    !websocket._closeFrameReceived &&
    !websocket._receiver._writableState.errorEmitted &&
    (chunk = websocket._socket.read()) !== null
  ) {
    websocket._receiver.write(chunk);
  }

  websocket._receiver.end();

  // Other listeners use a missing back-reference to detect a closed socket.
  this[kWebSocket] = undefined;

  clearTimeout(websocket._closeTimer);

  if (
    websocket._receiver._writableState.finished ||
    websocket._receiver._writableState.errorEmitted
  ) {
    websocket.emitClose();
  } else {
    // Defer `'close'` until the receiver is done with the pending data.
    websocket._receiver.on('error', receiverOnFinish);
    websocket._receiver.on('finish', receiverOnFinish);
  }
}

/**
 * The listener of the `net.Socket` `'data'` event.
 *
 * Pauses the socket when the receiver signals backpressure.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function socketOnData(chunk) {
  if (!this[kWebSocket]._receiver.write(chunk)) {
    this.pause();
  }
}

/**
 * The listener of the `net.Socket` `'end'` event.
 *
 * @private
 */
function socketOnEnd() {
  const websocket = this[kWebSocket];

  websocket._readyState = WebSocket.CLOSING;
  websocket._receiver.end();
  this.end();
}

/**
 * The listener of the `net.Socket` `'error'` event.
 *
 * Replaces itself with a no-op listener so that later socket errors are
 * ignored, then destroys the socket if it is still attached to a WebSocket.
 *
 * @private
 */
function socketOnError() {
  const websocket = this[kWebSocket];

  this.removeListener('error', socketOnError);
  this.on('error', NOOP);

  if (websocket) {
    websocket._readyState = WebSocket.CLOSING;
    this.destroy();
  }
}