sender.js (12668B)
1 /* eslint no-unused-vars: ["error", { "varsIgnorePattern": "^net|tls$" }] */ 2 3 'use strict'; 4 5 const net = require('net'); 6 const tls = require('tls'); 7 const { randomFillSync } = require('crypto'); 8 9 const PerMessageDeflate = require('./permessage-deflate'); 10 const { EMPTY_BUFFER } = require('./constants'); 11 const { isValidStatusCode } = require('./validation'); 12 const { mask: applyMask, toBuffer } = require('./buffer-util'); 13 14 const kByteLength = Symbol('kByteLength'); 15 const maskBuffer = Buffer.alloc(4); 16 17 /** 18 * HyBi Sender implementation. 19 */ 20 class Sender { 21 /** 22 * Creates a Sender instance. 23 * 24 * @param {(net.Socket|tls.Socket)} socket The connection socket 25 * @param {Object} [extensions] An object containing the negotiated extensions 26 * @param {Function} [generateMask] The function used to generate the masking 27 * key 28 */ 29 constructor(socket, extensions, generateMask) { 30 this._extensions = extensions || {}; 31 32 if (generateMask) { 33 this._generateMask = generateMask; 34 this._maskBuffer = Buffer.alloc(4); 35 } 36 37 this._socket = socket; 38 39 this._firstFragment = true; 40 this._compress = false; 41 42 this._bufferedBytes = 0; 43 this._deflating = false; 44 this._queue = []; 45 } 46 47 /** 48 * Frames a piece of data according to the HyBi WebSocket protocol. 49 * 50 * @param {(Buffer|String)} data The data to frame 51 * @param {Object} options Options object 52 * @param {Boolean} [options.fin=false] Specifies whether or not to set the 53 * FIN bit 54 * @param {Function} [options.generateMask] The function used to generate the 55 * masking key 56 * @param {Boolean} [options.mask=false] Specifies whether or not to mask 57 * `data` 58 * @param {Buffer} [options.maskBuffer] The buffer used to store the masking 59 * key 60 * @param {Number} options.opcode The opcode 61 * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be 62 * modified 63 * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the 64 * RSV1 bit 65 * @return {(Buffer|String)[]} The framed data 66 * @public 67 */ 68 static frame(data, options) { 69 let mask; 70 let merge = false; 71 let offset = 2; 72 let skipMasking = false; 73 74 if (options.mask) { 75 mask = options.maskBuffer || maskBuffer; 76 77 if (options.generateMask) { 78 options.generateMask(mask); 79 } else { 80 randomFillSync(mask, 0, 4); 81 } 82 83 skipMasking = (mask[0] | mask[1] | mask[2] | mask[3]) === 0; 84 offset = 6; 85 } 86 87 let dataLength; 88 89 if (typeof data === 'string') { 90 if ( 91 (!options.mask || skipMasking) && 92 options[kByteLength] !== undefined 93 ) { 94 dataLength = options[kByteLength]; 95 } else { 96 data = Buffer.from(data); 97 dataLength = data.length; 98 } 99 } else { 100 dataLength = data.length; 101 merge = options.mask && options.readOnly && !skipMasking; 102 } 103 104 let payloadLength = dataLength; 105 106 if (dataLength >= 65536) { 107 offset += 8; 108 payloadLength = 127; 109 } else if (dataLength > 125) { 110 offset += 2; 111 payloadLength = 126; 112 } 113 114 const target = Buffer.allocUnsafe(merge ? dataLength + offset : offset); 115 116 target[0] = options.fin ? options.opcode | 0x80 : options.opcode; 117 if (options.rsv1) target[0] |= 0x40; 118 119 target[1] = payloadLength; 120 121 if (payloadLength === 126) { 122 target.writeUInt16BE(dataLength, 2); 123 } else if (payloadLength === 127) { 124 target[2] = target[3] = 0; 125 target.writeUIntBE(dataLength, 4, 6); 126 } 127 128 if (!options.mask) return [target, data]; 129 130 target[1] |= 0x80; 131 target[offset - 4] = mask[0]; 132 target[offset - 3] = mask[1]; 133 target[offset - 2] = mask[2]; 134 target[offset - 1] = mask[3]; 135 136 if (skipMasking) return [target, data]; 137 138 if (merge) { 139 applyMask(data, mask, target, offset, dataLength); 140 return [target]; 141 } 142 143 applyMask(data, mask, data, 0, dataLength); 144 return [target, data]; 145 } 146 147 /** 148 * Sends a close message to the other peer. 149 * 150 * @param {Number} [code] The status code component of the body 151 * @param {(String|Buffer)} [data] The message component of the body 152 * @param {Boolean} [mask=false] Specifies whether or not to mask the message 153 * @param {Function} [cb] Callback 154 * @public 155 */ 156 close(code, data, mask, cb) { 157 let buf; 158 159 if (code === undefined) { 160 buf = EMPTY_BUFFER; 161 } else if (typeof code !== 'number' || !isValidStatusCode(code)) { 162 throw new TypeError('First argument must be a valid error code number'); 163 } else if (data === undefined || !data.length) { 164 buf = Buffer.allocUnsafe(2); 165 buf.writeUInt16BE(code, 0); 166 } else { 167 const length = Buffer.byteLength(data); 168 169 if (length > 123) { 170 throw new RangeError('The message must not be greater than 123 bytes'); 171 } 172 173 buf = Buffer.allocUnsafe(2 + length); 174 buf.writeUInt16BE(code, 0); 175 176 if (typeof data === 'string') { 177 buf.write(data, 2); 178 } else { 179 buf.set(data, 2); 180 } 181 } 182 183 const options = { 184 [kByteLength]: buf.length, 185 fin: true, 186 generateMask: this._generateMask, 187 mask, 188 maskBuffer: this._maskBuffer, 189 opcode: 0x08, 190 readOnly: false, 191 rsv1: false 192 }; 193 194 if (this._deflating) { 195 this.enqueue([this.dispatch, buf, false, options, cb]); 196 } else { 197 this.sendFrame(Sender.frame(buf, options), cb); 198 } 199 } 200 201 /** 202 * Sends a ping message to the other peer. 203 * 204 * @param {*} data The message to send 205 * @param {Boolean} [mask=false] Specifies whether or not to mask `data` 206 * @param {Function} [cb] Callback 207 * @public 208 */ 209 ping(data, mask, cb) { 210 let byteLength; 211 let readOnly; 212 213 if (typeof data === 'string') { 214 byteLength = Buffer.byteLength(data); 215 readOnly = false; 216 } else { 217 data = toBuffer(data); 218 byteLength = data.length; 219 readOnly = toBuffer.readOnly; 220 } 221 222 if (byteLength > 125) { 223 throw new RangeError('The data size must not be greater than 125 bytes'); 224 } 225 226 const options = { 227 [kByteLength]: byteLength, 228 fin: true, 229 generateMask: this._generateMask, 230 mask, 231 maskBuffer: this._maskBuffer, 232 opcode: 0x09, 233 readOnly, 234 rsv1: false 235 }; 236 237 if (this._deflating) { 238 this.enqueue([this.dispatch, data, false, options, cb]); 239 } else { 240 this.sendFrame(Sender.frame(data, options), cb); 241 } 242 } 243 244 /** 245 * Sends a pong message to the other peer. 246 * 247 * @param {*} data The message to send 248 * @param {Boolean} [mask=false] Specifies whether or not to mask `data` 249 * @param {Function} [cb] Callback 250 * @public 251 */ 252 pong(data, mask, cb) { 253 let byteLength; 254 let readOnly; 255 256 if (typeof data === 'string') { 257 byteLength = Buffer.byteLength(data); 258 readOnly = false; 259 } else { 260 data = toBuffer(data); 261 byteLength = data.length; 262 readOnly = toBuffer.readOnly; 263 } 264 265 if (byteLength > 125) { 266 throw new RangeError('The data size must not be greater than 125 bytes'); 267 } 268 269 const options = { 270 [kByteLength]: byteLength, 271 fin: true, 272 generateMask: this._generateMask, 273 mask, 274 maskBuffer: this._maskBuffer, 275 opcode: 0x0a, 276 readOnly, 277 rsv1: false 278 }; 279 280 if (this._deflating) { 281 this.enqueue([this.dispatch, data, false, options, cb]); 282 } else { 283 this.sendFrame(Sender.frame(data, options), cb); 284 } 285 } 286 287 /** 288 * Sends a data message to the other peer. 289 * 290 * @param {*} data The message to send 291 * @param {Object} options Options object 292 * @param {Boolean} [options.binary=false] Specifies whether `data` is binary 293 * or text 294 * @param {Boolean} [options.compress=false] Specifies whether or not to 295 * compress `data` 296 * @param {Boolean} [options.fin=false] Specifies whether the fragment is the 297 * last one 298 * @param {Boolean} [options.mask=false] Specifies whether or not to mask 299 * `data` 300 * @param {Function} [cb] Callback 301 * @public 302 */ 303 send(data, options, cb) { 304 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 305 let opcode = options.binary ? 2 : 1; 306 let rsv1 = options.compress; 307 308 let byteLength; 309 let readOnly; 310 311 if (typeof data === 'string') { 312 byteLength = Buffer.byteLength(data); 313 readOnly = false; 314 } else { 315 data = toBuffer(data); 316 byteLength = data.length; 317 readOnly = toBuffer.readOnly; 318 } 319 320 if (this._firstFragment) { 321 this._firstFragment = false; 322 if ( 323 rsv1 && 324 perMessageDeflate && 325 perMessageDeflate.params[ 326 perMessageDeflate._isServer 327 ? 'server_no_context_takeover' 328 : 'client_no_context_takeover' 329 ] 330 ) { 331 rsv1 = byteLength >= perMessageDeflate._threshold; 332 } 333 this._compress = rsv1; 334 } else { 335 rsv1 = false; 336 opcode = 0; 337 } 338 339 if (options.fin) this._firstFragment = true; 340 341 if (perMessageDeflate) { 342 const opts = { 343 [kByteLength]: byteLength, 344 fin: options.fin, 345 generateMask: this._generateMask, 346 mask: options.mask, 347 maskBuffer: this._maskBuffer, 348 opcode, 349 readOnly, 350 rsv1 351 }; 352 353 if (this._deflating) { 354 this.enqueue([this.dispatch, data, this._compress, opts, cb]); 355 } else { 356 this.dispatch(data, this._compress, opts, cb); 357 } 358 } else { 359 this.sendFrame( 360 Sender.frame(data, { 361 [kByteLength]: byteLength, 362 fin: options.fin, 363 generateMask: this._generateMask, 364 mask: options.mask, 365 maskBuffer: this._maskBuffer, 366 opcode, 367 readOnly, 368 rsv1: false 369 }), 370 cb 371 ); 372 } 373 } 374 375 /** 376 * Dispatches a message. 377 * 378 * @param {(Buffer|String)} data The message to send 379 * @param {Boolean} [compress=false] Specifies whether or not to compress 380 * `data` 381 * @param {Object} options Options object 382 * @param {Boolean} [options.fin=false] Specifies whether or not to set the 383 * FIN bit 384 * @param {Function} [options.generateMask] The function used to generate the 385 * masking key 386 * @param {Boolean} [options.mask=false] Specifies whether or not to mask 387 * `data` 388 * @param {Buffer} [options.maskBuffer] The buffer used to store the masking 389 * key 390 * @param {Number} options.opcode The opcode 391 * @param {Boolean} [options.readOnly=false] Specifies whether `data` can be 392 * modified 393 * @param {Boolean} [options.rsv1=false] Specifies whether or not to set the 394 * RSV1 bit 395 * @param {Function} [cb] Callback 396 * @private 397 */ 398 dispatch(data, compress, options, cb) { 399 if (!compress) { 400 this.sendFrame(Sender.frame(data, options), cb); 401 return; 402 } 403 404 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 405 406 this._bufferedBytes += options[kByteLength]; 407 this._deflating = true; 408 perMessageDeflate.compress(data, options.fin, (_, buf) => { 409 if (this._socket.destroyed) { 410 const err = new Error( 411 'The socket was closed while data was being compressed' 412 ); 413 414 if (typeof cb === 'function') cb(err); 415 416 for (let i = 0; i < this._queue.length; i++) { 417 const params = this._queue[i]; 418 const callback = params[params.length - 1]; 419 420 if (typeof callback === 'function') callback(err); 421 } 422 423 return; 424 } 425 426 this._bufferedBytes -= options[kByteLength]; 427 this._deflating = false; 428 options.readOnly = false; 429 this.sendFrame(Sender.frame(buf, options), cb); 430 this.dequeue(); 431 }); 432 } 433 434 /** 435 * Executes queued send operations. 436 * 437 * @private 438 */ 439 dequeue() { 440 while (!this._deflating && this._queue.length) { 441 const params = this._queue.shift(); 442 443 this._bufferedBytes -= params[3][kByteLength]; 444 Reflect.apply(params[0], this, params.slice(1)); 445 } 446 } 447 448 /** 449 * Enqueues a send operation. 450 * 451 * @param {Array} params Send operation parameters. 452 * @private 453 */ 454 enqueue(params) { 455 this._bufferedBytes += params[3][kByteLength]; 456 this._queue.push(params); 457 } 458 459 /** 460 * Sends a frame. 461 * 462 * @param {Buffer[]} list The frame to send 463 * @param {Function} [cb] Callback 464 * @private 465 */ 466 sendFrame(list, cb) { 467 if (list.length === 2) { 468 this._socket.cork(); 469 this._socket.write(list[0]); 470 this._socket.write(list[1], cb); 471 this._socket.uncork(); 472 } else { 473 this._socket.write(list[0], cb); 474 } 475 } 476 } 477 478 module.exports = Sender;