test_cacheapi_encryption_PBM.py (6795B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 import os 6 import re 7 import sys 8 from pathlib import Path 9 10 sys.path.append(os.fspath(Path(__file__).parents[3] / "quota/test/marionette")) 11 12 from quota_test_case import QuotaTestCase 13 14 CACHEAPI_PBM_PREF = "dom.cache.privateBrowsing.enabled" 15 QM_TESTING_PREF = "dom.quotaManager.testing" 16 17 18 class CacheAPIEncryptionPBM(QuotaTestCase): 19 """ 20 Bug1856953: Ensure CacheAPI data gets encrypted in Private Browsing Mode. 21 We need to ensure data inside both sqlite fields and request/response files 22 gets encrypted 23 """ 24 25 def setUp(self): 26 super().setUp() 27 28 self.testHTML = "dom/cache/basicCacheAPI_PBM.html" 29 self.cacheName = "CachePBMTest" 30 self.profilePath = self.marionette.instance.profile.profile 31 self.cacheAPIStoragePath = None 32 33 self.defaultCacheAPIPBMPref = self.marionette.get_pref(CACHEAPI_PBM_PREF) 34 self.marionette.set_pref(CACHEAPI_PBM_PREF, True) 35 36 self.defaultQMPrefValue = self.marionette.get_pref(QM_TESTING_PREF) 37 self.marionette.set_pref(QM_TESTING_PREF, True) 38 39 self.cacheRequestStr = "https://example.com/" 40 self.cacheResponseStr = "CacheAPIEncryptionPBM" 41 42 self.cacheDBFileName = "caches.sqlite" 43 self.cacheDBJournalFileName = "caches.sqlite-wal" 44 45 self.dbCheckpointThresholdBytes = 512 * 1024 46 47 def tearDown(self): 48 super().tearDown() 49 50 self.marionette.set_pref(CACHEAPI_PBM_PREF, self.defaultCacheAPIPBMPref) 51 self.marionette.set_pref(QM_TESTING_PREF, self.defaultQMPrefValue) 52 53 def test_request_response_ondisk(self): 54 with self.using_new_window(self.testHTML, private=False) as ( 55 self.origin, 56 self.persistenceType, 57 ): 58 self.runAndValidate( 59 lambda exists: self.assertTrue( 60 exists, "Failed to find expected data on disk" 61 ) 62 ) 63 64 def test_encrypted_request_response_ondisk(self): 65 with self.using_new_window(self.testHTML, private=True) as ( 66 self.origin, 67 self.persistenceType, 68 ): 69 self.runAndValidate( 70 lambda exists: self.assertFalse(exists, "Data on disk is not encrypted") 71 ) 72 73 def runAndValidate(self, validator): 74 self.marionette.execute_async_script( 75 """ 76 const [name, requestStr, responseStr, resolve] = arguments; 77 78 const request = new Request(requestStr); 79 const response = new Response(responseStr); 80 window.wrappedJSObject.addDataIntoCache(name, request, response) 81 .then(resolve); 82 """, 83 script_args=( 84 self.cacheName, 85 self.cacheRequestStr, 86 self.cacheResponseStr, 87 ), 88 ) 89 90 self.ensureInvariantHolds( 91 lambda _: os.path.exists(self.getCacheAPIStoragePath()) 92 ) 93 94 self.validateSqlite(validator) 95 self.validateBodyFile(validator) 96 97 def validateBodyFile(self, validator): 98 # Ensure response bodies have been flushed to the disk 99 self.ensureInvariantHolds( 100 lambda _: self.findDirObj(self.getCacheAPIStoragePath(), "morgue", False) 101 is not None 102 ) 103 104 cacheResponseDir = self.findDirObj( 105 self.getCacheAPIStoragePath(), "morgue", False 106 ) 107 108 self.ensureInvariantHolds(lambda _: any(os.listdir(cacheResponseDir))) 109 110 # Get response bodies directory corresponding to the cache 'self.CacheName' since, there's 111 # only one cache object in this origin, it must be the first one. 112 cacheResponseBodiesPath = [ 113 d for d in Path(cacheResponseDir).iterdir() if d.is_dir() 114 ][0] 115 116 # Ensure bodies have been transferred to '.final' from '.tmp' 117 self.ensureInvariantHolds( 118 lambda _: self.findDirObj(cacheResponseBodiesPath, ".final", True) 119 is not None 120 ) 121 cacheResponseBodyPath = self.findDirObj(cacheResponseBodiesPath, ".final", True) 122 123 # Since a cache response would get compressed using snappy; and an unencrypted response would 124 # contain 'sNaPpY' as a compression header in the response body file. Check to ensure that 125 # 'sNaPpy' does not exist if bodies are getting encrypted. 126 foundRawValue = False 127 with open(cacheResponseBodyPath, "rb") as f_binary: 128 foundRawValue = re.search(b"sNaPpY", f_binary.read()) is not None 129 130 validator(foundRawValue) 131 132 def validateSqlite(self, validator): 133 self.ensureInvariantHolds( 134 lambda _: self.findDirObj( 135 self.getCacheAPIStoragePath(), self.cacheDBJournalFileName, True 136 ) 137 is not None 138 ) 139 dbJournalFile = self.findDirObj( 140 self.getCacheAPIStoragePath(), self.cacheDBJournalFileName, True 141 ) 142 143 self.ensureInvariantHolds( 144 lambda _: self.findDirObj( 145 self.getCacheAPIStoragePath(), self.cacheDBFileName, True 146 ) 147 is not None 148 ) 149 dbFile = self.findDirObj( 150 self.getCacheAPIStoragePath(), self.cacheDBFileName, True 151 ) 152 153 # Confirm journal file size is less than 512KB which ensures that checkpoint 154 # has not happend yet (dom/cache/DBSchema.cpp::InitializeConnection, kWalAutoCheckpointPages) 155 self.assertTrue( 156 os.path.getsize(dbJournalFile) < self.dbCheckpointThresholdBytes 157 ) 158 159 # Before checkpointing, journal file size should be greater than main sqlite db file. 160 self.assertTrue(os.path.getsize(dbJournalFile) > os.path.getsize(dbFile)) 161 162 validator( 163 self.cacheRequestStr.encode("ascii") in open(dbJournalFile, "rb").read() 164 ) 165 166 self.assertTrue( 167 self.resetStoragesForClient(self.persistenceType, self.origin, "cache") 168 ) 169 170 self.assertFalse(os.path.getsize(dbJournalFile) > os.path.getsize(dbFile)) 171 172 validator(self.cacheRequestStr.encode("ascii") in open(dbFile, "rb").read()) 173 174 def getCacheAPIStoragePath(self): 175 if self.cacheAPIStoragePath is not None: 176 return self.cacheAPIStoragePath 177 178 assert self.origin is not None 179 assert self.persistenceType is not None 180 181 self.cacheAPIStoragePath = self.getStoragePath( 182 self.profilePath, self.origin, self.persistenceType, "cache" 183 ) 184 185 print("cacheAPI origin directory = " + self.cacheAPIStoragePath) 186 return self.cacheAPIStoragePath