stream-utils.js (15846B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 "use strict"; 6 7 const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); 8 const { dumpv } = DevToolsUtils; 9 const EventEmitter = require("resource://devtools/shared/event-emitter.js"); 10 11 DevToolsUtils.defineLazyGetter(this, "IOUtil", () => { 12 return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil); 13 }); 14 15 DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => { 16 return Components.Constructor( 17 "@mozilla.org/scriptableinputstream;1", 18 "nsIScriptableInputStream", 19 "init" 20 ); 21 }); 22 23 const BUFFER_SIZE = 0x8000; 24 25 /** 26 * This helper function (and its companion object) are used by bulk senders and 27 * receivers to read and write data in and out of other streams. Functions that 28 * make use of this tool are passed to callers when it is time to read or write 29 * bulk data. It is highly recommended to use these copier functions instead of 30 * the stream directly because the copier enforces the agreed upon length. 31 * Since bulk mode reuses an existing stream, the sender and receiver must write 32 * and read exactly the agreed upon amount of data, or else the entire transport 33 * will be left in a invalid state. Additionally, other methods of stream 34 * copying (such as NetUtil.asyncCopy) close the streams involved, which would 35 * terminate the debugging transport, and so it is avoided here. 36 * 37 * Overall, this *works*, but clearly the optimal solution would be able to just 38 * use the streams directly. If it were possible to fully implement 39 * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to 40 * enforce the length and avoid closing, and consumers could use familiar stream 41 * utilities like NetUtil.asyncCopy. 42 * 43 * The function takes two async streams and copies a precise number of bytes 44 * from one to the other. Copying begins immediately, but may complete at some 45 * future time depending on data size. Use the returned promise to know when 46 * it's complete. 47 * 48 * @param input nsIAsyncInputStream 49 * The stream to copy from. 50 * @param output nsIAsyncOutputStream 51 * The stream to copy to. 52 * @param length Integer 53 * The amount of data that needs to be copied. 54 * @return Promise 55 * The promise is resolved when copying completes or rejected if any 56 * (unexpected) errors occur. 57 */ 58 function copyStream(input, output, length) { 59 const copier = new StreamCopier(input, output, length); 60 return copier.copy(); 61 } 62 63 class StreamCopier { 64 static _nextId = 0; 65 66 constructor(input, output, length) { 67 EventEmitter.decorate(this); 68 this._id = StreamCopier._nextId++; 69 this.input = input; 70 // Save off the base output stream, since we know it's async as we've required 71 this.baseAsyncOutput = output; 72 if (IOUtil.outputStreamIsBuffered(output)) { 73 this.output = output; 74 } else { 75 this.output = Cc[ 76 "@mozilla.org/network/buffered-output-stream;1" 77 ].createInstance(Ci.nsIBufferedOutputStream); 78 this.output.init(output, BUFFER_SIZE); 79 } 80 this._length = length; 81 this._amountLeft = length; 82 let _resolve; 83 let _reject; 84 this._deferred = new Promise((resolve, reject) => { 85 _resolve = resolve; 86 _reject = reject; 87 }); 88 this._deferred.resolve = _resolve; 89 this._deferred.reject = _reject; 90 91 this._copy = this._copy.bind(this); 92 this._flush = this._flush.bind(this); 93 this._destroy = this._destroy.bind(this); 94 95 // Copy promise's then method up to this object. 96 // Allows the copier to offer a promise interface for the simple succeed or 97 // fail scenarios, but also emit events (due to the EventEmitter) for other 98 // states, like progress. 99 this.then = this._deferred.then.bind(this._deferred); 100 this.then(this._destroy, this._destroy); 101 102 // Stream ready callback starts as |_copy|, but may switch to |_flush| at end 103 // if flushing would block the output stream. 104 this._streamReadyCallback = this._copy; 105 } 106 copy() { 107 // Dispatch to the next tick so that it's possible to attach a progress 108 // event listener, even for extremely fast copies (like when testing). 109 Services.tm.dispatchToMainThread(() => { 110 try { 111 this._copy(); 112 } catch (e) { 113 this._deferred.reject(e); 114 } 115 }); 116 return this; 117 } 118 119 _copy() { 120 const bytesAvailable = this.input.available(); 121 const amountToCopy = Math.min(bytesAvailable, this._amountLeft); 122 this._debug("Trying to copy: " + amountToCopy); 123 124 let bytesCopied; 125 try { 126 bytesCopied = this.output.writeFrom(this.input, amountToCopy); 127 } catch (e) { 128 if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { 129 this._debug("Base stream would block, will retry"); 130 this._debug("Waiting for output stream"); 131 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); 132 return; 133 } 134 throw e; 135 } 136 137 this._amountLeft -= bytesCopied; 138 this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft); 139 this._emitProgress(); 140 141 if (this._amountLeft === 0) { 142 this._debug("Copy done!"); 143 this._flush(); 144 return; 145 } 146 147 this._debug("Waiting for input stream"); 148 this.input.asyncWait(this, 0, 0, Services.tm.currentThread); 149 } 150 151 _emitProgress() { 152 this.emit("progress", { 153 bytesSent: this._length - this._amountLeft, 154 totalBytes: this._length, 155 }); 156 } 157 158 _flush() { 159 try { 160 this.output.flush(); 161 } catch (e) { 162 if ( 163 e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK || 164 e.result == Cr.NS_ERROR_FAILURE 165 ) { 166 this._debug("Flush would block, will retry"); 167 this._streamReadyCallback = this._flush; 168 this._debug("Waiting for output stream"); 169 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); 170 return; 171 } 172 throw e; 173 } 174 this._deferred.resolve(); 175 } 176 177 _destroy() { 178 this._destroy = null; 179 this._copy = null; 180 this._flush = null; 181 this.input = null; 182 this.output = null; 183 } 184 185 // nsIInputStreamCallback 186 onInputStreamReady() { 187 this._streamReadyCallback(); 188 } 189 190 // nsIOutputStreamCallback 191 onOutputStreamReady() { 192 this._streamReadyCallback(); 193 } 194 195 _debug(msg) { 196 // Prefix logs with the copier ID, which makes logs much easier to 197 // understand when several copiers are running simultaneously 198 dumpv("Copier: " + this._id + " " + msg); 199 } 200 } 201 202 /** 203 * Read from a stream, one byte at a time, up to the next |delimiter| 204 * character, but stopping if we've read |count| without finding it. Reading 205 * also terminates early if there are less than |count| bytes available on the 206 * stream. In that case, we only read as many bytes as the stream currently has 207 * to offer. 208 * TODO: This implementation could be removed if bug 984651 is fixed, which 209 * provides a native version of the same idea. 210 * 211 * @param stream nsIInputStream 212 * The input stream to read from. 213 * @param delimiter string 214 * The character we're trying to find. 215 * @param count integer 216 * The max number of characters to read while searching. 217 * @return string 218 * The data collected. If the delimiter was found, this string will 219 * end with it. 220 */ 221 function delimitedRead(stream, delimiter, count) { 222 dumpv( 223 "Starting delimited read for " + delimiter + " up to " + count + " bytes" 224 ); 225 226 let scriptableStream; 227 if (stream instanceof Ci.nsIScriptableInputStream) { 228 scriptableStream = stream; 229 } else { 230 scriptableStream = new ScriptableInputStream(stream); 231 } 232 233 let data = ""; 234 235 // Don't exceed what's available on the stream 236 count = Math.min(count, stream.available()); 237 238 if (count <= 0) { 239 return data; 240 } 241 242 let char; 243 while (char !== delimiter && count > 0) { 244 char = scriptableStream.readBytes(1); 245 count--; 246 data += char; 247 } 248 249 return data; 250 } 251 252 /** 253 * This function efficiently copies an async stream to an array buffer. 254 * Usage: 255 * // The buffer length is used to define the length of data to copy from the stream. 256 * const buffer = new ArrayBuffer(length); 257 * await copyAsyncStreamToArrayBuffer(inputStream, buffer); 258 * 259 * @param {nsIAsyncStream} asyncInputStream 260 * @param {ArrayBuffer} buffer The byteLength of this buffer will be used to define the length of data to copy from the stream. 261 */ 262 async function copyAsyncStreamToArrayBuffer(asyncInputStream, buffer) { 263 const reader = new AsyncStreamToArrayBufferCopier(asyncInputStream, buffer); 264 await reader.asyncRead(); 265 } 266 267 class AsyncStreamToArrayBufferCopier { 268 #BUFFER_SIZE = 16 * 1024; // A 16k buffer 269 270 /** 271 * @typedef {nsIAsyncInputStream} 272 */ 273 #originalStream; 274 275 /** 276 * This is a wrapper on top of #originalStream, to be able to read buffers 277 * easily. 278 * 279 * @typedef {nsIBinaryInputStream} 280 */ 281 #binaryStream; 282 283 /** 284 * This is the output buffer, accessed as an UInt8Array. 285 * 286 * @typedef {Uint8Array} 287 */ 288 #outputArray; 289 290 /** 291 * How many bytes have been read already. This is also the next index to write 292 * in #outputArray. 293 * 294 * @typedef {number} 295 */ 296 #pointer = 0; 297 298 /** 299 * The count of bytes to be transfered. It is infered from the byteLength of 300 * of the output buffer. 301 * 302 * @typedef {number} 303 */ 304 #count; 305 306 /** 307 * This temporary buffer is used when reading from #binaryStream. 308 * 309 * @typedef {ArrayBuffer} 310 */ 311 #tempBuffer; 312 313 /** 314 * @typedef {Uint8Array} 315 */ 316 #tempBufferAsArray; 317 318 /** 319 * @param {nsIAsyncStream} stream 320 * @param {ArrayBuffer} arrayBuffer The byteLength of this buffer will be used to define the length of data to copy from the stream. 321 */ 322 constructor(stream, arrayBuffer) { 323 this.#originalStream = stream; 324 this.#binaryStream = Cc["@mozilla.org/binaryinputstream;1"].createInstance( 325 Ci.nsIBinaryInputStream 326 ); 327 this.#binaryStream.setInputStream(stream); 328 329 this.#outputArray = new Uint8Array(arrayBuffer); 330 this.#count = arrayBuffer.byteLength; 331 this.#tempBuffer = new ArrayBuffer(this.#BUFFER_SIZE); 332 this.#tempBufferAsArray = new Uint8Array(this.#tempBuffer); 333 } 334 335 /** 336 * @returns {Promise<void>} Resolves when the reading has finished. 337 */ 338 async asyncRead() { 339 do { 340 await this.#waitForStreamAvailability(); 341 this.#syncRead(); 342 } while (this.#pointer < this.#count); 343 dumpv(`Successfully read ${this.#count} bytes!`); 344 } 345 346 /** 347 * @returns {Promise<void>} Resolves when the stream is available. 348 */ 349 async #waitForStreamAvailability() { 350 return new Promise(resolve => { 351 this.#originalStream.asyncWait( 352 () => resolve(), 353 0, 354 0, 355 Services.tm.currentThread 356 ); 357 }); 358 } 359 360 /** 361 * @returns {void} 362 */ 363 #syncRead() { 364 const amountLeft = this.#count - this.#pointer; 365 const count = Math.min(this.#binaryStream.available(), amountLeft); 366 if (count <= 0) { 367 return; 368 } 369 370 dumpv( 371 `Will read synchronously ${count} bytes out of ${amountLeft} bytes left.` 372 ); 373 374 let remaining = count; 375 while (remaining) { 376 // TODO readArrayBuffer doesn't know how to write to an offset in the buffer, 377 // see bug 1962705. 378 const willRead = Math.min(remaining, this.#BUFFER_SIZE); 379 const hasRead = this.#binaryStream.readArrayBuffer( 380 willRead, 381 this.#tempBuffer 382 ); 383 384 if (hasRead < willRead) { 385 console.error( 386 `[devtools perf front] We were expecting ${willRead} bytes, but received ${hasRead} bytes instead.` 387 ); 388 } 389 const toCopyArray = this.#tempBufferAsArray.subarray(0, hasRead); 390 this.#outputArray.set(toCopyArray, this.#pointer); 391 this.#pointer += hasRead; 392 remaining -= hasRead; 393 } 394 dumpv( 395 `${count} bytes have been successfully read. Total: ${this.#pointer} / ${this.#count}` 396 ); 397 } 398 } 399 400 /** 401 * This function efficiently copies the content of an array buffer to an async stream. 402 * Usage: 403 * // The buffer length is used to define the length of data to copy to the stream. 404 * await copyArrayBufferToAsyncStream(buffer, asyncOutputStream); 405 * 406 * @param {ArrayBuffer} buffer The byteLength of this buffer will be used to define the length of data to copy to the stream. 407 * @param {nsIAsyncStream} asyncOutputStream 408 */ 409 async function copyArrayBufferToAsyncStream(buffer, asyncOutputStream) { 410 const writer = new ArrayBufferToAsyncStreamCopier(buffer, asyncOutputStream); 411 await writer.asyncWrite(); 412 } 413 414 class ArrayBufferToAsyncStreamCopier { 415 #BUFFER_SIZE = 16 * 1024; // A 16k buffer 416 417 /** 418 * @typedef {nsIAsyncOutputStream} 419 */ 420 #originalStream; 421 422 /** 423 * This is a wrapper on top of #originalStream, to be able to write buffers 424 * easily. 425 * 426 * @typedef {nsIBinaryOutputStream} 427 */ 428 #binaryStream; 429 430 /** 431 * This is the input buffer, accessed as an UInt8Array. 432 * 433 * @typedef {Uint8Array} 434 */ 435 #inputArray; 436 437 /** 438 * How many bytes have been read already. This is also the next index to read 439 * in #outputArray. 440 * 441 * @typedef {number} 442 */ 443 #pointer = 0; 444 445 /** 446 * The count of bytes to be transfered. It is infered from the byteLength of 447 * of the input buffer. 448 * 449 * @typedef {number} 450 */ 451 #count; 452 453 /** 454 * @param {ArrayBuffer} arrayBuffer The byteLength of this buffer will be used to define the length of data to copy to the stream. 455 * @param {nsIAsyncStream} stream 456 */ 457 constructor(arrayBuffer, stream) { 458 this.#originalStream = stream; 459 this.#binaryStream = Cc["@mozilla.org/binaryoutputstream;1"].createInstance( 460 Ci.nsIBinaryOutputStream 461 ); 462 this.#binaryStream.setOutputStream(stream); 463 464 this.#inputArray = new Uint8Array(arrayBuffer); 465 this.#count = arrayBuffer.byteLength; 466 } 467 468 /** 469 * @returns {Promise<void>} Resolves when the reading has finished. 470 */ 471 async asyncWrite() { 472 do { 473 await this.#waitForStreamAvailability(); 474 this.#syncWrite(); 475 } while (this.#pointer < this.#count); 476 dumpv(`Successfully wrote ${this.#count} bytes!`); 477 } 478 479 /** 480 * @returns {Promise<void>} Resolves when the stream is available. 481 */ 482 async #waitForStreamAvailability() { 483 return new Promise(resolve => { 484 this.#originalStream.asyncWait( 485 () => resolve(), 486 0, 487 0, 488 Services.tm.currentThread 489 ); 490 }); 491 } 492 493 /** 494 * @returns {void} 495 */ 496 #syncWrite() { 497 const amountLeft = this.#count - this.#pointer; 498 if (amountLeft <= 0) { 499 return; 500 } 501 502 let remaining = amountLeft; 503 while (remaining) { 504 const willWrite = Math.min(remaining, this.#BUFFER_SIZE); 505 const subarray = this.#inputArray.subarray( 506 this.#pointer, 507 this.#pointer + willWrite 508 ); 509 try { 510 // Bug 1962705: writeByteArray does a copy in 511 // https://searchfox.org/mozilla-central/rev/3d294b119bf2add880f615a0fc61a5d54bcd6264/js/xpconnect/src/XPCConvert.cpp#1440 512 // modify BinaryOutputStream so that it can read directly from the buffer. 513 this.#binaryStream.writeByteArray(subarray); 514 } catch (e) { 515 if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { 516 dumpv( 517 `Base stream would block, will retry. ${amountLeft - remaining} bytes have been successfully written. Total: ${this.#pointer} / ${this.#count}` 518 ); 519 return; 520 } 521 throw e; 522 } 523 524 this.#pointer += willWrite; 525 remaining -= willWrite; 526 } 527 dumpv( 528 `${amountLeft - remaining} bytes have been successfully written. Total: ${this.#pointer} / ${this.#count}` 529 ); 530 } 531 } 532 533 module.exports = { 534 copyStream, 535 delimitedRead, 536 copyAsyncStreamToArrayBuffer, 537 copyArrayBufferToAsyncStream, 538 };