tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

stream.js (4083B)


      1 'use strict';
      2 
      3 const { Duplex } = require('stream');
      4 
      /**
       * Emits the `'close'` event on the given stream.
       *
       * Kept as a standalone function so that it can be handed to
       * `process.nextTick()` together with the stream as its argument.
       *
       * @param {Duplex} stream The stream that emits the event
       * @private
       */
      function emitClose(stream) {
        const eventName = 'close';

        stream.emit(eventName);
      }
     14 
     /**
      * The listener of the `'end'` event. It is invoked with the stream as `this`
      * and destroys it when the writable side has already finished, unless the
      * stream is already destroyed.
      *
      * @private
      */
     function duplexOnEnd() {
       // Nothing to do for a stream that is already destroyed.
       if (this.destroyed) return;

       // The writable side is still in use, so the stream must stay alive.
       if (!this._writableState.finished) return;

       this.destroy();
     }
     25 
     /**
      * The listener of the `'error'` event. It is invoked with the stream as
      * `this`, removes itself, destroys the stream and, when no other `'error'`
      * listener is registered, emits the error again.
      *
      * @param {Error} err The error
      * @private
      */
     function duplexOnError(err) {
       this.removeListener('error', duplexOnError);
       this.destroy();

       const hasOtherListeners = this.listenerCount('error') !== 0;
       if (hasOtherListeners) return;

       // Do not suppress the throwing behavior: with this listener removed and
       // no other one registered, emit the error again.
       this.emit('error', err);
     }
     40 
     /**
      * Wraps a `WebSocket` in a duplex stream.
      *
      * Messages received by the websocket are pushed to the readable side of the
      * stream and chunks written to the stream are sent with `ws.send()`. The
      * `autoDestroy`, `emitClose`, `objectMode` and `writableObjectMode` options
      * are always forced to `false`, whatever `options` contains.
      *
      * @param {WebSocket} ws The `WebSocket` to wrap
      * @param {Object} [options] The options for the `Duplex` constructor
      * @return {Duplex} The duplex stream
      * @public
      */
     function createWebSocketStream(ws, options) {
       // Cleared by the `'error'` listener below so that `_destroy()` does not
       // terminate a websocket that is already being closed.
       let terminateOnDestroy = true;

       const streamOptions = {
         ...options,
         autoDestroy: false,
         emitClose: false,
         objectMode: false,
         writableObjectMode: false
       };
       const duplex = new Duplex(streamOptions);

       ws.on('message', function onMessage(msg, isBinary) {
         let data = msg;

         // Text messages are converted to strings only when the readable side is
         // in object mode.
         if (!isBinary && duplex._readableState.objectMode) {
           data = msg.toString();
         }

         // `push()` returning `false` means no more data should be pushed for
         // now, so pause the websocket until `_read()` is called again.
         const canPushMore = duplex.push(data);
         if (!canPushMore) ws.pause();
       });

       ws.once('error', function onError(err) {
         if (duplex.destroyed) return;

         // Prevent `ws.terminate()` from being called by `duplex._destroy()`.
         //
         // - If the `'error'` event is emitted before the `'open'` event, then
         //   `ws.terminate()` is a noop as no socket is assigned.
         // - Otherwise, the error is re-emitted by the listener of the `'error'`
         //   event of the `Receiver` object. The listener already closes the
         //   connection by calling `ws.close()`. This allows a close frame to be
         //   sent to the other peer. If `ws.terminate()` is called right after
         //   this, then the close frame might not be sent.
         terminateOnDestroy = false;
         duplex.destroy(err);
       });

       ws.once('close', function onClose() {
         // Signal EOF on the readable side unless the stream is already gone.
         if (!duplex.destroyed) duplex.push(null);
       });

       duplex._destroy = function (err, callback) {
         // `'close'` is emitted manually (the `emitClose` option is disabled) and
         // always on a later tick.
         const scheduleClose = () => {
           process.nextTick(emitClose, duplex);
         };

         if (ws.readyState === ws.CLOSED) {
           callback(err);
           scheduleClose();
           return;
         }

         // Guards against invoking `callback` from both listeners below.
         let callbackInvoked = false;

         ws.once('error', function onDestroyError(wsError) {
           callbackInvoked = true;
           callback(wsError);
         });

         ws.once('close', function onDestroyClose() {
           if (!callbackInvoked) callback(err);
           scheduleClose();
         });

         if (terminateOnDestroy) ws.terminate();
       };

       duplex._final = function (callback) {
         // Retry once the connection is established.
         if (ws.readyState === ws.CONNECTING) {
           ws.once('open', function onOpen() {
             duplex._final(callback);
           });
           return;
         }

         const socket = ws._socket;

         // If the value of the `_socket` property is `null` it means that `ws` is
         // a client websocket and the handshake failed. In fact, when this
         // happens, a socket is never assigned to the websocket. Wait for the
         // `'error'` event that will be emitted by the websocket.
         if (socket === null) return;

         if (!socket._writableState.finished) {
           socket.once('finish', function onFinish() {
             // `duplex` is not destroyed here because the `'end'` event will be
             // emitted on `duplex` after this `'finish'` event. The EOF signaling
             // `null` chunk is, in fact, pushed when the websocket emits
             // `'close'`.
             callback();
           });
           ws.close();
           return;
         }

         // The socket has already finished writing: complete right away and
         // destroy the stream if the readable side has ended too.
         callback();
         if (duplex._readableState.endEmitted) duplex.destroy();
       };

       duplex._read = function () {
         if (!ws.isPaused) return;

         ws.resume();
       };

       duplex._write = function (chunk, encoding, callback) {
         if (ws.readyState !== ws.CONNECTING) {
           ws.send(chunk, callback);
           return;
         }

         // Defer the write until the connection is established.
         ws.once('open', function onOpen() {
           duplex._write(chunk, encoding, callback);
         });
       };

       duplex.on('end', duplexOnEnd);
       duplex.on('error', duplexOnError);
       return duplex;
     }
    158 
    159 module.exports = createWebSocketStream;