NetworkThrottleManager.sys.mjs (14541B)
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

// NOTE: this module runs in a chrome-privileged context; `Components`,
// `Cc`, `Ci`, and `ChromeUtils` are ambient globals there.

// XPCOM constructors for wrapping/re-exposing response data as streams.
const ArrayBufferInputStream = Components.Constructor(
  "@mozilla.org/io/arraybuffer-input-stream;1",
  "nsIArrayBufferInputStream"
);
const BinaryInputStream = Components.Constructor(
  "@mozilla.org/binaryinputstream;1",
  "nsIBinaryInputStream",
  "setInputStream"
);

import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";

const lazy = {};

XPCOMUtils.defineLazyServiceGetter(
  lazy,
  "gActivityDistributor",
  "@mozilla.org/network/http-activity-distributor;1",
  Ci.nsIHttpActivityDistributor
);

ChromeUtils.defineESModuleGetters(lazy, {
  setTimeout: "resource://gre/modules/Timer.sys.mjs",
});

/**
 * An nsIStreamListener that buffers a channel's response data and only
 * forwards it to the original listener when its NetworkThrottleQueue
 * grants bandwidth.  It also delays http activity events so their
 * timestamps stay consistent with the artificial delay.
 */
class NetworkThrottleListener {
  // Map from activity subtype code to the pending (delayed) activity
  // event datum; entries are cleared (set to undefined) once emitted.
  #activities;
  // Running byte offset passed to the original listener's onDataAvailable.
  #offset;
  // The listener that was installed on the channel before us.
  #originalListener;
  // FIFO of buffered work items: either {request, stream, count} for data
  // or {request, statusCode} for the final stop notification.
  #pendingData;
  // An exception thrown by the original listener, re-thrown on the next
  // onDataAvailable so the platform sees the failure.
  #pendingException;
  // The owning NetworkThrottleQueue.
  #queue;
  // True once the simulated round-trip latency has elapsed.
  #responseStarted;
  // When true, throttling is bypassed and data is sent as fast as allowed.
  #shouldStopThrottling;

  /**
   * Construct a new nsIStreamListener that buffers data and provides a
   * method to notify another listener when data is available.  This is
   * used to throttle network data on a per-channel basis.
   *
   * After construction, @see setOriginalListener must be called on the
   * new object.
   *
   * @param {NetworkThrottleQueue} queue the NetworkThrottleQueue to
   *        which status changes should be reported
   */
  constructor(queue) {
    this.#activities = {};
    this.#offset = 0;
    this.#pendingData = [];
    this.#pendingException = null;
    this.#queue = queue;
    this.#responseStarted = false;
    this.#shouldStopThrottling = false;
  }

  stopThrottling() {
    // When the shouldStopThrottling flag is flipped the next call to
    // sendSomeData will bypass throttling and send all data immediately.
    this.#shouldStopThrottling = true;
  }

  /**
   * Set the original listener for this object.  The original listener
   * will receive requests from this object when the queue allows data
   * through.
   *
   * @param {nsIStreamListener} originalListener the original listener
   *        for the channel, to which all requests will be sent
   */
  setOriginalListener(originalListener) {
    this.#originalListener = originalListener;
  }

  /**
   * @see nsIStreamListener.onStartRequest.
   */
  onStartRequest(request) {
    // Start is forwarded immediately; only the body bytes are delayed.
    this.#originalListener.onStartRequest(request);
    this.#queue.start(this);
  }

  /**
   * @see nsIStreamListener.onStopRequest.
   */
  onStopRequest(request, statusCode) {
    // Queue the stop so it is delivered only after all buffered data.
    this.#pendingData.push({ request, statusCode });
    this.#queue.dataAvailable(this);
  }

  /**
   * @see nsIStreamListener.onDataAvailable.
   */
  onDataAvailable(request, inputStream, offset, count) {
    if (this.#pendingException) {
      throw this.#pendingException;
    }

    // Copy the incoming bytes into an ArrayBuffer so they can be
    // re-served later from a fresh stream, in throttled slices.
    const bin = new BinaryInputStream(inputStream);
    const bytes = new ArrayBuffer(count);
    bin.readArrayBuffer(count, bytes);

    const stream = new ArrayBufferInputStream();
    stream.setData(bytes, 0, count);

    this.#pendingData.push({ request, stream, count });
    this.#queue.dataAvailable(this);
  }

  /**
   * Allow some buffered data from this object to be forwarded to this
   * object's originalListener.
   *
   * @param {number} bytesPermitted The maximum number of bytes
   *        permitted to be sent.
   * @return {object} an object of the form {length, done}, where
   *         |length| is the number of bytes actually forwarded, and
   *         |done| is a boolean indicating whether this particular
   *         request has been completed.  (A NetworkThrottleListener
   *         may be queued multiple times, so this does not mean that
   *         all available data has been sent.)
   */
  sendSomeData(bytesPermitted) {
    if (this.#pendingData.length === 0) {
      // Shouldn't happen.
      return { length: 0, done: true };
    }

    const { request, stream, count, statusCode } = this.#pendingData[0];

    // A defined statusCode marks the queued onStopRequest entry.
    if (statusCode !== undefined) {
      this.#pendingData.shift();
      this.#originalListener.onStopRequest(request, statusCode);
      return { length: 0, done: true };
    }

    // Clamp to what is buffered; if throttling was cancelled, send the
    // whole buffered chunk regardless of the permitted budget.
    if (bytesPermitted > count || this.#shouldStopThrottling) {
      bytesPermitted = count;
    }

    try {
      this.#originalListener.onDataAvailable(
        request,
        stream,
        this.#offset,
        bytesPermitted
      );
    } catch (e) {
      // Remember the failure and surface it on the next onDataAvailable.
      this.#pendingException = e;
    }

    let done = false;
    if (bytesPermitted === count) {
      this.#pendingData.shift();
      done = true;
    } else {
      // Partial send: shrink the head entry; the stream keeps its
      // internal position, so the next send continues where we stopped.
      this.#pendingData[0].count -= bytesPermitted;
    }

    this.#offset += bytesPermitted;
    // Maybe our state has changed enough to emit an event.
    this.#maybeEmitEvents();

    return { length: bytesPermitted, done };
  }

  /**
   * Return the number of pending data requests available for this
   * listener.
   */
  pendingCount() {
    return this.#pendingData.length;
  }

  /**
   * This is called when an http activity event is delivered.  This
   * object delays the event until the appropriate moment.
   */
  addActivityCallback(
    callback,
    httpActivity,
    channel,
    activityType,
    activitySubtype,
    timestamp,
    extraSizeData,
    extraStringData
  ) {
    // Note: the original |timestamp| is deliberately dropped; a fresh
    // timestamp is generated at emission time in #maybeEmit.
    const datum = {
      callback,
      httpActivity,
      channel,
      activityType,
      activitySubtype,
      extraSizeData,
      extraStringData,
    };
    this.#activities[activitySubtype] = datum;

    if (
      activitySubtype ===
      lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
    ) {
      // Public expando recording the full response size; #maybeEmitEvents
      // compares it against #offset to detect completion.
      this.totalSize = extraSizeData;
    }

    this.#maybeEmitEvents();
  }

  /**
   * This is called for a download throttler when the latency timeout
   * has ended.
   */
  responseStart() {
    this.#responseStarted = true;
    this.#maybeEmitEvents();
  }

  /**
   * Check our internal state and emit any http activity events as
   * needed.  Note that we wait until both our internal state has
   * changed and we've received the real http activity event from
   * platform.  This approach ensures we can both pass on the correct
   * data from the original event, and update the reported time to be
   * consistent with the delay we're introducing.
   */
  #maybeEmitEvents() {
    if (this.#responseStarted) {
      this.#maybeEmit(
        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_START
      );
      this.#maybeEmit(
        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_HEADER
      );
    }

    if (this.totalSize !== undefined && this.#offset >= this.totalSize) {
      this.#maybeEmit(
        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_RESPONSE_COMPLETE
      );
      this.#maybeEmit(
        lazy.gActivityDistributor.ACTIVITY_SUBTYPE_TRANSACTION_CLOSE
      );
    }
  }

  /**
   * Emit an event for |code|, if the appropriate entry in
   * |activities| is defined.
   */
  #maybeEmit(code) {
    if (this.#activities[code] !== undefined) {
      const {
        callback,
        httpActivity,
        channel,
        activityType,
        activitySubtype,
        extraSizeData,
        extraStringData,
      } = this.#activities[code];
      // Activity timestamps are in microseconds, hence the * 1000.
      const now = Date.now() * 1000;
      callback(
        httpActivity,
        channel,
        activityType,
        activitySubtype,
        now,
        extraSizeData,
        extraStringData
      );
      // Mark as emitted so the event fires at most once.
      this.#activities[code] = undefined;
    }
  }

  QueryInterface = ChromeUtils.generateQI([
    "nsIStreamListener",
    "nsIInterfaceRequestor",
  ]);
}

/**
 * A queue that meters out download bandwidth and simulated latency
 * across a group of NetworkThrottleListeners, using a one-second
 * sliding window of bytes already delivered.
 */
class NetworkThrottleQueue {
  // Maximum simulated round-trip latency, in milliseconds.
  #latencyMax;
  // Mean simulated round-trip latency, in milliseconds.
  #latencyMean;
  // Maximum permitted download rate, in bytes per second.
  #maxBPS;
  // Mean permitted download rate, in bytes per second.
  #meanBPS;
  // Listeners still waiting out their simulated latency.
  #pendingRequests;
  // Records of {when, numBytes} for reads within the last second.
  #previousReads;
  // Re-entrancy guard for #pump (see comment there).
  #pumping;
  // FIFO of listeners with data ready; a listener appears once per
  // pending data item.
  #throttleListeners;

  /**
   * Construct a new queue that can be used to throttle the network for
   * a group of related network requests.
   *
   * meanBPS {Number} Mean bytes per second.
   * maxBPS {Number} Maximum bytes per second.
   * latencyMean {Number} Mean latency in milliseconds.
   * latencyMax {Number} Maximum latency in milliseconds.
   */
  constructor(meanBPS, maxBPS, latencyMean, latencyMax) {
    this.#meanBPS = meanBPS;
    this.#maxBPS = maxBPS;
    this.#latencyMean = latencyMean;
    this.#latencyMax = latencyMax;

    this.#pendingRequests = new Set();
    this.#throttleListeners = [];
    this.#previousReads = [];

    this.#pumping = false;
  }

  destroy() {
    // Release all queued listeners: each will flush its remaining
    // buffered data unthrottled on its next sendSomeData call.
    for (const listener of this.#throttleListeners) {
      listener.stopThrottling();
    }
  }

  /**
   * A helper function that lets the indicating listener start sending
   * data.  This is called after the initial round trip time for the
   * listener has elapsed.
   */
  #allowDataFrom(throttleListener) {
    throttleListener.responseStart();
    this.#pendingRequests.delete(throttleListener);
    // Enqueue the listener once per data item it buffered while waiting
    // out the latency, so #pump drains all of them.
    const count = throttleListener.pendingCount();
    for (let i = 0; i < count; ++i) {
      this.#throttleListeners.push(throttleListener);
    }
    this.#pump();
  }

  /**
   * An internal function that permits individual listeners to send
   * data.
   */
  #pump() {
    // A redirect will cause two NetworkThrottleListeners to be on a
    // listener chain.  In this case, we might recursively call into
    // this method.  Avoid infinite recursion here.
    if (this.#pumping) {
      return;
    }
    this.#pumping = true;

    const now = Date.now();
    const oneSecondAgo = now - 1000;

    // Drop reads that have aged out of the one-second window.
    while (
      this.#previousReads.length &&
      this.#previousReads[0].when < oneSecondAgo
    ) {
      this.#previousReads.shift();
    }

    const totalBytes = this.#previousReads.reduce((sum, elt) => {
      return sum + elt.numBytes;
    }, 0);

    // Budget for this slice: a randomized per-second allowance minus
    // whatever was already delivered inside the window.
    let thisSliceBytes = this.#random(this.#meanBPS, this.#maxBPS);
    if (totalBytes < thisSliceBytes) {
      thisSliceBytes -= totalBytes;
      let readThisTime = 0;
      while (thisSliceBytes > 0 && this.#throttleListeners.length) {
        const { length, done } =
          this.#throttleListeners[0].sendSomeData(thisSliceBytes);
        thisSliceBytes -= length;
        readThisTime += length;
        if (done) {
          this.#throttleListeners.shift();
        }
      }
      this.#previousReads.push({ when: now, numBytes: readThisTime });
    }

    // If there is more data to download, then schedule ourselves for
    // one second after the oldest previous read.
    if (this.#throttleListeners.length) {
      const when = this.#previousReads[0].when + 1000;
      lazy.setTimeout(this.#pump.bind(this), when - now);
    }

    this.#pumping = false;
  }

  /**
   * A helper function that, given a mean and a maximum, returns a
   * random integer between (mean - (max - mean)) and max.
   */
  #random(mean, max) {
    return mean - (max - mean) + Math.floor(2 * (max - mean) * Math.random());
  }

  /**
   * Notice a new listener object.  This is called by the
   * NetworkThrottleListener when the request has started.  Initially
   * a new listener object is put into a "pending" state, until the
   * round-trip time has elapsed.  This is used to simulate latency.
   *
   * @param {NetworkThrottleListener} throttleListener the new listener
   */
  start(throttleListener) {
    this.#pendingRequests.add(throttleListener);
    const delay = this.#random(this.#latencyMean, this.#latencyMax);
    if (delay > 0) {
      lazy.setTimeout(() => this.#allowDataFrom(throttleListener), delay);
    } else {
      this.#allowDataFrom(throttleListener);
    }
  }

  /**
   * Note that new data is available for a given listener.  Each time
   * data is available, the listener will be re-queued.
   *
   * @param {NetworkThrottleListener} throttleListener the listener
   *        which has data available.
   */
  dataAvailable(throttleListener) {
    // While still pending (latency not elapsed), data is merely
    // buffered; #allowDataFrom will enqueue it later.
    if (!this.#pendingRequests.has(throttleListener)) {
      this.#throttleListeners.push(throttleListener);
      this.#pump();
    }
  }
}

/**
 * Construct a new object that can be used to throttle the network for
 * a group of related network requests.
 *
 * @param {object} An object with the following attributes:
 * latencyMean {Number} Mean latency in milliseconds.
 * latencyMax {Number} Maximum latency in milliseconds.
 * downloadBPSMean {Number} Mean bytes per second for downloads.
 * downloadBPSMax {Number} Maximum bytes per second for downloads.
 * uploadBPSMean {Number} Mean bytes per second for uploads.
 * uploadBPSMax {Number} Maximum bytes per second for uploads.
 *
 * Download throttling will not be done if downloadBPSMean and
 * downloadBPSMax are <= 0.  Upload throttling will not be done if
 * uploadBPSMean and uploadBPSMax are <= 0.
 */
export class NetworkThrottleManager {
  // NetworkThrottleQueue for downloads, or null when disabled.
  #downloadQueue;
  // Platform nsIInputChannelThrottleQueue for uploads, or null when
  // disabled; uploads are throttled natively rather than in JS.
  #uploadQueue;

  constructor({
    latencyMean,
    latencyMax,
    downloadBPSMean,
    downloadBPSMax,
    uploadBPSMean,
    uploadBPSMax,
  }) {
    if (downloadBPSMax <= 0 && downloadBPSMean <= 0) {
      this.#downloadQueue = null;
    } else {
      this.#downloadQueue = new NetworkThrottleQueue(
        downloadBPSMean,
        downloadBPSMax,
        latencyMean,
        latencyMax
      );
    }
    if (uploadBPSMax <= 0 && uploadBPSMean <= 0) {
      this.#uploadQueue = null;
    } else {
      this.#uploadQueue = Cc[
        "@mozilla.org/network/throttlequeue;1"
      ].createInstance(Ci.nsIInputChannelThrottleQueue);
      this.#uploadQueue.init(uploadBPSMean, uploadBPSMax);
    }
  }

  destroy() {
    // The #uploadQueue is not a NetworkThrottleQueue and at the moment, there
    // is no way to destroy it.
    if (this.#downloadQueue !== null) {
      this.#downloadQueue.destroy();
      this.#downloadQueue = null;
    }
  }

  /**
   * Create a new NetworkThrottleListener for a given channel and
   * install it using |setNewListener|.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   * @return {NetworkThrottleListener} the new listener, or null if
   *         download throttling is not being done.
   */
  manage(channel) {
    if (this.#downloadQueue) {
      const listener = new NetworkThrottleListener(this.#downloadQueue);
      const originalListener = channel.setNewListener(listener);
      listener.setOriginalListener(originalListener);
      return listener;
    }
    return null;
  }

  /**
   * Throttle uploads taking place on the given channel.
   *
   * @param {nsITraceableChannel} channel the channel to manage
   */
  manageUpload(channel) {
    if (this.#uploadQueue) {
      channel = channel.QueryInterface(Ci.nsIThrottledInputChannel);
      channel.throttleQueue = this.#uploadQueue;
    }
  }
}