record.sys.mjs (40896B)
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

// Server collection and record ID where the encrypted key bundle lives
// (storage/crypto/keys).
const CRYPTO_COLLECTION = "crypto";
const KEYS_WBO = "keys";

import { Log } from "resource://gre/modules/Log.sys.mjs";

import {
  DEFAULT_DOWNLOAD_BATCH_SIZE,
  DEFAULT_KEYBUNDLE_NAME,
} from "resource://services-sync/constants.sys.mjs";
import { BulkKeyBundle } from "resource://services-sync/keys.sys.mjs";
import { Weave } from "resource://services-sync/main.sys.mjs";
import { Resource } from "resource://services-sync/resource.sys.mjs";
import { Utils } from "resource://services-sync/util.sys.mjs";

import { Async } from "resource://services-common/async.sys.mjs";
import { CommonUtils } from "resource://services-common/utils.sys.mjs";
import { CryptoUtils } from "moz-src:///services/crypto/modules/utils.sys.mjs";

/**
 * The base class for all Sync basic storage objects (BSOs). This is the format
 * used to store all records on the Sync server. In an earlier version of the
 * Sync protocol, BSOs used to be called WBOs, or Weave Basic Objects. This
 * class retains the old name.
 *
 * @class
 * @param {string} collection The collection name for this BSO.
 * @param {string} id The ID of this BSO.
 */
export function WBORecord(collection, id) {
  this.data = {};
  this.payload = {};
  this.collection = collection; // Optional.
  this.id = id; // Optional.
}

WBORecord.prototype = {
  _logName: "Sync.Record.WBO",

  // Server-assigned sort index; defaults to 0 when the record has none.
  get sortindex() {
    if (this.data.sortindex) {
      return this.data.sortindex;
    }
    return 0;
  },

  // Get thyself from your URI, then deserialize.
  // Set thine 'response' field.
  //
  // @param {Resource} resource The resource to GET this record from.
  // @returns {Promise<WBORecord>} this, populated from the response.
  async fetch(resource) {
    if (!(resource instanceof Resource)) {
      throw new Error("First argument must be a Resource instance.");
    }

    let r = await resource.get();
    if (r.success) {
      this.deserialize(r.obj); // Warning! Muffles exceptions!
    }
    this.response = r;
    return this;
  },

  // PUT this record (serialized via toJSON) to the given resource.
  //
  // @param {Resource} resource The resource to PUT to.
  // @returns {Promise} The resource's PUT response.
  upload(resource) {
    if (!(resource instanceof Resource)) {
      throw new Error("First argument must be a Resource instance.");
    }

    return resource.put(this);
  },

  // Take a base URI string, with trailing slash, and return the URI of this
  // WBO based on collection and ID.
  // Returns null when either collection or id is unset; throws when makeURI
  // cannot build a URI from the resulting string.
  uri(base) {
    if (!this.collection || !this.id) {
      return null;
    }
    let url = CommonUtils.makeURI(base + this.collection + "/" + this.id);

    // Bug 1985401: Prevent QueryInterface error when makeURI returns null
    if (!url) {
      throw new Error(
        `WBORecord.uri(): makeURI returned null for base='${base}', collection='${this.collection}', id='${this.id}'`
      );
    }
    url.QueryInterface(Ci.nsIURL);
    return url;
  },

  // Replace this record's data with a parsed server response body.
  //
  // @param {object} json The decoded BSO envelope from the server.
  // @throws {TypeError} If json is missing or not an object.
  deserialize: function deserialize(json) {
    if (!json || typeof json !== "object") {
      throw new TypeError("Can't deserialize record from: " + json);
    }
    this.data = json;
    try {
      // The payload is likely to be JSON, but if not, keep it as a string
      this.payload = JSON.parse(this.payload);
    } catch (ex) {}
  },

  toJSON: function toJSON() {
    // Copy fields from data to be stringified, making sure payload is a string
    let obj = {};
    for (let [key, val] of Object.entries(this.data)) {
      obj[key] = key == "payload" ? JSON.stringify(val) : val;
    }
    if (this.ttl) {
      obj.ttl = this.ttl;
    }
    return obj;
  },

  // Human-readable summary for logging/debugging; not machine-parseable.
  toString: function toString() {
    return (
      "{ " +
      "id: " +
      this.id +
      " " +
      "index: " +
      this.sortindex +
      " " +
      "modified: " +
      this.modified +
      " " +
      "ttl: " +
      this.ttl +
      " " +
      "payload: " +
      JSON.stringify(this.payload) +
      " }"
    );
  },
};

// Expose id/modified/sortindex/payload as accessors backed by this.data.
Utils.deferGetSet(WBORecord, "data", [
  "id",
  "modified",
  "sortindex",
  "payload",
]);

/**
 * An encrypted BSO record. This subclass handles encrypting and decrypting the
 * BSO payload, but doesn't parse or interpret the cleartext string. Subclasses
 * must override `transformBeforeEncrypt` and `transformAfterDecrypt` to process
 * the cleartext.
 *
 * This class is only exposed for bridged engines, which handle serialization
 * and deserialization in Rust. Sync engines implemented in JS should subclass
 * `CryptoWrapper` instead, which takes care of transforming the cleartext into
 * an object, and ensuring its contents are valid.
 *
 * @class
 * @template Cleartext
 * @param {string} collection The collection name for this BSO.
 * @param {string} id The ID of this BSO.
 */
export function RawCryptoWrapper(collection, id) {
  // Setting properties before calling the superclass constructor isn't allowed
  // in new-style classes (`class MyRecord extends RawCryptoWrapper`), but
  // allowed with plain functions. This is also why `defaultCleartext` is a
  // method, and not simply set in the subclass constructor.
  this.cleartext = this.defaultCleartext();
  WBORecord.call(this, collection, id);
  this.ciphertext = null;
}

RawCryptoWrapper.prototype = {
  _logName: "Sync.Record.RawCryptoWrapper",

  /**
   * Returns the default empty cleartext for this record type. This is exposed
   * as a method so that subclasses can override it, and access the default
   * cleartext in their constructors. `CryptoWrapper`, for example, overrides
   * this to return an empty object, so that initializing the `id` in its
   * constructor calls its overridden `id` setter.
   *
   * @returns {Cleartext} An empty cleartext.
   */
  defaultCleartext() {
    return null;
  },

  /**
   * Transforms the cleartext into a string that can be encrypted and wrapped
   * in a BSO payload. This is called before uploading the record to the server.
   *
   * @param {Cleartext} outgoingCleartext The cleartext to upload.
   * @returns {string} The serialized cleartext.
   */
  transformBeforeEncrypt() {
    throw new TypeError("Override to stringify outgoing records");
  },

  /**
   * Transforms an incoming cleartext string into an instance of the
   * `Cleartext` type. This is called when fetching the record from the
   * server.
   *
   * @param {string} incomingCleartext The decrypted cleartext string.
   * @returns {Cleartext} The parsed cleartext.
   */
  transformAfterDecrypt() {
    throw new TypeError("Override to parse incoming records");
  },

  // Compute the hex-encoded SHA-256 HMAC of this record's ciphertext using
  // the bundle's HMAC key.
  //
  // @param {BulkKeyBundle} keyBundle Supplies the HMAC key.
  // @returns {Promise<string>} Hex-encoded HMAC digest.
  ciphertextHMAC: async function ciphertextHMAC(keyBundle) {
    let hmacKeyByteString = keyBundle.hmacKey;
    if (!hmacKeyByteString) {
      throw new Error("Cannot compute HMAC without an HMAC key.");
    }
    let hmacKey = CommonUtils.byteStringToArrayBuffer(hmacKeyByteString);
    // NB: this.ciphertext is a base64-encoded string. For some reason this
    // implementation computes the HMAC on the encoded value.
    let data = CommonUtils.byteStringToArrayBuffer(this.ciphertext);
    let hmac = await CryptoUtils.hmac("SHA-256", hmacKey, data);
    return CommonUtils.bytesAsHex(CommonUtils.arrayBufferToByteString(hmac));
  },

  /*
   * Don't directly use the sync key. Instead, grab a key for this
   * collection, which is decrypted with the sync key.
   *
   * Cache those keys; invalidate the cache if the time on the keys collection
   * changes, or other auth events occur.
   *
   * Optional key bundle overrides the collection key lookup.
   */
  // Encrypts this.cleartext into this.ciphertext (with a fresh random IV and
  // an HMAC over the ciphertext), then clears the cleartext.
  async encrypt(keyBundle) {
    if (!keyBundle) {
      throw new Error("A key bundle must be supplied to encrypt.");
    }

    this.IV = Weave.Crypto.generateRandomIV();
    this.ciphertext = await Weave.Crypto.encrypt(
      this.transformBeforeEncrypt(this.cleartext),
      keyBundle.encryptionKeyB64,
      this.IV
    );
    this.hmac = await this.ciphertextHMAC(keyBundle);
    this.cleartext = null;
  },

  // Optional key bundle.
  // Verifies the HMAC before decrypting (throws via Utils.throwHMACMismatch
  // on mismatch), then replaces this.ciphertext with the parsed cleartext.
  async decrypt(keyBundle) {
    if (!this.ciphertext) {
      throw new Error("No ciphertext: nothing to decrypt?");
    }

    if (!keyBundle) {
      throw new Error("A key bundle must be supplied to decrypt.");
    }

    // Authenticate the encrypted blob with the expected HMAC
    let computedHMAC = await this.ciphertextHMAC(keyBundle);

    if (computedHMAC != this.hmac) {
      Utils.throwHMACMismatch(this.hmac, computedHMAC);
    }

    let cleartext = await Weave.Crypto.decrypt(
      this.ciphertext,
      keyBundle.encryptionKeyB64,
      this.IV
    );
    this.cleartext = this.transformAfterDecrypt(cleartext);
    this.ciphertext = null;

    return this.cleartext;
  },
};

Object.setPrototypeOf(RawCryptoWrapper.prototype, WBORecord.prototype);

// ciphertext/IV/hmac live inside the (JSON) payload, not the BSO envelope.
Utils.deferGetSet(RawCryptoWrapper, "payload", ["ciphertext", "IV", "hmac"]);

/**
 * An encrypted BSO record with a JSON payload. All engines implemented in JS
 * should subclass this class to describe their own record types.
 *
 * @class
 * @param {string} collection The collection name for this BSO.
 * @param {string} id The ID of this BSO.
 */
export function CryptoWrapper(collection, id) {
  RawCryptoWrapper.call(this, collection, id);
}

CryptoWrapper.prototype = {
  _logName: "Sync.Record.CryptoWrapper",

  // Cleartext is a plain object here (cf. RawCryptoWrapper's null), so the
  // `id` setter below can write into it during construction.
  defaultCleartext() {
    return {};
  },

  transformBeforeEncrypt(cleartext) {
    return JSON.stringify(cleartext);
  },

  transformAfterDecrypt(cleartext) {
    // Handle invalid data here. Elsewhere we assume that cleartext is an object.
    let json_result = JSON.parse(cleartext);

    if (!(json_result && json_result instanceof Object)) {
      throw new Error(
        `Decryption failed: result is <${json_result}>, not an object.`
      );
    }

    // If the payload has an encrypted id ensure it matches the requested record's id.
    if (json_result.id && json_result.id != this.id) {
      throw new Error(`Record id mismatch: ${json_result.id} != ${this.id}`);
    }

    return json_result;
  },

  cleartextToString() {
    return JSON.stringify(this.cleartext);
  },

  // Human-readable summary for logging; shows "DELETED" for tombstones.
  toString: function toString() {
    let payload = this.deleted ? "DELETED" : this.cleartextToString();

    return (
      "{ " +
      "id: " +
      this.id +
      " " +
      "index: " +
      this.sortindex +
      " " +
      "modified: " +
      this.modified +
      " " +
      "ttl: " +
      this.ttl +
      " " +
      "payload: " +
      payload +
      " " +
      "collection: " +
      (this.collection || "undefined") +
      " }"
    );
  },

  // The custom setter below masks the parent's getter, so explicitly call it :(
  get id() {
    return super.id;
  },

  // Keep both plaintext and encrypted versions of the id to verify integrity
  set id(val) {
    super.id = val;
    this.cleartext.id = val;
  },
};

Object.setPrototypeOf(CryptoWrapper.prototype, RawCryptoWrapper.prototype);

// `deleted` is stored inside the cleartext, marking tombstone records.
Utils.deferGetSet(CryptoWrapper, "cleartext", "deleted");

/**
 * An interface and caching layer for records.
 */
export function RecordManager(service) {
  this.service = service;

  this._log = Log.repository.getLogger(this._logName);
  this._records = {};
}

RecordManager.prototype = {
  _recordType: CryptoWrapper,
  _logName: "Sync.RecordManager",

  // Fetch a record from the given URL, cache it, and return it.
  // Returns null on a non-success response or on any non-shutdown exception;
  // the raw response (or {} on failure to GET) is kept in this.response.
  async import(url) {
    this._log.trace("Importing record: " + (url.spec ? url.spec : url));
    try {
      // Clear out the last response with empty object if GET fails
      this.response = {};
      this.response = await this.service.resource(url).get();

      // Don't parse and save the record on failure
      if (!this.response.success) {
        return null;
      }

      let record = new this._recordType(url);
      record.deserialize(this.response.obj);

      return this.set(url, record);
    } catch (ex) {
      if (Async.isShutdownException(ex)) {
        throw ex;
      }
      this._log.debug("Failed to import record", ex);
      return null;
    }
  },

  // Return the cached record for this URL, importing it on a cache miss.
  // Always returns a Promise.
  get(url) {
    // Use a url string as the key to the hash
    let spec = url.spec ? url.spec : url;
    if (spec in this._records) {
      return Promise.resolve(this._records[spec]);
    }
    return this.import(url);
  },

  set: function RecordMgr_set(url, record) {
    let spec = url.spec ? url.spec : url;
    return (this._records[spec] = record);
  },

  contains: function RecordMgr_contains(url) {
    if ((url.spec || url) in this._records) {
      return true;
    }
    return false;
  },

  clearCache: function recordMgr_clearCache() {
    this._records = {};
  },

  // NOTE(review): unlike the other methods, this keys on the raw url value
  // (no .spec unwrapping) — callers presumably pass the string form; verify.
  del: function RecordMgr_del(url) {
    delete this._records[url];
  },
};

/**
 * Keeps track of mappings between collection names ('tabs') and KeyBundles.
 *
 * You can update this thing simply by giving it /info/collections. It'll
 * use the last modified time to bring itself up to date.
 */
export function CollectionKeyManager(lastModified, default_, collections) {
  this.lastModified = lastModified || 0;
  this._default = default_ || null;
  this._collections = collections || {};

  this._log = Log.repository.getLogger("Sync.CollectionKeyManager");
}

// TODO: persist this locally as an Identity. Bug 610913.
// Note that the last modified time needs to be preserved.
CollectionKeyManager.prototype = {
  /**
   * Generate a new CollectionKeyManager that has the same attributes
   * as this one.
   */
  clone() {
    const newCollections = {};
    for (let c in this._collections) {
      newCollections[c] = this._collections[c];
    }

    return new CollectionKeyManager(
      this.lastModified,
      this._default,
      newCollections
    );
  },

  // Return information about old vs new keys:
  // * same: true if two collections are equal
  // * changed: an array of collection names that changed.
  _compareKeyBundleCollections: function _compareKeyBundleCollections(m1, m2) {
    let changed = [];

    function process(m1, m2) {
      for (let k1 in m1) {
        let v1 = m1[k1];
        let v2 = m2[k1];
        if (!(v1 && v2 && v1.equals(v2))) {
          changed.push(k1);
        }
      }
    }

    // Diffs both ways.
    process(m1, m2);
    process(m2, m1);

    // Return a sorted, unique array.
    changed.sort();
    let last;
    // Dedupe: keep an element only when it differs from its predecessor
    // (the assignment inside the filter advances `last`).
    changed = changed.filter(x => x != last && (last = x));
    return { same: !changed.length, changed };
  },

  get isClear() {
    return !this._default;
  },

  // Drop all keys and reset the modification time.
  clear: function clear() {
    this._log.info("Clearing collection keys...");
    this.lastModified = 0;
    this._collections = {};
    this._default = null;
  },

  // Return the per-collection key bundle, falling back to the default bundle
  // when none exists for this collection (or no collection is given).
  keyForCollection(collection) {
    if (collection && this._collections[collection]) {
      return this._collections[collection];
    }

    return this._default;
  },

  /**
   * If `collections` (an array of strings) is provided, iterate
   * over it and generate random keys for each collection.
   * Create a WBO for the given data.
   */
  _makeWBO(collections, defaultBundle) {
    let wbo = new CryptoWrapper(CRYPTO_COLLECTION, KEYS_WBO);
    let c = {};
    for (let k in collections) {
      c[k] = collections[k].keyPairB64;
    }
    wbo.cleartext = {
      default: defaultBundle ? defaultBundle.keyPairB64 : null,
      collections: c,
      collection: CRYPTO_COLLECTION,
      id: KEYS_WBO,
    };
    return wbo;
  },

  /**
   * Create a WBO for the current keys.
   */
  asWBO() {
    return this._makeWBO(this._collections, this._default);
  },

  /**
   * Compute a new default key, and new keys for any specified collections.
   *
   * @returns {Promise<[BulkKeyBundle, object]>} The new default bundle and a
   *          map of collection name -> new BulkKeyBundle.
   */
  async newKeys(collections) {
    let newDefaultKeyBundle = await this.newDefaultKeyBundle();

    let newColls = {};
    if (collections) {
      for (let c of collections) {
        let b = new BulkKeyBundle(c);
        await b.generateRandom();
        newColls[c] = b;
      }
    }
    return [newDefaultKeyBundle, newColls];
  },

  /**
   * Generates new keys, but does not replace our local copy. Use this to
   * verify an upload before storing.
   */
  async generateNewKeysWBO(collections) {
    let newDefaultKey, newColls;
    [newDefaultKey, newColls] = await this.newKeys(collections);

    return this._makeWBO(newColls, newDefaultKey);
  },

  /**
   * Create a new default key.
   *
   * @returns {BulkKeyBundle}
   */
  async newDefaultKeyBundle() {
    const key = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
    await key.generateRandom();
    return key;
  },

  /**
   * Create a new default key and store it as this._default, since without one you cannot use setContents.
   */
  async generateDefaultKey() {
    this._default = await this.newDefaultKeyBundle();
  },

  /**
   * Return true if keys are already present for each of the given
   * collections.
   */
  hasKeysFor(collections) {
    // We can't use filter() here because sometimes collections is an iterator.
    for (let collection of collections) {
      if (!this._collections[collection]) {
        return false;
      }
    }
    return true;
  },

  /**
   * Return a new CollectionKeyManager that has keys for each of the
   * given collections (creating new ones for collections where we
   * don't already have keys).
   */
  async ensureKeysFor(collections) {
    const newKeys = Object.assign({}, this._collections);
    for (let c of collections) {
      if (newKeys[c]) {
        continue; // don't replace existing keys
      }

      const b = new BulkKeyBundle(c);
      await b.generateRandom();
      newKeys[c] = b;
    }
    return new CollectionKeyManager(this.lastModified, this._default, newKeys);
  },

  // Take the fetched info/collections WBO, checking the change
  // time of the crypto collection.
  updateNeeded(info_collections) {
    this._log.info(
      "Testing for updateNeeded. Last modified: " + this.lastModified
    );

    // No local record of modification time? Need an update.
    if (!this.lastModified) {
      return true;
    }

    // No keys on the server? We need an update, though our
    // update handling will be a little more drastic...
    if (!(CRYPTO_COLLECTION in info_collections)) {
      return true;
    }

    // Otherwise, we need an update if our modification time is stale.
    return info_collections[CRYPTO_COLLECTION] > this.lastModified;
  },

  //
  // Set our keys and modified time to the values fetched from the server.
  // Returns one of three values:
  //
  // * If the default key was modified, return true.
  // * If the default key was not modified, but per-collection keys were,
  //   return an array of such.
  // * Otherwise, return false -- we were up-to-date.
  //
  setContents: function setContents(payload, modified) {
    let self = this;

    this._log.info(
      "Setting collection keys contents. Our last modified: " +
        this.lastModified +
        ", input modified: " +
        modified +
        "."
    );

    if (!payload) {
      throw new Error("No payload in CollectionKeyManager.setContents().");
    }

    if (!payload.default) {
      this._log.warn("No downloaded default key: this should not occur.");
      this._log.warn("Not clearing local keys.");
      throw new Error(
        "No default key in CollectionKeyManager.setContents(). Cannot proceed."
      );
    }

    // Process the incoming default key.
    let b = new BulkKeyBundle(DEFAULT_KEYBUNDLE_NAME);
    b.keyPairB64 = payload.default;
    let newDefault = b;

    // Process the incoming collections.
    let newCollections = {};
    if ("collections" in payload) {
      this._log.info("Processing downloaded per-collection keys.");
      let colls = payload.collections;
      for (let k in colls) {
        let v = colls[k];
        if (v) {
          let keyObj = new BulkKeyBundle(k);
          keyObj.keyPairB64 = v;
          newCollections[k] = keyObj;
        }
      }
    }

    // Check to see if these are already our keys.
    let sameDefault = this._default && this._default.equals(newDefault);
    let collComparison = this._compareKeyBundleCollections(
      newCollections,
      this._collections
    );
    let sameColls = collComparison.same;

    if (sameDefault && sameColls) {
      self._log.info("New keys are the same as our old keys!");
      if (modified) {
        self._log.info("Bumped local modified time.");
        self.lastModified = modified;
      }
      return false;
    }

    // Make sure things are nice and tidy before we set.
    this.clear();

    this._log.info("Saving downloaded keys.");
    this._default = newDefault;
    this._collections = newCollections;

    // Always trust the server.
    if (modified) {
      self._log.info("Bumping last modified to " + modified);
      self.lastModified = modified;
    }

    return sameDefault ? collComparison.changed : true;
  },

  // Decrypt the storage/crypto/keys WBO with the sync key bundle and apply it
  // via setContents. Rethrows decryption failures after logging.
  async updateContents(syncKeyBundle, storage_keys) {
    let log = this._log;
    log.info("Updating collection keys...");

    // storage_keys is a WBO, fetched from storage/crypto/keys.
    // Its payload is the default key, and a map of collections to keys.
    // We lazily compute the key objects from the strings we're given.

    let payload;
    try {
      payload = await storage_keys.decrypt(syncKeyBundle);
    } catch (ex) {
      log.warn("Got exception decrypting storage keys with sync key.", ex);
      log.info("Aborting updateContents. Rethrowing.");
      throw ex;
    }

    let r = this.setContents(payload, storage_keys.modified);
    log.info("Collection keys updated.");
    return r;
  },
};

/**
 * A Resource subclass representing a server collection, with query-parameter
 * accessors (full, ids, limit, older, newer, sort, offset, batch, commit) that
 * rebuild the request URL on assignment.
 *
 * @param {string|nsIURI} uri The collection URI.
 * @param {Function} recordObj Constructor for records in this collection.
 * @param {object} service The Sync service (required; supplies the authenticator).
 */
export function Collection(uri, recordObj, service) {
  if (!service) {
    throw new Error("Collection constructor requires a service.");
  }

  Resource.call(this, uri);

  // Bug 1985401: Log when Resource constructor results in null uri
  // We don't throw here to preserve existing functionality
  if (!this.uri) {
    this._log.error(
      `Collection constructor: Resource.call() resulted in null uri`,
      { originalUri: uri, recordObj: recordObj?.name || "unknown" }
    );
  }

  // This is a bit hacky, but gets the job done.
  let res = service.resource(uri);
  this.authenticator = res.authenticator;

  this._recordObj = recordObj;
  this._service = service;

  this._full = false;
  this._ids = null;
  this._limit = 0;
  this._older = 0;
  this._newer = 0;
  this._data = [];
  // optional members used by batch upload operations.
  this._batch = null;
  this._commit = false;
  // Used for batch download operations -- note that this is explicitly an
  // opaque value and not (necessarily) a number.
  this._offset = null;
}

Collection.prototype = {
  _logName: "Sync.Collection",

  // Recompute this.uri's query string from the current query-parameter state.
  _rebuildURL: function Coll__rebuildURL() {
    // Bug 1985401: Prevent QueryInterface usage on null uri
    if (!this.uri) {
      throw new Error("_rebuildURL called with null uri");
    }
    // XXX should consider what happens if it's not a URL...
    this.uri.QueryInterface(Ci.nsIURL);

    let args = [];
    if (this.older) {
      args.push("older=" + this.older);
    }
    if (this.newer) {
      args.push("newer=" + this.newer);
    }
    if (this.full) {
      args.push("full=1");
    }
    if (this.sort) {
      args.push("sort=" + this.sort);
    }
    if (this.ids != null) {
      args.push("ids=" + this.ids);
    }
    if (this.limit > 0 && this.limit != Infinity) {
      args.push("limit=" + this.limit);
    }
    if (this._batch) {
      args.push("batch=" + encodeURIComponent(this._batch));
    }
    if (this._commit) {
      args.push("commit=true");
    }
    if (this._offset) {
      args.push("offset=" + encodeURIComponent(this._offset));
    }

    this.uri = this.uri
      .mutate()
      .setQuery(args.length ? "?" + args.join("&") : "")
      .finalize();
  },

  // get full items
  get full() {
    return this._full;
  },
  set full(value) {
    this._full = value;
    this._rebuildURL();
  },

  // Apply the action to a certain set of ids
  get ids() {
    return this._ids;
  },
  set ids(value) {
    this._ids = value;
    this._rebuildURL();
  },

  // Limit how many records to get
  get limit() {
    return this._limit;
  },
  set limit(value) {
    this._limit = value;
    this._rebuildURL();
  },

  // get only items modified before some date
  get older() {
    return this._older;
  },
  set older(value) {
    this._older = value;
    this._rebuildURL();
  },

  // get only items modified since some date
  get newer() {
    return this._newer;
  },
  set newer(value) {
    this._newer = value;
    this._rebuildURL();
  },

  // get items sorted by some criteria. valid values:
  // oldest (oldest first)
  // newest (newest first)
  // index
  get sort() {
    return this._sort;
  },
  set sort(value) {
    if (value && value != "oldest" && value != "newest" && value != "index") {
      throw new TypeError(
        `Illegal value for sort: "${value}" (should be "oldest", "newest", or "index").`
      );
    }
    this._sort = value;
    this._rebuildURL();
  },

  get offset() {
    return this._offset;
  },
  set offset(value) {
    this._offset = value;
    this._rebuildURL();
  },

  // Set information about the batch for this request.
  get batch() {
    return this._batch;
  },
  set batch(value) {
    this._batch = value;
    this._rebuildURL();
  },

  get commit() {
    return this._commit;
  },
  set commit(value) {
    // Coerce to boolean.
    this._commit = value && true;
    this._rebuildURL();
  },

  // Similar to get(), but will page through the items `batchSize` at a time,
  // deferring calling the record handler until we've gotten them all.
  //
  // Returns the last response processed, and doesn't run the record handler
  // on any items if a non-success status is received while downloading the
  // records (or if a network error occurs).
  async getBatched(batchSize = DEFAULT_DOWNLOAD_BATCH_SIZE) {
    let totalLimit = Number(this.limit) || Infinity;
    if (batchSize <= 0 || batchSize >= totalLimit) {
      throw new Error("Invalid batch size");
    }

    if (!this.full) {
      throw new Error("getBatched is unimplemented for guid-only GETs");
    }

    // _onComplete and _onProgress are reset after each `get` by Resource.
    let { _onComplete, _onProgress } = this;
    let recordBuffer = [];
    let resp;
    try {
      let lastModifiedTime;
      this.limit = batchSize;

      do {
        this._onProgress = _onProgress;
        this._onComplete = _onComplete;
        if (batchSize + recordBuffer.length > totalLimit) {
          this.limit = totalLimit - recordBuffer.length;
        }
        this._log.trace("Performing batched GET", {
          limit: this.limit,
          offset: this.offset,
        });
        // Actually perform the request
        resp = await this.get();
        if (!resp.success) {
          recordBuffer = [];
          break;
        }
        for (let json of resp.obj) {
          let record = new this._recordObj();
          record.deserialize(json);
          recordBuffer.push(record);
        }

        // Initialize last modified, or check that something broken isn't happening.
        let lastModified = resp.headers["x-last-modified"];
        if (!lastModifiedTime) {
          lastModifiedTime = lastModified;
          this.setHeader("X-If-Unmodified-Since", lastModified);
        } else if (lastModified != lastModifiedTime) {
          // Should be impossible -- We'd get a 412 in this case.
          // NOTE(review): the message reads new => old; arguably the arrow
          // direction is reversed, but the values shown are correct.
          throw new Error(
            "X-Last-Modified changed in the middle of a download batch! " +
              `${lastModified} => ${lastModifiedTime}`
          );
        }

        // If this is missing, we're finished.
        this.offset = resp.headers["x-weave-next-offset"];
      } while (this.offset && totalLimit > recordBuffer.length);
    } finally {
      // Ensure we undo any temporary state so that subsequent calls to get()
      // or getBatched() work properly. We do this before calling the record
      // handler so that we can more convincingly pretend to be a normal get()
      // call. Note: we're resetting these to the values they had before this
      // function was called.
      this._limit = totalLimit;
      this._offset = null;
      delete this._headers["x-if-unmodified-since"];
      this._rebuildURL();
    }
    return { response: resp, records: recordBuffer };
  },

  // This object only supports posting via the postQueue object.
  post() {
    throw new Error(
      "Don't directly post to a collection - use newPostQueue instead"
    );
  },

  // Build a PostQueue whose poster applies batch/commit state and headers to
  // this collection before delegating to Resource.prototype.post.
  newPostQueue(log, timestamp, postCallback) {
    let poster = (data, headers, batch, commit) => {
      this.batch = batch;
      this.commit = commit;
      for (let [header, value] of headers) {
        this.setHeader(header, value);
      }
      return Resource.prototype.post.call(this, data);
    };
    return new PostQueue(
      poster,
      timestamp,
      this._service.serverConfiguration || {},
      log,
      postCallback
    );
  },
};

Object.setPrototypeOf(Collection.prototype, Resource.prototype);

// These are limits for requests provided by the server at the
// info/configuration endpoint -- server documentation is available here:
// http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#api-instructions
//
// All are optional, however we synthesize (non-infinite) default values for the
// "max_request_bytes" and "max_record_payload_bytes" options. For the others,
// we ignore them (we treat the limit is infinite) if they're missing.
//
// These are also the only ones that all servers (even batching-disabled
// servers) should support, at least once this sync-serverstorage patch is
// everywhere https://github.com/mozilla-services/server-syncstorage/pull/74
//
// Batching enabled servers also limit the amount of payload data and number
// of and records we can send in a single post as well as in the whole batch.
// Note that the byte limits for these there are just with respect to the
// *payload* data, e.g.
the data appearing in the payload property (a 1040 // string) of the object. 1041 // 1042 // Note that in practice, these limits should be sensible, but the code makes 1043 // no assumptions about this. If we hit any of the limits, we perform the 1044 // corresponding action (e.g. submit a request, possibly committing the 1045 // current batch). 1046 const DefaultPostQueueConfig = Object.freeze({ 1047 // Number of total bytes allowed in a request 1048 max_request_bytes: 260 * 1024, 1049 1050 // Maximum number of bytes allowed in the "payload" property of a record. 1051 max_record_payload_bytes: 256 * 1024, 1052 1053 // The limit for how many bytes worth of data appearing in "payload" 1054 // properties are allowed in a single post. 1055 max_post_bytes: Infinity, 1056 1057 // The limit for the number of records allowed in a single post. 1058 max_post_records: Infinity, 1059 1060 // The limit for how many bytes worth of data appearing in "payload" 1061 // properties are allowed in a batch. (Same as max_post_bytes, but for 1062 // batches). 1063 max_total_bytes: Infinity, 1064 1065 // The limit for the number of records allowed in a single post. (Same 1066 // as max_post_records, but for batches). 1067 max_total_records: Infinity, 1068 }); 1069 1070 // Manages a pair of (byte, count) limits for a PostQueue, such as 1071 // (max_post_bytes, max_post_records) or (max_total_bytes, max_total_records). 1072 class LimitTracker { 1073 constructor(maxBytes, maxRecords) { 1074 this.maxBytes = maxBytes; 1075 this.maxRecords = maxRecords; 1076 this.curBytes = 0; 1077 this.curRecords = 0; 1078 } 1079 1080 clear() { 1081 this.curBytes = 0; 1082 this.curRecords = 0; 1083 } 1084 1085 canAddRecord(payloadSize) { 1086 // The record counts are inclusive, but depending on the version of the 1087 // server, the byte counts may or may not be inclusive (See 1088 // https://github.com/mozilla-services/server-syncstorage/issues/73). 
    return (
      this.curRecords + 1 <= this.maxRecords &&
      this.curBytes + payloadSize < this.maxBytes
    );
  }

  // Whether a record of `recordSize` payload bytes could *ever* be added,
  // regardless of how empty the current post/batch is.
  canNeverAdd(recordSize) {
    return recordSize >= this.maxBytes;
  }

  // Accounts for a record of `recordSize` payload bytes having been queued.
  didAddRecord(recordSize) {
    if (!this.canAddRecord(recordSize)) {
      // This is a bug, caller is expected to call canAddRecord first.
      throw new Error(
        "LimitTracker.canAddRecord must be checked before adding record"
      );
    }
    this.curRecords += 1;
    this.curBytes += recordSize;
  }
}

/* A helper to manage the posting of records while respecting the various
   size limits.

   This supports the concept of a server-side "batch". The general idea is:
   * We queue as many records as allowed in memory, then make a single POST.
   * This first POST (optionally) gives us a batch ID, which we use for
     all subsequent posts, until...
   * At some point we hit a batch-maximum, and jump through a few hoops to
     commit the current batch (ie, all previous POSTs) and start a new one.
   * Eventually commit the final batch.

   In most cases we expect there to be exactly 1 batch consisting of possibly
   multiple POSTs.
*/
export function PostQueue(poster, timestamp, serverConfig, log, postCallback) {
  // The "post" function we should use when it comes time to do the post.
  this.poster = poster;
  this.log = log;

  let config = Object.assign({}, DefaultPostQueueConfig, serverConfig);

  if (!serverConfig.max_request_bytes && serverConfig.max_post_bytes) {
    // Use max_post_bytes for max_request_bytes if it's missing. Only needed
    // until server-syncstorage/pull/74 is everywhere, and even then it's
    // unnecessary if the server limits are configured sanely (there's no
    // guarantee of that -- at least before that patch is fully deployed).
    config.max_request_bytes = serverConfig.max_post_bytes;
  }

  this.log.trace("new PostQueue config (after defaults): ", config);

  // The callback we make with the response when we do get around to making the
  // post (which could be during any of the enqueue() calls or the final flush())
  // This callback may be called multiple times and must not add new items to
  // the queue.
  // The second argument passed to this callback is a boolean value that is true
  // if we're in the middle of a batch, and false if either the batch is
  // complete, or it's a post to a server that does not understand batching.
  this.postCallback = postCallback;

  // Tracks the count and combined payload size for the records we've queued
  // so far but are yet to POST.
  this.postLimits = new LimitTracker(
    config.max_post_bytes,
    config.max_post_records
  );

  // As above, but for the batch size.
  this.batchLimits = new LimitTracker(
    config.max_total_bytes,
    config.max_total_records
  );

  // Limit for the size of `this.queued` before we do a post.
  this.maxRequestBytes = config.max_request_bytes;

  // Limit for the size of incoming record payloads.
  this.maxPayloadBytes = config.max_record_payload_bytes;

  // The string where we are capturing the stringified version of the records
  // queued so far. It will always be invalid JSON as it is always missing the
  // closing bracket. It's also used to track whether or not we've gone past
  // maxRequestBytes.
  this.queued = "";

  // The ID of our current batch. Can be undefined (meaning we are yet to make
  // the first post of a batch, so don't know if we have a batch), null (meaning
  // we've made the first post but the server response indicated no batching
  // semantics), otherwise we have made the first post and it holds the batch ID
  // returned from the server.
  this.batchID = undefined;

  // Time used for X-If-Unmodified-Since -- should be the timestamp from the last GET.
  this.lastModified = timestamp;
}

PostQueue.prototype = {
  // Adds one record to the queue, first flushing (and possibly committing the
  // current batch) if any limit would be exceeded. Resolves to
  // { enqueued: true } on success, or { enqueued: false, error } when the
  // record is too large to ever be submitted.
  async enqueue(record) {
    // We want to ensure the record has a .toJSON() method defined - even
    // though JSON.stringify() would implicitly call it, the stringify might
    // still work even if it isn't defined, which isn't what we want.
    let jsonRepr = record.toJSON();
    if (!jsonRepr) {
      throw new Error(
        "You must only call this with objects that explicitly support JSON"
      );
    }

    let bytes = JSON.stringify(jsonRepr);

    // We use the payload size for the LimitTrackers, since that's what the
    // byte limits other than max_request_bytes refer to.
    let payloadLength = jsonRepr.payload.length;

    // The `+ 2` is to account for the 2-byte (maximum) overhead (one byte for
    // the leading comma or "[", which all records will have, and the other for
    // the final trailing "]", only present for the last record).
    let encodedLength = bytes.length + 2;

    // Check first if there's some limit that indicates we cannot ever enqueue
    // this record.
    let isTooBig =
      this.postLimits.canNeverAdd(payloadLength) ||
      this.batchLimits.canNeverAdd(payloadLength) ||
      encodedLength >= this.maxRequestBytes ||
      payloadLength >= this.maxPayloadBytes;

    if (isTooBig) {
      // Report, don't throw: the caller decides whether a single oversized
      // record aborts the whole sync.
      return {
        enqueued: false,
        error: new Error("Single record too large to submit to server"),
      };
    }

    let canPostRecord = this.postLimits.canAddRecord(payloadLength);
    let canBatchRecord = this.batchLimits.canAddRecord(payloadLength);
    let canSendRecord =
      this.queued.length + encodedLength < this.maxRequestBytes;

    if (!canPostRecord || !canBatchRecord || !canSendRecord) {
      this.log.trace("PostQueue flushing: ", {
        canPostRecord,
        canSendRecord,
        canBatchRecord,
      });
      // We need to write the queue out before handling this one, but we only
      // commit the batch (and thus start a new one) if the record couldn't fit
      // inside the batch.
      await this.flush(!canBatchRecord);
    }

    this.postLimits.didAddRecord(payloadLength);
    this.batchLimits.didAddRecord(payloadLength);

    // Either a ',' or a '[' depending on whether this is the first record.
    this.queued += this.queued.length ? "," : "[";
    this.queued += bytes;
    return { enqueued: true };
  },

  // POSTs everything currently queued; when `finalBatchPost` is true this
  // also commits the current batch (if the server gave us one).
  async flush(finalBatchPost) {
    if (!this.queued) {
      // nothing queued - we can't be in a batch, and something has gone very
      // bad if we think we are.
      if (this.batchID) {
        throw new Error(
          `Flush called when no queued records but we are in a batch ${this.batchID}`
        );
      }
      return;
    }
    // the batch query-param and headers we'll send.
    let batch;
    let headers = [];
    if (this.batchID === undefined) {
      // First commit in a (possible) batch.
      batch = "true";
    } else if (this.batchID) {
      // We have an existing batch.
      batch = this.batchID;
    } else {
      // Not the first post and we know we have no batch semantics.
      batch = null;
    }

    headers.push(["x-if-unmodified-since", this.lastModified]);

    let numQueued = this.postLimits.curRecords;
    this.log.info(
      `Posting ${numQueued} records of ${
        this.queued.length + 1
      } bytes with batch=${batch}`
    );
    // Re-add the closing "]" that this.queued always omits (see constructor).
    let queued = this.queued + "]";
    if (finalBatchPost) {
      this.batchLimits.clear();
    }
    this.postLimits.clear();
    this.queued = "";
    let response = await this.poster(
      queued,
      headers,
      batch,
      // commit=true only for the final post of a server-side batch.
      !!(finalBatchPost && this.batchID !== null)
    );

    if (!response.success) {
      this.log.trace("Server error response during a batch", response);
      // not clear what we should do here - we expect the consumer of this to
      // abort by throwing in the postCallback below.
      await this.postCallback(this, response, !finalBatchPost);
      return;
    }

    if (finalBatchPost) {
      this.log.trace("Committed batch", this.batchID);
      this.batchID = undefined; // we are now in "first post for the batch" state.
      this.lastModified = response.headers["x-last-modified"];
      await this.postCallback(this, response, false);
      return;
    }

    if (response.status != 202) {
      if (this.batchID) {
        throw new Error(
          "Server responded non-202 success code while a batch was in progress"
        );
      }
      this.batchID = null; // no batch semantics are in place.
      this.lastModified = response.headers["x-last-modified"];
      await this.postCallback(this, response, false);
      return;
    }

    // this response is saying the server has batch semantics - we should
    // always have a batch ID in the response.
    let responseBatchID = response.obj.batch;
    // NOTE(review): "responsed" is a pre-existing typo in this log string;
    // left untouched here as this is a comment-only pass.
    this.log.trace("Server responsed 202 with batch", responseBatchID);
    if (!responseBatchID) {
      this.log.error(
        "Invalid server response: 202 without a batch ID",
        response
      );
      throw new Error("Invalid server response: 202 without a batch ID");
    }

    if (this.batchID === undefined) {
      // First post of the batch: latch the server's batch ID and timestamp.
      this.batchID = responseBatchID;
      if (!this.lastModified) {
        this.lastModified = response.headers["x-last-modified"];
        if (!this.lastModified) {
          throw new Error("Batch response without x-last-modified");
        }
      }
    }

    if (this.batchID != responseBatchID) {
      throw new Error(
        `Invalid client/server batch state - client has ${this.batchID}, server has ${responseBatchID}`
      );
    }

    await this.postCallback(this, response, true);
  },
};