tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

transport.sys.mjs (16600B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 const lazy = {};
      6 
      7 ChromeUtils.defineESModuleGetters(lazy, {
      8  EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
      9 
     10  BulkPacket: "chrome://remote/content/marionette/packets.sys.mjs",
     11  executeSoon: "chrome://remote/content/shared/Sync.sys.mjs",
     12  JSONPacket: "chrome://remote/content/marionette/packets.sys.mjs",
     13  Packet: "chrome://remote/content/marionette/packets.sys.mjs",
     14  StreamUtils: "chrome://remote/content/marionette/stream-utils.sys.mjs",
     15 });
     16 
     17 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
     18  return Components.Constructor(
     19    "@mozilla.org/scriptableinputstream;1",
     20    "nsIScriptableInputStream",
     21    "init"
     22  );
     23 });
     24 
     25 const flags = { wantVerbose: false, wantLogging: false };
     26 
     27 const dumpv = flags.wantVerbose
     28  ? function (msg) {
     29      dump(msg + "\n");
     30    }
     31  : function () {};
     32 
     33 const PACKET_HEADER_MAX = 200;
     34 
     35 /**
     36 * An adapter that handles data transfers between the debugger client
     37 * and server. It can work with both nsIPipe and nsIServerSocket
     38 * transports so long as the properly created input and output streams
     39 * are specified.  (However, for intra-process connections,
     40 * LocalDebuggerTransport, below, is more efficient than using an nsIPipe
     41 * pair with DebuggerTransport.)
     42 *
     43 * @param {nsIAsyncInputStream} input
     44 *     The input stream.
     45 * @param {nsIAsyncOutputStream} output
     46 *     The output stream.
     47 *
     48 * Given a DebuggerTransport instance dt:
     49 * 1) Set dt.hooks to a packet handler object (described below).
     50 * 2) Call dt.ready() to begin watching for input packets.
     51 * 3) Call dt.send() / dt.startBulkSend() to send packets.
     52 * 4) Call dt.close() to close the connection, and disengage from
     53 *    the event loop.
     54 *
     55 * A packet handler is an object with the following methods:
     56 *
     57 * - onPacket(packet) - called when we have received a complete packet.
     58 *   |packet| is the parsed form of the packet --- a JavaScript value, not
     59 *   a JSON-syntax string.
     60 *
     61 * - onBulkPacket(packet) - called when we have switched to bulk packet
     62 *   receiving mode. |packet| is an object containing:
     63 *   actor:  Name of actor that will receive the packet
     64 *   type:   Name of actor's method that should be called on receipt
     65 *   length: Size of the data to be read
     66 *   stream: This input stream should only be used directly if you
     67 *             can ensure that you will read exactly |length| bytes and
     68 *             will not close the stream when reading is complete
     69 *   done:   If you use the stream directly (instead of |copyTo|
     70 *             below), you must signal completion by resolving/rejecting
     71 *             this deferred.  If it's rejected, the transport will
     72 *             be closed.  If an Error is supplied as a rejection value,
     73 *             it will be logged via |dump|.  If you do use |copyTo|,
     74 *             resolving is taken care of for you when copying completes.
     75 *   copyTo: A helper function for getting your data out of the
     76 *             stream that meets the stream handling requirements above,
     77 *             and has the following signature:
     78 *
     79 *             - params
     80 *                {nsIAsyncOutputStream} output
     81 *                  The stream to copy to.
     82 *             - returns {Promise}
     83 *                  The promise is resolved when copying completes or
     84 *                  rejected if any (unexpected) errors occur.  This object
     85 *                  also emits "progress" events for each chunk that is
     86 *                  copied.  See stream-utils.js.
     87 *
     88 * - onClosed(reason) - called when the connection is closed. |reason|
     89 *   is an optional nsresult or object, typically passed when the
     90 *   transport is closed due to some error in a underlying stream.
     91 *
     92 * See ./packets.js and the Remote Debugging Protocol specification for
     93 * more details on the format of these packets.
     94 *
     95 * @class
     96 */
     97 export function DebuggerTransport(input, output) {
     98  lazy.EventEmitter.decorate(this);
     99 
    100  this._input = input;
    101  this._scriptableInput = new lazy.ScriptableInputStream(input);
    102  this._output = output;
    103 
    104  // The current incoming (possibly partial) header, which will determine
    105  // which type of Packet |_incoming| below will become.
    106  this._incomingHeader = "";
    107  // The current incoming Packet object
    108  this._incoming = null;
    109  // A queue of outgoing Packet objects
    110  this._outgoing = [];
    111 
    112  this.hooks = null;
    113  this.active = false;
    114 
    115  this._incomingEnabled = true;
    116  this._outgoingEnabled = true;
    117 
    118  this.close = this.close.bind(this);
    119 }
    120 
    121 DebuggerTransport.prototype = {
    122  /**
    123   * Transmit an object as a JSON packet.
    124   *
    125   * This method returns immediately, without waiting for the entire
    126   * packet to be transmitted, registering event handlers as needed to
    127   * transmit the entire packet. Packets are transmitted in the order they
    128   * are passed to this method.
    129   */
    130  send(object) {
    131    this.emit("send", object);
    132 
    133    let packet = new lazy.JSONPacket(this);
    134    packet.object = object;
    135    this._outgoing.push(packet);
    136    this._flushOutgoing();
    137  },
    138 
    139  /**
    140   * Transmit streaming data via a bulk packet.
    141   *
    142   * This method initiates the bulk send process by queuing up the header
    143   * data.  The caller receives eventual access to a stream for writing.
    144   *
    145   * N.B.: Do *not* attempt to close the stream handed to you, as it
    146   * will continue to be used by this transport afterwards.  Most users
    147   * should instead use the provided |copyFrom| function instead.
    148   *
    149   * @param {object} header
    150   *     This is modeled after the format of JSON packets above, but does
    151   *     not actually contain the data, but is instead just a routing
    152   *     header:
    153   *
    154   *       - actor:  Name of actor that will receive the packet
    155   *       - type:   Name of actor's method that should be called on receipt
    156   *       - length: Size of the data to be sent
    157   *
    158   * @returns {Promise}
    159   *     The promise will be resolved when you are allowed to write to
    160   *     the stream with an object containing:
    161   *
    162   *       - stream:   This output stream should only be used directly
    163   *                   if you can ensure that you will write exactly
    164   *                   |length| bytes and will not close the stream when
    165   *                    writing is complete.
    166   *       - done:     If you use the stream directly (instead of
    167   *                   |copyFrom| below), you must signal completion by
    168   *                   resolving/rejecting this deferred.  If it's
    169   *                   rejected, the transport will be closed.  If an
    170   *                   Error is supplied as a rejection value, it will
    171   *                   be logged via |dump|.  If you do use |copyFrom|,
    172   *                   resolving is taken care of for you when copying
    173   *                   completes.
    174   *       - copyFrom: A helper function for getting your data onto the
    175   *                   stream that meets the stream handling requirements
    176   *                   above, and has the following signature:
    177   *
    178   *                   - params
    179   *                     {nsIAsyncInputStream} input
    180   *                       The stream to copy from.
    181   *                   - returns {Promise}
    182   *                       The promise is resolved when copying completes
    183   *                       or rejected if any (unexpected) errors occur.
    184   *                       This object also emits "progress" events for
    185   *                       each chunkthat is copied.  See stream-utils.js.
    186   */
    187  startBulkSend(header) {
    188    this.emit("startbulksend", header);
    189 
    190    let packet = new lazy.BulkPacket(this);
    191    packet.header = header;
    192    this._outgoing.push(packet);
    193    this._flushOutgoing();
    194    return packet.streamReadyForWriting;
    195  },
    196 
    197  /**
    198   * Close the transport.
    199   *
    200   * @param {(nsresult|object)=} reason
    201   *     The status code or error message that corresponds to the reason
    202   *     for closing the transport (likely because a stream closed
    203   *     or failed).
    204   */
    205  close(reason) {
    206    this.emit("close", reason);
    207 
    208    this.active = false;
    209    this._input.close();
    210    this._scriptableInput.close();
    211    this._output.close();
    212    this._destroyIncoming();
    213    this._destroyAllOutgoing();
    214    if (this.hooks) {
    215      this.hooks.onClosed(reason);
    216      this.hooks = null;
    217    }
    218    if (reason) {
    219      dumpv("Transport closed: " + reason);
    220    } else {
    221      dumpv("Transport closed.");
    222    }
    223  },
    224 
    225  /**
    226   * The currently outgoing packet (at the top of the queue).
    227   */
    228  get _currentOutgoing() {
    229    return this._outgoing[0];
    230  },
    231 
    232  /**
    233   * Flush data to the outgoing stream.  Waits until the output
    234   * stream notifies us that it is ready to be written to (via
    235   * onOutputStreamReady).
    236   */
    237  _flushOutgoing() {
    238    if (!this._outgoingEnabled || this._outgoing.length === 0) {
    239      return;
    240    }
    241 
    242    // If the top of the packet queue has nothing more to send, remove it.
    243    if (this._currentOutgoing.done) {
    244      this._finishCurrentOutgoing();
    245    }
    246 
    247    if (this._outgoing.length) {
    248      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
    249      this._output.asyncWait(this, 0, 0, threadManager.currentThread);
    250    }
    251  },
    252 
    253  /**
    254   * Pause this transport's attempts to write to the output stream.
    255   * This is used when we've temporarily handed off our output stream for
    256   * writing bulk data.
    257   */
    258  pauseOutgoing() {
    259    this._outgoingEnabled = false;
    260  },
    261 
    262  /**
    263   * Resume this transport's attempts to write to the output stream.
    264   */
    265  resumeOutgoing() {
    266    this._outgoingEnabled = true;
    267    this._flushOutgoing();
    268  },
    269 
    270  // nsIOutputStreamCallback
    271  /**
    272   * This is called when the output stream is ready for more data to
    273   * be written.  The current outgoing packet will attempt to write some
    274   * amount of data, but may not complete.
    275   */
    276  onOutputStreamReady(stream) {
    277    if (!this._outgoingEnabled || this._outgoing.length === 0) {
    278      return;
    279    }
    280 
    281    try {
    282      this._currentOutgoing.write(stream);
    283    } catch (e) {
    284      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
    285        this.close(e.result);
    286        return;
    287      }
    288      throw e;
    289    }
    290 
    291    this._flushOutgoing();
    292  },
    293 
    294  /**
    295   * Remove the current outgoing packet from the queue upon completion.
    296   */
    297  _finishCurrentOutgoing() {
    298    if (this._currentOutgoing) {
    299      this._currentOutgoing.destroy();
    300      this._outgoing.shift();
    301    }
    302  },
    303 
    304  /**
    305   * Clear the entire outgoing queue.
    306   */
    307  _destroyAllOutgoing() {
    308    for (let packet of this._outgoing) {
    309      packet.destroy();
    310    }
    311    this._outgoing = [];
    312  },
    313 
    314  /**
    315   * Initialize the input stream for reading. Once this method has been
    316   * called, we watch for packets on the input stream, and pass them to
    317   * the appropriate handlers via this.hooks.
    318   */
    319  ready() {
    320    this.active = true;
    321    this._waitForIncoming();
    322  },
    323 
    324  /**
    325   * Asks the input stream to notify us (via onInputStreamReady) when it is
    326   * ready for reading.
    327   */
    328  _waitForIncoming() {
    329    if (this._incomingEnabled) {
    330      let threadManager = Cc["@mozilla.org/thread-manager;1"].getService();
    331      this._input.asyncWait(this, 0, 0, threadManager.currentThread);
    332    }
    333  },
    334 
    335  /**
    336   * Pause this transport's attempts to read from the input stream.
    337   * This is used when we've temporarily handed off our input stream for
    338   * reading bulk data.
    339   */
    340  pauseIncoming() {
    341    this._incomingEnabled = false;
    342  },
    343 
    344  /**
    345   * Resume this transport's attempts to read from the input stream.
    346   */
    347  resumeIncoming() {
    348    this._incomingEnabled = true;
    349    this._flushIncoming();
    350    this._waitForIncoming();
    351  },
    352 
    353  // nsIInputStreamCallback
    354  /**
    355   * Called when the stream is either readable or closed.
    356   */
    357  onInputStreamReady(stream) {
    358    try {
    359      while (
    360        stream.available() &&
    361        this._incomingEnabled &&
    362        this._processIncoming(stream, stream.available())
    363      ) {
    364        // Loop until there is nothing more to process
    365      }
    366      this._waitForIncoming();
    367    } catch (e) {
    368      if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) {
    369        this.close(e.result);
    370      } else {
    371        throw e;
    372      }
    373    }
    374  },
    375 
    376  /**
    377   * Process the incoming data.  Will create a new currently incoming
    378   * Packet if needed.  Tells the incoming Packet to read as much data
    379   * as it can, but reading may not complete.  The Packet signals that
    380   * its data is ready for delivery by calling one of this transport's
    381   * _on*Ready methods (see ./packets.js and the _on*Ready methods below).
    382   *
    383   * @returns {boolean}
    384   *     Whether incoming stream processing should continue for any
    385   *     remaining data.
    386   */
    387  _processIncoming(stream, count) {
    388    dumpv("Data available: " + count);
    389 
    390    if (!count) {
    391      dumpv("Nothing to read, skipping");
    392      return false;
    393    }
    394 
    395    try {
    396      if (!this._incoming) {
    397        dumpv("Creating a new packet from incoming");
    398 
    399        if (!this._readHeader(stream)) {
    400          // Not enough data to read packet type
    401          return false;
    402        }
    403 
    404        // Attempt to create a new Packet by trying to parse each possible
    405        // header pattern.
    406        this._incoming = lazy.Packet.fromHeader(this._incomingHeader, this);
    407        if (!this._incoming) {
    408          throw new Error(
    409            "No packet types for header: " + this._incomingHeader
    410          );
    411        }
    412      }
    413 
    414      if (!this._incoming.done) {
    415        // We have an incomplete packet, keep reading it.
    416        dumpv("Existing packet incomplete, keep reading");
    417        this._incoming.read(stream, this._scriptableInput);
    418      }
    419    } catch (e) {
    420      dump(`Error reading incoming packet: (${e} - ${e.stack})\n`);
    421 
    422      // Now in an invalid state, shut down the transport.
    423      this.close();
    424      return false;
    425    }
    426 
    427    if (!this._incoming.done) {
    428      // Still not complete, we'll wait for more data.
    429      dumpv("Packet not done, wait for more");
    430      return true;
    431    }
    432 
    433    // Ready for next packet
    434    this._flushIncoming();
    435    return true;
    436  },
    437 
    438  /**
    439   * Read as far as we can into the incoming data, attempting to build
    440   * up a complete packet header (which terminates with ":").  We'll only
    441   * read up to PACKET_HEADER_MAX characters.
    442   *
    443   * @returns {boolean}
    444   *     True if we now have a complete header.
    445   */
    446  _readHeader() {
    447    let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length;
    448    this._incomingHeader += lazy.StreamUtils.delimitedRead(
    449      this._scriptableInput,
    450      ":",
    451      amountToRead
    452    );
    453    if (flags.wantVerbose) {
    454      dumpv("Header read: " + this._incomingHeader);
    455    }
    456 
    457    if (this._incomingHeader.endsWith(":")) {
    458      if (flags.wantVerbose) {
    459        dumpv("Found packet header successfully: " + this._incomingHeader);
    460      }
    461      return true;
    462    }
    463 
    464    if (this._incomingHeader.length >= PACKET_HEADER_MAX) {
    465      throw new Error("Failed to parse packet header!");
    466    }
    467 
    468    // Not enough data yet.
    469    return false;
    470  },
    471 
    472  /**
    473   * If the incoming packet is done, log it as needed and clear the buffer.
    474   */
    475  _flushIncoming() {
    476    if (!this._incoming.done) {
    477      return;
    478    }
    479    if (flags.wantLogging) {
    480      dumpv("Got: " + this._incoming);
    481    }
    482    this._destroyIncoming();
    483  },
    484 
    485  /**
    486   * Handler triggered by an incoming JSONPacket completing it's |read|
    487   * method.  Delivers the packet to this.hooks.onPacket.
    488   */
    489  _onJSONObjectReady(object) {
    490    lazy.executeSoon(() => {
    491      // Ensure the transport is still alive by the time this runs.
    492      if (this.active) {
    493        this.emit("packet", object);
    494        this.hooks.onPacket(object);
    495      }
    496    });
    497  },
    498 
    499  /**
    500   * Handler triggered by an incoming BulkPacket entering the |read|
    501   * phase for the stream portion of the packet.  Delivers info about the
    502   * incoming streaming data to this.hooks.onBulkPacket.  See the main
    503   * comment on the transport at the top of this file for more details.
    504   */
    505  _onBulkReadReady(...args) {
    506    lazy.executeSoon(() => {
    507      // Ensure the transport is still alive by the time this runs.
    508      if (this.active) {
    509        this.emit("bulkpacket", ...args);
    510        this.hooks.onBulkPacket(...args);
    511      }
    512    });
    513  },
    514 
    515  /**
    516   * Remove all handlers and references related to the current incoming
    517   * packet, either because it is now complete or because the transport
    518   * is closing.
    519   */
    520  _destroyIncoming() {
    521    if (this._incoming) {
    522      this._incoming.destroy();
    523    }
    524    this._incomingHeader = "";
    525    this._incoming = null;
    526  },
    527 };