transport.sys.mjs (16600B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 const lazy = {}; 6 7 ChromeUtils.defineESModuleGetters(lazy, { 8 EventEmitter: "resource://gre/modules/EventEmitter.sys.mjs", 9 10 BulkPacket: "chrome://remote/content/marionette/packets.sys.mjs", 11 executeSoon: "chrome://remote/content/shared/Sync.sys.mjs", 12 JSONPacket: "chrome://remote/content/marionette/packets.sys.mjs", 13 Packet: "chrome://remote/content/marionette/packets.sys.mjs", 14 StreamUtils: "chrome://remote/content/marionette/stream-utils.sys.mjs", 15 }); 16 17 ChromeUtils.defineLazyGetter(lazy, "ScriptableInputStream", () => { 18 return Components.Constructor( 19 "@mozilla.org/scriptableinputstream;1", 20 "nsIScriptableInputStream", 21 "init" 22 ); 23 }); 24 25 const flags = { wantVerbose: false, wantLogging: false }; 26 27 const dumpv = flags.wantVerbose 28 ? function (msg) { 29 dump(msg + "\n"); 30 } 31 : function () {}; 32 33 const PACKET_HEADER_MAX = 200; 34 35 /** 36 * An adapter that handles data transfers between the debugger client 37 * and server. It can work with both nsIPipe and nsIServerSocket 38 * transports so long as the properly created input and output streams 39 * are specified. (However, for intra-process connections, 40 * LocalDebuggerTransport, below, is more efficient than using an nsIPipe 41 * pair with DebuggerTransport.) 42 * 43 * @param {nsIAsyncInputStream} input 44 * The input stream. 45 * @param {nsIAsyncOutputStream} output 46 * The output stream. 47 * 48 * Given a DebuggerTransport instance dt: 49 * 1) Set dt.hooks to a packet handler object (described below). 50 * 2) Call dt.ready() to begin watching for input packets. 51 * 3) Call dt.send() / dt.startBulkSend() to send packets. 52 * 4) Call dt.close() to close the connection, and disengage from 53 * the event loop. 54 * 55 * A packet handler is an object with the following methods: 56 * 57 * - onPacket(packet) - called when we have received a complete packet. 58 * |packet| is the parsed form of the packet --- a JavaScript value, not 59 * a JSON-syntax string. 60 * 61 * - onBulkPacket(packet) - called when we have switched to bulk packet 62 * receiving mode. |packet| is an object containing: 63 * actor: Name of actor that will receive the packet 64 * type: Name of actor's method that should be called on receipt 65 * length: Size of the data to be read 66 * stream: This input stream should only be used directly if you 67 * can ensure that you will read exactly |length| bytes and 68 * will not close the stream when reading is complete 69 * done: If you use the stream directly (instead of |copyTo| 70 * below), you must signal completion by resolving/rejecting 71 * this deferred. If it's rejected, the transport will 72 * be closed. If an Error is supplied as a rejection value, 73 * it will be logged via |dump|. If you do use |copyTo|, 74 * resolving is taken care of for you when copying completes. 75 * copyTo: A helper function for getting your data out of the 76 * stream that meets the stream handling requirements above, 77 * and has the following signature: 78 * 79 * - params 80 * {nsIAsyncOutputStream} output 81 * The stream to copy to. 82 * - returns {Promise} 83 * The promise is resolved when copying completes or 84 * rejected if any (unexpected) errors occur. This object 85 * also emits "progress" events for each chunk that is 86 * copied. See stream-utils.js. 87 * 88 * - onClosed(reason) - called when the connection is closed. |reason| 89 * is an optional nsresult or object, typically passed when the 90 * transport is closed due to some error in a underlying stream. 91 * 92 * See ./packets.js and the Remote Debugging Protocol specification for 93 * more details on the format of these packets. 94 * 95 * @class 96 */ 97 export function DebuggerTransport(input, output) { 98 lazy.EventEmitter.decorate(this); 99 100 this._input = input; 101 this._scriptableInput = new lazy.ScriptableInputStream(input); 102 this._output = output; 103 104 // The current incoming (possibly partial) header, which will determine 105 // which type of Packet |_incoming| below will become. 106 this._incomingHeader = ""; 107 // The current incoming Packet object 108 this._incoming = null; 109 // A queue of outgoing Packet objects 110 this._outgoing = []; 111 112 this.hooks = null; 113 this.active = false; 114 115 this._incomingEnabled = true; 116 this._outgoingEnabled = true; 117 118 this.close = this.close.bind(this); 119 } 120 121 DebuggerTransport.prototype = { 122 /** 123 * Transmit an object as a JSON packet. 124 * 125 * This method returns immediately, without waiting for the entire 126 * packet to be transmitted, registering event handlers as needed to 127 * transmit the entire packet. Packets are transmitted in the order they 128 * are passed to this method. 129 */ 130 send(object) { 131 this.emit("send", object); 132 133 let packet = new lazy.JSONPacket(this); 134 packet.object = object; 135 this._outgoing.push(packet); 136 this._flushOutgoing(); 137 }, 138 139 /** 140 * Transmit streaming data via a bulk packet. 141 * 142 * This method initiates the bulk send process by queuing up the header 143 * data. The caller receives eventual access to a stream for writing. 144 * 145 * N.B.: Do *not* attempt to close the stream handed to you, as it 146 * will continue to be used by this transport afterwards. Most users 147 * should instead use the provided |copyFrom| function instead. 148 * 149 * @param {object} header 150 * This is modeled after the format of JSON packets above, but does 151 * not actually contain the data, but is instead just a routing 152 * header: 153 * 154 * - actor: Name of actor that will receive the packet 155 * - type: Name of actor's method that should be called on receipt 156 * - length: Size of the data to be sent 157 * 158 * @returns {Promise} 159 * The promise will be resolved when you are allowed to write to 160 * the stream with an object containing: 161 * 162 * - stream: This output stream should only be used directly 163 * if you can ensure that you will write exactly 164 * |length| bytes and will not close the stream when 165 * writing is complete. 166 * - done: If you use the stream directly (instead of 167 * |copyFrom| below), you must signal completion by 168 * resolving/rejecting this deferred. If it's 169 * rejected, the transport will be closed. If an 170 * Error is supplied as a rejection value, it will 171 * be logged via |dump|. If you do use |copyFrom|, 172 * resolving is taken care of for you when copying 173 * completes. 174 * - copyFrom: A helper function for getting your data onto the 175 * stream that meets the stream handling requirements 176 * above, and has the following signature: 177 * 178 * - params 179 * {nsIAsyncInputStream} input 180 * The stream to copy from. 181 * - returns {Promise} 182 * The promise is resolved when copying completes 183 * or rejected if any (unexpected) errors occur. 184 * This object also emits "progress" events for 185 * each chunkthat is copied. See stream-utils.js. 186 */ 187 startBulkSend(header) { 188 this.emit("startbulksend", header); 189 190 let packet = new lazy.BulkPacket(this); 191 packet.header = header; 192 this._outgoing.push(packet); 193 this._flushOutgoing(); 194 return packet.streamReadyForWriting; 195 }, 196 197 /** 198 * Close the transport. 199 * 200 * @param {(nsresult|object)=} reason 201 * The status code or error message that corresponds to the reason 202 * for closing the transport (likely because a stream closed 203 * or failed). 204 */ 205 close(reason) { 206 this.emit("close", reason); 207 208 this.active = false; 209 this._input.close(); 210 this._scriptableInput.close(); 211 this._output.close(); 212 this._destroyIncoming(); 213 this._destroyAllOutgoing(); 214 if (this.hooks) { 215 this.hooks.onClosed(reason); 216 this.hooks = null; 217 } 218 if (reason) { 219 dumpv("Transport closed: " + reason); 220 } else { 221 dumpv("Transport closed."); 222 } 223 }, 224 225 /** 226 * The currently outgoing packet (at the top of the queue). 227 */ 228 get _currentOutgoing() { 229 return this._outgoing[0]; 230 }, 231 232 /** 233 * Flush data to the outgoing stream. Waits until the output 234 * stream notifies us that it is ready to be written to (via 235 * onOutputStreamReady). 236 */ 237 _flushOutgoing() { 238 if (!this._outgoingEnabled || this._outgoing.length === 0) { 239 return; 240 } 241 242 // If the top of the packet queue has nothing more to send, remove it. 243 if (this._currentOutgoing.done) { 244 this._finishCurrentOutgoing(); 245 } 246 247 if (this._outgoing.length) { 248 let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); 249 this._output.asyncWait(this, 0, 0, threadManager.currentThread); 250 } 251 }, 252 253 /** 254 * Pause this transport's attempts to write to the output stream. 255 * This is used when we've temporarily handed off our output stream for 256 * writing bulk data. 257 */ 258 pauseOutgoing() { 259 this._outgoingEnabled = false; 260 }, 261 262 /** 263 * Resume this transport's attempts to write to the output stream. 264 */ 265 resumeOutgoing() { 266 this._outgoingEnabled = true; 267 this._flushOutgoing(); 268 }, 269 270 // nsIOutputStreamCallback 271 /** 272 * This is called when the output stream is ready for more data to 273 * be written. The current outgoing packet will attempt to write some 274 * amount of data, but may not complete. 275 */ 276 onOutputStreamReady(stream) { 277 if (!this._outgoingEnabled || this._outgoing.length === 0) { 278 return; 279 } 280 281 try { 282 this._currentOutgoing.write(stream); 283 } catch (e) { 284 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { 285 this.close(e.result); 286 return; 287 } 288 throw e; 289 } 290 291 this._flushOutgoing(); 292 }, 293 294 /** 295 * Remove the current outgoing packet from the queue upon completion. 296 */ 297 _finishCurrentOutgoing() { 298 if (this._currentOutgoing) { 299 this._currentOutgoing.destroy(); 300 this._outgoing.shift(); 301 } 302 }, 303 304 /** 305 * Clear the entire outgoing queue. 306 */ 307 _destroyAllOutgoing() { 308 for (let packet of this._outgoing) { 309 packet.destroy(); 310 } 311 this._outgoing = []; 312 }, 313 314 /** 315 * Initialize the input stream for reading. Once this method has been 316 * called, we watch for packets on the input stream, and pass them to 317 * the appropriate handlers via this.hooks. 318 */ 319 ready() { 320 this.active = true; 321 this._waitForIncoming(); 322 }, 323 324 /** 325 * Asks the input stream to notify us (via onInputStreamReady) when it is 326 * ready for reading. 327 */ 328 _waitForIncoming() { 329 if (this._incomingEnabled) { 330 let threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); 331 this._input.asyncWait(this, 0, 0, threadManager.currentThread); 332 } 333 }, 334 335 /** 336 * Pause this transport's attempts to read from the input stream. 337 * This is used when we've temporarily handed off our input stream for 338 * reading bulk data. 339 */ 340 pauseIncoming() { 341 this._incomingEnabled = false; 342 }, 343 344 /** 345 * Resume this transport's attempts to read from the input stream. 346 */ 347 resumeIncoming() { 348 this._incomingEnabled = true; 349 this._flushIncoming(); 350 this._waitForIncoming(); 351 }, 352 353 // nsIInputStreamCallback 354 /** 355 * Called when the stream is either readable or closed. 356 */ 357 onInputStreamReady(stream) { 358 try { 359 while ( 360 stream.available() && 361 this._incomingEnabled && 362 this._processIncoming(stream, stream.available()) 363 ) { 364 // Loop until there is nothing more to process 365 } 366 this._waitForIncoming(); 367 } catch (e) { 368 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { 369 this.close(e.result); 370 } else { 371 throw e; 372 } 373 } 374 }, 375 376 /** 377 * Process the incoming data. Will create a new currently incoming 378 * Packet if needed. Tells the incoming Packet to read as much data 379 * as it can, but reading may not complete. The Packet signals that 380 * its data is ready for delivery by calling one of this transport's 381 * _on*Ready methods (see ./packets.js and the _on*Ready methods below). 382 * 383 * @returns {boolean} 384 * Whether incoming stream processing should continue for any 385 * remaining data. 386 */ 387 _processIncoming(stream, count) { 388 dumpv("Data available: " + count); 389 390 if (!count) { 391 dumpv("Nothing to read, skipping"); 392 return false; 393 } 394 395 try { 396 if (!this._incoming) { 397 dumpv("Creating a new packet from incoming"); 398 399 if (!this._readHeader(stream)) { 400 // Not enough data to read packet type 401 return false; 402 } 403 404 // Attempt to create a new Packet by trying to parse each possible 405 // header pattern. 406 this._incoming = lazy.Packet.fromHeader(this._incomingHeader, this); 407 if (!this._incoming) { 408 throw new Error( 409 "No packet types for header: " + this._incomingHeader 410 ); 411 } 412 } 413 414 if (!this._incoming.done) { 415 // We have an incomplete packet, keep reading it. 416 dumpv("Existing packet incomplete, keep reading"); 417 this._incoming.read(stream, this._scriptableInput); 418 } 419 } catch (e) { 420 dump(`Error reading incoming packet: (${e} - ${e.stack})\n`); 421 422 // Now in an invalid state, shut down the transport. 423 this.close(); 424 return false; 425 } 426 427 if (!this._incoming.done) { 428 // Still not complete, we'll wait for more data. 429 dumpv("Packet not done, wait for more"); 430 return true; 431 } 432 433 // Ready for next packet 434 this._flushIncoming(); 435 return true; 436 }, 437 438 /** 439 * Read as far as we can into the incoming data, attempting to build 440 * up a complete packet header (which terminates with ":"). We'll only 441 * read up to PACKET_HEADER_MAX characters. 442 * 443 * @returns {boolean} 444 * True if we now have a complete header. 445 */ 446 _readHeader() { 447 let amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length; 448 this._incomingHeader += lazy.StreamUtils.delimitedRead( 449 this._scriptableInput, 450 ":", 451 amountToRead 452 ); 453 if (flags.wantVerbose) { 454 dumpv("Header read: " + this._incomingHeader); 455 } 456 457 if (this._incomingHeader.endsWith(":")) { 458 if (flags.wantVerbose) { 459 dumpv("Found packet header successfully: " + this._incomingHeader); 460 } 461 return true; 462 } 463 464 if (this._incomingHeader.length >= PACKET_HEADER_MAX) { 465 throw new Error("Failed to parse packet header!"); 466 } 467 468 // Not enough data yet. 469 return false; 470 }, 471 472 /** 473 * If the incoming packet is done, log it as needed and clear the buffer. 474 */ 475 _flushIncoming() { 476 if (!this._incoming.done) { 477 return; 478 } 479 if (flags.wantLogging) { 480 dumpv("Got: " + this._incoming); 481 } 482 this._destroyIncoming(); 483 }, 484 485 /** 486 * Handler triggered by an incoming JSONPacket completing it's |read| 487 * method. Delivers the packet to this.hooks.onPacket. 488 */ 489 _onJSONObjectReady(object) { 490 lazy.executeSoon(() => { 491 // Ensure the transport is still alive by the time this runs. 492 if (this.active) { 493 this.emit("packet", object); 494 this.hooks.onPacket(object); 495 } 496 }); 497 }, 498 499 /** 500 * Handler triggered by an incoming BulkPacket entering the |read| 501 * phase for the stream portion of the packet. Delivers info about the 502 * incoming streaming data to this.hooks.onBulkPacket. See the main 503 * comment on the transport at the top of this file for more details. 504 */ 505 _onBulkReadReady(...args) { 506 lazy.executeSoon(() => { 507 // Ensure the transport is still alive by the time this runs. 508 if (this.active) { 509 this.emit("bulkpacket", ...args); 510 this.hooks.onBulkPacket(...args); 511 } 512 }); 513 }, 514 515 /** 516 * Remove all handlers and references related to the current incoming 517 * packet, either because it is now complete or because the transport 518 * is closing. 519 */ 520 _destroyIncoming() { 521 if (this._incoming) { 522 this._incoming.destroy(); 523 } 524 this._incomingHeader = ""; 525 this._incoming = null; 526 }, 527 };