bridged_engine.sys.mjs (11353B)
1 /* This Source Code Form is subject to the terms of the Mozilla Public 2 * License, v. 2.0. If a copy of the MPL was not distributed with this 3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ 4 5 /** 6 * This file has all the machinery for hooking up bridged engines implemented 7 * in Rust. It's the JavaScript side of the Golden Gate bridge that connects 8 * Desktop Sync to a Rust `BridgedEngine`, via the `mozIBridgedSyncEngine` 9 * XPCOM interface. 10 * 11 * Creating a bridged engine only takes a few lines of code, since most of the 12 * hard work is done on the Rust side. On the JS side, you'll need to subclass 13 * `BridgedEngine` (instead of `SyncEngine`), supply a `mozIBridgedSyncEngine` 14 * for your subclass to wrap, and optionally implement and override the tracker. 15 */ 16 17 import { SyncEngine, Tracker } from "resource://services-sync/engines.sys.mjs"; 18 import { RawCryptoWrapper } from "resource://services-sync/record.sys.mjs"; 19 20 const lazy = {}; 21 22 ChromeUtils.defineESModuleGetters(lazy, { 23 Log: "resource://gre/modules/Log.sys.mjs", 24 PlacesUtils: "resource://gre/modules/PlacesUtils.sys.mjs", 25 }); 26 27 /** 28 * A stub store that converts between raw decrypted incoming records and 29 * envelopes. Since the interface we need is so minimal, this class doesn't 30 * inherit from the base `Store` implementation...it would take more code to 31 * override all those behaviors! 32 * 33 * This class isn't meant to be subclassed, because bridged engines shouldn't 34 * override their store classes in `_storeObj`. 35 */ 36 class BridgedStore { 37 constructor(name, engine) { 38 if (!engine) { 39 throw new Error("Store must be associated with an Engine instance."); 40 } 41 this.engine = engine; 42 this._log = engine.log; 43 this._batchChunkSize = 500; 44 } 45 46 async applyIncomingBatch(records) { 47 for (let chunk of lazy.PlacesUtils.chunkArray( 48 records, 49 this._batchChunkSize 50 )) { 51 let incomingEnvelopesAsJSON = chunk.map(record => 52 JSON.stringify(record.toIncomingBso()) 53 ); 54 this._log.trace("incoming envelopes", incomingEnvelopesAsJSON); 55 await this.engine._bridge.storeIncoming(incomingEnvelopesAsJSON); 56 } 57 // Array of failed records. 58 return []; 59 } 60 61 async wipe() { 62 await this.engine._bridge.wipe(); 63 } 64 } 65 66 /** 67 * A wrapper class to convert between BSOs on the JS side, and envelopes on the 68 * Rust side. This class intentionally subclasses `RawCryptoWrapper`, because we 69 * don't want the stringification and parsing machinery in `CryptoWrapper`. 70 * 71 * This class isn't meant to be subclassed, because bridged engines shouldn't 72 * override their record classes in `_recordObj`. 73 */ 74 class BridgedRecord extends RawCryptoWrapper { 75 /** 76 * Creates an outgoing record from a BSO returned by a bridged engine. 77 * 78 * @param {string} collection The collection name. 79 * @param {object} bso The outgoing bso (ie, a sync15::bso::OutgoingBso) returned from 80 * `mozIBridgedSyncEngine::apply`. 81 * @return {BridgedRecord} A Sync record ready to encrypt and upload. 82 */ 83 static fromOutgoingBso(collection, bso) { 84 // The BSO has already been JSON serialized coming out of Rust, so the 85 // envelope has been flattened. 86 if (typeof bso.id != "string") { 87 throw new TypeError("Outgoing BSO missing ID"); 88 } 89 if (typeof bso.payload != "string") { 90 throw new TypeError("Outgoing BSO missing payload"); 91 } 92 let record = new BridgedRecord(collection, bso.id); 93 record.cleartext = bso.payload; 94 return record; 95 } 96 97 transformBeforeEncrypt(cleartext) { 98 if (typeof cleartext != "string") { 99 throw new TypeError("Outgoing bridged engine records must be strings"); 100 } 101 return cleartext; 102 } 103 104 transformAfterDecrypt(cleartext) { 105 if (typeof cleartext != "string") { 106 throw new TypeError("Incoming bridged engine records must be strings"); 107 } 108 return cleartext; 109 } 110 111 /** 112 * Converts this incoming record into an envelope to pass to a bridged engine. 113 * This object must be kept in sync with `sync15::IncomingBso`. 114 * 115 * @return {object} The incoming envelope, to pass to 116 * `mozIBridgedSyncEngine::storeIncoming`. 117 */ 118 toIncomingBso() { 119 return { 120 id: this.data.id, 121 modified: this.data.modified, 122 payload: this.cleartext, 123 }; 124 } 125 } 126 127 /** 128 * A base class used to plug a Rust engine into Sync, and have it work like any 129 * other engine. The constructor takes a bridge as its first argument, which is 130 * a "bridged sync engine", as defined by UniFFI in the application-services 131 * crate. 132 * 133 * This class inherits from `SyncEngine`, which has a lot of machinery that we 134 * don't need, but that's fairly easy to override. It would be harder to 135 * reimplement the machinery that we _do_ need here. However, because of that, 136 * this class has lots of methods that do nothing, or return empty data. The 137 * docs above each method explain what it's overriding, and why. 138 * 139 * This class is designed to be subclassed, but the only part that your engine 140 * may want to override is `_trackerObj`. Even then, using the default (no-op) 141 * tracker is fine, because the shape of the `Tracker` interface may not make 142 * sense for all engines. 143 */ 144 export function BridgedEngine(name, service) { 145 SyncEngine.call(this, name, service); 146 // The store is lazily created, which means we'd miss a number of logs if the store 147 // managed the log like happens for other engines. But we still keep Store in the name for consistency 148 // (but really we should kill all these sub-logs - no-one cares about ".Store" etc!) 149 this.log = lazy.Log.repository.getLogger(`Sync.Engine.${name}.Store`); 150 } 151 152 BridgedEngine.prototype = { 153 /** 154 * The Rust implemented bridge. Must be set by the engine which subclasses us. 155 */ 156 _bridge: null, 157 /** 158 * The tracker class for this engine. Subclasses may want to override this 159 * with their own tracker, though using the default `Tracker` is fine. 160 */ 161 _trackerObj: Tracker, 162 163 /** Returns the record class for all bridged engines. */ 164 get _recordObj() { 165 return BridgedRecord; 166 }, 167 168 set _recordObj(obj) { 169 throw new TypeError("Don't override the record class for bridged engines"); 170 }, 171 172 /** Returns the store class for all bridged engines. */ 173 get _storeObj() { 174 return BridgedStore; 175 }, 176 177 set _storeObj(obj) { 178 throw new TypeError("Don't override the store class for bridged engines"); 179 }, 180 181 /** Returns the storage version for this engine. */ 182 get version() { 183 return this._bridge.storageVersion; 184 }, 185 186 // Legacy engines allow sync to proceed if some records are too large to 187 // upload (eg, a payload that's bigger than the server's published limits). 188 // If this returns true, we will just skip the record without even attempting 189 // to upload. If this is false, we'll abort the entire batch. 190 // If the engine allows this, it will need to detect this scenario by noticing 191 // the ID is not in the 'success' records reported to `setUploaded`. 192 // (Note that this is not to be confused with the fact server's can currently 193 // reject records as part of a POST - but we hope to remove this ability from 194 // the server API. Note also that this is not bullet-proof - if the count of 195 // records is high, it's possible that we will have committed a previous 196 // batch before we hit the relevant limits, so things might have been written. 197 // We hope to fix this by ensuring batch limits are such that this is 198 // impossible) 199 get allowSkippedRecord() { 200 return this._bridge.allowSkippedRecord; 201 }, 202 203 /** 204 * Returns the sync ID for this engine. This is exposed for tests, but 205 * Sync code always calls `resetSyncID()` and `ensureCurrentSyncID()`, 206 * not this. 207 * 208 * @returns {string?} The sync ID, or `null` if one isn't set. 209 */ 210 async getSyncID() { 211 // Note that all methods on an XPCOM class instance are automatically bound, 212 // so we don't need to write `this._bridge.getSyncId.bind(this._bridge)`. 213 let syncID = await this._bridge.getSyncId(); 214 return syncID; 215 }, 216 217 async resetSyncID() { 218 await this._deleteServerCollection(); 219 let newSyncID = await this.resetLocalSyncID(); 220 return newSyncID; 221 }, 222 223 async resetLocalSyncID() { 224 let newSyncID = await this._bridge.resetSyncId(); 225 return newSyncID; 226 }, 227 228 async ensureCurrentSyncID(newSyncID) { 229 let assignedSyncID = await this._bridge.ensureCurrentSyncId(newSyncID); 230 return assignedSyncID; 231 }, 232 233 async getLastSync() { 234 // The bridge defines lastSync as integer ms, but sync itself wants to work 235 // in a float seconds with 2 decimal places. 236 let lastSyncMS = await this._bridge.lastSync(); 237 return Math.round(lastSyncMS / 10) / 100; 238 }, 239 240 async setLastSync(lastSyncSeconds) { 241 await this._bridge.setLastSync(Math.round(lastSyncSeconds * 1000)); 242 }, 243 244 /** 245 * Returns the initial changeset for the sync. Bridged engines handle 246 * reconciliation internally, so we don't know what changed until after we've 247 * stored and applied all incoming records. So we return an empty changeset 248 * here, and replace it with the real one in `_processIncoming`. 249 */ 250 async pullChanges() { 251 return {}; 252 }, 253 254 async trackRemainingChanges() { 255 await this._bridge.syncFinished(); 256 }, 257 258 /** 259 * Marks a record for a hard-`DELETE` at the end of the sync. The base method 260 * also removes it from the tracker, but we don't use the tracker for that, 261 * so we override the method to just mark. 262 */ 263 _deleteId(id) { 264 this._noteDeletedId(id); 265 }, 266 267 /** 268 * Always stage incoming records, bypassing the base engine's reconciliation 269 * machinery. 270 */ 271 async _reconcile() { 272 return true; 273 }, 274 275 async _syncStartup() { 276 await super._syncStartup(); 277 await this._bridge.syncStarted(); 278 }, 279 280 async _processIncoming(newitems) { 281 await super._processIncoming(newitems); 282 283 let outgoingBsosAsJSON = await this._bridge.apply(); 284 let changeset = {}; 285 for (let bsoAsJSON of outgoingBsosAsJSON) { 286 this._log.trace("outgoing bso", bsoAsJSON); 287 let record = BridgedRecord.fromOutgoingBso( 288 this.name, 289 JSON.parse(bsoAsJSON) 290 ); 291 changeset[record.id] = { 292 synced: false, 293 record, 294 }; 295 } 296 this._modified.replace(changeset); 297 }, 298 299 /** 300 * Notify the bridged engine that we've successfully uploaded a batch, so 301 * that it can update its local state. For example, if the engine uses a 302 * mirror and a temp table for outgoing records, it can write the uploaded 303 * records from the outgoing table back to the mirror. 304 */ 305 async _onRecordsWritten(succeeded, failed, serverModifiedTime) { 306 // JS uses seconds but Rust uses milliseconds so we'll need to convert 307 let serverModifiedMS = Math.round(serverModifiedTime * 1000); 308 await this._bridge.setUploaded(Math.floor(serverModifiedMS), succeeded); 309 }, 310 311 async _createTombstone() { 312 throw new Error("Bridged engines don't support weak uploads"); 313 }, 314 315 async _createRecord(id) { 316 let change = this._modified.changes[id]; 317 if (!change) { 318 throw new TypeError("Can't create record for unchanged item"); 319 } 320 return change.record; 321 }, 322 323 async _resetClient() { 324 await super._resetClient(); 325 await this._bridge.reset(); 326 }, 327 }; 328 Object.setPrototypeOf(BridgedEngine.prototype, SyncEngine.prototype);