stream-utils.sys.mjs (7749B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; 6 7 const lazy = {}; 8 9 ChromeUtils.defineESModuleGetters(lazy, { 10 EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs", 11 }); 12 13 XPCOMUtils.defineLazyServiceGetter( 14 lazy, 15 "IOUtil", 16 "@mozilla.org/io-util;1", 17 Ci.nsIIOUtil 18 ); 19 20 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => { 21 return Components.Constructor( 22 "@mozilla.org/scriptableinputstream;1", 23 "nsIScriptableInputStream", 24 "init" 25 ); 26 }); 27 28 const BUFFER_SIZE = 0x8000; 29 30 /** 31 * This helper function (and its companion object) are used by bulk 32 * senders and receivers to read and write data in and out of other streams. 33 * Functions that make use of this tool are passed to callers when it is 34 * time to read or write bulk data. It is highly recommended to use these 35 * copier functions instead of the stream directly because the copier 36 * enforces the agreed upon length. Since bulk mode reuses an existing 37 * stream, the sender and receiver must write and read exactly the agreed 38 * upon amount of data, or else the entire transport will be left in a 39 * invalid state. Additionally, other methods of stream copying (such as 40 * NetUtil.asyncCopy) close the streams involved, which would terminate 41 * the debugging transport, and so it is avoided here. 42 * 43 * Overall, this *works*, but clearly the optimal solution would be 44 * able to just use the streams directly. If it were possible to fully 45 * implement nsIInputStream/nsIOutputStream in JS, wrapper streams could 46 * be created to enforce the length and avoid closing, and consumers could 47 * use familiar stream utilities like NetUtil.asyncCopy. 48 * 49 * The function takes two async streams and copies a precise number 50 * of bytes from one to the other. Copying begins immediately, but may 51 * complete at some future time depending on data size. Use the returned 52 * promise to know when it's complete. 53 * 54 * @param {nsIAsyncInputStream} input 55 * Stream to copy from. 56 * @param {nsIAsyncOutputStream} output 57 * Stream to copy to. 58 * @param {number} length 59 * Amount of data that needs to be copied. 60 * 61 * @returns {Promise} 62 * Promise is resolved when copying completes or rejected if any 63 * (unexpected) errors occur. 64 */ 65 function copyStream(input, output, length) { 66 let copier = new StreamCopier(input, output, length); 67 return copier.copy(); 68 } 69 70 /** @class */ 71 function StreamCopier(input, output, length) { 72 lazy.EventEmitter.decorate(this); 73 this._id = StreamCopier._nextId++; 74 this.input = input; 75 // Save off the base output stream, since we know it's async as we've 76 // required 77 this.baseAsyncOutput = output; 78 if (lazy.IOUtil.outputStreamIsBuffered(output)) { 79 this.output = output; 80 } else { 81 this.output = Cc[ 82 "@mozilla.org/network/buffered-output-stream;1" 83 ].createInstance(Ci.nsIBufferedOutputStream); 84 this.output.init(output, BUFFER_SIZE); 85 } 86 this._length = length; 87 this._amountLeft = length; 88 this._deferred = { 89 promise: new Promise((resolve, reject) => { 90 this._deferred.resolve = resolve; 91 this._deferred.reject = reject; 92 }), 93 }; 94 95 this._copy = this._copy.bind(this); 96 this._flush = this._flush.bind(this); 97 this._destroy = this._destroy.bind(this); 98 99 // Copy promise's then method up to this object. 100 // 101 // Allows the copier to offer a promise interface for the simple succeed 102 // or fail scenarios, but also emit events (due to the EventEmitter) 103 // for other states, like progress. 104 this.then = this._deferred.promise.then.bind(this._deferred.promise); 105 this.then(this._destroy, this._destroy); 106 107 // Stream ready callback starts as |_copy|, but may switch to |_flush| 108 // at end if flushing would block the output stream. 109 this._streamReadyCallback = this._copy; 110 } 111 StreamCopier._nextId = 0; 112 113 StreamCopier.prototype = { 114 copy() { 115 // Dispatch to the next tick so that it's possible to attach a progress 116 // event listener, even for extremely fast copies (like when testing). 117 Services.tm.currentThread.dispatch(() => { 118 try { 119 this._copy(); 120 } catch (e) { 121 this._deferred.reject(e); 122 } 123 }, 0); 124 return this; 125 }, 126 127 _copy() { 128 let bytesAvailable = this.input.available(); 129 let amountToCopy = Math.min(bytesAvailable, this._amountLeft); 130 this._debug("Trying to copy: " + amountToCopy); 131 132 let bytesCopied; 133 try { 134 bytesCopied = this.output.writeFrom(this.input, amountToCopy); 135 } catch (e) { 136 if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) { 137 this._debug("Base stream would block, will retry"); 138 this._debug("Waiting for output stream"); 139 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); 140 return; 141 } 142 throw e; 143 } 144 145 this._amountLeft -= bytesCopied; 146 this._debug("Copied: " + bytesCopied + ", Left: " + this._amountLeft); 147 this._emitProgress(); 148 149 if (this._amountLeft === 0) { 150 this._debug("Copy done!"); 151 this._flush(); 152 return; 153 } 154 155 this._debug("Waiting for input stream"); 156 this.input.asyncWait(this, 0, 0, Services.tm.currentThread); 157 }, 158 159 _emitProgress() { 160 this.emit("progress", { 161 bytesSent: this._length - this._amountLeft, 162 totalBytes: this._length, 163 }); 164 }, 165 166 _flush() { 167 try { 168 this.output.flush(); 169 } catch (e) { 170 if ( 171 e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK || 172 e.result == Cr.NS_ERROR_FAILURE 173 ) { 174 this._debug("Flush would block, will retry"); 175 this._streamReadyCallback = this._flush; 176 this._debug("Waiting for output stream"); 177 this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread); 178 return; 179 } 180 throw e; 181 } 182 this._deferred.resolve(); 183 }, 184 185 _destroy() { 186 this._destroy = null; 187 this._copy = null; 188 this._flush = null; 189 this.input = null; 190 this.output = null; 191 }, 192 193 // nsIInputStreamCallback 194 onInputStreamReady() { 195 this._streamReadyCallback(); 196 }, 197 198 // nsIOutputStreamCallback 199 onOutputStreamReady() { 200 this._streamReadyCallback(); 201 }, 202 203 _debug() {}, 204 }; 205 206 /** 207 * Read from a stream, one byte at a time, up to the next 208 * <var>delimiter</var> character, but stopping if we've read |count| 209 * without finding it. Reading also terminates early if there are less 210 * than <var>count</var> bytes available on the stream. In that case, 211 * we only read as many bytes as the stream currently has to offer. 212 * 213 * @param {nsIInputStream} stream 214 * Input stream to read from. 215 * @param {string} delimiter 216 * Character we're trying to find. 217 * @param {number} count 218 * Max number of characters to read while searching. 219 * 220 * @returns {string} 221 * Collected data. If the delimiter was found, this string will 222 * end with it. 223 */ 224 // TODO: This implementation could be removed if bug 984651 is fixed, 225 // which provides a native version of the same idea. 226 function delimitedRead(stream, delimiter, count) { 227 let scriptableStream; 228 if (stream instanceof Ci.nsIScriptableInputStream) { 229 scriptableStream = stream; 230 } else { 231 scriptableStream = new lazy.ScriptableInputStream(stream); 232 } 233 234 let data = ""; 235 236 // Don't exceed what's available on the stream 237 count = Math.min(count, stream.available()); 238 239 if (count <= 0) { 240 return data; 241 } 242 243 let char; 244 while (char !== delimiter && count > 0) { 245 char = scriptableStream.readBytes(1); 246 count--; 247 data += char; 248 } 249 250 return data; 251 } 252 253 export const StreamUtils = { 254 copyStream, 255 delimitedRead, 256 };