create-websocket-stream.test.js (16779B)
1 'use strict'; 2 3 const assert = require('assert'); 4 const EventEmitter = require('events'); 5 const { createServer } = require('http'); 6 const { Duplex } = require('stream'); 7 const { randomBytes } = require('crypto'); 8 9 const createWebSocketStream = require('../lib/stream'); 10 const Sender = require('../lib/sender'); 11 const WebSocket = require('..'); 12 const { EMPTY_BUFFER } = require('../lib/constants'); 13 14 describe('createWebSocketStream', () => { 15 it('is exposed as a property of the `WebSocket` class', () => { 16 assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream); 17 }); 18 19 it('returns a `Duplex` stream', () => { 20 const duplex = createWebSocketStream(new EventEmitter()); 21 22 assert.ok(duplex instanceof Duplex); 23 }); 24 25 it('passes the options object to the `Duplex` constructor', (done) => { 26 const wss = new WebSocket.Server({ port: 0 }, () => { 27 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 28 const duplex = createWebSocketStream(ws, { 29 allowHalfOpen: false, 30 encoding: 'utf8' 31 }); 32 33 duplex.on('data', (chunk) => { 34 assert.strictEqual(chunk, 'hi'); 35 36 duplex.on('close', () => { 37 wss.close(done); 38 }); 39 }); 40 }); 41 42 wss.on('connection', (ws) => { 43 ws.send(Buffer.from('hi')); 44 ws.close(); 45 }); 46 }); 47 48 describe('The returned stream', () => { 49 it('buffers writes if `readyState` is `CONNECTING`', (done) => { 50 const chunk = randomBytes(1024); 51 const wss = new WebSocket.Server({ port: 0 }, () => { 52 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 53 54 assert.strictEqual(ws.readyState, WebSocket.CONNECTING); 55 56 const duplex = createWebSocketStream(ws); 57 58 duplex.write(chunk); 59 }); 60 61 wss.on('connection', (ws) => { 62 ws.on('message', (message, isBinary) => { 63 ws.on('close', (code, reason) => { 64 assert.deepStrictEqual(message, chunk); 65 assert.ok(isBinary); 66 assert.strictEqual(code, 1005); 67 assert.strictEqual(reason, EMPTY_BUFFER); 68 wss.close(done); 69 }); 70 }); 71 72 ws.close(); 73 }); 74 }); 75 76 it('errors if a write occurs when `readyState` is `CLOSING`', (done) => { 77 const wss = new WebSocket.Server({ port: 0 }, () => { 78 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 79 const duplex = createWebSocketStream(ws); 80 81 duplex.on('error', (err) => { 82 assert.ok(duplex.destroyed); 83 assert.ok(err instanceof Error); 84 assert.strictEqual( 85 err.message, 86 'WebSocket is not open: readyState 2 (CLOSING)' 87 ); 88 89 duplex.on('close', () => { 90 wss.close(done); 91 }); 92 }); 93 94 ws.on('open', () => { 95 ws._receiver.on('conclude', () => { 96 duplex.write('hi'); 97 }); 98 }); 99 }); 100 101 wss.on('connection', (ws) => { 102 ws.close(); 103 }); 104 }); 105 106 it('errors if a write occurs when `readyState` is `CLOSED`', (done) => { 107 const wss = new WebSocket.Server({ port: 0 }, () => { 108 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 109 const duplex = createWebSocketStream(ws); 110 111 duplex.on('error', (err) => { 112 assert.ok(duplex.destroyed); 113 assert.ok(err instanceof Error); 114 assert.strictEqual( 115 err.message, 116 'WebSocket is not open: readyState 3 (CLOSED)' 117 ); 118 119 duplex.on('close', () => { 120 wss.close(done); 121 }); 122 }); 123 124 ws.on('close', () => { 125 duplex.write('hi'); 126 }); 127 }); 128 129 wss.on('connection', (ws) => { 130 ws.close(); 131 }); 132 }); 133 134 it('does not error if `_final()` is called while connecting', (done) => { 135 const wss = new WebSocket.Server({ port: 0 }, () => { 136 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 137 138 assert.strictEqual(ws.readyState, WebSocket.CONNECTING); 139 140 const duplex = createWebSocketStream(ws); 141 142 duplex.on('close', () => { 143 wss.close(done); 144 }); 145 146 duplex.resume(); 147 duplex.end(); 148 }); 149 }); 150 151 it('makes `_final()` a noop if no socket is assigned', (done) => { 152 const server = createServer(); 153 154 server.on('upgrade', (request, socket) => { 155 socket.on('end', socket.end); 156 157 const headers = [ 158 'HTTP/1.1 101 Switching Protocols', 159 'Upgrade: websocket', 160 'Connection: Upgrade', 161 'Sec-WebSocket-Accept: foo' 162 ]; 163 164 socket.write(headers.concat('\r\n').join('\r\n')); 165 }); 166 167 server.listen(() => { 168 const called = []; 169 const ws = new WebSocket(`ws://localhost:${server.address().port}`); 170 const duplex = WebSocket.createWebSocketStream(ws); 171 const final = duplex._final; 172 173 duplex._final = (callback) => { 174 called.push('final'); 175 assert.strictEqual(ws.readyState, WebSocket.CLOSING); 176 assert.strictEqual(ws._socket, null); 177 178 final(callback); 179 }; 180 181 duplex.on('error', (err) => { 182 called.push('error'); 183 assert.ok(err instanceof Error); 184 assert.strictEqual( 185 err.message, 186 'Invalid Sec-WebSocket-Accept header' 187 ); 188 }); 189 190 duplex.on('finish', () => { 191 called.push('finish'); 192 }); 193 194 duplex.on('close', () => { 195 assert.deepStrictEqual(called, ['final', 'error']); 196 server.close(done); 197 }); 198 199 ws.on('upgrade', () => { 200 process.nextTick(() => { 201 duplex.end(); 202 }); 203 }); 204 }); 205 }); 206 207 it('reemits errors', (done) => { 208 let duplexCloseEventEmitted = false; 209 let serverClientCloseEventEmitted = false; 210 211 const wss = new WebSocket.Server({ port: 0 }, () => { 212 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 213 const duplex = createWebSocketStream(ws); 214 215 duplex.on('error', (err) => { 216 assert.ok(err instanceof RangeError); 217 assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE'); 218 assert.strictEqual( 219 err.message, 220 'Invalid WebSocket frame: invalid opcode 5' 221 ); 222 223 duplex.on('close', () => { 224 duplexCloseEventEmitted = true; 225 if (serverClientCloseEventEmitted) wss.close(done); 226 }); 227 }); 228 }); 229 230 wss.on('connection', (ws) => { 231 ws._socket.write(Buffer.from([0x85, 0x00])); 232 ws.on('close', (code, reason) => { 233 assert.strictEqual(code, 1002); 234 assert.deepStrictEqual(reason, EMPTY_BUFFER); 235 236 serverClientCloseEventEmitted = true; 237 if (duplexCloseEventEmitted) wss.close(done); 238 }); 239 }); 240 }); 241 242 it('does not swallow errors that may occur while destroying', (done) => { 243 const frame = Buffer.concat( 244 Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), { 245 fin: true, 246 rsv1: true, 247 opcode: 0x02, 248 mask: false, 249 readOnly: false 250 }) 251 ); 252 253 const wss = new WebSocket.Server( 254 { 255 perMessageDeflate: true, 256 port: 0 257 }, 258 () => { 259 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 260 const duplex = createWebSocketStream(ws); 261 262 duplex.on('error', (err) => { 263 assert.ok(err instanceof Error); 264 assert.strictEqual(err.code, 'Z_DATA_ERROR'); 265 assert.strictEqual(err.errno, -3); 266 267 duplex.on('close', () => { 268 wss.close(done); 269 }); 270 }); 271 272 let bytesRead = 0; 273 274 ws.on('open', () => { 275 ws._socket.on('data', (chunk) => { 276 bytesRead += chunk.length; 277 if (bytesRead === frame.length) duplex.destroy(); 278 }); 279 }); 280 } 281 ); 282 283 wss.on('connection', (ws) => { 284 ws._socket.write(frame); 285 }); 286 }); 287 288 it("does not suppress the throwing behavior of 'error' events", (done) => { 289 const wss = new WebSocket.Server({ port: 0 }, () => { 290 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 291 createWebSocketStream(ws); 292 }); 293 294 wss.on('connection', (ws) => { 295 ws._socket.write(Buffer.from([0x85, 0x00])); 296 }); 297 298 assert.strictEqual(process.listenerCount('uncaughtException'), 1); 299 300 const [listener] = process.listeners('uncaughtException'); 301 302 process.removeAllListeners('uncaughtException'); 303 process.once('uncaughtException', (err) => { 304 assert.ok(err instanceof Error); 305 assert.strictEqual( 306 err.message, 307 'Invalid WebSocket frame: invalid opcode 5' 308 ); 309 310 process.on('uncaughtException', listener); 311 wss.close(done); 312 }); 313 }); 314 315 it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => { 316 const wss = new WebSocket.Server({ port: 0 }, () => { 317 const events = []; 318 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 319 const duplex = createWebSocketStream(ws); 320 321 duplex.on('end', () => { 322 events.push('end'); 323 assert.ok(duplex.destroyed); 324 }); 325 326 duplex.on('close', () => { 327 assert.deepStrictEqual(events, ['finish', 'end']); 328 wss.close(done); 329 }); 330 331 duplex.on('finish', () => { 332 events.push('finish'); 333 assert.ok(!duplex.destroyed); 334 assert.ok(duplex.readable); 335 336 duplex.resume(); 337 }); 338 339 ws.on('close', () => { 340 duplex.end(); 341 }); 342 }); 343 344 wss.on('connection', (ws) => { 345 ws.send('foo'); 346 ws.close(); 347 }); 348 }); 349 350 it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => { 351 const wss = new WebSocket.Server({ port: 0 }, () => { 352 const events = []; 353 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 354 const duplex = createWebSocketStream(ws); 355 356 duplex.on('end', () => { 357 events.push('end'); 358 assert.ok(!duplex.destroyed); 359 assert.ok(duplex.writable); 360 361 duplex.end(); 362 }); 363 364 duplex.on('close', () => { 365 assert.deepStrictEqual(events, ['end', 'finish']); 366 wss.close(done); 367 }); 368 369 duplex.on('finish', () => { 370 events.push('finish'); 371 }); 372 373 duplex.resume(); 374 }); 375 376 wss.on('connection', (ws) => { 377 ws.close(); 378 }); 379 }); 380 381 it('handles backpressure (1/3)', (done) => { 382 const wss = new WebSocket.Server({ port: 0 }, () => { 383 // eslint-disable-next-line no-unused-vars 384 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 385 }); 386 387 wss.on('connection', (ws) => { 388 const duplex = createWebSocketStream(ws); 389 390 duplex.resume(); 391 392 duplex.on('drain', () => { 393 duplex.on('close', () => { 394 wss.close(done); 395 }); 396 397 duplex.end(); 398 }); 399 400 const chunk = randomBytes(1024); 401 let ret; 402 403 do { 404 ret = duplex.write(chunk); 405 } while (ret !== false); 406 }); 407 }); 408 409 it('handles backpressure (2/3)', (done) => { 410 const wss = new WebSocket.Server( 411 { port: 0, perMessageDeflate: true }, 412 () => { 413 const called = []; 414 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 415 const duplex = createWebSocketStream(ws); 416 const read = duplex._read; 417 418 duplex._read = () => { 419 duplex._read = read; 420 called.push('read'); 421 assert.ok(ws._receiver._writableState.needDrain); 422 read(); 423 assert.ok(ws._socket.isPaused()); 424 }; 425 426 ws.on('open', () => { 427 ws._socket.on('pause', () => { 428 duplex.resume(); 429 }); 430 431 ws._receiver.on('drain', () => { 432 called.push('drain'); 433 assert.ok(!ws._socket.isPaused()); 434 duplex.end(); 435 }); 436 437 const opts = { 438 fin: true, 439 opcode: 0x02, 440 mask: false, 441 readOnly: false 442 }; 443 444 const list = [ 445 ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), 446 ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) 447 ]; 448 449 // This hack is used because there is no guarantee that more than 450 // 16 KiB will be sent as a single TCP packet. 451 ws._socket.push(Buffer.concat(list)); 452 }); 453 454 duplex.on('close', () => { 455 assert.deepStrictEqual(called, ['read', 'drain']); 456 wss.close(done); 457 }); 458 } 459 ); 460 }); 461 462 it('handles backpressure (3/3)', (done) => { 463 const wss = new WebSocket.Server( 464 { port: 0, perMessageDeflate: true }, 465 () => { 466 const called = []; 467 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 468 const duplex = createWebSocketStream(ws); 469 const read = duplex._read; 470 471 duplex._read = () => { 472 called.push('read'); 473 assert.ok(!ws._receiver._writableState.needDrain); 474 read(); 475 assert.ok(!ws._socket.isPaused()); 476 duplex.end(); 477 }; 478 479 ws.on('open', () => { 480 ws._receiver.on('drain', () => { 481 called.push('drain'); 482 assert.ok(ws._socket.isPaused()); 483 duplex.resume(); 484 }); 485 486 const opts = { 487 fin: true, 488 opcode: 0x02, 489 mask: false, 490 readOnly: false 491 }; 492 493 const list = [ 494 ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }), 495 ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts }) 496 ]; 497 498 ws._socket.push(Buffer.concat(list)); 499 }); 500 501 duplex.on('close', () => { 502 assert.deepStrictEqual(called, ['drain', 'read']); 503 wss.close(done); 504 }); 505 } 506 ); 507 }); 508 509 it('can be destroyed (1/2)', (done) => { 510 const wss = new WebSocket.Server({ port: 0 }, () => { 511 const error = new Error('Oops'); 512 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 513 const duplex = createWebSocketStream(ws); 514 515 duplex.on('error', (err) => { 516 assert.strictEqual(err, error); 517 518 duplex.on('close', () => { 519 wss.close(done); 520 }); 521 }); 522 523 ws.on('open', () => { 524 duplex.destroy(error); 525 }); 526 }); 527 }); 528 529 it('can be destroyed (2/2)', (done) => { 530 const wss = new WebSocket.Server({ port: 0 }, () => { 531 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 532 const duplex = createWebSocketStream(ws); 533 534 duplex.on('close', () => { 535 wss.close(done); 536 }); 537 538 ws.on('open', () => { 539 duplex.destroy(); 540 }); 541 }); 542 }); 543 544 it('converts text messages to strings in readable object mode', (done) => { 545 const wss = new WebSocket.Server({ port: 0 }, () => { 546 const events = []; 547 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 548 const duplex = createWebSocketStream(ws, { readableObjectMode: true }); 549 550 duplex.on('data', (data) => { 551 events.push('data'); 552 assert.strictEqual(data, 'foo'); 553 }); 554 555 duplex.on('end', () => { 556 events.push('end'); 557 duplex.end(); 558 }); 559 560 duplex.on('close', () => { 561 assert.deepStrictEqual(events, ['data', 'end']); 562 wss.close(done); 563 }); 564 }); 565 566 wss.on('connection', (ws) => { 567 ws.send('foo'); 568 ws.close(); 569 }); 570 }); 571 572 it('resumes the socket if `readyState` is `CLOSING`', (done) => { 573 const wss = new WebSocket.Server({ port: 0 }, () => { 574 const ws = new WebSocket(`ws://localhost:${wss.address().port}`); 575 const duplex = createWebSocketStream(ws); 576 577 ws.on('message', () => { 578 assert.ok(ws._socket.isPaused()); 579 580 duplex.on('close', () => { 581 wss.close(done); 582 }); 583 584 duplex.end(); 585 586 process.nextTick(() => { 587 assert.strictEqual(ws.readyState, WebSocket.CLOSING); 588 duplex.resume(); 589 }); 590 }); 591 }); 592 593 wss.on('connection', (ws) => { 594 ws.send(randomBytes(16 * 1024)); 595 }); 596 }); 597 }); 598 });