FxAccountsCommands.sys.mjs (36047B)
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

import {
  CLIENT_IS_THUNDERBIRD,
  COMMAND_SENDTAB,
  COMMAND_SENDTAB_TAIL,
  COMMAND_CLOSETAB,
  COMMAND_CLOSETAB_TAIL,
  SCOPE_OLD_SYNC,
  log,
} from "resource://gre/modules/FxAccountsCommon.sys.mjs";

import { clearTimeout, setTimeout } from "resource://gre/modules/Timer.sys.mjs";

import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";

import { Observers } from "resource://services-common/observers.sys.mjs";

const lazy = {};

ChromeUtils.defineESModuleGetters(lazy, {
  AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
  BulkKeyBundle: "resource://services-sync/keys.sys.mjs",
  CryptoWrapper: "resource://services-sync/record.sys.mjs",
  PushCrypto: "resource://gre/modules/PushCrypto.sys.mjs",
  getRemoteCommandStore: "resource://services-sync/TabsStore.sys.mjs",
  RemoteCommand: "resource://services-sync/TabsStore.sys.mjs",
  Resource: "resource://services-sync/resource.sys.mjs",
  Utils: "resource://services-sync/util.sys.mjs",
  NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
});

// The pref holds a "|"-separated list of schemes; expose it as a Set so
// membership checks on incoming URIs are cheap.
XPCOMUtils.defineLazyPreferenceGetter(
  lazy,
  "INVALID_SHAREABLE_SCHEMES",
  "services.sync.engine.tabs.filteredSchemes",
  "",
  null,
  val => {
    return new Set(val.split("|"));
  }
);

XPCOMUtils.defineLazyServiceGetter(
  lazy,
  "idleService",
  "@mozilla.org/widget/useridleservice;1",
  Ci.nsIUserIdleService
);

const TOPIC_TABS_CHANGED = "services.sync.tabs.changed";
// Upper bound handed to `chunkUrls` when splitting close-tab URLs across
// several commands.
const COMMAND_MAX_PAYLOAD_SIZE = 16 * 1024;

/**
 * Entry point for FxA device commands: advertises the commands this device
 * supports, invokes commands on other devices, and polls for / dispatches
 * the commands sent to this device.
 */
export class FxAccountsCommands {
  constructor(fxAccountsInternal) {
    this._fxai = fxAccountsInternal;
    this.sendTab = new SendTab(this, fxAccountsInternal);
    this.closeTab = new CloseRemoteTab(this, fxAccountsInternal);
    this.commandQueue = new CommandQueue(this, fxAccountsInternal);
    // Timestamp (ms, `Date.now()` based) before which `invoke` refuses to
    // talk to the server because it previously answered with a 429.
    this._invokeRateLimitExpiry = 0;
  }

  /**
   * Builds the map of commands this device can receive.
   *
   * @returns {Promise<object>} An object keyed by command name whose values
   *   are the encrypted key bundles for that command. Commands whose keys
   *   could not be obtained are omitted; the object is empty on Thunderbird.
   */
  async availableCommands() {
    let commands = {};

    if (!CLIENT_IS_THUNDERBIRD) {
      // Invalid keys usually means the account is not verified yet.
      const encryptedSendTabKeys = await this.sendTab.getEncryptedCommandKeys();

      if (encryptedSendTabKeys) {
        commands[COMMAND_SENDTAB] = encryptedSendTabKeys;
      }

      const encryptedCloseTabKeys =
        await this.closeTab.getEncryptedCommandKeys();
      if (encryptedCloseTabKeys) {
        commands[COMMAND_CLOSETAB] = encryptedCloseTabKeys;
      }
    }

    return commands;
  }

  /**
   * Asks the FxA server to deliver `payload` for `command` to `device`.
   *
   * @param {string} command - The command name, eg COMMAND_SENDTAB.
   * @param {object} device - The target device; only `device.id` is used here.
   * @param {object} payload - The (already encrypted) command payload.
   * @throws {Error} If a previous 429 response is still in effect, or if the
   *   underlying `invokeCommand` call rejects (the original error is rethrown).
   */
  async invoke(command, device, payload) {
    const { sessionToken } = await this._fxai.getUserAccountData([
      "sessionToken",
    ]);
    const client = this._fxai.fxAccountsClient;
    const now = Date.now();
    if (now < this._invokeRateLimitExpiry) {
      const remaining = (this._invokeRateLimitExpiry - now) / 1000;
      throw new Error(
        `Invoke for ${command} is rate-limited for ${remaining} seconds.`
      );
    }
    try {
      let info = await client.invokeCommand(
        sessionToken,
        command,
        device.id,
        payload
      );
      if (!info.enqueued || !info.notified) {
        // We want an error log here to help diagnose users who report failure.
        log.error("Sending was only partially successful", info);
      } else {
        log.info("Successfully sent", info);
      }
    } catch (err) {
      // Remember how long the server asked us to back off for, so the next
      // calls fail fast instead of hitting the server again.
      if (err.code === 429 && err.retryAfter) {
        this._invokeRateLimitExpiry = Date.now() + err.retryAfter * 1000;
      }
      throw err;
    }
    log.info(`Payload sent to device ${device.id}.`);
  }

  /**
   * Poll and handle device commands for the current device.
   * This method can be called either in response to a Push message,
   * or by itself as a "commands recovery" mechanism.
   *
   * @param {number} notifiedIndex "Command received" push messages include
   * the index of the command that triggered the message. We use it as a
   * hint when we have no "last command index" stored. It is 0 when the poll
   * was not triggered by a push message.
   * @returns {Promise<boolean>} Resolves to true once polling completed,
   *   including when there was nothing new to handle.
   * @throws {Error} If there is no device registration, or fetching fails.
   */
  async pollDeviceCommands(notifiedIndex = 0) {
    log.info(`Polling device commands.`);
    await this._fxai.withCurrentAccountState(async state => {
      const { device } = await state.getUserAccountData(["device"]);
      if (!device) {
        throw new Error("No device registration.");
      }
      // We increment lastCommandIndex by 1 because the server response includes the current index.
      // If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got.
      // (`undefined + 1` is NaN, which is falsy, hence the `||`.)
      const lastCommandIndex = device.lastCommandIndex + 1 || notifiedIndex;
      // We have already received this message before.
      if (notifiedIndex > 0 && notifiedIndex < lastCommandIndex) {
        return;
      }
      const { index, messages } =
        await this._fetchDeviceCommands(lastCommandIndex);
      if (messages.length) {
        // Persist the new index before handling, so the same messages are
        // not fetched again by a later poll.
        await state.updateUserAccountData({
          device: { ...device, lastCommandIndex: index },
        });
        log.info(`Handling ${messages.length} messages`);
        await this._handleCommands(messages, notifiedIndex);
      }
    });
    return true;
  }

  /**
   * Fetches pending commands for this device from the FxA server.
   *
   * @param {number} index - The command index to start fetching from.
   * @param {?number} limit - Optional maximum number of commands to request;
   *   omitted from the request when null/undefined.
   * @returns {Promise<object>} Whatever `fxAccountsClient.getCommands`
   *   resolves with (callers read `index` and `messages` from it).
   * @throws {Error} If there is no signed-in user or no session token.
   */
  async _fetchDeviceCommands(index, limit = null) {
    const userData = await this._fxai.getUserAccountData();
    if (!userData) {
      throw new Error("No user.");
    }
    const { sessionToken } = userData;
    if (!sessionToken) {
      throw new Error("No session token.");
    }
    const client = this._fxai.fxAccountsClient;
    const opts = { index };
    if (limit != null) {
      opts.limit = limit;
    }
    return client.getCommands(sessionToken, opts);
  }

  /**
   * Explains why the command at `messageIndex` is being handled.
   *
   * @param {number} notifiedIndex - Index from the push message, or 0.
   * @param {number} messageIndex - Index of the message being handled.
   * @returns {string} "poll", "push-missed" or "push".
   */
  _getReason(notifiedIndex, messageIndex) {
    // The returned reason value represents an explanation for why the command associated with the
    // message of the given `messageIndex` is being handled. If `notifiedIndex` is zero the command
    // is a part of a fallback polling process initiated by "Sync Now" ["poll"]. If `notifiedIndex` is
    // greater than `messageIndex` this is a push command that was previously missed ["push-missed"],
    // otherwise we assume this is a push command with no missed messages ["push"].
    if (notifiedIndex == 0) {
      return "poll";
    } else if (notifiedIndex > messageIndex) {
      return "push-missed";
    }
    // Note: The returned reason might be "push" in the case where a user sends multiple tabs
    // in quick succession. We are not attempting to distinguish this from other push cases at
    // present.
    return "push";
  }

  /**
   * Decrypts and dispatches a batch of incoming commands, then notifies
   * observers once per command kind. A failure in one command is logged and
   * does not prevent the remaining commands from being handled.
   *
   * @param {object[]} messages - Messages as returned by the server; each is
   *   read as `{ index, data: { command, payload, sender } }`.
   * @param {number} notifiedIndex - Index from the push message, or 0.
   */
  async _handleCommands(messages, notifiedIndex) {
    try {
      await this._fxai.device.refreshDeviceList();
    } catch (e) {
      // Best effort - we can still handle the commands, we just may not be
      // able to name the sender.
      log.warn("Error refreshing device list", e);
    }
    // We debounce multiple incoming tabs so we show a single notification.
    const tabsReceived = [];
    const tabsToClose = [];
    for (const { index, data } of messages) {
      const { command, payload, sender: senderId } = data;
      const reason = this._getReason(notifiedIndex, index);
      const sender =
        senderId && this._fxai.device.recentDeviceList
          ? this._fxai.device.recentDeviceList.find(d => d.id == senderId)
          : null;
      if (!sender) {
        log.warn(
          "Incoming command is from an unknown device (maybe disconnected?)"
        );
      }
      switch (command) {
        case COMMAND_CLOSETAB:
          try {
            const { urls } = await this.closeTab.handleTabClose(
              senderId,
              payload,
              reason
            );
            log.info(
              `Close Tab received with FxA commands: "${urls.length} tabs" from ${sender ? sender.name : "Unknown device"}.`
            );
            // URLs are PII, so only logged at trace.
            log.trace(`Close Remote Tabs received URLs: ${urls}`);
            tabsToClose.push({ urls, sender });
          } catch (e) {
            log.error(`Error while handling incoming Close Tab payload.`, e);
          }
          break;
        case COMMAND_SENDTAB:
          try {
            const { title, uri } = await this.sendTab.handle(
              senderId,
              payload,
              reason
            );
            log.info(
              `Tab received with FxA commands: "${
                title || "<no title>"
              }" from ${sender ? sender.name : "Unknown device"}.`
            );
            // URLs are PII, so only logged at trace.
            log.trace(`Tab received URL: ${uri}`);
            // This should eventually be rare to hit as all platforms will be using the same
            // scheme filter list, but we have this here in the case other platforms
            // haven't caught up and/or trying to send invalid uris using older versions
            const scheme = Services.io.newURI(uri).scheme;
            if (lazy.INVALID_SHAREABLE_SCHEMES.has(scheme)) {
              throw new Error("Invalid scheme found for received URI.");
            }
            tabsReceived.push({ title, uri, sender });
          } catch (e) {
            log.error(`Error while handling incoming Send Tab payload.`, e);
          }
          break;
        default:
          log.info(`Unknown command: ${command}.`);
      }
    }
    if (tabsReceived.length) {
      this._notifyFxATabsReceived(tabsReceived);
    }
    if (tabsToClose.length) {
      this._notifyFxATabsClosed(tabsToClose);
    }
  }

  // Tells observers about received tabs, as an array of {title, uri, sender}.
  _notifyFxATabsReceived(tabsReceived) {
    Observers.notify("fxaccounts:commands:open-uri", tabsReceived);
  }

  // Tells observers about tabs to close, as an array of {urls, sender}.
  _notifyFxATabsClosed(tabsToClose) {
    Observers.notify("fxaccounts:commands:close-uri", tabsToClose);
  }
}

/**
 * This is built on top of FxA commands.
 *
 * Devices exchange keys wrapped in the oldsync key between themselves (getEncryptedCommandKeys)
 * during the device registration flow. The FxA server can theoretically never
 * retrieve the send tab keys since it doesn't know the oldsync key.
 *
 * Note about the keys:
 * The server has the `pushPublicKey`. The FxA server encrypts the send-tab payload again using the
 * push keys - after the client has encrypted the payload using the send-tab keys.
 * The push keys are different from the send-tab keys. The FxA server uses
 * the push keys to deliver the tabs using same mechanism we use for web-push.
 * However, clients use the send-tab keys for end-to-end encryption.
 *
 * Every command uses the same key management code, although each has its own key.
 */

export class Command {
  constructor(commands, fxAccountsInternal) {
    this._commands = commands;
    this._fxai = fxAccountsInternal;
  }

  // Must be set by the command.
  deviceCapability; // eg, COMMAND_SENDTAB;
  keyFieldName; // eg, "sendTabKeys";
  encryptedKeyFieldName; // eg, "encryptedSendTabKeys"

  // Returns a truthy value if the target device advertises this command in
  // its `availableCommands`, ie it is compatible with this FxA command.
  isDeviceCompatible(device) {
    return (
      device.availableCommands &&
      device.availableCommands[this.deviceCapability]
    );
  }

  /**
   * Encrypts `bytes` for `device` using the command keys that device
   * published (which are themselves wrapped with the oldsync key).
   *
   * @param {Uint8Array} bytes - The cleartext payload.
   * @param {object} device - Target device; must expose `availableCommands`.
   * @returns {Promise<string>} The ciphertext, URL-safe base64 encoded.
   * @throws {Error} If the device has no keys for this command, or its keys
   *   were wrapped with a different sync key than ours.
   */
  async _encrypt(bytes, device) {
    const bundle = device.availableCommands[this.deviceCapability];
    if (!bundle) {
      throw new Error(`Device ${device.id} does not have send tab keys.`);
    }
    const oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    // Older clients expect this to be hex, due to pre-JWK sync key ids :-(
    const ourKid = this._fxai.keys.kidAsHex(oldsyncKey);
    // The bundle is a JSON string carrying both the key id and the encrypted
    // record, so parse it once and use it for both.
    const json = JSON.parse(bundle);
    const { kid: theirKid } = json;
    if (theirKid != ourKid) {
      throw new Error("Target Send Tab key ID is different from ours");
    }
    const wrapper = new lazy.CryptoWrapper();
    wrapper.deserialize({ payload: json });
    const syncKeyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    let { publicKey, authSecret } = await wrapper.decrypt(syncKeyBundle);
    authSecret = urlsafeBase64Decode(authSecret);
    publicKey = urlsafeBase64Decode(publicKey);

    const { ciphertext: encrypted } = await lazy.PushCrypto.encrypt(
      bytes,
      publicKey,
      authSecret
    );
    return urlsafeBase64Encode(encrypted);
  }

  /**
   * Decrypts an incoming payload with this device's persisted command keys.
   *
   * @param {string} ciphertext - URL-safe base64 encoded ciphertext.
   * @returns {Promise<*>} Whatever `PushCrypto.decrypt` resolves with
   *   (callers feed it to a TextDecoder).
   * @throws {Error} If this device has no persisted keys for the command.
   */
  async _decrypt(ciphertext) {
    const keys = await this._getPersistedCommandKeys();
    if (!keys) {
      // Fail with a clear message rather than an opaque destructuring
      // TypeError; callers log and skip the command.
      throw new Error(
        `No persisted ${this.keyFieldName} available to decrypt the payload.`
      );
    }
    let { privateKey, publicKey, authSecret } = keys;
    publicKey = urlsafeBase64Decode(publicKey);
    authSecret = urlsafeBase64Decode(authSecret);
    ciphertext = new Uint8Array(urlsafeBase64Decode(ciphertext));
    return lazy.PushCrypto.decrypt(
      privateKey,
      publicKey,
      authSecret,
      // The only Push encoding we support.
      { encoding: "aes128gcm" },
      ciphertext
    );
  }

  // Returns the {publicKey, privateKey, authSecret} record stored on the
  // device record for this command, or a falsy value if there is none.
  async _getPersistedCommandKeys() {
    const { device } = await this._fxai.getUserAccountData(["device"]);
    return device && device[this.keyFieldName];
  }

  /**
   * Generates a fresh key pair and auth secret for this command and stores
   * them on the device record of the current account state.
   *
   * @returns {Promise<object>} The new {publicKey, privateKey, authSecret}.
   */
  async _generateAndPersistCommandKeys() {
    let [publicKey, privateKey] = await lazy.PushCrypto.generateKeys();
    publicKey = urlsafeBase64Encode(publicKey);
    let authSecret = lazy.PushCrypto.generateAuthenticationSecret();
    authSecret = urlsafeBase64Encode(authSecret);
    const sendTabKeys = {
      publicKey,
      privateKey,
      authSecret,
    };
    await this._fxai.withCurrentAccountState(async state => {
      const { device = {} } = await state.getUserAccountData(["device"]);
      device[this.keyFieldName] = sendTabKeys;
      log.trace(
        `writing to ${this.keyFieldName} for command ${this.deviceCapability}`
      );
      await state.updateUserAccountData({
        device,
      });
    });
    return sendTabKeys;
  }

  // Returns the persisted, already-encrypted key bundle (a JSON string) for
  // this command, if any.
  async _getPersistedEncryptedCommandKey() {
    const data = await this._fxai.getUserAccountData([
      this.encryptedKeyFieldName,
    ]);
    return data[this.encryptedKeyFieldName];
  }

  /**
   * Wraps the public half of this command's keys with the oldsync key and
   * persists the result, generating the command keys first if necessary.
   *
   * @returns {Promise<?string>} The JSON-serialized encrypted bundle, or
   *   null if the sync key is not available.
   */
  async _generateAndPersistEncryptedCommandKey() {
    if (!(await this._fxai.keys.canGetKeyForScope(SCOPE_OLD_SYNC))) {
      log.info("Can't fetch keys, so unable to determine command keys");
      return null;
    }
    let sendTabKeys = await this._getPersistedCommandKeys();
    if (!sendTabKeys) {
      log.info("Could not find command keys, generating them");
      sendTabKeys = await this._generateAndPersistCommandKeys();
    }
    // Strip the private key from the bundle to encrypt.
    const keyToEncrypt = {
      publicKey: sendTabKeys.publicKey,
      authSecret: sendTabKeys.authSecret,
    };
    let oldsyncKey;
    try {
      oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    } catch (ex) {
      log.warn("Failed to fetch keys, so unable to determine command keys", ex);
      return null;
    }
    const wrapper = new lazy.CryptoWrapper();
    wrapper.cleartext = keyToEncrypt;
    const keyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    await wrapper.encrypt(keyBundle);
    const encryptedSendTabKeys = JSON.stringify({
      // This is expected in hex, due to pre-JWK sync key ids :-(
      kid: this._fxai.keys.kidAsHex(oldsyncKey),
      IV: wrapper.IV,
      hmac: wrapper.hmac,
      ciphertext: wrapper.ciphertext,
    });
    await this._fxai.withCurrentAccountState(async state => {
      let data = {};
      data[this.encryptedKeyFieldName] = encryptedSendTabKeys;
      await state.updateUserAccountData(data);
    });
    return encryptedSendTabKeys;
  }

  /**
   * Returns the encrypted key bundle to advertise for this command,
   * (re)generating it when either the bundle or the underlying keys are
   * missing.
   *
   * @returns {Promise<?string>} The encrypted bundle, or a falsy value if it
   *   could not be generated yet.
   */
  async getEncryptedCommandKeys() {
    log.trace("Getting command keys", this.deviceCapability);
    let encryptedSendTabKeys = await this._getPersistedEncryptedCommandKey();
    const sendTabKeys = await this._getPersistedCommandKeys();
    if (!encryptedSendTabKeys || !sendTabKeys) {
      log.info(
        `Generating and persisting encrypted key (${!!encryptedSendTabKeys}, ${!!sendTabKeys})`
      );
      // Generating the encrypted key requires the sync key so we expect to fail
      // in some cases (primary password is locked, account not verified, etc)
      // However, we will eventually end up generating it when we can, and device registration
      // will handle this late update and update the remote record as necessary, so it gets there in the end.
      // It's okay to persist these keys in plain text; they're encrypted.
      encryptedSendTabKeys =
        await this._generateAndPersistEncryptedCommandKey();
    }
    return encryptedSendTabKeys;
  }
}

/**
 * Send Tab
 */
export class SendTab extends Command {
  deviceCapability = COMMAND_SENDTAB;
  keyFieldName = "sendTabKeys";
  encryptedKeyFieldName = "encryptedSendTabKeys";

  /**
   * Sends a tab to each of the given devices, one device at a time. A
   * failure for one device is recorded in the report and does not stop the
   * remaining devices from being tried.
   *
   * @param {Device[]} to - Device objects (typically returned by fxAccounts.getDevicesList()).
   * @param {object} tab
   * @param {string} tab.url
   * @param {string} tab.title
   * @returns A report object, in the shape of
   *          {succeeded: [Device], failed: [{device: Device, error: Exception}]}
   */
  async send(to, tab) {
    log.info(`Sending a tab to ${to.length} devices.`);
    // One flow ID for the whole send; each device gets its own stream ID.
    const flowID = this._fxai.telemetry.generateFlowID();
    const encoder = new TextEncoder();
    const data = { entries: [{ title: tab.title, url: tab.url }] };
    const report = {
      succeeded: [],
      failed: [],
    };
    for (let device of to) {
      try {
        const streamID = this._fxai.telemetry.generateFlowID();
        const targetData = { flowID, streamID, ...data };
        const bytes = encoder.encode(JSON.stringify(targetData));
        const encrypted = await this._encrypt(bytes, device);
        // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
        // If you add any plaintext items to this payload, please carefully consider the privacy implications
        // of revealing that data to the FxA server.
        const payload = { encrypted };
        await this._commands.invoke(COMMAND_SENDTAB, device, payload);
        const deviceId = this._fxai.telemetry.sanitizeDeviceId(device.id);
        Glean.fxa.sendtabSent.record({
          flow_id: flowID,
          hashed_device_id: deviceId,
          server_time: lazy.Resource.serverTime,
          stream_id: streamID,
        });
        this._fxai.telemetry.recordEvent(
          "command-sent",
          COMMAND_SENDTAB_TAIL,
          deviceId,
          { flowID, streamID }
        );
        report.succeeded.push(device);
      } catch (error) {
        log.error("Error while invoking a send tab command.", error);
        report.failed.push({ device, error });
      }
    }
    return report;
  }

  /**
   * Handle incoming send tab payload, called by FxAccountsCommands.
   *
   * @param {string} senderID - ID of the sending device (used for telemetry).
   * @param {object} payload - The command payload; `encrypted` is decrypted.
   * @param {string} reason - Why the command is being handled (telemetry).
   * @returns {Promise<{title: string, uri: string}>} The received tab.
   */
  async handle(senderID, { encrypted }, reason) {
    const bytes = await this._decrypt(encrypted);
    const decoder = new TextDecoder("utf8");
    const data = JSON.parse(decoder.decode(bytes));
    // `flowID` and `streamID` are in the top-level of the JSON, `entries` is
    // an array of "tabs" with `current` being what index is the one we care
    // about, or the last one if not specified.
    const { flowID, streamID, entries } = data;
    // `data` comes from another device, so don't call a method looked up on
    // it: a payload key named "hasOwnProperty" would shadow the real one.
    const current = Object.hasOwn(data, "current")
      ? data.current
      : entries.length - 1;
    const { title, url: uri } = entries[current];
    const deviceId = this._fxai.telemetry.sanitizeDeviceId(senderID);
    Glean.fxa.sendtabReceived.record({
      flow_id: flowID,
      hashed_device_id: deviceId,
      reason,
      server_time: lazy.Resource.serverTime,
      stream_id: streamID,
    });
    this._fxai.telemetry.recordEvent(
      "command-received",
      COMMAND_SENDTAB_TAIL,
      deviceId,
      { flowID, streamID, reason }
    );

    return {
      title,
      uri,
    };
  }
}

/**
 * Close Tabs
 */
export class CloseRemoteTab extends Command {
  deviceCapability = COMMAND_CLOSETAB;
  keyFieldName = "closeTabKeys";
  encryptedKeyFieldName = "encryptedCloseTabKeys";

  /**
   * Sends a single close-tabs command to the target device. Errors are
   * logged and reported through the return value rather than thrown.
   *
   * @param {Device} target - Device object (typically returned by fxAccounts.getDevicesList()).
   * @param {string[]} urls - array of urls that should be closed on the remote device
   * @param {string} flowID - telemetry flow ID shared by all commands that
   *   belong to the same batch
   * @returns {Promise<boolean>} true if the command was sent, false otherwise.
   */
  async sendCloseTabsCommand(target, urls, flowID) {
    log.info(`Sending tab closures to ${target.id} device.`);
    const encoder = new TextEncoder();
    try {
      const streamID = this._fxai.telemetry.generateFlowID();
      const targetData = { flowID, streamID, urls };
      const bytes = encoder.encode(JSON.stringify(targetData));
      const encrypted = await this._encrypt(bytes, target);
      // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
      // If you add any plaintext items to this payload, please carefully consider the privacy implications
      // of revealing that data to the FxA server.
      const payload = { encrypted };
      await this._commands.invoke(COMMAND_CLOSETAB, target, payload);
      const deviceId = this._fxai.telemetry.sanitizeDeviceId(target.id);
      Glean.fxa.closetabSent.record({
        flow_id: flowID,
        hashed_device_id: deviceId,
        server_time: lazy.Resource.serverTime,
        stream_id: streamID,
      });
      this._fxai.telemetry.recordEvent(
        "command-sent",
        COMMAND_CLOSETAB_TAIL,
        deviceId,
        { flowID, streamID }
      );
      return true;
    } catch (error) {
      // We should also show the user there was some kind of error
      log.error("Error while invoking a close tab command.", error);
      return false;
    }
  }

  // Returns a truthy value if:
  // - The target device is compatible with closing a tab (device capability) and
  // - The local device has the feature enabled locally
  isDeviceCompatible(device) {
    return (
      lazy.NimbusFeatures.remoteTabManagement.getVariable("closeTabsEnabled") &&
      super.isDeviceCompatible(device)
    );
  }

  /**
   * Handle incoming remote tab payload, called by FxAccountsCommands.
   *
   * @param {string} senderID - ID of the sending device (used for telemetry).
   * @param {object} payload - The command payload; `encrypted` is decrypted.
   * @param {string} reason - Why the command is being handled (telemetry).
   * @returns {Promise<{urls: string[]}>} The URLs the sender asked to close.
   */
  async handleTabClose(senderID, { encrypted }, reason) {
    const bytes = await this._decrypt(encrypted);
    const decoder = new TextDecoder("utf8");
    const data = JSON.parse(decoder.decode(bytes));
    // urls is an array of strings
    const { flowID, streamID, urls } = data;
    const deviceId = this._fxai.telemetry.sanitizeDeviceId(senderID);
    Glean.fxa.closetabReceived.record({
      flow_id: flowID,
      hashed_device_id: deviceId,
      reason,
      server_time: lazy.Resource.serverTime,
      stream_id: streamID,
    });
    this._fxai.telemetry.recordEvent(
      "command-received",
      COMMAND_CLOSETAB_TAIL,
      deviceId,
      { flowID, streamID, reason }
    );

    return {
      urls,
    };
  }
}

/**
 * Sends the commands queued in the remote command store, after a delay that
 * leaves room for "undo", and makes sure pending commands are flushed at
 * startup (once idle) and at shutdown.
 */
export class CommandQueue {
  // The delay between a command being queued and it being actioned. This delay
  // is primarily to support "undo" functionality in the UI.
  // It's likely we will end up needing a different delay per command (including no delay), but this
  // seems fine while we work that out.
  DELAY = 5000;

  // The timer ID if we have one scheduled, otherwise null
  #timer = null;

  // `this.#onShutdown` bound to `this`.
  #onShutdownBound = null;

  // Since we only ever show one notification to the user
  // we keep track of how many tabs have actually been closed
  // and update the count, user dismissing the notification will
  // reset the count
  closeTabNotificationCount = 0;
  hasPendingCloseTabNotification = false;

  // We ensure the queue is flushed soon after startup. After the first tab sync we see, we
  // wait for this many seconds of being idle before checking.
  // Note that this delay has nothing to do with DELAY - that is for "undo" capability, this
  // delay is to ensure we don't put unnecessary load on the browser during startup.
  #idleThresholdSeconds = 3;
  #isObservingTabSyncs = false;
  // This helps ensure we aren't flushing the queue multiple times concurrently.
  #flushQueuePromise = null;

  constructor(commands, fxAccountsInternal) {
    this._commands = commands;
    this._fxai = fxAccountsInternal;
    Services.obs.addObserver(this, "services.sync.tabs.command-queued");
    Services.obs.addObserver(this, "weave:engine:sync:finish");
    this.#isObservingTabSyncs = true;
    log.trace("Command queue observer created");
    this.#onShutdownBound = this.#onShutdown.bind(this);
    lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
      "FxAccountsCommands: flush command queue",
      this.#onShutdownBound
    );
  }

  // Used for tests - when in the browser this object lives forever.
  shutdown() {
    if (this.#timer) {
      clearTimeout(this.#timer);
      // Don't leave a stale ID behind; other code treats a non-null timer as
      // "a flush is scheduled".
      this.#timer = null;
    }
    Services.obs.removeObserver(this, "services.sync.tabs.command-queued");
    if (this.#isObservingTabSyncs) {
      Services.obs.removeObserver(this, "weave:engine:sync:finish");
      this.#isObservingTabSyncs = false;
    }
    lazy.AsyncShutdown.appShutdownConfirmed.removeBlocker(
      this.#onShutdownBound
    );
    this.#onShutdownBound = null;
  }

  // Observer entry point: flushes when a command is queued, and arranges the
  // one-off startup check after the first "tabs" engine sync finishes.
  observe(subject, topic, data) {
    log.trace(
      `CommandQueue observed topic=${topic}, data=${data}, subject=${subject}`
    );
    switch (topic) {
      case "services.sync.tabs.command-queued":
        this.flushQueue().catch(e => {
          log.error("Failed to flush the outgoing queue", e);
        });
        break;

      case "weave:engine:sync:finish":
        // This is to pick up pending commands we failed to send in the last session.
        if (data != "tabs") {
          return;
        }
        // We only need the first tabs sync, so stop observing.
        Services.obs.removeObserver(this, "weave:engine:sync:finish");
        this.#isObservingTabSyncs = false;
        this.#checkQueueAfterStartup();
        break;

      default:
        log.error(`unexpected observer topic: ${topic}`);
    }
  }

  // for test mocking.
  _getIdleService() {
    return lazy.idleService;
  }

  // Registers a one-shot idle observer which flushes the queue the first time
  // the user has been idle for `#idleThresholdSeconds`.
  #checkQueueAfterStartup() {
    // do this on idle because we are probably syncing quite close to startup.
    const idleService = this._getIdleService();
    const idleObserver = (/* subject, topic, data */) => {
      idleService.removeIdleObserver(idleObserver, this.#idleThresholdSeconds);
      log.info("checking if the command queue is empty now we are idle");
      this.flushQueue()
        .then(didSend => {
          // TODO: it would be good to get telemetry here, because we expect this to be true rarely.
          log.info(
            `pending command check had ${didSend ? "some" : "no"} commands`
          );
        })
        .catch(err => {
          log.error(
            "Checking for pending tab commands after first tab sync failed",
            err
          );
        });
    };
    idleService.addIdleObserver(idleObserver, this.#idleThresholdSeconds);
  }

  /**
   * Flushes the queue, sharing a single in-flight flush between concurrent
   * callers.
   *
   * @param {boolean} isForShutdown - When true (and no flush is already
   *   running) everything pending is sent without waiting for the delay.
   * @returns {Promise<boolean>} Whether any command was sent.
   */
  async flushQueue(isForShutdown = false) {
    // We really don't want multiple queue flushes concurrently, which is a real possibility.
    // If we are shutting down and there's already a `flushQueue()` running, it's almost certainly
    // not going to be `isForShutdown()`. We don't really want to wait for that to complete just
    // to start another, so there's a risk we will fail to send commands in that scenario - but
    // we will send them at startup time.
    if (this.#flushQueuePromise == null) {
      this.#flushQueuePromise = this.#flushQueue(isForShutdown);
    }
    const inFlight = this.#flushQueuePromise;
    try {
      return await inFlight;
    } finally {
      // Several callers may be waiting on the same flush. Only clear the slot
      // if it still holds the flush we waited for, otherwise a late waiter
      // could wipe out a newer flush and let two run concurrently.
      if (this.#flushQueuePromise === inFlight) {
        this.#flushQueuePromise = null;
      }
    }
  }

  // Does the actual work of `flushQueue`; returns whether anything was sent.
  async #flushQueue(isForShutdown) {
    // get all the queued items to work out what's ready to send. If a device has queued item less than
    // our pushDelay, then we don't send *any* command for that device yet, but ensure a timer is set
    // for the delay.
    let store = await lazy.getRemoteCommandStore();
    let pending = await store.getUnsentCommands();
    log.trace("flushQueue total queued items", pending.length);
    // any timeRequested less than `sendThreshold` should be sent now (unless we are shutting down,
    // in which case we send everything now)
    let now = this.now();
    // We want to be efficient with batching commands to send to the user
    // so we categorize things into 3 buckets:
    // mustSend - overdue and should be sent as early as we can
    // canSend - is due but not yet "overdue", should be sent if possible
    // early - can still be undone and should not be sent yet
    // NOTE(review): with a positive DELAY, `canSendThreshold` is older than
    // `mustSendThreshold`, so anything matching it already matched the
    // "must send" test below and the "can send" branch looks unreachable -
    // confirm the intended thresholds before relying on that branch.
    const mustSendThreshold = isForShutdown ? Infinity : now - this.DELAY;
    const canSendThreshold = isForShutdown ? Infinity : now - this.DELAY * 2;
    // make a map of deviceId -> device
    // The device list may not be available at all yet (other code in this
    // file treats `recentDeviceList` as possibly null), so guard the access.
    let recentDevices = this._fxai.device.recentDeviceList;
    if (!recentDevices?.length) {
      // If we can't map a device ID to the device with the keys etc, we are screwed!
      log.error("No devices available for queued tab commands");
      return false;
    }
    let deviceMap = new Map(recentDevices.map(d => [d.id, d]));
    // make a map of commands keyed by device ID.
    let byDevice = Map.groupBy(pending, c => c.deviceId);
    let nextTime = Infinity;
    let didSend = false;
    for (let [deviceId, commands] of byDevice) {
      let device = deviceMap.get(deviceId);
      if (!device) {
        // If we can't map *this* device ID to a device with the keys etc, we are screwed!
        // This however *is* possible if the target device was disconnected before we had a chance to send it,
        // so remove this item.
        log.warn("Unknown device for queued tab commands", deviceId);
        await Promise.all(
          commands.map(command => store.removeRemoteCommand(deviceId, command))
        );
        continue;
      }
      let toSend = [];
      // We process in reverse order so it's newest-to-oldest
      // which means if the newest is already a "must send"
      // we can simply send all of the "can sends"
      for (const command of commands.reverse()) {
        if (!(command.command instanceof lazy.RemoteCommand.CloseTab)) {
          log.error(`Ignoring unknown pending command ${command}`);
          continue;
        }
        if (command.timeRequested <= mustSendThreshold) {
          log.trace(
            `command for url ${command.command.url} is overdue, adding to send`
          );
          toSend.push(command);
        } else if (command.timeRequested <= canSendThreshold) {
          if (toSend.length) {
            log.trace(
              `command for url ${command.command.url} is due, since there others to be sent, also adding to send`
            );
            toSend.push(command);
          } else {
            // Though it's due, since there are no others we can check again
            // and see if we can batch
            nextTime = Math.min(nextTime, command.timeRequested + this.DELAY);
          }
        } else {
          // We set the next timer just a little later to ensure we'll have an overdue
          nextTime = Math.min(
            nextTime,
            command.timeRequested + this.DELAY * 1.1
          );
          // Since the list is sorted newest to oldest,
          // we can assume the rest are not ready
          break;
        }
      }

      if (toSend.length) {
        let urlsToClose = toSend.map(c => c.command.url);
        // Generate a flowID to use for all chunked commands
        const flowID = this._fxai.telemetry.generateFlowID();
        // If we're dealing with large sets of urls, we should split them across
        // multiple payloads to prevent breaking the issues for the user
        let chunks = this.chunkUrls(urlsToClose, COMMAND_MAX_PAYLOAD_SIZE);
        for (let chunk of chunks) {
          if (
            await this._commands.closeTab.sendCloseTabsCommand(
              device,
              chunk,
              flowID
            )
          ) {
            // We build a set from the sent urls for faster comparing
            const urlChunkSet = new Set(chunk);
            // success! Mark them as sent.
            for (let cmd of toSend.filter(c =>
              urlChunkSet.has(c.command.url)
            )) {
              log.trace(
                `Setting pending command for device ${deviceId} as sent`,
                cmd
              );
              await store.setPendingCommandSent(cmd);
              didSend = true;
            }
          } else {
            // We should investigate a better backoff strategy
            // https://bugzilla.mozilla.org/show_bug.cgi?id=1899433
            // For now just say 60s.
            log.warn(
              `Failed to send close tab commands for device ${deviceId}`
            );
            nextTime = Math.min(nextTime, now + 60000);
          }
        }
      } else {
        log.trace(`Skipping send for device ${deviceId}`);
      }
    }

    if (didSend) {
      Services.obs.notifyObservers(null, TOPIC_TABS_CHANGED);
    }

    if (nextTime == Infinity) {
      log.info("No new close-tab timer needed");
    } else if (isForShutdown) {
      // because we never delay sending in this case the logic above should never set `nextTime`
      log.error(
        "logic error in command queue manager: flush for shutdown should never set a timer"
      );
    } else {
      let delay = nextTime - now + 10;
      log.trace(`Setting new close-tab timer for ${delay}ms`);
      this._ensureTimer(delay);
    }
    return didSend;
  }

  // Take an array of urls and a max size and split them into chunks
  // that are smaller than the passed in max size
  // Note: This method modifies the passed in array
  chunkUrls(urls, maxSize) {
    let chunks = [];

    // For optimal packing, we sort the array of urls from shortest-to-longest
    urls.sort((a, b) => a.length - b.length);

    while (urls.length) {
      let chunk = lazy.Utils.tryFitItems(urls, maxSize);
      if (!chunk.length) {
        // None of the remaining URLs can fit into a single command
        urls.forEach(url => {
          log.warn(`Skipping oversized URL: ${url}`);
        });
        break;
      }
      chunks.push(chunk);
      // Remove the processed URLs from the list
      urls.splice(0, chunk.length);
    }
    return chunks;
  }

  // (Re)schedules the single flush timer to fire after `timeout` ms,
  // replacing any timer that is already pending.
  async _ensureTimer(timeout) {
    log.info(
      `Setting a new close-tab timer with delay=${timeout} with existing timer=${!!this
        .#timer}`
    );

    if (this.#timer) {
      clearTimeout(this.#timer);
    }

    // If the browser shuts down while a timer exists we should force the send
    // While we should pick up the command after a restart, we don't know
    // how long that will be.
    // See https://bugzilla.mozilla.org/show_bug.cgi?id=1888299
    this.#timer = setTimeout(() => {
      // XXX - this might be racey - if a new timer fires before this promise resolves - it
      // might seem unlikely, but network is involved!
      // flushQueue might create another timer, so we must clear our current timer first.
      this.#timer = null;
      // Nothing awaits a timer callback, so report failures here rather than
      // leaving an unhandled rejection.
      this.flushQueue().catch(e => {
        log.error("Failed to flush the outgoing queue", e);
      });
    }, timeout);
  }

  // On shutdown we want to send any pending items - ie, pretend the timer fired *now*.
  // Sadly it's not easy for us to abort any in-flight requests, nor to limit the amount of
  // time any new requests we create take, so we don't do this for now. This means that in
  // the case of a super slow network or super slow FxA, we might crash at shutdown, but we
  // can think of doing this in a followup.
  async #onShutdown() {
    // If there is no timer set, then there's nothing pending to do.
    log.debug(
      `CommandQueue shutdown is flushing the queue with a timer=${!!this
        .#timer}`
    );
    if (this.#timer) {
      // We don't want the current one to fire at the same time!
      clearTimeout(this.#timer);
      this.#timer = null;
      await this.flushQueue(true);
    }
  }

  // hook points for tests.
  now() {
    return Date.now();
  }
}

// Encodes a buffer as unpadded URL-safe base64.
function urlsafeBase64Encode(buffer) {
  return ChromeUtils.base64URLEncode(new Uint8Array(buffer), { pad: false });
}

// Decodes unpadded URL-safe base64; padded input is rejected.
function urlsafeBase64Decode(str) {
  return ChromeUtils.base64URLDecode(str, { padding: "reject" });
}