websocket-server.js (15828B)
1 /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls|https$" }] */ 2 3 'use strict'; 4 5 const EventEmitter = require('events'); 6 const http = require('http'); 7 const https = require('https'); 8 const net = require('net'); 9 const tls = require('tls'); 10 const { createHash } = require('crypto'); 11 12 const extension = require('./extension'); 13 const PerMessageDeflate = require('./permessage-deflate'); 14 const subprotocol = require('./subprotocol'); 15 const WebSocket = require('./websocket'); 16 const { GUID, kWebSocket } = require('./constants'); 17 18 const keyRegex = /^[+/0-9A-Za-z]{22}==$/; 19 20 const RUNNING = 0; 21 const CLOSING = 1; 22 const CLOSED = 2; 23 24 /** 25 * Class representing a WebSocket server. 26 * 27 * @extends EventEmitter 28 */ 29 class WebSocketServer extends EventEmitter { 30 /** 31 * Create a `WebSocketServer` instance. 32 * 33 * @param {Object} options Configuration options 34 * @param {Number} [options.backlog=511] The maximum length of the queue of 35 * pending connections 36 * @param {Boolean} [options.clientTracking=true] Specifies whether or not to 37 * track clients 38 * @param {Function} [options.handleProtocols] A hook to handle protocols 39 * @param {String} [options.host] The hostname where to bind the server 40 * @param {Number} [options.maxPayload=104857600] The maximum allowed message 41 * size 42 * @param {Boolean} [options.noServer=false] Enable no server mode 43 * @param {String} [options.path] Accept only connections matching this path 44 * @param {(Boolean|Object)} [options.perMessageDeflate=false] Enable/disable 45 * permessage-deflate 46 * @param {Number} [options.port] The port where to bind the server 47 * @param {(http.Server|https.Server)} [options.server] A pre-created HTTP/S 48 * server to use 49 * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or 50 * not to skip UTF-8 validation for text and close messages 51 * @param {Function} [options.verifyClient] A hook to reject connections 52 * @param {Function} [options.WebSocket=WebSocket] Specifies the `WebSocket` 53 * class to use. It must be the `WebSocket` class or class that extends it 54 * @param {Function} [callback] A listener for the `listening` event 55 */ 56 constructor(options, callback) { 57 super(); 58 59 options = { 60 maxPayload: 100 * 1024 * 1024, 61 skipUTF8Validation: false, 62 perMessageDeflate: false, 63 handleProtocols: null, 64 clientTracking: true, 65 verifyClient: null, 66 noServer: false, 67 backlog: null, // use default (511 as implemented in net.js) 68 server: null, 69 host: null, 70 path: null, 71 port: null, 72 WebSocket, 73 ...options 74 }; 75 76 if ( 77 (options.port == null && !options.server && !options.noServer) || 78 (options.port != null && (options.server || options.noServer)) || 79 (options.server && options.noServer) 80 ) { 81 throw new TypeError( 82 'One and only one of the "port", "server", or "noServer" options ' + 83 'must be specified' 84 ); 85 } 86 87 if (options.port != null) { 88 this._server = http.createServer((req, res) => { 89 const body = http.STATUS_CODES[426]; 90 91 res.writeHead(426, { 92 'Content-Length': body.length, 93 'Content-Type': 'text/plain' 94 }); 95 res.end(body); 96 }); 97 this._server.listen( 98 options.port, 99 options.host, 100 options.backlog, 101 callback 102 ); 103 } else if (options.server) { 104 this._server = options.server; 105 } 106 107 if (this._server) { 108 const emitConnection = this.emit.bind(this, 'connection'); 109 110 this._removeListeners = addListeners(this._server, { 111 listening: this.emit.bind(this, 'listening'), 112 error: this.emit.bind(this, 'error'), 113 upgrade: (req, socket, head) => { 114 this.handleUpgrade(req, socket, head, emitConnection); 115 } 116 }); 117 } 118 119 if (options.perMessageDeflate === true) options.perMessageDeflate = {}; 120 if (options.clientTracking) { 121 this.clients = new Set(); 122 this._shouldEmitClose = false; 123 } 124 125 this.options = options; 126 this._state = RUNNING; 127 } 128 129 /** 130 * Returns the bound address, the address family name, and port of the server 131 * as reported by the operating system if listening on an IP socket. 132 * If the server is listening on a pipe or UNIX domain socket, the name is 133 * returned as a string. 134 * 135 * @return {(Object|String|null)} The address of the server 136 * @public 137 */ 138 address() { 139 if (this.options.noServer) { 140 throw new Error('The server is operating in "noServer" mode'); 141 } 142 143 if (!this._server) return null; 144 return this._server.address(); 145 } 146 147 /** 148 * Stop the server from accepting new connections and emit the `'close'` event 149 * when all existing connections are closed. 150 * 151 * @param {Function} [cb] A one-time listener for the `'close'` event 152 * @public 153 */ 154 close(cb) { 155 if (this._state === CLOSED) { 156 if (cb) { 157 this.once('close', () => { 158 cb(new Error('The server is not running')); 159 }); 160 } 161 162 process.nextTick(emitClose, this); 163 return; 164 } 165 166 if (cb) this.once('close', cb); 167 168 if (this._state === CLOSING) return; 169 this._state = CLOSING; 170 171 if (this.options.noServer || this.options.server) { 172 if (this._server) { 173 this._removeListeners(); 174 this._removeListeners = this._server = null; 175 } 176 177 if (this.clients) { 178 if (!this.clients.size) { 179 process.nextTick(emitClose, this); 180 } else { 181 this._shouldEmitClose = true; 182 } 183 } else { 184 process.nextTick(emitClose, this); 185 } 186 } else { 187 const server = this._server; 188 189 this._removeListeners(); 190 this._removeListeners = this._server = null; 191 192 // 193 // The HTTP/S server was created internally. Close it, and rely on its 194 // `'close'` event. 195 // 196 server.close(() => { 197 emitClose(this); 198 }); 199 } 200 } 201 202 /** 203 * See if a given request should be handled by this server instance. 204 * 205 * @param {http.IncomingMessage} req Request object to inspect 206 * @return {Boolean} `true` if the request is valid, else `false` 207 * @public 208 */ 209 shouldHandle(req) { 210 if (this.options.path) { 211 const index = req.url.indexOf('?'); 212 const pathname = index !== -1 ? req.url.slice(0, index) : req.url; 213 214 if (pathname !== this.options.path) return false; 215 } 216 217 return true; 218 } 219 220 /** 221 * Handle a HTTP Upgrade request. 222 * 223 * @param {http.IncomingMessage} req The request object 224 * @param {(net.Socket|tls.Socket)} socket The network socket between the 225 * server and client 226 * @param {Buffer} head The first packet of the upgraded stream 227 * @param {Function} cb Callback 228 * @public 229 */ 230 handleUpgrade(req, socket, head, cb) { 231 socket.on('error', socketOnError); 232 233 const key = req.headers['sec-websocket-key']; 234 const version = +req.headers['sec-websocket-version']; 235 236 if (req.method !== 'GET') { 237 const message = 'Invalid HTTP method'; 238 abortHandshakeOrEmitwsClientError(this, req, socket, 405, message); 239 return; 240 } 241 242 if (req.headers.upgrade.toLowerCase() !== 'websocket') { 243 const message = 'Invalid Upgrade header'; 244 abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); 245 return; 246 } 247 248 if (!key || !keyRegex.test(key)) { 249 const message = 'Missing or invalid Sec-WebSocket-Key header'; 250 abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); 251 return; 252 } 253 254 if (version !== 8 && version !== 13) { 255 const message = 'Missing or invalid Sec-WebSocket-Version header'; 256 abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); 257 return; 258 } 259 260 if (!this.shouldHandle(req)) { 261 abortHandshake(socket, 400); 262 return; 263 } 264 265 const secWebSocketProtocol = req.headers['sec-websocket-protocol']; 266 let protocols = new Set(); 267 268 if (secWebSocketProtocol !== undefined) { 269 try { 270 protocols = subprotocol.parse(secWebSocketProtocol); 271 } catch (err) { 272 const message = 'Invalid Sec-WebSocket-Protocol header'; 273 abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); 274 return; 275 } 276 } 277 278 const secWebSocketExtensions = req.headers['sec-websocket-extensions']; 279 const extensions = {}; 280 281 if ( 282 this.options.perMessageDeflate && 283 secWebSocketExtensions !== undefined 284 ) { 285 const perMessageDeflate = new PerMessageDeflate( 286 this.options.perMessageDeflate, 287 true, 288 this.options.maxPayload 289 ); 290 291 try { 292 const offers = extension.parse(secWebSocketExtensions); 293 294 if (offers[PerMessageDeflate.extensionName]) { 295 perMessageDeflate.accept(offers[PerMessageDeflate.extensionName]); 296 extensions[PerMessageDeflate.extensionName] = perMessageDeflate; 297 } 298 } catch (err) { 299 const message = 300 'Invalid or unacceptable Sec-WebSocket-Extensions header'; 301 abortHandshakeOrEmitwsClientError(this, req, socket, 400, message); 302 return; 303 } 304 } 305 306 // 307 // Optionally call external client verification handler. 308 // 309 if (this.options.verifyClient) { 310 const info = { 311 origin: 312 req.headers[`${version === 8 ? 'sec-websocket-origin' : 'origin'}`], 313 secure: !!(req.socket.authorized || req.socket.encrypted), 314 req 315 }; 316 317 if (this.options.verifyClient.length === 2) { 318 this.options.verifyClient(info, (verified, code, message, headers) => { 319 if (!verified) { 320 return abortHandshake(socket, code || 401, message, headers); 321 } 322 323 this.completeUpgrade( 324 extensions, 325 key, 326 protocols, 327 req, 328 socket, 329 head, 330 cb 331 ); 332 }); 333 return; 334 } 335 336 if (!this.options.verifyClient(info)) return abortHandshake(socket, 401); 337 } 338 339 this.completeUpgrade(extensions, key, protocols, req, socket, head, cb); 340 } 341 342 /** 343 * Upgrade the connection to WebSocket. 344 * 345 * @param {Object} extensions The accepted extensions 346 * @param {String} key The value of the `Sec-WebSocket-Key` header 347 * @param {Set} protocols The subprotocols 348 * @param {http.IncomingMessage} req The request object 349 * @param {(net.Socket|tls.Socket)} socket The network socket between the 350 * server and client 351 * @param {Buffer} head The first packet of the upgraded stream 352 * @param {Function} cb Callback 353 * @throws {Error} If called more than once with the same socket 354 * @private 355 */ 356 completeUpgrade(extensions, key, protocols, req, socket, head, cb) { 357 // 358 // Destroy the socket if the client has already sent a FIN packet. 359 // 360 if (!socket.readable || !socket.writable) return socket.destroy(); 361 362 if (socket[kWebSocket]) { 363 throw new Error( 364 'server.handleUpgrade() was called more than once with the same ' + 365 'socket, possibly due to a misconfiguration' 366 ); 367 } 368 369 if (this._state > RUNNING) return abortHandshake(socket, 503); 370 371 const digest = createHash('sha1') 372 .update(key + GUID) 373 .digest('base64'); 374 375 const headers = [ 376 'HTTP/1.1 101 Switching Protocols', 377 'Upgrade: websocket', 378 'Connection: Upgrade', 379 `Sec-WebSocket-Accept: ${digest}` 380 ]; 381 382 const ws = new this.options.WebSocket(null); 383 384 if (protocols.size) { 385 // 386 // Optionally call external protocol selection handler. 387 // 388 const protocol = this.options.handleProtocols 389 ? this.options.handleProtocols(protocols, req) 390 : protocols.values().next().value; 391 392 if (protocol) { 393 headers.push(`Sec-WebSocket-Protocol: ${protocol}`); 394 ws._protocol = protocol; 395 } 396 } 397 398 if (extensions[PerMessageDeflate.extensionName]) { 399 const params = extensions[PerMessageDeflate.extensionName].params; 400 const value = extension.format({ 401 [PerMessageDeflate.extensionName]: [params] 402 }); 403 headers.push(`Sec-WebSocket-Extensions: ${value}`); 404 ws._extensions = extensions; 405 } 406 407 // 408 // Allow external modification/inspection of handshake headers. 409 // 410 this.emit('headers', headers, req); 411 412 socket.write(headers.concat('\r\n').join('\r\n')); 413 socket.removeListener('error', socketOnError); 414 415 ws.setSocket(socket, head, { 416 maxPayload: this.options.maxPayload, 417 skipUTF8Validation: this.options.skipUTF8Validation 418 }); 419 420 if (this.clients) { 421 this.clients.add(ws); 422 ws.on('close', () => { 423 this.clients.delete(ws); 424 425 if (this._shouldEmitClose && !this.clients.size) { 426 process.nextTick(emitClose, this); 427 } 428 }); 429 } 430 431 cb(ws, req); 432 } 433 } 434 435 module.exports = WebSocketServer; 436 437 /** 438 * Add event listeners on an `EventEmitter` using a map of <event, listener> 439 * pairs. 440 * 441 * @param {EventEmitter} server The event emitter 442 * @param {Object.<String, Function>} map The listeners to add 443 * @return {Function} A function that will remove the added listeners when 444 * called 445 * @private 446 */ 447 function addListeners(server, map) { 448 for (const event of Object.keys(map)) server.on(event, map[event]); 449 450 return function removeListeners() { 451 for (const event of Object.keys(map)) { 452 server.removeListener(event, map[event]); 453 } 454 }; 455 } 456 457 /** 458 * Emit a `'close'` event on an `EventEmitter`. 459 * 460 * @param {EventEmitter} server The event emitter 461 * @private 462 */ 463 function emitClose(server) { 464 server._state = CLOSED; 465 server.emit('close'); 466 } 467 468 /** 469 * Handle socket errors. 470 * 471 * @private 472 */ 473 function socketOnError() { 474 this.destroy(); 475 } 476 477 /** 478 * Close the connection when preconditions are not fulfilled. 479 * 480 * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request 481 * @param {Number} code The HTTP response status code 482 * @param {String} [message] The HTTP response body 483 * @param {Object} [headers] Additional HTTP response headers 484 * @private 485 */ 486 function abortHandshake(socket, code, message, headers) { 487 // 488 // The socket is writable unless the user destroyed or ended it before calling 489 // `server.handleUpgrade()` or in the `verifyClient` function, which is a user 490 // error. Handling this does not make much sense as the worst that can happen 491 // is that some of the data written by the user might be discarded due to the 492 // call to `socket.end()` below, which triggers an `'error'` event that in 493 // turn causes the socket to be destroyed. 494 // 495 message = message || http.STATUS_CODES[code]; 496 headers = { 497 Connection: 'close', 498 'Content-Type': 'text/html', 499 'Content-Length': Buffer.byteLength(message), 500 ...headers 501 }; 502 503 socket.once('finish', socket.destroy); 504 505 socket.end( 506 `HTTP/1.1 ${code} ${http.STATUS_CODES[code]}\r\n` + 507 Object.keys(headers) 508 .map((h) => `${h}: ${headers[h]}`) 509 .join('\r\n') + 510 '\r\n\r\n' + 511 message 512 ); 513 } 514 515 /** 516 * Emit a `'wsClientError'` event on a `WebSocketServer` if there is at least 517 * one listener for it, otherwise call `abortHandshake()`. 518 * 519 * @param {WebSocketServer} server The WebSocket server 520 * @param {http.IncomingMessage} req The request object 521 * @param {(net.Socket|tls.Socket)} socket The socket of the upgrade request 522 * @param {Number} code The HTTP response status code 523 * @param {String} message The HTTP response body 524 * @private 525 */ 526 function abortHandshakeOrEmitwsClientError(server, req, socket, code, message) { 527 if (server.listenerCount('wsClientError')) { 528 const err = new Error(message); 529 Error.captureStackTrace(err, abortHandshakeOrEmitwsClientError); 530 531 server.emit('wsClientError', err, socket, req); 532 } else { 533 abortHandshake(socket, code, message); 534 } 535 }