tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

stream-utils.sys.mjs (7749B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
      6 
      7 const lazy = {};
      8 
      9 ChromeUtils.defineESModuleGetters(lazy, {
     10  EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs",
     11 });
     12 
     13 XPCOMUtils.defineLazyServiceGetter(
     14  lazy,
     15  "IOUtil",
     16  "@mozilla.org/io-util;1",
     17  Ci.nsIIOUtil
     18 );
     19 
     20 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => {
     21  return Components.Constructor(
     22    "@mozilla.org/scriptableinputstream;1",
     23    "nsIScriptableInputStream",
     24    "init"
     25  );
     26 });
     27 
     28 const BUFFER_SIZE = 0x8000;
     29 
     30 /**
     31 * This helper function (and its companion object) are used by bulk
     32 * senders and receivers to read and write data in and out of other streams.
     33 * Functions that make use of this tool are passed to callers when it is
     34 * time to read or write bulk data.  It is highly recommended to use these
     35 * copier functions instead of the stream directly because the copier
     36 * enforces the agreed upon length. Since bulk mode reuses an existing
     37 * stream, the sender and receiver must write and read exactly the agreed
     38 * upon amount of data, or else the entire transport will be left in a
     39 * invalid state.  Additionally, other methods of stream copying (such as
     40 * NetUtil.asyncCopy) close the streams involved, which would terminate
     41 * the debugging transport, and so it is avoided here.
     42 *
     43 * Overall, this *works*, but clearly the optimal solution would be
     44 * able to just use the streams directly.  If it were possible to fully
     45 * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could
     46 * be created to enforce the length and avoid closing, and consumers could
     47 * use familiar stream utilities like NetUtil.asyncCopy.
     48 *
     49 * The function takes two async streams and copies a precise number
     50 * of bytes from one to the other.  Copying begins immediately, but may
     51 * complete at some future time depending on data size.  Use the returned
     52 * promise to know when it's complete.
     53 *
     54 * @param {nsIAsyncInputStream} input
     55 *     Stream to copy from.
     56 * @param {nsIAsyncOutputStream} output
     57 *        Stream to copy to.
     58 * @param {number} length
     59 *        Amount of data that needs to be copied.
     60 *
     61 * @returns {Promise}
     62 *     Promise is resolved when copying completes or rejected if any
     63 *     (unexpected) errors occur.
     64 */
     65 function copyStream(input, output, length) {
     66  let copier = new StreamCopier(input, output, length);
     67  return copier.copy();
     68 }
     69 
     70 /** @class */
     71 function StreamCopier(input, output, length) {
     72  lazy.EventEmitter.decorate(this);
     73  this._id = StreamCopier._nextId++;
     74  this.input = input;
     75  // Save off the base output stream, since we know it's async as we've
     76  // required
     77  this.baseAsyncOutput = output;
     78  if (lazy.IOUtil.outputStreamIsBuffered(output)) {
     79    this.output = output;
     80  } else {
     81    this.output = Cc[
     82      "@mozilla.org/network/buffered-output-stream;1"
     83    ].createInstance(Ci.nsIBufferedOutputStream);
     84    this.output.init(output, BUFFER_SIZE);
     85  }
     86  this._length = length;
     87  this._amountLeft = length;
     88  this._deferred = {
     89    promise: new Promise((resolve, reject) => {
     90      this._deferred.resolve = resolve;
     91      this._deferred.reject = reject;
     92    }),
     93  };
     94 
     95  this._copy = this._copy.bind(this);
     96  this._flush = this._flush.bind(this);
     97  this._destroy = this._destroy.bind(this);
     98 
     99  // Copy promise's then method up to this object.
    100  //
    101  // Allows the copier to offer a promise interface for the simple succeed
    102  // or fail scenarios, but also emit events (due to the EventEmitter)
    103  // for other states, like progress.
    104  this.then = this._deferred.promise.then.bind(this._deferred.promise);
    105  this.then(this._destroy, this._destroy);
    106 
    107  // Stream ready callback starts as |_copy|, but may switch to |_flush|
    108  // at end if flushing would block the output stream.
    109  this._streamReadyCallback = this._copy;
    110 }
    111 StreamCopier._nextId = 0;
    112 
    113 StreamCopier.prototype = {
    114  copy() {
    115    // Dispatch to the next tick so that it's possible to attach a progress
    116    // event listener, even for extremely fast copies (like when testing).
    117    Services.tm.currentThread.dispatch(() => {
    118      try {
    119        this._copy();
    120      } catch (e) {
    121        this._deferred.reject(e);
    122      }
    123    }, 0);
    124    return this;
    125  },
    126 
    127  _copy() {
    128    let bytesAvailable = this.input.available();
    129    let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
    130    this._debug("Trying to copy: " + amountToCopy);
    131 
    132    let bytesCopied;
    133    try {
    134      bytesCopied = this.output.writeFrom(this.input, amountToCopy);
    135    } catch (e) {
    136      if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
    137        this._debug("Base stream would block, will retry");
    138        this._debug("Waiting for output stream");
    139        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
    140        return;
    141      }
    142      throw e;
    143    }
    144 
    145    this._amountLeft -= bytesCopied;
    146    this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft);
    147    this._emitProgress();
    148 
    149    if (this._amountLeft === 0) {
    150      this._debug("Copy done!");
    151      this._flush();
    152      return;
    153    }
    154 
    155    this._debug("Waiting for input stream");
    156    this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
    157  },
    158 
    159  _emitProgress() {
    160    this.emit("progress", {
    161      bytesSent: this._length - this._amountLeft,
    162      totalBytes: this._length,
    163    });
    164  },
    165 
    166  _flush() {
    167    try {
    168      this.output.flush();
    169    } catch (e) {
    170      if (
    171        e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
    172        e.result == Cr.NS_ERROR_FAILURE
    173      ) {
    174        this._debug("Flush would block, will retry");
    175        this._streamReadyCallback = this._flush;
    176        this._debug("Waiting for output stream");
    177        this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
    178        return;
    179      }
    180      throw e;
    181    }
    182    this._deferred.resolve();
    183  },
    184 
    185  _destroy() {
    186    this._destroy = null;
    187    this._copy = null;
    188    this._flush = null;
    189    this.input = null;
    190    this.output = null;
    191  },
    192 
    193  // nsIInputStreamCallback
    194  onInputStreamReady() {
    195    this._streamReadyCallback();
    196  },
    197 
    198  // nsIOutputStreamCallback
    199  onOutputStreamReady() {
    200    this._streamReadyCallback();
    201  },
    202 
    203  _debug() {},
    204 };
    205 
    206 /**
    207 * Read from a stream, one byte at a time, up to the next
    208 * <var>delimiter</var> character, but stopping if we've read |count|
    209 * without finding it.  Reading also terminates early if there are less
    210 * than <var>count</var> bytes available on the stream.  In that case,
    211 * we only read as many bytes as the stream currently has to offer.
    212 *
    213 * @param {nsIInputStream} stream
    214 *     Input stream to read from.
    215 * @param {string} delimiter
    216 *     Character we're trying to find.
    217 * @param {number} count
    218 *     Max number of characters to read while searching.
    219 *
    220 * @returns {string}
    221 *     Collected data.  If the delimiter was found, this string will
    222 *     end with it.
    223 */
    224 // TODO: This implementation could be removed if bug 984651 is fixed,
    225 // which provides a native version of the same idea.
    226 function delimitedRead(stream, delimiter, count) {
    227  let scriptableStream;
    228  if (stream instanceof Ci.nsIScriptableInputStream) {
    229    scriptableStream = stream;
    230  } else {
    231    scriptableStream = new lazy.ScriptableInputStream(stream);
    232  }
    233 
    234  let data = "";
    235 
    236  // Don't exceed what's available on the stream
    237  count = Math.min(count, stream.available());
    238 
    239  if (count <= 0) {
    240    return data;
    241  }
    242 
    243  let char;
    244  while (char !== delimiter && count > 0) {
    245    char = scriptableStream.readBytes(1);
    246    count--;
    247    data += char;
    248  }
    249 
    250  return data;
    251 }
    252 
    253 export const StreamUtils = {
    254  copyStream,
    255  delimitedRead,
    256 };