tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_syncengine_sync.js (58976B)


      1 /* Any copyright is dedicated to the Public Domain.
      2 * http://creativecommons.org/publicdomain/zero/1.0/ */
      3 
      4 const { Weave } = ChromeUtils.importESModule(
      5  "resource://services-sync/main.sys.mjs"
      6 );
      7 const { WBORecord } = ChromeUtils.importESModule(
      8  "resource://services-sync/record.sys.mjs"
      9 );
     10 const { Service } = ChromeUtils.importESModule(
     11  "resource://services-sync/service.sys.mjs"
     12 );
     13 const { RotaryEngine } = ChromeUtils.importESModule(
     14  "resource://testing-common/services/sync/rotaryengine.sys.mjs"
     15 );
     16 
     17 function makeRotaryEngine() {
     18  return new RotaryEngine(Service);
     19 }
     20 
     21 async function clean(engine) {
     22  for (const pref of Svc.PrefBranch.getChildList("")) {
     23    Svc.PrefBranch.clearUserPref(pref);
     24  }
     25  Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace");
     26  Service.recordManager.clearCache();
     27  await engine._tracker.clearChangedIDs();
     28  await engine.finalize();
     29 }
     30 
// Tear down a test: reset engine/pref state via clean(), then stop the
// HTTP test server and wait for it to shut down.
async function cleanAndGo(engine, server) {
  await clean(engine);
  await promiseStopServer(server);
}
     35 
     36 async function promiseClean(engine, server) {
     37  await clean(engine);
     38  await promiseStopServer(server);
     39 }
     40 
     41 async function createServerAndConfigureClient() {
     42  let engine = new RotaryEngine(Service);
     43  let syncID = await engine.resetLocalSyncID();
     44 
     45  let contents = {
     46    meta: {
     47      global: { engines: { rotary: { version: engine.version, syncID } } },
     48    },
     49    crypto: {},
     50    rotary: {},
     51  };
     52 
     53  const USER = "foo";
     54  let server = new SyncServer();
     55  server.registerUser(USER, "password");
     56  server.createContents(USER, contents);
     57  server.start();
     58 
     59  await SyncTestingInfrastructure(server, USER);
     60  Service._updateCachedURLs();
     61 
     62  return [engine, server, USER];
     63 }
     64 
     65 /*
     66 * Tests
     67 *
     68 * SyncEngine._sync() is divided into four rather independent steps:
     69 *
     70 * - _syncStartup()
     71 * - _processIncoming()
     72 * - _uploadOutgoing()
     73 * - _syncFinish()
     74 *
     75 * In the spirit of unit testing, these are tested individually for
     76 * different scenarios below.
     77 */
     78 
add_task(async function setup() {
  // Generate fresh collection keys and turn on trace logging for the
  // rotary engine before any other task runs.
  await generateNewKeys(Service.collectionKeys);
  Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace");
});
     83 
add_task(async function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() {
  _(
    "SyncEngine._syncStartup resets sync and wipes server data if there's no or an outdated global record"
  );

  // Some server side data that's going to be wiped
  let collection = new ServerCollection();
  collection.insert(
    "flying",
    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
  );
  collection.insert(
    "scotsman",
    encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
  );

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  engine._store.items = { rekolok: "Rekonstruktionslokomotive" };
  try {
    // Confirm initial environment
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.rekolok, undefined);
    let metaGlobal = await Service.recordManager.get(engine.metaURL);
    Assert.equal(metaGlobal.payload.engines, undefined);
    Assert.ok(!!collection.payload("flying"));
    Assert.ok(!!collection.payload("scotsman"));

    // Non-zero last sync timestamp, so the reset to 0 below is observable.
    await engine.setLastSync(Date.now() / 1000);

    // Trying to prompt a wipe -- we no longer track CryptoMeta per engine,
    // so it has nothing to check.
    await engine._syncStartup();

    // The meta/global WBO has been filled with data about the engine
    let engineData = metaGlobal.payload.engines.rotary;
    Assert.equal(engineData.version, engine.version);
    Assert.equal(engineData.syncID, await engine.getSyncID());

    // Sync was reset and server data was wiped
    Assert.equal(await engine.getLastSync(), 0);
    Assert.equal(collection.payload("flying"), undefined);
    Assert.equal(collection.payload("scotsman"), undefined);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    136 
add_task(async function test_syncStartup_serverHasNewerVersion() {
  _("SyncEngine._syncStartup ");

  // meta/global advertises an engine version far beyond ours.
  let global = new ServerWBO("global", {
    engines: { rotary: { version: 23456 } },
  });
  let server = httpd_setup({
    "/1.1/foo/storage/meta/global": global.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  try {
    // The server has a newer version of the data than our engine can
    // handle.  That should give us an exception.
    let error;
    try {
      await engine._syncStartup();
    } catch (ex) {
      error = ex;
    }
    Assert.equal(error.failureCode, VERSION_OUT_OF_DATE);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    164 
add_task(async function test_syncStartup_syncIDMismatchResetsClient() {
  _("SyncEngine._syncStartup resets sync if syncIDs don't match");

  let server = sync_httpd_setup({});

  await SyncTestingInfrastructure(server);

  // global record with a different syncID than our engine has
  let engine = makeRotaryEngine();
  let global = new ServerWBO("global", {
    engines: { rotary: { version: engine.version, syncID: "foobar" } },
  });
  server.registerPathHandler("/1.1/foo/storage/meta/global", global.handler());

  try {
    // Confirm initial environment
    Assert.equal(await engine.getSyncID(), "");
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.rekolok, undefined);

    // Non-zero last sync so the reset to 0 below is observable.
    await engine.setLastSync(Date.now() / 1000);
    await engine._syncStartup();

    // The engine has assumed the server's syncID
    Assert.equal(await engine.getSyncID(), "foobar");

    // Sync was reset
    Assert.equal(await engine.getLastSync(), 0);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    197 
add_task(async function test_processIncoming_emptyServer() {
  _("SyncEngine._processIncoming working with an empty server backend");

  let collection = new ServerCollection();
  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  try {
    // Merely ensure that this code path is run without any errors
    await engine._processIncoming();
    // Nothing was downloaded, so the last-sync timestamp stays at 0.
    Assert.equal(await engine.getLastSync(), 0);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    217 
add_task(async function test_processIncoming_createFromServer() {
  _("SyncEngine._processIncoming creates new records from server data");

  // Some server records that will be downloaded
  let collection = new ServerCollection();
  collection.insert(
    "flying",
    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
  );
  collection.insert(
    "scotsman",
    encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
  );

  // Two pathological cases involving relative URIs gone wrong.
  let pathologicalPayload = encryptPayload({
    id: "../pathological",
    denomination: "Pathological Case",
  });
  collection.insert("../pathological", pathologicalPayload);

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
    "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
    "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
  });

  await SyncTestingInfrastructure(server);

  await generateNewKeys(Service.collectionKeys);

  let engine = makeRotaryEngine();
  let syncID = await engine.resetLocalSyncID();
  // Advertise the engine in meta/global so _syncStartup accepts the server.
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    // Confirm initial environment
    Assert.equal(await engine.getLastSync(), 0);
    Assert.equal(engine.lastModified, null);
    Assert.equal(engine._store.items.flying, undefined);
    Assert.equal(engine._store.items.scotsman, undefined);
    Assert.equal(engine._store.items["../pathological"], undefined);

    await engine._syncStartup();
    await engine._processIncoming();

    // Timestamps of last sync and last server modification are set.
    Assert.greater(await engine.getLastSync(), 0);
    Assert.greater(engine.lastModified, 0);

    // Local records have been created from the server data.
    Assert.equal(engine._store.items.flying, "LNER Class A3 4472");
    Assert.equal(engine._store.items.scotsman, "Flying Scotsman");
    Assert.equal(engine._store.items["../pathological"], "Pathological Case");
  } finally {
    await cleanAndGo(engine, server);
  }
});
    280 
add_task(async function test_processIncoming_reconcile() {
  _("SyncEngine._processIncoming updates local records");

  let collection = new ServerCollection();

  // This record doesn't exist on the client yet, so a new local record
  // will be created for it.
  collection.insert(
    "newrecord",
    encryptPayload({ id: "newrecord", denomination: "New stuff..." })
  );

  // This server record is newer than the corresponding client one,
  // so it'll update its data.
  collection.insert(
    "newerserver",
    encryptPayload({ id: "newerserver", denomination: "New data!" })
  );

  // This server record is 2 mins older than the client counterpart
  // but identical to it, so we're expecting the client record's
  // changedID to be reset.
  collection.insert(
    "olderidentical",
    encryptPayload({
      id: "olderidentical",
      denomination: "Older but identical",
    })
  );
  collection._wbos.olderidentical.modified -= 120;

  // This item simply has different data than the corresponding client
  // record (which is unmodified), so it will update the client as well
  collection.insert(
    "updateclient",
    encryptPayload({ id: "updateclient", denomination: "Get this!" })
  );

  // This is a dupe of 'original'.
  collection.insert(
    "duplication",
    encryptPayload({ id: "duplication", denomination: "Original Entry" })
  );

  // This record is marked as deleted, so we're expecting the client
  // record to be removed.
  collection.insert(
    "nukeme",
    encryptPayload({ id: "nukeme", denomination: "Nuke me!", deleted: true })
  );

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  engine._store.items = {
    newerserver: "New data, but not as new as server!",
    olderidentical: "Older but identical",
    updateclient: "Got data?",
    original: "Original Entry",
    long_original: "Long Original Entry",
    nukeme: "Nuke me!",
  };
  // Make this record 1 min old, thus older than the one on the server
  await engine._tracker.addChangedID("newerserver", Date.now() / 1000 - 60);
  // This record has been changed 2 mins later than the one on the server
  await engine._tracker.addChangedID("olderidentical", Date.now() / 1000);

  // Advertise the engine in meta/global so _syncStartup accepts the server.
  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    // Confirm initial environment
    Assert.equal(engine._store.items.newrecord, undefined);
    Assert.equal(
      engine._store.items.newerserver,
      "New data, but not as new as server!"
    );
    Assert.equal(engine._store.items.olderidentical, "Older but identical");
    Assert.equal(engine._store.items.updateclient, "Got data?");
    Assert.equal(engine._store.items.nukeme, "Nuke me!");
    let changes = await engine._tracker.getChangedIDs();
    Assert.greater(changes.olderidentical, 0);

    await engine._syncStartup();
    await engine._processIncoming();

    // Timestamps of last sync and last server modification are set.
    Assert.greater(await engine.getLastSync(), 0);
    Assert.greater(engine.lastModified, 0);

    // The new record is created.
    Assert.equal(engine._store.items.newrecord, "New stuff...");

    // The 'newerserver' record is updated since the server data is newer.
    Assert.equal(engine._store.items.newerserver, "New data!");

    // The data for 'olderidentical' is identical on the server, so
    // it's no longer marked as changed anymore.
    Assert.equal(engine._store.items.olderidentical, "Older but identical");
    changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.olderidentical, undefined);

    // Updated with server data.
    Assert.equal(engine._store.items.updateclient, "Get this!");

    // The incoming ID is preferred.
    Assert.equal(engine._store.items.original, undefined);
    Assert.equal(engine._store.items.duplication, "Original Entry");
    Assert.notEqual(engine._delete.ids.indexOf("original"), -1);

    // The 'nukeme' record marked as deleted is removed.
    Assert.equal(engine._store.items.nukeme, undefined);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    405 
add_task(async function test_processIncoming_reconcile_local_deleted() {
  _("Ensure local, duplicate ID is deleted on server.");

  // When a duplicate is resolved, the local ID (which is never taken) should
  // be deleted on the server.
  let [engine, server, user] = await createServerAndConfigureClient();

  let now = Date.now() / 1000 - 10;
  await engine.setLastSync(now);
  engine.lastModified = now + 1;

  // The incoming record is newer than anything the client has.
  let record = encryptPayload({
    id: "DUPE_INCOMING",
    denomination: "incoming",
  });
  let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
  server.insertWBO(user, "rotary", wbo);

  // The local dupe also exists on the server, but older than the last sync.
  record = encryptPayload({ id: "DUPE_LOCAL", denomination: "local" });
  wbo = new ServerWBO("DUPE_LOCAL", record, now - 1);
  server.insertWBO(user, "rotary", wbo);

  await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
  Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
  Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));

  await engine._sync();

  // Only the incoming ID survives locally...
  do_check_attribute_count(engine._store.items, 1);
  Assert.ok("DUPE_INCOMING" in engine._store.items);

  // ...and on the server the local duplicate has been deleted.
  let collection = server.getCollection(user, "rotary");
  Assert.equal(1, collection.count());
  Assert.notEqual(undefined, collection.wbo("DUPE_INCOMING"));

  await cleanAndGo(engine, server);
});
    443 
add_task(async function test_processIncoming_reconcile_equivalent() {
  _("Ensure proper handling of incoming records that match local.");

  let [engine, server, user] = await createServerAndConfigureClient();

  let now = Date.now() / 1000 - 10;
  await engine.setLastSync(now);
  engine.lastModified = now + 1;

  // Incoming record with the same ID and payload as the local item.
  let record = encryptPayload({ id: "entry", denomination: "denomination" });
  let wbo = new ServerWBO("entry", record, now + 2);
  server.insertWBO(user, "rotary", wbo);

  engine._store.items = { entry: "denomination" };
  Assert.ok(await engine._store.itemExists("entry"));

  await engine._sync();

  // No duplicate was created; the single item is untouched.
  do_check_attribute_count(engine._store.items, 1);

  await cleanAndGo(engine, server);
});
    466 
add_task(
  async function test_processIncoming_reconcile_locally_deleted_dupe_new() {
    _(
      "Ensure locally deleted duplicate record newer than incoming is handled."
    );

    // This is a somewhat complicated test. It ensures that if a client receives
    // a modified record for an item that is deleted locally but with a different
    // ID that the incoming record is ignored. This is a corner case for record
    // handling, but it needs to be supported.
    let [engine, server, user] = await createServerAndConfigureClient();

    let now = Date.now() / 1000 - 10;
    await engine.setLastSync(now);
    engine.lastModified = now + 1;

    let record = encryptPayload({
      id: "DUPE_INCOMING",
      denomination: "incoming",
    });
    let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
    server.insertWBO(user, "rotary", wbo);

    // Simulate a locally-deleted item. The deletion (now + 3) is newer than
    // the incoming record (now + 2), so the deletion should win.
    engine._store.items = {};
    await engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
    Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL"));
    Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING"));
    Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));

    engine.lastModified = server.getCollection(user, engine.name).timestamp;
    await engine._sync();

    // After the sync, the server's payload for the original ID should be marked
    // as deleted.
    do_check_empty(engine._store.items);
    let collection = server.getCollection(user, "rotary");
    Assert.equal(1, collection.count());
    wbo = collection.wbo("DUPE_INCOMING");
    Assert.notEqual(null, wbo);
    let payload = wbo.getCleartext();
    Assert.ok(payload.deleted);

    await cleanAndGo(engine, server);
  }
);
    513 
add_task(
  async function test_processIncoming_reconcile_locally_deleted_dupe_old() {
    _(
      "Ensure locally deleted duplicate record older than incoming is restored."
    );

    // This is similar to the above test except it tests the condition where the
    // incoming record is newer than the local deletion, therefore overriding it.

    let [engine, server, user] = await createServerAndConfigureClient();

    let now = Date.now() / 1000 - 10;
    await engine.setLastSync(now);
    engine.lastModified = now + 1;

    let record = encryptPayload({
      id: "DUPE_INCOMING",
      denomination: "incoming",
    });
    let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
    server.insertWBO(user, "rotary", wbo);

    // Simulate a locally-deleted item. The deletion (now + 1) is older than
    // the incoming record (now + 2), so the incoming record should win.
    engine._store.items = {};
    await engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
    Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL"));
    Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING"));
    Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));

    await engine._sync();

    // Since the remote change is newer, the incoming item should exist locally.
    do_check_attribute_count(engine._store.items, 1);
    Assert.ok("DUPE_INCOMING" in engine._store.items);
    Assert.equal("incoming", engine._store.items.DUPE_INCOMING);

    let collection = server.getCollection(user, "rotary");
    Assert.equal(1, collection.count());
    wbo = collection.wbo("DUPE_INCOMING");
    let payload = wbo.getCleartext();
    Assert.equal("incoming", payload.denomination);

    await cleanAndGo(engine, server);
  }
);
    559 
add_task(async function test_processIncoming_reconcile_changed_dupe() {
  _("Ensure that locally changed duplicate record is handled properly.");

  let [engine, server, user] = await createServerAndConfigureClient();

  let now = Date.now() / 1000 - 10;
  await engine.setLastSync(now);
  engine.lastModified = now + 1;

  // The local record is newer than the incoming one, so it should be retained.
  let record = encryptPayload({
    id: "DUPE_INCOMING",
    denomination: "incoming",
  });
  let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
  server.insertWBO(user, "rotary", wbo);

  // Local change at now + 3, i.e. newer than the server's now + 2.
  await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
  await engine._tracker.addChangedID("DUPE_LOCAL", now + 3);
  Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
  Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));

  engine.lastModified = server.getCollection(user, engine.name).timestamp;
  await engine._sync();

  // The ID should have been changed to incoming.
  do_check_attribute_count(engine._store.items, 1);
  Assert.ok("DUPE_INCOMING" in engine._store.items);

  // On the server, the local ID should be deleted and the incoming ID should
  // have its payload set to what was in the local record.
  let collection = server.getCollection(user, "rotary");
  Assert.equal(1, collection.count());
  wbo = collection.wbo("DUPE_INCOMING");
  Assert.notEqual(undefined, wbo);
  let payload = wbo.getCleartext();
  Assert.equal("local", payload.denomination);

  await cleanAndGo(engine, server);
});
    600 
add_task(async function test_processIncoming_reconcile_changed_dupe_new() {
  _("Ensure locally changed duplicate record older than incoming is ignored.");

  // This test is similar to the above except the incoming record is younger
  // than the local record. The incoming record should be authoritative.
  let [engine, server, user] = await createServerAndConfigureClient();

  let now = Date.now() / 1000 - 10;
  await engine.setLastSync(now);
  engine.lastModified = now + 1;

  let record = encryptPayload({
    id: "DUPE_INCOMING",
    denomination: "incoming",
  });
  let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2);
  server.insertWBO(user, "rotary", wbo);

  // Local change at now + 1, i.e. older than the server's now + 2.
  await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" });
  await engine._tracker.addChangedID("DUPE_LOCAL", now + 1);
  Assert.ok(await engine._store.itemExists("DUPE_LOCAL"));
  Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" }));

  engine.lastModified = server.getCollection(user, engine.name).timestamp;
  await engine._sync();

  // The ID should have been changed to incoming.
  do_check_attribute_count(engine._store.items, 1);
  Assert.ok("DUPE_INCOMING" in engine._store.items);

  // On the server, the local ID should be deleted and the incoming ID should
  // have its payload retained.
  let collection = server.getCollection(user, "rotary");
  Assert.equal(1, collection.count());
  wbo = collection.wbo("DUPE_INCOMING");
  Assert.notEqual(undefined, wbo);
  let payload = wbo.getCleartext();
  Assert.equal("incoming", payload.denomination);
  await cleanAndGo(engine, server);
});
    641 
add_task(async function test_processIncoming_resume_toFetch() {
  _(
    "toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items."
  );

  const LASTSYNC = Date.now() / 1000;

  // Server records that will be downloaded
  let collection = new ServerCollection();
  collection.insert(
    "flying",
    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
  );
  collection.insert(
    "scotsman",
    encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
  );
  collection.insert(
    "rekolok",
    encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" })
  );
  // Three records that "failed" on a previous sync; all older than LASTSYNC,
  // so they'd be skipped if they weren't queued in previousFailed below.
  for (let i = 0; i < 3; i++) {
    let id = "failed" + i;
    let payload = encryptPayload({ id, denomination: "Record No. " + i });
    let wbo = new ServerWBO(id, payload);
    wbo.modified = LASTSYNC - 10;
    collection.insertWBO(wbo);
  }

  collection.wbo("flying").modified = collection.wbo("scotsman").modified =
    LASTSYNC - 10;
  collection._wbos.rekolok.modified = LASTSYNC + 10;

  // Time travel 10 seconds into the future but still download the above WBOs.
  let engine = makeRotaryEngine();
  await engine.setLastSync(LASTSYNC);
  engine.toFetch = new SerializableSet(["flying", "scotsman"]);
  engine.previousFailed = new SerializableSet([
    "failed0",
    "failed1",
    "failed2",
  ]);

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  // Advertise the engine in meta/global so _syncStartup accepts the server.
  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
  try {
    // Confirm initial environment
    Assert.equal(engine._store.items.flying, undefined);
    Assert.equal(engine._store.items.scotsman, undefined);
    Assert.equal(engine._store.items.rekolok, undefined);

    await engine._syncStartup();
    await engine._processIncoming();

    // Local records have been created from the server data.
    Assert.equal(engine._store.items.flying, "LNER Class A3 4472");
    Assert.equal(engine._store.items.scotsman, "Flying Scotsman");
    Assert.equal(engine._store.items.rekolok, "Rekonstruktionslokomotive");
    Assert.equal(engine._store.items.failed0, "Record No. 0");
    Assert.equal(engine._store.items.failed1, "Record No. 1");
    Assert.equal(engine._store.items.failed2, "Record No. 2");
    Assert.equal(engine.previousFailed.size, 0);
  } finally {
    await cleanAndGo(engine, server);
  }
});
    718 
    719 add_task(async function test_processIncoming_notify_count() {
    720  _("Ensure that failed records are reported only once.");
    721 
    722  const NUMBER_OF_RECORDS = 15;
    723 
    724  // Engine that fails every 5 records.
    725  let engine = makeRotaryEngine();
    726  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
    727  engine._store.applyIncomingBatch = async function (records, countTelemetry) {
    728    let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
    729    let recordsToApply = [],
    730      recordsToFail = [];
    731    for (let i = 0; i < sortedRecords.length; i++) {
    732      (i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]);
    733    }
    734    recordsToFail.forEach(() => {
    735      countTelemetry.addIncomingFailedReason("failed message");
    736    });
    737    await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
    738 
    739    return recordsToFail.map(record => record.id);
    740  };
    741 
    742  // Create a batch of server side records.
    743  let collection = new ServerCollection();
    744  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
    745    let id = "record-no-" + i.toString(10).padStart(2, "0");
    746    let payload = encryptPayload({ id, denomination: "Record No. " + id });
    747    collection.insert(id, payload);
    748  }
    749 
    750  let server = sync_httpd_setup({
    751    "/1.1/foo/storage/rotary": collection.handler(),
    752  });
    753 
    754  await SyncTestingInfrastructure(server);
    755 
    756  let syncID = await engine.resetLocalSyncID();
    757  let meta_global = Service.recordManager.set(
    758    engine.metaURL,
    759    new WBORecord(engine.metaURL)
    760  );
    761  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
    762  try {
    763    // Confirm initial environment.
    764    Assert.equal(await engine.getLastSync(), 0);
    765    Assert.equal(engine.toFetch.size, 0);
    766    Assert.equal(engine.previousFailed.size, 0);
    767    do_check_empty(engine._store.items);
    768 
    769    let called = 0;
    770    let counts;
    771    function onApplied(count) {
    772      _("Called with " + JSON.stringify(counts));
    773      counts = count;
    774      called++;
    775    }
    776    Svc.Obs.add("weave:engine:sync:applied", onApplied);
    777 
    778    // Do sync.
    779    await engine._syncStartup();
    780    await engine._processIncoming();
    781 
    782    // Confirm failures.
    783    do_check_attribute_count(engine._store.items, 12);
    784    Assert.deepEqual(
    785      Array.from(engine.previousFailed).sort(),
    786      ["record-no-00", "record-no-05", "record-no-10"].sort()
    787    );
    788 
    789    // There are newly failed records and they are reported.
    790    Assert.equal(called, 1);
    791    Assert.equal(counts.failed, 3);
    792    Assert.equal(counts.failedReasons[0].count, 3);
    793    Assert.equal(counts.failedReasons[0].name, "failed message");
    794    Assert.equal(counts.applied, 15);
    795    Assert.equal(counts.newFailed, 3);
    796    Assert.equal(counts.succeeded, 12);
    797 
    798    // Sync again, 1 of the failed items are the same, the rest didn't fail.
    799    await engine._processIncoming();
    800 
    801    // Confirming removed failures.
    802    do_check_attribute_count(engine._store.items, 14);
    803    // After failing twice the record that failed again [record-no-00]
    804    // should NOT be stored to try again
    805    Assert.deepEqual(Array.from(engine.previousFailed), []);
    806 
    807    Assert.equal(called, 2);
    808    Assert.equal(counts.failed, 1);
    809    Assert.equal(counts.failedReasons[0].count, 1);
    810    Assert.equal(counts.failedReasons[0].name, "failed message");
    811    Assert.equal(counts.applied, 3);
    812    Assert.equal(counts.newFailed, 0);
    813    Assert.equal(counts.succeeded, 2);
    814 
    815    Svc.Obs.remove("weave:engine:sync:applied", onApplied);
    816  } finally {
    817    await cleanAndGo(engine, server);
    818  }
    819 });
    820 
    821 add_task(async function test_processIncoming_previousFailed() {
    822  _("Ensure that failed records are retried.");
    823 
    824  const NUMBER_OF_RECORDS = 14;
    825 
    826  // Engine that alternates between failing and applying every 2 records.
    827  let engine = makeRotaryEngine();
    828  engine._store._applyIncomingBatch = engine._store.applyIncomingBatch;
    829  engine._store.applyIncomingBatch = async function (records, countTelemetry) {
    830    let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1));
    831    let recordsToApply = [],
    832      recordsToFail = [];
    833    let chunks = Array.from(PlacesUtils.chunkArray(sortedRecords, 2));
    834    for (let i = 0; i < chunks.length; i++) {
    835      (i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]);
    836    }
    837    await engine._store._applyIncomingBatch(recordsToApply, countTelemetry);
    838    return recordsToFail.map(record => record.id);
    839  };
    840 
    841  // Create a batch of server side records.
    842  let collection = new ServerCollection();
    843  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
    844    let id = "record-no-" + i.toString(10).padStart(2, "0");
    845    let payload = encryptPayload({ id, denomination: "Record No. " + i });
    846    collection.insert(id, payload);
    847  }
    848 
    849  let server = sync_httpd_setup({
    850    "/1.1/foo/storage/rotary": collection.handler(),
    851  });
    852 
    853  await SyncTestingInfrastructure(server);
    854 
    855  let syncID = await engine.resetLocalSyncID();
    856  let meta_global = Service.recordManager.set(
    857    engine.metaURL,
    858    new WBORecord(engine.metaURL)
    859  );
    860  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
    861  try {
    862    // Confirm initial environment.
    863    Assert.equal(await engine.getLastSync(), 0);
    864    Assert.equal(engine.toFetch.size, 0);
    865    Assert.equal(engine.previousFailed.size, 0);
    866    do_check_empty(engine._store.items);
    867 
    868    // Initial failed items in previousFailed to be reset.
    869    let previousFailed = new SerializableSet([
    870      Utils.makeGUID(),
    871      Utils.makeGUID(),
    872      Utils.makeGUID(),
    873    ]);
    874    engine.previousFailed = previousFailed;
    875    Assert.equal(engine.previousFailed, previousFailed);
    876 
    877    // Do sync.
    878    await engine._syncStartup();
    879    await engine._processIncoming();
    880 
    881    // Expected result: 4 sync batches with 2 failures each => 8 failures
    882    do_check_attribute_count(engine._store.items, 6);
    883    Assert.deepEqual(
    884      Array.from(engine.previousFailed).sort(),
    885      [
    886        "record-no-00",
    887        "record-no-01",
    888        "record-no-04",
    889        "record-no-05",
    890        "record-no-08",
    891        "record-no-09",
    892        "record-no-12",
    893        "record-no-13",
    894      ].sort()
    895    );
    896 
    897    // Sync again with the same failed items (records 0, 1, 8, 9).
    898    await engine._processIncoming();
    899 
    900    do_check_attribute_count(engine._store.items, 10);
    901    // A second sync with the same failed items should NOT add the same items again.
    902    // Items that did not fail a second time should no longer be in previousFailed.
    903    Assert.deepEqual(Array.from(engine.previousFailed).sort(), []);
    904 
    905    // Refetched items that didn't fail the second time are in engine._store.items.
    906    Assert.equal(engine._store.items["record-no-04"], "Record No. 4");
    907    Assert.equal(engine._store.items["record-no-05"], "Record No. 5");
    908    Assert.equal(engine._store.items["record-no-12"], "Record No. 12");
    909    Assert.equal(engine._store.items["record-no-13"], "Record No. 13");
    910  } finally {
    911    await cleanAndGo(engine, server);
    912  }
    913 });
    914 
add_task(async function test_processIncoming_failed_records() {
  _(
    "Ensure that failed records from _reconcile and applyIncomingBatch are refetched."
  );

  // Let's create three and a bit batches worth of server side records.
  let APPLY_BATCH_SIZE = 50;
  let collection = new ServerCollection();
  const NUMBER_OF_RECORDS = APPLY_BATCH_SIZE * 3 + 5;
  for (let i = 0; i < NUMBER_OF_RECORDS; i++) {
    let id = "record-no-" + i;
    let payload = encryptPayload({ id, denomination: "Record No. " + id });
    let wbo = new ServerWBO(id, payload);
    // Stagger modified times one minute apart so records sort predictably.
    wbo.modified = Date.now() / 1000 + 60 * (i - APPLY_BATCH_SIZE * 3);
    collection.insertWBO(wbo);
  }

  // Engine that batches but likes to throw on a couple of records,
  // two in each batch: the even ones fail in reconcile, the odd ones
  // in applyIncoming.
  const BOGUS_RECORDS = [
    "record-no-" + 42,
    "record-no-" + 23,
    "record-no-" + (42 + APPLY_BATCH_SIZE),
    "record-no-" + (23 + APPLY_BATCH_SIZE),
    "record-no-" + (42 + APPLY_BATCH_SIZE * 2),
    "record-no-" + (23 + APPLY_BATCH_SIZE * 2),
    "record-no-" + (2 + APPLY_BATCH_SIZE * 3),
    "record-no-" + (1 + APPLY_BATCH_SIZE * 3),
  ];
  let engine = makeRotaryEngine();

  // Note: for non-bogus IDs indexOf() is -1, so neither patched method throws.
  engine.__reconcile = engine._reconcile;
  engine._reconcile = async function _reconcile(record) {
    if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) {
      throw new Error("I don't like this record! Baaaaaah!");
    }
    return this.__reconcile.apply(this, arguments);
  };
  engine._store._applyIncoming = engine._store.applyIncoming;
  engine._store.applyIncoming = async function (record) {
    if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) {
      throw new Error("I don't like this record! Baaaaaah!");
    }
    return this._applyIncoming.apply(this, arguments);
  };

  // Keep track of requests made of a collection.
  let count = 0;
  let uris = [];
  function recording_handler(recordedCollection) {
    let h = recordedCollection.handler();
    return function (req, res) {
      ++count;
      uris.push(req.path + "?" + req.queryString);
      return h(req, res);
    };
  }
  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": recording_handler(collection),
  });

  await SyncTestingInfrastructure(server);

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    // Confirm initial environment
    Assert.equal(await engine.getLastSync(), 0);
    Assert.equal(engine.toFetch.size, 0);
    Assert.equal(engine.previousFailed.size, 0);
    do_check_empty(engine._store.items);

    // Capture the applied-notification payload for inspection below.
    let observerSubject;
    let observerData;
    Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
      Svc.Obs.remove("weave:engine:sync:applied", onApplied);
      observerSubject = subject;
      observerData = data;
    });

    await engine._syncStartup();
    await engine._processIncoming();

    // Ensure that all records but the bogus ones have been applied.
    do_check_attribute_count(
      engine._store.items,
      NUMBER_OF_RECORDS - BOGUS_RECORDS.length
    );

    // Ensure that the bogus records will be fetched again on the next sync.
    Assert.equal(engine.previousFailed.size, BOGUS_RECORDS.length);
    Assert.deepEqual(
      Array.from(engine.previousFailed).sort(),
      BOGUS_RECORDS.sort()
    );

    // Ensure the observer was notified
    Assert.equal(observerData, engine.name);
    Assert.equal(observerSubject.failed, BOGUS_RECORDS.length);
    Assert.equal(observerSubject.newFailed, BOGUS_RECORDS.length);

    // Testing batching of failed item fetches.
    // Try to sync again. Ensure that we split the request into chunks to avoid
    // URI length limitations.
    async function batchDownload(batchSize) {
      count = 0;
      uris = [];
      engine.guidFetchBatchSize = batchSize;
      await engine._processIncoming();
      _("Tried again. Requests: " + count + "; URIs: " + JSON.stringify(uris));
      return count;
    }

    // There are 8 bad records, so this needs 3 fetches.
    _("Test batching with ID batch size 3, normal mobile batch size.");
    Assert.equal(await batchDownload(3), 3);

    // Since the previous batch failed again, there should be
    // no more records to fetch
    _("Test that the second time a record failed to sync, gets ignored");
    Assert.equal(await batchDownload(BOGUS_RECORDS.length), 0);
  } finally {
    await cleanAndGo(engine, server);
  }
});
   1046 
   1047 add_task(async function test_processIncoming_decrypt_failed() {
   1048  _("Ensure that records failing to decrypt are either replaced or refetched.");
   1049 
   1050  // Some good and some bogus records. One doesn't contain valid JSON,
   1051  // the other will throw during decrypt.
   1052  let collection = new ServerCollection();
   1053  collection._wbos.flying = new ServerWBO(
   1054    "flying",
   1055    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
   1056  );
   1057  collection._wbos.nojson = new ServerWBO("nojson", "This is invalid JSON");
   1058  collection._wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON");
   1059  collection._wbos.scotsman = new ServerWBO(
   1060    "scotsman",
   1061    encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
   1062  );
   1063  collection._wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!");
   1064  collection._wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!");
   1065 
   1066  // Patch the fake crypto service to throw on the record above.
   1067  Weave.Crypto._decrypt = Weave.Crypto.decrypt;
   1068  Weave.Crypto.decrypt = function (ciphertext) {
   1069    if (ciphertext == "Decrypt this!") {
   1070      throw new Error(
   1071        "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz."
   1072      );
   1073    }
   1074    return this._decrypt.apply(this, arguments);
   1075  };
   1076 
   1077  // Some broken records also exist locally.
   1078  let engine = makeRotaryEngine();
   1079  engine.enabled = true;
   1080  engine._store.items = { nojson: "Valid JSON", nodecrypt: "Valid ciphertext" };
   1081 
   1082  let server = sync_httpd_setup({
   1083    "/1.1/foo/storage/rotary": collection.handler(),
   1084  });
   1085 
   1086  await SyncTestingInfrastructure(server);
   1087 
   1088  let syncID = await engine.resetLocalSyncID();
   1089  let meta_global = Service.recordManager.set(
   1090    engine.metaURL,
   1091    new WBORecord(engine.metaURL)
   1092  );
   1093  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
   1094  try {
   1095    // Confirm initial state
   1096    Assert.equal(engine.toFetch.size, 0);
   1097    Assert.equal(engine.previousFailed.size, 0);
   1098 
   1099    let observerSubject;
   1100    let observerData;
   1101    Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) {
   1102      Svc.Obs.remove("weave:engine:sync:applied", onApplied);
   1103      observerSubject = subject;
   1104      observerData = data;
   1105    });
   1106 
   1107    await engine.setLastSync(collection.wbo("nojson").modified - 1);
   1108    let ping = await sync_engine_and_validate_telem(engine, true);
   1109    Assert.equal(ping.engines[0].incoming.applied, 2);
   1110    Assert.equal(ping.engines[0].incoming.failed, 4);
   1111    console.log("incoming telem: ", ping.engines[0].incoming);
   1112    Assert.equal(
   1113      ping.engines[0].incoming.failedReasons[0].name,
   1114      "No ciphertext: nothing to decrypt?"
   1115    );
   1116    // There should be 4 of the same error
   1117    Assert.equal(ping.engines[0].incoming.failedReasons[0].count, 4);
   1118 
   1119    Assert.equal(engine.previousFailed.size, 4);
   1120    Assert.ok(engine.previousFailed.has("nojson"));
   1121    Assert.ok(engine.previousFailed.has("nojson2"));
   1122    Assert.ok(engine.previousFailed.has("nodecrypt"));
   1123    Assert.ok(engine.previousFailed.has("nodecrypt2"));
   1124 
   1125    // Ensure the observer was notified
   1126    Assert.equal(observerData, engine.name);
   1127    Assert.equal(observerSubject.applied, 2);
   1128    Assert.equal(observerSubject.failed, 4);
   1129    Assert.equal(observerSubject.failedReasons[0].count, 4);
   1130  } finally {
   1131    await promiseClean(engine, server);
   1132  }
   1133 });
   1134 
add_task(async function test_uploadOutgoing_toEmptyServer() {
  _("SyncEngine._uploadOutgoing uploads new records to server");

  // The server knows both IDs but holds no payloads yet.
  let collection = new ServerCollection();
  collection._wbos.flying = new ServerWBO("flying");
  collection._wbos.scotsman = new ServerWBO("scotsman");

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
    "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
    "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
  });

  await SyncTestingInfrastructure(server);
  await generateNewKeys(Service.collectionKeys);

  let engine = makeRotaryEngine();
  engine._store.items = {
    flying: "LNER Class A3 4472",
    scotsman: "Flying Scotsman",
  };
  // Mark one of these records as changed
  await engine._tracker.addChangedID("scotsman", 0);

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    await engine.setLastSync(123); // needs to be non-zero so that tracker is queried

    // Confirm initial environment
    Assert.equal(collection.payload("flying"), undefined);
    Assert.equal(collection.payload("scotsman"), undefined);

    await engine._syncStartup();
    await engine._uploadOutgoing();

    // Ensure the marked record ('scotsman') has been uploaded and is
    // no longer marked.
    Assert.equal(collection.payload("flying"), undefined);
    Assert.ok(!!collection.payload("scotsman"));
    Assert.equal(collection.cleartext("scotsman").id, "scotsman");
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.scotsman, undefined);

    // The 'flying' record wasn't marked so it wasn't uploaded
    Assert.equal(collection.payload("flying"), undefined);
  } finally {
    await cleanAndGo(engine, server);
  }
});
   1190 
/**
 * Shared driver for the max_record_payload_bytes tests below.
 *
 * Uploads two changed records, one of which ("flying", ~1 MiB) exceeds
 * max_record_payload_bytes. Depending on `allowSkippedRecord`, the engine
 * must either skip the oversized record and upload the rest, or abort the
 * upload with an error and keep the record tracked for retry.
 *
 * @param {boolean} allowSkippedRecord - value for engine.allowSkippedRecord.
 */
async function test_uploadOutgoing_max_record_payload_bytes(
  allowSkippedRecord
) {
  _(
    "SyncEngine._uploadOutgoing throws when payload is bigger than max_record_payload_bytes"
  );
  let collection = new ServerCollection();
  collection._wbos.flying = new ServerWBO("flying");
  collection._wbos.scotsman = new ServerWBO("scotsman");

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
    "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(),
    "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(),
  });

  await SyncTestingInfrastructure(server);
  await generateNewKeys(Service.collectionKeys);

  let engine = makeRotaryEngine();
  engine.allowSkippedRecord = allowSkippedRecord;
  // "flying" carries 1 MiB of payload; "scotsman" is tiny and should upload.
  engine._store.items = { flying: "a".repeat(1024 * 1024), scotsman: "abcd" };

  await engine._tracker.addChangedID("flying", 1000);
  await engine._tracker.addChangedID("scotsman", 1000);

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    await engine.setLastSync(1); // needs to be non-zero so that tracker is queried

    // Confirm initial environment
    Assert.equal(collection.payload("flying"), undefined);
    Assert.equal(collection.payload("scotsman"), undefined);

    await engine._syncStartup();
    await engine._uploadOutgoing();

    // Reaching this point means no error was thrown; that is only expected
    // when skipped records are allowed.
    if (!allowSkippedRecord) {
      do_throw("should not get here");
    }

    await engine.trackRemainingChanges();

    // Check we uploaded the other record to the server
    Assert.ok(collection.payload("scotsman"));
    // And that we won't try to upload the huge record next time.
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, undefined);
  } catch (e) {
    // An error is only expected when skipped records are disallowed.
    if (allowSkippedRecord) {
      do_throw("should not get here");
    }

    await engine.trackRemainingChanges();

    // Check that we will try to upload the huge record next time
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, 1000);
  } finally {
    // Check we didn't upload the oversized record to the server
    Assert.equal(collection.payload("flying"), undefined);
    await cleanAndGo(engine, server);
  }
}
   1261 
   1262 add_task(
   1263  async function test_uploadOutgoing_max_record_payload_bytes_disallowSkippedRecords() {
   1264    return test_uploadOutgoing_max_record_payload_bytes(false);
   1265  }
   1266 );
   1267 
   1268 add_task(
   1269  async function test_uploadOutgoing_max_record_payload_bytes_allowSkippedRecords() {
   1270    return test_uploadOutgoing_max_record_payload_bytes(true);
   1271  }
   1272 );
   1273 
add_task(async function test_uploadOutgoing_failed() {
  _(
    "SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload."
  );

  let collection = new ServerCollection();
  // We only define the "flying" WBO on the server, not the "scotsman"
  // and "peppercorn" ones, so only 'flying' can be uploaded successfully.
  collection._wbos.flying = new ServerWBO("flying");

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  engine._store.items = {
    flying: "LNER Class A3 4472",
    scotsman: "Flying Scotsman",
    peppercorn: "Peppercorn Class",
  };
  // Mark these records as changed
  const FLYING_CHANGED = 12345;
  const SCOTSMAN_CHANGED = 23456;
  const PEPPERCORN_CHANGED = 34567;
  await engine._tracker.addChangedID("flying", FLYING_CHANGED);
  await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);
  await engine._tracker.addChangedID("peppercorn", PEPPERCORN_CHANGED);

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  try {
    await engine.setLastSync(123); // needs to be non-zero so that tracker is queried

    // Confirm initial environment
    Assert.equal(collection.payload("flying"), undefined);
    let changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, FLYING_CHANGED);
    Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
    Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);

    engine.enabled = true;
    await sync_engine_and_validate_telem(engine, true);

    // Ensure the 'flying' record has been uploaded and is no longer marked.
    Assert.ok(!!collection.payload("flying"));
    changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, undefined);

    // The 'scotsman' and 'peppercorn' records couldn't be uploaded so
    // they weren't cleared from the tracker.
    Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);
    Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED);
  } finally {
    await promiseClean(engine, server);
  }
});
   1337 
/**
 * Shared driver for the createRecord-throws tests below.
 *
 * createRecord is stubbed to throw for every ID except "flying". With
 * `allowSkippedRecord` set, the sync skips the bad record and still uploads
 * "flying"; without it, the sync fails and nothing is uploaded. Either way,
 * the failure must be reported in the telemetry ping.
 *
 * @param {boolean} allowSkippedRecord - value for engine.allowSkippedRecord.
 */
async function createRecordFailTelemetry(allowSkippedRecord) {
  Services.prefs.setStringPref("services.sync.username", "foo");
  let collection = new ServerCollection();
  collection._wbos.flying = new ServerWBO("flying");
  collection._wbos.scotsman = new ServerWBO("scotsman");

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  let engine = makeRotaryEngine();
  engine.allowSkippedRecord = allowSkippedRecord;
  let oldCreateRecord = engine._store.createRecord;
  // Throw for every record except "flying".
  engine._store.createRecord = async (id, col) => {
    if (id != "flying") {
      throw new Error("oops");
    }
    return oldCreateRecord.call(engine._store, id, col);
  };
  engine._store.items = {
    flying: "LNER Class A3 4472",
    scotsman: "Flying Scotsman",
  };
  // Mark these records as changed
  const FLYING_CHANGED = 12345;
  const SCOTSMAN_CHANGED = 23456;
  await engine._tracker.addChangedID("flying", FLYING_CHANGED);
  await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED);

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  let ping;
  try {
    await engine.setLastSync(123); // needs to be non-zero so that tracker is queried

    // Confirm initial environment
    Assert.equal(collection.payload("flying"), undefined);
    let changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, FLYING_CHANGED);
    Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);

    engine.enabled = true;
    // Capture the ping even if the sync throws, so the finally block can
    // always inspect the reported telemetry.
    ping = await sync_engine_and_validate_telem(engine, true, onErrorPing => {
      ping = onErrorPing;
    });

    if (!allowSkippedRecord) {
      do_throw("should not get here");
    }

    // Ensure the 'flying' record has been uploaded and is no longer marked.
    Assert.ok(!!collection.payload("flying"));
    changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.flying, undefined);
  } catch (err) {
    if (allowSkippedRecord) {
      do_throw("should not get here");
    }

    // Ensure the 'flying' record has not been uploaded and is still marked
    Assert.ok(!collection.payload("flying"));
    const changes = await engine._tracker.getChangedIDs();
    Assert.ok(changes.flying);
  } finally {
    // We reported in telemetry that we failed a record
    Assert.equal(ping.engines[0].outgoing[0].failed, 1);
    Assert.equal(ping.engines[0].outgoing[0].failedReasons[0].name, "oops");

    // In any case, the 'scotsman' record couldn't be created so it wasn't
    // uploaded, nor was it cleared from the tracker.
    Assert.ok(!collection.payload("scotsman"));
    const changes = await engine._tracker.getChangedIDs();
    Assert.equal(changes.scotsman, SCOTSMAN_CHANGED);

    engine._store.createRecord = oldCreateRecord;
    await promiseClean(engine, server);
  }
}
   1423 
   1424 add_task(
   1425  async function test_uploadOutgoing_createRecord_throws_reported_telemetry() {
   1426    _(
   1427      "SyncEngine._uploadOutgoing reports a failed record to telemetry if createRecord throws"
   1428    );
   1429    await createRecordFailTelemetry(true);
   1430  }
   1431 );
   1432 
   1433 add_task(
   1434  async function test_uploadOutgoing_createRecord_throws_dontAllowSkipRecord() {
   1435    _(
   1436      "SyncEngine._uploadOutgoing will throw if createRecord throws and allowSkipRecord is set to false"
   1437    );
   1438    await createRecordFailTelemetry(false);
   1439  }
   1440 );
   1441 
add_task(async function test_uploadOutgoing_largeRecords() {
  _(
    "SyncEngine._uploadOutgoing throws on records larger than the max record payload size"
  );

  let collection = new ServerCollection();

  let engine = makeRotaryEngine();
  // With skipped records disallowed, an oversized record must make the
  // whole upload throw rather than being silently dropped.
  engine.allowSkippedRecord = false;
  // Twice the maximum payload size guarantees the record is over the limit.
  engine._store.items["large-item"] = "Y".repeat(
    Service.getMaxRecordPayloadSize() * 2
  );
  await engine._tracker.addChangedID("large-item", 0);
  collection.insert("large-item");

  let syncID = await engine.resetLocalSyncID();
  let meta_global = Service.recordManager.set(
    engine.metaURL,
    new WBORecord(engine.metaURL)
  );
  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };

  let server = sync_httpd_setup({
    "/1.1/foo/storage/rotary": collection.handler(),
  });

  await SyncTestingInfrastructure(server);

  try {
    await engine._syncStartup();
    // _uploadOutgoing must reject; capture the error and assert it exists.
    let error = null;
    try {
      await engine._uploadOutgoing();
    } catch (e) {
      error = e;
    }
    ok(!!error);
  } finally {
    await cleanAndGo(engine, server);
  }
});
   1483 
   1484 add_task(async function test_syncFinish_deleteByIds() {
   1485  _(
   1486    "SyncEngine._syncFinish deletes server records slated for deletion (list of record IDs)."
   1487  );
   1488 
   1489  let collection = new ServerCollection();
   1490  collection._wbos.flying = new ServerWBO(
   1491    "flying",
   1492    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
   1493  );
   1494  collection._wbos.scotsman = new ServerWBO(
   1495    "scotsman",
   1496    encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" })
   1497  );
   1498  collection._wbos.rekolok = new ServerWBO(
   1499    "rekolok",
   1500    encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" })
   1501  );
   1502 
   1503  let server = httpd_setup({
   1504    "/1.1/foo/storage/rotary": collection.handler(),
   1505  });
   1506  await SyncTestingInfrastructure(server);
   1507 
   1508  let engine = makeRotaryEngine();
   1509  try {
   1510    engine._delete = { ids: ["flying", "rekolok"] };
   1511    await engine._syncFinish();
   1512 
   1513    // The 'flying' and 'rekolok' records were deleted while the
   1514    // 'scotsman' one wasn't.
   1515    Assert.equal(collection.payload("flying"), undefined);
   1516    Assert.ok(!!collection.payload("scotsman"));
   1517    Assert.equal(collection.payload("rekolok"), undefined);
   1518 
   1519    // The deletion todo list has been reset.
   1520    Assert.equal(engine._delete.ids, undefined);
   1521  } finally {
   1522    await cleanAndGo(engine, server);
   1523  }
   1524 });
   1525 
   1526 add_task(async function test_syncFinish_deleteLotsInBatches() {
   1527  _(
   1528    "SyncEngine._syncFinish deletes server records in batches of 100 (list of record IDs)."
   1529  );
   1530 
   1531  let collection = new ServerCollection();
   1532 
   1533  // Let's count how many times the client does a DELETE request to the server
   1534  var noOfUploads = 0;
   1535  collection.delete = (function (orig) {
   1536    return function () {
   1537      noOfUploads++;
   1538      return orig.apply(this, arguments);
   1539    };
   1540  })(collection.delete);
   1541 
   1542  // Create a bunch of records on the server
   1543  let now = Date.now();
   1544  for (var i = 0; i < 234; i++) {
   1545    let id = "record-no-" + i;
   1546    let payload = encryptPayload({ id, denomination: "Record No. " + i });
   1547    let wbo = new ServerWBO(id, payload);
   1548    wbo.modified = now / 1000 - 60 * (i + 110);
   1549    collection.insertWBO(wbo);
   1550  }
   1551 
   1552  let server = httpd_setup({
   1553    "/1.1/foo/storage/rotary": collection.handler(),
   1554  });
   1555 
   1556  await SyncTestingInfrastructure(server);
   1557 
   1558  let engine = makeRotaryEngine();
   1559  try {
   1560    // Confirm initial environment
   1561    Assert.equal(noOfUploads, 0);
   1562 
   1563    // Declare what we want to have deleted: all records no. 100 and
   1564    // up and all records that are less than 200 mins old (which are
   1565    // records 0 thru 90).
   1566    engine._delete = { ids: [], newer: now / 1000 - 60 * 200.5 };
   1567    for (i = 100; i < 234; i++) {
   1568      engine._delete.ids.push("record-no-" + i);
   1569    }
   1570 
   1571    await engine._syncFinish();
   1572 
   1573    // Ensure that the appropriate server data has been wiped while
   1574    // preserving records 90 thru 200.
   1575    for (i = 0; i < 234; i++) {
   1576      let id = "record-no-" + i;
   1577      if (i <= 90 || i >= 100) {
   1578        Assert.equal(collection.payload(id), undefined);
   1579      } else {
   1580        Assert.ok(!!collection.payload(id));
   1581      }
   1582    }
   1583 
   1584    // The deletion was done in batches
   1585    Assert.equal(noOfUploads, 2 + 1);
   1586 
   1587    // The deletion todo list has been reset.
   1588    Assert.equal(engine._delete.ids, undefined);
   1589  } finally {
   1590    await cleanAndGo(engine, server);
   1591  }
   1592 });
   1593 
   1594 add_task(async function test_sync_partialUpload() {
   1595  _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded.");
   1596 
   1597  let collection = new ServerCollection();
   1598  let server = sync_httpd_setup({
   1599    "/1.1/foo/storage/rotary": collection.handler(),
   1600  });
   1601  let oldServerConfiguration = Service.serverConfiguration;
   1602  Service.serverConfiguration = {
   1603    max_post_records: 100,
   1604  };
   1605  await SyncTestingInfrastructure(server);
   1606  await generateNewKeys(Service.collectionKeys);
   1607 
   1608  let engine = makeRotaryEngine();
   1609 
   1610  // Let the third upload fail completely
   1611  var noOfUploads = 0;
   1612  collection.post = (function (orig) {
   1613    return function () {
   1614      if (noOfUploads == 2) {
   1615        throw new Error("FAIL!");
   1616      }
   1617      noOfUploads++;
   1618      return orig.apply(this, arguments);
   1619    };
   1620  })(collection.post);
   1621 
   1622  // Create a bunch of records (and server side handlers)
   1623  for (let i = 0; i < 234; i++) {
   1624    let id = "record-no-" + i;
   1625    engine._store.items[id] = "Record No. " + i;
   1626    await engine._tracker.addChangedID(id, i);
   1627    // Let two items in the first upload batch fail.
   1628    if (i != 23 && i != 42) {
   1629      collection.insert(id);
   1630    }
   1631  }
   1632 
   1633  let syncID = await engine.resetLocalSyncID();
   1634  let meta_global = Service.recordManager.set(
   1635    engine.metaURL,
   1636    new WBORecord(engine.metaURL)
   1637  );
   1638  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
   1639 
   1640  try {
   1641    await engine.setLastSync(123); // needs to be non-zero so that tracker is queried
   1642 
   1643    engine.enabled = true;
   1644    let error;
   1645    try {
   1646      await sync_engine_and_validate_telem(engine, true);
   1647    } catch (ex) {
   1648      error = ex;
   1649    }
   1650 
   1651    ok(!!error);
   1652 
   1653    const changes = await engine._tracker.getChangedIDs();
   1654    for (let i = 0; i < 234; i++) {
   1655      let id = "record-no-" + i;
   1656      // Ensure failed records are back in the tracker:
   1657      // * records no. 23 and 42 were rejected by the server,
   1658      // * records after the third batch and higher couldn't be uploaded because
   1659      //   we failed hard on the 3rd upload.
   1660      if (i == 23 || i == 42 || i >= 200) {
   1661        Assert.equal(changes[id], i);
   1662      } else {
   1663        Assert.equal(false, id in changes);
   1664      }
   1665    }
   1666  } finally {
   1667    Service.serverConfiguration = oldServerConfiguration;
   1668    await promiseClean(engine, server);
   1669  }
   1670 });
   1671 
   1672 add_task(async function test_canDecrypt_noCryptoKeys() {
   1673  _(
   1674    "SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection."
   1675  );
   1676 
   1677  // Wipe collection keys so we can test the desired scenario.
   1678  Service.collectionKeys.clear();
   1679 
   1680  let collection = new ServerCollection();
   1681  collection._wbos.flying = new ServerWBO(
   1682    "flying",
   1683    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
   1684  );
   1685 
   1686  let server = sync_httpd_setup({
   1687    "/1.1/foo/storage/rotary": collection.handler(),
   1688  });
   1689 
   1690  await SyncTestingInfrastructure(server);
   1691  let engine = makeRotaryEngine();
   1692  try {
   1693    Assert.equal(false, await engine.canDecrypt());
   1694  } finally {
   1695    await cleanAndGo(engine, server);
   1696  }
   1697 });
   1698 
   1699 add_task(async function test_canDecrypt_true() {
   1700  _(
   1701    "SyncEngine.canDecrypt returns true if the engine can decrypt the items on the server."
   1702  );
   1703 
   1704  await generateNewKeys(Service.collectionKeys);
   1705 
   1706  let collection = new ServerCollection();
   1707  collection._wbos.flying = new ServerWBO(
   1708    "flying",
   1709    encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" })
   1710  );
   1711 
   1712  let server = sync_httpd_setup({
   1713    "/1.1/foo/storage/rotary": collection.handler(),
   1714  });
   1715 
   1716  await SyncTestingInfrastructure(server);
   1717  let engine = makeRotaryEngine();
   1718  try {
   1719    Assert.ok(await engine.canDecrypt());
   1720  } finally {
   1721    await cleanAndGo(engine, server);
   1722  }
   1723 });
   1724 
   1725 add_task(async function test_syncapplied_observer() {
   1726  const NUMBER_OF_RECORDS = 10;
   1727 
   1728  let engine = makeRotaryEngine();
   1729 
   1730  // Create a batch of server side records.
   1731  let collection = new ServerCollection();
   1732  for (var i = 0; i < NUMBER_OF_RECORDS; i++) {
   1733    let id = "record-no-" + i;
   1734    let payload = encryptPayload({ id, denomination: "Record No. " + id });
   1735    collection.insert(id, payload);
   1736  }
   1737 
   1738  let server = httpd_setup({
   1739    "/1.1/foo/storage/rotary": collection.handler(),
   1740  });
   1741 
   1742  await SyncTestingInfrastructure(server);
   1743 
   1744  let syncID = await engine.resetLocalSyncID();
   1745  let meta_global = Service.recordManager.set(
   1746    engine.metaURL,
   1747    new WBORecord(engine.metaURL)
   1748  );
   1749  meta_global.payload.engines = { rotary: { version: engine.version, syncID } };
   1750 
   1751  let numApplyCalls = 0;
   1752  let engine_name;
   1753  let count;
   1754  function onApplied(subject, data) {
   1755    numApplyCalls++;
   1756    engine_name = data;
   1757    count = subject;
   1758  }
   1759 
   1760  Svc.Obs.add("weave:engine:sync:applied", onApplied);
   1761 
   1762  try {
   1763    Service.scheduler.hasIncomingItems = false;
   1764 
   1765    // Do sync.
   1766    await engine._syncStartup();
   1767    await engine._processIncoming();
   1768 
   1769    do_check_attribute_count(engine._store.items, 10);
   1770 
   1771    Assert.equal(numApplyCalls, 1);
   1772    Assert.equal(engine_name, "rotary");
   1773    Assert.equal(count.applied, 10);
   1774 
   1775    Assert.ok(Service.scheduler.hasIncomingItems);
   1776  } finally {
   1777    await cleanAndGo(engine, server);
   1778    Service.scheduler.hasIncomingItems = false;
   1779    Svc.Obs.remove("weave:engine:sync:applied", onApplied);
   1780  }
   1781 });