engines.sys.mjs (70363B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 import { XPCOMUtils } from "resource://gre/modules/XPCOMUtils.sys.mjs"; 6 7 import { JSONFile } from "resource://gre/modules/JSONFile.sys.mjs"; 8 import { Log } from "resource://gre/modules/Log.sys.mjs"; 9 10 import { Async } from "resource://services-common/async.sys.mjs"; 11 import { Observers } from "resource://services-common/observers.sys.mjs"; 12 13 import { 14 DEFAULT_DOWNLOAD_BATCH_SIZE, 15 DEFAULT_GUID_FETCH_BATCH_SIZE, 16 ENGINE_BATCH_INTERRUPTED, 17 ENGINE_DOWNLOAD_FAIL, 18 ENGINE_UPLOAD_FAIL, 19 VERSION_OUT_OF_DATE, 20 PREFS_BRANCH, 21 } from "resource://services-sync/constants.sys.mjs"; 22 23 import { 24 Collection, 25 CryptoWrapper, 26 } from "resource://services-sync/record.sys.mjs"; 27 import { Resource } from "resource://services-sync/resource.sys.mjs"; 28 import { 29 SerializableSet, 30 Svc, 31 Utils, 32 } from "resource://services-sync/util.sys.mjs"; 33 import { SyncedRecordsTelemetry } from "resource://services-sync/telemetry.sys.mjs"; 34 35 const lazy = {}; 36 37 ChromeUtils.defineESModuleGetters(lazy, { 38 PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", 39 }); 40 41 function ensureDirectory(path) { 42 return IOUtils.makeDirectory(PathUtils.parent(path), { 43 createAncestors: true, 44 }); 45 } 46 47 /** 48 * Trackers are associated with a single engine and deal with 49 * listening for changes to their particular data type. 50 * 51 * The base `Tracker` only supports listening for changes, and bumping the score 52 * to indicate how urgently the engine wants to sync. It does not persist any 53 * data. Engines that track changes directly in the storage layer (like 54 * bookmarks, bridged engines, addresses, and credit cards) or only upload a 55 * single record (tabs and preferences) should subclass `Tracker`. 
/**
 * Base change tracker: listens for changes to one engine's data type and
 * keeps a "score" saying how urgently that engine wants to sync. It does not
 * persist any changed IDs itself.
 *
 * @param {string} name
 *        Engine name. Falls back to "Unnamed"; the lower-cased form is kept
 *        in `this.name`, the original casing is used for the logger name.
 * @param {object} engine
 *        The engine this tracker belongs to. Required.
 * @throws {Error} if `engine` is missing.
 */
export function Tracker(name, engine) {
  if (!engine) {
    throw new Error("Tracker must be associated with an Engine instance.");
  }

  const label = name || "Unnamed";
  this.name = label.toLowerCase();
  this.engine = engine;

  this._log = Log.repository.getLogger(`Sync.Engine.${label}.Tracker`);

  this._score = 0;

  this.asyncObserver = Async.asyncObserver(this, this._log);
}

Tracker.prototype = {
  // Trackers of this kind filter out Sync's own writes by looking at the
  // change source of each notification, so the engine must never be able to
  // switch change tracking off wholesale while a sync is running.
  get ignoreAll() {
    return false;
  },

  // Deliberately a no-op: without a setter, the engine's assignment to this
  // read-only property would raise a `TypeError`.
  set ignoreAll(value) {},

  /*
   * The score may be read as often as needed when deciding which engines
   * to sync.
   *
   * Valid values for score:
   * -1: Do not sync unless the user specifically requests it (almost disabled)
   * 0: Nothing has changed
   * 100: Please sync me ASAP!
   *
   * Other values are not rejected at the moment, although they should be.
   */
  get score() {
    return this._score;
  },

  // Stores the new score and broadcasts the update, tagged with the tracker
  // name, to whoever observes "weave:engine:score:updated".
  set score(value) {
    this._score = value;
    Observers.notify("weave:engine:score:updated", this.name);
  },

  // The service calls this each time a sync for the engine has completed.
  // Unlike the setter, it does not send a notification.
  resetScore() {
    this._score = 0;
  },

  // The base tracker has no persistence. These three reject with a
  // descriptive error so that accidental use of persistence is noticed.
  async getChangedIDs() {
    throw new TypeError("This tracker doesn't store changed IDs");
  },

  async addChangedID() {
    throw new TypeError("Can't add changed ID to this tracker");
  },

  async removeChangedID() {
    throw new TypeError("Can't remove changed IDs from this tracker");
  },

  // Called from several places, so this one is a silent no-op rather than an
  // error.
  clearChangedIDs() {},

  // Current time in seconds (fractional), not milliseconds.
  _now() {
    return Date.now() / 1000;
  },

  _isTracking: false,

  // Begins tracking. Does nothing when the engine is disabled, and calls
  // `onStart()` only on the transition from "not tracking" to "tracking".
  start() {
    if (!this.engineIsEnabled()) {
      return;
    }
    this._log.trace("start().");
    if (this._isTracking) {
      return;
    }
    this.onStart();
    this._isTracking = true;
  },

  // Ends tracking. In-flight async observers are allowed to finish before
  // `onStop()` runs. A tracker that is not tracking only logs.
  async stop() {
    this._log.trace("stop().");
    if (!this._isTracking) {
      return;
    }
    await this.asyncObserver.promiseObserversComplete();
    this.onStop();
    this._isTracking = false;
  },

  // Hooks for subclasses.
  onStart() {},
  onStop() {},
  async observe() {},

  // Whether the owning engine is enabled. With no engine attached (which can
  // only happen in a test) the answer is assumed to be yes.
  engineIsEnabled() {
    return this.engine ? this.engine.enabled : true;
  },

  /**
   * Brings the tracking state in line with the engine's enabled state:
   * starts listening when the engine was enabled, and stops listening and
   * drops the changed IDs when it was disabled.
   *
   * @param {boolean} engineEnabled Whether the engine was enabled.
   */
  async onEngineEnabledChanged(engineEnabled) {
    if (engineEnabled == this._isTracking) {
      // Already in the requested state.
      return;
    }

    if (!engineEnabled) {
      await this.stop();
      this.clearChangedIDs();
      return;
    }
    this.start();
  },

  // Shuts the tracker down; for the base tracker that just means stopping.
  async finalize() {
    await this.stop();
  },
};

/*
 * A tracker that persists a list of IDs for all changed items that need to be
 * synced. This is 🚨 _extremely deprecated_ 🚨 and only kept around for current
 * engines. ⚠️ Please **don't use it** for new engines! ⚠️
 *
 * Why is this kind of external change tracking deprecated? Because it causes
 * consistency issues due to missed notifications, interrupted syncs, and the
 * tracker's view of what changed diverging from the data store's.
 */
/**
 * Deprecated tracker that keeps an `id -> timestamp` map of changed items in
 * a JSON file (via `JSONFile`) in addition to what the base `Tracker` does.
 *
 * @param {string} name
 *        Engine name, passed on to `Tracker`. The lower-cased name also names
 *        the "changes" file.
 * @param {object} engine
 *        The engine this tracker belongs to, passed on to `Tracker`.
 */
export function LegacyTracker(name, engine) {
  Tracker.call(this, name, engine);

  this._ignored = [];
  this.file = this.name;
  this._storage = new JSONFile({
    path: Utils.jsonFilePath("changes", this.file),
    dataPostProcessor: json => this._dataPostProcessor(json),
    beforeSave: () => this._beforeSave(),
  });
  this._ignoreAll = false;
}

LegacyTracker.prototype = {
  // Unlike the base tracker, the engine can really switch change tracking
  // off here: while set, add and remove requests are refused.
  get ignoreAll() {
    return this._ignoreAll;
  },

  set ignoreAll(value) {
    this._ignoreAll = value;
  },

  // Default to an empty object if the file doesn't exist (or holds something
  // that isn't an object).
  _dataPostProcessor(json) {
    return (typeof json == "object" && json) || {};
  },

  // Ensure the Weave storage directory exists before writing the file.
  _beforeSave() {
    return ensureDirectory(this._storage.path);
  },

  /**
   * Loads the backing file if necessary and returns the live map.
   *
   * @return {Promise<object>} the storage's own `id -> timestamp` object;
   *         callers in this file mutate it in place.
   */
  async getChangedIDs() {
    await this._storage.load();
    return this._storage.data;
  },

  // Schedules a write of the current map.
  _saveChangedIDs() {
    this._storage.saveSoon();
  },

  // ignore/unignore specific IDs. Useful for ignoring items that are
  // being processed, or that shouldn't be synced.
  // But note: not persisted to disk

  // Removing the ID first keeps the list free of duplicates.
  ignoreID(id) {
    this.unignoreID(id);
    this._ignored.push(id);
  },

  unignoreID(id) {
    let index = this._ignored.indexOf(id);
    if (index != -1) {
      this._ignored.splice(index, 1);
    }
  },

  // Unconditionally records `when` for `id` and schedules a write.
  async _saveChangedID(id, when) {
    this._log.trace(`Adding changed ID: ${id}, ${JSON.stringify(when)}`);
    const changedIDs = await this.getChangedIDs();
    changedIDs[id] = when;
    this._saveChangedIDs();
  },

  /**
   * Marks an item as changed.
   *
   * @param {string} id
   *        The changed item's ID.
   * @param {number} [when]
   *        Change time; defaults to the current time in seconds.
   * @return {Promise<boolean>} `false` when the request was refused (empty
   *         ID, `ignoreAll` set, or the ID is on the ignore list), otherwise
   *         `true` - even if an entry with a newer time was kept as is.
   */
  async addChangedID(id, when) {
    if (!id) {
      this._log.warn("Attempted to add undefined ID to tracker");
      return false;
    }

    if (this.ignoreAll || this._ignored.includes(id)) {
      return false;
    }

    // Default to the current time in seconds if no time is provided.
    if (when == null) {
      when = this._now();
    }

    const changedIDs = await this.getChangedIDs();
    // Add/update the entry if we have a newer time.
    if ((changedIDs[id] || -Infinity) < when) {
      await this._saveChangedID(id, when);
    }

    return true;
  },

  /**
   * Forgets the given items.
   *
   * @param {...string} ids
   *        IDs to drop. Empty IDs and IDs on the ignore list are skipped
   *        (and logged).
   * @return {Promise<boolean>} `false` when nothing was attempted (no IDs
   *         given, or `ignoreAll` set), otherwise `true`.
   */
  async removeChangedID(...ids) {
    if (!ids.length || this.ignoreAll) {
      return false;
    }
    // Load the map once, up front. Besides avoiding one storage round trip
    // per ID, this leaves no `await` inside the loop, so no other tracker
    // operation can run between two removals of the same call.
    const changedIDs = await this.getChangedIDs();
    for (const id of ids) {
      if (!id) {
        this._log.warn("Attempted to remove undefined ID from tracker");
        continue;
      }
      if (this._ignored.includes(id)) {
        this._log.debug(`Not removing ignored ID ${id} from tracker`);
        continue;
      }
      if (changedIDs[id] != null) {
        this._log.trace("Removing changed ID " + id);
        delete changedIDs[id];
      }
    }
    // A write is scheduled even if no entry matched, as before.
    this._saveChangedIDs();
    return true;
  },

  // Replaces the map with an empty one and schedules a write.
  clearChangedIDs() {
    this._log.trace("Clearing changed ID list");
    this._storage.data = {};
    this._saveChangedIDs();
  },

  async finalize() {
    // Persist all pending tracked changes to disk, and wait for the final write
    // to finish.
    await super.finalize();
    this._saveChangedIDs();
    await this._storage.finalize();
  },
};
// `super` in the methods above resolves through this link, which is why it
// has to be made on the object literal itself.
Object.setPrototypeOf(LegacyTracker.prototype, Tracker.prototype);

/**
 * The Store serves as the interface between Sync and stored data.
 *
 * The name "store" is slightly a misnomer because it doesn't actually "store"
 * anything. Instead, it serves as a gateway to something that actually does
 * the "storing."
 *
 * The store is responsible for record management inside an engine. It tells
 * Sync what items are available for Sync, converts items to and from Sync's
 * record format, and applies records from Sync into changes on the underlying
 * store.
 *
 * Store implementations require a number of functions to be implemented. These
 * are all documented below.
 */
342 * 343 * For stores that deal with many records or which have expensive store access 344 * routines, it is highly recommended to implement a custom applyIncomingBatch 345 * and/or applyIncoming function on top of the basic APIs. 346 */ 347 348 export function Store(name, engine) { 349 if (!engine) { 350 throw new Error("Store must be associated with an Engine instance."); 351 } 352 353 name = name || "Unnamed"; 354 this.name = name.toLowerCase(); 355 this.engine = engine; 356 357 this._log = Log.repository.getLogger(`Sync.Engine.${name}.Store`); 358 359 ChromeUtils.defineLazyGetter(this, "_timer", function () { 360 return Cc["@mozilla.org/timer;1"].createInstance(Ci.nsITimer); 361 }); 362 } 363 364 Store.prototype = { 365 /** 366 * Apply multiple incoming records against the store. 367 * 368 * This is called with a set of incoming records to process. The function 369 * should look at each record, reconcile with the current local state, and 370 * make the local changes required to bring its state in alignment with the 371 * record. 372 * 373 * The default implementation simply iterates over all records and calls 374 * applyIncoming(). Store implementations may overwrite this function 375 * if desired. 376 * 377 * @param records Array of records to apply 378 * @param a SyncedRecordsTelemetry obj that will keep track of failed reasons 379 * @return Array of record IDs which did not apply cleanly 380 */ 381 async applyIncomingBatch(records, countTelemetry) { 382 let failed = []; 383 384 await Async.yieldingForEach(records, async record => { 385 try { 386 await this.applyIncoming(record); 387 } catch (ex) { 388 if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) { 389 // This kind of exception should have a 'cause' attribute, which is an 390 // originating exception. 391 // ex.cause will carry its stack with it when rethrown. 
392 throw ex.cause; 393 } 394 if (Async.isShutdownException(ex)) { 395 throw ex; 396 } 397 this._log.warn("Failed to apply incoming record " + record.id, ex); 398 failed.push(record.id); 399 countTelemetry.addIncomingFailedReason(ex.message); 400 } 401 }); 402 403 return failed; 404 }, 405 406 /** 407 * Apply a single record against the store. 408 * 409 * This takes a single record and makes the local changes required so the 410 * local state matches what's in the record. 411 * 412 * The default implementation calls one of remove(), create(), or update() 413 * depending on the state obtained from the store itself. Store 414 * implementations may overwrite this function if desired. 415 * 416 * @param record 417 * Record to apply 418 */ 419 async applyIncoming(record) { 420 if (record.deleted) { 421 await this.remove(record); 422 } else if (!(await this.itemExists(record.id))) { 423 await this.create(record); 424 } else { 425 await this.update(record); 426 } 427 }, 428 429 // override these in derived objects 430 431 /** 432 * Create an item in the store from a record. 433 * 434 * This is called by the default implementation of applyIncoming(). If using 435 * applyIncomingBatch(), this won't be called unless your store calls it. 436 * 437 * @param record 438 * The store record to create an item from 439 */ 440 async create() { 441 throw new Error("override create in a subclass"); 442 }, 443 444 /** 445 * Remove an item in the store from a record. 446 * 447 * This is called by the default implementation of applyIncoming(). If using 448 * applyIncomingBatch(), this won't be called unless your store calls it. 449 * 450 * @param record 451 * The store record to delete an item from 452 */ 453 async remove() { 454 throw new Error("override remove in a subclass"); 455 }, 456 457 /** 458 * Update an item from a record. 459 * 460 * This is called by the default implementation of applyIncoming(). 
If using 461 * applyIncomingBatch(), this won't be called unless your store calls it. 462 * 463 * @param record 464 * The record to use to update an item from 465 */ 466 async update() { 467 throw new Error("override update in a subclass"); 468 }, 469 470 /** 471 * Determine whether a record with the specified ID exists. 472 * 473 * Takes a string record ID and returns a booleans saying whether the record 474 * exists. 475 * 476 * @param id 477 * string record ID 478 * @return boolean indicating whether record exists locally 479 */ 480 async itemExists() { 481 throw new Error("override itemExists in a subclass"); 482 }, 483 484 /** 485 * Create a record from the specified ID. 486 * 487 * If the ID is known, the record should be populated with metadata from 488 * the store. If the ID is not known, the record should be created with the 489 * delete field set to true. 490 * 491 * @param id 492 * string record ID 493 * @param collection 494 * Collection to add record to. This is typically passed into the 495 * constructor for the newly-created record. 496 * @return record type for this engine 497 */ 498 async createRecord() { 499 throw new Error("override createRecord in a subclass"); 500 }, 501 502 /** 503 * Change the ID of a record. 504 * 505 * @param oldID 506 * string old/current record ID 507 * @param newID 508 * string new record ID 509 */ 510 async changeItemID() { 511 throw new Error("override changeItemID in a subclass"); 512 }, 513 514 /** 515 * Obtain the set of all known record IDs. 516 * 517 * @return Object with ID strings as keys and values of true. The values 518 * are ignored. 519 */ 520 async getAllIDs() { 521 throw new Error("override getAllIDs in a subclass"); 522 }, 523 524 /** 525 * Wipe all data in the store. 526 * 527 * This function is called during remote wipes or when replacing local data 528 * with remote data. 529 * 530 * This function should delete all local data that the store is managing. 
It 531 * can be thought of as clearing out all state and restoring the "new 532 * browser" state. 533 */ 534 async wipe() { 535 throw new Error("override wipe in a subclass"); 536 }, 537 }; 538 539 export function EngineManager(service) { 540 this.service = service; 541 542 this._engines = {}; 543 544 this._altEngineInfo = {}; 545 546 // This will be populated by Service on startup. 547 this._declined = new Set(); 548 this._log = Log.repository.getLogger("Sync.EngineManager"); 549 this._log.manageLevelFromPref("services.sync.log.logger.service.engines"); 550 // define the default level for all engine logs here (although each engine 551 // allows its level to be controlled via a specific, non-default pref) 552 Log.repository 553 .getLogger(`Sync.Engine`) 554 .manageLevelFromPref("services.sync.log.logger.engine"); 555 } 556 557 EngineManager.prototype = { 558 get(name) { 559 // Return an array of engines if we have an array of names 560 if (Array.isArray(name)) { 561 let engines = []; 562 name.forEach(function (name) { 563 let engine = this.get(name); 564 if (engine) { 565 engines.push(engine); 566 } 567 }, this); 568 return engines; 569 } 570 571 return this._engines[name]; // Silently returns undefined for unknown names. 572 }, 573 574 getAll() { 575 let engines = []; 576 for (let [, engine] of Object.entries(this._engines)) { 577 engines.push(engine); 578 } 579 return engines; 580 }, 581 582 /** 583 * If a user has changed a pref that controls which variant of a sync engine 584 * for a given collection we use, unregister the old engine and register the 585 * new one. 586 * 587 * This is called by EngineSynchronizer before every sync. 
588 */ 589 async switchAlternatives() { 590 for (let [name, info] of Object.entries(this._altEngineInfo)) { 591 let prefValue = info.prefValue; 592 if (prefValue === info.lastValue) { 593 this._log.trace( 594 `No change for engine ${name} (${info.pref} is still ${prefValue})` 595 ); 596 continue; 597 } 598 // Unregister the old engine, register the new one. 599 this._log.info( 600 `Switching ${name} engine ("${info.pref}" went from ${info.lastValue} => ${prefValue})` 601 ); 602 try { 603 await this._removeAndFinalize(name); 604 } catch (e) { 605 this._log.warn(`Failed to remove previous ${name} engine...`, e); 606 } 607 let engineType = prefValue ? info.whenTrue : info.whenFalse; 608 try { 609 // If register throws, we'll try again next sync, but until then there 610 // won't be an engine registered for this collection. 611 await this.register(engineType); 612 info.lastValue = prefValue; 613 // Note: engineType.name is using Function.prototype.name. 614 this._log.info(`Switched the ${name} engine to use ${engineType.name}`); 615 } catch (e) { 616 this._log.warn( 617 `Switching the ${name} engine to use ${engineType.name} failed (couldn't register)`, 618 e 619 ); 620 } 621 } 622 }, 623 624 async registerAlternatives(name, pref, whenTrue, whenFalse) { 625 let info = { name, pref, whenTrue, whenFalse }; 626 627 XPCOMUtils.defineLazyPreferenceGetter(info, "prefValue", pref, false); 628 629 let chosen = info.prefValue ? info.whenTrue : info.whenFalse; 630 info.lastValue = info.prefValue; 631 this._altEngineInfo[name] = info; 632 633 await this.register(chosen); 634 }, 635 636 /** 637 * N.B., does not pay attention to the declined list. 
638 */ 639 getEnabled() { 640 return this.getAll() 641 .filter(engine => engine.enabled) 642 .sort((a, b) => a.syncPriority - b.syncPriority); 643 }, 644 645 get enabledEngineNames() { 646 return this.getEnabled().map(e => e.name); 647 }, 648 649 persistDeclined() { 650 Svc.PrefBranch.setStringPref( 651 "declinedEngines", 652 [...this._declined].join(",") 653 ); 654 }, 655 656 /** 657 * Returns an array. 658 */ 659 getDeclined() { 660 return [...this._declined]; 661 }, 662 663 setDeclined(engines) { 664 this._declined = new Set(engines); 665 this.persistDeclined(); 666 }, 667 668 isDeclined(engineName) { 669 return this._declined.has(engineName); 670 }, 671 672 /** 673 * Accepts a Set or an array. 674 */ 675 decline(engines) { 676 for (let e of engines) { 677 this._declined.add(e); 678 } 679 this.persistDeclined(); 680 }, 681 682 undecline(engines) { 683 for (let e of engines) { 684 this._declined.delete(e); 685 } 686 this.persistDeclined(); 687 }, 688 689 /** 690 * Register an Engine to the service. Alternatively, give an array of engine 691 * objects to register. 
692 * 693 * @param engineObject 694 * Engine object used to get an instance of the engine 695 * @return The engine object if anything failed 696 */ 697 async register(engineObject) { 698 if (Array.isArray(engineObject)) { 699 for (const e of engineObject) { 700 await this.register(e); 701 } 702 return; 703 } 704 705 try { 706 let engine = new engineObject(this.service); 707 let name = engine.name; 708 if (name in this._engines) { 709 this._log.error("Engine '" + name + "' is already registered!"); 710 } else { 711 if (engine.initialize) { 712 await engine.initialize(); 713 } 714 this._engines[name] = engine; 715 } 716 } catch (ex) { 717 let name = engineObject || ""; 718 name = name.prototype || ""; 719 name = name.name || ""; 720 721 this._log.error(`Could not initialize engine ${name}`, ex); 722 } 723 }, 724 725 async unregister(val) { 726 let name = val; 727 if (val instanceof SyncEngine) { 728 name = val.name; 729 } 730 await this._removeAndFinalize(name); 731 delete this._altEngineInfo[name]; 732 }, 733 734 // Common code for disabling an engine by name, that doesn't complain if the 735 // engine doesn't exist. Doesn't touch the engine's alternative info (if any 736 // exists). 737 async _removeAndFinalize(name) { 738 if (name in this._engines) { 739 let engine = this._engines[name]; 740 delete this._engines[name]; 741 await engine.finalize(); 742 } 743 }, 744 745 async clear() { 746 for (let name in this._engines) { 747 let engine = this._engines[name]; 748 delete this._engines[name]; 749 await engine.finalize(); 750 } 751 this._altEngineInfo = {}; 752 }, 753 }; 754 755 export function SyncEngine(name, service) { 756 if (!service) { 757 throw new Error("SyncEngine must be associated with a Service instance."); 758 } 759 760 this.Name = name || "Unnamed"; 761 this.name = name.toLowerCase(); 762 this.service = service; 763 764 this._notify = Utils.notify("weave:engine:"); 765 this._log = Log.repository.getLogger("Sync.Engine." 
+ this.Name); 766 this._log.manageLevelFromPref(`services.sync.log.logger.engine.${this.name}`); 767 768 this._modified = this.emptyChangeset(); 769 this._tracker; // initialize tracker to load previously changed IDs 770 this._log.debug("Engine constructed"); 771 772 this._toFetchStorage = new JSONFile({ 773 path: Utils.jsonFilePath("toFetch", this.name), 774 dataPostProcessor: json => this._metadataPostProcessor(json), 775 beforeSave: () => this._beforeSaveMetadata(), 776 }); 777 778 this._previousFailedStorage = new JSONFile({ 779 path: Utils.jsonFilePath("failed", this.name), 780 dataPostProcessor: json => this._metadataPostProcessor(json), 781 beforeSave: () => this._beforeSaveMetadata(), 782 }); 783 784 XPCOMUtils.defineLazyPreferenceGetter( 785 this, 786 "_enabled", 787 `services.sync.engine.${this.prefName}`, 788 false 789 ); 790 XPCOMUtils.defineLazyPreferenceGetter( 791 this, 792 "_syncID", 793 `services.sync.${this.name}.syncID`, 794 "" 795 ); 796 XPCOMUtils.defineLazyPreferenceGetter( 797 this, 798 "_lastSync", 799 `services.sync.${this.name}.lastSync`, 800 "0", 801 null, 802 v => parseFloat(v) 803 ); 804 // Async initializations can be made in the initialize() method. 805 806 this.asyncObserver = Async.asyncObserver(this, this._log); 807 } 808 809 // Enumeration to define approaches to handling bad records. 810 // Attached to the constructor to allow use as a kind of static enumeration. 811 SyncEngine.kRecoveryStrategy = { 812 ignore: "ignore", 813 retry: "retry", 814 error: "error", 815 }; 816 817 SyncEngine.prototype = { 818 _recordObj: CryptoWrapper, 819 // _storeObj, and _trackerObj should to be overridden in subclasses 820 _storeObj: Store, 821 _trackerObj: Tracker, 822 version: 1, 823 824 // Local 'constant'. 825 // Signal to the engine that processing further records is pointless. 826 eEngineAbortApplyIncoming: "error.engine.abort.applyincoming", 827 828 // Should we keep syncing if we find a record that cannot be uploaded (ever)? 
829 // If this is false, we'll throw, otherwise, we'll ignore the record and 830 // continue. This currently can only happen due to the record being larger 831 // than the record upload limit. 832 allowSkippedRecord: true, 833 834 // Which sortindex to use when retrieving records for this engine. 835 _defaultSort: undefined, 836 837 _hasSyncedThisSession: false, 838 839 _metadataPostProcessor(json) { 840 if (Array.isArray(json)) { 841 // Pre-`JSONFile` storage stored an array, but `JSONFile` defaults to 842 // an object, so we wrap the array for consistency. 843 json = { ids: json }; 844 } 845 if (!json.ids) { 846 json.ids = []; 847 } 848 // The set serializes the same way as an array, but offers more efficient 849 // methods of manipulation. 850 json.ids = new SerializableSet(json.ids); 851 return json; 852 }, 853 854 async _beforeSaveMetadata() { 855 await ensureDirectory(this._toFetchStorage.path); 856 await ensureDirectory(this._previousFailedStorage.path); 857 }, 858 859 // A relative priority to use when computing an order 860 // for engines to be synced. Higher-priority engines 861 // (lower numbers) are synced first. 862 // It is recommended that a unique value be used for each engine, 863 // in order to guarantee a stable sequence. 864 syncPriority: 0, 865 866 // How many records to pull in a single sync. This is primarily to avoid very 867 // long first syncs against profiles with many history records. 868 downloadLimit: null, 869 870 // How many records to pull at one time when specifying IDs. This is to avoid 871 // URI length limitations. 
872 guidFetchBatchSize: DEFAULT_GUID_FETCH_BATCH_SIZE, 873 874 downloadBatchSize: DEFAULT_DOWNLOAD_BATCH_SIZE, 875 876 async initialize() { 877 await this._toFetchStorage.load(); 878 await this._previousFailedStorage.load(); 879 Services.prefs.addObserver( 880 `${PREFS_BRANCH}engine.${this.prefName}`, 881 this.asyncObserver, 882 true 883 ); 884 this._log.debug("SyncEngine initialized", this.name); 885 }, 886 887 get prefName() { 888 return this.name; 889 }, 890 891 get enabled() { 892 return this._enabled; 893 }, 894 895 set enabled(val) { 896 if (!!val != this._enabled) { 897 Svc.PrefBranch.setBoolPref("engine." + this.prefName, !!val); 898 } 899 }, 900 901 get score() { 902 return this._tracker.score; 903 }, 904 905 get _store() { 906 let store = new this._storeObj(this.Name, this); 907 this.__defineGetter__("_store", () => store); 908 return store; 909 }, 910 911 get _tracker() { 912 let tracker = new this._trackerObj(this.Name, this); 913 this.__defineGetter__("_tracker", () => tracker); 914 return tracker; 915 }, 916 917 get storageURL() { 918 return this.service.storageURL; 919 }, 920 921 get engineURL() { 922 return this.storageURL + this.name; 923 }, 924 925 get cryptoKeysURL() { 926 return this.storageURL + "crypto/keys"; 927 }, 928 929 get metaURL() { 930 return this.storageURL + "meta/global"; 931 }, 932 933 startTracking() { 934 this._tracker.start(); 935 }, 936 937 // Returns a promise 938 stopTracking() { 939 return this._tracker.stop(); 940 }, 941 942 // Listens for engine enabled state changes, and updates the tracker's state. 943 // This is an async observer because the tracker waits on all its async 944 // observers to finish when it's stopped. 
945 async observe(subject, topic, data) { 946 if ( 947 topic == "nsPref:changed" && 948 data == `services.sync.engine.${this.prefName}` 949 ) { 950 await this._tracker.onEngineEnabledChanged(this._enabled); 951 } 952 }, 953 954 async sync() { 955 if (!this.enabled) { 956 return false; 957 } 958 959 if (!this._sync) { 960 throw new Error("engine does not implement _sync method"); 961 } 962 963 return this._notify("sync", this.name, this._sync)(); 964 }, 965 966 // Override this method to return a new changeset type. 967 emptyChangeset() { 968 return new Changeset(); 969 }, 970 971 /** 972 * Returns the local sync ID for this engine, or `""` if the engine hasn't 973 * synced for the first time. This is exposed for tests. 974 * 975 * @return the current sync ID. 976 */ 977 async getSyncID() { 978 return this._syncID; 979 }, 980 981 /** 982 * Ensures that the local sync ID for the engine matches the sync ID for the 983 * collection on the server. A mismatch indicates that another client wiped 984 * the collection; we're syncing after a node reassignment, and another 985 * client synced before us; or the store was replaced since the last sync. 986 * In case of a mismatch, we need to reset all local Sync state and start 987 * over as a first sync. 988 * 989 * In most cases, this method should return the new sync ID as-is. However, an 990 * engine may ignore the given ID and assign a different one, if it determines 991 * that the sync ID on the server is out of date. The bookmarks engine uses 992 * this to wipe the server and other clients on the first sync after the user 993 * restores from a backup. 994 * 995 * @param newSyncID 996 * The new sync ID for the collection from `meta/global`. 997 * @return The assigned sync ID. If this doesn't match `newSyncID`, we'll 998 * replace the sync ID in `meta/global` with the assigned ID. 
999 */ 1000 async ensureCurrentSyncID(newSyncID) { 1001 let existingSyncID = this._syncID; 1002 if (existingSyncID == newSyncID) { 1003 return existingSyncID; 1004 } 1005 this._log.debug( 1006 `Engine syncIDs differ (old="${existingSyncID}", new="${newSyncID}") - resetting the engine` 1007 ); 1008 await this.resetClient(); 1009 Svc.PrefBranch.setStringPref(this.name + ".syncID", newSyncID); 1010 Svc.PrefBranch.setStringPref(this.name + ".lastSync", "0"); 1011 return newSyncID; 1012 }, 1013 1014 /** 1015 * Resets the local sync ID for the engine, wipes the server, and resets all 1016 * local Sync state to start over as a first sync. 1017 * 1018 * @return the new sync ID. 1019 */ 1020 async resetSyncID() { 1021 let newSyncID = await this.resetLocalSyncID(); 1022 await this.wipeServer(); 1023 return newSyncID; 1024 }, 1025 1026 /** 1027 * Resets the local sync ID for the engine, signaling that we're starting over 1028 * as a first sync. 1029 * 1030 * @return the new sync ID. 1031 */ 1032 async resetLocalSyncID() { 1033 return this.ensureCurrentSyncID(Utils.makeGUID()); 1034 }, 1035 1036 /** 1037 * Allows overriding scheduler logic -- added to help reduce kinto server 1038 * getting hammered because our scheduler never got tuned for it. 1039 * 1040 * Note: Overriding engines must take resyncs into account -- score will not 1041 * be cleared. 1042 */ 1043 shouldSkipSync() { 1044 return false; 1045 }, 1046 1047 /* 1048 * lastSync is a timestamp in server time. 
1049 */ 1050 async getLastSync() { 1051 return this._lastSync; 1052 }, 1053 async setLastSync(lastSync) { 1054 // Store the value as a string to keep floating point precision 1055 Svc.PrefBranch.setStringPref(this.name + ".lastSync", lastSync.toString()); 1056 }, 1057 async resetLastSync() { 1058 this._log.debug("Resetting " + this.name + " last sync time"); 1059 await this.setLastSync(0); 1060 }, 1061 1062 get hasSyncedThisSession() { 1063 return this._hasSyncedThisSession; 1064 }, 1065 1066 set hasSyncedThisSession(hasSynced) { 1067 this._hasSyncedThisSession = hasSynced; 1068 }, 1069 1070 get toFetch() { 1071 this._toFetchStorage.ensureDataReady(); 1072 return this._toFetchStorage.data.ids; 1073 }, 1074 1075 set toFetch(ids) { 1076 if (ids.constructor.name != "SerializableSet") { 1077 throw new Error( 1078 "Bug: Attempted to set toFetch to something that isn't a SerializableSet" 1079 ); 1080 } 1081 this._toFetchStorage.data = { ids }; 1082 this._toFetchStorage.saveSoon(); 1083 }, 1084 1085 get previousFailed() { 1086 this._previousFailedStorage.ensureDataReady(); 1087 return this._previousFailedStorage.data.ids; 1088 }, 1089 1090 set previousFailed(ids) { 1091 if (ids.constructor.name != "SerializableSet") { 1092 throw new Error( 1093 "Bug: Attempted to set previousFailed to something that isn't a SerializableSet" 1094 ); 1095 } 1096 this._previousFailedStorage.data = { ids }; 1097 this._previousFailedStorage.saveSoon(); 1098 }, 1099 1100 /* 1101 * Returns a changeset for this sync. Engine implementations can override this 1102 * method to bypass the tracker for certain or all changed items. 1103 */ 1104 async getChangedIDs() { 1105 return this._tracker.getChangedIDs(); 1106 }, 1107 1108 // Create a new record using the store and add in metadata. 
1109 async _createRecord(id) { 1110 let record = await this._store.createRecord(id, this.name); 1111 record.id = id; 1112 record.collection = this.name; 1113 return record; 1114 }, 1115 1116 // Creates a tombstone Sync record with additional metadata. 1117 _createTombstone(id) { 1118 let tombstone = new this._recordObj(this.name, id); 1119 tombstone.id = id; 1120 tombstone.collection = this.name; 1121 tombstone.deleted = true; 1122 return tombstone; 1123 }, 1124 1125 // Any setup that needs to happen at the beginning of each sync. 1126 async _syncStartup() { 1127 // Determine if we need to wipe on outdated versions 1128 let metaGlobal = await this.service.recordManager.get(this.metaURL); 1129 let engines = metaGlobal.payload.engines || {}; 1130 let engineData = engines[this.name] || {}; 1131 1132 // Assume missing versions are 0 and wipe the server 1133 if ((engineData.version || 0) < this.version) { 1134 this._log.debug("Old engine data: " + [engineData.version, this.version]); 1135 1136 // Clear the server and reupload everything on bad version or missing 1137 // meta. Note that we don't regenerate per-collection keys here. 
1138 let newSyncID = await this.resetSyncID(); 1139 1140 // Set the newer version and newly generated syncID 1141 engineData.version = this.version; 1142 engineData.syncID = newSyncID; 1143 1144 // Put the new data back into meta/global and mark for upload 1145 engines[this.name] = engineData; 1146 metaGlobal.payload.engines = engines; 1147 metaGlobal.changed = true; 1148 } else if (engineData.version > this.version) { 1149 // Don't sync this engine if the server has newer data 1150 1151 let error = new Error("New data: " + [engineData.version, this.version]); 1152 error.failureCode = VERSION_OUT_OF_DATE; 1153 throw error; 1154 } else { 1155 // Changes to syncID mean we'll need to upload everything 1156 let assignedSyncID = await this.ensureCurrentSyncID(engineData.syncID); 1157 if (assignedSyncID != engineData.syncID) { 1158 engineData.syncID = assignedSyncID; 1159 metaGlobal.changed = true; 1160 } 1161 } 1162 1163 // Save objects that need to be uploaded in this._modified. As we 1164 // successfully upload objects we remove them from this._modified. If an 1165 // error occurs or any objects fail to upload, they will remain in 1166 // this._modified. At the end of a sync, or after an error, we add all 1167 // objects remaining in this._modified to the tracker. 1168 let initialChanges = await this.pullChanges(); 1169 this._modified.replace(initialChanges); 1170 // Clear the tracker now. If the sync fails we'll add the ones we failed 1171 // to upload back. 1172 this._tracker.clearChangedIDs(); 1173 this._tracker.resetScore(); 1174 1175 // Keep track of what to delete at the end of sync 1176 this._delete = {}; 1177 }, 1178 1179 async pullChanges() { 1180 let lastSync = await this.getLastSync(); 1181 if (lastSync) { 1182 return this.pullNewChanges(); 1183 } 1184 this._log.debug("First sync, uploading all items"); 1185 return this.pullAllChanges(); 1186 }, 1187 1188 /** 1189 * A tiny abstraction to make it easier to test incoming record 1190 * application. 
1191 */ 1192 itemSource() { 1193 return new Collection(this.engineURL, this._recordObj, this.service); 1194 }, 1195 1196 /** 1197 * Download and apply remote records changed since the last sync. This 1198 * happens in three stages. 1199 * 1200 * In the first stage, we fetch full records for all changed items, newest 1201 * first, up to the download limit. The limit lets us make progress for large 1202 * collections, where the sync is likely to be interrupted before we 1203 * can fetch everything. 1204 * 1205 * In the second stage, we fetch the IDs of any remaining records changed 1206 * since the last sync, add them to our backlog, and fast-forward our last 1207 * sync time. 1208 * 1209 * In the third stage, we fetch and apply records for all backlogged IDs, 1210 * as well as any records that failed to apply during the last sync. We 1211 * request records for the IDs in chunks, to avoid exceeding URL length 1212 * limits, then remove successfully applied records from the backlog, and 1213 * record IDs of any records that failed to apply to retry on the next sync. 1214 */ 1215 async _processIncoming() { 1216 this._log.trace("Downloading & applying server changes"); 1217 1218 let newitems = this.itemSource(); 1219 let lastSync = await this.getLastSync(); 1220 1221 newitems.newer = lastSync; 1222 newitems.full = true; 1223 1224 let downloadLimit = Infinity; 1225 if (this.downloadLimit) { 1226 // Fetch new records up to the download limit. Currently, only the history 1227 // engine sets a limit, since the history collection has the highest volume 1228 // of changed records between syncs. The other engines fetch all records 1229 // changed since the last sync. 1230 if (this._defaultSort) { 1231 // A download limit with a sort order doesn't make sense: we won't know 1232 // which records to backfill. 
1233 throw new Error("Can't specify download limit with default sort order"); 1234 } 1235 newitems.sort = "newest"; 1236 downloadLimit = newitems.limit = this.downloadLimit; 1237 } else if (this._defaultSort) { 1238 // The bookmarks engine fetches records by sort index; other engines leave 1239 // the order unspecified. We can remove `_defaultSort` entirely after bug 1240 // 1305563: the sort index won't matter because we'll buffer all bookmarks 1241 // before applying. 1242 newitems.sort = this._defaultSort; 1243 } 1244 1245 // applied => number of items that should be applied. 1246 // failed => number of items that failed in this sync. 1247 // newFailed => number of items that failed for the first time in this sync. 1248 // reconciled => number of items that were reconciled. 1249 // failedReasons => {name, count} of reasons a record failed 1250 let countTelemetry = new SyncedRecordsTelemetry(); 1251 let count = countTelemetry.incomingCounts; 1252 let recordsToApply = []; 1253 let failedInCurrentSync = new SerializableSet(); 1254 1255 let oldestModified = this.lastModified; 1256 let downloadedIDs = new Set(); 1257 1258 // Stage 1: Fetch new records from the server, up to the download limit. 
1259 if (this.lastModified == null || this.lastModified > lastSync) { 1260 let { response, records } = await newitems.getBatched( 1261 this.downloadBatchSize 1262 ); 1263 if (!response.success) { 1264 response.failureCode = ENGINE_DOWNLOAD_FAIL; 1265 throw response; 1266 } 1267 1268 await Async.yieldingForEach(records, async record => { 1269 downloadedIDs.add(record.id); 1270 1271 if (record.modified < oldestModified) { 1272 oldestModified = record.modified; 1273 } 1274 1275 let { shouldApply, error } = await this._maybeReconcile(record); 1276 if (error) { 1277 failedInCurrentSync.add(record.id); 1278 count.failed++; 1279 countTelemetry.addIncomingFailedReason(error.message); 1280 return; 1281 } 1282 if (!shouldApply) { 1283 count.reconciled++; 1284 return; 1285 } 1286 recordsToApply.push(record); 1287 }); 1288 1289 let failedToApply = await this._applyRecords( 1290 recordsToApply, 1291 countTelemetry 1292 ); 1293 Utils.setAddAll(failedInCurrentSync, failedToApply); 1294 1295 // `applied` is a bit of a misnomer: it counts records that *should* be 1296 // applied, so it also includes records that we tried to apply and failed. 1297 // `recordsToApply.length - failedToApply.length` is the number of records 1298 // that we *successfully* applied. 1299 count.failed += failedToApply.length; 1300 count.applied += recordsToApply.length; 1301 } 1302 1303 // Stage 2: If we reached our download limit, we might still have records 1304 // on the server that changed since the last sync. Fetch the IDs for the 1305 // remaining records, and add them to the backlog. Note that this stage 1306 // only runs for engines that set a download limit. 
1307 if (downloadedIDs.size == downloadLimit) { 1308 let guidColl = this.itemSource(); 1309 1310 guidColl.newer = lastSync; 1311 guidColl.older = oldestModified; 1312 guidColl.sort = "oldest"; 1313 1314 let guids = await guidColl.get(); 1315 if (!guids.success) { 1316 throw guids; 1317 } 1318 1319 // Filtering out already downloaded IDs here isn't necessary. We only do 1320 // that in case the Sync server doesn't support `older` (bug 1316110). 1321 let remainingIDs = guids.obj.filter(id => !downloadedIDs.has(id)); 1322 if (remainingIDs.length) { 1323 this.toFetch = Utils.setAddAll(this.toFetch, remainingIDs); 1324 } 1325 } 1326 1327 // Fast-foward the lastSync timestamp since we have backlogged the 1328 // remaining items. 1329 if (lastSync < this.lastModified) { 1330 lastSync = this.lastModified; 1331 await this.setLastSync(lastSync); 1332 } 1333 1334 // Stage 3: Backfill records from the backlog, and those that failed to 1335 // decrypt or apply during the last sync. We only backfill up to the 1336 // download limit, to prevent a large backlog for one engine from blocking 1337 // the others. We'll keep processing the backlog on subsequent engine syncs. 1338 let failedInPreviousSync = this.previousFailed; 1339 let idsToBackfill = Array.from( 1340 Utils.setAddAll( 1341 Utils.subsetOfSize(this.toFetch, downloadLimit), 1342 failedInPreviousSync 1343 ) 1344 ); 1345 1346 // Note that we intentionally overwrite the previously failed list here. 1347 // Records that fail to decrypt or apply in two consecutive syncs are likely 1348 // corrupt; we remove them from the list because retrying and failing on 1349 // every subsequent sync just adds noise. 
1350 this.previousFailed = failedInCurrentSync; 1351 1352 let backfilledItems = this.itemSource(); 1353 1354 backfilledItems.sort = "newest"; 1355 backfilledItems.full = true; 1356 1357 // `getBatched` includes the list of IDs as a query parameter, so we need to fetch 1358 // records in chunks to avoid exceeding URI length limits. 1359 if (this.guidFetchBatchSize) { 1360 for (let ids of lazy.PlacesUtils.chunkArray( 1361 idsToBackfill, 1362 this.guidFetchBatchSize 1363 )) { 1364 backfilledItems.ids = ids; 1365 1366 let { response, records } = await backfilledItems.getBatched( 1367 this.downloadBatchSize 1368 ); 1369 if (!response.success) { 1370 response.failureCode = ENGINE_DOWNLOAD_FAIL; 1371 throw response; 1372 } 1373 1374 let backfilledRecordsToApply = []; 1375 let failedInBackfill = []; 1376 1377 await Async.yieldingForEach(records, async record => { 1378 let { shouldApply, error } = await this._maybeReconcile(record); 1379 if (error) { 1380 failedInBackfill.push(record.id); 1381 count.failed++; 1382 countTelemetry.addIncomingFailedReason(error.message); 1383 return; 1384 } 1385 if (!shouldApply) { 1386 count.reconciled++; 1387 return; 1388 } 1389 backfilledRecordsToApply.push(record); 1390 }); 1391 1392 let failedToApply = await this._applyRecords( 1393 backfilledRecordsToApply, 1394 countTelemetry 1395 ); 1396 failedInBackfill.push(...failedToApply); 1397 1398 count.failed += failedToApply.length; 1399 count.applied += backfilledRecordsToApply.length; 1400 1401 this.toFetch = Utils.setDeleteAll(this.toFetch, ids); 1402 this.previousFailed = Utils.setAddAll( 1403 this.previousFailed, 1404 failedInBackfill 1405 ); 1406 1407 if (lastSync < this.lastModified) { 1408 lastSync = this.lastModified; 1409 await this.setLastSync(lastSync); 1410 } 1411 } 1412 } 1413 1414 count.newFailed = 0; 1415 for (let item of this.previousFailed) { 1416 // Anything that failed in the current sync that also failed in 1417 // the previous sync means there is likely something wrong 
with 1418 // the record, we remove it from trying again to prevent 1419 // infinitely syncing corrupted records 1420 if (failedInPreviousSync.has(item)) { 1421 this.previousFailed.delete(item); 1422 } else { 1423 // otherwise it's a new failed and we count it as so 1424 ++count.newFailed; 1425 } 1426 } 1427 1428 count.succeeded = Math.max(0, count.applied - count.failed); 1429 this._log.info( 1430 [ 1431 "Records:", 1432 count.applied, 1433 "applied,", 1434 count.succeeded, 1435 "successfully,", 1436 count.failed, 1437 "failed to apply,", 1438 count.newFailed, 1439 "newly failed to apply,", 1440 count.reconciled, 1441 "reconciled.", 1442 ].join(" ") 1443 ); 1444 Observers.notify("weave:engine:sync:applied", count, this.name); 1445 }, 1446 1447 async _maybeReconcile(item) { 1448 let key = this.service.collectionKeys.keyForCollection(this.name); 1449 1450 // Grab a later last modified if possible 1451 if (this.lastModified == null || item.modified > this.lastModified) { 1452 this.lastModified = item.modified; 1453 } 1454 1455 try { 1456 try { 1457 await item.decrypt(key); 1458 } catch (ex) { 1459 if (!Utils.isHMACMismatch(ex)) { 1460 throw ex; 1461 } 1462 let strategy = await this.handleHMACMismatch(item, true); 1463 if (strategy == SyncEngine.kRecoveryStrategy.retry) { 1464 // You only get one retry. 1465 try { 1466 // Try decrypting again, typically because we've got new keys. 1467 this._log.info("Trying decrypt again..."); 1468 key = this.service.collectionKeys.keyForCollection(this.name); 1469 await item.decrypt(key); 1470 strategy = null; 1471 } catch (ex) { 1472 if (!Utils.isHMACMismatch(ex)) { 1473 throw ex; 1474 } 1475 strategy = await this.handleHMACMismatch(item, false); 1476 } 1477 } 1478 1479 switch (strategy) { 1480 case null: 1481 // Retry succeeded! No further handling. 1482 break; 1483 case SyncEngine.kRecoveryStrategy.retry: 1484 this._log.debug("Ignoring second retry suggestion."); 1485 // Fall through to error case. 
1486 case SyncEngine.kRecoveryStrategy.error: 1487 this._log.warn("Error decrypting record", ex); 1488 return { shouldApply: false, error: ex }; 1489 case SyncEngine.kRecoveryStrategy.ignore: 1490 this._log.debug( 1491 "Ignoring record " + item.id + " with bad HMAC: already handled." 1492 ); 1493 return { shouldApply: false, error: null }; 1494 } 1495 } 1496 } catch (ex) { 1497 if (Async.isShutdownException(ex)) { 1498 throw ex; 1499 } 1500 this._log.warn("Error decrypting record", ex); 1501 return { shouldApply: false, error: ex }; 1502 } 1503 1504 if (this._shouldDeleteRemotely(item)) { 1505 this._log.trace("Deleting item from server without applying", item); 1506 await this._deleteId(item.id); 1507 return { shouldApply: false, error: null }; 1508 } 1509 1510 let shouldApply; 1511 try { 1512 shouldApply = await this._reconcile(item); 1513 } catch (ex) { 1514 if (ex.code == SyncEngine.prototype.eEngineAbortApplyIncoming) { 1515 this._log.warn("Reconciliation failed: aborting incoming processing."); 1516 throw ex.cause; 1517 } else if (!Async.isShutdownException(ex)) { 1518 this._log.warn("Failed to reconcile incoming record " + item.id, ex); 1519 return { shouldApply: false, error: ex }; 1520 } else { 1521 throw ex; 1522 } 1523 } 1524 1525 if (!shouldApply) { 1526 this._log.trace("Skipping reconciled incoming item " + item.id); 1527 } 1528 1529 return { shouldApply, error: null }; 1530 }, 1531 1532 async _applyRecords(records, countTelemetry) { 1533 this._tracker.ignoreAll = true; 1534 try { 1535 let failedIDs = await this._store.applyIncomingBatch( 1536 records, 1537 countTelemetry 1538 ); 1539 return failedIDs; 1540 } catch (ex) { 1541 // Catch any error that escapes from applyIncomingBatch. At present 1542 // those will all be abort events. 
1543 this._log.warn("Got exception, aborting processIncoming", ex); 1544 throw ex; 1545 } finally { 1546 this._tracker.ignoreAll = false; 1547 } 1548 }, 1549 1550 // Indicates whether an incoming item should be deleted from the server at 1551 // the end of the sync. Engines can override this method to clean up records 1552 // that shouldn't be on the server. 1553 _shouldDeleteRemotely() { 1554 return false; 1555 }, 1556 1557 /** 1558 * Find a GUID of an item that is a duplicate of the incoming item but happens 1559 * to have a different GUID 1560 * 1561 * @return GUID of the similar item; falsy otherwise 1562 */ 1563 async _findDupe() { 1564 // By default, assume there's no dupe items for the engine 1565 }, 1566 1567 /** 1568 * Called before a remote record is discarded due to failed reconciliation. 1569 * Used by bookmark sync to merge folder child orders. 1570 */ 1571 beforeRecordDiscard() {}, 1572 1573 // Called when the server has a record marked as deleted, but locally we've 1574 // changed it more recently than the deletion. If we return false, the 1575 // record will be deleted locally. If we return true, we'll reupload the 1576 // record to the server -- any extra work that's needed as part of this 1577 // process should be done at this point (such as mark the record's parent 1578 // for reuploading in the case of bookmarks). 1579 async _shouldReviveRemotelyDeletedRecord() { 1580 return true; 1581 }, 1582 1583 async _deleteId(id) { 1584 await this._tracker.removeChangedID(id); 1585 this._noteDeletedId(id); 1586 }, 1587 1588 // Marks an ID for deletion at the end of the sync. 1589 _noteDeletedId(id) { 1590 if (this._delete.ids == null) { 1591 this._delete.ids = [id]; 1592 } else { 1593 this._delete.ids.push(id); 1594 } 1595 }, 1596 1597 async _switchItemToDupe(localDupeGUID, incomingItem) { 1598 // The local, duplicate ID is always deleted on the server. 
1599 await this._deleteId(localDupeGUID); 1600 1601 // We unconditionally change the item's ID in case the engine knows of 1602 // an item but doesn't expose it through itemExists. If the API 1603 // contract were stronger, this could be changed. 1604 this._log.debug( 1605 "Switching local ID to incoming: " + 1606 localDupeGUID + 1607 " -> " + 1608 incomingItem.id 1609 ); 1610 return this._store.changeItemID(localDupeGUID, incomingItem.id); 1611 }, 1612 1613 /** 1614 * Reconcile incoming record with local state. 1615 * 1616 * This function essentially determines whether to apply an incoming record. 1617 * 1618 * @param item 1619 * Record from server to be tested for application. 1620 * @return boolean 1621 * Truthy if incoming record should be applied. False if not. 1622 */ 1623 async _reconcile(item) { 1624 if (this._log.level <= Log.Level.Trace) { 1625 this._log.trace("Incoming: " + item); 1626 } 1627 1628 // We start reconciling by collecting a bunch of state. We do this here 1629 // because some state may change during the course of this function and we 1630 // need to operate on the original values. 1631 let existsLocally = await this._store.itemExists(item.id); 1632 let locallyModified = this._modified.has(item.id); 1633 1634 // TODO Handle clock drift better. Tracked in bug 721181. 1635 let remoteAge = Resource.serverTime - item.modified; 1636 let localAge = locallyModified 1637 ? Date.now() / 1000 - this._modified.getModifiedTimestamp(item.id) 1638 : null; 1639 let remoteIsNewer = remoteAge < localAge; 1640 1641 this._log.trace( 1642 "Reconciling " + 1643 item.id + 1644 ". exists=" + 1645 existsLocally + 1646 "; modified=" + 1647 locallyModified + 1648 "; local age=" + 1649 localAge + 1650 "; incoming age=" + 1651 remoteAge 1652 ); 1653 1654 // We handle deletions first so subsequent logic doesn't have to check 1655 // deleted flags. 1656 if (item.deleted) { 1657 // If the item doesn't exist locally, there is nothing for us to do. 
We 1658 // can't check for duplicates because the incoming record has no data 1659 // which can be used for duplicate detection. 1660 if (!existsLocally) { 1661 this._log.trace( 1662 "Ignoring incoming item because it was deleted and " + 1663 "the item does not exist locally." 1664 ); 1665 return false; 1666 } 1667 1668 // We decide whether to process the deletion by comparing the record 1669 // ages. If the item is not modified locally, the remote side wins and 1670 // the deletion is processed. If it is modified locally, we take the 1671 // newer record. 1672 if (!locallyModified) { 1673 this._log.trace( 1674 "Applying incoming delete because the local item " + 1675 "exists and isn't modified." 1676 ); 1677 return true; 1678 } 1679 this._log.trace("Incoming record is deleted but we had local changes."); 1680 1681 if (remoteIsNewer) { 1682 this._log.trace("Remote record is newer -- deleting local record."); 1683 return true; 1684 } 1685 // If the local record is newer, we defer to individual engines for 1686 // how to handle this. By default, we revive the record. 1687 let willRevive = await this._shouldReviveRemotelyDeletedRecord(item); 1688 this._log.trace("Local record is newer -- reviving? " + willRevive); 1689 1690 return !willRevive; 1691 } 1692 1693 // At this point the incoming record is not for a deletion and must have 1694 // data. If the incoming record does not exist locally, we check for a local 1695 // duplicate existing under a different ID. The default implementation of 1696 // _findDupe() is empty, so engines have to opt in to this functionality. 1697 // 1698 // If we find a duplicate, we change the local ID to the incoming ID and we 1699 // refresh the metadata collected above. See bug 710448 for the history 1700 // of this logic. 
1701 if (!existsLocally) { 1702 let localDupeGUID = await this._findDupe(item); 1703 if (localDupeGUID) { 1704 this._log.trace( 1705 "Local item " + 1706 localDupeGUID + 1707 " is a duplicate for " + 1708 "incoming item " + 1709 item.id 1710 ); 1711 1712 // The current API contract does not mandate that the ID returned by 1713 // _findDupe() actually exists. Therefore, we have to perform this 1714 // check. 1715 existsLocally = await this._store.itemExists(localDupeGUID); 1716 1717 // If the local item was modified, we carry its metadata forward so 1718 // appropriate reconciling can be performed. 1719 if (this._modified.has(localDupeGUID)) { 1720 locallyModified = true; 1721 localAge = 1722 this._tracker._now() - 1723 this._modified.getModifiedTimestamp(localDupeGUID); 1724 remoteIsNewer = remoteAge < localAge; 1725 1726 this._modified.changeID(localDupeGUID, item.id); 1727 } else { 1728 locallyModified = false; 1729 localAge = null; 1730 } 1731 1732 // Tell the engine to do whatever it needs to switch the items. 1733 await this._switchItemToDupe(localDupeGUID, item); 1734 1735 this._log.debug( 1736 "Local item after duplication: age=" + 1737 localAge + 1738 "; modified=" + 1739 locallyModified + 1740 "; exists=" + 1741 existsLocally 1742 ); 1743 } else { 1744 this._log.trace("No duplicate found for incoming item: " + item.id); 1745 } 1746 } 1747 1748 // At this point we've performed duplicate detection. But, nothing here 1749 // should depend on duplicate detection as the above should have updated 1750 // state seamlessly. 1751 1752 if (!existsLocally) { 1753 // If the item doesn't exist locally and we have no local modifications 1754 // to the item (implying that it was not deleted), always apply the remote 1755 // item. 1756 if (!locallyModified) { 1757 this._log.trace( 1758 "Applying incoming because local item does not exist " + 1759 "and was not deleted." 
1760 ); 1761 return true; 1762 } 1763 1764 // If the item was modified locally but isn't present, it must have 1765 // been deleted. If the incoming record is younger, we restore from 1766 // that record. 1767 if (remoteIsNewer) { 1768 this._log.trace( 1769 "Applying incoming because local item was deleted " + 1770 "before the incoming item was changed." 1771 ); 1772 this._modified.delete(item.id); 1773 return true; 1774 } 1775 1776 this._log.trace( 1777 "Ignoring incoming item because the local item's " + 1778 "deletion is newer." 1779 ); 1780 return false; 1781 } 1782 1783 // If the remote and local records are the same, there is nothing to be 1784 // done, so we don't do anything. In the ideal world, this logic wouldn't 1785 // be here and the engine would take a record and apply it. The reason we 1786 // want to defer this logic is because it would avoid a redundant and 1787 // possibly expensive dip into the storage layer to query item state. 1788 // This should get addressed in the async rewrite, so we ignore it for now. 1789 let localRecord = await this._createRecord(item.id); 1790 let recordsEqual = Utils.deepEquals(item.cleartext, localRecord.cleartext); 1791 1792 // If the records are the same, we don't need to do anything. This does 1793 // potentially throw away a local modification time. But, if the records 1794 // are the same, does it matter? 1795 if (recordsEqual) { 1796 this._log.trace( 1797 "Ignoring incoming item because the local item is identical." 1798 ); 1799 1800 this._modified.delete(item.id); 1801 return false; 1802 } 1803 1804 // At this point the records are different. 1805 1806 // If we have no local modifications, always take the server record. 1807 if (!locallyModified) { 1808 this._log.trace("Applying incoming record because no local conflicts."); 1809 return true; 1810 } 1811 1812 // At this point, records are different and the local record is modified. 1813 // We resolve conflicts by record age, where the newest one wins. 
This does 1814 // result in data loss and should be handled by giving the engine an 1815 // opportunity to merge the records. Bug 720592 tracks this feature. 1816 this._log.warn( 1817 "DATA LOSS: Both local and remote changes to record: " + item.id 1818 ); 1819 if (!remoteIsNewer) { 1820 this.beforeRecordDiscard(localRecord, item, remoteIsNewer); 1821 } 1822 return remoteIsNewer; 1823 }, 1824 1825 // Upload outgoing records. 1826 async _uploadOutgoing() { 1827 this._log.trace("Uploading local changes to server."); 1828 1829 // collection we'll upload 1830 let up = new Collection(this.engineURL, null, this.service); 1831 let modifiedIDs = new Set(this._modified.ids()); 1832 let countTelemetry = new SyncedRecordsTelemetry(); 1833 let counts = countTelemetry.outgoingCounts; 1834 this._log.info(`Uploading ${modifiedIDs.size} outgoing records`); 1835 if (modifiedIDs.size) { 1836 counts.sent = modifiedIDs.size; 1837 1838 let failed = []; 1839 let successful = []; 1840 let lastSync = await this.getLastSync(); 1841 let handleResponse = async (postQueue, resp, batchOngoing) => { 1842 // Note: We don't want to update this.lastSync, or this._modified until 1843 // the batch is complete, however we want to remember success/failure 1844 // indicators for when that happens. 1845 if (!resp.success) { 1846 this._log.debug(`Uploading records failed: ${resp.status}`); 1847 resp.failureCode = 1848 resp.status == 412 ? ENGINE_BATCH_INTERRUPTED : ENGINE_UPLOAD_FAIL; 1849 throw resp; 1850 } 1851 1852 // Update server timestamp from the upload. 
1853 failed = failed.concat(Object.keys(resp.obj.failed)); 1854 successful = successful.concat(resp.obj.success); 1855 1856 if (batchOngoing) { 1857 // Nothing to do yet 1858 return; 1859 } 1860 1861 if (failed.length && this._log.level <= Log.Level.Debug) { 1862 this._log.debug( 1863 "Records that will be uploaded again because " + 1864 "the server couldn't store them: " + 1865 failed.join(", ") 1866 ); 1867 } 1868 1869 counts.failed += failed.length; 1870 Object.values(failed).forEach(message => { 1871 countTelemetry.addOutgoingFailedReason(message); 1872 }); 1873 1874 for (let id of successful) { 1875 this._modified.delete(id); 1876 } 1877 1878 await this._onRecordsWritten( 1879 successful, 1880 failed, 1881 postQueue.lastModified 1882 ); 1883 1884 // Advance lastSync since we've finished the batch. 1885 if (postQueue.lastModified > lastSync) { 1886 lastSync = postQueue.lastModified; 1887 await this.setLastSync(lastSync); 1888 } 1889 1890 // clear for next batch 1891 failed.length = 0; 1892 successful.length = 0; 1893 }; 1894 1895 let postQueue = up.newPostQueue(this._log, lastSync, handleResponse); 1896 1897 for (let id of modifiedIDs) { 1898 let out; 1899 let ok = false; 1900 try { 1901 out = await this._createRecord(id); 1902 if (this._log.level <= Log.Level.Trace) { 1903 this._log.trace("Outgoing: " + out); 1904 } 1905 await out.encrypt( 1906 this.service.collectionKeys.keyForCollection(this.name) 1907 ); 1908 ok = true; 1909 } catch (ex) { 1910 this._log.warn("Error creating record", ex); 1911 ++counts.failed; 1912 countTelemetry.addOutgoingFailedReason(ex.message); 1913 if (Async.isShutdownException(ex) || !this.allowSkippedRecord) { 1914 if (!this.allowSkippedRecord) { 1915 // Don't bother for shutdown errors 1916 Observers.notify("weave:engine:sync:uploaded", counts, this.name); 1917 } 1918 throw ex; 1919 } 1920 } 1921 if (ok) { 1922 let { enqueued, error } = await postQueue.enqueue(out); 1923 if (!enqueued) { 1924 ++counts.failed; 1925 
countTelemetry.addOutgoingFailedReason(error.message); 1926 if (!this.allowSkippedRecord) { 1927 Observers.notify("weave:engine:sync:uploaded", counts, this.name); 1928 this._log.warn( 1929 `Failed to enqueue record "${id}" (aborting)`, 1930 error 1931 ); 1932 throw error; 1933 } 1934 this._modified.delete(id); 1935 this._log.warn( 1936 `Failed to enqueue record "${id}" (skipping)`, 1937 error 1938 ); 1939 } 1940 } 1941 await Async.promiseYield(); 1942 } 1943 await postQueue.flush(true); 1944 } 1945 1946 if (counts.sent || counts.failed) { 1947 Observers.notify("weave:engine:sync:uploaded", counts, this.name); 1948 } 1949 }, 1950 1951 async _onRecordsWritten() { 1952 // Implement this method to take specific actions against successfully 1953 // uploaded records and failed records. 1954 }, 1955 1956 // Any cleanup necessary. 1957 // Save the current snapshot so as to calculate changes at next sync 1958 async _syncFinish() { 1959 this._log.trace("Finishing up sync"); 1960 1961 let doDelete = async (key, val) => { 1962 let coll = new Collection(this.engineURL, this._recordObj, this.service); 1963 coll[key] = val; 1964 await coll.delete(); 1965 }; 1966 1967 for (let [key, val] of Object.entries(this._delete)) { 1968 // Remove the key for future uses 1969 delete this._delete[key]; 1970 1971 this._log.trace("doing post-sync deletions", { key, val }); 1972 // Send a simple delete for the property 1973 if (key != "ids" || val.length <= 100) { 1974 await doDelete(key, val); 1975 } else { 1976 // For many ids, split into chunks of at most 100 1977 while (val.length) { 1978 await doDelete(key, val.slice(0, 100)); 1979 val = val.slice(100); 1980 } 1981 } 1982 } 1983 this.hasSyncedThisSession = true; 1984 await this._tracker.asyncObserver.promiseObserversComplete(); 1985 }, 1986 1987 async _syncCleanup() { 1988 try { 1989 // Mark failed WBOs as changed again so they are reuploaded next time. 
1990 await this.trackRemainingChanges(); 1991 } finally { 1992 this._modified.clear(); 1993 } 1994 }, 1995 1996 async _sync() { 1997 try { 1998 Async.checkAppReady(); 1999 await this._syncStartup(); 2000 Async.checkAppReady(); 2001 Observers.notify("weave:engine:sync:status", "process-incoming"); 2002 await this._processIncoming(); 2003 Async.checkAppReady(); 2004 Observers.notify("weave:engine:sync:status", "upload-outgoing"); 2005 try { 2006 await this._uploadOutgoing(); 2007 Async.checkAppReady(); 2008 await this._syncFinish(); 2009 } catch (ex) { 2010 if (!ex.status || ex.status != 412) { 2011 throw ex; 2012 } 2013 // a 412 posting just means another client raced - but we don't want 2014 // to treat that as a sync error - the next sync is almost certain 2015 // to work. 2016 this._log.warn("412 error during sync - will retry."); 2017 } 2018 } finally { 2019 await this._syncCleanup(); 2020 } 2021 }, 2022 2023 async canDecrypt() { 2024 // Report failure even if there's nothing to decrypt 2025 let canDecrypt = false; 2026 2027 // Fetch the most recently uploaded record and try to decrypt it 2028 let test = new Collection(this.engineURL, this._recordObj, this.service); 2029 test.limit = 1; 2030 test.sort = "newest"; 2031 test.full = true; 2032 2033 let key = this.service.collectionKeys.keyForCollection(this.name); 2034 2035 // Any failure fetching/decrypting will just result in false 2036 try { 2037 this._log.trace("Trying to decrypt a record from the server.."); 2038 let json = (await test.get()).obj[0]; 2039 let record = new this._recordObj(); 2040 record.deserialize(json); 2041 await record.decrypt(key); 2042 canDecrypt = true; 2043 } catch (ex) { 2044 if (Async.isShutdownException(ex)) { 2045 throw ex; 2046 } 2047 this._log.debug("Failed test decrypt", ex); 2048 } 2049 2050 return canDecrypt; 2051 }, 2052 2053 /** 2054 * Deletes the collection for this engine on the server, and removes all local 2055 * Sync metadata for this engine. 
This does *not* remove any existing data on 2056 * other clients. This is called when we reset the sync ID. 2057 */ 2058 async wipeServer() { 2059 await this._deleteServerCollection(); 2060 await this._resetClient(); 2061 }, 2062 2063 /** 2064 * Deletes the collection for this engine on the server, without removing 2065 * any local Sync metadata or user data. Deleting the collection will not 2066 * remove any user data on other clients, but will force other clients to 2067 * start over as a first sync. 2068 */ 2069 async _deleteServerCollection() { 2070 let response = await this.service.resource(this.engineURL).delete(); 2071 if (response.status != 200 && response.status != 404) { 2072 throw response; 2073 } 2074 }, 2075 2076 async removeClientData() { 2077 // Implement this method in engines that store client specific data 2078 // on the server. 2079 }, 2080 2081 /* 2082 * Decide on (and partially effect) an error-handling strategy. 2083 * 2084 * Asks the Service to respond to an HMAC error, which might result in keys 2085 * being downloaded. That call returns true if an action which might allow a 2086 * retry to occur. 2087 * 2088 * If `mayRetry` is truthy, and the Service suggests a retry, 2089 * handleHMACMismatch returns kRecoveryStrategy.retry. Otherwise, it returns 2090 * kRecoveryStrategy.error. 2091 * 2092 * Subclasses of SyncEngine can override this method to allow for different 2093 * behavior -- e.g., to delete and ignore erroneous entries. 2094 * 2095 * All return values will be part of the kRecoveryStrategy enumeration. 2096 */ 2097 async handleHMACMismatch(item, mayRetry) { 2098 // By default we either try again, or bail out noisily. 2099 return (await this.service.handleHMACEvent()) && mayRetry 2100 ? SyncEngine.kRecoveryStrategy.retry 2101 : SyncEngine.kRecoveryStrategy.error; 2102 }, 2103 2104 /** 2105 * Returns a changeset containing all items in the store. 
The default 2106 * implementation returns a changeset with timestamps from long ago, to 2107 * ensure we always use the remote version if one exists. 2108 * 2109 * This function is only called for the first sync. Subsequent syncs call 2110 * `pullNewChanges`. 2111 * 2112 * @return A `Changeset` object. 2113 */ 2114 async pullAllChanges() { 2115 let changes = {}; 2116 let ids = await this._store.getAllIDs(); 2117 for (let id in ids) { 2118 changes[id] = 0; 2119 } 2120 return changes; 2121 }, 2122 2123 /** 2124 * Returns a changeset containing entries for all currently tracked items. 2125 * The default implementation returns a changeset with timestamps indicating 2126 * when the item was added to the tracker. 2127 * 2128 * @return A `Changeset` object. 2129 */ 2130 async pullNewChanges() { 2131 await this._tracker.asyncObserver.promiseObserversComplete(); 2132 return this.getChangedIDs(); 2133 }, 2134 2135 /** 2136 * Adds all remaining changeset entries back to the tracker, typically for 2137 * items that failed to upload. This method is called at the end of each sync. 2138 * 2139 */ 2140 async trackRemainingChanges() { 2141 for (let [id, change] of this._modified.entries()) { 2142 await this._tracker.addChangedID(id, change); 2143 } 2144 }, 2145 2146 /** 2147 * Removes all local Sync metadata for this engine, but keeps all existing 2148 * local user data. 2149 */ 2150 async resetClient() { 2151 return this._notify("reset-client", this.name, this._resetClient)(); 2152 }, 2153 2154 async _resetClient() { 2155 await this.resetLastSync(); 2156 this.hasSyncedThisSession = false; 2157 this.previousFailed = new SerializableSet(); 2158 this.toFetch = new SerializableSet(); 2159 }, 2160 2161 /** 2162 * Removes all local Sync metadata and user data for this engine. 
/**
 * A changeset is created for each sync in `Engine::get{Changed, All}IDs`,
 * and stores opaque change data for tracked IDs. The default implementation
 * only records timestamps, though engines can extend this to store additional
 * data for each entry.
 *
 * Entries live in the plain object `this.changes`, keyed by record ID. Only
 * the object's own properties count as entries.
 */
export class Changeset {
  // Creates an empty changeset.
  constructor() {
    this.changes = {};
  }

  // Returns the last modified time, in seconds, for an entry in the changeset.
  // `id` is guaranteed to be in the set.
  getModifiedTimestamp(id) {
    return this.changes[id];
  }

  // Adds a change for a tracked ID to the changeset.
  set(id, change) {
    this.changes[id] = change;
  }

  // Adds multiple entries to the changeset. Entries whose IDs are not in
  // `changes` are left untouched; entries with matching IDs are overwritten.
  insert(changes) {
    Object.assign(this.changes, changes);
  }

  // Overwrites the existing set of tracked changes with new entries. The
  // passed object is adopted as-is, not copied.
  replace(changes) {
    this.changes = changes;
  }

  // Indicates whether an entry is in the changeset. Only own properties are
  // considered, so names inherited from `Object.prototype` (such as
  // "toString") are never reported as tracked IDs, matching `ids()`,
  // `entries()` and `count()`.
  has(id) {
    return Object.hasOwn(this.changes, id);
  }

  // Deletes an entry from the changeset. Used to clean up entries for
  // reconciled and successfully uploaded records.
  delete(id) {
    delete this.changes[id];
  }

  // Changes the ID of an entry in the changeset. Used when reconciling
  // duplicates that have local changes.
  changeID(oldID, newID) {
    // Renaming an entry to itself must be a no-op; copying and then deleting
    // would otherwise drop the entry altogether.
    if (oldID === newID) {
      return;
    }
    this.changes[newID] = this.changes[oldID];
    delete this.changes[oldID];
  }

  // Returns an array of all tracked IDs in this changeset.
  ids() {
    return Object.keys(this.changes);
  }

  // Returns an array of `[id, change]` tuples. Used to repopulate the tracker
  // with entries for failed uploads at the end of a sync.
  entries() {
    return Object.entries(this.changes);
  }

  // Returns the number of entries in this changeset.
  count() {
    return this.ids().length;
  }

  // Clears the changeset.
  clear() {
    this.changes = {};
  }
}