tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

transport.js (18111B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 "use strict";
      6 
      7 const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
      8 const { dumpn, dumpv } = DevToolsUtils;
      9 const flags = require("resource://devtools/shared/flags.js");
     10 const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js");
     11 const {
     12  Packet,
     13  JSONPacket,
     14  BulkPacket,
     15 } = require("resource://devtools/shared/transport/packets.js");
     16 
     17 loader.lazyGetter(this, "ScriptableInputStream", () => {
     18  return Components.Constructor(
     19    "@mozilla.org/scriptableinputstream;1",
     20    "nsIScriptableInputStream",
     21    "init"
     22  );
     23 });
     24 
     25 const PACKET_HEADER_MAX = 200;
     26 
     27 /**
     28 * An adapter that handles data transfers between the devtools client and
     29 * server. It can work with both nsIPipe and nsIServerSocket transports so
     30 * long as the properly created input and output streams are specified.
     31 * (However, for intra-process connections, LocalDebuggerTransport, below,
     32 * is more efficient than using an nsIPipe pair with DebuggerTransport.)
     33 *
     34 * Given a DebuggerTransport instance dt:
     35 * 1) Set dt.hooks to a packet handler object (described below).
     36 * 2) Call dt.ready() to begin watching for input packets.
     37 * 3) Call dt.send() / dt.startBulkSend() to send packets.
     38 * 4) Call dt.close() to close the connection, and disengage from the event
     39 *    loop.
     40 *
     41 * A packet handler is an object with the following methods:
     42 *
     43 * - onPacket(packet) - called when we have received a complete packet.
     44 *   |packet| is the parsed form of the packet --- a JavaScript value, not
     45 *   a JSON-syntax string.
     46 *
     47 * - onBulkPacket(packet) - called when we have switched to bulk packet
     48 *   receiving mode. |packet| is an object containing:
     49 *   * actor:        Name of actor that will receive the packet
     50 *   * type:         Name of actor's method that should be called on receipt
     51 *   * length:       Size of the data to be read
     52 *   * stream:       This input stream should only be used directly if you can ensure
     53 *                   that you will read exactly |length| bytes and will not close the
     54 *                   stream when reading is complete
     55 *   * done:         If you use the stream directly (instead of |copyTo| or
     56 *                   |copyToBuffer| below), you must signal completion by
     57 *                   resolving / rejecting this deferred. If it's rejected, the
     58 *                   transport will be closed.  If an Error is supplied as a
     59 *                   rejection value, it will be logged via |dumpn|. If you do
     60 *                   use |copyTo| or |copyToBuffer|, resolving is taken care of
     61 *                   for you when copying completes.
     62 *   * copyTo:       A helper function for getting your data out of the stream that
     63 *                   meets the stream handling requirements above, and has the
     64 *                   following signature:
     65 *
     66 *     @param  output nsIAsyncOutputStream
     67 *                   The stream to copy to.
     68 *     @return Promise
     69 *                   The promise is resolved when copying completes or rejected if any
     70 *                   (unexpected) error occurs.
     71 *                   This object also emits "progress" events for each chunk that is
     72 *                   copied.  See stream-utils.js.
     73 *   * copyToBuffer: a helper function for getting your data out of the stream
     74 *                   that meets the stream handling requirements above, and has
     75 *                   the following signature:
     76 *     @param output ArrayBuffer
     77 *                   The buffer to copy to. It needs to be the same length as the data
     78 *                   to be transfered.
     79 *     @return Promise
     80 *                   The promise is resolved when copying completes or rejected if any
     81 *                   (unexpected) error occurs.
     82 *
     83 * - onTransportClosed(reason) - called when the connection is closed. |reason| is
     84 *   an optional nsresult or object, typically passed when the transport is
     85 *   closed due to some error in a underlying stream.
     86 *
     87 * See ./packets.js and the Remote Debugging Protocol specification for more
     88 * details on the format of these packets.
     89 */
     90 class DebuggerTransport {
     91  /**
     92   * @param {nsIAsyncInputStream} input
     93   *        The input stream.
     94   * @param {nsIAsyncOutputStream} output
     95   *        The output stream.
     96   */
     97  constructor(input, output) {
     98    this._input = input;
     99    this._scriptableInput = new ScriptableInputStream(input);
    100    this._output = output;
    101 
    102    // The current incoming (possibly partial) header, which will determine which
    103    // type of Packet |_incoming| below will become.
    104    this._incomingHeader = "";
    105    // The current incoming Packet object
    106    this._incoming = null;
    107    // A queue of outgoing Packet objects
    108    this._outgoing = [];
    109 
    110    this.hooks = null;
    111    this.active = false;
    112 
    113    this._incomingEnabled = true;
    114    this._outgoingEnabled = true;
    115 
    116    this.close = this.close.bind(this);
    117  }
    118 
    119  /**
    120   * Transmit an object as a JSON packet.
    121   *
    122   * This method returns immediately, without waiting for the entire
    123   * packet to be transmitted, registering event handlers as needed to
    124   * transmit the entire packet. Packets are transmitted in the order
    125   * they are passed to this method.
    126   */
    127  send(object) {
    128    const packet = new JSONPacket(this);
    129    packet.object = object;
    130    this._outgoing.push(packet);
    131    this._flushOutgoing();
    132  }
    133 
    134  /**
    135   * Transmit streaming data via a bulk packet.
    136   *
    137   * This method initiates the bulk send process by queuing up the header data.
    138   * The caller receives eventual access to a stream for writing.
    139   *
    140   * N.B.: Do *not* attempt to close the stream handed to you, as it will
    141   * continue to be used by this transport afterwards.  Most users should
    142   * instead use the provided |copyFrom| or |copyFromBuffer| functions instead.
    143   *
    144   * @param {object} header
    145   *        This is modeled after the format of JSON packets above, but does not
    146   *        actually contain the data, but is instead just a routing header:
    147   *          * actor:  Name of actor that will receive the packet
    148   *          * type:   Name of actor's method that should be called on receipt
    149   *          * length: Size of the data to be sent
    150   * @return {Promise}
    151   *         The promise will be resolved when you are allowed to write to the
    152   *         stream with an object containing:
    153   *           * stream:         This output stream should only be used directly if
    154   *                             you can ensure that you will write exactly |length|
    155   *                             bytes and will not close the stream when writing is
    156   *                             complete
    157   *           * done:           If you use the stream directly (instead of |copyFrom|
    158   *                             or |copyFromBuffer| below), you must signal
    159   *                             completion by resolving / rejecting this
    160   *                             deferred.  If it's rejected, the transport will
    161   *                             be closed.  If an Error is supplied as a
    162   *                             rejection value, it will be logged via |dumpn|.
    163   *                             If you do use |copyFrom| or |copyFromBuffer|,
    164   *                             resolving is taken care of for you when copying
    165   *                             completes.
    166   *           * copyFrom:       A helper function for getting your data onto the
    167   *                             stream that meets the stream handling requirements
    168   *                             above, and has the following signature:
    169   *             @param  input nsIAsyncInputStream
    170   *                     The stream to copy from.
    171   *             @return Promise
    172   *                     The promise is resolved when copying completes or
    173   *                     rejected if any (unexpected) errors occur.
    174   *                     This object also emits "progress" events for each chunk
    175   *                     that is copied.  See stream-utils.js.
    176   *           * copyFromBuffer: A helper function for getting your data onto
    177   *                             the stream that meets the stream handling
    178   *                             requirements above, and has the following
    179   *                             signature:
    180   *             @param  input ArrayBuffer
    181   *                     The buffer to read from. It needs to be the same length
    182   *                     as the data to write.
    183   *             @return Promise
    184   *                     The promise is resolved when copying completes or
    185   *                     rejected if any (unexpected) errors occur.
    186   */
    187  startBulkSend(header) {
    188    const packet = new BulkPacket(this);
    189    packet.header = header;
    190    this._outgoing.push(packet);
    191    this._flushOutgoing();
    192    return packet.streamReadyForWriting;
    193  }
    194 
    195  /**
    196   * Close the transport.
    197   *
    198   * @param reason nsresult / object (optional)
    199   *        The status code or error message that corresponds to the reason for
    200   *        closing the transport (likely because a stream closed or failed).
    201   */
    202  close(reason) {
    203    this.active = false;
    204    this._input.close();
    205    this._scriptableInput.close();
    206    this._output.close();
    207    this._destroyIncoming();
    208    this._destroyAllOutgoing();
    209    if (this.hooks) {
    210      this.hooks.onTransportClosed(reason);
    211      this.hooks = null;
    212    }
    213    if (reason) {
    214      dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason));
    215    } else {
    216      dumpn("Transport closed.");
    217    }
    218  }
    219 
    220  /**
    221   * The currently outgoing packet (at the top of the queue).
    222   */
    223  get _currentOutgoing() {
    224    return this._outgoing[0];
    225  }
    226 
    227  /**
    228   * Flush data to the outgoing stream.  Waits until the output stream notifies
    229   * us that it is ready to be written to (via onOutputStreamReady).
    230   */
    231  _flushOutgoing() {
    232    if (!this._outgoingEnabled || this._outgoing.length === 0) {
    233      return;
    234    }
    235 
    236    // If the top of the packet queue has nothing more to send, remove it.
    237    if (this._currentOutgoing.done) {
    238      this._finishCurrentOutgoing();
    239    }
    240 
    241    if (this._outgoing.length) {
    242      const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
    243      this._output.asyncWait(this, 0, 0, threadManager.currentThread);
    244    }
    245  }
    246 
    247  /**
    248   * Pause this transport's attempts to write to the output stream.  This is
    249   * used when we've temporarily handed off our output stream for writing bulk
    250   * data.
    251   */
    252  pauseOutgoing() {
    253    this._outgoingEnabled = false;
    254  }
    255 
    256  /**
    257   * Resume this transport's attempts to write to the output stream.
    258   */
    259  resumeOutgoing() {
    260    this._outgoingEnabled = true;
    261    this._flushOutgoing();
    262  }
    263 
    264  // nsIOutputStreamCallback
    265  /**
    266   * This is called when the output stream is ready for more data to be written.
    267   * The current outgoing packet will attempt to write some amount of data, but
    268   * may not complete.
    269   */
    270  onOutputStreamReady = DevToolsUtils.makeInfallible(function (stream) {
    271    if (!this._outgoingEnabled || this._outgoing.length === 0) {
    272      return;
    273    }
    274 
    275    try {
    276      this._currentOutgoing.write(stream);
    277    } catch (e) {
    278      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
    279        this.close(e.result);
    280        return;
    281      }
    282      throw e;
    283    }
    284 
    285    this._flushOutgoing();
    286  }, "DebuggerTransport.prototype.onOutputStreamReady");
    287 
    288  /**
    289   * Remove the current outgoing packet from the queue upon completion.
    290   */
    291  _finishCurrentOutgoing() {
    292    if (this._currentOutgoing) {
    293      this._currentOutgoing.destroy();
    294      this._outgoing.shift();
    295    }
    296  }
    297 
    298  /**
    299   * Clear the entire outgoing queue.
    300   */
    301  _destroyAllOutgoing() {
    302    for (const packet of this._outgoing) {
    303      packet.destroy();
    304    }
    305    this._outgoing = [];
    306  }
    307 
    308  /**
    309   * Initialize the input stream for reading. Once this method has been called,
    310   * we watch for packets on the input stream, and pass them to the appropriate
    311   * handlers via this.hooks.
    312   */
    313  ready() {
    314    this.active = true;
    315    this._waitForIncoming();
    316  }
    317 
    318  /**
    319   * Asks the input stream to notify us (via onInputStreamReady) when it is
    320   * ready for reading.
    321   */
    322  _waitForIncoming() {
    323    if (this._incomingEnabled) {
    324      const threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
    325      this._input.asyncWait(this, 0, 0, threadManager.currentThread);
    326    }
    327  }
    328 
    329  /**
    330   * Pause this transport's attempts to read from the input stream.  This is
    331   * used when we've temporarily handed off our input stream for reading bulk
    332   * data.
    333   */
    334  pauseIncoming() {
    335    this._incomingEnabled = false;
    336  }
    337 
    338  /**
    339   * Resume this transport's attempts to read from the input stream.
    340   */
    341  resumeIncoming() {
    342    this._incomingEnabled = true;
    343    this._flushIncoming();
    344    this._waitForIncoming();
    345  }
    346 
    347  // nsIInputStreamCallback
    348  /**
    349   * Called when the stream is either readable or closed.
    350   */
    351  onInputStreamReady = DevToolsUtils.makeInfallible(function (stream) {
    352    try {
    353      while (
    354        stream.available() &&
    355        this._incomingEnabled &&
    356        this._processIncoming(stream, stream.available())
    357      ) {
    358        // Loop until there is nothing more to process
    359      }
    360      this._waitForIncoming();
    361    } catch (e) {
    362      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
    363        this.close(e.result);
    364      } else {
    365        throw e;
    366      }
    367    }
    368  }, "DebuggerTransport.prototype.onInputStreamReady");
    369 
    370  /**
    371   * Process the incoming data.  Will create a new currently incoming Packet if
    372   * needed.  Tells the incoming Packet to read as much data as it can, but
    373   * reading may not complete.  The Packet signals that its data is ready for
    374   * delivery by calling one of this transport's _on*Ready methods (see
    375   * ./packets.js and the _on*Ready methods below).
    376   *
    377   * @return {boolean}
    378   *         Whether incoming stream processing should continue for any
    379   *         remaining data.
    380   */
    381  _processIncoming(stream, count) {
    382    dumpv("Data available: " + count);
    383 
    384    if (!count) {
    385      dumpv("Nothing to read, skipping");
    386      return false;
    387    }
    388 
    389    try {
    390      if (!this._incoming) {
    391        dumpv("Creating a new packet from incoming");
    392 
    393        if (!this._readHeader(stream)) {
    394          // Not enough data to read packet type
    395          return false;
    396        }
    397 
    398        // Attempt to create a new Packet by trying to parse each possible
    399        // header pattern.
    400        this._incoming = Packet.fromHeader(this._incomingHeader, this);
    401        if (!this._incoming) {
    402          throw new Error(
    403            "No packet types for header: " + this._incomingHeader
    404          );
    405        }
    406      }
    407 
    408      if (!this._incoming.done) {
    409        // We have an incomplete packet, keep reading it.
    410        dumpv("Existing packet incomplete, keep reading");
    411        this._incoming.read(stream, this._scriptableInput);
    412      }
    413    } catch (e) {
    414      const msg =
    415        "Error reading incoming packet: (" + e + " - " + e.stack + ")";
    416      dumpn(msg);
    417 
    418      // Now in an invalid state, shut down the transport.
    419      this.close();
    420      return false;
    421    }
    422 
    423    if (!this._incoming.done) {
    424      // Still not complete, we'll wait for more data.
    425      dumpv("Packet not done, wait for more");
    426      return true;
    427    }
    428 
    429    // Ready for next packet
    430    this._flushIncoming();
    431    return true;
    432  }
    433 
    434  /**
    435   * Read as far as we can into the incoming data, attempting to build up a
    436   * complete packet header (which terminates with ":").  We'll only read up to
    437   * PACKET_HEADER_MAX characters.
    438   *
    439   * @return boolean
    440   *         True if we now have a complete header.
    441   */
    442  _readHeader() {
    443    const amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
    444    this._incomingHeader += StreamUtils.delimitedRead(
    445      this._scriptableInput,
    446      ":",
    447      amountToRead
    448    );
    449    if (flags.wantVerbose) {
    450      dumpv("Header read: " + this._incomingHeader);
    451    }
    452 
    453    if (this._incomingHeader.endsWith(":")) {
    454      if (flags.wantVerbose) {
    455        dumpv("Found packet header successfully: " + this._incomingHeader);
    456      }
    457      return true;
    458    }
    459 
    460    if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
    461      throw new Error("Failed to parse packet header!");
    462    }
    463 
    464    // Not enough data yet.
    465    return false;
    466  }
    467 
    468  /**
    469   * If the incoming packet is done, log it as needed and clear the buffer.
    470   */
    471  _flushIncoming() {
    472    if (!this._incoming.done) {
    473      return;
    474    }
    475    if (flags.wantLogging) {
    476      dumpn("Got: " + this._incoming);
    477    }
    478    this._destroyIncoming();
    479  }
    480 
    481  /**
    482   * Handler triggered by an incoming JSONPacket completing it's |read| method.
    483   * Delivers the packet to this.hooks.onPacket.
    484   */
    485  _onJSONObjectReady(object) {
    486    DevToolsUtils.executeSoon(
    487      DevToolsUtils.makeInfallible(() => {
    488        // Ensure the transport is still alive by the time this runs.
    489        if (this.active) {
    490          this.hooks.onPacket(object);
    491        }
    492      }, "DebuggerTransport instance's this.hooks.onPacket")
    493    );
    494  }
    495 
    496  /**
    497   * Handler triggered by an incoming BulkPacket entering the |read| phase for
    498   * the stream portion of the packet.  Delivers info about the incoming
    499   * streaming data to this.hooks.onBulkPacket.  See the main comment on the
    500   * transport at the top of this file for more details.
    501   */
    502  _onBulkReadReady(...args) {
    503    DevToolsUtils.executeSoon(
    504      DevToolsUtils.makeInfallible(() => {
    505        // Ensure the transport is still alive by the time this runs.
    506        if (this.active) {
    507          this.hooks.onBulkPacket(...args);
    508        }
    509      }, "DebuggerTransport instance's this.hooks.onBulkPacket")
    510    );
    511  }
    512 
    513  /**
    514   * Remove all handlers and references related to the current incoming packet,
    515   * either because it is now complete or because the transport is closing.
    516   */
    517  _destroyIncoming() {
    518    if (this._incoming) {
    519      this._incoming.destroy();
    520    }
    521    this._incomingHeader = "";
    522    this._incoming = null;
    523  }
    524 }
    525 
    526 exports.DebuggerTransport = DebuggerTransport;