tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

engines.sys.mjs (70363B)


      1 /* This Source Code Form is subject to the terms of the Mozilla Public
      2 * License, v. 2.0. If a copy of the MPL was not distributed with this
      3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 
      5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs";
      6 
      7 import { JSONFile } from "resource://gre/modules/JSONFile.sys.mjs";
      8 import { Log } from "resource://gre/modules/Log.sys.mjs";
      9 
     10 import { Async } from "resource://services-common/async.sys.mjs";
     11 import { Observers } from "resource://services-common/observers.sys.mjs";
     12 
     13 import {
     14  DEFAULT_DOWNLOAD_BATCH_SIZE,
     15  DEFAULT_GUID_FETCH_BATCH_SIZE,
     16  ENGINE_BATCH_INTERRUPTED,
     17  ENGINE_DOWNLOAD_FAIL,
     18  ENGINE_UPLOAD_FAIL,
     19  VERSION_OUT_OF_DATE,
     20  PREFS_BRANCH,
     21 } from "resource://services-sync/constants.sys.mjs";
     22 
     23 import {
     24  Collection,
     25  CryptoWrapper,
     26 } from "resource://services-sync/record.sys.mjs";
     27 import { Resource } from "resource://services-sync/resource.sys.mjs";
     28 import {
     29  SerializableSet,
     30  Svc,
     31  Utils,
     32 } from "resource://services-sync/util.sys.mjs";
     33 import { SyncedRecordsTelemetry } from "resource://services-sync/telemetry.sys.mjs";
     34 
     35 const lazy = {};
     36 
     37 ChromeUtils.defineESModuleGetters(lazy, {
     38  PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs",
     39 });
     40 
     41 function ensureDirectory(path) {
     42  return IOUtils.makeDirectory(PathUtils.parent(path), {
     43    createAncestors: true,
     44  });
     45 }
     46 
     47 /**
     48 * Trackers are associated with a single engine and deal with
     49 * listening for changes to their particular data type.
     50 *
     51 * The base `Tracker` only supports listening for changes, and bumping the score
     52 * to indicate how urgently the engine wants to sync. It does not persist any
     53 * data. Engines that track changes directly in the storage layer (like
     54 * bookmarks, bridged engines, addresses, and credit cards) or only upload a
     55 * single record (tabs and preferences) should subclass `Tracker`.
     56 */
     57 export function Tracker(name, engine) {
     58  if (!engine) {
     59    throw new Error("Tracker must be associated with an Engine instance.");
     60  }
     61 
     62  name = name || "Unnamed";
     63  this.name = name.toLowerCase();
     64  this.engine = engine;
     65 
     66  this._log = Log.repository.getLogger(`Sync.Engine.${name}.Tracker`);
     67 
     68  this._score = 0;
     69 
     70  this.asyncObserver = Async.asyncObserver(this, this._log);
     71 }
     72 
     73 Tracker.prototype = {
     74  // New-style trackers use change sources to filter out changes made by Sync in
     75  // observer notifications, so we don't want to let the engine ignore all
     76  // changes during a sync.
     77  get ignoreAll() {
     78    return false;
     79  },
     80 
     81  // Define an empty setter so that the engine doesn't throw a `TypeError`
     82  // setting a read-only property.
     83  set ignoreAll(value) {},
     84 
     85  /*
     86   * Score can be called as often as desired to decide which engines to sync
     87   *
     88   * Valid values for score:
     89   * -1: Do not sync unless the user specifically requests it (almost disabled)
     90   * 0: Nothing has changed
     91   * 100: Please sync me ASAP!
     92   *
     93   * Setting it to other values should (but doesn't currently) throw an exception
     94   */
     95  get score() {
     96    return this._score;
     97  },
     98 
     99  set score(value) {
    100    this._score = value;
    101    Observers.notify("weave:engine:score:updated", this.name);
    102  },
    103 
    104  // Should be called by service everytime a sync has been done for an engine
    105  resetScore() {
    106    this._score = 0;
    107  },
    108 
    109  // Unsupported, and throws a more descriptive error to ensure callers aren't
    110  // accidentally using persistence.
    111  async getChangedIDs() {
    112    throw new TypeError("This tracker doesn't store changed IDs");
    113  },
    114 
    115  // Also unsupported.
    116  async addChangedID() {
    117    throw new TypeError("Can't add changed ID to this tracker");
    118  },
    119 
    120  // Ditto.
    121  async removeChangedID() {
    122    throw new TypeError("Can't remove changed IDs from this tracker");
    123  },
    124 
    125  // This method is called at various times, so we override with a no-op
    126  // instead of throwing.
    127  clearChangedIDs() {},
    128 
    129  _now() {
    130    return Date.now() / 1000;
    131  },
    132 
    133  _isTracking: false,
    134 
    135  start() {
    136    if (!this.engineIsEnabled()) {
    137      return;
    138    }
    139    this._log.trace("start().");
    140    if (!this._isTracking) {
    141      this.onStart();
    142      this._isTracking = true;
    143    }
    144  },
    145 
    146  async stop() {
    147    this._log.trace("stop().");
    148    if (this._isTracking) {
    149      await this.asyncObserver.promiseObserversComplete();
    150      this.onStop();
    151      this._isTracking = false;
    152    }
    153  },
    154 
    155  // Override these in your subclasses.
    156  onStart() {},
    157  onStop() {},
    158  async observe() {},
    159 
    160  engineIsEnabled() {
    161    if (!this.engine) {
    162      // Can't tell -- we must be running in a test!
    163      return true;
    164    }
    165    return this.engine.enabled;
    166  },
    167 
    168  /**
    169   * Starts or stops listening for changes depending on the associated engine's
    170   * enabled state.
    171   *
    172   * @param {boolean} engineEnabled Whether the engine was enabled.
    173   */
    174  async onEngineEnabledChanged(engineEnabled) {
    175    if (engineEnabled == this._isTracking) {
    176      return;
    177    }
    178 
    179    if (engineEnabled) {
    180      this.start();
    181    } else {
    182      await this.stop();
    183      this.clearChangedIDs();
    184    }
    185  },
    186 
    187  async finalize() {
    188    await this.stop();
    189  },
    190 };
    191 
    192 /*
    193 * A tracker that persists a list of IDs for all changed items that need to be
    194 * synced. This is 🚨 _extremely deprecated_ 🚨 and only kept around for current
    195 * engines. ⚠️ Please **don't use it** for new engines! ⚠️
    196 *
    197 * Why is this kind of external change tracking deprecated? Because it causes
    198 * consistency issues due to missed notifications, interrupted syncs, and the
    199 * tracker's view of what changed diverging from the data store's.
    200 */
    201 export function LegacyTracker(name, engine) {
    202  Tracker.call(this, name, engine);
    203 
    204  this._ignored = [];
    205  this.file = this.name;
    206  this._storage = new JSONFile({
    207    path: Utils.jsonFilePath("changes", this.file),
    208    dataPostProcessor: json => this._dataPostProcessor(json),
    209    beforeSave: () => this._beforeSave(),
    210  });
    211  this._ignoreAll = false;
    212 }
    213 
    214 LegacyTracker.prototype = {
    215  get ignoreAll() {
    216    return this._ignoreAll;
    217  },
    218 
    219  set ignoreAll(value) {
    220    this._ignoreAll = value;
    221  },
    222 
    223  // Default to an empty object if the file doesn't exist.
    224  _dataPostProcessor(json) {
    225    return (typeof json == "object" && json) || {};
    226  },
    227 
    228  // Ensure the Weave storage directory exists before writing the file.
    229  _beforeSave() {
    230    return ensureDirectory(this._storage.path);
    231  },
    232 
    233  async getChangedIDs() {
    234    await this._storage.load();
    235    return this._storage.data;
    236  },
    237 
    238  _saveChangedIDs() {
    239    this._storage.saveSoon();
    240  },
    241 
    242  // ignore/unignore specific IDs.  Useful for ignoring items that are
    243  // being processed, or that shouldn't be synced.
    244  // But note: not persisted to disk
    245 
    246  ignoreID(id) {
    247    this.unignoreID(id);
    248    this._ignored.push(id);
    249  },
    250 
    251  unignoreID(id) {
    252    let index = this._ignored.indexOf(id);
    253    if (index != -1) {
    254      this._ignored.splice(index, 1);
    255    }
    256  },
    257 
    258  async _saveChangedID(id, when) {
    259    this._log.trace(`Adding changed ID: ${id}, ${JSON.stringify(when)}`);
    260    const changedIDs = await this.getChangedIDs();
    261    changedIDs[id] = when;
    262    this._saveChangedIDs();
    263  },
    264 
    265  async addChangedID(id, when) {
    266    if (!id) {
    267      this._log.warn("Attempted to add undefined ID to tracker");
    268      return false;
    269    }
    270 
    271    if (this.ignoreAll || this._ignored.includes(id)) {
    272      return false;
    273    }
    274 
    275    // Default to the current time in seconds if no time is provided.
    276    if (when == null) {
    277      when = this._now();
    278    }
    279 
    280    const changedIDs = await this.getChangedIDs();
    281    // Add/update the entry if we have a newer time.
    282    if ((changedIDs[id] || -Infinity) < when) {
    283      await this._saveChangedID(id, when);
    284    }
    285 
    286    return true;
    287  },
    288 
    289  async removeChangedID(...ids) {
    290    if (!ids.length || this.ignoreAll) {
    291      return false;
    292    }
    293    for (let id of ids) {
    294      if (!id) {
    295        this._log.warn("Attempted to remove undefined ID from tracker");
    296        continue;
    297      }
    298      if (this._ignored.includes(id)) {
    299        this._log.debug(`Not removing ignored ID ${id} from tracker`);
    300        continue;
    301      }
    302      const changedIDs = await this.getChangedIDs();
    303      if (changedIDs[id] != null) {
    304        this._log.trace("Removing changed ID " + id);
    305        delete changedIDs[id];
    306      }
    307    }
    308    this._saveChangedIDs();
    309    return true;
    310  },
    311 
    312  clearChangedIDs() {
    313    this._log.trace("Clearing changed ID list");
    314    this._storage.data = {};
    315    this._saveChangedIDs();
    316  },
    317 
    318  async finalize() {
    319    // Persist all pending tracked changes to disk, and wait for the final write
    320    // to finish.
    321    await super.finalize();
    322    this._saveChangedIDs();
    323    await this._storage.finalize();
    324  },
    325 };
    326 Object.setPrototypeOf(LegacyTracker.prototype, Tracker.prototype);
    327 
    328 /**
    329 * The Store serves as the interface between Sync and stored data.
    330 *
    331 * The name "store" is slightly a misnomer because it doesn't actually "store"
    332 * anything. Instead, it serves as a gateway to something that actually does
    333 * the "storing."
    334 *
    335 * The store is responsible for record management inside an engine. It tells
    336 * Sync what items are available for Sync, converts items to and from Sync's
    337 * record format, and applies records from Sync into changes on the underlying
    338 * store.
    339 *
    340 * Store implementations require a number of functions to be implemented. These
    341 * are all documented below.
    342 *
    343 * For stores that deal with many records or which have expensive store access
    344 * routines, it is highly recommended to implement a custom applyIncomingBatch
    345 * and/or applyIncoming function on top of the basic APIs.
    346 */
    347 
    348 export function Store(name, engine) {
    349  if (!engine) {
    350    throw new Error("Store must be associated with an Engine instance.");
    351  }
    352 
    353  name = name || "Unnamed";
    354  this.name = name.toLowerCase();
    355  this.engine = engine;
    356 
    357  this._log = Log.repository.getLogger(`Sync.Engine.${name}.Store`);
    358 
    359  ChromeUtils.defineLazyGetter(this, "_timer", function () {
    360    return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer);
    361  });
    362 }
    363 
    364 Store.prototype = {
    365  /**
    366   * Apply multiple incoming records against the store.
    367   *
    368   * This is called with a set of incoming records to process. The function
    369   * should look at each record, reconcile with the current local state, and
    370   * make the local changes required to bring its state in alignment with the
    371   * record.
    372   *
    373   * The default implementation simply iterates over all records and calls
    374   * applyIncoming(). Store implementations may overwrite this function
    375   * if desired.
    376   *
    377   * @param  records Array of records to apply
    378   * @param  a SyncedRecordsTelemetry obj that will keep track of failed reasons
    379   * @return Array of record IDs which did not apply cleanly
    380   */
    381  async applyIncomingBatch(records, countTelemetry) {
    382    let failed = [];
    383 
    384    await Async.yieldingForEach(records, async record => {
    385      try {
    386        await this.applyIncoming(record);
    387      } catch (ex) {
    388        if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) {
    389          // This kind of exception should have a 'cause' attribute, which is an
    390          // originating exception.
    391          // ex.cause will carry its stack with it when rethrown.
    392          throw ex.cause;
    393        }
    394        if (Async.isShutdownException(ex)) {
    395          throw ex;
    396        }
    397        this._log.warn("Failed to apply incoming record " + record.id, ex);
    398        failed.push(record.id);
    399        countTelemetry.addIncomingFailedReason(ex.message);
    400      }
    401    });
    402 
    403    return failed;
    404  },
    405 
    406  /**
    407   * Apply a single record against the store.
    408   *
    409   * This takes a single record and makes the local changes required so the
    410   * local state matches what's in the record.
    411   *
    412   * The default implementation calls one of remove(), create(), or update()
    413   * depending on the state obtained from the store itself. Store
    414   * implementations may overwrite this function if desired.
    415   *
    416   * @param record
    417   *        Record to apply
    418   */
    419  async applyIncoming(record) {
    420    if (record.deleted) {
    421      await this.remove(record);
    422    } else if (!(await this.itemExists(record.id))) {
    423      await this.create(record);
    424    } else {
    425      await this.update(record);
    426    }
    427  },
    428 
    429  // override these in derived objects
    430 
    431  /**
    432   * Create an item in the store from a record.
    433   *
    434   * This is called by the default implementation of applyIncoming(). If using
    435   * applyIncomingBatch(), this won't be called unless your store calls it.
    436   *
    437   * @param record
    438   *        The store record to create an item from
    439   */
    440  async create() {
    441    throw new Error("override create in a subclass");
    442  },
    443 
    444  /**
    445   * Remove an item in the store from a record.
    446   *
    447   * This is called by the default implementation of applyIncoming(). If using
    448   * applyIncomingBatch(), this won't be called unless your store calls it.
    449   *
    450   * @param record
    451   *        The store record to delete an item from
    452   */
    453  async remove() {
    454    throw new Error("override remove in a subclass");
    455  },
    456 
    457  /**
    458   * Update an item from a record.
    459   *
    460   * This is called by the default implementation of applyIncoming(). If using
    461   * applyIncomingBatch(), this won't be called unless your store calls it.
    462   *
    463   * @param record
    464   *        The record to use to update an item from
    465   */
    466  async update() {
    467    throw new Error("override update in a subclass");
    468  },
    469 
    470  /**
    471   * Determine whether a record with the specified ID exists.
    472   *
    473   * Takes a string record ID and returns a booleans saying whether the record
    474   * exists.
    475   *
    476   * @param  id
    477   *         string record ID
    478   * @return boolean indicating whether record exists locally
    479   */
    480  async itemExists() {
    481    throw new Error("override itemExists in a subclass");
    482  },
    483 
    484  /**
    485   * Create a record from the specified ID.
    486   *
    487   * If the ID is known, the record should be populated with metadata from
    488   * the store. If the ID is not known, the record should be created with the
    489   * delete field set to true.
    490   *
    491   * @param  id
    492   *         string record ID
    493   * @param  collection
    494   *         Collection to add record to. This is typically passed into the
    495   *         constructor for the newly-created record.
    496   * @return record type for this engine
    497   */
    498  async createRecord() {
    499    throw new Error("override createRecord in a subclass");
    500  },
    501 
    502  /**
    503   * Change the ID of a record.
    504   *
    505   * @param  oldID
    506   *         string old/current record ID
    507   * @param  newID
    508   *         string new record ID
    509   */
    510  async changeItemID() {
    511    throw new Error("override changeItemID in a subclass");
    512  },
    513 
    514  /**
    515   * Obtain the set of all known record IDs.
    516   *
    517   * @return Object with ID strings as keys and values of true. The values
    518   *         are ignored.
    519   */
    520  async getAllIDs() {
    521    throw new Error("override getAllIDs in a subclass");
    522  },
    523 
    524  /**
    525   * Wipe all data in the store.
    526   *
    527   * This function is called during remote wipes or when replacing local data
    528   * with remote data.
    529   *
    530   * This function should delete all local data that the store is managing. It
    531   * can be thought of as clearing out all state and restoring the "new
    532   * browser" state.
    533   */
    534  async wipe() {
    535    throw new Error("override wipe in a subclass");
    536  },
    537 };
    538 
    539 export function EngineManager(service) {
    540  this.service = service;
    541 
    542  this._engines = {};
    543 
    544  this._altEngineInfo = {};
    545 
    546  // This will be populated by Service on startup.
    547  this._declined = new Set();
    548  this._log = Log.repository.getLogger("Sync.EngineManager");
    549  this._log.manageLevelFromPref("services.sync.log.logger.service.engines");
    550  // define the default level for all engine logs here (although each engine
    551  // allows its level to be controlled via a specific, non-default pref)
    552  Log.repository
    553    .getLogger(`Sync.Engine`)
    554    .manageLevelFromPref("services.sync.log.logger.engine");
    555 }
    556 
    557 EngineManager.prototype = {
    558  get(name) {
    559    // Return an array of engines if we have an array of names
    560    if (Array.isArray(name)) {
    561      let engines = [];
    562      name.forEach(function (name) {
    563        let engine = this.get(name);
    564        if (engine) {
    565          engines.push(engine);
    566        }
    567      }, this);
    568      return engines;
    569    }
    570 
    571    return this._engines[name]; // Silently returns undefined for unknown names.
    572  },
    573 
    574  getAll() {
    575    let engines = [];
    576    for (let [, engine] of Object.entries(this._engines)) {
    577      engines.push(engine);
    578    }
    579    return engines;
    580  },
    581 
    582  /**
    583   * If a user has changed a pref that controls which variant of a sync engine
    584   * for a given collection we use, unregister the old engine and register the
    585   * new one.
    586   *
    587   * This is called by EngineSynchronizer before every sync.
    588   */
    589  async switchAlternatives() {
    590    for (let [name, info] of Object.entries(this._altEngineInfo)) {
    591      let prefValue = info.prefValue;
    592      if (prefValue === info.lastValue) {
    593        this._log.trace(
    594          `No change for engine ${name} (${info.pref} is still ${prefValue})`
    595        );
    596        continue;
    597      }
    598      // Unregister the old engine, register the new one.
    599      this._log.info(
    600        `Switching ${name} engine ("${info.pref}" went from ${info.lastValue} => ${prefValue})`
    601      );
    602      try {
    603        await this._removeAndFinalize(name);
    604      } catch (e) {
    605        this._log.warn(`Failed to remove previous ${name} engine...`, e);
    606      }
    607      let engineType = prefValue ? info.whenTrue : info.whenFalse;
    608      try {
    609        // If register throws, we'll try again next sync, but until then there
    610        // won't be an engine registered for this collection.
    611        await this.register(engineType);
    612        info.lastValue = prefValue;
    613        // Note: engineType.name is using Function.prototype.name.
    614        this._log.info(`Switched the ${name} engine to use ${engineType.name}`);
    615      } catch (e) {
    616        this._log.warn(
    617          `Switching the ${name} engine to use ${engineType.name} failed (couldn't register)`,
    618          e
    619        );
    620      }
    621    }
    622  },
    623 
    624  async registerAlternatives(name, pref, whenTrue, whenFalse) {
    625    let info = { name, pref, whenTrue, whenFalse };
    626 
    627    XPCOMUtils.defineLazyPreferenceGetter(info, "prefValue", pref, false);
    628 
    629    let chosen = info.prefValue ? info.whenTrue : info.whenFalse;
    630    info.lastValue = info.prefValue;
    631    this._altEngineInfo[name] = info;
    632 
    633    await this.register(chosen);
    634  },
    635 
    636  /**
    637   * N.B., does not pay attention to the declined list.
    638   */
    639  getEnabled() {
    640    return this.getAll()
    641      .filter(engine => engine.enabled)
    642      .sort((a, b) => a.syncPriority - b.syncPriority);
    643  },
    644 
    645  get enabledEngineNames() {
    646    return this.getEnabled().map(e => e.name);
    647  },
    648 
    649  persistDeclined() {
    650    Svc.PrefBranch.setStringPref(
    651      "declinedEngines",
    652      [...this._declined].join(",")
    653    );
    654  },
    655 
    656  /**
    657   * Returns an array.
    658   */
    659  getDeclined() {
    660    return [...this._declined];
    661  },
    662 
    663  setDeclined(engines) {
    664    this._declined = new Set(engines);
    665    this.persistDeclined();
    666  },
    667 
    668  isDeclined(engineName) {
    669    return this._declined.has(engineName);
    670  },
    671 
    672  /**
    673   * Accepts a Set or an array.
    674   */
    675  decline(engines) {
    676    for (let e of engines) {
    677      this._declined.add(e);
    678    }
    679    this.persistDeclined();
    680  },
    681 
    682  undecline(engines) {
    683    for (let e of engines) {
    684      this._declined.delete(e);
    685    }
    686    this.persistDeclined();
    687  },
    688 
    689  /**
    690   * Register an Engine to the service. Alternatively, give an array of engine
    691   * objects to register.
    692   *
    693   * @param engineObject
    694   *        Engine object used to get an instance of the engine
    695   * @return The engine object if anything failed
    696   */
    697  async register(engineObject) {
    698    if (Array.isArray(engineObject)) {
    699      for (const e of engineObject) {
    700        await this.register(e);
    701      }
    702      return;
    703    }
    704 
    705    try {
    706      let engine = new engineObject(this.service);
    707      let name = engine.name;
    708      if (name in this._engines) {
    709        this._log.error("Engine '" + name + "' is already registered!");
    710      } else {
    711        if (engine.initialize) {
    712          await engine.initialize();
    713        }
    714        this._engines[name] = engine;
    715      }
    716    } catch (ex) {
    717      let name = engineObject || "";
    718      name = name.prototype || "";
    719      name = name.name || "";
    720 
    721      this._log.error(`Could not initialize engine ${name}`, ex);
    722    }
    723  },
    724 
    725  async unregister(val) {
    726    let name = val;
    727    if (val instanceof SyncEngine) {
    728      name = val.name;
    729    }
    730    await this._removeAndFinalize(name);
    731    delete this._altEngineInfo[name];
    732  },
    733 
    734  // Common code for disabling an engine by name, that doesn't complain if the
    735  // engine doesn't exist. Doesn't touch the engine's alternative info (if any
    736  // exists).
    737  async _removeAndFinalize(name) {
    738    if (name in this._engines) {
    739      let engine = this._engines[name];
    740      delete this._engines[name];
    741      await engine.finalize();
    742    }
    743  },
    744 
    745  async clear() {
    746    for (let name in this._engines) {
    747      let engine = this._engines[name];
    748      delete this._engines[name];
    749      await engine.finalize();
    750    }
    751    this._altEngineInfo = {};
    752  },
    753 };
    754 
    755 export function SyncEngine(name, service) {
    756  if (!service) {
    757    throw new Error("SyncEngine must be associated with a Service instance.");
    758  }
    759 
    760  this.Name = name || "Unnamed";
    761  this.name = name.toLowerCase();
    762  this.service = service;
    763 
    764  this._notify = Utils.notify("weave:engine:");
    765  this._log = Log.repository.getLogger("Sync.Engine." + this.Name);
    766  this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`);
    767 
    768  this._modified = this.emptyChangeset();
    769  this._tracker; // initialize tracker to load previously changed IDs
    770  this._log.debug("Engine constructed");
    771 
    772  this._toFetchStorage = new JSONFile({
    773    path: Utils.jsonFilePath("toFetch", this.name),
    774    dataPostProcessor: json => this._metadataPostProcessor(json),
    775    beforeSave: () => this._beforeSaveMetadata(),
    776  });
    777 
    778  this._previousFailedStorage = new JSONFile({
    779    path: Utils.jsonFilePath("failed", this.name),
    780    dataPostProcessor: json => this._metadataPostProcessor(json),
    781    beforeSave: () => this._beforeSaveMetadata(),
    782  });
    783 
    784  XPCOMUtils.defineLazyPreferenceGetter(
    785    this,
    786    "_enabled",
    787    `services.sync.engine.${this.prefName}`,
    788    false
    789  );
    790  XPCOMUtils.defineLazyPreferenceGetter(
    791    this,
    792    "_syncID",
    793    `services.sync.${this.name}.syncID`,
    794    ""
    795  );
    796  XPCOMUtils.defineLazyPreferenceGetter(
    797    this,
    798    "_lastSync",
    799    `services.sync.${this.name}.lastSync`,
    800    "0",
    801    null,
    802    v => parseFloat(v)
    803  );
    804  // Async initializations can be made in the initialize() method.
    805 
    806  this.asyncObserver = Async.asyncObserver(this, this._log);
    807 }
    808 
    809 // Enumeration to define approaches to handling bad records.
    810 // Attached to the constructor to allow use as a kind of static enumeration.
    811 SyncEngine.kRecoveryStrategy = {
    812  ignore: "ignore",
    813  retry: "retry",
    814  error: "error",
    815 };
    816 
    817 SyncEngine.prototype = {
    818  _recordObj: CryptoWrapper,
    819  // _storeObj, and _trackerObj should to be overridden in subclasses
    820  _storeObj: Store,
    821  _trackerObj: Tracker,
    822  version: 1,
    823 
    824  // Local 'constant'.
    825  // Signal to the engine that processing further records is pointless.
    826  eEngineAbortApplyIncoming: "error.engine.abort.applyincoming",
    827 
    828  // Should we keep syncing if we find a record that cannot be uploaded (ever)?
    829  // If this is false, we'll throw, otherwise, we'll ignore the record and
    830  // continue. This currently can only happen due to the record being larger
    831  // than the record upload limit.
    832  allowSkippedRecord: true,
    833 
    834  // Which sortindex to use when retrieving records for this engine.
    835  _defaultSort: undefined,
    836 
    837  _hasSyncedThisSession: false,
    838 
    839  _metadataPostProcessor(json) {
    840    if (Array.isArray(json)) {
    841      // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to
    842      // an object, so we wrap the array for consistency.
    843      json = { ids: json };
    844    }
    845    if (!json.ids) {
    846      json.ids = [];
    847    }
    848    // The set serializes the same way as an array, but offers more efficient
    849    // methods of manipulation.
    850    json.ids = new SerializableSet(json.ids);
    851    return json;
    852  },
    853 
    854  async _beforeSaveMetadata() {
    855    await ensureDirectory(this._toFetchStorage.path);
    856    await ensureDirectory(this._previousFailedStorage.path);
    857  },
    858 
    859  // A relative priority to use when computing an order
    860  // for engines to be synced. Higher-priority engines
    861  // (lower numbers) are synced first.
    862  // It is recommended that a unique value be used for each engine,
    863  // in order to guarantee a stable sequence.
    864  syncPriority: 0,
    865 
    866  // How many records to pull in a single sync. This is primarily to avoid very
    867  // long first syncs against profiles with many history records.
    868  downloadLimit: null,
    869 
    870  // How many records to pull at one time when specifying IDs. This is to avoid
    871  // URI length limitations.
    872  guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE,
    873 
    874  downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE,
    875 
    876  async initialize() {
    877    await this._toFetchStorage.load();
    878    await this._previousFailedStorage.load();
    879    Services.prefs.addObserver(
    880      `${PREFS_BRANCH}engine.${this.prefName}`,
    881      this.asyncObserver,
    882      true
    883    );
    884    this._log.debug("SyncEngine initialized", this.name);
    885  },
    886 
    887  get prefName() {
    888    return this.name;
    889  },
    890 
    891  get enabled() {
    892    return this._enabled;
    893  },
    894 
    895  set enabled(val) {
    896    if (!!val != this._enabled) {
    897      Svc.PrefBranch.setBoolPref("engine." + this.prefName, !!val);
    898    }
    899  },
    900 
    901  get score() {
    902    return this._tracker.score;
    903  },
    904 
    905  get _store() {
    906    let store = new this._storeObj(this.Name, this);
    907    this.__defineGetter__("_store", () => store);
    908    return store;
    909  },
    910 
    911  get _tracker() {
    912    let tracker = new this._trackerObj(this.Name, this);
    913    this.__defineGetter__("_tracker", () => tracker);
    914    return tracker;
    915  },
    916 
    917  get storageURL() {
    918    return this.service.storageURL;
    919  },
    920 
    921  get engineURL() {
    922    return this.storageURL + this.name;
    923  },
    924 
    925  get cryptoKeysURL() {
    926    return this.storageURL + "crypto/keys";
    927  },
    928 
    929  get metaURL() {
    930    return this.storageURL + "meta/global";
    931  },
    932 
  // Begins listening for changes to this engine's data type.
  startTracking() {
    this._tracker.start();
  },

  // Stops the change tracker. Returns a promise that resolves when the
  // tracker has finished stopping.
  stopTracking() {
    return this._tracker.stop();
  },
    941 
    942  // Listens for engine enabled state changes, and updates the tracker's state.
    943  // This is an async observer because the tracker waits on all its async
    944  // observers to finish when it's stopped.
    945  async observe(subject, topic, data) {
    946    if (
    947      topic == "nsPref:changed" &&
    948      data == `services.sync.engine.${this.prefName}`
    949    ) {
    950      await this._tracker.onEngineEnabledChanged(this._enabled);
    951    }
    952  },
    953 
  // Runs a sync for this engine. Returns false immediately when the engine
  // is disabled; otherwise invokes the subclass-provided `_sync` wrapped by
  // `_notify` (defined elsewhere in this file).
  async sync() {
    if (!this.enabled) {
      return false;
    }

    // `_sync` is supplied by concrete engine implementations.
    if (!this._sync) {
      throw new Error("engine does not implement _sync method");
    }

    return this._notify("sync", this.name, this._sync)();
  },
    965 
  // Override this method to return a new changeset type.
  // `Changeset` is declared elsewhere in this file.
  emptyChangeset() {
    return new Changeset();
  },
    970 
    971  /**
    972   * Returns the local sync ID for this engine, or `""` if the engine hasn't
    973   * synced for the first time. This is exposed for tests.
    974   *
    975   * @return the current sync ID.
    976   */
    977  async getSyncID() {
    978    return this._syncID;
    979  },
    980 
    981  /**
    982   * Ensures that the local sync ID for the engine matches the sync ID for the
    983   * collection on the server. A mismatch indicates that another client wiped
    984   * the collection; we're syncing after a node reassignment, and another
    985   * client synced before us; or the store was replaced since the last sync.
    986   * In case of a mismatch, we need to reset all local Sync state and start
    987   * over as a first sync.
    988   *
    989   * In most cases, this method should return the new sync ID as-is. However, an
    990   * engine may ignore the given ID and assign a different one, if it determines
    991   * that the sync ID on the server is out of date. The bookmarks engine uses
    992   * this to wipe the server and other clients on the first sync after the user
    993   * restores from a backup.
    994   *
    995   * @param  newSyncID
    996   *         The new sync ID for the collection from `meta/global`.
    997   * @return The assigned sync ID. If this doesn't match `newSyncID`, we'll
    998   *         replace the sync ID in `meta/global` with the assigned ID.
    999   */
   1000  async ensureCurrentSyncID(newSyncID) {
   1001    let existingSyncID = this._syncID;
   1002    if (existingSyncID == newSyncID) {
   1003      return existingSyncID;
   1004    }
   1005    this._log.debug(
   1006      `Engine syncIDs differ (old="${existingSyncID}", new="${newSyncID}") - resetting the engine`
   1007    );
   1008    await this.resetClient();
   1009    Svc.PrefBranch.setStringPref(this.name + ".syncID", newSyncID);
   1010    Svc.PrefBranch.setStringPref(this.name + ".lastSync", "0");
   1011    return newSyncID;
   1012  },
   1013 
   1014  /**
   1015   * Resets the local sync ID for the engine, wipes the server, and resets all
   1016   * local Sync state to start over as a first sync.
   1017   *
   1018   * @return the new sync ID.
   1019   */
   1020  async resetSyncID() {
   1021    let newSyncID = await this.resetLocalSyncID();
   1022    await this.wipeServer();
   1023    return newSyncID;
   1024  },
   1025 
   1026  /**
   1027   * Resets the local sync ID for the engine, signaling that we're starting over
   1028   * as a first sync.
   1029   *
   1030   * @return the new sync ID.
   1031   */
   1032  async resetLocalSyncID() {
   1033    return this.ensureCurrentSyncID(Utils.makeGUID());
   1034  },
   1035 
   1036  /**
   1037   * Allows overriding scheduler logic -- added to help reduce kinto server
   1038   * getting hammered because our scheduler never got tuned for it.
   1039   *
   1040   * Note: Overriding engines must take resyncs into account -- score will not
   1041   * be cleared.
   1042   */
   1043  shouldSkipSync() {
   1044    return false;
   1045  },
   1046 
   1047  /*
   1048   * lastSync is a timestamp in server time.
   1049   */
   1050  async getLastSync() {
   1051    return this._lastSync;
   1052  },
   1053  async setLastSync(lastSync) {
   1054    // Store the value as a string to keep floating point precision
   1055    Svc.PrefBranch.setStringPref(this.name + ".lastSync", lastSync.toString());
   1056  },
   1057  async resetLastSync() {
   1058    this._log.debug("Resetting " + this.name + " last sync time");
   1059    await this.setLastSync(0);
   1060  },
   1061 
  // Whether this engine has completed a sync during the current session;
  // simple accessor pair over the `_hasSyncedThisSession` backing field.
  get hasSyncedThisSession() {
    return this._hasSyncedThisSession;
  },

  set hasSyncedThisSession(hasSynced) {
    this._hasSyncedThisSession = hasSynced;
  },
   1069 
  // Persisted backlog of record IDs still to be downloaded; consumed and
  // refilled by `_processIncoming`.
  get toFetch() {
    this._toFetchStorage.ensureDataReady();
    return this._toFetchStorage.data.ids;
  },

  set toFetch(ids) {
    // Guard the persisted format: only a SerializableSet serializes as the
    // expected array of IDs.
    if (ids.constructor.name != "SerializableSet") {
      throw new Error(
        "Bug: Attempted to set toFetch to something that isn't a SerializableSet"
      );
    }
    this._toFetchStorage.data = { ids };
    this._toFetchStorage.saveSoon();
  },

  // Persisted set of record IDs that failed to decrypt or apply during the
  // previous sync; retried (once) by `_processIncoming`.
  get previousFailed() {
    this._previousFailedStorage.ensureDataReady();
    return this._previousFailedStorage.data.ids;
  },

  set previousFailed(ids) {
    // Same format guard as the `toFetch` setter above.
    if (ids.constructor.name != "SerializableSet") {
      throw new Error(
        "Bug: Attempted to set previousFailed to something that isn't a SerializableSet"
      );
    }
    this._previousFailedStorage.data = { ids };
    this._previousFailedStorage.saveSoon();
  },
   1099 
   1100  /*
   1101   * Returns a changeset for this sync. Engine implementations can override this
   1102   * method to bypass the tracker for certain or all changed items.
   1103   */
   1104  async getChangedIDs() {
   1105    return this._tracker.getChangedIDs();
   1106  },
   1107 
   1108  // Create a new record using the store and add in metadata.
   1109  async _createRecord(id) {
   1110    let record = await this._store.createRecord(id, this.name);
   1111    record.id = id;
   1112    record.collection = this.name;
   1113    return record;
   1114  },
   1115 
   1116  // Creates a tombstone Sync record with additional metadata.
   1117  _createTombstone(id) {
   1118    let tombstone = new this._recordObj(this.name, id);
   1119    tombstone.id = id;
   1120    tombstone.collection = this.name;
   1121    tombstone.deleted = true;
   1122    return tombstone;
   1123  },
   1124 
  // Any setup that needs to happen at the beginning of each sync:
  // reconciles our engine version and syncID against meta/global, then
  // snapshots the tracker's pending changes into `this._modified`.
  async _syncStartup() {
    // Determine if we need to wipe on outdated versions
    let metaGlobal = await this.service.recordManager.get(this.metaURL);
    let engines = metaGlobal.payload.engines || {};
    let engineData = engines[this.name] || {};

    // Assume missing versions are 0 and wipe the server
    if ((engineData.version || 0) < this.version) {
      this._log.debug("Old engine data: " + [engineData.version, this.version]);

      // Clear the server and reupload everything on bad version or missing
      // meta. Note that we don't regenerate per-collection keys here.
      let newSyncID = await this.resetSyncID();

      // Set the newer version and newly generated syncID
      engineData.version = this.version;
      engineData.syncID = newSyncID;

      // Put the new data back into meta/global and mark for upload
      engines[this.name] = engineData;
      metaGlobal.payload.engines = engines;
      metaGlobal.changed = true;
    } else if (engineData.version > this.version) {
      // Don't sync this engine if the server has newer data

      let error = new Error("New data: " + [engineData.version, this.version]);
      error.failureCode = VERSION_OUT_OF_DATE;
      throw error;
    } else {
      // Changes to syncID mean we'll need to upload everything
      let assignedSyncID = await this.ensureCurrentSyncID(engineData.syncID);
      if (assignedSyncID != engineData.syncID) {
        engineData.syncID = assignedSyncID;
        metaGlobal.changed = true;
      }
    }

    // Save objects that need to be uploaded in this._modified. As we
    // successfully upload objects we remove them from this._modified. If an
    // error occurs or any objects fail to upload, they will remain in
    // this._modified. At the end of a sync, or after an error, we add all
    // objects remaining in this._modified to the tracker.
    let initialChanges = await this.pullChanges();
    this._modified.replace(initialChanges);
    // Clear the tracker now. If the sync fails we'll add the ones we failed
    // to upload back.
    this._tracker.clearChangedIDs();
    this._tracker.resetScore();

    // Keep track of what to delete at the end of sync
    this._delete = {};
  },
   1178 
   1179  async pullChanges() {
   1180    let lastSync = await this.getLastSync();
   1181    if (lastSync) {
   1182      return this.pullNewChanges();
   1183    }
   1184    this._log.debug("First sync, uploading all items");
   1185    return this.pullAllChanges();
   1186  },
   1187 
   1188  /**
   1189   * A tiny abstraction to make it easier to test incoming record
   1190   * application.
   1191   */
   1192  itemSource() {
   1193    return new Collection(this.engineURL, this._recordObj, this.service);
   1194  },
   1195 
   1196  /**
   1197   * Download and apply remote records changed since the last sync. This
   1198   * happens in three stages.
   1199   *
   1200   * In the first stage, we fetch full records for all changed items, newest
   1201   * first, up to the download limit. The limit lets us make progress for large
   1202   * collections, where the sync is likely to be interrupted before we
   1203   * can fetch everything.
   1204   *
   1205   * In the second stage, we fetch the IDs of any remaining records changed
   1206   * since the last sync, add them to our backlog, and fast-forward our last
   1207   * sync time.
   1208   *
   1209   * In the third stage, we fetch and apply records for all backlogged IDs,
   1210   * as well as any records that failed to apply during the last sync. We
   1211   * request records for the IDs in chunks, to avoid exceeding URL length
   1212   * limits, then remove successfully applied records from the backlog, and
   1213   * record IDs of any records that failed to apply to retry on the next sync.
   1214   */
   1215  async _processIncoming() {
   1216    this._log.trace("Downloading & applying server changes");
   1217 
   1218    let newitems = this.itemSource();
   1219    let lastSync = await this.getLastSync();
   1220 
   1221    newitems.newer = lastSync;
   1222    newitems.full = true;
   1223 
   1224    let downloadLimit = Infinity;
   1225    if (this.downloadLimit) {
   1226      // Fetch new records up to the download limit. Currently, only the history
   1227      // engine sets a limit, since the history collection has the highest volume
   1228      // of changed records between syncs. The other engines fetch all records
   1229      // changed since the last sync.
   1230      if (this._defaultSort) {
   1231        // A download limit with a sort order doesn't make sense: we won't know
   1232        // which records to backfill.
   1233        throw new Error("Can't specify download limit with default sort order");
   1234      }
   1235      newitems.sort = "newest";
   1236      downloadLimit = newitems.limit = this.downloadLimit;
   1237    } else if (this._defaultSort) {
   1238      // The bookmarks engine fetches records by sort index; other engines leave
   1239      // the order unspecified. We can remove `_defaultSort` entirely after bug
   1240      // 1305563: the sort index won't matter because we'll buffer all bookmarks
   1241      // before applying.
   1242      newitems.sort = this._defaultSort;
   1243    }
   1244 
   1245    // applied    => number of items that should be applied.
   1246    // failed     => number of items that failed in this sync.
   1247    // newFailed  => number of items that failed for the first time in this sync.
   1248    // reconciled => number of items that were reconciled.
   1249    // failedReasons => {name, count} of reasons a record failed
   1250    let countTelemetry = new SyncedRecordsTelemetry();
   1251    let count = countTelemetry.incomingCounts;
   1252    let recordsToApply = [];
   1253    let failedInCurrentSync = new SerializableSet();
   1254 
   1255    let oldestModified = this.lastModified;
   1256    let downloadedIDs = new Set();
   1257 
   1258    // Stage 1: Fetch new records from the server, up to the download limit.
   1259    if (this.lastModified == null || this.lastModified > lastSync) {
   1260      let { response, records } = await newitems.getBatched(
   1261        this.downloadBatchSize
   1262      );
   1263      if (!response.success) {
   1264        response.failureCode = ENGINE_DOWNLOAD_FAIL;
   1265        throw response;
   1266      }
   1267 
   1268      await Async.yieldingForEach(records, async record => {
   1269        downloadedIDs.add(record.id);
   1270 
   1271        if (record.modified < oldestModified) {
   1272          oldestModified = record.modified;
   1273        }
   1274 
   1275        let { shouldApply, error } = await this._maybeReconcile(record);
   1276        if (error) {
   1277          failedInCurrentSync.add(record.id);
   1278          count.failed++;
   1279          countTelemetry.addIncomingFailedReason(error.message);
   1280          return;
   1281        }
   1282        if (!shouldApply) {
   1283          count.reconciled++;
   1284          return;
   1285        }
   1286        recordsToApply.push(record);
   1287      });
   1288 
   1289      let failedToApply = await this._applyRecords(
   1290        recordsToApply,
   1291        countTelemetry
   1292      );
   1293      Utils.setAddAll(failedInCurrentSync, failedToApply);
   1294 
   1295      // `applied` is a bit of a misnomer: it counts records that *should* be
   1296      // applied, so it also includes records that we tried to apply and failed.
   1297      // `recordsToApply.length - failedToApply.length` is the number of records
   1298      // that we *successfully* applied.
   1299      count.failed += failedToApply.length;
   1300      count.applied += recordsToApply.length;
   1301    }
   1302 
   1303    // Stage 2: If we reached our download limit, we might still have records
   1304    // on the server that changed since the last sync. Fetch the IDs for the
   1305    // remaining records, and add them to the backlog. Note that this stage
   1306    // only runs for engines that set a download limit.
   1307    if (downloadedIDs.size == downloadLimit) {
   1308      let guidColl = this.itemSource();
   1309 
   1310      guidColl.newer = lastSync;
   1311      guidColl.older = oldestModified;
   1312      guidColl.sort = "oldest";
   1313 
   1314      let guids = await guidColl.get();
   1315      if (!guids.success) {
   1316        throw guids;
   1317      }
   1318 
   1319      // Filtering out already downloaded IDs here isn't necessary. We only do
   1320      // that in case the Sync server doesn't support `older` (bug 1316110).
   1321      let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id));
   1322      if (remainingIDs.length) {
   1323        this.toFetch = Utils.setAddAll(this.toFetch, remainingIDs);
   1324      }
   1325    }
   1326 
   1327    // Fast-foward the lastSync timestamp since we have backlogged the
   1328    // remaining items.
   1329    if (lastSync < this.lastModified) {
   1330      lastSync = this.lastModified;
   1331      await this.setLastSync(lastSync);
   1332    }
   1333 
   1334    // Stage 3: Backfill records from the backlog, and those that failed to
   1335    // decrypt or apply during the last sync. We only backfill up to the
   1336    // download limit, to prevent a large backlog for one engine from blocking
   1337    // the others. We'll keep processing the backlog on subsequent engine syncs.
   1338    let failedInPreviousSync = this.previousFailed;
   1339    let idsToBackfill = Array.from(
   1340      Utils.setAddAll(
   1341        Utils.subsetOfSize(this.toFetch, downloadLimit),
   1342        failedInPreviousSync
   1343      )
   1344    );
   1345 
   1346    // Note that we intentionally overwrite the previously failed list here.
   1347    // Records that fail to decrypt or apply in two consecutive syncs are likely
   1348    // corrupt; we remove them from the list because retrying and failing on
   1349    // every subsequent sync just adds noise.
   1350    this.previousFailed = failedInCurrentSync;
   1351 
   1352    let backfilledItems = this.itemSource();
   1353 
   1354    backfilledItems.sort = "newest";
   1355    backfilledItems.full = true;
   1356 
   1357    // `getBatched` includes the list of IDs as a query parameter, so we need to fetch
   1358    // records in chunks to avoid exceeding URI length limits.
   1359    if (this.guidFetchBatchSize) {
   1360      for (let ids of lazy.PlacesUtils.chunkArray(
   1361        idsToBackfill,
   1362        this.guidFetchBatchSize
   1363      )) {
   1364        backfilledItems.ids = ids;
   1365 
   1366        let { response, records } = await backfilledItems.getBatched(
   1367          this.downloadBatchSize
   1368        );
   1369        if (!response.success) {
   1370          response.failureCode = ENGINE_DOWNLOAD_FAIL;
   1371          throw response;
   1372        }
   1373 
   1374        let backfilledRecordsToApply = [];
   1375        let failedInBackfill = [];
   1376 
   1377        await Async.yieldingForEach(records, async record => {
   1378          let { shouldApply, error } = await this._maybeReconcile(record);
   1379          if (error) {
   1380            failedInBackfill.push(record.id);
   1381            count.failed++;
   1382            countTelemetry.addIncomingFailedReason(error.message);
   1383            return;
   1384          }
   1385          if (!shouldApply) {
   1386            count.reconciled++;
   1387            return;
   1388          }
   1389          backfilledRecordsToApply.push(record);
   1390        });
   1391 
   1392        let failedToApply = await this._applyRecords(
   1393          backfilledRecordsToApply,
   1394          countTelemetry
   1395        );
   1396        failedInBackfill.push(...failedToApply);
   1397 
   1398        count.failed += failedToApply.length;
   1399        count.applied += backfilledRecordsToApply.length;
   1400 
   1401        this.toFetch = Utils.setDeleteAll(this.toFetch, ids);
   1402        this.previousFailed = Utils.setAddAll(
   1403          this.previousFailed,
   1404          failedInBackfill
   1405        );
   1406 
   1407        if (lastSync < this.lastModified) {
   1408          lastSync = this.lastModified;
   1409          await this.setLastSync(lastSync);
   1410        }
   1411      }
   1412    }
   1413 
   1414    count.newFailed = 0;
   1415    for (let item of this.previousFailed) {
   1416      // Anything that failed in the current sync that also failed in
   1417      // the previous sync means there is likely something wrong with
   1418      // the record, we remove it from trying again to prevent
   1419      // infinitely syncing corrupted records
   1420      if (failedInPreviousSync.has(item)) {
   1421        this.previousFailed.delete(item);
   1422      } else {
   1423        // otherwise it's a new failed and we count it as so
   1424        ++count.newFailed;
   1425      }
   1426    }
   1427 
   1428    count.succeeded = Math.max(0, count.applied - count.failed);
   1429    this._log.info(
   1430      [
   1431        "Records:",
   1432        count.applied,
   1433        "applied,",
   1434        count.succeeded,
   1435        "successfully,",
   1436        count.failed,
   1437        "failed to apply,",
   1438        count.newFailed,
   1439        "newly failed to apply,",
   1440        count.reconciled,
   1441        "reconciled.",
   1442      ].join(" ")
   1443    );
   1444    Observers.notify("weave:engine:sync:applied", count, this.name);
   1445  },
   1446 
  // Determines whether a freshly downloaded record should be applied
  // locally. Handles decryption (with a single retry on HMAC mismatch),
  // server-side cleanup via `_shouldDeleteRemotely`, and reconciliation via
  // `_reconcile`. Returns `{ shouldApply, error }`; a non-null `error`
  // means the record failed and the caller should count/retry it.
  async _maybeReconcile(item) {
    let key = this.service.collectionKeys.keyForCollection(this.name);

    // Grab a later last modified if possible
    if (this.lastModified == null || item.modified > this.lastModified) {
      this.lastModified = item.modified;
    }

    try {
      try {
        await item.decrypt(key);
      } catch (ex) {
        if (!Utils.isHMACMismatch(ex)) {
          throw ex;
        }
        let strategy = await this.handleHMACMismatch(item, true);
        if (strategy == SyncEngine.kRecoveryStrategy.retry) {
          // You only get one retry.
          try {
            // Try decrypting again, typically because we've got new keys.
            this._log.info("Trying decrypt again...");
            key = this.service.collectionKeys.keyForCollection(this.name);
            await item.decrypt(key);
            strategy = null;
          } catch (ex) {
            if (!Utils.isHMACMismatch(ex)) {
              throw ex;
            }
            strategy = await this.handleHMACMismatch(item, false);
          }
        }

        switch (strategy) {
          case null:
            // Retry succeeded! No further handling.
            break;
          case SyncEngine.kRecoveryStrategy.retry:
            this._log.debug("Ignoring second retry suggestion.");
          // Fall through to error case.
          case SyncEngine.kRecoveryStrategy.error:
            this._log.warn("Error decrypting record", ex);
            return { shouldApply: false, error: ex };
          case SyncEngine.kRecoveryStrategy.ignore:
            this._log.debug(
              "Ignoring record " + item.id + " with bad HMAC: already handled."
            );
            return { shouldApply: false, error: null };
        }
      }
    } catch (ex) {
      // Shutdown exceptions must propagate so the sync aborts promptly.
      if (Async.isShutdownException(ex)) {
        throw ex;
      }
      this._log.warn("Error decrypting record", ex);
      return { shouldApply: false, error: ex };
    }

    if (this._shouldDeleteRemotely(item)) {
      this._log.trace("Deleting item from server without applying", item);
      await this._deleteId(item.id);
      return { shouldApply: false, error: null };
    }

    let shouldApply;
    try {
      shouldApply = await this._reconcile(item);
    } catch (ex) {
      if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) {
        this._log.warn("Reconciliation failed: aborting incoming processing.");
        throw ex.cause;
      } else if (!Async.isShutdownException(ex)) {
        this._log.warn("Failed to reconcile incoming record " + item.id, ex);
        return { shouldApply: false, error: ex };
      } else {
        throw ex;
      }
    }

    if (!shouldApply) {
      this._log.trace("Skipping reconciled incoming item " + item.id);
    }

    return { shouldApply, error: null };
  },
   1531 
   1532  async _applyRecords(records, countTelemetry) {
   1533    this._tracker.ignoreAll = true;
   1534    try {
   1535      let failedIDs = await this._store.applyIncomingBatch(
   1536        records,
   1537        countTelemetry
   1538      );
   1539      return failedIDs;
   1540    } catch (ex) {
   1541      // Catch any error that escapes from applyIncomingBatch. At present
   1542      // those will all be abort events.
   1543      this._log.warn("Got exception, aborting processIncoming", ex);
   1544      throw ex;
   1545    } finally {
   1546      this._tracker.ignoreAll = false;
   1547    }
   1548  },
   1549 
  // Indicates whether an incoming item should be deleted from the server at
  // the end of the sync. Engines can override this method to clean up records
  // that shouldn't be on the server.
  _shouldDeleteRemotely() {
    return false;
  },

  /**
   * Find a GUID of an item that is a duplicate of the incoming item but happens
   * to have a different GUID
   *
   * @return GUID of the similar item; falsy otherwise
   */
  async _findDupe() {
    // By default, assume there's no dupe items for the engine
  },

  /**
   * Called before a remote record is discarded due to failed reconciliation.
   * Used by bookmark sync to merge folder child orders.
   */
  beforeRecordDiscard() {},

  // Called when the server has a record marked as deleted, but locally we've
  // changed it more recently than the deletion. If we return false, the
  // record will be deleted locally. If we return true, we'll reupload the
  // record to the server -- any extra work that's needed as part of this
  // process should be done at this point (such as mark the record's parent
  // for reuploading in the case of bookmarks).
  async _shouldReviveRemotelyDeletedRecord() {
    return true;
  },

  // Drops any pending local change for `id` and queues the ID for
  // server-side deletion at the end of the sync.
  async _deleteId(id) {
    await this._tracker.removeChangedID(id);
    this._noteDeletedId(id);
  },
   1587 
   1588  // Marks an ID for deletion at the end of the sync.
   1589  _noteDeletedId(id) {
   1590    if (this._delete.ids == null) {
   1591      this._delete.ids = [id];
   1592    } else {
   1593      this._delete.ids.push(id);
   1594    }
   1595  },
   1596 
   1597  async _switchItemToDupe(localDupeGUID, incomingItem) {
   1598    // The local, duplicate ID is always deleted on the server.
   1599    await this._deleteId(localDupeGUID);
   1600 
   1601    // We unconditionally change the item's ID in case the engine knows of
   1602    // an item but doesn't expose it through itemExists. If the API
   1603    // contract were stronger, this could be changed.
   1604    this._log.debug(
   1605      "Switching local ID to incoming: " +
   1606        localDupeGUID +
   1607        " -> " +
   1608        incomingItem.id
   1609    );
   1610    return this._store.changeItemID(localDupeGUID, incomingItem.id);
   1611  },
   1612 
   1613  /**
   1614   * Reconcile incoming record with local state.
   1615   *
   1616   * This function essentially determines whether to apply an incoming record.
   1617   *
   1618   * @param  item
   1619   *         Record from server to be tested for application.
   1620   * @return boolean
   1621   *         Truthy if incoming record should be applied. False if not.
   1622   */
   1623  async _reconcile(item) {
   1624    if (this._log.level <= Log.Level.Trace) {
   1625      this._log.trace("Incoming: " + item);
   1626    }
   1627 
   1628    // We start reconciling by collecting a bunch of state. We do this here
   1629    // because some state may change during the course of this function and we
   1630    // need to operate on the original values.
   1631    let existsLocally = await this._store.itemExists(item.id);
   1632    let locallyModified = this._modified.has(item.id);
   1633 
   1634    // TODO Handle clock drift better. Tracked in bug 721181.
   1635    let remoteAge = Resource.serverTime - item.modified;
   1636    let localAge = locallyModified
   1637      ? Date.now() / 1000 - this._modified.getModifiedTimestamp(item.id)
   1638      : null;
   1639    let remoteIsNewer = remoteAge < localAge;
   1640 
   1641    this._log.trace(
   1642      "Reconciling " +
   1643        item.id +
   1644        ". exists=" +
   1645        existsLocally +
   1646        "; modified=" +
   1647        locallyModified +
   1648        "; local age=" +
   1649        localAge +
   1650        "; incoming age=" +
   1651        remoteAge
   1652    );
   1653 
   1654    // We handle deletions first so subsequent logic doesn't have to check
   1655    // deleted flags.
   1656    if (item.deleted) {
   1657      // If the item doesn't exist locally, there is nothing for us to do. We
   1658      // can't check for duplicates because the incoming record has no data
   1659      // which can be used for duplicate detection.
   1660      if (!existsLocally) {
   1661        this._log.trace(
   1662          "Ignoring incoming item because it was deleted and " +
   1663            "the item does not exist locally."
   1664        );
   1665        return false;
   1666      }
   1667 
   1668      // We decide whether to process the deletion by comparing the record
   1669      // ages. If the item is not modified locally, the remote side wins and
   1670      // the deletion is processed. If it is modified locally, we take the
   1671      // newer record.
   1672      if (!locallyModified) {
   1673        this._log.trace(
   1674          "Applying incoming delete because the local item " +
   1675            "exists and isn't modified."
   1676        );
   1677        return true;
   1678      }
   1679      this._log.trace("Incoming record is deleted but we had local changes.");
   1680 
   1681      if (remoteIsNewer) {
   1682        this._log.trace("Remote record is newer -- deleting local record.");
   1683        return true;
   1684      }
   1685      // If the local record is newer, we defer to individual engines for
   1686      // how to handle this. By default, we revive the record.
   1687      let willRevive = await this._shouldReviveRemotelyDeletedRecord(item);
   1688      this._log.trace("Local record is newer -- reviving? " + willRevive);
   1689 
   1690      return !willRevive;
   1691    }
   1692 
   1693    // At this point the incoming record is not for a deletion and must have
   1694    // data. If the incoming record does not exist locally, we check for a local
   1695    // duplicate existing under a different ID. The default implementation of
   1696    // _findDupe() is empty, so engines have to opt in to this functionality.
   1697    //
   1698    // If we find a duplicate, we change the local ID to the incoming ID and we
   1699    // refresh the metadata collected above. See bug 710448 for the history
   1700    // of this logic.
   1701    if (!existsLocally) {
   1702      let localDupeGUID = await this._findDupe(item);
   1703      if (localDupeGUID) {
   1704        this._log.trace(
   1705          "Local item " +
   1706            localDupeGUID +
   1707            " is a duplicate for " +
   1708            "incoming item " +
   1709            item.id
   1710        );
   1711 
   1712        // The current API contract does not mandate that the ID returned by
   1713        // _findDupe() actually exists. Therefore, we have to perform this
   1714        // check.
   1715        existsLocally = await this._store.itemExists(localDupeGUID);
   1716 
   1717        // If the local item was modified, we carry its metadata forward so
   1718        // appropriate reconciling can be performed.
   1719        if (this._modified.has(localDupeGUID)) {
   1720          locallyModified = true;
   1721          localAge =
   1722            this._tracker._now() -
   1723            this._modified.getModifiedTimestamp(localDupeGUID);
   1724          remoteIsNewer = remoteAge < localAge;
   1725 
   1726          this._modified.changeID(localDupeGUID, item.id);
   1727        } else {
   1728          locallyModified = false;
   1729          localAge = null;
   1730        }
   1731 
   1732        // Tell the engine to do whatever it needs to switch the items.
   1733        await this._switchItemToDupe(localDupeGUID, item);
   1734 
   1735        this._log.debug(
   1736          "Local item after duplication: age=" +
   1737            localAge +
   1738            "; modified=" +
   1739            locallyModified +
   1740            "; exists=" +
   1741            existsLocally
   1742        );
   1743      } else {
   1744        this._log.trace("No duplicate found for incoming item: " + item.id);
   1745      }
   1746    }
   1747 
   1748    // At this point we've performed duplicate detection. But, nothing here
   1749    // should depend on duplicate detection as the above should have updated
   1750    // state seamlessly.
   1751 
   1752    if (!existsLocally) {
   1753      // If the item doesn't exist locally and we have no local modifications
   1754      // to the item (implying that it was not deleted), always apply the remote
   1755      // item.
   1756      if (!locallyModified) {
   1757        this._log.trace(
   1758          "Applying incoming because local item does not exist " +
   1759            "and was not deleted."
   1760        );
   1761        return true;
   1762      }
   1763 
   1764      // If the item was modified locally but isn't present, it must have
   1765      // been deleted. If the incoming record is younger, we restore from
   1766      // that record.
   1767      if (remoteIsNewer) {
   1768        this._log.trace(
   1769          "Applying incoming because local item was deleted " +
   1770            "before the incoming item was changed."
   1771        );
   1772        this._modified.delete(item.id);
   1773        return true;
   1774      }
   1775 
   1776      this._log.trace(
   1777        "Ignoring incoming item because the local item's " +
   1778          "deletion is newer."
   1779      );
   1780      return false;
   1781    }
   1782 
   1783    // If the remote and local records are the same, there is nothing to be
   1784    // done, so we don't do anything. In the ideal world, this logic wouldn't
   1785    // be here and the engine would take a record and apply it. The reason we
   1786    // want to defer this logic is because it would avoid a redundant and
   1787    // possibly expensive dip into the storage layer to query item state.
   1788    // This should get addressed in the async rewrite, so we ignore it for now.
   1789    let localRecord = await this._createRecord(item.id);
   1790    let recordsEqual = Utils.deepEquals(item.cleartext, localRecord.cleartext);
   1791 
   1792    // If the records are the same, we don't need to do anything. This does
   1793    // potentially throw away a local modification time. But, if the records
   1794    // are the same, does it matter?
   1795    if (recordsEqual) {
   1796      this._log.trace(
   1797        "Ignoring incoming item because the local item is identical."
   1798      );
   1799 
   1800      this._modified.delete(item.id);
   1801      return false;
   1802    }
   1803 
   1804    // At this point the records are different.
   1805 
   1806    // If we have no local modifications, always take the server record.
   1807    if (!locallyModified) {
   1808      this._log.trace("Applying incoming record because no local conflicts.");
   1809      return true;
   1810    }
   1811 
   1812    // At this point, records are different and the local record is modified.
   1813    // We resolve conflicts by record age, where the newest one wins. This does
   1814    // result in data loss and should be handled by giving the engine an
   1815    // opportunity to merge the records. Bug 720592 tracks this feature.
   1816    this._log.warn(
   1817      "DATA LOSS: Both local and remote changes to record: " + item.id
   1818    );
   1819    if (!remoteIsNewer) {
   1820      this.beforeRecordDiscard(localRecord, item, remoteIsNewer);
   1821    }
   1822    return remoteIsNewer;
   1823  },
   1824 
   1825  // Upload outgoing records.
  async _uploadOutgoing() {
    this._log.trace("Uploading local changes to server.");

    // collection we'll upload
    let up = new Collection(this.engineURL, null, this.service);
    // Snapshot the changeset IDs up front; entries are removed from
    // this._modified only once the server confirms them.
    let modifiedIDs = new Set(this._modified.ids());
    let countTelemetry = new SyncedRecordsTelemetry();
    let counts = countTelemetry.outgoingCounts;
    this._log.info(`Uploading ${modifiedIDs.size} outgoing records`);
    if (modifiedIDs.size) {
      counts.sent = modifiedIDs.size;

      // Per-batch accumulators; cleared after each completed batch.
      let failed = [];
      let successful = [];
      let lastSync = await this.getLastSync();
      // Called by the post queue after every POST; `batchOngoing` is true
      // while further requests belonging to the same server batch remain.
      let handleResponse = async (postQueue, resp, batchOngoing) => {
        // Note: We don't want to update this.lastSync, or this._modified until
        // the batch is complete, however we want to remember success/failure
        // indicators for when that happens.
        if (!resp.success) {
          this._log.debug(`Uploading records failed: ${resp.status}`);
          // 412 means another client raced us (see _sync(), which treats a
          // 412 during upload as benign); anything else is an upload failure.
          resp.failureCode =
            resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL;
          throw resp;
        }

        // Update server timestamp from the upload.
        failed = failed.concat(Object.keys(resp.obj.failed));
        successful = successful.concat(resp.obj.success);

        if (batchOngoing) {
          // Nothing to do yet
          return;
        }

        // Batch is complete: log failures, record telemetry, commit state.
        if (failed.length && this._log.level <= Log.Level.Debug) {
          this._log.debug(
            "Records that will be uploaded again because " +
              "the server couldn't store them: " +
              failed.join(", ")
          );
        }

        counts.failed += failed.length;
        // NOTE(review): `failed` holds record IDs (the keys of
        // resp.obj.failed), so the "reason" recorded here is the ID rather
        // than a server-provided message — confirm this is intentional.
        // Also, Object.values() on an array yields the array elements
        // themselves, so this is equivalent to failed.forEach(...).
        Object.values(failed).forEach(message => {
          countTelemetry.addOutgoingFailedReason(message);
        });

        // Confirmed records no longer count as pending local changes.
        for (let id of successful) {
          this._modified.delete(id);
        }

        // Let subclasses react to the completed batch (no-op by default).
        await this._onRecordsWritten(
          successful,
          failed,
          postQueue.lastModified
        );

        // Advance lastSync since we've finished the batch.
        if (postQueue.lastModified > lastSync) {
          lastSync = postQueue.lastModified;
          await this.setLastSync(lastSync);
        }

        // clear for next batch
        failed.length = 0;
        successful.length = 0;
      };

      let postQueue = up.newPostQueue(this._log, lastSync, handleResponse);

      for (let id of modifiedIDs) {
        let out;
        let ok = false;
        try {
          out = await this._createRecord(id);
          if (this._log.level <= Log.Level.Trace) {
            this._log.trace("Outgoing: " + out);
          }
          await out.encrypt(
            this.service.collectionKeys.keyForCollection(this.name)
          );
          ok = true;
        } catch (ex) {
          // Creating or encrypting the record failed. Engines that allow
          // skipped records carry on; otherwise (or on shutdown) we abort.
          this._log.warn("Error creating record", ex);
          ++counts.failed;
          countTelemetry.addOutgoingFailedReason(ex.message);
          if (Async.isShutdownException(ex) || !this.allowSkippedRecord) {
            if (!this.allowSkippedRecord) {
              // Don't bother for shutdown errors
              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
            }
            throw ex;
          }
        }
        if (ok) {
          let { enqueued, error } = await postQueue.enqueue(out);
          if (!enqueued) {
            // The queue couldn't accept the record; `error` says why.
            ++counts.failed;
            countTelemetry.addOutgoingFailedReason(error.message);
            if (!this.allowSkippedRecord) {
              Observers.notify("weave:engine:sync:uploaded", counts, this.name);
              this._log.warn(
                `Failed to enqueue record "${id}" (aborting)`,
                error
              );
              throw error;
            }
            // Skipped records are dropped from the changeset so we don't
            // retry them forever.
            this._modified.delete(id);
            this._log.warn(
              `Failed to enqueue record "${id}" (skipping)`,
              error
            );
          }
        }
        // Yield between records to keep the event loop responsive.
        await Async.promiseYield();
      }
      // Push out anything still queued and finish the final batch.
      await postQueue.flush(true);
    }

    if (counts.sent || counts.failed) {
      Observers.notify("weave:engine:sync:uploaded", counts, this.name);
    }
  },
   1950 
  /**
   * Hook invoked by _uploadOutgoing() after each completed POST batch with
   * (successfulIds, failedIds, serverLastModified). Base implementation is a
   * no-op; subclasses override it to react to upload results.
   */
  async _onRecordsWritten() {
    // Implement this method to take specific actions against successfully
    // uploaded records and failed records.
  },
   1955 
   1956  // Any cleanup necessary.
   1957  // Save the current snapshot so as to calculate changes at next sync
   1958  async _syncFinish() {
   1959    this._log.trace("Finishing up sync");
   1960 
   1961    let doDelete = async (key, val) => {
   1962      let coll = new Collection(this.engineURL, this._recordObj, this.service);
   1963      coll[key] = val;
   1964      await coll.delete();
   1965    };
   1966 
   1967    for (let [key, val] of Object.entries(this._delete)) {
   1968      // Remove the key for future uses
   1969      delete this._delete[key];
   1970 
   1971      this._log.trace("doing post-sync deletions", { key, val });
   1972      // Send a simple delete for the property
   1973      if (key != "ids" || val.length <= 100) {
   1974        await doDelete(key, val);
   1975      } else {
   1976        // For many ids, split into chunks of at most 100
   1977        while (val.length) {
   1978          await doDelete(key, val.slice(0, 100));
   1979          val = val.slice(100);
   1980        }
   1981      }
   1982    }
   1983    this.hasSyncedThisSession = true;
   1984    await this._tracker.asyncObserver.promiseObserversComplete();
   1985  },
   1986 
  async _syncCleanup() {
    try {
      // Mark failed WBOs as changed again so they are reuploaded next time.
      await this.trackRemainingChanges();
    } finally {
      // Always drop the in-memory changeset, even if re-tracking threw.
      this._modified.clear();
    }
  },
   1995 
   1996  async _sync() {
   1997    try {
   1998      Async.checkAppReady();
   1999      await this._syncStartup();
   2000      Async.checkAppReady();
   2001      Observers.notify("weave:engine:sync:status", "process-incoming");
   2002      await this._processIncoming();
   2003      Async.checkAppReady();
   2004      Observers.notify("weave:engine:sync:status", "upload-outgoing");
   2005      try {
   2006        await this._uploadOutgoing();
   2007        Async.checkAppReady();
   2008        await this._syncFinish();
   2009      } catch (ex) {
   2010        if (!ex.status || ex.status != 412) {
   2011          throw ex;
   2012        }
   2013        // a 412 posting just means another client raced - but we don't want
   2014        // to treat that as a sync error - the next sync is almost certain
   2015        // to work.
   2016        this._log.warn("412 error during sync - will retry.");
   2017      }
   2018    } finally {
   2019      await this._syncCleanup();
   2020    }
   2021  },
   2022 
   2023  async canDecrypt() {
   2024    // Report failure even if there's nothing to decrypt
   2025    let canDecrypt = false;
   2026 
   2027    // Fetch the most recently uploaded record and try to decrypt it
   2028    let test = new Collection(this.engineURL, this._recordObj, this.service);
   2029    test.limit = 1;
   2030    test.sort = "newest";
   2031    test.full = true;
   2032 
   2033    let key = this.service.collectionKeys.keyForCollection(this.name);
   2034 
   2035    // Any failure fetching/decrypting will just result in false
   2036    try {
   2037      this._log.trace("Trying to decrypt a record from the server..");
   2038      let json = (await test.get()).obj[0];
   2039      let record = new this._recordObj();
   2040      record.deserialize(json);
   2041      await record.decrypt(key);
   2042      canDecrypt = true;
   2043    } catch (ex) {
   2044      if (Async.isShutdownException(ex)) {
   2045        throw ex;
   2046      }
   2047      this._log.debug("Failed test decrypt", ex);
   2048    }
   2049 
   2050    return canDecrypt;
   2051  },
   2052 
   2053  /**
   2054   * Deletes the collection for this engine on the server, and removes all local
   2055   * Sync metadata for this engine. This does *not* remove any existing data on
   2056   * other clients. This is called when we reset the sync ID.
   2057   */
  async wipeServer() {
    // Delete the server-side collection first, then clear local Sync
    // metadata (timestamps, tracked changes); local user data is preserved.
    await this._deleteServerCollection();
    await this._resetClient();
  },
   2062 
   2063  /**
   2064   * Deletes the collection for this engine on the server, without removing
   2065   * any local Sync metadata or user data. Deleting the collection will not
   2066   * remove any user data on other clients, but will force other clients to
   2067   * start over as a first sync.
   2068   */
   2069  async _deleteServerCollection() {
   2070    let response = await this.service.resource(this.engineURL).delete();
   2071    if (response.status != 200 && response.status != 404) {
   2072      throw response;
   2073    }
   2074  },
   2075 
  // Base implementation is intentionally a no-op.
  async removeClientData() {
    // Implement this method in engines that store client specific data
    // on the server.
  },
   2080 
   2081  /*
   2082   * Decide on (and partially effect) an error-handling strategy.
   2083   *
   2084   * Asks the Service to respond to an HMAC error, which might result in keys
   2085   * being downloaded. That call returns true if an action which might allow a
   2086   * retry to occur.
   2087   *
   2088   * If `mayRetry` is truthy, and the Service suggests a retry,
   2089   * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns
   2090   * kRecoveryStrategy.error.
   2091   *
   2092   * Subclasses of SyncEngine can override this method to allow for different
   2093   * behavior -- e.g., to delete and ignore erroneous entries.
   2094   *
   2095   * All return values will be part of the kRecoveryStrategy enumeration.
   2096   */
   2097  async handleHMACMismatch(item, mayRetry) {
   2098    // By default we either try again, or bail out noisily.
   2099    return (await this.service.handleHMACEvent()) && mayRetry
   2100      ? SyncEngine.kRecoveryStrategy.retry
   2101      : SyncEngine.kRecoveryStrategy.error;
   2102  },
   2103 
   2104  /**
   2105   * Returns a changeset containing all items in the store. The default
   2106   * implementation returns a changeset with timestamps from long ago, to
   2107   * ensure we always use the remote version if one exists.
   2108   *
   2109   * This function is only called for the first sync. Subsequent syncs call
   2110   * `pullNewChanges`.
   2111   *
   2112   * @return A `Changeset` object.
   2113   */
   2114  async pullAllChanges() {
   2115    let changes = {};
   2116    let ids = await this._store.getAllIDs();
   2117    for (let id in ids) {
   2118      changes[id] = 0;
   2119    }
   2120    return changes;
   2121  },
   2122 
   2123  /**
   2124   * Returns a changeset containing entries for all currently tracked items.
   2125   * The default implementation returns a changeset with timestamps indicating
   2126   * when the item was added to the tracker.
   2127   *
   2128   * @return A `Changeset` object.
   2129   */
  async pullNewChanges() {
    // Let in-flight tracker notifications settle so the changeset is
    // complete before reading it.
    await this._tracker.asyncObserver.promiseObserversComplete();
    return this.getChangedIDs();
  },
   2134 
   2135  /**
   2136   * Adds all remaining changeset entries back to the tracker, typically for
   2137   * items that failed to upload. This method is called at the end of each sync.
   2138   *
   2139   */
  async trackRemainingChanges() {
    // Anything still in `_modified` here was not successfully uploaded;
    // push each entry back into the tracker for the next sync.
    for (let [id, change] of this._modified.entries()) {
      await this._tracker.addChangedID(id, change);
    }
  },
   2145 
   2146  /**
   2147   * Removes all local Sync metadata for this engine, but keeps all existing
   2148   * local user data.
   2149   */
  async resetClient() {
    // Delegate to _resetClient, wrapped in observer notifications by
    // _notify (defined outside this chunk).
    return this._notify("reset-client", this.name, this._resetClient)();
  },
   2153 
   2154  async _resetClient() {
   2155    await this.resetLastSync();
   2156    this.hasSyncedThisSession = false;
   2157    this.previousFailed = new SerializableSet();
   2158    this.toFetch = new SerializableSet();
   2159  },
   2160 
   2161  /**
   2162   * Removes all local Sync metadata and user data for this engine.
   2163   */
  async wipeClient() {
    // Delegate to _wipeClient, wrapped in observer notifications by
    // _notify (defined outside this chunk).
    return this._notify("wipe-client", this.name, this._wipeClient)();
  },
   2167 
  async _wipeClient() {
    await this.resetClient();
    this._log.debug("Deleting all local data");
    // Suppress change tracking while wiping so the deletions themselves
    // aren't recorded as local changes to be synced back up.
    this._tracker.ignoreAll = true;
    await this._store.wipe();
    this._tracker.ignoreAll = false;
    this._tracker.clearChangedIDs();
  },
   2176 
   2177  /**
   2178   * If one exists, initialize and return a validator for this engine (which
   2179   * must have a `validate(engine)` method that returns a promise to an object
   2180   * with a getSummary method). Otherwise return null.
   2181   */
  getValidator() {
    // The base engine has no validator; engines that support validation
    // override this.
    return null;
  },
   2185 
  async finalize() {
    // Stop watching this engine's enabled pref.
    Services.prefs.removeObserver(
      `${PREFS_BRANCH}engine.${this.prefName}`,
      this.asyncObserver
    );
    // Drain any pending async observer work, then shut down the tracker and
    // the toFetch/previousFailed storage helpers (presumably JSONFile-backed
    // — see imports; confirm in the part of the file where they're created).
    await this.asyncObserver.promiseObserversComplete();
    await this._tracker.finalize();
    await this._toFetchStorage.finalize();
    await this._previousFailedStorage.finalize();
  },
   2196 
   2197  // Returns a new watchdog. Exposed for tests.
  _newWatchdog() {
    // Indirection point: tests can override this to substitute a mock
    // watchdog (see the "Exposed for tests" note above).
    return Async.watchdog();
  },
   2201 };
   2202 
   2203 /**
   2204 * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
   2205 * and stores opaque change data for tracked IDs. The default implementation
   2206 * only records timestamps, though engines can extend this to store additional
   2207 * data for each entry.
   2208 */
   2209 export class Changeset {
   2210  // Creates an empty changeset.
   2211  constructor() {
   2212    this.changes = {};
   2213  }
   2214 
   2215  // Returns the last modified time, in seconds, for an entry in the changeset.
   2216  // `id` is guaranteed to be in the set.
   2217  getModifiedTimestamp(id) {
   2218    return this.changes[id];
   2219  }
   2220 
   2221  // Adds a change for a tracked ID to the changeset.
   2222  set(id, change) {
   2223    this.changes[id] = change;
   2224  }
   2225 
   2226  // Adds multiple entries to the changeset, preserving existing entries.
   2227  insert(changes) {
   2228    Object.assign(this.changes, changes);
   2229  }
   2230 
   2231  // Overwrites the existing set of tracked changes with new entries.
   2232  replace(changes) {
   2233    this.changes = changes;
   2234  }
   2235 
   2236  // Indicates whether an entry is in the changeset.
   2237  has(id) {
   2238    return id in this.changes;
   2239  }
   2240 
   2241  // Deletes an entry from the changeset. Used to clean up entries for
   2242  // reconciled and successfully uploaded records.
   2243  delete(id) {
   2244    delete this.changes[id];
   2245  }
   2246 
   2247  // Changes the ID of an entry in the changeset. Used when reconciling
   2248  // duplicates that have local changes.
   2249  changeID(oldID, newID) {
   2250    this.changes[newID] = this.changes[oldID];
   2251    delete this.changes[oldID];
   2252  }
   2253 
   2254  // Returns an array of all tracked IDs in this changeset.
   2255  ids() {
   2256    return Object.keys(this.changes);
   2257  }
   2258 
   2259  // Returns an array of `[id, change]` tuples. Used to repopulate the tracker
   2260  // with entries for failed uploads at the end of a sync.
   2261  entries() {
   2262    return Object.entries(this.changes);
   2263  }
   2264 
   2265  // Returns the number of entries in this changeset.
   2266  count() {
   2267    return this.ids().length;
   2268  }
   2269 
   2270  // Clears the changeset.
   2271  clear() {
   2272    this.changes = {};
   2273  }
   2274 }