tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

stream-utils.js (15846B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 "use strict";
      6 
      7 const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js");
      8 const { dumpv } = DevToolsUtils;
      9 const EventEmitter = require("resource://devtools/shared/event-emitter.js");
     10 
     // |IOUtil| is registered as a lazy getter; the callback fetches the
     // nsIIOUtil service.
     DevToolsUtils.defineLazyGetter(this, "IOUtil", () =>
       Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil)
     );

     // |ScriptableInputStream| is registered as a lazy getter; the callback builds
     // a constructor for nsIScriptableInputStream instances via
     // Components.Constructor, naming "init" as the initializer.
     DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () =>
       Components.Constructor(
         "@mozilla.org/scriptableinputstream;1",
         "nsIScriptableInputStream",
         "init"
       )
     );

     // Buffer size (32 KiB) passed to the buffered output stream wrapper that
     // StreamCopier creates when the given output stream is not already buffered.
     const BUFFER_SIZE = 32 * 1024;
     24 
     /**
      * Copy an exact number of bytes from one async stream to another.
      *
      * Bulk senders and receivers use this helper (through the StreamCopier class)
      * to move data in and out of other streams; functions built on it are handed
      * to callers when it is time to read or write bulk data.  Prefer these copier
      * functions over touching the stream directly, because the copier enforces the
      * agreed upon length.  Bulk mode reuses an existing stream, so sender and
      * receiver must write and read exactly the agreed upon amount of data;
      * otherwise the entire transport is left in an invalid state.  Other ways of
      * copying streams (such as NetUtil.asyncCopy) close the streams involved,
      * which would terminate the debugging transport, so they are avoided here.
      *
      * This *works*, but the ideal solution would use the streams directly.  If
      * nsIInputStream / nsIOutputStream could be fully implemented in JS, wrapper
      * streams could enforce the length and avoid closing, and consumers could use
      * familiar utilities like NetUtil.asyncCopy.
      *
      * Copying is scheduled right away but may finish at some later time depending
      * on the amount of data.
      *
      * @param input nsIAsyncInputStream
      *        The stream to copy from.
      * @param output nsIAsyncOutputStream
      *        The stream to copy to.
      * @param length Integer
      *        The amount of data that needs to be copied.
      * @return StreamCopier
      *         The copier itself, which exposes a promise-style |then|: it is
      *         resolved when copying completes and rejected if an (unexpected)
      *         error occurs.  It also emits "progress" events.
      */
     function copyStream(input, output, length) {
       return new StreamCopier(input, output, length).copy();
     }
     62 
     /**
      * Copies exactly |length| bytes from an async input stream to an async output
      * stream without closing either of them.
      *
      * An instance is thenable: its |then| comes from an internal promise that is
      * resolved once every byte has been copied and the output flushed, and
      * rejected if copying throws.  The instance is also decorated as an
      * EventEmitter and emits a "progress" event ({ bytesSent, totalBytes }) after
      * each successful write.
      */
     class StreamCopier {
       static _nextId = 0;

       /**
        * @param input nsIAsyncInputStream
        *        The stream to copy from.
        * @param output nsIAsyncOutputStream
        *        The stream to copy to.
        * @param length Integer
        *        The exact number of bytes to copy.
        */
       constructor(input, output, length) {
         EventEmitter.decorate(this);
         this._id = StreamCopier._nextId++;
         this.input = input;
         // Save off the base output stream, since we know it's async as we've required
         this.baseAsyncOutput = output;
         if (IOUtil.outputStreamIsBuffered(output)) {
           this.output = output;
         } else {
           // Wrap unbuffered outputs so that writeFrom() can be used on them.
           this.output = Cc[
             "@mozilla.org/network/buffered-output-stream;1"
           ].createInstance(Ci.nsIBufferedOutputStream);
           this.output.init(output, BUFFER_SIZE);
         }
         this._length = length;
         this._amountLeft = length;

         // Hand-rolled deferred: the promise object carries its own resolve and
         // reject functions.
         let resolveDeferred;
         let rejectDeferred;
         this._deferred = new Promise((resolve, reject) => {
           resolveDeferred = resolve;
           rejectDeferred = reject;
         });
         this._deferred.resolve = resolveDeferred;
         this._deferred.reject = rejectDeferred;

         this._copy = this._copy.bind(this);
         this._flush = this._flush.bind(this);
         this._destroy = this._destroy.bind(this);

         // Copy promise's then method up to this object.
         // Allows the copier to offer a promise interface for the simple succeed or
         // fail scenarios, but also emit events (due to the EventEmitter) for other
         // states, like progress.
         this.then = this._deferred.then.bind(this._deferred);
         this.then(this._destroy, this._destroy);

         // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
         // if flushing would block the output stream.
         this._streamReadyCallback = this._copy;
       }

       /**
        * Start copying.
        *
        * @return StreamCopier
        *         This (thenable) copier, so callers can both await completion and
        *         attach "progress" listeners.
        */
       copy() {
         // Dispatch to the next tick so that it's possible to attach a progress
         // event listener, even for extremely fast copies (like when testing).
         Services.tm.dispatchToMainThread(() => {
           try {
             this._copy();
           } catch (e) {
             this._deferred.reject(e);
           }
         });
         return this;
       }

       /**
        * Copy as much as is currently available (capped to what is still owed),
        * then either flush, or wait for whichever stream is not ready yet.
        */
       _copy() {
         const bytesAvailable = this.input.available();
         const amountToCopy = Math.min(bytesAvailable, this._amountLeft);
         this._debug("Trying to copy: " + amountToCopy);

         let bytesCopied;
         try {
           bytesCopied = this.output.writeFrom(this.input, amountToCopy);
         } catch (e) {
           if (e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK) {
             this._debug("Base stream would block, will retry");
             this._debug("Waiting for output stream");
             this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
             return;
           }
           throw e;
         }

         this._amountLeft -= bytesCopied;
         this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
         this._emitProgress();

         if (this._amountLeft === 0) {
           this._debug("Copy done!");
           this._flush();
           return;
         }

         this._debug("Waiting for input stream");
         this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
       }

       /**
        * Notify listeners about how many bytes have been copied so far.
        */
       _emitProgress() {
         this.emit("progress", {
           bytesSent: this._length - this._amountLeft,
           totalBytes: this._length,
         });
       }

       /**
        * Flush the output stream and resolve the copier.  If flushing cannot
        * complete yet, wait for the output stream and try again.
        */
       _flush() {
         try {
           this.output.flush();
         } catch (e) {
           // NS_ERROR_FAILURE is handled here exactly like a would-block result.
           if (
             e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK ||
             e.result === Cr.NS_ERROR_FAILURE
           ) {
             this._debug("Flush would block, will retry");
             this._streamReadyCallback = this._flush;
             this._debug("Waiting for output stream");
             this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
             return;
           }
           throw e;
         }
         this._deferred.resolve();
       }

       /**
        * Drop references once the copier has settled (either way).
        */
       _destroy() {
         this._destroy = null;
         this._copy = null;
         this._flush = null;
         this.input = null;
         this.output = null;
       }

       /**
        * Run the current stream-ready callback and turn anything it throws into a
        * rejection of the copier.  Without this, an error raised while resuming
        * from a stream callback would escape to the stream machinery and the
        * copier would never settle.
        */
       _runStreamReadyCallback() {
         try {
           this._streamReadyCallback();
         } catch (e) {
           this._deferred.reject(e);
         }
       }

       // nsIInputStreamCallback
       onInputStreamReady() {
         this._runStreamReadyCallback();
       }

       // nsIOutputStreamCallback
       onOutputStreamReady() {
         this._runStreamReadyCallback();
       }

       _debug(msg) {
         // Prefix logs with the copier ID, which makes logs much easier to
         // understand when several copiers are running simultaneously
         dumpv("Copier: " + this._id + " " + msg);
       }
     }
    201 
    /**
     * Read from a stream one byte at a time until |delimiter| has been read, at
     * most |count| bytes have been read, or the bytes currently available on the
     * stream are exhausted, whichever comes first.
     * TODO: This implementation could be removed if bug 984651 is fixed, which
     * provides a native version of the same idea.
     *
     * @param stream nsIInputStream
     *        The input stream to read from.  It is wrapped in a scriptable input
     *        stream unless it already is one.
     * @param delimiter string
     *        The character we're trying to find.
     * @param count integer
     *        The max number of characters to read while searching.
     * @return string
     *         The data collected.  If the delimiter was found, this string will
     *         end with it.
     */
    function delimitedRead(stream, delimiter, count) {
      dumpv(`Starting delimited read for ${delimiter} up to ${count} bytes`);

      const reader =
        stream instanceof Ci.nsIScriptableInputStream
          ? stream
          : new ScriptableInputStream(stream);

      // Never try to read more than the stream can offer right now.
      let budget = Math.min(count, stream.available());

      let collected = "";
      let lastRead;
      while (budget > 0 && lastRead !== delimiter) {
        lastRead = reader.readBytes(1);
        budget -= 1;
        collected += lastRead;
      }

      return collected;
    }
    251 
    /**
     * Fill an array buffer with data read from an async input stream.
     * Usage:
     *   // The buffer length defines how many bytes are copied from the stream.
     *   const buffer = new ArrayBuffer(length);
     *   await copyAsyncStreamToArrayBuffer(inputStream, buffer);
     *
     * @param {nsIAsyncStream} asyncInputStream
     *   The stream to read from.
     * @param {ArrayBuffer} buffer
     *   Destination buffer; its byteLength is the number of bytes to copy.
     * @returns {Promise<void>} Resolves once the buffer has been filled.
     */
    async function copyAsyncStreamToArrayBuffer(asyncInputStream, buffer) {
      await new AsyncStreamToArrayBufferCopier(asyncInputStream, buffer).asyncRead();
    }
    266 
    /**
     * Reads exactly |arrayBuffer.byteLength| bytes from an async input stream
     * into the given ArrayBuffer, waiting on the stream between synchronous reads.
     */
    class AsyncStreamToArrayBufferCopier {
      #BUFFER_SIZE = 16 * 1024; // A 16k buffer

      /**
       * The stream we wait on for availability.
       *
       * @type {nsIAsyncInputStream}
       */
      #originalStream;

      /**
       * This is a wrapper on top of #originalStream, to be able to read buffers
       * easily.
       *
       * @type {nsIBinaryInputStream}
       */
      #binaryStream;

      /**
       * This is the output buffer, accessed as a Uint8Array.
       *
       * @type {Uint8Array}
       */
      #outputArray;

      /**
       * How many bytes have been read already. This is also the next index to
       * write in #outputArray.
       *
       * @type {number}
       */
      #pointer = 0;

      /**
       * The count of bytes to be transferred. It is inferred from the byteLength
       * of the output buffer.
       *
       * @type {number}
       */
      #count;

      /**
       * This temporary buffer is used when reading from #binaryStream.
       *
       * @type {ArrayBuffer}
       */
      #tempBuffer;

      /**
       * A byte view over #tempBuffer.
       *
       * @type {Uint8Array}
       */
      #tempBufferAsArray;

      /**
       * @param {nsIAsyncStream} stream
       * @param {ArrayBuffer} arrayBuffer The byteLength of this buffer will be
       *   used to define the length of data to copy from the stream.
       */
      constructor(stream, arrayBuffer) {
        this.#originalStream = stream;
        this.#binaryStream = Cc["@mozilla.org/binaryinputstream;1"].createInstance(
          Ci.nsIBinaryInputStream
        );
        this.#binaryStream.setInputStream(stream);

        this.#outputArray = new Uint8Array(arrayBuffer);
        this.#count = arrayBuffer.byteLength;
        this.#tempBuffer = new ArrayBuffer(this.#BUFFER_SIZE);
        this.#tempBufferAsArray = new Uint8Array(this.#tempBuffer);
      }

      /**
       * Alternate between waiting for the stream and reading what is available
       * until the output buffer is full.  A zero-length buffer resolves without
       * touching the stream.
       *
       * @returns {Promise<void>} Resolves when the reading has finished.
       */
      async asyncRead() {
        // A plain |while| (rather than do/while) so that nothing is awaited when
        // there is nothing to read.
        while (this.#pointer < this.#count) {
          await this.#waitForStreamAvailability();
          this.#syncRead();
        }
        dumpv(`Successfully read ${this.#count} bytes!`);
      }

      /**
       * @returns {Promise<void>} Resolves when the stream is available.
       */
      #waitForStreamAvailability() {
        return new Promise(resolve => {
          this.#originalStream.asyncWait(
            () => resolve(),
            0,
            0,
            Services.tm.currentThread
          );
        });
      }

      /**
       * Read whatever the stream currently has to offer (capped to what is still
       * missing) into the output buffer, one temporary-buffer-sized chunk at a
       * time.
       *
       * @returns {void}
       */
      #syncRead() {
        const amountLeft = this.#count - this.#pointer;
        const count = Math.min(this.#binaryStream.available(), amountLeft);
        if (count <= 0) {
          return;
        }

        dumpv(
          `Will read synchronously ${count} bytes out of ${amountLeft} bytes left.`
        );

        let remaining = count;
        while (remaining > 0) {
          // TODO readArrayBuffer doesn't know how to write to an offset in the buffer,
          // see bug 1962705.
          const willRead = Math.min(remaining, this.#BUFFER_SIZE);
          const hasRead = this.#binaryStream.readArrayBuffer(
            willRead,
            this.#tempBuffer
          );

          if (hasRead < willRead) {
            console.error(
              `[devtools perf front] We were expecting ${willRead} bytes, but received ${hasRead} bytes instead.`
            );
          }
          if (hasRead <= 0) {
            // No progress was made: stop here instead of spinning forever, and
            // let asyncRead wait for the stream again.
            break;
          }
          const toCopyArray = this.#tempBufferAsArray.subarray(0, hasRead);
          this.#outputArray.set(toCopyArray, this.#pointer);
          this.#pointer += hasRead;
          remaining -= hasRead;
        }
        dumpv(
          `${count - remaining} bytes have been successfully read. Total: ${this.#pointer} / ${this.#count}`
        );
      }
    }
    399 
    /**
     * Write the whole content of an array buffer to an async output stream.
     * Usage:
     *   // The buffer length defines how many bytes are copied to the stream.
     *   await copyArrayBufferToAsyncStream(buffer, asyncOutputStream);
     *
     * @param {ArrayBuffer} buffer
     *   Source buffer; its byteLength is the number of bytes to copy.
     * @param {nsIAsyncStream} asyncOutputStream
     *   The stream to write to.
     * @returns {Promise<void>} Resolves once the buffer has been written.
     */
    async function copyArrayBufferToAsyncStream(buffer, asyncOutputStream) {
      await new ArrayBufferToAsyncStreamCopier(buffer, asyncOutputStream).asyncWrite();
    }
    413 
    /**
     * Writes the whole content of an ArrayBuffer to an async output stream,
     * waiting on the stream between synchronous writes.
     */
    class ArrayBufferToAsyncStreamCopier {
      #BUFFER_SIZE = 16 * 1024; // A 16k buffer

      /**
       * The stream we wait on for availability.
       *
       * @type {nsIAsyncOutputStream}
       */
      #originalStream;

      /**
       * This is a wrapper on top of #originalStream, to be able to write buffers
       * easily.
       *
       * @type {nsIBinaryOutputStream}
       */
      #binaryStream;

      /**
       * This is the input buffer, accessed as a Uint8Array.
       *
       * @type {Uint8Array}
       */
      #inputArray;

      /**
       * How many bytes have been written already. This is also the next index to
       * read in #inputArray.
       *
       * @type {number}
       */
      #pointer = 0;

      /**
       * The count of bytes to be transferred. It is inferred from the byteLength
       * of the input buffer.
       *
       * @type {number}
       */
      #count;

      /**
       * @param {ArrayBuffer} arrayBuffer The byteLength of this buffer will be
       *   used to define the length of data to copy to the stream.
       * @param {nsIAsyncStream} stream
       */
      constructor(arrayBuffer, stream) {
        this.#originalStream = stream;
        this.#binaryStream = Cc["@mozilla.org/binaryoutputstream;1"].createInstance(
          Ci.nsIBinaryOutputStream
        );
        this.#binaryStream.setOutputStream(stream);

        this.#inputArray = new Uint8Array(arrayBuffer);
        this.#count = arrayBuffer.byteLength;
      }

      /**
       * Alternate between waiting for the stream and writing as much as it
       * accepts until the whole input buffer has been written.  A zero-length
       * buffer resolves without touching the stream.
       *
       * @returns {Promise<void>} Resolves when the writing has finished.
       */
      async asyncWrite() {
        // A plain |while| (rather than do/while) so that nothing is awaited when
        // there is nothing to write.
        while (this.#pointer < this.#count) {
          await this.#waitForStreamAvailability();
          this.#syncWrite();
        }
        dumpv(`Successfully wrote ${this.#count} bytes!`);
      }

      /**
       * @returns {Promise<void>} Resolves when the stream is available.
       */
      #waitForStreamAvailability() {
        return new Promise(resolve => {
          this.#originalStream.asyncWait(
            () => resolve(),
            0,
            0,
            Services.tm.currentThread
          );
        });
      }

      /**
       * Write the rest of the input buffer chunk by chunk, stopping early (without
       * failing) as soon as the stream reports that it would block.
       *
       * @returns {void}
       */
      #syncWrite() {
        const amountLeft = this.#count - this.#pointer;
        if (amountLeft <= 0) {
          return;
        }

        let written = 0;
        while (written < amountLeft) {
          const willWrite = Math.min(amountLeft - written, this.#BUFFER_SIZE);
          const chunk = this.#inputArray.subarray(
            this.#pointer,
            this.#pointer + willWrite
          );
          try {
            // Bug 1962705: writeByteArray does a copy in
            // https://searchfox.org/mozilla-central/rev/3d294b119bf2add880f615a0fc61a5d54bcd6264/js/xpconnect/src/XPCConvert.cpp#1440
            // modify BinaryOutputStream so that it can read directly from the buffer.
            this.#binaryStream.writeByteArray(chunk);
          } catch (e) {
            if (e.result === Cr.NS_BASE_STREAM_WOULD_BLOCK) {
              dumpv(
                `Base stream would block, will retry. ${written} bytes have been successfully written. Total: ${this.#pointer} / ${this.#count}`
              );
              return;
            }
            throw e;
          }

          this.#pointer += willWrite;
          written += willWrite;
        }
        dumpv(
          `${written} bytes have been successfully written. Total: ${this.#pointer} / ${this.#count}`
        );
      }
    }
    532 
    533 module.exports = {
    534  copyStream,
    535  delimitedRead,
    536  copyAsyncStreamToArrayBuffer,
    537  copyArrayBufferToAsyncStream,
    538 };