kinto-storage-adapter.sys.mjs (16119B)
1 /* 2 * Licensed under the Apache License, Version 2.0 (the "License"); 3 * you may not use this file except in compliance with the License. 4 * You may obtain a copy of the License at 5 * 6 * http://www.apache.org/licenses/LICENSE-2.0 7 * 8 * Unless required by applicable law or agreed to in writing, software 9 * distributed under the License is distributed on an "AS IS" BASIS, 10 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 * See the License for the specific language governing permissions and 12 * limitations under the License. 13 */ 14 import { Sqlite } from "resource://gre/modules/Sqlite.sys.mjs"; 15 16 import { Kinto } from "resource://services-common/kinto-offline-client.sys.mjs"; 17 18 /** 19 * Filter and sort list against provided filters and order. 20 * 21 * @param {object} filters The filters to apply. 22 * @param {string} order The order to apply. 23 * @param {Array} list The list to reduce. 24 * @return {Array} 25 */ 26 function reduceRecords(filters, order, list) { 27 const filtered = filters ? filterObjects(filters, list) : list; 28 return order ? sortObjects(order, filtered) : filtered; 29 } 30 31 /** 32 * Checks if a value is undefined. 33 * 34 * This is a copy of `_isUndefined` from kinto.js/src/utils.js. 35 * 36 * @param {Any} value 37 * @return {boolean} 38 */ 39 function _isUndefined(value) { 40 return typeof value === "undefined"; 41 } 42 43 /** 44 * Sorts records in a list according to a given ordering. 45 * 46 * This is a copy of `sortObjects` from kinto.js/src/utils.js. 47 * 48 * @param {string} order The ordering, eg. `-last_modified`. 49 * @param {Array} list The collection to order. 50 * @return {Array} 51 */ 52 function sortObjects(order, list) { 53 const hasDash = order[0] === "-"; 54 const field = hasDash ? order.slice(1) : order; 55 const direction = hasDash ? -1 : 1; 56 return list.slice().sort((a, b) => { 57 if (a[field] && _isUndefined(b[field])) { 58 return direction; 59 } 60 if (b[field] && _isUndefined(a[field])) { 61 return -direction; 62 } 63 if (_isUndefined(a[field]) && _isUndefined(b[field])) { 64 return 0; 65 } 66 return a[field] > b[field] ? direction : -direction; 67 }); 68 } 69 70 /** 71 * Test if a single object matches all given filters. 72 * 73 * This is a copy of `filterObject` from kinto.js/src/utils.js. 74 * 75 * @param {object} filters The filters object. 76 * @param {object} entry The object to filter. 77 * @return {Function} 78 */ 79 function filterObject(filters, entry) { 80 return Object.keys(filters).every(filter => { 81 const value = filters[filter]; 82 if (Array.isArray(value)) { 83 return value.some(candidate => candidate === entry[filter]); 84 } 85 return entry[filter] === value; 86 }); 87 } 88 89 /** 90 * Filters records in a list matching all given filters. 91 * 92 * This is a copy of `filterObjects` from kinto.js/src/utils.js. 93 * 94 * @param {object} filters The filters object. 95 * @param {Array} list The collection to filter. 96 * @return {Array} 97 */ 98 function filterObjects(filters, list) { 99 return list.filter(entry => { 100 return filterObject(filters, entry); 101 }); 102 } 103 104 const statements = { 105 createCollectionData: ` 106 CREATE TABLE collection_data ( 107 collection_name TEXT, 108 record_id TEXT, 109 record TEXT 110 );`, 111 112 createCollectionMetadata: ` 113 CREATE TABLE collection_metadata ( 114 collection_name TEXT PRIMARY KEY, 115 last_modified INTEGER, 116 metadata TEXT 117 ) WITHOUT ROWID;`, 118 119 createCollectionDataRecordIdIndex: ` 120 CREATE UNIQUE INDEX unique_collection_record 121 ON collection_data(collection_name, record_id);`, 122 123 clearData: ` 124 DELETE FROM collection_data 125 WHERE collection_name = :collection_name;`, 126 127 createData: ` 128 INSERT INTO collection_data (collection_name, record_id, record) 129 VALUES (:collection_name, :record_id, :record);`, 130 131 updateData: ` 132 INSERT OR REPLACE INTO collection_data (collection_name, record_id, record) 133 VALUES (:collection_name, :record_id, :record);`, 134 135 deleteData: ` 136 DELETE FROM collection_data 137 WHERE collection_name = :collection_name 138 AND record_id = :record_id;`, 139 140 saveLastModified: ` 141 INSERT INTO collection_metadata(collection_name, last_modified) 142 VALUES(:collection_name, :last_modified) 143 ON CONFLICT(collection_name) DO UPDATE SET last_modified = :last_modified`, 144 145 getLastModified: ` 146 SELECT last_modified 147 FROM collection_metadata 148 WHERE collection_name = :collection_name;`, 149 150 saveMetadata: ` 151 INSERT INTO collection_metadata(collection_name, metadata) 152 VALUES(:collection_name, :metadata) 153 ON CONFLICT(collection_name) DO UPDATE SET metadata = :metadata`, 154 155 getMetadata: ` 156 SELECT metadata 157 FROM collection_metadata 158 WHERE collection_name = :collection_name;`, 159 160 getRecord: ` 161 SELECT record 162 FROM collection_data 163 WHERE collection_name = :collection_name 164 AND record_id = :record_id;`, 165 166 listRecords: ` 167 SELECT record 168 FROM collection_data 169 WHERE collection_name = :collection_name;`, 170 171 // N.B. we have to have a dynamic number of placeholders, which you 172 // can't do without building your own statement. See `execute` for details 173 listRecordsById: ` 174 SELECT record_id, record 175 FROM collection_data 176 WHERE collection_name = ? 177 AND record_id IN `, 178 179 importData: ` 180 REPLACE INTO collection_data (collection_name, record_id, record) 181 VALUES (:collection_name, :record_id, :record);`, 182 183 scanAllRecords: `SELECT * FROM collection_data;`, 184 185 clearCollectionMetadata: `DELETE FROM collection_metadata;`, 186 187 calculateStorage: ` 188 SELECT collection_name, SUM(LENGTH(record)) as size, COUNT(record) as num_records 189 FROM collection_data 190 GROUP BY collection_name;`, 191 192 addMetadataColumn: ` 193 ALTER TABLE collection_metadata 194 ADD COLUMN metadata TEXT;`, 195 }; 196 197 const createStatements = [ 198 "createCollectionData", 199 "createCollectionMetadata", 200 "createCollectionDataRecordIdIndex", 201 ]; 202 203 const currentSchemaVersion = 2; 204 205 /** 206 * Firefox adapter. 207 * 208 * Uses Sqlite as a backing store. 209 * 210 * Options: 211 * - sqliteHandle: a handle to the Sqlite database this adapter will 212 * use as its backing store. To open such a handle, use the 213 * static openConnection() method. 214 */ 215 export class FirefoxAdapter extends Kinto.adapters.BaseAdapter { 216 constructor(collection, options = {}) { 217 super(); 218 const { sqliteHandle = null } = options; 219 this.collection = collection; 220 this._connection = sqliteHandle; 221 this._options = options; 222 } 223 224 /** 225 * Initialize a Sqlite connection to be suitable for use with Kinto. 226 * 227 * This will be called automatically by open(). 228 */ 229 static async _init(connection) { 230 await connection.executeTransaction(async function doSetup() { 231 const schema = await connection.getSchemaVersion(); 232 233 if (schema == 0) { 234 for (let statementName of createStatements) { 235 await connection.execute(statements[statementName]); 236 } 237 await connection.setSchemaVersion(currentSchemaVersion); 238 } else if (schema == 1) { 239 await connection.execute(statements.addMetadataColumn); 240 await connection.setSchemaVersion(currentSchemaVersion); 241 } else if (schema != 2) { 242 throw new Error("Unknown database schema: " + schema); 243 } 244 }); 245 return connection; 246 } 247 248 _executeStatement(statement, params) { 249 return this._connection.executeCached(statement, params); 250 } 251 252 /** 253 * Open and initialize a Sqlite connection to a database that Kinto 254 * can use. When you are done with this connection, close it by 255 * calling close(). 256 * 257 * Options: 258 * - path: The path for the Sqlite database 259 * 260 * @returns SqliteConnection 261 */ 262 static async openConnection(options) { 263 const opts = Object.assign({}, { sharedMemoryCache: false }, options); 264 const conn = await Sqlite.openConnection(opts).then(this._init); 265 try { 266 Sqlite.shutdown.addBlocker( 267 "Kinto storage adapter connection closing", 268 () => conn.close() 269 ); 270 } catch (e) { 271 // It's too late to block shutdown, just close the connection. 272 await conn.close(); 273 throw e; 274 } 275 return conn; 276 } 277 278 clear() { 279 const params = { collection_name: this.collection }; 280 return this._executeStatement(statements.clearData, params); 281 } 282 283 execute(callback, options = { preload: [] }) { 284 let result; 285 const conn = this._connection; 286 const collection = this.collection; 287 288 return conn 289 .executeTransaction(async function doExecuteTransaction() { 290 // Preload specified records from DB, within transaction. 291 292 // if options.preload has more elements than the sqlite variable 293 // limit, split it up. 294 const limit = 100; 295 let preloaded = {}; 296 let preload; 297 let more = options.preload; 298 299 while (more.length) { 300 preload = more.slice(0, limit); 301 more = more.slice(limit, more.length); 302 303 const parameters = [collection, ...preload]; 304 const placeholders = preload.map(_ => "?"); 305 const stmt = 306 statements.listRecordsById + "(" + placeholders.join(",") + ");"; 307 const rows = await conn.execute(stmt, parameters); 308 309 rows.reduce((acc, row) => { 310 const record = JSON.parse(row.getResultByName("record")); 311 acc[row.getResultByName("record_id")] = record; 312 return acc; 313 }, preloaded); 314 } 315 const proxy = transactionProxy(collection, preloaded); 316 result = callback(proxy); 317 318 for (let { statement, params } of proxy.operations) { 319 await conn.executeCached(statement, params); 320 } 321 }, conn.TRANSACTION_EXCLUSIVE) 322 .then(_ => result); 323 } 324 325 get(id) { 326 const params = { 327 collection_name: this.collection, 328 record_id: id, 329 }; 330 return this._executeStatement(statements.getRecord, params).then(result => { 331 if (!result.length) { 332 return null; 333 } 334 return JSON.parse(result[0].getResultByName("record")); 335 }); 336 } 337 338 list(params = { filters: {}, order: "" }) { 339 const parameters = { 340 collection_name: this.collection, 341 }; 342 return this._executeStatement(statements.listRecords, parameters) 343 .then(result => { 344 const records = []; 345 for (let k = 0; k < result.length; k++) { 346 const row = result[k]; 347 records.push(JSON.parse(row.getResultByName("record"))); 348 } 349 return records; 350 }) 351 .then(results => { 352 // The resulting list of records is filtered and sorted. 353 // XXX: with some efforts, this could be implemented using SQL. 354 return reduceRecords(params.filters, params.order, results); 355 }); 356 } 357 358 async loadDump(records) { 359 return this.importBulk(records); 360 } 361 362 /** 363 * Load a list of records into the local database. 364 * 365 * Note: The adapter is not in charge of filtering the already imported 366 * records. This is done in `Collection#loadDump()`, as a common behaviour 367 * between every adapters. 368 * 369 * @param {Array} records. 370 * @return {Array} imported records. 371 */ 372 async importBulk(records) { 373 const connection = this._connection; 374 const collection_name = this.collection; 375 await connection.executeTransaction(async function doImport() { 376 for (let record of records) { 377 const params = { 378 collection_name, 379 record_id: record.id, 380 record: JSON.stringify(record), 381 }; 382 await connection.execute(statements.importData, params); 383 } 384 const lastModified = Math.max( 385 ...records.map(record => record.last_modified) 386 ); 387 const params = { 388 collection_name, 389 }; 390 const previousLastModified = await connection 391 .execute(statements.getLastModified, params) 392 .then(result => { 393 return result.length 394 ? result[0].getResultByName("last_modified") 395 : -1; 396 }); 397 if (lastModified > previousLastModified) { 398 const params = { 399 collection_name, 400 last_modified: lastModified, 401 }; 402 await connection.execute(statements.saveLastModified, params); 403 } 404 }); 405 return records; 406 } 407 408 saveLastModified(lastModified) { 409 const parsedLastModified = parseInt(lastModified, 10) || null; 410 const params = { 411 collection_name: this.collection, 412 last_modified: parsedLastModified, 413 }; 414 return this._executeStatement(statements.saveLastModified, params).then( 415 () => parsedLastModified 416 ); 417 } 418 419 getLastModified() { 420 const params = { 421 collection_name: this.collection, 422 }; 423 return this._executeStatement(statements.getLastModified, params).then( 424 result => { 425 if (!result.length) { 426 return 0; 427 } 428 return result[0].getResultByName("last_modified"); 429 } 430 ); 431 } 432 433 async saveMetadata(metadata) { 434 const params = { 435 collection_name: this.collection, 436 metadata: JSON.stringify(metadata), 437 }; 438 await this._executeStatement(statements.saveMetadata, params); 439 return metadata; 440 } 441 442 async getMetadata() { 443 const params = { 444 collection_name: this.collection, 445 }; 446 const result = await this._executeStatement(statements.getMetadata, params); 447 if (!result.length) { 448 return null; 449 } 450 return JSON.parse(result[0].getResultByName("metadata")); 451 } 452 453 calculateStorage() { 454 return this._executeStatement(statements.calculateStorage, {}).then( 455 result => { 456 return Array.from(result, row => ({ 457 collectionName: row.getResultByName("collection_name"), 458 size: row.getResultByName("size"), 459 numRecords: row.getResultByName("num_records"), 460 })); 461 } 462 ); 463 } 464 465 /** 466 * Reset the sync status of every record and collection we have 467 * access to. 468 */ 469 resetSyncStatus() { 470 // We're going to use execute instead of executeCached, so build 471 // in our own sanity check 472 if (!this._connection) { 473 throw new Error("The storage adapter is not open"); 474 } 475 476 return this._connection.executeTransaction(async function (conn) { 477 const promises = []; 478 await conn.execute(statements.scanAllRecords, null, function (row) { 479 const record = JSON.parse(row.getResultByName("record")); 480 const record_id = row.getResultByName("record_id"); 481 const collection_name = row.getResultByName("collection_name"); 482 if (record._status === "deleted") { 483 // Garbage collect deleted records. 484 promises.push( 485 conn.execute(statements.deleteData, { collection_name, record_id }) 486 ); 487 } else { 488 const newRecord = Object.assign({}, record, { 489 _status: "created", 490 last_modified: undefined, 491 }); 492 promises.push( 493 conn.execute(statements.updateData, { 494 record: JSON.stringify(newRecord), 495 record_id, 496 collection_name, 497 }) 498 ); 499 } 500 }); 501 await Promise.all(promises); 502 await conn.execute(statements.clearCollectionMetadata); 503 }); 504 } 505 } 506 507 function transactionProxy(collection, preloaded) { 508 const _operations = []; 509 510 return { 511 get operations() { 512 return _operations; 513 }, 514 515 create(record) { 516 _operations.push({ 517 statement: statements.createData, 518 params: { 519 collection_name: collection, 520 record_id: record.id, 521 record: JSON.stringify(record), 522 }, 523 }); 524 }, 525 526 update(record) { 527 _operations.push({ 528 statement: statements.updateData, 529 params: { 530 collection_name: collection, 531 record_id: record.id, 532 record: JSON.stringify(record), 533 }, 534 }); 535 }, 536 537 delete(id) { 538 _operations.push({ 539 statement: statements.deleteData, 540 params: { 541 collection_name: collection, 542 record_id: id, 543 }, 544 }); 545 }, 546 547 get(id) { 548 // Gecko JS engine outputs undesired warnings if id is not in preloaded. 549 return id in preloaded ? preloaded[id] : undefined; 550 }, 551 }; 552 }