test_syncengine_sync.js (58976B)
1 /* Any copyright is dedicated to the Public Domain. 2 * http://creativecommons.org/publicdomain/zero/1.0/ */ 3 4 const { Weave } = ChromeUtils.importESModule( 5 "resource://services-sync/main.sys.mjs" 6 ); 7 const { WBORecord } = ChromeUtils.importESModule( 8 "resource://services-sync/record.sys.mjs" 9 ); 10 const { Service } = ChromeUtils.importESModule( 11 "resource://services-sync/service.sys.mjs" 12 ); 13 const { RotaryEngine } = ChromeUtils.importESModule( 14 "resource://testing-common/services/sync/rotaryengine.sys.mjs" 15 ); 16 17 function makeRotaryEngine() { 18 return new RotaryEngine(Service); 19 } 20 21 async function clean(engine) { 22 for (const pref of Svc.PrefBranch.getChildList("")) { 23 Svc.PrefBranch.clearUserPref(pref); 24 } 25 Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace"); 26 Service.recordManager.clearCache(); 27 await engine._tracker.clearChangedIDs(); 28 await engine.finalize(); 29 } 30 31 async function cleanAndGo(engine, server) { 32 await clean(engine); 33 await promiseStopServer(server); 34 } 35 36 async function promiseClean(engine, server) { 37 await clean(engine); 38 await promiseStopServer(server); 39 } 40 41 async function createServerAndConfigureClient() { 42 let engine = new RotaryEngine(Service); 43 let syncID = await engine.resetLocalSyncID(); 44 45 let contents = { 46 meta: { 47 global: { engines: { rotary: { version: engine.version, syncID } } }, 48 }, 49 crypto: {}, 50 rotary: {}, 51 }; 52 53 const USER = "foo"; 54 let server = new SyncServer(); 55 server.registerUser(USER, "password"); 56 server.createContents(USER, contents); 57 server.start(); 58 59 await SyncTestingInfrastructure(server, USER); 60 Service._updateCachedURLs(); 61 62 return [engine, server, USER]; 63 } 64 65 /* 66 * Tests 67 * 68 * SyncEngine._sync() is divided into four rather independent steps: 69 * 70 * - _syncStartup() 71 * - _processIncoming() 72 * - _uploadOutgoing() 73 * - _syncFinish() 74 * 75 * In the spirit of unit testing, these are tested individually for 76 * different scenarios below. 77 */ 78 79 add_task(async function setup() { 80 await generateNewKeys(Service.collectionKeys); 81 Svc.PrefBranch.setStringPref("log.logger.engine.rotary", "Trace"); 82 }); 83 84 add_task(async function test_syncStartup_emptyOrOutdatedGlobalsResetsSync() { 85 _( 86 "SyncEngine._syncStartup resets sync and wipes server data if there's no or an outdated global record" 87 ); 88 89 // Some server side data that's going to be wiped 90 let collection = new ServerCollection(); 91 collection.insert( 92 "flying", 93 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 94 ); 95 collection.insert( 96 "scotsman", 97 encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" }) 98 ); 99 100 let server = sync_httpd_setup({ 101 "/1.1/foo/storage/rotary": collection.handler(), 102 }); 103 104 await SyncTestingInfrastructure(server); 105 106 let engine = makeRotaryEngine(); 107 engine._store.items = { rekolok: "Rekonstruktionslokomotive" }; 108 try { 109 // Confirm initial environment 110 const changes = await engine._tracker.getChangedIDs(); 111 Assert.equal(changes.rekolok, undefined); 112 let metaGlobal = await Service.recordManager.get(engine.metaURL); 113 Assert.equal(metaGlobal.payload.engines, undefined); 114 Assert.ok(!!collection.payload("flying")); 115 Assert.ok(!!collection.payload("scotsman")); 116 117 await engine.setLastSync(Date.now() / 1000); 118 119 // Trying to prompt a wipe -- we no longer track CryptoMeta per engine, 120 // so it has nothing to check. 121 await engine._syncStartup(); 122 123 // The meta/global WBO has been filled with data about the engine 124 let engineData = metaGlobal.payload.engines.rotary; 125 Assert.equal(engineData.version, engine.version); 126 Assert.equal(engineData.syncID, await engine.getSyncID()); 127 128 // Sync was reset and server data was wiped 129 Assert.equal(await engine.getLastSync(), 0); 130 Assert.equal(collection.payload("flying"), undefined); 131 Assert.equal(collection.payload("scotsman"), undefined); 132 } finally { 133 await cleanAndGo(engine, server); 134 } 135 }); 136 137 add_task(async function test_syncStartup_serverHasNewerVersion() { 138 _("SyncEngine._syncStartup "); 139 140 let global = new ServerWBO("global", { 141 engines: { rotary: { version: 23456 } }, 142 }); 143 let server = httpd_setup({ 144 "/1.1/foo/storage/meta/global": global.handler(), 145 }); 146 147 await SyncTestingInfrastructure(server); 148 149 let engine = makeRotaryEngine(); 150 try { 151 // The server has a newer version of the data and our engine can 152 // handle. That should give us an exception. 153 let error; 154 try { 155 await engine._syncStartup(); 156 } catch (ex) { 157 error = ex; 158 } 159 Assert.equal(error.failureCode, VERSION_OUT_OF_DATE); 160 } finally { 161 await cleanAndGo(engine, server); 162 } 163 }); 164 165 add_task(async function test_syncStartup_syncIDMismatchResetsClient() { 166 _("SyncEngine._syncStartup resets sync if syncIDs don't match"); 167 168 let server = sync_httpd_setup({}); 169 170 await SyncTestingInfrastructure(server); 171 172 // global record with a different syncID than our engine has 173 let engine = makeRotaryEngine(); 174 let global = new ServerWBO("global", { 175 engines: { rotary: { version: engine.version, syncID: "foobar" } }, 176 }); 177 server.registerPathHandler("/1.1/foo/storage/meta/global", global.handler()); 178 179 try { 180 // Confirm initial environment 181 Assert.equal(await engine.getSyncID(), ""); 182 const changes = await engine._tracker.getChangedIDs(); 183 Assert.equal(changes.rekolok, undefined); 184 185 await engine.setLastSync(Date.now() / 1000); 186 await engine._syncStartup(); 187 188 // The engine has assumed the server's syncID 189 Assert.equal(await engine.getSyncID(), "foobar"); 190 191 // Sync was reset 192 Assert.equal(await engine.getLastSync(), 0); 193 } finally { 194 await cleanAndGo(engine, server); 195 } 196 }); 197 198 add_task(async function test_processIncoming_emptyServer() { 199 _("SyncEngine._processIncoming working with an empty server backend"); 200 201 let collection = new ServerCollection(); 202 let server = sync_httpd_setup({ 203 "/1.1/foo/storage/rotary": collection.handler(), 204 }); 205 206 await SyncTestingInfrastructure(server); 207 208 let engine = makeRotaryEngine(); 209 try { 210 // Merely ensure that this code path is run without any errors 211 await engine._processIncoming(); 212 Assert.equal(await engine.getLastSync(), 0); 213 } finally { 214 await cleanAndGo(engine, server); 215 } 216 }); 217 218 add_task(async function test_processIncoming_createFromServer() { 219 _("SyncEngine._processIncoming creates new records from server data"); 220 221 // Some server records that will be downloaded 222 let collection = new ServerCollection(); 223 collection.insert( 224 "flying", 225 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 226 ); 227 collection.insert( 228 "scotsman", 229 encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" }) 230 ); 231 232 // Two pathological cases involving relative URIs gone wrong. 233 let pathologicalPayload = encryptPayload({ 234 id: "../pathological", 235 denomination: "Pathological Case", 236 }); 237 collection.insert("../pathological", pathologicalPayload); 238 239 let server = sync_httpd_setup({ 240 "/1.1/foo/storage/rotary": collection.handler(), 241 "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(), 242 "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(), 243 }); 244 245 await SyncTestingInfrastructure(server); 246 247 await generateNewKeys(Service.collectionKeys); 248 249 let engine = makeRotaryEngine(); 250 let syncID = await engine.resetLocalSyncID(); 251 let meta_global = Service.recordManager.set( 252 engine.metaURL, 253 new WBORecord(engine.metaURL) 254 ); 255 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 256 257 try { 258 // Confirm initial environment 259 Assert.equal(await engine.getLastSync(), 0); 260 Assert.equal(engine.lastModified, null); 261 Assert.equal(engine._store.items.flying, undefined); 262 Assert.equal(engine._store.items.scotsman, undefined); 263 Assert.equal(engine._store.items["../pathological"], undefined); 264 265 await engine._syncStartup(); 266 await engine._processIncoming(); 267 268 // Timestamps of last sync and last server modification are set. 269 Assert.greater(await engine.getLastSync(), 0); 270 Assert.greater(engine.lastModified, 0); 271 272 // Local records have been created from the server data. 273 Assert.equal(engine._store.items.flying, "LNER Class A3 4472"); 274 Assert.equal(engine._store.items.scotsman, "Flying Scotsman"); 275 Assert.equal(engine._store.items["../pathological"], "Pathological Case"); 276 } finally { 277 await cleanAndGo(engine, server); 278 } 279 }); 280 281 add_task(async function test_processIncoming_reconcile() { 282 _("SyncEngine._processIncoming updates local records"); 283 284 let collection = new ServerCollection(); 285 286 // This server record is newer than the corresponding client one, 287 // so it'll update its data. 288 collection.insert( 289 "newrecord", 290 encryptPayload({ id: "newrecord", denomination: "New stuff..." }) 291 ); 292 293 // This server record is newer than the corresponding client one, 294 // so it'll update its data. 295 collection.insert( 296 "newerserver", 297 encryptPayload({ id: "newerserver", denomination: "New data!" }) 298 ); 299 300 // This server record is 2 mins older than the client counterpart 301 // but identical to it, so we're expecting the client record's 302 // changedID to be reset. 303 collection.insert( 304 "olderidentical", 305 encryptPayload({ 306 id: "olderidentical", 307 denomination: "Older but identical", 308 }) 309 ); 310 collection._wbos.olderidentical.modified -= 120; 311 312 // This item simply has different data than the corresponding client 313 // record (which is unmodified), so it will update the client as well 314 collection.insert( 315 "updateclient", 316 encryptPayload({ id: "updateclient", denomination: "Get this!" }) 317 ); 318 319 // This is a dupe of 'original'. 320 collection.insert( 321 "duplication", 322 encryptPayload({ id: "duplication", denomination: "Original Entry" }) 323 ); 324 325 // This record is marked as deleted, so we're expecting the client 326 // record to be removed. 327 collection.insert( 328 "nukeme", 329 encryptPayload({ id: "nukeme", denomination: "Nuke me!", deleted: true }) 330 ); 331 332 let server = sync_httpd_setup({ 333 "/1.1/foo/storage/rotary": collection.handler(), 334 }); 335 336 await SyncTestingInfrastructure(server); 337 338 let engine = makeRotaryEngine(); 339 engine._store.items = { 340 newerserver: "New data, but not as new as server!", 341 olderidentical: "Older but identical", 342 updateclient: "Got data?", 343 original: "Original Entry", 344 long_original: "Long Original Entry", 345 nukeme: "Nuke me!", 346 }; 347 // Make this record 1 min old, thus older than the one on the server 348 await engine._tracker.addChangedID("newerserver", Date.now() / 1000 - 60); 349 // This record has been changed 2 mins later than the one on the server 350 await engine._tracker.addChangedID("olderidentical", Date.now() / 1000); 351 352 let syncID = await engine.resetLocalSyncID(); 353 let meta_global = Service.recordManager.set( 354 engine.metaURL, 355 new WBORecord(engine.metaURL) 356 ); 357 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 358 359 try { 360 // Confirm initial environment 361 Assert.equal(engine._store.items.newrecord, undefined); 362 Assert.equal( 363 engine._store.items.newerserver, 364 "New data, but not as new as server!" 365 ); 366 Assert.equal(engine._store.items.olderidentical, "Older but identical"); 367 Assert.equal(engine._store.items.updateclient, "Got data?"); 368 Assert.equal(engine._store.items.nukeme, "Nuke me!"); 369 let changes = await engine._tracker.getChangedIDs(); 370 Assert.greater(changes.olderidentical, 0); 371 372 await engine._syncStartup(); 373 await engine._processIncoming(); 374 375 // Timestamps of last sync and last server modification are set. 376 Assert.greater(await engine.getLastSync(), 0); 377 Assert.greater(engine.lastModified, 0); 378 379 // The new record is created. 380 Assert.equal(engine._store.items.newrecord, "New stuff..."); 381 382 // The 'newerserver' record is updated since the server data is newer. 383 Assert.equal(engine._store.items.newerserver, "New data!"); 384 385 // The data for 'olderidentical' is identical on the server, so 386 // it's no longer marked as changed anymore. 387 Assert.equal(engine._store.items.olderidentical, "Older but identical"); 388 changes = await engine._tracker.getChangedIDs(); 389 Assert.equal(changes.olderidentical, undefined); 390 391 // Updated with server data. 392 Assert.equal(engine._store.items.updateclient, "Get this!"); 393 394 // The incoming ID is preferred. 395 Assert.equal(engine._store.items.original, undefined); 396 Assert.equal(engine._store.items.duplication, "Original Entry"); 397 Assert.notEqual(engine._delete.ids.indexOf("original"), -1); 398 399 // The 'nukeme' record marked as deleted is removed. 400 Assert.equal(engine._store.items.nukeme, undefined); 401 } finally { 402 await cleanAndGo(engine, server); 403 } 404 }); 405 406 add_task(async function test_processIncoming_reconcile_local_deleted() { 407 _("Ensure local, duplicate ID is deleted on server."); 408 409 // When a duplicate is resolved, the local ID (which is never taken) should 410 // be deleted on the server. 411 let [engine, server, user] = await createServerAndConfigureClient(); 412 413 let now = Date.now() / 1000 - 10; 414 await engine.setLastSync(now); 415 engine.lastModified = now + 1; 416 417 let record = encryptPayload({ 418 id: "DUPE_INCOMING", 419 denomination: "incoming", 420 }); 421 let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2); 422 server.insertWBO(user, "rotary", wbo); 423 424 record = encryptPayload({ id: "DUPE_LOCAL", denomination: "local" }); 425 wbo = new ServerWBO("DUPE_LOCAL", record, now - 1); 426 server.insertWBO(user, "rotary", wbo); 427 428 await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" }); 429 Assert.ok(await engine._store.itemExists("DUPE_LOCAL")); 430 Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" })); 431 432 await engine._sync(); 433 434 do_check_attribute_count(engine._store.items, 1); 435 Assert.ok("DUPE_INCOMING" in engine._store.items); 436 437 let collection = server.getCollection(user, "rotary"); 438 Assert.equal(1, collection.count()); 439 Assert.notEqual(undefined, collection.wbo("DUPE_INCOMING")); 440 441 await cleanAndGo(engine, server); 442 }); 443 444 add_task(async function test_processIncoming_reconcile_equivalent() { 445 _("Ensure proper handling of incoming records that match local."); 446 447 let [engine, server, user] = await createServerAndConfigureClient(); 448 449 let now = Date.now() / 1000 - 10; 450 await engine.setLastSync(now); 451 engine.lastModified = now + 1; 452 453 let record = encryptPayload({ id: "entry", denomination: "denomination" }); 454 let wbo = new ServerWBO("entry", record, now + 2); 455 server.insertWBO(user, "rotary", wbo); 456 457 engine._store.items = { entry: "denomination" }; 458 Assert.ok(await engine._store.itemExists("entry")); 459 460 await engine._sync(); 461 462 do_check_attribute_count(engine._store.items, 1); 463 464 await cleanAndGo(engine, server); 465 }); 466 467 add_task( 468 async function test_processIncoming_reconcile_locally_deleted_dupe_new() { 469 _( 470 "Ensure locally deleted duplicate record newer than incoming is handled." 471 ); 472 473 // This is a somewhat complicated test. It ensures that if a client receives 474 // a modified record for an item that is deleted locally but with a different 475 // ID that the incoming record is ignored. This is a corner case for record 476 // handling, but it needs to be supported. 477 let [engine, server, user] = await createServerAndConfigureClient(); 478 479 let now = Date.now() / 1000 - 10; 480 await engine.setLastSync(now); 481 engine.lastModified = now + 1; 482 483 let record = encryptPayload({ 484 id: "DUPE_INCOMING", 485 denomination: "incoming", 486 }); 487 let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2); 488 server.insertWBO(user, "rotary", wbo); 489 490 // Simulate a locally-deleted item. 491 engine._store.items = {}; 492 await engine._tracker.addChangedID("DUPE_LOCAL", now + 3); 493 Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL")); 494 Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING")); 495 Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" })); 496 497 engine.lastModified = server.getCollection(user, engine.name).timestamp; 498 await engine._sync(); 499 500 // After the sync, the server's payload for the original ID should be marked 501 // as deleted. 502 do_check_empty(engine._store.items); 503 let collection = server.getCollection(user, "rotary"); 504 Assert.equal(1, collection.count()); 505 wbo = collection.wbo("DUPE_INCOMING"); 506 Assert.notEqual(null, wbo); 507 let payload = wbo.getCleartext(); 508 Assert.ok(payload.deleted); 509 510 await cleanAndGo(engine, server); 511 } 512 ); 513 514 add_task( 515 async function test_processIncoming_reconcile_locally_deleted_dupe_old() { 516 _( 517 "Ensure locally deleted duplicate record older than incoming is restored." 518 ); 519 520 // This is similar to the above test except it tests the condition where the 521 // incoming record is newer than the local deletion, therefore overriding it. 522 523 let [engine, server, user] = await createServerAndConfigureClient(); 524 525 let now = Date.now() / 1000 - 10; 526 await engine.setLastSync(now); 527 engine.lastModified = now + 1; 528 529 let record = encryptPayload({ 530 id: "DUPE_INCOMING", 531 denomination: "incoming", 532 }); 533 let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2); 534 server.insertWBO(user, "rotary", wbo); 535 536 // Simulate a locally-deleted item. 537 engine._store.items = {}; 538 await engine._tracker.addChangedID("DUPE_LOCAL", now + 1); 539 Assert.equal(false, await engine._store.itemExists("DUPE_LOCAL")); 540 Assert.equal(false, await engine._store.itemExists("DUPE_INCOMING")); 541 Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" })); 542 543 await engine._sync(); 544 545 // Since the remote change is newer, the incoming item should exist locally. 546 do_check_attribute_count(engine._store.items, 1); 547 Assert.ok("DUPE_INCOMING" in engine._store.items); 548 Assert.equal("incoming", engine._store.items.DUPE_INCOMING); 549 550 let collection = server.getCollection(user, "rotary"); 551 Assert.equal(1, collection.count()); 552 wbo = collection.wbo("DUPE_INCOMING"); 553 let payload = wbo.getCleartext(); 554 Assert.equal("incoming", payload.denomination); 555 556 await cleanAndGo(engine, server); 557 } 558 ); 559 560 add_task(async function test_processIncoming_reconcile_changed_dupe() { 561 _("Ensure that locally changed duplicate record is handled properly."); 562 563 let [engine, server, user] = await createServerAndConfigureClient(); 564 565 let now = Date.now() / 1000 - 10; 566 await engine.setLastSync(now); 567 engine.lastModified = now + 1; 568 569 // The local record is newer than the incoming one, so it should be retained. 570 let record = encryptPayload({ 571 id: "DUPE_INCOMING", 572 denomination: "incoming", 573 }); 574 let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2); 575 server.insertWBO(user, "rotary", wbo); 576 577 await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" }); 578 await engine._tracker.addChangedID("DUPE_LOCAL", now + 3); 579 Assert.ok(await engine._store.itemExists("DUPE_LOCAL")); 580 Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" })); 581 582 engine.lastModified = server.getCollection(user, engine.name).timestamp; 583 await engine._sync(); 584 585 // The ID should have been changed to incoming. 586 do_check_attribute_count(engine._store.items, 1); 587 Assert.ok("DUPE_INCOMING" in engine._store.items); 588 589 // On the server, the local ID should be deleted and the incoming ID should 590 // have its payload set to what was in the local record. 591 let collection = server.getCollection(user, "rotary"); 592 Assert.equal(1, collection.count()); 593 wbo = collection.wbo("DUPE_INCOMING"); 594 Assert.notEqual(undefined, wbo); 595 let payload = wbo.getCleartext(); 596 Assert.equal("local", payload.denomination); 597 598 await cleanAndGo(engine, server); 599 }); 600 601 add_task(async function test_processIncoming_reconcile_changed_dupe_new() { 602 _("Ensure locally changed duplicate record older than incoming is ignored."); 603 604 // This test is similar to the above except the incoming record is younger 605 // than the local record. The incoming record should be authoritative. 606 let [engine, server, user] = await createServerAndConfigureClient(); 607 608 let now = Date.now() / 1000 - 10; 609 await engine.setLastSync(now); 610 engine.lastModified = now + 1; 611 612 let record = encryptPayload({ 613 id: "DUPE_INCOMING", 614 denomination: "incoming", 615 }); 616 let wbo = new ServerWBO("DUPE_INCOMING", record, now + 2); 617 server.insertWBO(user, "rotary", wbo); 618 619 await engine._store.create({ id: "DUPE_LOCAL", denomination: "local" }); 620 await engine._tracker.addChangedID("DUPE_LOCAL", now + 1); 621 Assert.ok(await engine._store.itemExists("DUPE_LOCAL")); 622 Assert.equal("DUPE_LOCAL", await engine._findDupe({ id: "DUPE_INCOMING" })); 623 624 engine.lastModified = server.getCollection(user, engine.name).timestamp; 625 await engine._sync(); 626 627 // The ID should have been changed to incoming. 628 do_check_attribute_count(engine._store.items, 1); 629 Assert.ok("DUPE_INCOMING" in engine._store.items); 630 631 // On the server, the local ID should be deleted and the incoming ID should 632 // have its payload retained. 633 let collection = server.getCollection(user, "rotary"); 634 Assert.equal(1, collection.count()); 635 wbo = collection.wbo("DUPE_INCOMING"); 636 Assert.notEqual(undefined, wbo); 637 let payload = wbo.getCleartext(); 638 Assert.equal("incoming", payload.denomination); 639 await cleanAndGo(engine, server); 640 }); 641 642 add_task(async function test_processIncoming_resume_toFetch() { 643 _( 644 "toFetch and previousFailed items left over from previous syncs are fetched on the next sync, along with new items." 645 ); 646 647 const LASTSYNC = Date.now() / 1000; 648 649 // Server records that will be downloaded 650 let collection = new ServerCollection(); 651 collection.insert( 652 "flying", 653 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 654 ); 655 collection.insert( 656 "scotsman", 657 encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" }) 658 ); 659 collection.insert( 660 "rekolok", 661 encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" }) 662 ); 663 for (let i = 0; i < 3; i++) { 664 let id = "failed" + i; 665 let payload = encryptPayload({ id, denomination: "Record No. " + i }); 666 let wbo = new ServerWBO(id, payload); 667 wbo.modified = LASTSYNC - 10; 668 collection.insertWBO(wbo); 669 } 670 671 collection.wbo("flying").modified = collection.wbo("scotsman").modified = 672 LASTSYNC - 10; 673 collection._wbos.rekolok.modified = LASTSYNC + 10; 674 675 // Time travel 10 seconds into the future but still download the above WBOs. 676 let engine = makeRotaryEngine(); 677 await engine.setLastSync(LASTSYNC); 678 engine.toFetch = new SerializableSet(["flying", "scotsman"]); 679 engine.previousFailed = new SerializableSet([ 680 "failed0", 681 "failed1", 682 "failed2", 683 ]); 684 685 let server = sync_httpd_setup({ 686 "/1.1/foo/storage/rotary": collection.handler(), 687 }); 688 689 await SyncTestingInfrastructure(server); 690 691 let syncID = await engine.resetLocalSyncID(); 692 let meta_global = Service.recordManager.set( 693 engine.metaURL, 694 new WBORecord(engine.metaURL) 695 ); 696 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 697 try { 698 // Confirm initial environment 699 Assert.equal(engine._store.items.flying, undefined); 700 Assert.equal(engine._store.items.scotsman, undefined); 701 Assert.equal(engine._store.items.rekolok, undefined); 702 703 await engine._syncStartup(); 704 await engine._processIncoming(); 705 706 // Local records have been created from the server data. 707 Assert.equal(engine._store.items.flying, "LNER Class A3 4472"); 708 Assert.equal(engine._store.items.scotsman, "Flying Scotsman"); 709 Assert.equal(engine._store.items.rekolok, "Rekonstruktionslokomotive"); 710 Assert.equal(engine._store.items.failed0, "Record No. 0"); 711 Assert.equal(engine._store.items.failed1, "Record No. 1"); 712 Assert.equal(engine._store.items.failed2, "Record No. 2"); 713 Assert.equal(engine.previousFailed.size, 0); 714 } finally { 715 await cleanAndGo(engine, server); 716 } 717 }); 718 719 add_task(async function test_processIncoming_notify_count() { 720 _("Ensure that failed records are reported only once."); 721 722 const NUMBER_OF_RECORDS = 15; 723 724 // Engine that fails every 5 records. 725 let engine = makeRotaryEngine(); 726 engine._store._applyIncomingBatch = engine._store.applyIncomingBatch; 727 engine._store.applyIncomingBatch = async function (records, countTelemetry) { 728 let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1)); 729 let recordsToApply = [], 730 recordsToFail = []; 731 for (let i = 0; i < sortedRecords.length; i++) { 732 (i % 5 === 0 ? recordsToFail : recordsToApply).push(sortedRecords[i]); 733 } 734 recordsToFail.forEach(() => { 735 countTelemetry.addIncomingFailedReason("failed message"); 736 }); 737 await engine._store._applyIncomingBatch(recordsToApply, countTelemetry); 738 739 return recordsToFail.map(record => record.id); 740 }; 741 742 // Create a batch of server side records. 743 let collection = new ServerCollection(); 744 for (var i = 0; i < NUMBER_OF_RECORDS; i++) { 745 let id = "record-no-" + i.toString(10).padStart(2, "0"); 746 let payload = encryptPayload({ id, denomination: "Record No. " + id }); 747 collection.insert(id, payload); 748 } 749 750 let server = sync_httpd_setup({ 751 "/1.1/foo/storage/rotary": collection.handler(), 752 }); 753 754 await SyncTestingInfrastructure(server); 755 756 let syncID = await engine.resetLocalSyncID(); 757 let meta_global = Service.recordManager.set( 758 engine.metaURL, 759 new WBORecord(engine.metaURL) 760 ); 761 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 762 try { 763 // Confirm initial environment. 764 Assert.equal(await engine.getLastSync(), 0); 765 Assert.equal(engine.toFetch.size, 0); 766 Assert.equal(engine.previousFailed.size, 0); 767 do_check_empty(engine._store.items); 768 769 let called = 0; 770 let counts; 771 function onApplied(count) { 772 _("Called with " + JSON.stringify(counts)); 773 counts = count; 774 called++; 775 } 776 Svc.Obs.add("weave:engine:sync:applied", onApplied); 777 778 // Do sync. 779 await engine._syncStartup(); 780 await engine._processIncoming(); 781 782 // Confirm failures. 783 do_check_attribute_count(engine._store.items, 12); 784 Assert.deepEqual( 785 Array.from(engine.previousFailed).sort(), 786 ["record-no-00", "record-no-05", "record-no-10"].sort() 787 ); 788 789 // There are newly failed records and they are reported. 790 Assert.equal(called, 1); 791 Assert.equal(counts.failed, 3); 792 Assert.equal(counts.failedReasons[0].count, 3); 793 Assert.equal(counts.failedReasons[0].name, "failed message"); 794 Assert.equal(counts.applied, 15); 795 Assert.equal(counts.newFailed, 3); 796 Assert.equal(counts.succeeded, 12); 797 798 // Sync again, 1 of the failed items are the same, the rest didn't fail. 799 await engine._processIncoming(); 800 801 // Confirming removed failures. 802 do_check_attribute_count(engine._store.items, 14); 803 // After failing twice the record that failed again [record-no-00] 804 // should NOT be stored to try again 805 Assert.deepEqual(Array.from(engine.previousFailed), []); 806 807 Assert.equal(called, 2); 808 Assert.equal(counts.failed, 1); 809 Assert.equal(counts.failedReasons[0].count, 1); 810 Assert.equal(counts.failedReasons[0].name, "failed message"); 811 Assert.equal(counts.applied, 3); 812 Assert.equal(counts.newFailed, 0); 813 Assert.equal(counts.succeeded, 2); 814 815 Svc.Obs.remove("weave:engine:sync:applied", onApplied); 816 } finally { 817 await cleanAndGo(engine, server); 818 } 819 }); 820 821 add_task(async function test_processIncoming_previousFailed() { 822 _("Ensure that failed records are retried."); 823 824 const NUMBER_OF_RECORDS = 14; 825 826 // Engine that alternates between failing and applying every 2 records. 827 let engine = makeRotaryEngine(); 828 engine._store._applyIncomingBatch = engine._store.applyIncomingBatch; 829 engine._store.applyIncomingBatch = async function (records, countTelemetry) { 830 let sortedRecords = records.sort((a, b) => (a.id > b.id ? 1 : -1)); 831 let recordsToApply = [], 832 recordsToFail = []; 833 let chunks = Array.from(PlacesUtils.chunkArray(sortedRecords, 2)); 834 for (let i = 0; i < chunks.length; i++) { 835 (i % 2 === 0 ? recordsToFail : recordsToApply).push(...chunks[i]); 836 } 837 await engine._store._applyIncomingBatch(recordsToApply, countTelemetry); 838 return recordsToFail.map(record => record.id); 839 }; 840 841 // Create a batch of server side records. 842 let collection = new ServerCollection(); 843 for (var i = 0; i < NUMBER_OF_RECORDS; i++) { 844 let id = "record-no-" + i.toString(10).padStart(2, "0"); 845 let payload = encryptPayload({ id, denomination: "Record No. " + i }); 846 collection.insert(id, payload); 847 } 848 849 let server = sync_httpd_setup({ 850 "/1.1/foo/storage/rotary": collection.handler(), 851 }); 852 853 await SyncTestingInfrastructure(server); 854 855 let syncID = await engine.resetLocalSyncID(); 856 let meta_global = Service.recordManager.set( 857 engine.metaURL, 858 new WBORecord(engine.metaURL) 859 ); 860 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 861 try { 862 // Confirm initial environment. 863 Assert.equal(await engine.getLastSync(), 0); 864 Assert.equal(engine.toFetch.size, 0); 865 Assert.equal(engine.previousFailed.size, 0); 866 do_check_empty(engine._store.items); 867 868 // Initial failed items in previousFailed to be reset. 869 let previousFailed = new SerializableSet([ 870 Utils.makeGUID(), 871 Utils.makeGUID(), 872 Utils.makeGUID(), 873 ]); 874 engine.previousFailed = previousFailed; 875 Assert.equal(engine.previousFailed, previousFailed); 876 877 // Do sync. 878 await engine._syncStartup(); 879 await engine._processIncoming(); 880 881 // Expected result: 4 sync batches with 2 failures each => 8 failures 882 do_check_attribute_count(engine._store.items, 6); 883 Assert.deepEqual( 884 Array.from(engine.previousFailed).sort(), 885 [ 886 "record-no-00", 887 "record-no-01", 888 "record-no-04", 889 "record-no-05", 890 "record-no-08", 891 "record-no-09", 892 "record-no-12", 893 "record-no-13", 894 ].sort() 895 ); 896 897 // Sync again with the same failed items (records 0, 1, 8, 9). 898 await engine._processIncoming(); 899 900 do_check_attribute_count(engine._store.items, 10); 901 // A second sync with the same failed items should NOT add the same items again. 902 // Items that did not fail a second time should no longer be in previousFailed. 903 Assert.deepEqual(Array.from(engine.previousFailed).sort(), []); 904 905 // Refetched items that didn't fail the second time are in engine._store.items. 906 Assert.equal(engine._store.items["record-no-04"], "Record No. 4"); 907 Assert.equal(engine._store.items["record-no-05"], "Record No. 5"); 908 Assert.equal(engine._store.items["record-no-12"], "Record No. 12"); 909 Assert.equal(engine._store.items["record-no-13"], "Record No. 13"); 910 } finally { 911 await cleanAndGo(engine, server); 912 } 913 }); 914 915 add_task(async function test_processIncoming_failed_records() { 916 _( 917 "Ensure that failed records from _reconcile and applyIncomingBatch are refetched." 918 ); 919 920 // Let's create three and a bit batches worth of server side records. 921 let APPLY_BATCH_SIZE = 50; 922 let collection = new ServerCollection(); 923 const NUMBER_OF_RECORDS = APPLY_BATCH_SIZE * 3 + 5; 924 for (let i = 0; i < NUMBER_OF_RECORDS; i++) { 925 let id = "record-no-" + i; 926 let payload = encryptPayload({ id, denomination: "Record No. " + id }); 927 let wbo = new ServerWBO(id, payload); 928 wbo.modified = Date.now() / 1000 + 60 * (i - APPLY_BATCH_SIZE * 3); 929 collection.insertWBO(wbo); 930 } 931 932 // Engine that batches but likes to throw on a couple of records, 933 // two in each batch: the even ones fail in reconcile, the odd ones 934 // in applyIncoming. 935 const BOGUS_RECORDS = [ 936 "record-no-" + 42, 937 "record-no-" + 23, 938 "record-no-" + (42 + APPLY_BATCH_SIZE), 939 "record-no-" + (23 + APPLY_BATCH_SIZE), 940 "record-no-" + (42 + APPLY_BATCH_SIZE * 2), 941 "record-no-" + (23 + APPLY_BATCH_SIZE * 2), 942 "record-no-" + (2 + APPLY_BATCH_SIZE * 3), 943 "record-no-" + (1 + APPLY_BATCH_SIZE * 3), 944 ]; 945 let engine = makeRotaryEngine(); 946 947 engine.__reconcile = engine._reconcile; 948 engine._reconcile = async function _reconcile(record) { 949 if (BOGUS_RECORDS.indexOf(record.id) % 2 == 0) { 950 throw new Error("I don't like this record! Baaaaaah!"); 951 } 952 return this.__reconcile.apply(this, arguments); 953 }; 954 engine._store._applyIncoming = engine._store.applyIncoming; 955 engine._store.applyIncoming = async function (record) { 956 if (BOGUS_RECORDS.indexOf(record.id) % 2 == 1) { 957 throw new Error("I don't like this record! Baaaaaah!"); 958 } 959 return this._applyIncoming.apply(this, arguments); 960 }; 961 962 // Keep track of requests made of a collection. 963 let count = 0; 964 let uris = []; 965 function recording_handler(recordedCollection) { 966 let h = recordedCollection.handler(); 967 return function (req, res) { 968 ++count; 969 uris.push(req.path + "?" + req.queryString); 970 return h(req, res); 971 }; 972 } 973 let server = sync_httpd_setup({ 974 "/1.1/foo/storage/rotary": recording_handler(collection), 975 }); 976 977 await SyncTestingInfrastructure(server); 978 979 let syncID = await engine.resetLocalSyncID(); 980 let meta_global = Service.recordManager.set( 981 engine.metaURL, 982 new WBORecord(engine.metaURL) 983 ); 984 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 985 986 try { 987 // Confirm initial environment 988 Assert.equal(await engine.getLastSync(), 0); 989 Assert.equal(engine.toFetch.size, 0); 990 Assert.equal(engine.previousFailed.size, 0); 991 do_check_empty(engine._store.items); 992 993 let observerSubject; 994 let observerData; 995 Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) { 996 Svc.Obs.remove("weave:engine:sync:applied", onApplied); 997 observerSubject = subject; 998 observerData = data; 999 }); 1000 1001 await engine._syncStartup(); 1002 await engine._processIncoming(); 1003 1004 // Ensure that all records but the bogus 4 have been applied. 1005 do_check_attribute_count( 1006 engine._store.items, 1007 NUMBER_OF_RECORDS - BOGUS_RECORDS.length 1008 ); 1009 1010 // Ensure that the bogus records will be fetched again on the next sync. 1011 Assert.equal(engine.previousFailed.size, BOGUS_RECORDS.length); 1012 Assert.deepEqual( 1013 Array.from(engine.previousFailed).sort(), 1014 BOGUS_RECORDS.sort() 1015 ); 1016 1017 // Ensure the observer was notified 1018 Assert.equal(observerData, engine.name); 1019 Assert.equal(observerSubject.failed, BOGUS_RECORDS.length); 1020 Assert.equal(observerSubject.newFailed, BOGUS_RECORDS.length); 1021 1022 // Testing batching of failed item fetches. 1023 // Try to sync again. Ensure that we split the request into chunks to avoid 1024 // URI length limitations. 1025 async function batchDownload(batchSize) { 1026 count = 0; 1027 uris = []; 1028 engine.guidFetchBatchSize = batchSize; 1029 await engine._processIncoming(); 1030 _("Tried again. Requests: " + count + "; URIs: " + JSON.stringify(uris)); 1031 return count; 1032 } 1033 1034 // There are 8 bad records, so this needs 3 fetches. 1035 _("Test batching with ID batch size 3, normal mobile batch size."); 1036 Assert.equal(await batchDownload(3), 3); 1037 1038 // Since there the previous batch failed again, there should be 1039 // no more records to fetch 1040 _("Test that the second time a record failed to sync, gets ignored"); 1041 Assert.equal(await batchDownload(BOGUS_RECORDS.length), 0); 1042 } finally { 1043 await cleanAndGo(engine, server); 1044 } 1045 }); 1046 1047 add_task(async function test_processIncoming_decrypt_failed() { 1048 _("Ensure that records failing to decrypt are either replaced or refetched."); 1049 1050 // Some good and some bogus records. One doesn't contain valid JSON, 1051 // the other will throw during decrypt. 1052 let collection = new ServerCollection(); 1053 collection._wbos.flying = new ServerWBO( 1054 "flying", 1055 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 1056 ); 1057 collection._wbos.nojson = new ServerWBO("nojson", "This is invalid JSON"); 1058 collection._wbos.nojson2 = new ServerWBO("nojson2", "This is invalid JSON"); 1059 collection._wbos.scotsman = new ServerWBO( 1060 "scotsman", 1061 encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" }) 1062 ); 1063 collection._wbos.nodecrypt = new ServerWBO("nodecrypt", "Decrypt this!"); 1064 collection._wbos.nodecrypt2 = new ServerWBO("nodecrypt2", "Decrypt this!"); 1065 1066 // Patch the fake crypto service to throw on the record above. 1067 Weave.Crypto._decrypt = Weave.Crypto.decrypt; 1068 Weave.Crypto.decrypt = function (ciphertext) { 1069 if (ciphertext == "Decrypt this!") { 1070 throw new Error( 1071 "Derp! Cipher finalized failed. Im ur crypto destroyin ur recordz." 1072 ); 1073 } 1074 return this._decrypt.apply(this, arguments); 1075 }; 1076 1077 // Some broken records also exist locally. 1078 let engine = makeRotaryEngine(); 1079 engine.enabled = true; 1080 engine._store.items = { nojson: "Valid JSON", nodecrypt: "Valid ciphertext" }; 1081 1082 let server = sync_httpd_setup({ 1083 "/1.1/foo/storage/rotary": collection.handler(), 1084 }); 1085 1086 await SyncTestingInfrastructure(server); 1087 1088 let syncID = await engine.resetLocalSyncID(); 1089 let meta_global = Service.recordManager.set( 1090 engine.metaURL, 1091 new WBORecord(engine.metaURL) 1092 ); 1093 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1094 try { 1095 // Confirm initial state 1096 Assert.equal(engine.toFetch.size, 0); 1097 Assert.equal(engine.previousFailed.size, 0); 1098 1099 let observerSubject; 1100 let observerData; 1101 Svc.Obs.add("weave:engine:sync:applied", function onApplied(subject, data) { 1102 Svc.Obs.remove("weave:engine:sync:applied", onApplied); 1103 observerSubject = subject; 1104 observerData = data; 1105 }); 1106 1107 await engine.setLastSync(collection.wbo("nojson").modified - 1); 1108 let ping = await sync_engine_and_validate_telem(engine, true); 1109 Assert.equal(ping.engines[0].incoming.applied, 2); 1110 Assert.equal(ping.engines[0].incoming.failed, 4); 1111 console.log("incoming telem: ", ping.engines[0].incoming); 1112 Assert.equal( 1113 ping.engines[0].incoming.failedReasons[0].name, 1114 "No ciphertext: nothing to decrypt?" 1115 ); 1116 // There should be 4 of the same error 1117 Assert.equal(ping.engines[0].incoming.failedReasons[0].count, 4); 1118 1119 Assert.equal(engine.previousFailed.size, 4); 1120 Assert.ok(engine.previousFailed.has("nojson")); 1121 Assert.ok(engine.previousFailed.has("nojson2")); 1122 Assert.ok(engine.previousFailed.has("nodecrypt")); 1123 Assert.ok(engine.previousFailed.has("nodecrypt2")); 1124 1125 // Ensure the observer was notified 1126 Assert.equal(observerData, engine.name); 1127 Assert.equal(observerSubject.applied, 2); 1128 Assert.equal(observerSubject.failed, 4); 1129 Assert.equal(observerSubject.failedReasons[0].count, 4); 1130 } finally { 1131 await promiseClean(engine, server); 1132 } 1133 }); 1134 1135 add_task(async function test_uploadOutgoing_toEmptyServer() { 1136 _("SyncEngine._uploadOutgoing uploads new records to server"); 1137 1138 let collection = new ServerCollection(); 1139 collection._wbos.flying = new ServerWBO("flying"); 1140 collection._wbos.scotsman = new ServerWBO("scotsman"); 1141 1142 let server = sync_httpd_setup({ 1143 "/1.1/foo/storage/rotary": collection.handler(), 1144 "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(), 1145 "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(), 1146 }); 1147 1148 await SyncTestingInfrastructure(server); 1149 await generateNewKeys(Service.collectionKeys); 1150 1151 let engine = makeRotaryEngine(); 1152 engine._store.items = { 1153 flying: "LNER Class A3 4472", 1154 scotsman: "Flying Scotsman", 1155 }; 1156 // Mark one of these records as changed 1157 await engine._tracker.addChangedID("scotsman", 0); 1158 1159 let syncID = await engine.resetLocalSyncID(); 1160 let meta_global = Service.recordManager.set( 1161 engine.metaURL, 1162 new WBORecord(engine.metaURL) 1163 ); 1164 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1165 1166 try { 1167 await engine.setLastSync(123); // needs to be non-zero so that tracker is queried 1168 1169 // Confirm initial environment 1170 Assert.equal(collection.payload("flying"), undefined); 1171 Assert.equal(collection.payload("scotsman"), undefined); 1172 1173 await engine._syncStartup(); 1174 await engine._uploadOutgoing(); 1175 1176 // Ensure the marked record ('scotsman') has been uploaded and is 1177 // no longer marked. 1178 Assert.equal(collection.payload("flying"), undefined); 1179 Assert.ok(!!collection.payload("scotsman")); 1180 Assert.equal(collection.cleartext("scotsman").id, "scotsman"); 1181 const changes = await engine._tracker.getChangedIDs(); 1182 Assert.equal(changes.scotsman, undefined); 1183 1184 // The 'flying' record wasn't marked so it wasn't uploaded 1185 Assert.equal(collection.payload("flying"), undefined); 1186 } finally { 1187 await cleanAndGo(engine, server); 1188 } 1189 }); 1190 1191 async function test_uploadOutgoing_max_record_payload_bytes( 1192 allowSkippedRecord 1193 ) { 1194 _( 1195 "SyncEngine._uploadOutgoing throws when payload is bigger than max_record_payload_bytes" 1196 ); 1197 let collection = new ServerCollection(); 1198 collection._wbos.flying = new ServerWBO("flying"); 1199 collection._wbos.scotsman = new ServerWBO("scotsman"); 1200 1201 let server = sync_httpd_setup({ 1202 "/1.1/foo/storage/rotary": collection.handler(), 1203 "/1.1/foo/storage/rotary/flying": collection.wbo("flying").handler(), 1204 "/1.1/foo/storage/rotary/scotsman": collection.wbo("scotsman").handler(), 1205 }); 1206 1207 await SyncTestingInfrastructure(server); 1208 await generateNewKeys(Service.collectionKeys); 1209 1210 let engine = makeRotaryEngine(); 1211 engine.allowSkippedRecord = allowSkippedRecord; 1212 engine._store.items = { flying: "a".repeat(1024 * 1024), scotsman: "abcd" }; 1213 1214 await engine._tracker.addChangedID("flying", 1000); 1215 await engine._tracker.addChangedID("scotsman", 1000); 1216 1217 let syncID = await engine.resetLocalSyncID(); 1218 let meta_global = Service.recordManager.set( 1219 engine.metaURL, 1220 new WBORecord(engine.metaURL) 1221 ); 1222 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1223 1224 try { 1225 await engine.setLastSync(1); // needs to be non-zero so that tracker is queried 1226 1227 // Confirm initial environment 1228 Assert.equal(collection.payload("flying"), undefined); 1229 Assert.equal(collection.payload("scotsman"), undefined); 1230 1231 await engine._syncStartup(); 1232 await engine._uploadOutgoing(); 1233 1234 if (!allowSkippedRecord) { 1235 do_throw("should not get here"); 1236 } 1237 1238 await engine.trackRemainingChanges(); 1239 1240 // Check we uploaded the other record to the server 1241 Assert.ok(collection.payload("scotsman")); 1242 // And that we won't try to upload the huge record next time. 1243 const changes = await engine._tracker.getChangedIDs(); 1244 Assert.equal(changes.flying, undefined); 1245 } catch (e) { 1246 if (allowSkippedRecord) { 1247 do_throw("should not get here"); 1248 } 1249 1250 await engine.trackRemainingChanges(); 1251 1252 // Check that we will try to upload the huge record next time 1253 const changes = await engine._tracker.getChangedIDs(); 1254 Assert.equal(changes.flying, 1000); 1255 } finally { 1256 // Check we didn't upload the oversized record to the server 1257 Assert.equal(collection.payload("flying"), undefined); 1258 await cleanAndGo(engine, server); 1259 } 1260 } 1261 1262 add_task( 1263 async function test_uploadOutgoing_max_record_payload_bytes_disallowSkippedRecords() { 1264 return test_uploadOutgoing_max_record_payload_bytes(false); 1265 } 1266 ); 1267 1268 add_task( 1269 async function test_uploadOutgoing_max_record_payload_bytes_allowSkippedRecords() { 1270 return test_uploadOutgoing_max_record_payload_bytes(true); 1271 } 1272 ); 1273 1274 add_task(async function test_uploadOutgoing_failed() { 1275 _( 1276 "SyncEngine._uploadOutgoing doesn't clear the tracker of objects that failed to upload." 1277 ); 1278 1279 let collection = new ServerCollection(); 1280 // We only define the "flying" WBO on the server, not the "scotsman" 1281 // and "peppercorn" ones. 1282 collection._wbos.flying = new ServerWBO("flying"); 1283 1284 let server = sync_httpd_setup({ 1285 "/1.1/foo/storage/rotary": collection.handler(), 1286 }); 1287 1288 await SyncTestingInfrastructure(server); 1289 1290 let engine = makeRotaryEngine(); 1291 engine._store.items = { 1292 flying: "LNER Class A3 4472", 1293 scotsman: "Flying Scotsman", 1294 peppercorn: "Peppercorn Class", 1295 }; 1296 // Mark these records as changed 1297 const FLYING_CHANGED = 12345; 1298 const SCOTSMAN_CHANGED = 23456; 1299 const PEPPERCORN_CHANGED = 34567; 1300 await engine._tracker.addChangedID("flying", FLYING_CHANGED); 1301 await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED); 1302 await engine._tracker.addChangedID("peppercorn", PEPPERCORN_CHANGED); 1303 1304 let syncID = await engine.resetLocalSyncID(); 1305 let meta_global = Service.recordManager.set( 1306 engine.metaURL, 1307 new WBORecord(engine.metaURL) 1308 ); 1309 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1310 1311 try { 1312 await engine.setLastSync(123); // needs to be non-zero so that tracker is queried 1313 1314 // Confirm initial environment 1315 Assert.equal(collection.payload("flying"), undefined); 1316 let changes = await engine._tracker.getChangedIDs(); 1317 Assert.equal(changes.flying, FLYING_CHANGED); 1318 Assert.equal(changes.scotsman, SCOTSMAN_CHANGED); 1319 Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED); 1320 1321 engine.enabled = true; 1322 await sync_engine_and_validate_telem(engine, true); 1323 1324 // Ensure the 'flying' record has been uploaded and is no longer marked. 1325 Assert.ok(!!collection.payload("flying")); 1326 changes = await engine._tracker.getChangedIDs(); 1327 Assert.equal(changes.flying, undefined); 1328 1329 // The 'scotsman' and 'peppercorn' records couldn't be uploaded so 1330 // they weren't cleared from the tracker. 1331 Assert.equal(changes.scotsman, SCOTSMAN_CHANGED); 1332 Assert.equal(changes.peppercorn, PEPPERCORN_CHANGED); 1333 } finally { 1334 await promiseClean(engine, server); 1335 } 1336 }); 1337 1338 async function createRecordFailTelemetry(allowSkippedRecord) { 1339 Services.prefs.setStringPref("services.sync.username", "foo"); 1340 let collection = new ServerCollection(); 1341 collection._wbos.flying = new ServerWBO("flying"); 1342 collection._wbos.scotsman = new ServerWBO("scotsman"); 1343 1344 let server = sync_httpd_setup({ 1345 "/1.1/foo/storage/rotary": collection.handler(), 1346 }); 1347 1348 await SyncTestingInfrastructure(server); 1349 1350 let engine = makeRotaryEngine(); 1351 engine.allowSkippedRecord = allowSkippedRecord; 1352 let oldCreateRecord = engine._store.createRecord; 1353 engine._store.createRecord = async (id, col) => { 1354 if (id != "flying") { 1355 throw new Error("oops"); 1356 } 1357 return oldCreateRecord.call(engine._store, id, col); 1358 }; 1359 engine._store.items = { 1360 flying: "LNER Class A3 4472", 1361 scotsman: "Flying Scotsman", 1362 }; 1363 // Mark these records as changed 1364 const FLYING_CHANGED = 12345; 1365 const SCOTSMAN_CHANGED = 23456; 1366 await engine._tracker.addChangedID("flying", FLYING_CHANGED); 1367 await engine._tracker.addChangedID("scotsman", SCOTSMAN_CHANGED); 1368 1369 let syncID = await engine.resetLocalSyncID(); 1370 let meta_global = Service.recordManager.set( 1371 engine.metaURL, 1372 new WBORecord(engine.metaURL) 1373 ); 1374 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1375 1376 let ping; 1377 try { 1378 await engine.setLastSync(123); // needs to be non-zero so that tracker is queried 1379 1380 // Confirm initial environment 1381 Assert.equal(collection.payload("flying"), undefined); 1382 let changes = await engine._tracker.getChangedIDs(); 1383 Assert.equal(changes.flying, FLYING_CHANGED); 1384 Assert.equal(changes.scotsman, SCOTSMAN_CHANGED); 1385 1386 engine.enabled = true; 1387 ping = await sync_engine_and_validate_telem(engine, true, onErrorPing => { 1388 ping = onErrorPing; 1389 }); 1390 1391 if (!allowSkippedRecord) { 1392 do_throw("should not get here"); 1393 } 1394 1395 // Ensure the 'flying' record has been uploaded and is no longer marked. 1396 Assert.ok(!!collection.payload("flying")); 1397 changes = await engine._tracker.getChangedIDs(); 1398 Assert.equal(changes.flying, undefined); 1399 } catch (err) { 1400 if (allowSkippedRecord) { 1401 do_throw("should not get here"); 1402 } 1403 1404 // Ensure the 'flying' record has not been uploaded and is still marked 1405 Assert.ok(!collection.payload("flying")); 1406 const changes = await engine._tracker.getChangedIDs(); 1407 Assert.ok(changes.flying); 1408 } finally { 1409 // We reported in telemetry that we failed a record 1410 Assert.equal(ping.engines[0].outgoing[0].failed, 1); 1411 Assert.equal(ping.engines[0].outgoing[0].failedReasons[0].name, "oops"); 1412 1413 // In any case, the 'scotsman' record couldn't be created so it wasn't 1414 // uploaded nor it was not cleared from the tracker. 1415 Assert.ok(!collection.payload("scotsman")); 1416 const changes = await engine._tracker.getChangedIDs(); 1417 Assert.equal(changes.scotsman, SCOTSMAN_CHANGED); 1418 1419 engine._store.createRecord = oldCreateRecord; 1420 await promiseClean(engine, server); 1421 } 1422 } 1423 1424 add_task( 1425 async function test_uploadOutgoing_createRecord_throws_reported_telemetry() { 1426 _( 1427 "SyncEngine._uploadOutgoing reports a failed record to telemetry if createRecord throws" 1428 ); 1429 await createRecordFailTelemetry(true); 1430 } 1431 ); 1432 1433 add_task( 1434 async function test_uploadOutgoing_createRecord_throws_dontAllowSkipRecord() { 1435 _( 1436 "SyncEngine._uploadOutgoing will throw if createRecord throws and allowSkipRecord is set to false" 1437 ); 1438 await createRecordFailTelemetry(false); 1439 } 1440 ); 1441 1442 add_task(async function test_uploadOutgoing_largeRecords() { 1443 _( 1444 "SyncEngine._uploadOutgoing throws on records larger than the max record payload size" 1445 ); 1446 1447 let collection = new ServerCollection(); 1448 1449 let engine = makeRotaryEngine(); 1450 engine.allowSkippedRecord = false; 1451 engine._store.items["large-item"] = "Y".repeat( 1452 Service.getMaxRecordPayloadSize() * 2 1453 ); 1454 await engine._tracker.addChangedID("large-item", 0); 1455 collection.insert("large-item"); 1456 1457 let syncID = await engine.resetLocalSyncID(); 1458 let meta_global = Service.recordManager.set( 1459 engine.metaURL, 1460 new WBORecord(engine.metaURL) 1461 ); 1462 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1463 1464 let server = sync_httpd_setup({ 1465 "/1.1/foo/storage/rotary": collection.handler(), 1466 }); 1467 1468 await SyncTestingInfrastructure(server); 1469 1470 try { 1471 await engine._syncStartup(); 1472 let error = null; 1473 try { 1474 await engine._uploadOutgoing(); 1475 } catch (e) { 1476 error = e; 1477 } 1478 ok(!!error); 1479 } finally { 1480 await cleanAndGo(engine, server); 1481 } 1482 }); 1483 1484 add_task(async function test_syncFinish_deleteByIds() { 1485 _( 1486 "SyncEngine._syncFinish deletes server records slated for deletion (list of record IDs)." 1487 ); 1488 1489 let collection = new ServerCollection(); 1490 collection._wbos.flying = new ServerWBO( 1491 "flying", 1492 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 1493 ); 1494 collection._wbos.scotsman = new ServerWBO( 1495 "scotsman", 1496 encryptPayload({ id: "scotsman", denomination: "Flying Scotsman" }) 1497 ); 1498 collection._wbos.rekolok = new ServerWBO( 1499 "rekolok", 1500 encryptPayload({ id: "rekolok", denomination: "Rekonstruktionslokomotive" }) 1501 ); 1502 1503 let server = httpd_setup({ 1504 "/1.1/foo/storage/rotary": collection.handler(), 1505 }); 1506 await SyncTestingInfrastructure(server); 1507 1508 let engine = makeRotaryEngine(); 1509 try { 1510 engine._delete = { ids: ["flying", "rekolok"] }; 1511 await engine._syncFinish(); 1512 1513 // The 'flying' and 'rekolok' records were deleted while the 1514 // 'scotsman' one wasn't. 1515 Assert.equal(collection.payload("flying"), undefined); 1516 Assert.ok(!!collection.payload("scotsman")); 1517 Assert.equal(collection.payload("rekolok"), undefined); 1518 1519 // The deletion todo list has been reset. 1520 Assert.equal(engine._delete.ids, undefined); 1521 } finally { 1522 await cleanAndGo(engine, server); 1523 } 1524 }); 1525 1526 add_task(async function test_syncFinish_deleteLotsInBatches() { 1527 _( 1528 "SyncEngine._syncFinish deletes server records in batches of 100 (list of record IDs)." 1529 ); 1530 1531 let collection = new ServerCollection(); 1532 1533 // Let's count how many times the client does a DELETE request to the server 1534 var noOfUploads = 0; 1535 collection.delete = (function (orig) { 1536 return function () { 1537 noOfUploads++; 1538 return orig.apply(this, arguments); 1539 }; 1540 })(collection.delete); 1541 1542 // Create a bunch of records on the server 1543 let now = Date.now(); 1544 for (var i = 0; i < 234; i++) { 1545 let id = "record-no-" + i; 1546 let payload = encryptPayload({ id, denomination: "Record No. " + i }); 1547 let wbo = new ServerWBO(id, payload); 1548 wbo.modified = now / 1000 - 60 * (i + 110); 1549 collection.insertWBO(wbo); 1550 } 1551 1552 let server = httpd_setup({ 1553 "/1.1/foo/storage/rotary": collection.handler(), 1554 }); 1555 1556 await SyncTestingInfrastructure(server); 1557 1558 let engine = makeRotaryEngine(); 1559 try { 1560 // Confirm initial environment 1561 Assert.equal(noOfUploads, 0); 1562 1563 // Declare what we want to have deleted: all records no. 100 and 1564 // up and all records that are less than 200 mins old (which are 1565 // records 0 thru 90). 1566 engine._delete = { ids: [], newer: now / 1000 - 60 * 200.5 }; 1567 for (i = 100; i < 234; i++) { 1568 engine._delete.ids.push("record-no-" + i); 1569 } 1570 1571 await engine._syncFinish(); 1572 1573 // Ensure that the appropriate server data has been wiped while 1574 // preserving records 90 thru 200. 1575 for (i = 0; i < 234; i++) { 1576 let id = "record-no-" + i; 1577 if (i <= 90 || i >= 100) { 1578 Assert.equal(collection.payload(id), undefined); 1579 } else { 1580 Assert.ok(!!collection.payload(id)); 1581 } 1582 } 1583 1584 // The deletion was done in batches 1585 Assert.equal(noOfUploads, 2 + 1); 1586 1587 // The deletion todo list has been reset. 1588 Assert.equal(engine._delete.ids, undefined); 1589 } finally { 1590 await cleanAndGo(engine, server); 1591 } 1592 }); 1593 1594 add_task(async function test_sync_partialUpload() { 1595 _("SyncEngine.sync() keeps changedIDs that couldn't be uploaded."); 1596 1597 let collection = new ServerCollection(); 1598 let server = sync_httpd_setup({ 1599 "/1.1/foo/storage/rotary": collection.handler(), 1600 }); 1601 let oldServerConfiguration = Service.serverConfiguration; 1602 Service.serverConfiguration = { 1603 max_post_records: 100, 1604 }; 1605 await SyncTestingInfrastructure(server); 1606 await generateNewKeys(Service.collectionKeys); 1607 1608 let engine = makeRotaryEngine(); 1609 1610 // Let the third upload fail completely 1611 var noOfUploads = 0; 1612 collection.post = (function (orig) { 1613 return function () { 1614 if (noOfUploads == 2) { 1615 throw new Error("FAIL!"); 1616 } 1617 noOfUploads++; 1618 return orig.apply(this, arguments); 1619 }; 1620 })(collection.post); 1621 1622 // Create a bunch of records (and server side handlers) 1623 for (let i = 0; i < 234; i++) { 1624 let id = "record-no-" + i; 1625 engine._store.items[id] = "Record No. " + i; 1626 await engine._tracker.addChangedID(id, i); 1627 // Let two items in the first upload batch fail. 1628 if (i != 23 && i != 42) { 1629 collection.insert(id); 1630 } 1631 } 1632 1633 let syncID = await engine.resetLocalSyncID(); 1634 let meta_global = Service.recordManager.set( 1635 engine.metaURL, 1636 new WBORecord(engine.metaURL) 1637 ); 1638 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1639 1640 try { 1641 await engine.setLastSync(123); // needs to be non-zero so that tracker is queried 1642 1643 engine.enabled = true; 1644 let error; 1645 try { 1646 await sync_engine_and_validate_telem(engine, true); 1647 } catch (ex) { 1648 error = ex; 1649 } 1650 1651 ok(!!error); 1652 1653 const changes = await engine._tracker.getChangedIDs(); 1654 for (let i = 0; i < 234; i++) { 1655 let id = "record-no-" + i; 1656 // Ensure failed records are back in the tracker: 1657 // * records no. 23 and 42 were rejected by the server, 1658 // * records after the third batch and higher couldn't be uploaded because 1659 // we failed hard on the 3rd upload. 1660 if (i == 23 || i == 42 || i >= 200) { 1661 Assert.equal(changes[id], i); 1662 } else { 1663 Assert.equal(false, id in changes); 1664 } 1665 } 1666 } finally { 1667 Service.serverConfiguration = oldServerConfiguration; 1668 await promiseClean(engine, server); 1669 } 1670 }); 1671 1672 add_task(async function test_canDecrypt_noCryptoKeys() { 1673 _( 1674 "SyncEngine.canDecrypt returns false if the engine fails to decrypt items on the server, e.g. due to a missing crypto key collection." 1675 ); 1676 1677 // Wipe collection keys so we can test the desired scenario. 1678 Service.collectionKeys.clear(); 1679 1680 let collection = new ServerCollection(); 1681 collection._wbos.flying = new ServerWBO( 1682 "flying", 1683 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 1684 ); 1685 1686 let server = sync_httpd_setup({ 1687 "/1.1/foo/storage/rotary": collection.handler(), 1688 }); 1689 1690 await SyncTestingInfrastructure(server); 1691 let engine = makeRotaryEngine(); 1692 try { 1693 Assert.equal(false, await engine.canDecrypt()); 1694 } finally { 1695 await cleanAndGo(engine, server); 1696 } 1697 }); 1698 1699 add_task(async function test_canDecrypt_true() { 1700 _( 1701 "SyncEngine.canDecrypt returns true if the engine can decrypt the items on the server." 1702 ); 1703 1704 await generateNewKeys(Service.collectionKeys); 1705 1706 let collection = new ServerCollection(); 1707 collection._wbos.flying = new ServerWBO( 1708 "flying", 1709 encryptPayload({ id: "flying", denomination: "LNER Class A3 4472" }) 1710 ); 1711 1712 let server = sync_httpd_setup({ 1713 "/1.1/foo/storage/rotary": collection.handler(), 1714 }); 1715 1716 await SyncTestingInfrastructure(server); 1717 let engine = makeRotaryEngine(); 1718 try { 1719 Assert.ok(await engine.canDecrypt()); 1720 } finally { 1721 await cleanAndGo(engine, server); 1722 } 1723 }); 1724 1725 add_task(async function test_syncapplied_observer() { 1726 const NUMBER_OF_RECORDS = 10; 1727 1728 let engine = makeRotaryEngine(); 1729 1730 // Create a batch of server side records. 1731 let collection = new ServerCollection(); 1732 for (var i = 0; i < NUMBER_OF_RECORDS; i++) { 1733 let id = "record-no-" + i; 1734 let payload = encryptPayload({ id, denomination: "Record No. " + id }); 1735 collection.insert(id, payload); 1736 } 1737 1738 let server = httpd_setup({ 1739 "/1.1/foo/storage/rotary": collection.handler(), 1740 }); 1741 1742 await SyncTestingInfrastructure(server); 1743 1744 let syncID = await engine.resetLocalSyncID(); 1745 let meta_global = Service.recordManager.set( 1746 engine.metaURL, 1747 new WBORecord(engine.metaURL) 1748 ); 1749 meta_global.payload.engines = { rotary: { version: engine.version, syncID } }; 1750 1751 let numApplyCalls = 0; 1752 let engine_name; 1753 let count; 1754 function onApplied(subject, data) { 1755 numApplyCalls++; 1756 engine_name = data; 1757 count = subject; 1758 } 1759 1760 Svc.Obs.add("weave:engine:sync:applied", onApplied); 1761 1762 try { 1763 Service.scheduler.hasIncomingItems = false; 1764 1765 // Do sync. 1766 await engine._syncStartup(); 1767 await engine._processIncoming(); 1768 1769 do_check_attribute_count(engine._store.items, 10); 1770 1771 Assert.equal(numApplyCalls, 1); 1772 Assert.equal(engine_name, "rotary"); 1773 Assert.equal(count.applied, 10); 1774 1775 Assert.ok(Service.scheduler.hasIncomingItems); 1776 } finally { 1777 await cleanAndGo(engine, server); 1778 Service.scheduler.hasIncomingItems = false; 1779 Svc.Obs.remove("weave:engine:sync:applied", onApplied); 1780 } 1781 });