tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

create-websocket-stream.test.js (16779B)


      1 'use strict';
      2 
      3 const assert = require('assert');
      4 const EventEmitter = require('events');
      5 const { createServer } = require('http');
      6 const { Duplex } = require('stream');
      7 const { randomBytes } = require('crypto');
      8 
      9 const createWebSocketStream = require('../lib/stream');
     10 const Sender = require('../lib/sender');
     11 const WebSocket = require('..');
     12 const { EMPTY_BUFFER } = require('../lib/constants');
     13 
     14 describe('createWebSocketStream', () => {
     15  it('is exposed as a property of the `WebSocket` class', () => {
     16    assert.strictEqual(WebSocket.createWebSocketStream, createWebSocketStream);
     17  });
     18 
     19  it('returns a `Duplex` stream', () => {
     20    const duplex = createWebSocketStream(new EventEmitter());
     21 
     22    assert.ok(duplex instanceof Duplex);
     23  });
     24 
     25  it('passes the options object to the `Duplex` constructor', (done) => {
           // Wraps a client WebSocket with `allowHalfOpen: false` and
           // `encoding: 'utf8'`, then checks that the data sent by the server as
           // a Buffer is delivered by the stream as the string 'hi'.
     26    const wss = new WebSocket.Server({ port: 0 }, () => {
     27      const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
     28      const duplex = createWebSocketStream(ws, {
     29        allowHalfOpen: false,
     30        encoding: 'utf8'
     31      });
     32 
     33      duplex.on('data', (chunk) => {
     34        assert.strictEqual(chunk, 'hi');
     35 
               // Finish the test only after the stream itself has closed.
     36        duplex.on('close', () => {
     37          wss.close(done);
     38        });
     39      });
     40    });
     41 
           // Server side: send one binary message, then close the connection.
     42    wss.on('connection', (ws) => {
     43      ws.send(Buffer.from('hi'));
     44      ws.close();
     45    });
     46  });
     47 
     48  describe('The returned stream', () => {
     49    it('buffers writes if `readyState` is `CONNECTING`', (done) => {
             // Writes 1 KiB of random bytes to the stream while the client
             // WebSocket is still CONNECTING; the server later asserts that it
             // received exactly that chunk as a binary message.
     50      const chunk = randomBytes(1024);
     51      const wss = new WebSocket.Server({ port: 0 }, () => {
     52        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
     53 
     54        assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
     55 
     56        const duplex = createWebSocketStream(ws);
     57 
     58        duplex.write(chunk);
     59      });
     60 
     61      wss.on('connection', (ws) => {
               // The 'close' listener is only registered once a message has
               // arrived, so all assertions below imply the write got through.
     62        ws.on('message', (message, isBinary) => {
     63          ws.on('close', (code, reason) => {
     64            assert.deepStrictEqual(message, chunk);
     65            assert.ok(isBinary);
     66            assert.strictEqual(code, 1005);
     67            assert.strictEqual(reason, EMPTY_BUFFER);
     68            wss.close(done);
     69          });
     70        });
     71 
               // Close from the server side without a code or a reason; the
               // assertions above expect 1005 and an empty reason as a result.
     72        ws.close();
     73      });
     74    });
     75 
     76    it('errors if a write occurs when `readyState` is `CLOSING`', (done) => {
             // Writes to the stream from the receiver's 'conclude' event and
             // expects the stream to emit an error reporting readyState 2
             // (CLOSING), with the stream already marked as destroyed.
     77      const wss = new WebSocket.Server({ port: 0 }, () => {
     78        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
     79        const duplex = createWebSocketStream(ws);
     80 
     81        duplex.on('error', (err) => {
     82          assert.ok(duplex.destroyed);
     83          assert.ok(err instanceof Error);
     84          assert.strictEqual(
     85            err.message,
     86            'WebSocket is not open: readyState 2 (CLOSING)'
     87          );
     88 
                 // Finish only after the stream has emitted 'close'.
     89          duplex.on('close', () => {
     90            wss.close(done);
     91          });
     92        });
     93 
     94        ws.on('open', () => {
                 // NOTE(review): 'conclude' on the internal `_receiver` presumably
                 // fires once the peer's close frame has been processed - confirm
                 // against lib/receiver.
     95          ws._receiver.on('conclude', () => {
     96            duplex.write('hi');
     97          });
     98        });
     99      });
    100 
             // The server closes the connection as soon as the client connects.
    101      wss.on('connection', (ws) => {
    102        ws.close();
    103      });
    104    });
    105 
    106    it('errors if a write occurs when `readyState` is `CLOSED`', (done) => {
             // Writes to the stream from the WebSocket's 'close' event and expects
             // the stream to emit an error reporting readyState 3 (CLOSED), with
             // the stream already marked as destroyed.
    107      const wss = new WebSocket.Server({ port: 0 }, () => {
    108        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    109        const duplex = createWebSocketStream(ws);
    110 
    111        duplex.on('error', (err) => {
    112          assert.ok(duplex.destroyed);
    113          assert.ok(err instanceof Error);
    114          assert.strictEqual(
    115            err.message,
    116            'WebSocket is not open: readyState 3 (CLOSED)'
    117          );
    118 
                 // Finish only after the stream has emitted 'close'.
    119          duplex.on('close', () => {
    120            wss.close(done);
    121          });
    122        });
    123 
    124        ws.on('close', () => {
    125          duplex.write('hi');
    126        });
    127      });
    128 
             // The server closes the connection as soon as the client connects.
    129      wss.on('connection', (ws) => {
    130        ws.close();
    131      });
    132    });
    133 
    134    it('does not error if `_final()` is called while connecting', (done) => {
             // Ends the writable side while the client WebSocket is still
             // CONNECTING. No 'error' listener is attached to the stream; the
             // test completes when the stream emits 'close'.
    135      const wss = new WebSocket.Server({ port: 0 }, () => {
    136        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    137 
    138        assert.strictEqual(ws.readyState, WebSocket.CONNECTING);
    139 
    140        const duplex = createWebSocketStream(ws);
    141 
    142        duplex.on('close', () => {
    143          wss.close(done);
    144        });
    145 
               // Switch the readable side to flowing mode, then end the writable
               // side right away.
    146        duplex.resume();
    147        duplex.end();
    148      });
    149    });
    150 
    151    it('makes `_final()` a noop if no socket is assigned', (done) => {
             // A plain HTTP server answers the upgrade request with the literal
             // `Sec-WebSocket-Accept: foo`, and the client is expected to report
             // 'Invalid Sec-WebSocket-Accept header'. `duplex.end()` is called
             // shortly after the client's 'upgrade' event, and the wrapped
             // `_final` asserts that `ws._socket` is null at that point.
    152      const server = createServer();
    153 
    154      server.on('upgrade', (request, socket) => {
               // Close our side when the client ends the connection.
    155        socket.on('end', socket.end);
    156 
    157        const headers = [
    158          'HTTP/1.1 101 Switching Protocols',
    159          'Upgrade: websocket',
    160          'Connection: Upgrade',
    161          'Sec-WebSocket-Accept: foo'
    162        ];
    163 
               // Appending '\r\n' before joining produces the blank line that
               // terminates the response headers.
    164        socket.write(headers.concat('\r\n').join('\r\n'));
    165      });
    166 
    167      server.listen(() => {
               // Records which of 'final', 'error' and 'finish' happened, in order.
    168        const called = [];
    169        const ws = new WebSocket(`ws://localhost:${server.address().port}`);
    170        const duplex = WebSocket.createWebSocketStream(ws);
    171        const final = duplex._final;
    172 
               // Wrap `_final` to record the call and check the WebSocket state
               // before delegating to the original implementation.
    173        duplex._final = (callback) => {
    174          called.push('final');
    175          assert.strictEqual(ws.readyState, WebSocket.CLOSING);
    176          assert.strictEqual(ws._socket, null);
    177 
    178          final(callback);
    179        };
    180 
    181        duplex.on('error', (err) => {
    182          called.push('error');
    183          assert.ok(err instanceof Error);
    184          assert.strictEqual(
    185            err.message,
    186            'Invalid Sec-WebSocket-Accept header'
    187          );
    188        });
    189 
               // 'finish' is recorded only so that the 'close' handler can verify
               // it never happened: the expected list is exactly ['final', 'error'].
    190        duplex.on('finish', () => {
    191          called.push('finish');
    192        });
    193 
    194        duplex.on('close', () => {
    195          assert.deepStrictEqual(called, ['final', 'error']);
    196          server.close(done);
    197        });
    198 
               // End the stream on the tick following the client's 'upgrade' event.
    199        ws.on('upgrade', () => {
    200          process.nextTick(() => {
    201            duplex.end();
    202          });
    203        });
    204      });
    205    });
    206 
    207    it('reemits errors', (done) => {
             // The server writes a raw frame with an invalid opcode and the client
             // stream must emit the resulting RangeError. The two flags below form
             // a rendezvous: `done` is called by whichever of the two 'close'
             // events (client stream, server-side WebSocket) happens last.
    208      let duplexCloseEventEmitted = false;
    209      let serverClientCloseEventEmitted = false;
    210 
    211      const wss = new WebSocket.Server({ port: 0 }, () => {
    212        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    213        const duplex = createWebSocketStream(ws);
    214 
    215        duplex.on('error', (err) => {
    216          assert.ok(err instanceof RangeError);
    217          assert.strictEqual(err.code, 'WS_ERR_INVALID_OPCODE');
    218          assert.strictEqual(
    219            err.message,
    220            'Invalid WebSocket frame: invalid opcode 5'
    221          );
    222 
    223          duplex.on('close', () => {
    224            duplexCloseEventEmitted = true;
    225            if (serverClientCloseEventEmitted) wss.close(done);
    226          });
    227        });
    228      });
    229 
    230      wss.on('connection', (ws) => {
               // 0x85 0x00: FIN bit set, opcode 5, unmasked, zero-length payload.
               // Written straight to the socket to bypass the sender.
    231        ws._socket.write(Buffer.from([0x85, 0x00]));
    232        ws.on('close', (code, reason) => {
    233          assert.strictEqual(code, 1002);
    234          assert.deepStrictEqual(reason, EMPTY_BUFFER);
    235 
    236          serverClientCloseEventEmitted = true;
    237          if (duplexCloseEventEmitted) wss.close(done);
    238        });
    239      });
    240    });
    241 
    242    it('does not swallow errors that may occur while destroying', (done) => {
             // The server sends a binary frame with RSV1 set and a 4-byte payload
             // over a connection created with `perMessageDeflate: true`. The
             // client destroys the stream once the whole frame has been read from
             // the socket and must still observe a Z_DATA_ERROR on the stream.
             // NOTE(review): the payload is presumably not valid deflate data,
             // which would explain the expected zlib error - confirm.
    243      const frame = Buffer.concat(
    244        Sender.frame(Buffer.from([0x22, 0xfa, 0xec, 0x78]), {
    245          fin: true,
    246          rsv1: true,
    247          opcode: 0x02,
    248          mask: false,
    249          readOnly: false
    250        })
    251      );
    252 
    253      const wss = new WebSocket.Server(
    254        {
    255          perMessageDeflate: true,
    256          port: 0
    257        },
    258        () => {
    259          const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    260          const duplex = createWebSocketStream(ws);
    261 
    262          duplex.on('error', (err) => {
    263            assert.ok(err instanceof Error);
    264            assert.strictEqual(err.code, 'Z_DATA_ERROR');
    265            assert.strictEqual(err.errno, -3);
    266 
    267            duplex.on('close', () => {
    268              wss.close(done);
    269            });
    270          });
    271 
                 // Running total of bytes seen on the raw client socket.
    272          let bytesRead = 0;
    273 
    274          ws.on('open', () => {
    275            ws._socket.on('data', (chunk) => {
    276              bytesRead += chunk.length;
                     // Destroy (without an error argument) as soon as the byte
                     // count equals the length of the frame.
    277              if (bytesRead === frame.length) duplex.destroy();
    278            });
    279          });
    280        }
    281      );
    282 
             // Write the prebuilt frame straight to the server-side socket.
    283      wss.on('connection', (ws) => {
    284        ws._socket.write(frame);
    285      });
    286    });
    287 
    288    it("does not suppress the throwing behavior of 'error' events", (done) => {
             // No 'error' listener is attached to the stream created below, so the
             // invalid-opcode error is expected to reach the process-level
             // 'uncaughtException' handler installed by this test.
    289      const wss = new WebSocket.Server({ port: 0 }, () => {
    290        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    291        createWebSocketStream(ws);
    292      });
    293 
    294      wss.on('connection', (ws) => {
               // 0x85 0x00: FIN bit set, opcode 5, unmasked, zero-length payload.
    295        ws._socket.write(Buffer.from([0x85, 0x00]));
    296      });
    297 
             // Exactly one listener is expected to be installed already
             // (presumably the test runner's). It is removed for the duration of
             // the test and re-installed inside the handler below.
    298      assert.strictEqual(process.listenerCount('uncaughtException'), 1);
    299 
    300      const [listener] = process.listeners('uncaughtException');
    301 
    302      process.removeAllListeners('uncaughtException');
    303      process.once('uncaughtException', (err) => {
    304        assert.ok(err instanceof Error);
    305        assert.strictEqual(
    306          err.message,
    307          'Invalid WebSocket frame: invalid opcode 5'
    308        );
    309 
               // Put the original listener back before finishing the test.
    310        process.on('uncaughtException', listener);
    311        wss.close(done);
    312      });
    313    });
    314 
    315    it("is destroyed after 'end' and 'finish' are emitted (1/2)", (done) => {
             // Order under test: 'finish' first (stream not yet destroyed and
             // still readable), then 'end' (stream already destroyed).
    316      const wss = new WebSocket.Server({ port: 0 }, () => {
    317        const events = [];
    318        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    319        const duplex = createWebSocketStream(ws);
    320 
    321        duplex.on('end', () => {
    322          events.push('end');
    323          assert.ok(duplex.destroyed);
    324        });
    325 
    326        duplex.on('close', () => {
    327          assert.deepStrictEqual(events, ['finish', 'end']);
    328          wss.close(done);
    329        });
    330 
    331        duplex.on('finish', () => {
    332          events.push('finish');
    333          assert.ok(!duplex.destroyed);
    334          assert.ok(duplex.readable);
    335 
                 // Only now start consuming the readable side.
    336          duplex.resume();
    337        });
    338 
               // End the writable side only after the WebSocket has closed.
    339        ws.on('close', () => {
    340          duplex.end();
    341        });
    342      });
    343 
             // Server side: send one text message, then close the connection.
    344      wss.on('connection', (ws) => {
    345        ws.send('foo');
    346        ws.close();
    347      });
    348    });
    349 
    350    it("is destroyed after 'end' and 'finish' are emitted (2/2)", (done) => {
             // Order under test: 'end' first (stream not yet destroyed and still
             // writable), then 'finish'.
    351      const wss = new WebSocket.Server({ port: 0 }, () => {
    352        const events = [];
    353        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    354        const duplex = createWebSocketStream(ws);
    355 
    356        duplex.on('end', () => {
    357          events.push('end');
    358          assert.ok(!duplex.destroyed);
    359          assert.ok(duplex.writable);
    360 
                 // End the writable side only after the readable side has ended.
    361          duplex.end();
    362        });
    363 
    364        duplex.on('close', () => {
    365          assert.deepStrictEqual(events, ['end', 'finish']);
    366          wss.close(done);
    367        });
    368 
    369        duplex.on('finish', () => {
    370          events.push('finish');
    371        });
    372 
               // Consume the readable side from the start.
    373        duplex.resume();
    374      });
    375 
             // The server closes the connection without sending any message.
    376      wss.on('connection', (ws) => {
    377        ws.close();
    378      });
    379    });
    380 
    381    it('handles backpressure (1/3)', (done) => {
             // The stream wraps the server-side WebSocket. It is written to until
             // `write()` returns false; the test then waits for 'drain', ends the
             // stream and finishes once the stream has closed.
    382      const wss = new WebSocket.Server({ port: 0 }, () => {
               // The client only needs to connect; the variable is not used.
    383        // eslint-disable-next-line no-unused-vars
    384        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    385      });
    386 
    387      wss.on('connection', (ws) => {
    388        const duplex = createWebSocketStream(ws);
    389 
    390        duplex.resume();
    391 
    392        duplex.on('drain', () => {
    393          duplex.on('close', () => {
    394            wss.close(done);
    395          });
    396 
    397          duplex.end();
    398        });
    399 
    400        const chunk = randomBytes(1024);
    401        let ret;
    402 
               // Keep writing the same 1 KiB chunk until the stream signals
               // backpressure by returning false.
    403        do {
    404          ret = duplex.write(chunk);
    405        } while (ret !== false);
    406      });
    407    });
    408 
    409    it('handles backpressure (2/3)', (done) => {
             // Expected sequence, checked in the 'close' handler: the stream's
             // `_read` runs first, while the receiver still needs draining and the
             // socket is paused; the receiver's 'drain' comes second, with the
             // socket no longer paused.
    410      const wss = new WebSocket.Server(
    411        { port: 0, perMessageDeflate: true },
    412        () => {
    413          const called = [];
    414          const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    415          const duplex = createWebSocketStream(ws);
    416          const read = duplex._read;
    417 
                 // One-shot wrapper: the original `_read` is restored on the
                 // first call.
    418          duplex._read = () => {
    419            duplex._read = read;
    420            called.push('read');
    421            assert.ok(ws._receiver._writableState.needDrain);
    422            read();
                   // Calling the original `_read` must not have resumed the socket.
    423            assert.ok(ws._socket.isPaused());
    424          };
    425 
    426          ws.on('open', () => {
                   // Start consuming the stream only once the socket is paused.
    427            ws._socket.on('pause', () => {
    428              duplex.resume();
    429            });
    430 
    431            ws._receiver.on('drain', () => {
    432              called.push('drain');
    433              assert.ok(!ws._socket.isPaused());
    434              duplex.end();
    435            });
    436 
    437            const opts = {
    438              fin: true,
    439              opcode: 0x02,
    440              mask: false,
    441              readOnly: false
    442            };
    443 
                   // A 16 KiB binary frame with RSV1 clear, followed by a 1-byte
                   // binary frame with RSV1 set.
    444            const list = [
    445              ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
    446              ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
    447            ];
    448 
    449            // This hack is used because there is no guarantee that more than
    450            // 16 KiB will be sent as a single TCP packet.
    451            ws._socket.push(Buffer.concat(list));
    452          });
    453 
    454          duplex.on('close', () => {
    455            assert.deepStrictEqual(called, ['read', 'drain']);
    456            wss.close(done);
    457          });
    458        }
    459      );
    460    });
    461 
    462    it('handles backpressure (3/3)', (done) => {
             // Expected sequence, checked in the 'close' handler: the receiver's
             // 'drain' comes first, while the socket is paused; the stream is
             // resumed only then, and `_read` observes a receiver that no longer
             // needs draining and a socket that is not paused.
    463      const wss = new WebSocket.Server(
    464        { port: 0, perMessageDeflate: true },
    465        () => {
    466          const called = [];
    467          const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    468          const duplex = createWebSocketStream(ws);
    469          const read = duplex._read;
    470 
                 // The wrapper stays installed; the final assertion expects it to
                 // have been recorded exactly once.
    471          duplex._read = () => {
    472            called.push('read');
    473            assert.ok(!ws._receiver._writableState.needDrain);
    474            read();
    475            assert.ok(!ws._socket.isPaused());
    476            duplex.end();
    477          };
    478 
    479          ws.on('open', () => {
    480            ws._receiver.on('drain', () => {
    481              called.push('drain');
    482              assert.ok(ws._socket.isPaused());
                     // Start consuming the stream only after the receiver drained.
    483              duplex.resume();
    484            });
    485 
    486            const opts = {
    487              fin: true,
    488              opcode: 0x02,
    489              mask: false,
    490              readOnly: false
    491            };
    492 
                   // A 16 KiB binary frame with RSV1 clear, followed by a 1-byte
                   // binary frame with RSV1 set.
    493            const list = [
    494              ...Sender.frame(randomBytes(16 * 1024), { rsv1: false, ...opts }),
    495              ...Sender.frame(Buffer.alloc(1), { rsv1: true, ...opts })
    496            ];
    497 
                   // Push both frames into the client socket's readable side as a
                   // single buffer instead of sending them over the network.
    498            ws._socket.push(Buffer.concat(list));
    499          });
    500 
    501          duplex.on('close', () => {
    502            assert.deepStrictEqual(called, ['drain', 'read']);
    503            wss.close(done);
    504          });
    505        }
    506      );
    507    });
    508 
    509    it('can be destroyed (1/2)', (done) => {
    510      const wss = new WebSocket.Server({ port: 0 }, () => {
    511        const error = new Error('Oops');
    512        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    513        const duplex = createWebSocketStream(ws);
    514 
    515        duplex.on('error', (err) => {
    516          assert.strictEqual(err, error);
    517 
    518          duplex.on('close', () => {
    519            wss.close(done);
    520          });
    521        });
    522 
    523        ws.on('open', () => {
    524          duplex.destroy(error);
    525        });
    526      });
    527    });
    528 
    529    it('can be destroyed (2/2)', (done) => {
    530      const wss = new WebSocket.Server({ port: 0 }, () => {
    531        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    532        const duplex = createWebSocketStream(ws);
    533 
    534        duplex.on('close', () => {
    535          wss.close(done);
    536        });
    537 
    538        ws.on('open', () => {
    539          duplex.destroy();
    540        });
    541      });
    542    });
    543 
    544    it('converts text messages to strings in readable object mode', (done) => {
             // With `readableObjectMode: true`, the text message sent by the
             // server must be delivered by the stream as the string 'foo'.
    545      const wss = new WebSocket.Server({ port: 0 }, () => {
    546        const events = [];
    547        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    548        const duplex = createWebSocketStream(ws, { readableObjectMode: true });
    549 
    550        duplex.on('data', (data) => {
    551          events.push('data');
    552          assert.strictEqual(data, 'foo');
    553        });
    554 
    555        duplex.on('end', () => {
    556          events.push('end');
                 // End the writable side once the readable side has ended.
    557          duplex.end();
    558        });
    559 
               // Exactly one 'data' event followed by 'end' is expected.
    560        duplex.on('close', () => {
    561          assert.deepStrictEqual(events, ['data', 'end']);
    562          wss.close(done);
    563        });
    564      });
    565 
             // Server side: send one text message, then close the connection.
    566      wss.on('connection', (ws) => {
    567        ws.send('foo');
    568        ws.close();
    569      });
    570    });
    571 
    572    it('resumes the socket if `readyState` is `CLOSING`', (done) => {
             // The server sends 16 KiB of random bytes. On the client's 'message'
             // event the socket is asserted to be paused; the stream is then ended
             // and, on the next tick, resumed while `readyState` is CLOSING. The
             // test completes when the stream emits 'close'.
    573      const wss = new WebSocket.Server({ port: 0 }, () => {
    574        const ws = new WebSocket(`ws://localhost:${wss.address().port}`);
    575        const duplex = createWebSocketStream(ws);
    576 
    577        ws.on('message', () => {
    578          assert.ok(ws._socket.isPaused());
    579 
    580          duplex.on('close', () => {
    581            wss.close(done);
    582          });
    583 
    584          duplex.end();
    585 
                 // Deferred by one tick; the assertion documents that the
                 // WebSocket is expected to be CLOSING by then.
    586          process.nextTick(() => {
    587            assert.strictEqual(ws.readyState, WebSocket.CLOSING);
    588            duplex.resume();
    589          });
    590        });
    591      });
    592 
    593      wss.on('connection', (ws) => {
    594        ws.send(randomBytes(16 * 1024));
    595      });
    596    });
    597  });
    598 });