quota_test_case.py (7267B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 import os 6 from contextlib import contextmanager 7 8 from marionette_driver import Wait 9 from marionette_harness import MarionetteTestCase 10 11 12 class QuotaTestCase(MarionetteTestCase): 13 def executeAsyncScript(self, script, script_args): 14 res = self.marionette.execute_async_script( 15 """ 16 const resolve = arguments[arguments.length - 1]; 17 18 class RequestError extends Error { 19 constructor(resultCode, resultName) { 20 super(`Request failed (code: ${resultCode}, name: ${resultName})`); 21 this.name = "RequestError"; 22 this.resultCode = resultCode; 23 this.resultName = resultName; 24 } 25 } 26 27 async function requestFinished(request) { 28 await new Promise(function (resolve) { 29 request.callback = function () { 30 resolve(); 31 }; 32 }); 33 34 if (request.resultCode !== Cr.NS_OK) { 35 throw new RequestError(request.resultCode, request.resultName); 36 } 37 38 return request.result; 39 } 40 """ 41 + script 42 + """ 43 main() 44 .then(function(result) { 45 resolve(result); 46 }) 47 .catch(function() { 48 resolve(null); 49 });; 50 """, 51 script_args=script_args, 52 new_sandbox=False, 53 ) 54 55 assert res is not None 56 return res 57 58 def ensureInvariantHolds(self, op): 59 maxWaitTime = 60 60 Wait(self.marionette, timeout=maxWaitTime).until( 61 op, 62 message=f"operation did not yield success even after waiting {maxWaitTime}s time", 63 ) 64 65 def findDirObj(self, path, pattern, isFile): 66 for obj in os.scandir(path): 67 if obj.path.endswith(pattern) and (obj.is_file() == isFile): 68 return obj.path 69 return None 70 71 def getFullOriginMetadata(self, persistenceType, origin): 72 with self.marionette.using_context(self.marionette.CONTEXT_CHROME): 73 res = self.executeAsyncScript( 74 """ 75 const [persistenceType, origin] = arguments; 76 77 async function main() { 78 const principal = Services.scriptSecurityManager. 79 createContentPrincipalFromOrigin(origin); 80 81 const request = Services.qms.getFullOriginMetadata( 82 persistenceType, principal); 83 const metadata = await requestFinished(request); 84 85 return metadata; 86 } 87 """, 88 script_args=(persistenceType, origin), 89 ) 90 91 assert res is not None 92 return res 93 94 def getStoragePath(self, profilePath, origin, persistenceType, client): 95 fullOriginMetadata = self.getFullOriginMetadata(persistenceType, origin) 96 97 storageOrigin = fullOriginMetadata["storageOrigin"] 98 sanitizedStorageOrigin = storageOrigin.replace(":", "+").replace("/", "+") 99 100 return os.path.join( 101 self.getRepositoryPath(persistenceType), sanitizedStorageOrigin, client 102 ) 103 104 def getRepositoryPath(self, persistenceType): 105 profilePath = self.marionette.instance.profile.profile 106 assert profilePath is not None 107 108 return os.path.join(profilePath, "storage", persistenceType) 109 110 def initStorage(self): 111 with self.marionette.using_context(self.marionette.CONTEXT_CHROME): 112 return self.executeAsyncScript( 113 """ 114 async function main() { 115 let req = Services.qms.init(); 116 await requestFinished(req); 117 118 return true; 119 } 120 """, 121 script_args=(), 122 ) 123 124 def initTemporaryOrigin(self, persistenceType, origin, createIfNonExistent=True): 125 with self.marionette.using_context(self.marionette.CONTEXT_CHROME): 126 return self.executeAsyncScript( 127 """ 128 const [persistenceType, origin, createIfNonExistent] = arguments; 129 async function main() { 130 const principal = Services.scriptSecurityManager. 131 createContentPrincipalFromOrigin(origin); 132 133 let req = Services.qms.initializeTemporaryOrigin(persistenceType, principal, createIfNonExistent); 134 await requestFinished(req) 135 136 return true; 137 } 138 """, 139 script_args=( 140 persistenceType, 141 origin, 142 createIfNonExistent, 143 ), 144 ) 145 146 def initTemporaryStorage(self): 147 with self.marionette.using_context(self.marionette.CONTEXT_CHROME): 148 return self.executeAsyncScript( 149 """ 150 async function main() { 151 const req = Services.qms.initTemporaryStorage(); 152 await requestFinished(req); 153 154 return true; 155 } 156 """, 157 script_args=(), 158 ) 159 160 def resetStoragesForClient(self, persistenceType, origin, client): 161 # This method is used to force sqlite to write journal file contents to 162 # main sqlite database file 163 164 with self.marionette.using_context(self.marionette.CONTEXT_CHROME): 165 res = self.executeAsyncScript( 166 """ 167 const [persistenceType, origin, client] = arguments; 168 169 async function main() { 170 const principal = Services.scriptSecurityManager. 171 createContentPrincipalFromOrigin(origin); 172 173 const request = Services.qms.resetStoragesForClient(principal, client, persistenceType); 174 await requestFinished(request); 175 176 return true; 177 } 178 """, 179 script_args=(persistenceType, origin, client), 180 ) 181 182 assert res is not None 183 return res 184 185 @contextmanager 186 def using_new_window(self, path, private=False, skipCleanup=False): 187 """ 188 This helper method is created to ensure that a temporary 189 window required inside the test scope is lifetime'd properly 190 """ 191 192 oldWindow = self.marionette.current_window_handle 193 try: 194 newWindow = self.marionette.open(type="window", private=private) 195 self.marionette.switch_to_window(newWindow["handle"]) 196 self.marionette.navigate(self.marionette.absolute_url(path)) 197 origin = self.marionette.absolute_url("")[:-1] 198 if private: 199 origin += "^privateBrowsingId=1" 200 201 yield (origin, "private" if private else "default") 202 203 finally: 204 if not skipCleanup: 205 self.marionette.close() 206 self.marionette.switch_to_window(oldWindow)