tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

NetworkThrottleManager.sys.mjs (14541B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 const ArrayBufferInputStream = Components.Constructor(
      6  "@mozilla.org/io/arraybuffer-input-stream;1",
      7  "nsIArrayBufferInputStream"
      8 );
      9 const BinaryInputStream = Components.Constructor(
     10  "@mozilla.org/binaryinputstream;1",
     11  "nsIBinaryInputStream",
     12  "setInputStream"
     13 );
     14 
     15 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
     16 
     17 const lazy = {};
     18 
     19 XPCOMUtils.defineLazyServiceGetter(
     20  lazy,
     21  "gActivityDistributor",
     22  "@mozilla.org/network/http-activity-distributor;1",
     23  Ci.nsIHttpActivityDistributor
     24 );
     25 
     26 ChromeUtils.defineESModuleGetters(lazy, {
     27  setTimeout: "resource://gre/modules/Timer.sys.mjs",
     28 });
     29 
     30 class NetworkThrottleListener {
     31  #activities;
     32  #offset;
     33  #originalListener;
     34  #pendingData;
     35  #pendingException;
     36  #queue;
     37  #responseStarted;
     38  #shouldStopThrottling;
     39 
     40  /**
     41   * Construct a new nsIStreamListener that buffers data and provides a
     42   * method to notify another listener when data is available.  This is
     43   * used to throttle network data on a per-channel basis.
     44   *
     45   * After construction, @see setOriginalListener must be called on the
     46   * new object.
     47   *
     48   * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
     49   *        which status changes should be reported
     50   */
     51  constructor(queue) {
     52    this.#activities = {};
     53    this.#offset = 0;
     54    this.#pendingData = [];
     55    this.#pendingException = null;
     56    this.#queue = queue;
     57    this.#responseStarted = false;
     58    this.#shouldStopThrottling = false;
     59  }
     60 
     61  stopThrottling() {
     62    // When the shouldStopThrottling flag is flipped the next call to
     63    // sendSomeData will bypass throttling and send all data immediately.
     64    this.#shouldStopThrottling = true;
     65  }
     66 
     67  /**
     68   * Set the original listener for this object.  The original listener
     69   * will receive requests from this object when the queue allows data
     70   * through.
     71   *
     72   * @param {nsIStreamListener} originalListener the original listener
     73   *        for the channel, to which all requests will be sent
     74   */
     75  setOriginalListener(originalListener) {
     76    this.#originalListener = originalListener;
     77  }
     78 
     79  /**
     80   * @see nsIStreamListener.onStartRequest.
     81   */
     82  onStartRequest(request) {
     83    this.#originalListener.onStartRequest(request);
     84    this.#queue.start(this);
     85  }
     86 
     87  /**
     88   * @see nsIStreamListener.onStopRequest.
     89   */
     90  onStopRequest(request, statusCode) {
     91    this.#pendingData.push({ request, statusCode });
     92    this.#queue.dataAvailable(this);
     93  }
     94 
     95  /**
     96   * @see nsIStreamListener.onDataAvailable.
     97   */
     98  onDataAvailable(request, inputStream, offset, count) {
     99    if (this.#pendingException) {
    100      throw this.#pendingException;
    101    }
    102 
    103    const bin = new BinaryInputStream(inputStream);
    104    const bytes = new ArrayBuffer(count);
    105    bin.readArrayBuffer(count, bytes);
    106 
    107    const stream = new ArrayBufferInputStream();
    108    stream.setData(bytes, 0, count);
    109 
    110    this.#pendingData.push({ request, stream, count });
    111    this.#queue.dataAvailable(this);
    112  }
    113 
    114  /**
    115   * Allow some buffered data from this object to be forwarded to this
    116   * object's originalListener.
    117   *
    118   * @param {number} bytesPermitted The maximum number of bytes
    119   *        permitted to be sent.
    120   * @return {object} an object of the form {length, done}, where
    121   *         |length| is the number of bytes actually forwarded, and
    122   *         |done| is a boolean indicating whether this particular
    123   *         request has been completed.  (A NetworkThrottleListener
    124   *         may be queued multiple times, so this does not mean that
    125   *         all available data has been sent.)
    126   */
    127  sendSomeData(bytesPermitted) {
    128    if (this.#pendingData.length === 0) {
    129      // Shouldn't happen.
    130      return { length: 0, done: true };
    131    }
    132 
    133    const { request, stream, count, statusCode } = this.#pendingData[0];
    134 
    135    if (statusCode !== undefined) {
    136      this.#pendingData.shift();
    137      this.#originalListener.onStopRequest(request, statusCode);
    138      return { length: 0, done: true };
    139    }
    140 
    141    if (bytesPermitted > count || this.#shouldStopThrottling) {
    142      bytesPermitted = count;
    143    }
    144 
    145    try {
    146      this.#originalListener.onDataAvailable(
    147        request,
    148        stream,
    149        this.#offset,
    150        bytesPermitted
    151      );
    152    } catch (e) {
    153      this.#pendingException = e;
    154    }
    155 
    156    let done = false;
    157    if (bytesPermitted === count) {
    158      this.#pendingData.shift();
    159      done = true;
    160    } else {
    161      this.#pendingData[0].count -= bytesPermitted;
    162    }
    163 
    164    this.#offset += bytesPermitted;
    165    // Maybe our state has changed enough to emit an event.
    166    this.#maybeEmitEvents();
    167 
    168    return { length: bytesPermitted, done };
    169  }
    170 
    171  /**
    172   * Return the number of pending data requests available for this
    173   * listener.
    174   */
    175  pendingCount() {
    176    return this.#pendingData.length;
    177  }
    178 
    179  /**
    180   * This is called when an http activity event is delivered.  This
    181   * object delays the event until the appropriate moment.
    182   */
    183  addActivityCallback(
    184    callback,
    185    httpActivity,
    186    channel,
    187    activityType,
    188    activitySubtype,
    189    timestamp,
    190    extraSizeData,
    191    extraStringData
    192  ) {
    193    const datum = {
    194      callback,
    195      httpActivity,
    196      channel,
    197      activityType,
    198      activitySubtype,
    199      extraSizeData,
    200      extraStringData,
    201    };
    202    this.#activities[activitySubtype] = datum;
    203 
    204    if (
    205      activitySubtype ===
    206      lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
    207    ) {
    208      this.totalSize = extraSizeData;
    209    }
    210 
    211    this.#maybeEmitEvents();
    212  }
    213 
    214  /**
    215   * This is called for a download throttler when the latency timeout
    216   * has ended.
    217   */
    218  responseStart() {
    219    this.#responseStarted = true;
    220    this.#maybeEmitEvents();
    221  }
    222 
    223  /**
    224   * Check our internal state and emit any http activity events as
    225   * needed.  Note that we wait until both our internal state has
    226   * changed and we've received the real http activity event from
    227   * platform.  This approach ensures we can both pass on the correct
    228   * data from the original event, and update the reported time to be
    229   * consistent with the delay we're introducing.
    230   */
    231  #maybeEmitEvents() {
    232    if (this.#responseStarted) {
    233      this.#maybeEmit(
    234        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START
    235      );
    236      this.#maybeEmit(
    237        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER
    238      );
    239    }
    240 
    241    if (this.totalSize !== undefined && this.#offset >= this.totalSize) {
    242      this.#maybeEmit(
    243        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
    244      );
    245      this.#maybeEmit(
    246        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE
    247      );
    248    }
    249  }
    250 
    251  /**
    252   * Emit an event for |code|, if the appropriate entry in
    253   * |activities| is defined.
    254   */
    255  #maybeEmit(code) {
    256    if (this.#activities[code] !== undefined) {
    257      const {
    258        callback,
    259        httpActivity,
    260        channel,
    261        activityType,
    262        activitySubtype,
    263        extraSizeData,
    264        extraStringData,
    265      } = this.#activities[code];
    266      const now = Date.now() * 1000;
    267      callback(
    268        httpActivity,
    269        channel,
    270        activityType,
    271        activitySubtype,
    272        now,
    273        extraSizeData,
    274        extraStringData
    275      );
    276      this.#activities[code] = undefined;
    277    }
    278  }
    279 
    280  QueryInterface = ChromeUtils.generateQI([
    281    "nsIStreamListener",
    282    "nsIInterfaceRequestor",
    283  ]);
    284 }
    285 
    286 class NetworkThrottleQueue {
    287  #latencyMax;
    288  #latencyMean;
    289  #maxBPS;
    290  #meanBPS;
    291  #pendingRequests;
    292  #previousReads;
    293  #pumping;
    294  #throttleListeners;
    295 
    296  /**
    297   * Construct a new queue that can be used to throttle the network for
    298   * a group of related network requests.
    299   *
    300   * meanBPS {Number} Mean bytes per second.
    301   * maxBPS {Number} Maximum bytes per second.
    302   * latencyMean {Number} Mean latency in milliseconds.
    303   * latencyMax {Number} Maximum latency in milliseconds.
    304   */
    305  constructor(meanBPS, maxBPS, latencyMean, latencyMax) {
    306    this.#meanBPS = meanBPS;
    307    this.#maxBPS = maxBPS;
    308    this.#latencyMean = latencyMean;
    309    this.#latencyMax = latencyMax;
    310 
    311    this.#pendingRequests = new Set();
    312    this.#throttleListeners = [];
    313    this.#previousReads = [];
    314 
    315    this.#pumping = false;
    316  }
    317 
    318  destroy() {
    319    for (const listener of this.#throttleListeners) {
    320      listener.stopThrottling();
    321    }
    322  }
    323 
    324  /**
    325   * A helper function that lets the indicating listener start sending
    326   * data.  This is called after the initial round trip time for the
    327   * listener has elapsed.
    328   */
    329  #allowDataFrom(throttleListener) {
    330    throttleListener.responseStart();
    331    this.#pendingRequests.delete(throttleListener);
    332    const count = throttleListener.pendingCount();
    333    for (let i = 0; i < count; ++i) {
    334      this.#throttleListeners.push(throttleListener);
    335    }
    336    this.#pump();
    337  }
    338 
    339  /**
    340   * An internal function that permits individual listeners to send
    341   * data.
    342   */
    343  #pump() {
    344    // A redirect will cause two NetworkThrottleListeners to be on a
    345    // listener chain.  In this case, we might recursively call into
    346    // this method.  Avoid infinite recursion here.
    347    if (this.#pumping) {
    348      return;
    349    }
    350    this.#pumping = true;
    351 
    352    const now = Date.now();
    353    const oneSecondAgo = now - 1000;
    354 
    355    while (
    356      this.#previousReads.length &&
    357      this.#previousReads[0].when < oneSecondAgo
    358    ) {
    359      this.#previousReads.shift();
    360    }
    361 
    362    const totalBytes = this.#previousReads.reduce((sum, elt) => {
    363      return sum + elt.numBytes;
    364    }, 0);
    365 
    366    let thisSliceBytes = this.#random(this.#meanBPS, this.#maxBPS);
    367    if (totalBytes < thisSliceBytes) {
    368      thisSliceBytes -= totalBytes;
    369      let readThisTime = 0;
    370      while (thisSliceBytes > 0 && this.#throttleListeners.length) {
    371        const { length, done } =
    372          this.#throttleListeners[0].sendSomeData(thisSliceBytes);
    373        thisSliceBytes -= length;
    374        readThisTime += length;
    375        if (done) {
    376          this.#throttleListeners.shift();
    377        }
    378      }
    379      this.#previousReads.push({ when: now, numBytes: readThisTime });
    380    }
    381 
    382    // If there is more data to download, then schedule ourselves for
    383    // one second after the oldest previous read.
    384    if (this.#throttleListeners.length) {
    385      const when = this.#previousReads[0].when + 1000;
    386      lazy.setTimeout(this.#pump.bind(this), when - now);
    387    }
    388 
    389    this.#pumping = false;
    390  }
    391 
    392  /**
    393   * A helper function that, given a mean and a maximum, returns a
    394   * random integer between (mean - (max - mean)) and max.
    395   */
    396  #random(mean, max) {
    397    return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
    398  }
    399 
    400  /**
    401   * Notice a new listener object.  This is called by the
    402   * NetworkThrottleListener when the request has started.  Initially
    403   * a new listener object is put into a "pending" state, until the
    404   * round-trip time has elapsed.  This is used to simulate latency.
    405   *
    406   * @param {NetworkThrottleListener} throttleListener the new listener
    407   */
    408  start(throttleListener) {
    409    this.#pendingRequests.add(throttleListener);
    410    const delay = this.#random(this.#latencyMean, this.#latencyMax);
    411    if (delay > 0) {
    412      lazy.setTimeout(() => this.#allowDataFrom(throttleListener), delay);
    413    } else {
    414      this.#allowDataFrom(throttleListener);
    415    }
    416  }
    417 
    418  /**
    419   * Note that new data is available for a given listener.  Each time
    420   * data is available, the listener will be re-queued.
    421   *
    422   * @param {NetworkThrottleListener} throttleListener the listener
    423   *        which has data available.
    424   */
    425  dataAvailable(throttleListener) {
    426    if (!this.#pendingRequests.has(throttleListener)) {
    427      this.#throttleListeners.push(throttleListener);
    428      this.#pump();
    429    }
    430  }
    431 }
    432 
    433 /**
    434 * Construct a new object that can be used to throttle the network for
    435 * a group of related network requests.
    436 *
    437 * @param {object} An object with the following attributes:
    438 * latencyMean {Number} Mean latency in milliseconds.
    439 * latencyMax {Number} Maximum latency in milliseconds.
    440 * downloadBPSMean {Number} Mean bytes per second for downloads.
    441 * downloadBPSMax {Number} Maximum bytes per second for downloads.
    442 * uploadBPSMean {Number} Mean bytes per second for uploads.
    443 * uploadBPSMax {Number} Maximum bytes per second for uploads.
    444 *
    445 * Download throttling will not be done if downloadBPSMean and
    446 * downloadBPSMax are <= 0.  Upload throttling will not be done if
    447 * uploadBPSMean and uploadBPSMax are <= 0.
    448 */
    449 export class NetworkThrottleManager {
    450  #downloadQueue;
    451  #uploadQueue;
    452 
    453  constructor({
    454    latencyMean,
    455    latencyMax,
    456    downloadBPSMean,
    457    downloadBPSMax,
    458    uploadBPSMean,
    459    uploadBPSMax,
    460  }) {
    461    if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
    462      this.#downloadQueue = null;
    463    } else {
    464      this.#downloadQueue = new NetworkThrottleQueue(
    465        downloadBPSMean,
    466        downloadBPSMax,
    467        latencyMean,
    468        latencyMax
    469      );
    470    }
    471    if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
    472      this.#uploadQueue = null;
    473    } else {
    474      this.#uploadQueue = Cc[
    475        "@mozilla.org/network/throttlequeue;1"
    476      ].createInstance(Ci.nsIInputChannelThrottleQueue);
    477      this.#uploadQueue.init(uploadBPSMean, uploadBPSMax);
    478    }
    479  }
    480 
    481  destroy() {
    482    // The #uploadQueue is not a NetworkThrottleQueue and at the moment, there
    483    // is no way to destroy it.
    484    if (this.#downloadQueue !== null) {
    485      this.#downloadQueue.destroy();
    486      this.#downloadQueue = null;
    487    }
    488  }
    489 
    490  /**
    491   * Create a new NetworkThrottleListener for a given channel and
    492   * install it using |setNewListener|.
    493   *
    494   * @param {nsITraceableChannel} channel the channel to manage
    495   * @return {NetworkThrottleListener} the new listener, or null if
    496   *         download throttling is not being done.
    497   */
    498  manage(channel) {
    499    if (this.#downloadQueue) {
    500      const listener = new NetworkThrottleListener(this.#downloadQueue);
    501      const originalListener = channel.setNewListener(listener);
    502      listener.setOriginalListener(originalListener);
    503      return listener;
    504    }
    505    return null;
    506  }
    507 
    508  /**
    509   * Throttle uploads taking place on the given channel.
    510   *
    511   * @param {nsITraceableChannel} channel the channel to manage
    512   */
    513  manageUpload(channel) {
    514    if (this.#uploadQueue) {
    515      channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
    516      channel.throttleQueue = this.#uploadQueue;
    517    }
    518  }
    519 }