transport.js (18111B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 "use strict"; 6 7 const DevToolsUtils = require("resource://devtools/shared/DevToolsUtils.js"); 8 const { dumpn, dumpv } = DevToolsUtils; 9 const flags = require("resource://devtools/shared/flags.js"); 10 const StreamUtils = require("resource://devtools/shared/transport/stream-utils.js"); 11 const { 12 Packet, 13 JSONPacket, 14 BulkPacket, 15 } = require("resource://devtools/shared/transport/packets.js"); 16 17 loader.lazyGetter(this, "ScriptableInputStream", () => { 18 return Components.Constructor( 19 "@mozilla.org/scriptableinputstream;1", 20 "nsIScriptableInputStream", 21 "init" 22 ); 23 }); 24 25 const PACKET_HEADER_MAX = 200; 26 27 /** 28 * An adapter that handles data transfers between the devtools client and 29 * server. It can work with both nsIPipe and nsIServerSocket transports so 30 * long as the properly created input and output streams are specified. 31 * (However, for intra-process connections, LocalDebuggerTransport, below, 32 * is more efficient than using an nsIPipe pair with DebuggerTransport.) 33 * 34 * Given a DebuggerTransport instance dt: 35 * 1) Set dt.hooks to a packet handler object (described below). 36 * 2) Call dt.ready() to begin watching for input packets. 37 * 3) Call dt.send() / dt.startBulkSend() to send packets. 38 * 4) Call dt.close() to close the connection, and disengage from the event 39 * loop. 40 * 41 * A packet handler is an object with the following methods: 42 * 43 * - onPacket(packet) - called when we have received a complete packet. 44 * |packet| is the parsed form of the packet --- a JavaScript value, not 45 * a JSON-syntax string. 46 * 47 * - onBulkPacket(packet) - called when we have switched to bulk packet 48 * receiving mode. |packet| is an object containing: 49 * * actor: Name of actor that will receive the packet 50 * * type: Name of actor's method that should be called on receipt 51 * * length: Size of the data to be read 52 * * stream: This input stream should only be used directly if you can ensure 53 * that you will read exactly |length| bytes and will not close the 54 * stream when reading is complete 55 * * done: If you use the stream directly (instead of |copyTo| or 56 * |copyToBuffer| below), you must signal completion by 57 * resolving / rejecting this deferred. If it's rejected, the 58 * transport will be closed. If an Error is supplied as a 59 * rejection value, it will be logged via |dumpn|. If you do 60 * use |copyTo| or |copyToBuffer|, resolving is taken care of 61 * for you when copying completes. 62 * * copyTo: A helper function for getting your data out of the stream that 63 * meets the stream handling requirements above, and has the 64 * following signature: 65 * 66 * @param output nsIAsyncOutputStream 67 * The stream to copy to. 68 * @return Promise 69 * The promise is resolved when copying completes or rejected if any 70 * (unexpected) error occurs. 71 * This object also emits "progress" events for each chunk that is 72 * copied. See stream-utils.js. 73 * * copyToBuffer: a helper function for getting your data out of the stream 74 * that meets the stream handling requirements above, and has 75 * the following signature: 76 * @param output ArrayBuffer 77 * The buffer to copy to. It needs to be the same length as the data 78 * to be transfered. 79 * @return Promise 80 * The promise is resolved when copying completes or rejected if any 81 * (unexpected) error occurs. 82 * 83 * - onTransportClosed(reason) - called when the connection is closed. |reason| is 84 * an optional nsresult or object, typically passed when the transport is 85 * closed due to some error in a underlying stream. 86 * 87 * See ./packets.js and the Remote Debugging Protocol specification for more 88 * details on the format of these packets. 89 */ 90 class DebuggerTransport { 91 /** 92 * @param {nsIAsyncInputStream} input 93 * The input stream. 94 * @param {nsIAsyncOutputStream} output 95 * The output stream. 96 */ 97 constructor(input, output) { 98 this._input = input; 99 this._scriptableInput = new ScriptableInputStream(input); 100 this._output = output; 101 102 // The current incoming (possibly partial) header, which will determine which 103 // type of Packet |_incoming| below will become. 104 this._incomingHeader = ""; 105 // The current incoming Packet object 106 this._incoming = null; 107 // A queue of outgoing Packet objects 108 this._outgoing = []; 109 110 this.hooks = null; 111 this.active = false; 112 113 this._incomingEnabled = true; 114 this._outgoingEnabled = true; 115 116 this.close = this.close.bind(this); 117 } 118 119 /** 120 * Transmit an object as a JSON packet. 121 * 122 * This method returns immediately, without waiting for the entire 123 * packet to be transmitted, registering event handlers as needed to 124 * transmit the entire packet. Packets are transmitted in the order 125 * they are passed to this method. 126 */ 127 send(object) { 128 const packet = new JSONPacket(this); 129 packet.object = object; 130 this._outgoing.push(packet); 131 this._flushOutgoing(); 132 } 133 134 /** 135 * Transmit streaming data via a bulk packet. 136 * 137 * This method initiates the bulk send process by queuing up the header data. 138 * The caller receives eventual access to a stream for writing. 139 * 140 * N.B.: Do *not* attempt to close the stream handed to you, as it will 141 * continue to be used by this transport afterwards. Most users should 142 * instead use the provided |copyFrom| or |copyFromBuffer| functions instead. 143 * 144 * @param {object} header 145 * This is modeled after the format of JSON packets above, but does not 146 * actually contain the data, but is instead just a routing header: 147 * * actor: Name of actor that will receive the packet 148 * * type: Name of actor's method that should be called on receipt 149 * * length: Size of the data to be sent 150 * @return {Promise} 151 * The promise will be resolved when you are allowed to write to the 152 * stream with an object containing: 153 * * stream: This output stream should only be used directly if 154 * you can ensure that you will write exactly |length| 155 * bytes and will not close the stream when writing is 156 * complete 157 * * done: If you use the stream directly (instead of |copyFrom| 158 * or |copyFromBuffer| below), you must signal 159 * completion by resolving / rejecting this 160 * deferred. If it's rejected, the transport will 161 * be closed. If an Error is supplied as a 162 * rejection value, it will be logged via |dumpn|. 163 * If you do use |copyFrom| or |copyFromBuffer|, 164 * resolving is taken care of for you when copying 165 * completes. 166 * * copyFrom: A helper function for getting your data onto the 167 * stream that meets the stream handling requirements 168 * above, and has the following signature: 169 * @param input nsIAsyncInputStream 170 * The stream to copy from. 171 * @return Promise 172 * The promise is resolved when copying completes or 173 * rejected if any (unexpected) errors occur. 174 * This object also emits "progress" events for each chunk 175 * that is copied. See stream-utils.js. 176 * * copyFromBuffer: A helper function for getting your data onto 177 * the stream that meets the stream handling 178 * requirements above, and has the following 179 * signature: 180 * @param input ArrayBuffer 181 * The buffer to read from. It needs to be the same length 182 * as the data to write. 183 * @return Promise 184 * The promise is resolved when copying completes or 185 * rejected if any (unexpected) errors occur. 186 */ 187 startBulkSend(header) { 188 const packet = new BulkPacket(this); 189 packet.header = header; 190 this._outgoing.push(packet); 191 this._flushOutgoing(); 192 return packet.streamReadyForWriting; 193 } 194 195 /** 196 * Close the transport. 197 * 198 * @param reason nsresult / object (optional) 199 * The status code or error message that corresponds to the reason for 200 * closing the transport (likely because a stream closed or failed). 201 */ 202 close(reason) { 203 this.active = false; 204 this._input.close(); 205 this._scriptableInput.close(); 206 this._output.close(); 207 this._destroyIncoming(); 208 this._destroyAllOutgoing(); 209 if (this.hooks) { 210 this.hooks.onTransportClosed(reason); 211 this.hooks = null; 212 } 213 if (reason) { 214 dumpn("Transport closed: " + DevToolsUtils.safeErrorString(reason)); 215 } else { 216 dumpn("Transport closed."); 217 } 218 } 219 220 /** 221 * The currently outgoing packet (at the top of the queue). 222 */ 223 get _currentOutgoing() { 224 return this._outgoing[0]; 225 } 226 227 /** 228 * Flush data to the outgoing stream. Waits until the output stream notifies 229 * us that it is ready to be written to (via onOutputStreamReady). 230 */ 231 _flushOutgoing() { 232 if (!this._outgoingEnabled || this._outgoing.length === 0) { 233 return; 234 } 235 236 // If the top of the packet queue has nothing more to send, remove it. 237 if (this._currentOutgoing.done) { 238 this._finishCurrentOutgoing(); 239 } 240 241 if (this._outgoing.length) { 242 const threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); 243 this._output.asyncWait(this, 0, 0, threadManager.currentThread); 244 } 245 } 246 247 /** 248 * Pause this transport's attempts to write to the output stream. This is 249 * used when we've temporarily handed off our output stream for writing bulk 250 * data. 251 */ 252 pauseOutgoing() { 253 this._outgoingEnabled = false; 254 } 255 256 /** 257 * Resume this transport's attempts to write to the output stream. 258 */ 259 resumeOutgoing() { 260 this._outgoingEnabled = true; 261 this._flushOutgoing(); 262 } 263 264 // nsIOutputStreamCallback 265 /** 266 * This is called when the output stream is ready for more data to be written. 267 * The current outgoing packet will attempt to write some amount of data, but 268 * may not complete. 269 */ 270 onOutputStreamReady = DevToolsUtils.makeInfallible(function (stream) { 271 if (!this._outgoingEnabled || this._outgoing.length === 0) { 272 return; 273 } 274 275 try { 276 this._currentOutgoing.write(stream); 277 } catch (e) { 278 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { 279 this.close(e.result); 280 return; 281 } 282 throw e; 283 } 284 285 this._flushOutgoing(); 286 }, "DebuggerTransport.prototype.onOutputStreamReady"); 287 288 /** 289 * Remove the current outgoing packet from the queue upon completion. 290 */ 291 _finishCurrentOutgoing() { 292 if (this._currentOutgoing) { 293 this._currentOutgoing.destroy(); 294 this._outgoing.shift(); 295 } 296 } 297 298 /** 299 * Clear the entire outgoing queue. 300 */ 301 _destroyAllOutgoing() { 302 for (const packet of this._outgoing) { 303 packet.destroy(); 304 } 305 this._outgoing = []; 306 } 307 308 /** 309 * Initialize the input stream for reading. Once this method has been called, 310 * we watch for packets on the input stream, and pass them to the appropriate 311 * handlers via this.hooks. 312 */ 313 ready() { 314 this.active = true; 315 this._waitForIncoming(); 316 } 317 318 /** 319 * Asks the input stream to notify us (via onInputStreamReady) when it is 320 * ready for reading. 321 */ 322 _waitForIncoming() { 323 if (this._incomingEnabled) { 324 const threadManager = Cc["@mozilla.org/thread-manager;1"].getService(); 325 this._input.asyncWait(this, 0, 0, threadManager.currentThread); 326 } 327 } 328 329 /** 330 * Pause this transport's attempts to read from the input stream. This is 331 * used when we've temporarily handed off our input stream for reading bulk 332 * data. 333 */ 334 pauseIncoming() { 335 this._incomingEnabled = false; 336 } 337 338 /** 339 * Resume this transport's attempts to read from the input stream. 340 */ 341 resumeIncoming() { 342 this._incomingEnabled = true; 343 this._flushIncoming(); 344 this._waitForIncoming(); 345 } 346 347 // nsIInputStreamCallback 348 /** 349 * Called when the stream is either readable or closed. 350 */ 351 onInputStreamReady = DevToolsUtils.makeInfallible(function (stream) { 352 try { 353 while ( 354 stream.available() && 355 this._incomingEnabled && 356 this._processIncoming(stream, stream.available()) 357 ) { 358 // Loop until there is nothing more to process 359 } 360 this._waitForIncoming(); 361 } catch (e) { 362 if (e.result != Cr.NS_BASE_STREAM_WOULD_BLOCK) { 363 this.close(e.result); 364 } else { 365 throw e; 366 } 367 } 368 }, "DebuggerTransport.prototype.onInputStreamReady"); 369 370 /** 371 * Process the incoming data. Will create a new currently incoming Packet if 372 * needed. Tells the incoming Packet to read as much data as it can, but 373 * reading may not complete. The Packet signals that its data is ready for 374 * delivery by calling one of this transport's _on*Ready methods (see 375 * ./packets.js and the _on*Ready methods below). 376 * 377 * @return {boolean} 378 * Whether incoming stream processing should continue for any 379 * remaining data. 380 */ 381 _processIncoming(stream, count) { 382 dumpv("Data available: " + count); 383 384 if (!count) { 385 dumpv("Nothing to read, skipping"); 386 return false; 387 } 388 389 try { 390 if (!this._incoming) { 391 dumpv("Creating a new packet from incoming"); 392 393 if (!this._readHeader(stream)) { 394 // Not enough data to read packet type 395 return false; 396 } 397 398 // Attempt to create a new Packet by trying to parse each possible 399 // header pattern. 400 this._incoming = Packet.fromHeader(this._incomingHeader, this); 401 if (!this._incoming) { 402 throw new Error( 403 "No packet types for header: " + this._incomingHeader 404 ); 405 } 406 } 407 408 if (!this._incoming.done) { 409 // We have an incomplete packet, keep reading it. 410 dumpv("Existing packet incomplete, keep reading"); 411 this._incoming.read(stream, this._scriptableInput); 412 } 413 } catch (e) { 414 const msg = 415 "Error reading incoming packet: (" + e + " - " + e.stack + ")"; 416 dumpn(msg); 417 418 // Now in an invalid state, shut down the transport. 419 this.close(); 420 return false; 421 } 422 423 if (!this._incoming.done) { 424 // Still not complete, we'll wait for more data. 425 dumpv("Packet not done, wait for more"); 426 return true; 427 } 428 429 // Ready for next packet 430 this._flushIncoming(); 431 return true; 432 } 433 434 /** 435 * Read as far as we can into the incoming data, attempting to build up a 436 * complete packet header (which terminates with ":"). We'll only read up to 437 * PACKET_HEADER_MAX characters. 438 * 439 * @return boolean 440 * True if we now have a complete header. 441 */ 442 _readHeader() { 443 const amountToRead = PACKET_HEADER_MAX - this._incomingHeader.length; 444 this._incomingHeader += StreamUtils.delimitedRead( 445 this._scriptableInput, 446 ":", 447 amountToRead 448 ); 449 if (flags.wantVerbose) { 450 dumpv("Header read: " + this._incomingHeader); 451 } 452 453 if (this._incomingHeader.endsWith(":")) { 454 if (flags.wantVerbose) { 455 dumpv("Found packet header successfully: " + this._incomingHeader); 456 } 457 return true; 458 } 459 460 if (this._incomingHeader.length >= PACKET_HEADER_MAX) { 461 throw new Error("Failed to parse packet header!"); 462 } 463 464 // Not enough data yet. 465 return false; 466 } 467 468 /** 469 * If the incoming packet is done, log it as needed and clear the buffer. 470 */ 471 _flushIncoming() { 472 if (!this._incoming.done) { 473 return; 474 } 475 if (flags.wantLogging) { 476 dumpn("Got: " + this._incoming); 477 } 478 this._destroyIncoming(); 479 } 480 481 /** 482 * Handler triggered by an incoming JSONPacket completing it's |read| method. 483 * Delivers the packet to this.hooks.onPacket. 484 */ 485 _onJSONObjectReady(object) { 486 DevToolsUtils.executeSoon( 487 DevToolsUtils.makeInfallible(() => { 488 // Ensure the transport is still alive by the time this runs. 489 if (this.active) { 490 this.hooks.onPacket(object); 491 } 492 }, "DebuggerTransport instance's this.hooks.onPacket") 493 ); 494 } 495 496 /** 497 * Handler triggered by an incoming BulkPacket entering the |read| phase for 498 * the stream portion of the packet. Delivers info about the incoming 499 * streaming data to this.hooks.onBulkPacket. See the main comment on the 500 * transport at the top of this file for more details. 501 */ 502 _onBulkReadReady(...args) { 503 DevToolsUtils.executeSoon( 504 DevToolsUtils.makeInfallible(() => { 505 // Ensure the transport is still alive by the time this runs. 506 if (this.active) { 507 this.hooks.onBulkPacket(...args); 508 } 509 }, "DebuggerTransport instance's this.hooks.onBulkPacket") 510 ); 511 } 512 513 /** 514 * Remove all handlers and references related to the current incoming packet, 515 * either because it is now complete or because the transport is closing. 516 */ 517 _destroyIncoming() { 518 if (this._incoming) { 519 this._incoming.destroy(); 520 } 521 this._incomingHeader = ""; 522 this._incoming = null; 523 } 524 } 525 526 exports.DebuggerTransport = DebuggerTransport;