permessage-deflate.js (14018B)
'use strict';

const zlib = require('zlib');

const bufferUtil = require('./buffer-util');
const Limiter = require('./limiter');
const { kStatusCode } = require('./constants');

//
// Written to the inflate stream after the last fragment of a message, and the
// number of bytes (4) removed from the end of the deflated last fragment.
//
const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
const kPerMessageDeflate = Symbol('permessage-deflate');
const kTotalLength = Symbol('total-length');
const kCallback = Symbol('callback');
const kBuffers = Symbol('buffers');
const kError = Symbol('error');

//
// We limit zlib concurrency, which prevents severe memory fragmentation
// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
// and https://github.com/websockets/ws/issues/1202
//
// Intentionally global; it's the global thread pool that's an issue.
//
let zlibLimiter;

/**
 * permessage-deflate implementation.
 */
class PerMessageDeflate {
  /**
   * Creates a PerMessageDeflate instance.
   *
   * @param {Object} [options] Configuration options
   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
   *     for, or request, a custom client window size
   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
   *     acknowledge disabling of client context takeover
   * @param {Number} [options.concurrencyLimit=10] The number of concurrent
   *     calls to zlib. Only the value seen by the first instance created is
   *     used because the limiter is shared by all instances
   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
   *     use of a custom server window size
   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
   *     disabling of server context takeover
   * @param {Number} [options.threshold=1024] Size (in bytes) below which
   *     messages should not be compressed if context takeover is disabled
   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
   *     deflate
   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
   *     inflate
   * @param {Boolean} [isServer=false] Create the instance in either server or
   *     client mode
   * @param {Number} [maxPayload=0] The maximum allowed message length. Values
   *     lower than 1 (or not numeric) disable the limit
   */
  constructor(options, isServer, maxPayload) {
    //
    // Do not use `maxPayload | 0` here: the bitwise form wraps to a signed
    // 32-bit integer, so a limit of 2 GiB or more silently became a negative
    // number (no limit at all) or a much smaller limit.
    //
    const limit = Math.trunc(Number(maxPayload));

    this._maxPayload = Number.isFinite(limit) && limit > 0 ? limit : 0;
    this._options = options || {};
    this._threshold =
      this._options.threshold !== undefined ? this._options.threshold : 1024;
    this._isServer = !!isServer;
    this._deflate = null;
    this._inflate = null;

    this.params = null;

    if (!zlibLimiter) {
      const concurrency =
        this._options.concurrencyLimit !== undefined
          ? this._options.concurrencyLimit
          : 10;
      zlibLimiter = new Limiter(concurrency);
    }
  }

  /**
   * @type {String}
   */
  static get extensionName() {
    return 'permessage-deflate';
  }

  /**
   * Create an extension negotiation offer.
   *
   * @return {Object} Extension parameters
   * @public
   */
  offer() {
    const params = {};

    if (this._options.serverNoContextTakeover) {
      params.server_no_context_takeover = true;
    }
    if (this._options.clientNoContextTakeover) {
      params.client_no_context_takeover = true;
    }
    if (this._options.serverMaxWindowBits) {
      params.server_max_window_bits = this._options.serverMaxWindowBits;
    }
    if (this._options.clientMaxWindowBits) {
      params.client_max_window_bits = this._options.clientMaxWindowBits;
    } else if (this._options.clientMaxWindowBits == null) {
      //
      // The option was not provided (`null` or `undefined`): offer the
      // parameter without a value. An explicit `false` omits it.
      //
      params.client_max_window_bits = true;
    }

    return params;
  }

  /**
   * Accept an extension negotiation offer/response. The accepted configuration
   * is also stored in `this.params`.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Object} Accepted configuration
   * @throws {Error} If a parameter is invalid or nothing can be accepted
   * @public
   */
  accept(configurations) {
    configurations = this.normalizeParams(configurations);

    this.params = this._isServer
      ? this.acceptAsServer(configurations)
      : this.acceptAsClient(configurations);

    return this.params;
  }

  /**
   * Releases all resources used by the extension. If a compression is in
   * progress its callback is called with an error.
   *
   * @public
   */
  cleanup() {
    if (this._inflate) {
      this._inflate.close();
      this._inflate = null;
    }

    if (this._deflate) {
      const callback = this._deflate[kCallback];

      this._deflate.close();
      this._deflate = null;

      if (callback) {
        callback(
          new Error(
            'The deflate stream was closed while data was being processed'
          )
        );
      }
    }
  }

  /**
   * Accept an extension negotiation offer. The first acceptable offer is
   * selected and then updated in place with the local options.
   *
   * @param {Array} offers The extension negotiation offers
   * @return {Object} Accepted configuration
   * @throws {Error} If none of the offers can be accepted
   * @private
   */
  acceptAsServer(offers) {
    const opts = this._options;
    const accepted = offers.find((params) => {
      if (
        (opts.serverNoContextTakeover === false &&
          params.server_no_context_takeover) ||
        (params.server_max_window_bits &&
          (opts.serverMaxWindowBits === false ||
            (typeof opts.serverMaxWindowBits === 'number' &&
              opts.serverMaxWindowBits > params.server_max_window_bits))) ||
        (typeof opts.clientMaxWindowBits === 'number' &&
          !params.client_max_window_bits)
      ) {
        return false;
      }

      return true;
    });

    if (!accepted) {
      throw new Error('None of the extension offers can be accepted');
    }

    if (opts.serverNoContextTakeover) {
      accepted.server_no_context_takeover = true;
    }
    if (opts.clientNoContextTakeover) {
      accepted.client_no_context_takeover = true;
    }
    if (typeof opts.serverMaxWindowBits === 'number') {
      accepted.server_max_window_bits = opts.serverMaxWindowBits;
    }
    if (typeof opts.clientMaxWindowBits === 'number') {
      accepted.client_max_window_bits = opts.clientMaxWindowBits;
    } else if (
      accepted.client_max_window_bits === true ||
      opts.clientMaxWindowBits === false
    ) {
      //
      // No concrete value to answer with: leave the parameter out of the
      // accepted configuration.
      //
      delete accepted.client_max_window_bits;
    }

    return accepted;
  }

  /**
   * Accept the extension negotiation response. Only the first element of the
   * response is considered.
   *
   * @param {Array} response The extension negotiation response
   * @return {Object} Accepted configuration
   * @throws {Error} If the response contains a parameter that conflicts with
   *     the local options
   * @private
   */
  acceptAsClient(response) {
    const params = response[0];

    if (
      this._options.clientNoContextTakeover === false &&
      params.client_no_context_takeover
    ) {
      throw new Error('Unexpected parameter "client_no_context_takeover"');
    }

    if (!params.client_max_window_bits) {
      if (typeof this._options.clientMaxWindowBits === 'number') {
        params.client_max_window_bits = this._options.clientMaxWindowBits;
      }
    } else if (
      this._options.clientMaxWindowBits === false ||
      (typeof this._options.clientMaxWindowBits === 'number' &&
        params.client_max_window_bits > this._options.clientMaxWindowBits)
    ) {
      throw new Error(
        'Unexpected or invalid parameter "client_max_window_bits"'
      );
    }

    return params;
  }

  /**
   * Normalize parameters. Every parameter is expected to be a list holding a
   * single value; the list is replaced in place by that (validated) value.
   *
   * @param {Array} configurations The extension negotiation offers/response
   * @return {Array} The offers/response with normalized parameters
   * @throws {Error} If a parameter is unknown or has more than one value
   * @throws {TypeError} If a parameter has an invalid value
   * @private
   */
  normalizeParams(configurations) {
    configurations.forEach((params) => {
      Object.keys(params).forEach((key) => {
        let value = params[key];

        if (value.length > 1) {
          throw new Error(`Parameter "${key}" must have only a single value`);
        }

        value = value[0];

        if (key === 'client_max_window_bits') {
          if (value !== true) {
            const num = +value;
            if (!Number.isInteger(num) || num < 8 || num > 15) {
              throw new TypeError(
                `Invalid value for parameter "${key}": ${value}`
              );
            }
            value = num;
          } else if (!this._isServer) {
            //
            // A valueless `client_max_window_bits` is only accepted when
            // running in server mode.
            //
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else if (key === 'server_max_window_bits') {
          const num = +value;
          if (!Number.isInteger(num) || num < 8 || num > 15) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
          value = num;
        } else if (
          key === 'client_no_context_takeover' ||
          key === 'server_no_context_takeover'
        ) {
          if (value !== true) {
            throw new TypeError(
              `Invalid value for parameter "${key}": ${value}`
            );
          }
        } else {
          throw new Error(`Unknown parameter "${key}"`);
        }

        params[key] = value;
      });
    });

    return configurations;
  }

  /**
   * Decompress data. Concurrency limited.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  decompress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._decompress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Compress data. Concurrency limited.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @public
   */
  compress(data, fin, callback) {
    zlibLimiter.add((done) => {
      this._compress(data, fin, (err, result) => {
        // Release the limiter slot before handing the result to the caller.
        done();
        callback(err, result);
      });
    });
  }

  /**
   * Decompress data.
   *
   * @param {Buffer} data Compressed data
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _decompress(data, fin, callback) {
    // Incoming data was compressed by the remote peer.
    const endpoint = this._isServer ? 'client' : 'server';

    if (!this._inflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._inflate = zlib.createInflateRaw({
        ...this._options.zlibInflateOptions,
        windowBits
      });
      this._inflate[kPerMessageDeflate] = this;
      this._inflate[kTotalLength] = 0;
      this._inflate[kBuffers] = [];
      this._inflate.on('error', inflateOnError);
      this._inflate.on('data', inflateOnData);
    }

    this._inflate[kCallback] = callback;

    this._inflate.write(data);
    if (fin) this._inflate.write(TRAILER);

    this._inflate.flush(() => {
      const err = this._inflate[kError];

      if (err) {
        this._inflate.close();
        this._inflate = null;
        callback(err);
        return;
      }

      const inflated = bufferUtil.concat(
        this._inflate[kBuffers],
        this._inflate[kTotalLength]
      );

      if (this._inflate._readableState.endEmitted) {
        //
        // The stream ended: discard it so that a new one is created for the
        // next message.
        //
        this._inflate.close();
        this._inflate = null;
      } else {
        this._inflate[kTotalLength] = 0;
        this._inflate[kBuffers] = [];

        if (fin && this.params[`${endpoint}_no_context_takeover`]) {
          this._inflate.reset();
        }
      }

      callback(null, inflated);
    });
  }

  /**
   * Compress data.
   *
   * @param {(Buffer|String)} data Data to compress
   * @param {Boolean} fin Specifies whether or not this is the last fragment
   * @param {Function} callback Callback
   * @private
   */
  _compress(data, fin, callback) {
    // Outgoing data is compressed by the local endpoint.
    const endpoint = this._isServer ? 'server' : 'client';

    if (!this._deflate) {
      const key = `${endpoint}_max_window_bits`;
      const windowBits =
        typeof this.params[key] !== 'number'
          ? zlib.Z_DEFAULT_WINDOWBITS
          : this.params[key];

      this._deflate = zlib.createDeflateRaw({
        ...this._options.zlibDeflateOptions,
        windowBits
      });

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      this._deflate.on('data', deflateOnData);
    }

    this._deflate[kCallback] = callback;

    this._deflate.write(data);
    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
      if (!this._deflate) {
        //
        // The deflate stream was closed while data was being processed.
        //
        return;
      }

      let deflated = bufferUtil.concat(
        this._deflate[kBuffers],
        this._deflate[kTotalLength]
      );

      //
      // Drop the last 4 bytes of the final fragment. `subarray()` is used
      // instead of the deprecated `Buffer#slice()`.
      //
      if (fin) deflated = deflated.subarray(0, deflated.length - 4);

      //
      // Ensure that the callback will not be called again in
      // `PerMessageDeflate#cleanup()`.
      //
      this._deflate[kCallback] = null;

      this._deflate[kTotalLength] = 0;
      this._deflate[kBuffers] = [];

      if (fin && this.params[`${endpoint}_no_context_takeover`]) {
        this._deflate.reset();
      }

      callback(null, deflated);
    });
  }
}

module.exports = PerMessageDeflate;

/**
 * The listener of the `zlib.DeflateRaw` stream `'data'` event. Collects the
 * chunk and keeps track of the total number of bytes collected.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function deflateOnData(chunk) {
  this[kBuffers].push(chunk);
  this[kTotalLength] += chunk.length;
}
/**
 * The listener of the `zlib.InflateRaw` stream `'data'` event. Collects the
 * chunk unless the total inflated length exceeds the maximum payload size, in
 * which case an error is recorded on the stream and the stream is reset.
 *
 * @param {Buffer} chunk A chunk of data
 * @private
 */
function inflateOnData(chunk) {
  this[kTotalLength] += chunk.length;

  if (
    this[kPerMessageDeflate]._maxPayload < 1 ||
    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  ) {
    this[kBuffers].push(chunk);
    return;
  }

  this[kError] = new RangeError('Max payload size exceeded');
  this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
  this[kError][kStatusCode] = 1009;

  // Stop collecting: the rest of the inflated output is not needed.
  this.removeListener('data', inflateOnData);

  //
  // NOTE(review): `reset()` is used instead of `close()`, presumably so that
  // the pending `flush()` callback still runs or an `'error'` event is
  // emitted - confirm against the zlib behavior of the supported Node.js
  // versions.
  //
  this.reset();
}

/**
 * The listener of the `zlib.InflateRaw` stream `'error'` event.
 *
 * @param {Error} err The emitted error
 * @private
 */
function inflateOnError(err) {
  //
  // There is no need to call `Zlib#close()` as the handle is automatically
  // closed when an error is emitted.
  //
  this[kPerMessageDeflate]._inflate = null;

  //
  // If the maximum payload size was exceeded, report that error (status code
  // 1009) instead of the zlib error that followed it, which would otherwise
  // be reported with status code 1007.
  //
  if (this[kError]) {
    this[kCallback](this[kError]);
    return;
  }

  err[kStatusCode] = 1007;
  this[kCallback](err);
}