tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

FxAccountsCommands.sys.mjs (36047B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 import {
      6  CLIENT_IS_THUNDERBIRD,
      7  COMMAND_SENDTAB,
      8  COMMAND_SENDTAB_TAIL,
      9  COMMAND_CLOSETAB,
     10  COMMAND_CLOSETAB_TAIL,
     11  SCOPE_OLD_SYNC,
     12  log,
     13 } from "resource://gre/modules/FxAccountsCommon.sys.mjs";
     14 
     15 import { clearTimeout, setTimeout } from "resource://gre/modules/Timer.sys.mjs";
     16 
     17 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
     18 
     19 import { Observers } from "resource://services-common/observers.sys.mjs";
     20 
// Holder for the lazily-resolved modules, preference and service defined below.
const lazy = {};

// Modules are only loaded the first time the corresponding `lazy.<name>` is read.
ChromeUtils.defineESModuleGetters(lazy, {
 AsyncShutdown: "resource://gre/modules/AsyncShutdown.sys.mjs",
 BulkKeyBundle: "resource://services-sync/keys.sys.mjs",
 CryptoWrapper: "resource://services-sync/record.sys.mjs",
 PushCrypto: "resource://gre/modules/PushCrypto.sys.mjs",
 getRemoteCommandStore: "resource://services-sync/TabsStore.sys.mjs",
 RemoteCommand: "resource://services-sync/TabsStore.sys.mjs",
 Resource: "resource://services-sync/resource.sys.mjs",
 Utils: "resource://services-sync/util.sys.mjs",
 NimbusFeatures: "resource://nimbus/ExperimentAPI.sys.mjs",
});

// `lazy.INVALID_SHAREABLE_SCHEMES` is a Set built from the "|"-separated
// preference value (default ""). Received Send Tab URIs whose scheme is in
// this set are rejected.
XPCOMUtils.defineLazyPreferenceGetter(
 lazy,
 "INVALID_SHAREABLE_SCHEMES",
 "services.sync.engine.tabs.filteredSchemes",
 "",
 null,
 val => {
   // Note: an empty preference yields a Set containing only the empty string.
   return new Set(val.split("|"));
 }
);

// `lazy.idleService` is handed out by `CommandQueue._getIdleService()`.
XPCOMUtils.defineLazyServiceGetter(
 lazy,
 "idleService",
 "@mozilla.org/widget/useridleservice;1",
 Ci.nsIUserIdleService
);

// NOTE(review): neither constant is referenced in the part of the file
// reviewed here; presumably both are used further down - confirm.
const TOPIC_TABS_CHANGED = "services.sync.tabs.changed";
// 16 * 1024 = 16384; presumably a size in bytes - confirm against the user.
const COMMAND_MAX_PAYLOAD_SIZE = 16 * 1024;
     55 
     56 export class FxAccountsCommands {
     57  constructor(fxAccountsInternal) {
     58    this._fxai = fxAccountsInternal;
     59    this.sendTab = new SendTab(this, fxAccountsInternal);
     60    this.closeTab = new CloseRemoteTab(this, fxAccountsInternal);
     61    this.commandQueue = new CommandQueue(this, fxAccountsInternal);
     62    this._invokeRateLimitExpiry = 0;
     63  }
     64 
     65  async availableCommands() {
     66    let commands = {};
     67 
     68    if (!CLIENT_IS_THUNDERBIRD) {
     69      // Invalid keys usually means the account is not verified yet.
     70      const encryptedSendTabKeys = await this.sendTab.getEncryptedCommandKeys();
     71 
     72      if (encryptedSendTabKeys) {
     73        commands[COMMAND_SENDTAB] = encryptedSendTabKeys;
     74      }
     75 
     76      const encryptedCloseTabKeys =
     77        await this.closeTab.getEncryptedCommandKeys();
     78      if (encryptedCloseTabKeys) {
     79        commands[COMMAND_CLOSETAB] = encryptedCloseTabKeys;
     80      }
     81    }
     82 
     83    return commands;
     84  }
     85 
     86  async invoke(command, device, payload) {
     87    const { sessionToken } = await this._fxai.getUserAccountData([
     88      "sessionToken",
     89    ]);
     90    const client = this._fxai.fxAccountsClient;
     91    const now = Date.now();
     92    if (now < this._invokeRateLimitExpiry) {
     93      const remaining = (this._invokeRateLimitExpiry - now) / 1000;
     94      throw new Error(
     95        `Invoke for ${command} is rate-limited for ${remaining} seconds.`
     96      );
     97    }
     98    try {
     99      let info = await client.invokeCommand(
    100        sessionToken,
    101        command,
    102        device.id,
    103        payload
    104      );
    105      if (!info.enqueued || !info.notified) {
    106        // We want an error log here to help diagnose users who report failure.
    107        log.error("Sending was only partially successful", info);
    108      } else {
    109        log.info("Successfully sent", info);
    110      }
    111    } catch (err) {
    112      if (err.code && err.code === 429 && err.retryAfter) {
    113        this._invokeRateLimitExpiry = Date.now() + err.retryAfter * 1000;
    114      }
    115      throw err;
    116    }
    117    log.info(`Payload sent to device ${device.id}.`);
    118  }
    119 
    120  /**
    121   * Poll and handle device commands for the current device.
    122   * This method can be called either in response to a Push message,
    123   * or by itself as a "commands recovery" mechanism.
    124   *
    125   * @param {number} notifiedIndex "Command received" push messages include
    126   * the index of the command that triggered the message. We use it as a
    127   * hint when we have no "last command index" stored.
    128   */
    129  async pollDeviceCommands(notifiedIndex = 0) {
    130    // Whether the call to `pollDeviceCommands` was initiated by a Push message from the FxA
    131    // servers in response to a message being received or simply scheduled in order
    132    // to fetch missed messages.
    133    log.info(`Polling device commands.`);
    134    await this._fxai.withCurrentAccountState(async state => {
    135      const { device } = await state.getUserAccountData(["device"]);
    136      if (!device) {
    137        throw new Error("No device registration.");
    138      }
    139      // We increment lastCommandIndex by 1 because the server response includes the current index.
    140      // If we don't have a `lastCommandIndex` stored, we fall back on the index from the push message we just got.
    141      const lastCommandIndex = device.lastCommandIndex + 1 || notifiedIndex;
    142      // We have already received this message before.
    143      if (notifiedIndex > 0 && notifiedIndex < lastCommandIndex) {
    144        return;
    145      }
    146      const { index, messages } =
    147        await this._fetchDeviceCommands(lastCommandIndex);
    148      if (messages.length) {
    149        await state.updateUserAccountData({
    150          device: { ...device, lastCommandIndex: index },
    151        });
    152        log.info(`Handling ${messages.length} messages`);
    153        await this._handleCommands(messages, notifiedIndex);
    154      }
    155    });
    156    return true;
    157  }
    158 
    159  async _fetchDeviceCommands(index, limit = null) {
    160    const userData = await this._fxai.getUserAccountData();
    161    if (!userData) {
    162      throw new Error("No user.");
    163    }
    164    const { sessionToken } = userData;
    165    if (!sessionToken) {
    166      throw new Error("No session token.");
    167    }
    168    const client = this._fxai.fxAccountsClient;
    169    const opts = { index };
    170    if (limit != null) {
    171      opts.limit = limit;
    172    }
    173    return client.getCommands(sessionToken, opts);
    174  }
    175 
    176  _getReason(notifiedIndex, messageIndex) {
    177    // The returned reason value represents an explanation for why the command associated with the
    178    // message of the given `messageIndex` is being handled. If `notifiedIndex` is zero the command
    179    // is a part of a fallback polling process initiated by "Sync Now" ["poll"]. If `notifiedIndex` is
    180    // greater than `messageIndex` this is a push command that was previously missed ["push-missed"],
    181    // otherwise we assume this is a push command with no missed messages ["push"].
    182    if (notifiedIndex == 0) {
    183      return "poll";
    184    } else if (notifiedIndex > messageIndex) {
    185      return "push-missed";
    186    }
    187    // Note: The returned reason might be "push" in the case where a user sends multiple tabs
    188    // in quick succession. We are not attempting to distinguish this from other push cases at
    189    // present.
    190    return "push";
    191  }
    192 
    193  async _handleCommands(messages, notifiedIndex) {
    194    try {
    195      await this._fxai.device.refreshDeviceList();
    196    } catch (e) {
    197      log.warn("Error refreshing device list", e);
    198    }
    199    // We debounce multiple incoming tabs so we show a single notification.
    200    const tabsReceived = [];
    201    const tabsToClose = [];
    202    for (const { index, data } of messages) {
    203      const { command, payload, sender: senderId } = data;
    204      const reason = this._getReason(notifiedIndex, index);
    205      const sender =
    206        senderId && this._fxai.device.recentDeviceList
    207          ? this._fxai.device.recentDeviceList.find(d => d.id == senderId)
    208          : null;
    209      if (!sender) {
    210        log.warn(
    211          "Incoming command is from an unknown device (maybe disconnected?)"
    212        );
    213      }
    214      switch (command) {
    215        case COMMAND_CLOSETAB:
    216          try {
    217            const { urls } = await this.closeTab.handleTabClose(
    218              senderId,
    219              payload,
    220              reason
    221            );
    222            log.info(
    223              `Close Tab received with FxA commands: "${urls.length} tabs"
    224               from ${sender ? sender.name : "Unknown device"}.`
    225            );
    226            // URLs are PII, so only logged at trace.
    227            log.trace(`Close Remote Tabs received URLs: ${urls}`);
    228            tabsToClose.push({ urls, sender });
    229          } catch (e) {
    230            log.error(`Error while handling incoming Close Tab payload.`, e);
    231          }
    232          break;
    233        case COMMAND_SENDTAB:
    234          try {
    235            const { title, uri } = await this.sendTab.handle(
    236              senderId,
    237              payload,
    238              reason
    239            );
    240            log.info(
    241              `Tab received with FxA commands: "${
    242                title || "<no title>"
    243              }" from ${sender ? sender.name : "Unknown device"}.`
    244            );
    245            // URLs are PII, so only logged at trace.
    246            log.trace(`Tab received URL: ${uri}`);
    247            // This should eventually be rare to hit as all platforms will be using the same
    248            // scheme filter list, but we have this here in the case other platforms
    249            // haven't caught up and/or trying to send invalid uris using older versions
    250            const scheme = Services.io.newURI(uri).scheme;
    251            if (lazy.INVALID_SHAREABLE_SCHEMES.has(scheme)) {
    252              throw new Error("Invalid scheme found for received URI.");
    253            }
    254            tabsReceived.push({ title, uri, sender });
    255          } catch (e) {
    256            log.error(`Error while handling incoming Send Tab payload.`, e);
    257          }
    258          break;
    259        default:
    260          log.info(`Unknown command: ${command}.`);
    261      }
    262    }
    263    if (tabsReceived.length) {
    264      this._notifyFxATabsReceived(tabsReceived);
    265    }
    266    if (tabsToClose.length) {
    267      this._notifyFxATabsClosed(tabsToClose);
    268    }
    269  }
    270 
    271  _notifyFxATabsReceived(tabsReceived) {
    272    Observers.notify("fxaccounts:commands:open-uri", tabsReceived);
    273  }
    274 
    275  _notifyFxATabsClosed(tabsToClose) {
    276    Observers.notify("fxaccounts:commands:close-uri", tabsToClose);
    277  }
    278 }
    279 
    280 /**
    281 * This is built on top of FxA commands.
    282 *
    283 * Devices exchange keys wrapped in the oldsync key between themselves (getEncryptedCommandKeys)
    284 * during the device registration flow. The FxA server can theoretically never
    285 * retrieve the send tab keys since it doesn't know the oldsync key.
    286 *
    287 * Note about the keys:
    288 * The server has the `pushPublicKey`. The FxA server encrypt the send-tab payload again using the
    289 * push keys - after the client has encrypted the payload using the send-tab keys.
    290 * The push keys are different from the send-tab keys. The FxA server uses
    291 * the push keys to deliver the tabs using same mechanism we use for web-push.
    292 * However, clients use the send-tab keys for end-to-end encryption.
    293 *
    294 * Every command uses the same key management code, although each has its own key.
    295 */
    296 
    297 export class Command {
    298  constructor(commands, fxAccountsInternal) {
    299    this._commands = commands;
    300    this._fxai = fxAccountsInternal;
    301  }
    302 
    303  // Must be set by the command.
    304  deviceCapability; // eg, COMMAND_SENDTAB;
    305  keyFieldName; // eg, "sendTabKeys";
    306  encryptedKeyFieldName; // eg, "encryptedSendTabKeys"
    307 
    308  // Returns true if the target device is compatible with FxA Commands Send tab.
    309  isDeviceCompatible(device) {
    310    return (
    311      device.availableCommands &&
    312      device.availableCommands[this.deviceCapability]
    313    );
    314  }
    315 
    316  async _encrypt(bytes, device) {
    317    let bundle = device.availableCommands[this.deviceCapability];
    318    if (!bundle) {
    319      throw new Error(`Device ${device.id} does not have send tab keys.`);
    320    }
    321    const oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    322    // Older clients expect this to be hex, due to pre-JWK sync key ids :-(
    323    const ourKid = this._fxai.keys.kidAsHex(oldsyncKey);
    324    const { kid: theirKid } = JSON.parse(
    325      device.availableCommands[this.deviceCapability]
    326    );
    327    if (theirKid != ourKid) {
    328      throw new Error("Target Send Tab key ID is different from ours");
    329    }
    330    const json = JSON.parse(bundle);
    331    const wrapper = new lazy.CryptoWrapper();
    332    wrapper.deserialize({ payload: json });
    333    const syncKeyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    334    let { publicKey, authSecret } = await wrapper.decrypt(syncKeyBundle);
    335    authSecret = urlsafeBase64Decode(authSecret);
    336    publicKey = urlsafeBase64Decode(publicKey);
    337 
    338    const { ciphertext: encrypted } = await lazy.PushCrypto.encrypt(
    339      bytes,
    340      publicKey,
    341      authSecret
    342    );
    343    return urlsafeBase64Encode(encrypted);
    344  }
    345 
    346  async _decrypt(ciphertext) {
    347    let { privateKey, publicKey, authSecret } =
    348      await this._getPersistedCommandKeys();
    349    publicKey = urlsafeBase64Decode(publicKey);
    350    authSecret = urlsafeBase64Decode(authSecret);
    351    ciphertext = new Uint8Array(urlsafeBase64Decode(ciphertext));
    352    return lazy.PushCrypto.decrypt(
    353      privateKey,
    354      publicKey,
    355      authSecret,
    356      // The only Push encoding we support.
    357      { encoding: "aes128gcm" },
    358      ciphertext
    359    );
    360  }
    361 
    362  async _getPersistedCommandKeys() {
    363    const { device } = await this._fxai.getUserAccountData(["device"]);
    364    return device && device[this.keyFieldName];
    365  }
    366 
    367  async _generateAndPersistCommandKeys() {
    368    let [publicKey, privateKey] = await lazy.PushCrypto.generateKeys();
    369    publicKey = urlsafeBase64Encode(publicKey);
    370    let authSecret = lazy.PushCrypto.generateAuthenticationSecret();
    371    authSecret = urlsafeBase64Encode(authSecret);
    372    const sendTabKeys = {
    373      publicKey,
    374      privateKey,
    375      authSecret,
    376    };
    377    await this._fxai.withCurrentAccountState(async state => {
    378      const { device = {} } = await state.getUserAccountData(["device"]);
    379      device[this.keyFieldName] = sendTabKeys;
    380      log.trace(
    381        `writing to ${this.keyFieldName} for command ${this.deviceCapability}`
    382      );
    383      await state.updateUserAccountData({
    384        device,
    385      });
    386    });
    387    return sendTabKeys;
    388  }
    389 
    390  async _getPersistedEncryptedCommandKey() {
    391    const data = await this._fxai.getUserAccountData([
    392      this.encryptedKeyFieldName,
    393    ]);
    394    return data[this.encryptedKeyFieldName];
    395  }
    396 
    397  async _generateAndPersistEncryptedCommandKey() {
    398    if (!(await this._fxai.keys.canGetKeyForScope(SCOPE_OLD_SYNC))) {
    399      log.info("Can't fetch keys, so unable to determine command keys");
    400      return null;
    401    }
    402    let sendTabKeys = await this._getPersistedCommandKeys();
    403    if (!sendTabKeys) {
    404      log.info("Could not find command keys, generating them");
    405      sendTabKeys = await this._generateAndPersistCommandKeys();
    406    }
    407    // Strip the private key from the bundle to encrypt.
    408    const keyToEncrypt = {
    409      publicKey: sendTabKeys.publicKey,
    410      authSecret: sendTabKeys.authSecret,
    411    };
    412    let oldsyncKey;
    413    try {
    414      oldsyncKey = await this._fxai.keys.getKeyForScope(SCOPE_OLD_SYNC);
    415    } catch (ex) {
    416      log.warn("Failed to fetch keys, so unable to determine command keys", ex);
    417      return null;
    418    }
    419    const wrapper = new lazy.CryptoWrapper();
    420    wrapper.cleartext = keyToEncrypt;
    421    const keyBundle = lazy.BulkKeyBundle.fromJWK(oldsyncKey);
    422    await wrapper.encrypt(keyBundle);
    423    const encryptedSendTabKeys = JSON.stringify({
    424      // This is expected in hex, due to pre-JWK sync key ids :-(
    425      kid: this._fxai.keys.kidAsHex(oldsyncKey),
    426      IV: wrapper.IV,
    427      hmac: wrapper.hmac,
    428      ciphertext: wrapper.ciphertext,
    429    });
    430    await this._fxai.withCurrentAccountState(async state => {
    431      let data = {};
    432      data[this.encryptedKeyFieldName] = encryptedSendTabKeys;
    433      await state.updateUserAccountData(data);
    434    });
    435    return encryptedSendTabKeys;
    436  }
    437 
    438  async getEncryptedCommandKeys() {
    439    log.trace("Getting command keys", this.deviceCapability);
    440    let encryptedSendTabKeys = await this._getPersistedEncryptedCommandKey();
    441    const sendTabKeys = await this._getPersistedCommandKeys();
    442    if (!encryptedSendTabKeys || !sendTabKeys) {
    443      log.info(
    444        `Generating and persisting encrypted key (${!!encryptedSendTabKeys}, ${!!sendTabKeys})`
    445      );
    446      // Generating the encrypted key requires the sync key so we expect to fail
    447      // in some cases (primary password is locked, account not verified, etc)
    448      // However, we will eventually end up generating it when we can, and device registration
    449      // will handle this late update and update the remote record as necessary, so it gets there in the end.
    450      // It's okay to persist these keys in plain text; they're encrypted.
    451      encryptedSendTabKeys =
    452        await this._generateAndPersistEncryptedCommandKey();
    453    }
    454    return encryptedSendTabKeys;
    455  }
    456 }
    457 
    458 /**
    459 * Send Tab
    460 */
    461 export class SendTab extends Command {
    462  deviceCapability = COMMAND_SENDTAB;
    463  keyFieldName = "sendTabKeys";
    464  encryptedKeyFieldName = "encryptedSendTabKeys";
    465 
    466  /**
    467   * @param {Device[]} to - Device objects (typically returned by fxAccounts.getDevicesList()).
    468   * @param {object} tab
    469   * @param {string} tab.url
    470   * @param {string} tab.title
    471   * @returns A report object, in the shape of
    472   *          {succeded: [Device], error: [{device: Device, error: Exception}]}
    473   */
    474  async send(to, tab) {
    475    log.info(`Sending a tab to ${to.length} devices.`);
    476    const flowID = this._fxai.telemetry.generateFlowID();
    477    const encoder = new TextEncoder();
    478    const data = { entries: [{ title: tab.title, url: tab.url }] };
    479    const report = {
    480      succeeded: [],
    481      failed: [],
    482    };
    483    for (let device of to) {
    484      try {
    485        const streamID = this._fxai.telemetry.generateFlowID();
    486        const targetData = Object.assign({ flowID, streamID }, data);
    487        const bytes = encoder.encode(JSON.stringify(targetData));
    488        const encrypted = await this._encrypt(bytes, device);
    489        // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
    490        // If you add any plaintext items to this payload, please carefully consider the privacy implications
    491        // of revealing that data to the FxA server.
    492        const payload = { encrypted };
    493        await this._commands.invoke(COMMAND_SENDTAB, device, payload);
    494        const deviceId = this._fxai.telemetry.sanitizeDeviceId(device.id);
    495        Glean.fxa.sendtabSent.record({
    496          flow_id: flowID,
    497          hashed_device_id: deviceId,
    498          server_time: lazy.Resource.serverTime,
    499          stream_id: streamID,
    500        });
    501        this._fxai.telemetry.recordEvent(
    502          "command-sent",
    503          COMMAND_SENDTAB_TAIL,
    504          deviceId,
    505          { flowID, streamID }
    506        );
    507        report.succeeded.push(device);
    508      } catch (error) {
    509        log.error("Error while invoking a send tab command.", error);
    510        report.failed.push({ device, error });
    511      }
    512    }
    513    return report;
    514  }
    515 
    516  // Handle incoming send tab payload, called by FxAccountsCommands.
    517  async handle(senderID, { encrypted }, reason) {
    518    const bytes = await this._decrypt(encrypted);
    519    const decoder = new TextDecoder("utf8");
    520    const data = JSON.parse(decoder.decode(bytes));
    521    const { flowID, streamID, entries } = data;
    522    const current = data.hasOwnProperty("current")
    523      ? data.current
    524      : entries.length - 1;
    525    const { title, url: uri } = entries[current];
    526    // `flowID` and `streamID` are in the top-level of the JSON, `entries` is
    527    // an array of "tabs" with `current` being what index is the one we care
    528    // about, or the last one if not specified.
    529    const deviceId = this._fxai.telemetry.sanitizeDeviceId(senderID);
    530    Glean.fxa.sendtabReceived.record({
    531      flow_id: flowID,
    532      hashed_device_id: deviceId,
    533      reason,
    534      server_time: lazy.Resource.serverTime,
    535      stream_id: streamID,
    536    });
    537    this._fxai.telemetry.recordEvent(
    538      "command-received",
    539      COMMAND_SENDTAB_TAIL,
    540      deviceId,
    541      { flowID, streamID, reason }
    542    );
    543 
    544    return {
    545      title,
    546      uri,
    547    };
    548  }
    549 }
    550 
    551 /**
    552 * Close Tabs
    553 */
    554 export class CloseRemoteTab extends Command {
    555  deviceCapability = COMMAND_CLOSETAB;
    556  keyFieldName = "closeTabKeys";
    557  encryptedKeyFieldName = "encryptedCloseTabKeys";
    558 
    559  /**
    560   * @param {Device} target - Device object (typically returned by fxAccounts.getDevicesList()).
    561   * @param {string[]} urls - array of urls that should be closed on the remote device
    562   */
    563  async sendCloseTabsCommand(target, urls, flowID) {
    564    log.info(`Sending tab closures to ${target.id} device.`);
    565    const encoder = new TextEncoder();
    566    try {
    567      const streamID = this._fxai.telemetry.generateFlowID();
    568      const targetData = { flowID, streamID, urls };
    569      const bytes = encoder.encode(JSON.stringify(targetData));
    570      const encrypted = await this._encrypt(bytes, target);
    571      // FxA expects an object as the payload, but we only have a single encrypted string; wrap it.
    572      // If you add any plaintext items to this payload, please carefully consider the privacy implications
    573      // of revealing that data to the FxA server.
    574      const payload = { encrypted };
    575      await this._commands.invoke(COMMAND_CLOSETAB, target, payload);
    576      const deviceId = this._fxai.telemetry.sanitizeDeviceId(target.id);
    577      Glean.fxa.closetabSent.record({
    578        flow_id: flowID,
    579        hashed_device_id: deviceId,
    580        server_time: lazy.Resource.serverTime,
    581        stream_id: streamID,
    582      });
    583      this._fxai.telemetry.recordEvent(
    584        "command-sent",
    585        COMMAND_CLOSETAB_TAIL,
    586        deviceId,
    587        { flowID, streamID }
    588      );
    589      return true;
    590    } catch (error) {
    591      // We should also show the user there was some kind've error
    592      log.error("Error while invoking a send tab command.", error);
    593      return false;
    594    }
    595  }
    596 
    597  // Returns true if:
    598  // - The target device is compatible with closing a tab (device capability) and
    599  // - The local device has the feature enabled locally
    600  isDeviceCompatible(device) {
    601    return (
    602      lazy.NimbusFeatures.remoteTabManagement.getVariable("closeTabsEnabled") &&
    603      super.isDeviceCompatible(device)
    604    );
    605  }
    606 
    607  // Handle incoming remote tab payload, called by FxAccountsCommands.
    608  async handleTabClose(senderID, { encrypted }, reason) {
    609    const bytes = await this._decrypt(encrypted);
    610    const decoder = new TextDecoder("utf8");
    611    const data = JSON.parse(decoder.decode(bytes));
    612    // urls is an array of strings
    613    const { flowID, streamID, urls } = data;
    614    const deviceId = this._fxai.telemetry.sanitizeDeviceId(senderID);
    615    Glean.fxa.closetabReceived.record({
    616      flow_id: flowID,
    617      hashed_device_id: deviceId,
    618      reason,
    619      server_time: lazy.Resource.serverTime,
    620      stream_id: streamID,
    621    });
    622    this._fxai.telemetry.recordEvent(
    623      "command-received",
    624      COMMAND_CLOSETAB_TAIL,
    625      deviceId,
    626      { flowID, streamID, reason }
    627    );
    628 
    629    return {
    630      urls,
    631    };
    632  }
    633 }
    634 
    635 export class CommandQueue {
    636  // The delay between a command being queued and it being actioned. This delay
    637  // is primarily to support "undo" functionality in the UI.
    638  // It's likely we will end up needing a different delay per command (including no delay), but this
    639  // seems fine while we work that out.
    640  DELAY = 5000;
    641 
    642  // The timer ID if we have one scheduled, otherwise null
    643  #timer = null;
    644 
    645  // `this.#onShutdown` bound to `this`.
    646  #onShutdownBound = null;
    647 
    648  // Since we only ever show one notification to the user
    649  // we keep track of how many tabs have actually been closed
    650  // and update the count, user dismissing the notification will
    651  // reset the count
    652  closeTabNotificationCount = 0;
    653  hasPendingCloseTabNotification = false;
    654 
    655  // We ensure the queue is flushed soon after startup. After the first tab sync we see, we
    656  // wait for this many seconds of being idle before checking.
    657  // Note that this delay has nothing to do with DELAY - that is for "undo" capability, this
    658  // delay is to ensure we don't put unnecessary load on the browser during startup.
    659  #idleThresholdSeconds = 3;
    660  #isObservingTabSyncs = false;
    661  // This helps ensure we aren't flushing the queue multiple times concurrently.
    662  #flushQueuePromise = null;
    663 
    664  constructor(commands, fxAccountsInternal) {
    665    this._commands = commands;
    666    this._fxai = fxAccountsInternal;
    667    Services.obs.addObserver(this, "services.sync.tabs.command-queued");
    668    Services.obs.addObserver(this, "weave:engine:sync:finish");
    669    this.#isObservingTabSyncs = true;
    670    log.trace("Command queue observer created");
    671    this.#onShutdownBound = this.#onShutdown.bind(this);
    672    lazy.AsyncShutdown.appShutdownConfirmed.addBlocker(
    673      "FxAccountsCommands: flush command queue",
    674      this.#onShutdownBound
    675    );
    676  }
    677 
    678  // Used for tests - when in the browser this object lives forever.
    679  shutdown() {
    680    if (this.#timer) {
    681      clearTimeout(this.#timer);
    682    }
    683    Services.obs.removeObserver(this, "services.sync.tabs.command-queued");
    684    if (this.#isObservingTabSyncs) {
    685      Services.obs.removeObserver(this, "weave:engine:sync:finish");
    686      this.#isObservingTabSyncs = false;
    687    }
    688    lazy.AsyncShutdown.appShutdownConfirmed.removeBlocker(
    689      this.#onShutdownBound
    690    );
    691    this.#onShutdownBound = null;
    692  }
    693 
    694  observe(subject, topic, data) {
    695    log.trace(
    696      `CommandQueue observed topic=${topic}, data=${data}, subject=${subject}`
    697    );
    698    switch (topic) {
    699      case "services.sync.tabs.command-queued":
    700        this.flushQueue().catch(e => {
    701          log.error("Failed to flush the outgoing queue", e);
    702        });
    703        break;
    704 
    705      case "weave:engine:sync:finish":
    706        // This is to pick up pending commands we failed to send in the last session.
    707        if (data != "tabs") {
    708          return;
    709        }
    710        Services.obs.removeObserver(this, "weave:engine:sync:finish");
    711        this.#isObservingTabSyncs = false;
    712        this.#checkQueueAfterStartup();
    713        break;
    714 
    715      default:
    716        log.error(`unexpected observer topic: ${topic}`);
    717    }
    718  }
    719 
    720  // for test mocking.
 _getIdleService() {
   // Hands out the lazily-resolved idle service. It is a method rather than a
   // direct property read so that it can be replaced when mocking.
   return lazy.idleService;
 }
    724 
    725  async #checkQueueAfterStartup() {
    726    // do this on idle because we are probably syncing quite close to startup.
    727    const idleService = this._getIdleService();
    728    const idleObserver = (/* subject, topic, data */) => {
    729      idleService.removeIdleObserver(idleObserver, this.#idleThresholdSeconds);
    730      log.info("checking if the command queue is empty now we are idle");
    731      this.flushQueue()
    732        .then(didSend => {
    733          // TODO: it would be good to get telemetry here, because we expect this to be true rarely.
    734          log.info(
    735            `pending command check had ${didSend ? "some" : "no"} commands`
    736          );
    737        })
    738        .catch(err => {
    739          log.error(
    740            "Checking for pending tab commands after first tab sync failed",
    741            err
    742          );
    743        });
    744    };
    745    idleService.addIdleObserver(idleObserver, this.#idleThresholdSeconds);
    746  }
    747 
    748  async flushQueue(isForShutdown = false) {
    749    // We really don't want multiple queue flushes concurrently, which is a real possibility.
    750    // If we are shutting down and there's already a `flushQueue()` running, it's almost certainly
    751    // not going to be `isForShutdown()`. We don't really want to wait for that to complete just
    752    // to start another, so there's a risk we will fail to send commands in that scenario - but
    753    // we will send them at startup time.
    754    if (this.#flushQueuePromise == null) {
    755      this.#flushQueuePromise = this.#flushQueue(isForShutdown);
    756    }
    757    try {
    758      return await this.#flushQueuePromise;
    759    } finally {
    760      this.#flushQueuePromise = null;
    761    }
    762  }
    763 
    764  async #flushQueue(isForShutdown) {
    765    // get all the queued items to work out what's ready to send. If a device has queued item less than
    766    // our pushDelay, then we don't send *any* command for that device yet, but ensure a timer is set
    767    // for the delay.
    768    let store = await lazy.getRemoteCommandStore();
    769    let pending = await store.getUnsentCommands();
    770    log.trace("flushQueue total queued items", pending.length);
    771    // any timeRequested less than `sendThreshold` should be sent now (unless we are shutting down,
    772    // in which case we send everything now)
    773    let now = this.now();
    774    // We want to be efficient with batching commands to send to the user
    775    // so we categorize things into 3 buckets:
    776    // mustSend - overdue and should be sent as early as we can
    777    // canSend - is due but not yet "overdue", should be sent if possible
    778    // early - can still be undone and should not be sent yet
    779    const mustSendThreshold = isForShutdown ? Infinity : now - this.DELAY;
    780    const canSendThreshold = isForShutdown ? Infinity : now - this.DELAY * 2;
    781    // make a map of deviceId -> device
    782    let recentDevices = this._fxai.device.recentDeviceList;
    783    if (!recentDevices.length) {
    784      // If we can't map a device ID to the device with the keys etc, we are screwed!
    785      log.error("No devices available for queued tab commands");
    786      return false;
    787    }
    788    let deviceMap = new Map(recentDevices.map(d => [d.id, d]));
    789    // make a map of commands keyed by device ID.
    790    let byDevice = Map.groupBy(pending, c => c.deviceId);
    791    let nextTime = Infinity;
    792    let didSend = false;
    793    for (let [deviceId, commands] of byDevice) {
    794      let device = deviceMap.get(deviceId);
    795      if (!device) {
    796        // If we can't map *this* device ID to a device with the keys etc, we are screwed!
    797        // This however *is* possible if the target device was disconnected before we had a chance to send it,
    798        // so remove this item.
    799        log.warn("Unknown device for queued tab commands", deviceId);
    800        await Promise.all(
    801          commands.map(command => store.removeRemoteCommand(deviceId, command))
    802        );
    803        continue;
    804      }
    805      let toSend = [];
    806      // We process in reverse order so it's newest-to-oldest
    807      // which means if the newest is already a "must send"
    808      // we can simply send all of the "can sends"
    809      for (const command of commands.reverse()) {
    810        if (!(command.command instanceof lazy.RemoteCommand.CloseTab)) {
    811          log.error(`Ignoring unknown pending command ${command}`);
    812          continue;
    813        }
    814        if (command.timeRequested <= mustSendThreshold) {
    815          log.trace(
    816            `command for url ${command.command.url} is overdue, adding to send`
    817          );
    818          toSend.push(command);
    819        } else if (command.timeRequested <= canSendThreshold) {
    820          if (toSend.length) {
    821            log.trace(`command for url ${command.command.url} is due,
    822              since there others to be sent, also adding to send`);
    823            toSend.push(command);
    824          } else {
    825            // Though it's due, since there are no others we can check again
    826            // and see if we can batch
    827            nextTime = Math.min(nextTime, command.timeRequested + this.DELAY);
    828          }
    829        } else {
    830          // We set the next timer just a little later to ensure we'll have an overdue
    831          nextTime = Math.min(
    832            nextTime,
    833            command.timeRequested + this.DELAY * 1.1
    834          );
    835          // Since the list is sorted newest to oldest,
    836          // we can assume the rest are not ready
    837          break;
    838        }
    839      }
    840 
    841      if (toSend.length) {
    842        let urlsToClose = toSend.map(c => c.command.url);
    843        // Generate a flowID to use for all chunked commands
    844        const flowID = this._fxai.telemetry.generateFlowID();
    845        // If we're dealing with large sets of urls, we should split them across
    846        // multiple payloads to prevent breaking the issues for the user
    847        let chunks = this.chunkUrls(urlsToClose, COMMAND_MAX_PAYLOAD_SIZE);
    848        for (let chunk of chunks) {
    849          if (
    850            await this._commands.closeTab.sendCloseTabsCommand(
    851              device,
    852              chunk,
    853              flowID
    854            )
    855          ) {
    856            // We build a set from the sent urls for faster comparing
    857            const urlChunkSet = new Set(chunk);
    858            // success! Mark them as sent.
    859            for (let cmd of toSend.filter(c =>
    860              urlChunkSet.has(c.command.url)
    861            )) {
    862              log.trace(
    863                `Setting pending command for device ${deviceId} as sent`,
    864                cmd
    865              );
    866              await store.setPendingCommandSent(cmd);
    867              didSend = true;
    868            }
    869          } else {
    870            // We should investigate a better backoff strategy
    871            // https://bugzilla.mozilla.org/show_bug.cgi?id=1899433
    872            // For now just say 60s.
    873            log.warn(
    874              `Failed to send close tab commands for device ${deviceId}`
    875            );
    876            nextTime = Math.min(nextTime, now + 60000);
    877          }
    878        }
    879      } else {
    880        log.trace(`Skipping send for device ${deviceId}`);
    881      }
    882    }
    883 
    884    if (didSend) {
    885      Services.obs.notifyObservers(null, TOPIC_TABS_CHANGED);
    886    }
    887 
    888    if (nextTime == Infinity) {
    889      log.info("No new close-tab timer needed");
    890    } else if (isForShutdown) {
    891      // because we never delay sending in this case the logic above should never set `nextTime`
    892      log.error(
    893        "logic error in command queue manager: flush for shutdown should never set a timer"
    894      );
    895    } else {
    896      let delay = nextTime - now + 10;
    897      log.trace(`Setting new close-tab timer for ${delay}ms`);
    898      this._ensureTimer(delay);
    899    }
    900    return didSend;
    901  }
    902 
    903  // Take an array of urls and a max size and split them into chunks
    904  // that are smaller than the passed in max size
    905  // Note: This method modifies the passed in array
    906  chunkUrls(urls, maxSize) {
    907    let chunks = [];
    908 
    909    // For optimal packing, we sort the array of urls from shortest-to-longest
    910    urls.sort((a, b) => a.length - b.length);
    911 
    912    while (urls.length) {
    913      let chunk = lazy.Utils.tryFitItems(urls, maxSize);
    914      if (!chunk.length) {
    915        // None of the remaining URLs can fit into a single command
    916        urls.forEach(url => {
    917          log.warn(`Skipping oversized URL: ${url}`);
    918        });
    919        break;
    920      }
    921      chunks.push(chunk);
    922      // Remove the processed URLs from the list
    923      urls.splice(0, chunk.length);
    924    }
    925    return chunks;
    926  }
    927 
    928  async _ensureTimer(timeout) {
    929    log.info(
    930      `Setting a new close-tab timer with delay=${timeout} with existing timer=${!!this
    931        .#timer}`
    932    );
    933 
    934    if (this.#timer) {
    935      clearTimeout(this.#timer);
    936    }
    937 
    938    // If the browser shuts down while a timer exists we should force the send
    939    // While we should pick up the command after a restart, we don't know
    940    // how long that will be.
    941    // See https://bugzilla.mozilla.org/show_bug.cgi?id=1888299
    942    this.#timer = setTimeout(async () => {
    943      // XXX - this might be racey - if a new timer fires before this promise resolves - it
    944      // might seem unlikely, but network is involved!
    945      // flushQueue might create another timer, so we must clear our current timer first.
    946      this.#timer = null;
    947      await this.flushQueue();
    948    }, timeout);
    949  }
    950 
    951  // On shutdown we want to send any pending items - ie, pretend the timer fired *now*.
    952  // Sadly it's not easy for us to abort any in-flight requests, nor to limit the amount of
    953  // time any new requests we create take, so we don't do this for now. This means that in
    954  // the case of a super slow network or super slow FxA, we might crash at shutdown, but we
    955  // can think of doing this in a followup.
    956  async #onShutdown() {
    957    // If there is no timer set, then there's nothing pending to do.
    958    log.debug(
    959      `CommandQueue shutdown is flushing the queue with a timer=${!!this
    960        .#timer}`
    961    );
    962    if (this.#timer) {
    963      // We don't want the current one to fire at the same time!
    964      clearTimeout(this.#timer);
    965      this.#timer = null;
    966      await this.flushQueue(true);
    967    }
    968  }
    969 
    970  // hook points for tests.
    971  now() {
    972    return Date.now();
    973  }
    974 }
    975 
    /**
     * Encodes a buffer as URL-safe base64 without padding.
     *
     * @param {ArrayBuffer} buffer - The bytes to encode; viewed as a Uint8Array.
     * @returns {string} The result of `ChromeUtils.base64URLEncode`.
     */
    function urlsafeBase64Encode(buffer) {
      const bytes = new Uint8Array(buffer);
      const options = { pad: false };
      return ChromeUtils.base64URLEncode(bytes, options);
    }
    979 
    /**
     * Decodes a URL-safe base64 string, rejecting input that carries padding.
     *
     * @param {string} str - The URL-safe base64 text.
     * @returns {*} Whatever `ChromeUtils.base64URLDecode` returns for `str`.
     */
    function urlsafeBase64Decode(str) {
      const options = { padding: "reject" };
      return ChromeUtils.base64URLDecode(str, options);
    }