tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_cacheapi_encryption_PBM.py (6795B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import os
      6 import re
      7 import sys
      8 from pathlib import Path
      9 
     10 sys.path.append(os.fspath(Path(__file__).parents[3] / "quota/test/marionette"))
     11 
     12 from quota_test_case import QuotaTestCase
     13 
     14 CACHEAPI_PBM_PREF = "dom.cache.privateBrowsing.enabled"
     15 QM_TESTING_PREF = "dom.quotaManager.testing"
     16 
     17 
     18 class CacheAPIEncryptionPBM(QuotaTestCase):
     19    """
     20    Bug1856953: Ensure CacheAPI data gets encrypted in Private Browsing Mode.
     21    We need to ensure data inside both sqlite fields and request/response files
     22    gets encrypted
     23    """
     24 
     25    def setUp(self):
     26        super().setUp()
     27 
     28        self.testHTML = "dom/cache/basicCacheAPI_PBM.html"
     29        self.cacheName = "CachePBMTest"
     30        self.profilePath = self.marionette.instance.profile.profile
     31        self.cacheAPIStoragePath = None
     32 
     33        self.defaultCacheAPIPBMPref = self.marionette.get_pref(CACHEAPI_PBM_PREF)
     34        self.marionette.set_pref(CACHEAPI_PBM_PREF, True)
     35 
     36        self.defaultQMPrefValue = self.marionette.get_pref(QM_TESTING_PREF)
     37        self.marionette.set_pref(QM_TESTING_PREF, True)
     38 
     39        self.cacheRequestStr = "https://example.com/"
     40        self.cacheResponseStr = "CacheAPIEncryptionPBM"
     41 
     42        self.cacheDBFileName = "caches.sqlite"
     43        self.cacheDBJournalFileName = "caches.sqlite-wal"
     44 
     45        self.dbCheckpointThresholdBytes = 512 * 1024
     46 
     47    def tearDown(self):
     48        super().tearDown()
     49 
     50        self.marionette.set_pref(CACHEAPI_PBM_PREF, self.defaultCacheAPIPBMPref)
     51        self.marionette.set_pref(QM_TESTING_PREF, self.defaultQMPrefValue)
     52 
     53    def test_request_response_ondisk(self):
     54        with self.using_new_window(self.testHTML, private=False) as (
     55            self.origin,
     56            self.persistenceType,
     57        ):
     58            self.runAndValidate(
     59                lambda exists: self.assertTrue(
     60                    exists, "Failed to find expected data on disk"
     61                )
     62            )
     63 
     64    def test_encrypted_request_response_ondisk(self):
     65        with self.using_new_window(self.testHTML, private=True) as (
     66            self.origin,
     67            self.persistenceType,
     68        ):
     69            self.runAndValidate(
     70                lambda exists: self.assertFalse(exists, "Data on disk is not encrypted")
     71            )
     72 
     73    def runAndValidate(self, validator):
     74        self.marionette.execute_async_script(
     75            """
     76                const [name, requestStr, responseStr, resolve] = arguments;
     77 
     78                const request = new Request(requestStr);
     79                const response = new Response(responseStr);
     80                window.wrappedJSObject.addDataIntoCache(name, request, response)
     81                                    .then(resolve);
     82            """,
     83            script_args=(
     84                self.cacheName,
     85                self.cacheRequestStr,
     86                self.cacheResponseStr,
     87            ),
     88        )
     89 
     90        self.ensureInvariantHolds(
     91            lambda _: os.path.exists(self.getCacheAPIStoragePath())
     92        )
     93 
     94        self.validateSqlite(validator)
     95        self.validateBodyFile(validator)
     96 
     97    def validateBodyFile(self, validator):
     98        # Ensure response bodies have been flushed to the disk
     99        self.ensureInvariantHolds(
    100            lambda _: self.findDirObj(self.getCacheAPIStoragePath(), "morgue", False)
    101            is not None
    102        )
    103 
    104        cacheResponseDir = self.findDirObj(
    105            self.getCacheAPIStoragePath(), "morgue", False
    106        )
    107 
    108        self.ensureInvariantHolds(lambda _: any(os.listdir(cacheResponseDir)))
    109 
    110        # Get response bodies directory corresponding to the cache 'self.CacheName' since, there's
    111        # only one cache object in this origin, it must be the first one.
    112        cacheResponseBodiesPath = [
    113            d for d in Path(cacheResponseDir).iterdir() if d.is_dir()
    114        ][0]
    115 
    116        # Ensure bodies have been transferred to '.final' from '.tmp'
    117        self.ensureInvariantHolds(
    118            lambda _: self.findDirObj(cacheResponseBodiesPath, ".final", True)
    119            is not None
    120        )
    121        cacheResponseBodyPath = self.findDirObj(cacheResponseBodiesPath, ".final", True)
    122 
    123        # Since a cache response would get compressed using snappy; and an unencrypted response would
    124        # contain 'sNaPpY' as a compression header in the response body file. Check to ensure that
    125        # 'sNaPpy' does not exist if bodies are getting encrypted.
    126        foundRawValue = False
    127        with open(cacheResponseBodyPath, "rb") as f_binary:
    128            foundRawValue = re.search(b"sNaPpY", f_binary.read()) is not None
    129 
    130        validator(foundRawValue)
    131 
    132    def validateSqlite(self, validator):
    133        self.ensureInvariantHolds(
    134            lambda _: self.findDirObj(
    135                self.getCacheAPIStoragePath(), self.cacheDBJournalFileName, True
    136            )
    137            is not None
    138        )
    139        dbJournalFile = self.findDirObj(
    140            self.getCacheAPIStoragePath(), self.cacheDBJournalFileName, True
    141        )
    142 
    143        self.ensureInvariantHolds(
    144            lambda _: self.findDirObj(
    145                self.getCacheAPIStoragePath(), self.cacheDBFileName, True
    146            )
    147            is not None
    148        )
    149        dbFile = self.findDirObj(
    150            self.getCacheAPIStoragePath(), self.cacheDBFileName, True
    151        )
    152 
    153        # Confirm journal file size is less than 512KB which ensures that checkpoint
    154        # has not happend yet (dom/cache/DBSchema.cpp::InitializeConnection, kWalAutoCheckpointPages)
    155        self.assertTrue(
    156            os.path.getsize(dbJournalFile) < self.dbCheckpointThresholdBytes
    157        )
    158 
    159        # Before checkpointing, journal file size should be greater than main sqlite db file.
    160        self.assertTrue(os.path.getsize(dbJournalFile) > os.path.getsize(dbFile))
    161 
    162        validator(
    163            self.cacheRequestStr.encode("ascii") in open(dbJournalFile, "rb").read()
    164        )
    165 
    166        self.assertTrue(
    167            self.resetStoragesForClient(self.persistenceType, self.origin, "cache")
    168        )
    169 
    170        self.assertFalse(os.path.getsize(dbJournalFile) > os.path.getsize(dbFile))
    171 
    172        validator(self.cacheRequestStr.encode("ascii") in open(dbFile, "rb").read())
    173 
    174    def getCacheAPIStoragePath(self):
    175        if self.cacheAPIStoragePath is not None:
    176            return self.cacheAPIStoragePath
    177 
    178        assert self.origin is not None
    179        assert self.persistenceType is not None
    180 
    181        self.cacheAPIStoragePath = self.getStoragePath(
    182            self.profilePath, self.origin, self.persistenceType, "cache"
    183        )
    184 
    185        print("cacheAPI origin directory = " + self.cacheAPIStoragePath)
    186        return self.cacheAPIStoragePath