clients.sys.mjs (37339B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 /** 6 * How does the clients engine work? 7 * 8 * - We use 2 files - commands.json and commands-syncing.json. 9 * 10 * - At sync upload time, we attempt a rename of commands.json to 11 * commands-syncing.json, and ignore errors (helps for crash during sync!). 12 * - We load commands-syncing.json and stash the contents in 13 * _currentlySyncingCommands which lives for the duration of the upload process. 14 * - We use _currentlySyncingCommands to build the outgoing records 15 * - Immediately after successful upload, we delete commands-syncing.json from 16 * disk (and clear _currentlySyncingCommands). We reconcile our local records 17 * with what we just wrote in the server, and add failed IDs commands 18 * back in commands.json 19 * - Any time we need to "save" a command for future syncs, we load 20 * commands.json, update it, and write it back out. 21 */ 22 23 import { Async } from "resource://services-common/async.sys.mjs"; 24 25 import { 26 DEVICE_TYPE_DESKTOP, 27 DEVICE_TYPE_MOBILE, 28 DEVICE_TYPE_TABLET, 29 SINGLE_USER_THRESHOLD, 30 SYNC_API_VERSION, 31 } from "resource://services-sync/constants.sys.mjs"; 32 33 import { 34 Store, 35 SyncEngine, 36 LegacyTracker, 37 } from "resource://services-sync/engines.sys.mjs"; 38 import { CryptoWrapper } from "resource://services-sync/record.sys.mjs"; 39 import { Resource } from "resource://services-sync/resource.sys.mjs"; 40 import { Svc, Utils } from "resource://services-sync/util.sys.mjs"; 41 42 const lazy = {}; 43 44 ChromeUtils.defineLazyGetter(lazy, "fxAccounts", () => { 45 return ChromeUtils.importESModule( 46 "resource://gre/modules/FxAccounts.sys.mjs" 47 ).getFxAccountsSingleton(); 48 }); 49 50 import { PREF_ACCOUNT_ROOT } from "resource://gre/modules/FxAccountsCommon.sys.mjs"; 51 52 const CLIENTS_TTL = 15552000; // 180 days 53 const CLIENTS_TTL_REFRESH = 604800; // 7 days 54 const STALE_CLIENT_REMOTE_AGE = 604800; // 7 days 55 56 // TTL of the message sent to another device when sending a tab 57 const NOTIFY_TAB_SENT_TTL_SECS = 1 * 3600; // 1 hour 58 59 // How often we force a refresh of the FxA device list. 60 const REFRESH_FXA_DEVICE_INTERVAL_MS = 2 * 60 * 60 * 1000; // 2 hours 61 62 // Reasons behind sending collection_changed push notifications. 63 const COLLECTION_MODIFIED_REASON_SENDTAB = "sendtab"; 64 const COLLECTION_MODIFIED_REASON_FIRSTSYNC = "firstsync"; 65 66 const SUPPORTED_PROTOCOL_VERSIONS = [SYNC_API_VERSION]; 67 const LAST_MODIFIED_ON_PROCESS_COMMAND_PREF = 68 "services.sync.clients.lastModifiedOnProcessCommands"; 69 70 function hasDupeCommand(commands, action) { 71 if (!commands) { 72 return false; 73 } 74 return commands.some( 75 other => 76 other.command == action.command && 77 Utils.deepEquals(other.args, action.args) 78 ); 79 } 80 81 export function ClientsRec(collection, id) { 82 CryptoWrapper.call(this, collection, id); 83 } 84 85 ClientsRec.prototype = { 86 _logName: "Sync.Record.Clients", 87 ttl: CLIENTS_TTL, 88 }; 89 Object.setPrototypeOf(ClientsRec.prototype, CryptoWrapper.prototype); 90 91 Utils.deferGetSet(ClientsRec, "cleartext", [ 92 "name", 93 "type", 94 "commands", 95 "version", 96 "protocols", 97 "formfactor", 98 "os", 99 "appPackage", 100 "application", 101 "device", 102 "fxaDeviceId", 103 ]); 104 105 export function ClientEngine(service) { 106 SyncEngine.call(this, "Clients", service); 107 108 this.fxAccounts = lazy.fxAccounts; 109 this.addClientCommandQueue = Async.asyncQueueCaller(this._log); 110 Utils.defineLazyIDProperty(this, "localID", "services.sync.client.GUID"); 111 } 112 113 ClientEngine.prototype = { 114 _storeObj: ClientStore, 115 _recordObj: ClientsRec, 116 _trackerObj: ClientsTracker, 117 allowSkippedRecord: false, 118 _knownStaleFxADeviceIds: null, 119 _lastDeviceCounts: null, 120 _lastFxaDeviceRefresh: 0, 121 122 async initialize() { 123 // Reset the last sync timestamp on every startup so that we fetch all clients 124 await this.resetLastSync(); 125 }, 126 127 // These two properties allow us to avoid replaying the same commands 128 // continuously if we cannot manage to upload our own record. 129 _localClientLastModified: 0, 130 get _lastModifiedOnProcessCommands() { 131 return Services.prefs.getIntPref(LAST_MODIFIED_ON_PROCESS_COMMAND_PREF, -1); 132 }, 133 134 set _lastModifiedOnProcessCommands(value) { 135 Services.prefs.setIntPref(LAST_MODIFIED_ON_PROCESS_COMMAND_PREF, value); 136 }, 137 138 get isFirstSync() { 139 return !this.lastRecordUpload; 140 }, 141 142 // Always sync client data as it controls other sync behavior 143 get enabled() { 144 return true; 145 }, 146 147 get lastRecordUpload() { 148 return Svc.PrefBranch.getIntPref(this.name + ".lastRecordUpload", 0); 149 }, 150 set lastRecordUpload(value) { 151 Svc.PrefBranch.setIntPref( 152 this.name + ".lastRecordUpload", 153 Math.floor(value) 154 ); 155 }, 156 157 get remoteClients() { 158 // return all non-stale clients for external consumption. 159 return Object.values(this._store._remoteClients).filter(v => !v.stale); 160 }, 161 162 remoteClient(id) { 163 let client = this._store._remoteClients[id]; 164 return client && !client.stale ? client : null; 165 }, 166 167 remoteClientExists(id) { 168 return !!this.remoteClient(id); 169 }, 170 171 // Aggregate some stats on the composition of clients on this account 172 get stats() { 173 const ALL_MOBILE_TYPES = [DEVICE_TYPE_MOBILE, DEVICE_TYPE_TABLET]; 174 let stats = { 175 // Currently this should never be true as this code only runs on Desktop, but 176 // it doesn't cause harm. 177 hasMobile: ALL_MOBILE_TYPES.includes(this.localType), 178 names: [this.localName], 179 numClients: 1, 180 }; 181 182 for (let id in this._store._remoteClients) { 183 let { name, type, stale } = this._store._remoteClients[id]; 184 if (!stale) { 185 stats.hasMobile = stats.hasMobile || ALL_MOBILE_TYPES.includes(type); 186 stats.names.push(name); 187 stats.numClients++; 188 } 189 } 190 191 return stats; 192 }, 193 194 /** 195 * Obtain information about device types. 196 * 197 * Returns a Map of device types to integer counts. Guaranteed to include 198 * "desktop" (which will have at least 1 - this device) and "mobile" (which 199 * may have zero) counts. It almost certainly will include only these 2. 200 */ 201 get deviceTypes() { 202 let counts = new Map(); 203 204 counts.set(this.localType, 1); // currently this must be DEVICE_TYPE_DESKTOP 205 counts.set(DEVICE_TYPE_MOBILE, 0); 206 207 for (let id in this._store._remoteClients) { 208 let record = this._store._remoteClients[id]; 209 if (record.stale) { 210 continue; // pretend "stale" records don't exist. 211 } 212 let type = record.type; 213 // "tablet" and "mobile" are combined. 214 if (type == DEVICE_TYPE_TABLET) { 215 type = DEVICE_TYPE_MOBILE; 216 } 217 if (!counts.has(type)) { 218 counts.set(type, 0); 219 } 220 221 counts.set(type, counts.get(type) + 1); 222 } 223 224 return counts; 225 }, 226 227 get brandName() { 228 let brand = Services.strings.createBundle( 229 "chrome://branding/locale/brand.properties" 230 ); 231 return brand.GetStringFromName("brandShortName"); 232 }, 233 234 get localName() { 235 return this.fxAccounts.device.getLocalName(); 236 }, 237 set localName(value) { 238 this.fxAccounts.device.setLocalName(value); 239 }, 240 241 get localType() { 242 return this.fxAccounts.device.getLocalType(); 243 }, 244 245 getClientName(id) { 246 if (id == this.localID) { 247 return this.localName; 248 } 249 let client = this._store._remoteClients[id]; 250 if (!client) { 251 return ""; 252 } 253 // Sometimes the sync clients don't always correctly update the device name 254 // However FxA always does, so try to pull the name from there first 255 let fxaDevice = this.fxAccounts.device.recentDeviceList?.find( 256 device => device.id === client.fxaDeviceId 257 ); 258 259 // should be very rare, but could happen if we have yet to fetch devices, 260 // or the client recently disconnected 261 if (!fxaDevice) { 262 this._log.warn( 263 "Couldn't find associated FxA device, falling back to client name" 264 ); 265 return client.name; 266 } 267 return fxaDevice.name; 268 }, 269 270 getClientFxaDeviceId(id) { 271 if (this._store._remoteClients[id]) { 272 return this._store._remoteClients[id].fxaDeviceId; 273 } 274 return null; 275 }, 276 277 getClientByFxaDeviceId(fxaDeviceId) { 278 for (let id in this._store._remoteClients) { 279 let client = this._store._remoteClients[id]; 280 if (client.stale) { 281 continue; 282 } 283 if (client.fxaDeviceId == fxaDeviceId) { 284 return client; 285 } 286 } 287 return null; 288 }, 289 290 getClientType(id) { 291 const client = this._store._remoteClients[id]; 292 if (client.type == DEVICE_TYPE_DESKTOP) { 293 return "desktop"; 294 } 295 if (client.formfactor && client.formfactor.includes("tablet")) { 296 return "tablet"; 297 } 298 return "phone"; 299 }, 300 301 async _readCommands() { 302 let commands = await Utils.jsonLoad("commands", this); 303 return commands || {}; 304 }, 305 306 /** 307 * Low level function, do not use directly (use _addClientCommand instead). 308 */ 309 async _saveCommands(commands) { 310 try { 311 await Utils.jsonSave("commands", this, commands); 312 } catch (error) { 313 this._log.error("Failed to save JSON outgoing commands", error); 314 } 315 }, 316 317 async _prepareCommandsForUpload() { 318 try { 319 await Utils.jsonMove("commands", "commands-syncing", this); 320 } catch (e) { 321 // Ignore errors 322 } 323 let commands = await Utils.jsonLoad("commands-syncing", this); 324 return commands || {}; 325 }, 326 327 async _deleteUploadedCommands() { 328 delete this._currentlySyncingCommands; 329 try { 330 await Utils.jsonRemove("commands-syncing", this); 331 } catch (err) { 332 this._log.error("Failed to delete syncing-commands file", err); 333 } 334 }, 335 336 // Gets commands for a client we are yet to write to the server. Doesn't 337 // include commands for that client which are already on the server. 338 // We should rename this! 339 async getClientCommands(clientId) { 340 const allCommands = await this._readCommands(); 341 return allCommands[clientId] || []; 342 }, 343 344 async removeLocalCommand(command) { 345 // the implementation of this engine is such that adding a command to 346 // the local client is how commands are deleted! ¯\_(ツ)_/¯ 347 await this._addClientCommand(this.localID, command); 348 }, 349 350 async _addClientCommand(clientId, command) { 351 this.addClientCommandQueue.enqueueCall(async () => { 352 try { 353 const localCommands = await this._readCommands(); 354 const localClientCommands = localCommands[clientId] || []; 355 const remoteClient = this._store._remoteClients[clientId]; 356 let remoteClientCommands = []; 357 if (remoteClient && remoteClient.commands) { 358 remoteClientCommands = remoteClient.commands; 359 } 360 const clientCommands = localClientCommands.concat(remoteClientCommands); 361 if (hasDupeCommand(clientCommands, command)) { 362 return false; 363 } 364 localCommands[clientId] = localClientCommands.concat(command); 365 await this._saveCommands(localCommands); 366 return true; 367 } catch (e) { 368 // Failing to save a command should not "break the queue" of pending operations. 369 this._log.error(e); 370 return false; 371 } 372 }); 373 374 return this.addClientCommandQueue.promiseCallsComplete(); 375 }, 376 377 async _removeClientCommands(clientId) { 378 const allCommands = await this._readCommands(); 379 delete allCommands[clientId]; 380 await this._saveCommands(allCommands); 381 }, 382 383 async updateKnownStaleClients() { 384 this._log.debug("Updating the known stale clients"); 385 // _fetchFxADevices side effect updates this._knownStaleFxADeviceIds. 386 await this._fetchFxADevices(); 387 let localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); 388 // Process newer records first, so that if we hit a record with a device ID 389 // we've seen before, we can mark it stale immediately. 390 let clientList = Object.values(this._store._remoteClients).sort( 391 (a, b) => b.serverLastModified - a.serverLastModified 392 ); 393 let seenDeviceIds = new Set([localFxADeviceId]); 394 for (let client of clientList) { 395 // Clients might not have an `fxaDeviceId` if they fail the FxA 396 // registration process. 397 if (!client.fxaDeviceId) { 398 continue; 399 } 400 if (this._knownStaleFxADeviceIds.includes(client.fxaDeviceId)) { 401 this._log.info( 402 `Hiding stale client ${client.id} - in known stale clients list` 403 ); 404 client.stale = true; 405 } else if (seenDeviceIds.has(client.fxaDeviceId)) { 406 this._log.info( 407 `Hiding stale client ${client.id}` + 408 ` - duplicate device id ${client.fxaDeviceId}` 409 ); 410 client.stale = true; 411 } else { 412 seenDeviceIds.add(client.fxaDeviceId); 413 } 414 } 415 }, 416 417 async _fetchFxADevices() { 418 // We only force a refresh periodically to keep the load on the servers 419 // down, and because we expect FxA to have received a push message in 420 // most cases when the FxA device list would have changed. For this reason 421 // we still go ahead and check the stale list even if we didn't force a 422 // refresh. 423 let now = this.fxAccounts._internal.now(); // tests mock this .now() impl. 424 if (now - REFRESH_FXA_DEVICE_INTERVAL_MS > this._lastFxaDeviceRefresh) { 425 this._lastFxaDeviceRefresh = now; 426 try { 427 await this.fxAccounts.device.refreshDeviceList(); 428 } catch (e) { 429 this._log.error("Could not refresh the FxA device list", e); 430 } 431 } 432 433 // We assume that clients not present in the FxA Device Manager list have been 434 // disconnected and so are stale 435 this._log.debug("Refreshing the known stale clients list"); 436 let localClients = Object.values(this._store._remoteClients) 437 .filter(client => client.fxaDeviceId) // iOS client records don't have fxaDeviceId 438 .map(client => client.fxaDeviceId); 439 const fxaClients = this.fxAccounts.device.recentDeviceList 440 ? this.fxAccounts.device.recentDeviceList.map(device => device.id) 441 : []; 442 this._knownStaleFxADeviceIds = Utils.arraySub(localClients, fxaClients); 443 }, 444 445 async _syncStartup() { 446 // Reupload new client record periodically. 447 if (Date.now() / 1000 - this.lastRecordUpload > CLIENTS_TTL_REFRESH) { 448 await this._tracker.addChangedID(this.localID); 449 } 450 return SyncEngine.prototype._syncStartup.call(this); 451 }, 452 453 async _processIncoming() { 454 // Fetch all records from the server. 455 await this.resetLastSync(); 456 this._incomingClients = {}; 457 try { 458 await SyncEngine.prototype._processIncoming.call(this); 459 // Update FxA Device list. 460 await this._fetchFxADevices(); 461 // Since clients are synced unconditionally, any records in the local store 462 // that don't exist on the server must be for disconnected clients. Remove 463 // them, so that we don't upload records with commands for clients that will 464 // never see them. We also do this to filter out stale clients from the 465 // tabs collection, since showing their list of tabs is confusing. 466 for (let id in this._store._remoteClients) { 467 if (!this._incomingClients[id]) { 468 this._log.info(`Removing local state for deleted client ${id}`); 469 await this._removeRemoteClient(id); 470 } 471 } 472 let localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); 473 // Bug 1264498: Mobile clients don't remove themselves from the clients 474 // collection when the user disconnects Sync, so we mark as stale clients 475 // with the same name that haven't synced in over a week. 476 // (Note we can't simply delete them, or we re-apply them next sync - see 477 // bug 1287687) 478 this._localClientLastModified = Math.round( 479 this._incomingClients[this.localID] 480 ); 481 delete this._incomingClients[this.localID]; 482 let names = new Set([this.localName]); 483 let seenDeviceIds = new Set([localFxADeviceId]); 484 let idToLastModifiedList = Object.entries(this._incomingClients).sort( 485 (a, b) => b[1] - a[1] 486 ); 487 for (let [id, serverLastModified] of idToLastModifiedList) { 488 let record = this._store._remoteClients[id]; 489 // stash the server last-modified time on the record. 490 record.serverLastModified = serverLastModified; 491 if ( 492 record.fxaDeviceId && 493 this._knownStaleFxADeviceIds.includes(record.fxaDeviceId) 494 ) { 495 this._log.info( 496 `Hiding stale client ${id} - in known stale clients list` 497 ); 498 record.stale = true; 499 } 500 if (!names.has(record.name)) { 501 if (record.fxaDeviceId) { 502 seenDeviceIds.add(record.fxaDeviceId); 503 } 504 names.add(record.name); 505 continue; 506 } 507 let remoteAge = Resource.serverTime - this._incomingClients[id]; 508 if (remoteAge > STALE_CLIENT_REMOTE_AGE) { 509 this._log.info(`Hiding stale client ${id} with age ${remoteAge}`); 510 record.stale = true; 511 continue; 512 } 513 if (record.fxaDeviceId && seenDeviceIds.has(record.fxaDeviceId)) { 514 this._log.info( 515 `Hiding stale client ${record.id}` + 516 ` - duplicate device id ${record.fxaDeviceId}` 517 ); 518 record.stale = true; 519 } else if (record.fxaDeviceId) { 520 seenDeviceIds.add(record.fxaDeviceId); 521 } 522 } 523 } finally { 524 this._incomingClients = null; 525 } 526 }, 527 528 async _uploadOutgoing() { 529 this._currentlySyncingCommands = await this._prepareCommandsForUpload(); 530 const clientWithPendingCommands = Object.keys( 531 this._currentlySyncingCommands 532 ); 533 for (let clientId of clientWithPendingCommands) { 534 if (this._store._remoteClients[clientId] || this.localID == clientId) { 535 this._modified.set(clientId, 0); 536 } 537 } 538 let updatedIDs = this._modified.ids(); 539 await SyncEngine.prototype._uploadOutgoing.call(this); 540 // Record the response time as the server time for each item we uploaded. 541 let lastSync = await this.getLastSync(); 542 for (let id of updatedIDs) { 543 if (id == this.localID) { 544 this.lastRecordUpload = lastSync; 545 } else { 546 this._store._remoteClients[id].serverLastModified = lastSync; 547 } 548 } 549 }, 550 551 async _onRecordsWritten(succeeded, failed) { 552 // Reconcile the status of the local records with what we just wrote on the 553 // server 554 for (let id of succeeded) { 555 const commandChanges = this._currentlySyncingCommands[id]; 556 if (id == this.localID) { 557 if (this.isFirstSync) { 558 this._log.info( 559 "Uploaded our client record for the first time, notifying other clients." 560 ); 561 this._notifyClientRecordUploaded(); 562 } 563 if (this.localCommands) { 564 this.localCommands = this.localCommands.filter( 565 command => !hasDupeCommand(commandChanges, command) 566 ); 567 } 568 } else { 569 const clientRecord = this._store._remoteClients[id]; 570 if (!commandChanges || !clientRecord) { 571 // should be impossible, else we wouldn't have been writing it. 572 this._log.warn( 573 "No command/No record changes for a client we uploaded" 574 ); 575 continue; 576 } 577 // fixup the client record, so our copy of _remoteClients matches what we uploaded. 578 this._store._remoteClients[id] = await this._store.createRecord(id); 579 // we could do better and pass the reference to the record we just uploaded, 580 // but this will do for now 581 } 582 } 583 584 // Re-add failed commands 585 for (let id of failed) { 586 const commandChanges = this._currentlySyncingCommands[id]; 587 if (!commandChanges) { 588 continue; 589 } 590 await this._addClientCommand(id, commandChanges); 591 } 592 593 await this._deleteUploadedCommands(); 594 595 // Notify other devices that their own client collection changed 596 const idsToNotify = succeeded.reduce((acc, id) => { 597 if (id == this.localID) { 598 return acc; 599 } 600 const fxaDeviceId = this.getClientFxaDeviceId(id); 601 return fxaDeviceId ? acc.concat(fxaDeviceId) : acc; 602 }, []); 603 if (idsToNotify.length) { 604 this._notifyOtherClientsModified(idsToNotify); 605 } 606 }, 607 608 _notifyOtherClientsModified(ids) { 609 // We are not waiting on this promise on purpose. 610 this._notifyCollectionChanged( 611 ids, 612 NOTIFY_TAB_SENT_TTL_SECS, 613 COLLECTION_MODIFIED_REASON_SENDTAB 614 ); 615 }, 616 617 _notifyClientRecordUploaded() { 618 // We are not waiting on this promise on purpose. 619 this._notifyCollectionChanged( 620 null, 621 0, 622 COLLECTION_MODIFIED_REASON_FIRSTSYNC 623 ); 624 }, 625 626 /** 627 * @param {?string[]} ids FxA Client IDs to notify. null means everyone else. 628 * @param {number} ttl TTL of the push notification. 629 * @param {string} reason Reason for sending this push notification. 630 */ 631 async _notifyCollectionChanged(ids, ttl, reason) { 632 const message = { 633 version: 1, 634 command: "sync:collection_changed", 635 data: { 636 collections: ["clients"], 637 reason, 638 }, 639 }; 640 let excludedIds = null; 641 if (!ids) { 642 const localFxADeviceId = await lazy.fxAccounts.device.getLocalId(); 643 excludedIds = [localFxADeviceId]; 644 } 645 try { 646 await this.fxAccounts.notifyDevices(ids, excludedIds, message, ttl); 647 } catch (e) { 648 this._log.error("Could not notify of changes in the collection", e); 649 } 650 }, 651 652 async _syncFinish() { 653 // Record histograms for our device types, and also write them to a pref 654 // so non-histogram telemetry (eg, UITelemetry) and the sync scheduler 655 // has easy access to them, and so they are accurate even before we've 656 // successfully synced the first time after startup. 657 let deviceTypeCounts = this.deviceTypes; 658 for (let [deviceType, count] of deviceTypeCounts) { 659 let prefName = this.name + ".devices."; 660 switch (deviceType) { 661 case DEVICE_TYPE_DESKTOP: 662 Glean.sync.deviceCountDesktop.accumulateSingleSample(count); 663 prefName += "desktop"; 664 break; 665 case DEVICE_TYPE_MOBILE: 666 case DEVICE_TYPE_TABLET: 667 Glean.sync.deviceCountMobile.accumulateSingleSample(count); 668 prefName += "mobile"; 669 break; 670 default: 671 this._log.warn( 672 `Unexpected deviceType "${deviceType}" recording device telemetry.` 673 ); 674 continue; 675 } 676 // Optimization: only write the pref if it changed since our last sync. 677 if ( 678 this._lastDeviceCounts == null || 679 this._lastDeviceCounts.get(prefName) != count 680 ) { 681 Svc.PrefBranch.setIntPref(prefName, count); 682 } 683 } 684 this._lastDeviceCounts = deviceTypeCounts; 685 return SyncEngine.prototype._syncFinish.call(this); 686 }, 687 688 async _reconcile(item) { 689 // Every incoming record is reconciled, so we use this to track the 690 // contents of the collection on the server. 691 this._incomingClients[item.id] = item.modified; 692 693 if (!(await this._store.itemExists(item.id))) { 694 return true; 695 } 696 // Clients are synced unconditionally, so we'll always have new records. 697 // Unfortunately, this will cause the scheduler to use the immediate sync 698 // interval for the multi-device case, instead of the active interval. We 699 // work around this by updating the record during reconciliation, and 700 // returning false to indicate that the record doesn't need to be applied 701 // later. 702 await this._store.update(item); 703 return false; 704 }, 705 706 // Treat reset the same as wiping for locally cached clients 707 async _resetClient() { 708 await this._wipeClient(); 709 }, 710 711 async _wipeClient() { 712 await SyncEngine.prototype._resetClient.call(this); 713 this._knownStaleFxADeviceIds = null; 714 delete this.localCommands; 715 await this._store.wipe(); 716 try { 717 await Utils.jsonRemove("commands", this); 718 } catch (err) { 719 this._log.warn("Could not delete commands.json", err); 720 } 721 try { 722 await Utils.jsonRemove("commands-syncing", this); 723 } catch (err) { 724 this._log.warn("Could not delete commands-syncing.json", err); 725 } 726 }, 727 728 async removeClientData() { 729 let res = this.service.resource(this.engineURL + "/" + this.localID); 730 await res.delete(); 731 }, 732 733 // Override the default behavior to delete bad records from the server. 734 async handleHMACMismatch(item, mayRetry) { 735 this._log.debug("Handling HMAC mismatch for " + item.id); 736 737 let base = await SyncEngine.prototype.handleHMACMismatch.call( 738 this, 739 item, 740 mayRetry 741 ); 742 if (base != SyncEngine.kRecoveryStrategy.error) { 743 return base; 744 } 745 746 // It's a bad client record. Save it to be deleted at the end of the sync. 747 this._log.debug("Bad client record detected. Scheduling for deletion."); 748 await this._deleteId(item.id); 749 750 // Neither try again nor error; we're going to delete it. 751 return SyncEngine.kRecoveryStrategy.ignore; 752 }, 753 754 /** 755 * A hash of valid commands that the client knows about. The key is a command 756 * and the value is a hash containing information about the command such as 757 * number of arguments, description, and importance (lower importance numbers 758 * indicate higher importance. 759 */ 760 _commands: { 761 resetAll: { 762 args: 0, 763 importance: 0, 764 desc: "Clear temporary local data for all engines", 765 }, 766 resetEngine: { 767 args: 1, 768 importance: 0, 769 desc: "Clear temporary local data for engine", 770 }, 771 wipeEngine: { 772 args: 1, 773 importance: 0, 774 desc: "Delete all client data for engine", 775 }, 776 logout: { args: 0, importance: 0, desc: "Log out client" }, 777 }, 778 779 /** 780 * Sends a command+args pair to a specific client. 781 * 782 * @param command Command string 783 * @param args Array of arguments/data for command 784 * @param clientId Client to send command to 785 */ 786 async _sendCommandToClient(command, args, clientId, telemetryExtra) { 787 this._log.trace("Sending " + command + " to " + clientId); 788 789 let client = this._store._remoteClients[clientId]; 790 if (!client) { 791 throw new Error("Unknown remote client ID: '" + clientId + "'."); 792 } 793 if (client.stale) { 794 throw new Error("Stale remote client ID: '" + clientId + "'."); 795 } 796 797 let action = { 798 command, 799 args, 800 // We send the flowID to the other client so *it* can report it in its 801 // telemetry - we record it in ours below. 802 flowID: telemetryExtra.flowID, 803 }; 804 805 if (await this._addClientCommand(clientId, action)) { 806 this._log.trace(`Client ${clientId} got a new action`, [command, args]); 807 await this._tracker.addChangedID(clientId); 808 try { 809 telemetryExtra.deviceID = 810 this.service.identity.hashedDeviceID(clientId); 811 } catch (_) {} 812 813 Glean.syncClient.sendcommand.record({ 814 command, 815 flow_id: telemetryExtra.flowID, 816 reason: telemetryExtra.reason, 817 }); 818 this.service.recordTelemetryEvent( 819 "sendcommand", 820 command, 821 undefined, 822 telemetryExtra 823 ); 824 } else { 825 this._log.trace(`Client ${clientId} got a duplicate action`, [ 826 command, 827 args, 828 ]); 829 } 830 }, 831 832 /** 833 * Check if the local client has any remote commands and perform them. 834 * 835 * @return false to abort sync 836 */ 837 async processIncomingCommands() { 838 return this._notify("clients:process-commands", "", async function () { 839 if ( 840 !this.localCommands || 841 (this._lastModifiedOnProcessCommands == this._localClientLastModified && 842 !this.ignoreLastModifiedOnProcessCommands) 843 ) { 844 return true; 845 } 846 this._lastModifiedOnProcessCommands = this._localClientLastModified; 847 848 const clearedCommands = await this._readCommands()[this.localID]; 849 const commands = this.localCommands.filter( 850 command => !hasDupeCommand(clearedCommands, command) 851 ); 852 let didRemoveCommand = false; 853 // Process each command in order. 854 for (let rawCommand of commands) { 855 let shouldRemoveCommand = true; // most commands are auto-removed. 856 let { command, args, flowID } = rawCommand; 857 this._log.debug("Processing command " + command, args); 858 859 Glean.syncClient.processcommand.record({ 860 command, 861 flow_id: flowID, 862 }); 863 this.service.recordTelemetryEvent( 864 "processcommand", 865 command, 866 undefined, 867 { flowID } 868 ); 869 870 let engines = [args[0]]; 871 switch (command) { 872 case "resetAll": 873 engines = null; 874 // Fallthrough 875 case "resetEngine": 876 await this.service.resetClient(engines); 877 break; 878 case "wipeEngine": 879 await this.service.wipeClient(engines); 880 break; 881 case "logout": 882 this.service.logout(); 883 return false; 884 default: 885 this._log.warn("Received an unknown command: " + command); 886 break; 887 } 888 // Add the command to the "cleared" commands list 889 if (shouldRemoveCommand) { 890 await this.removeLocalCommand(rawCommand); 891 didRemoveCommand = true; 892 } 893 } 894 if (didRemoveCommand) { 895 await this._tracker.addChangedID(this.localID); 896 } 897 898 return true; 899 })(); 900 }, 901 902 /** 903 * Validates and sends a command to a client or all clients. 904 * 905 * Calling this does not actually sync the command data to the server. If the 906 * client already has the command/args pair, it won't receive a duplicate 907 * command. 908 * This method is async since it writes the command to a file. 909 * 910 * @param command 911 * Command to invoke on remote clients 912 * @param args 913 * Array of arguments to give to the command 914 * @param clientId 915 * Client ID to send command to. If undefined, send to all remote 916 * clients. 917 * @param flowID 918 * A unique identifier used to track success for this operation across 919 * devices. 920 */ 921 async sendCommand(command, args, clientId = null, telemetryExtra = {}) { 922 let commandData = this._commands[command]; 923 // Don't send commands that we don't know about. 924 if (!commandData) { 925 this._log.error("Unknown command to send: " + command); 926 return; 927 } else if (!args || args.length != commandData.args) { 928 // Don't send a command with the wrong number of arguments. 929 this._log.error( 930 "Expected " + 931 commandData.args + 932 " args for '" + 933 command + 934 "', but got " + 935 args 936 ); 937 return; 938 } 939 940 // We allocate a "flowID" here, so it is used for each client. 941 telemetryExtra = Object.assign({}, telemetryExtra); // don't clobber the caller's object 942 if (!telemetryExtra.flowID) { 943 telemetryExtra.flowID = Utils.makeGUID(); 944 } 945 946 if (clientId) { 947 await this._sendCommandToClient(command, args, clientId, telemetryExtra); 948 } else { 949 for (let [id, record] of Object.entries(this._store._remoteClients)) { 950 if (!record.stale) { 951 await this._sendCommandToClient(command, args, id, telemetryExtra); 952 } 953 } 954 } 955 }, 956 957 async _removeRemoteClient(id) { 958 delete this._store._remoteClients[id]; 959 await this._tracker.removeChangedID(id); 960 await this._removeClientCommands(id); 961 this._modified.delete(id); 962 }, 963 }; 964 Object.setPrototypeOf(ClientEngine.prototype, SyncEngine.prototype); 965 966 function ClientStore(name, engine) { 967 Store.call(this, name, engine); 968 } 969 ClientStore.prototype = { 970 _remoteClients: {}, 971 972 async create(record) { 973 await this.update(record); 974 }, 975 976 async update(record) { 977 if (record.id == this.engine.localID) { 978 // Only grab commands from the server; local name/type always wins 979 this.engine.localCommands = record.commands; 980 } else { 981 this._remoteClients[record.id] = record.cleartext; 982 } 983 }, 984 985 async createRecord(id, collection) { 986 let record = new ClientsRec(collection, id); 987 988 const commandsChanges = this.engine._currentlySyncingCommands 989 ? this.engine._currentlySyncingCommands[id] 990 : []; 991 992 // Package the individual components into a record for the local client 993 if (id == this.engine.localID) { 994 try { 995 record.fxaDeviceId = await this.engine.fxAccounts.device.getLocalId(); 996 } catch (error) { 997 this._log.warn("failed to get fxa device id", error); 998 } 999 record.name = this.engine.localName; 1000 record.type = this.engine.localType; 1001 record.version = Services.appinfo.version; 1002 record.protocols = SUPPORTED_PROTOCOL_VERSIONS; 1003 1004 // Substract the commands we recorded that we've already executed 1005 if ( 1006 commandsChanges && 1007 commandsChanges.length && 1008 this.engine.localCommands && 1009 this.engine.localCommands.length 1010 ) { 1011 record.commands = this.engine.localCommands.filter( 1012 command => !hasDupeCommand(commandsChanges, command) 1013 ); 1014 } 1015 1016 // Optional fields. 1017 record.os = Services.appinfo.OS; // "Darwin" 1018 record.appPackage = Services.appinfo.ID; 1019 record.application = this.engine.brandName; // "Nightly" 1020 1021 // We can't compute these yet. 1022 // record.device = ""; // Bug 1100723 1023 // record.formfactor = ""; // Bug 1100722 1024 } else { 1025 record.cleartext = Object.assign({}, this._remoteClients[id]); 1026 delete record.cleartext.serverLastModified; // serverLastModified is a local only attribute. 1027 1028 // Add the commands we have to send 1029 if (commandsChanges && commandsChanges.length) { 1030 const recordCommands = record.cleartext.commands || []; 1031 const newCommands = commandsChanges.filter( 1032 command => !hasDupeCommand(recordCommands, command) 1033 ); 1034 record.cleartext.commands = recordCommands.concat(newCommands); 1035 } 1036 1037 if (record.cleartext.stale) { 1038 // It's almost certainly a logic error for us to upload a record we 1039 // consider stale, so make log noise, but still remove the flag. 1040 this._log.error( 1041 `Preparing to upload record ${id} that we consider stale` 1042 ); 1043 delete record.cleartext.stale; 1044 } 1045 } 1046 if (record.commands) { 1047 const maxPayloadSize = 1048 this.engine.service.getMemcacheMaxRecordPayloadSize(); 1049 let origOrder = new Map(record.commands.map((c, i) => [c, i])); 1050 // we sort first by priority, and second by age (indicated by order in the 1051 // original list) 1052 let commands = record.commands.slice().sort((a, b) => { 1053 let infoA = this.engine._commands[a.command]; 1054 let infoB = this.engine._commands[b.command]; 1055 // Treat unknown command types as highest priority, to allow us to add 1056 // high priority commands in the future without worrying about clients 1057 // removing them on each-other unnecessarially. 1058 let importA = infoA ? infoA.importance : 0; 1059 let importB = infoB ? infoB.importance : 0; 1060 // Higher importantance numbers indicate that we care less, so they 1061 // go to the end of the list where they'll be popped off. 1062 let importDelta = importA - importB; 1063 if (importDelta != 0) { 1064 return importDelta; 1065 } 1066 let origIdxA = origOrder.get(a); 1067 let origIdxB = origOrder.get(b); 1068 // Within equivalent priorities, we put older entries near the end 1069 // of the list, so that they are removed first. 1070 return origIdxB - origIdxA; 1071 }); 1072 let truncatedCommands = Utils.tryFitItems(commands, maxPayloadSize); 1073 if (truncatedCommands.length != record.commands.length) { 1074 this._log.warn( 1075 `Removing commands from client ${id} (from ${record.commands.length} to ${truncatedCommands.length})` 1076 ); 1077 // Restore original order. 1078 record.commands = truncatedCommands.sort( 1079 (a, b) => origOrder.get(a) - origOrder.get(b) 1080 ); 1081 } 1082 } 1083 return record; 1084 }, 1085 1086 async itemExists(id) { 1087 return id in (await this.getAllIDs()); 1088 }, 1089 1090 async getAllIDs() { 1091 let ids = {}; 1092 ids[this.engine.localID] = true; 1093 for (let id in this._remoteClients) { 1094 ids[id] = true; 1095 } 1096 return ids; 1097 }, 1098 1099 async wipe() { 1100 this._remoteClients = {}; 1101 }, 1102 }; 1103 Object.setPrototypeOf(ClientStore.prototype, Store.prototype); 1104 1105 function ClientsTracker(name, engine) { 1106 LegacyTracker.call(this, name, engine); 1107 } 1108 ClientsTracker.prototype = { 1109 _enabled: false, 1110 1111 onStart() { 1112 Svc.Obs.add("fxaccounts:new_device_id", this.asyncObserver); 1113 Services.prefs.addObserver( 1114 PREF_ACCOUNT_ROOT + "device.name", 1115 this.asyncObserver 1116 ); 1117 }, 1118 onStop() { 1119 Services.prefs.removeObserver( 1120 PREF_ACCOUNT_ROOT + "device.name", 1121 this.asyncObserver 1122 ); 1123 Svc.Obs.remove("fxaccounts:new_device_id", this.asyncObserver); 1124 }, 1125 1126 async observe(subject, topic) { 1127 switch (topic) { 1128 case "nsPref:changed": 1129 this._log.debug("client.name preference changed"); 1130 // Fallthrough intended. 1131 case "fxaccounts:new_device_id": 1132 await this.addChangedID(this.engine.localID); 1133 this.score += SINGLE_USER_THRESHOLD + 1; // ALWAYS SYNC NOW. 1134 break; 1135 } 1136 }, 1137 }; 1138 Object.setPrototypeOf(ClientsTracker.prototype, LegacyTracker.prototype);