tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

test_copier.py (19480B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import os
      6 import stat
      7 import unittest
      8 
      9 import mozunit
     10 
     11 import mozpack.path as mozpath
     12 from mozpack.copier import FileCopier, FileRegistry, FileRegistrySubtree, Jarrer
     13 from mozpack.errors import ErrorMessage
     14 from mozpack.files import ExistingFile, GeneratedFile
     15 from mozpack.mozjar import JarReader
     16 from mozpack.test.test_files import MatchTestTemplate, MockDest, TestWithTmpDir
     17 
     18 
     19 class BaseTestFileRegistry(MatchTestTemplate):
     20    def add(self, path):
     21        self.registry.add(path, GeneratedFile(path))
     22 
     23    def do_check(self, pattern, result):
     24        self.checked = True
     25        if result:
     26            self.assertTrue(self.registry.contains(pattern))
     27        else:
     28            self.assertFalse(self.registry.contains(pattern))
     29        self.assertEqual(self.registry.match(pattern), result)
     30 
     31    def do_test_file_registry(self, registry):
     32        self.registry = registry
     33        self.registry.add("foo", GeneratedFile(b"foo"))
     34        bar = GeneratedFile(b"bar")
     35        self.registry.add("bar", bar)
     36        self.assertEqual(self.registry.paths(), ["foo", "bar"])
     37        self.assertEqual(self.registry["bar"], bar)
     38 
     39        self.assertRaises(
     40            ErrorMessage, self.registry.add, "foo", GeneratedFile(b"foo2")
     41        )
     42 
     43        self.assertRaises(ErrorMessage, self.registry.remove, "qux")
     44 
     45        self.assertRaises(
     46            ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar")
     47        )
     48        self.assertRaises(
     49            ErrorMessage, self.registry.add, "foo/bar/baz", GeneratedFile(b"foobar")
     50        )
     51 
     52        self.assertEqual(self.registry.paths(), ["foo", "bar"])
     53 
     54        self.registry.remove("foo")
     55        self.assertEqual(self.registry.paths(), ["bar"])
     56        self.registry.remove("bar")
     57        self.assertEqual(self.registry.paths(), [])
     58 
     59        self.prepare_match_test()
     60        self.do_match_test()
     61        self.assertTrue(self.checked)
     62        self.assertEqual(
     63            self.registry.paths(),
     64            [
     65                "bar",
     66                "foo/bar",
     67                "foo/baz",
     68                "foo/qux/1",
     69                "foo/qux/bar",
     70                "foo/qux/2/test",
     71                "foo/qux/2/test2",
     72            ],
     73        )
     74 
     75        self.registry.remove("foo/qux")
     76        self.assertEqual(self.registry.paths(), ["bar", "foo/bar", "foo/baz"])
     77 
     78        self.registry.add("foo/qux", GeneratedFile(b"fooqux"))
     79        self.assertEqual(
     80            self.registry.paths(), ["bar", "foo/bar", "foo/baz", "foo/qux"]
     81        )
     82        self.registry.remove("foo/b*")
     83        self.assertEqual(self.registry.paths(), ["bar", "foo/qux"])
     84 
     85        self.assertEqual([f for f, c in self.registry], ["bar", "foo/qux"])
     86        self.assertEqual(len(self.registry), 2)
     87 
     88        self.add("foo/.foo")
     89        self.assertTrue(self.registry.contains("foo/.foo"))
     90 
     91    def do_test_registry_paths(self, registry):
     92        self.registry = registry
     93 
     94        # Can't add a file if it requires a directory in place of a
     95        # file we also require.
     96        self.registry.add("foo", GeneratedFile(b"foo"))
     97        self.assertRaises(
     98            ErrorMessage, self.registry.add, "foo/bar", GeneratedFile(b"foobar")
     99        )
    100 
    101        # Can't add a file if we already have a directory there.
    102        self.registry.add("bar/baz", GeneratedFile(b"barbaz"))
    103        self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
    104 
    105        # Bump the count of things that require bar/ to 2.
    106        self.registry.add("bar/zot", GeneratedFile(b"barzot"))
    107        self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
    108 
    109        # Drop the count of things that require bar/ to 1.
    110        self.registry.remove("bar/baz")
    111        self.assertRaises(ErrorMessage, self.registry.add, "bar", GeneratedFile(b"bar"))
    112 
    113        # Drop the count of things that require bar/ to 0.
    114        self.registry.remove("bar/zot")
    115        self.registry.add("bar/zot", GeneratedFile(b"barzot"))
    116 
    117 
    118 class TestFileRegistry(BaseTestFileRegistry, unittest.TestCase):
    119    def test_partial_paths(self):
    120        cases = {
    121            "foo/bar/baz/zot": ["foo/bar/baz", "foo/bar", "foo"],
    122            "foo/bar": ["foo"],
    123            "bar": [],
    124        }
    125        reg = FileRegistry()
    126        for path, parts in cases.items():
    127            self.assertEqual(reg._partial_paths(path), parts)
    128 
    129    def test_file_registry(self):
    130        self.do_test_file_registry(FileRegistry())
    131 
    132    def test_registry_paths(self):
    133        self.do_test_registry_paths(FileRegistry())
    134 
    135    def test_required_directories(self):
    136        self.registry = FileRegistry()
    137 
    138        self.registry.add("foo", GeneratedFile(b"foo"))
    139        self.assertEqual(self.registry.required_directories(), set())
    140 
    141        self.registry.add("bar/baz", GeneratedFile(b"barbaz"))
    142        self.assertEqual(self.registry.required_directories(), {"bar"})
    143 
    144        self.registry.add("bar/zot", GeneratedFile(b"barzot"))
    145        self.assertEqual(self.registry.required_directories(), {"bar"})
    146 
    147        self.registry.add("bar/zap/zot", GeneratedFile(b"barzapzot"))
    148        self.assertEqual(self.registry.required_directories(), {"bar", "bar/zap"})
    149 
    150        self.registry.remove("bar/zap/zot")
    151        self.assertEqual(self.registry.required_directories(), {"bar"})
    152 
    153        self.registry.remove("bar/baz")
    154        self.assertEqual(self.registry.required_directories(), {"bar"})
    155 
    156        self.registry.remove("bar/zot")
    157        self.assertEqual(self.registry.required_directories(), set())
    158 
    159        self.registry.add("x/y/z", GeneratedFile(b"xyz"))
    160        self.assertEqual(self.registry.required_directories(), {"x", "x/y"})
    161 
    162 
    163 class TestFileRegistrySubtree(BaseTestFileRegistry, unittest.TestCase):
    164    def test_file_registry_subtree_base(self):
    165        registry = FileRegistry()
    166        self.assertEqual(registry, FileRegistrySubtree("", registry))
    167        self.assertNotEqual(registry, FileRegistrySubtree("base", registry))
    168 
    169    def create_registry(self):
    170        registry = FileRegistry()
    171        registry.add("foo/bar", GeneratedFile(b"foo/bar"))
    172        registry.add("baz/qux", GeneratedFile(b"baz/qux"))
    173        return FileRegistrySubtree("base/root", registry)
    174 
    175    def test_file_registry_subtree(self):
    176        self.do_test_file_registry(self.create_registry())
    177 
    178    def test_registry_paths_subtree(self):
    179        FileRegistry()
    180        self.do_test_registry_paths(self.create_registry())
    181 
    182 
    183 class TestFileCopier(TestWithTmpDir):
    184    def all_dirs(self, base):
    185        all_dirs = set()
    186        for root, dirs, files in os.walk(base):
    187            if not dirs:
    188                all_dirs.add(mozpath.relpath(root, base))
    189        return all_dirs
    190 
    191    def all_files(self, base):
    192        all_files = set()
    193        for root, dirs, files in os.walk(base):
    194            for f in files:
    195                all_files.add(mozpath.join(mozpath.relpath(root, base), f))
    196        return all_files
    197 
    198    def test_file_copier(self):
    199        copier = FileCopier()
    200        copier.add("foo/bar", GeneratedFile(b"foobar"))
    201        copier.add("foo/qux", GeneratedFile(b"fooqux"))
    202        copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz"))
    203        copier.add("bar", GeneratedFile(b"bar"))
    204        copier.add("qux/foo", GeneratedFile(b"quxfoo"))
    205        copier.add("qux/bar", GeneratedFile(b""))
    206 
    207        result = copier.copy(self.tmpdir)
    208        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
    209        self.assertEqual(
    210            self.all_dirs(self.tmpdir), set(["foo/deep/nested/directory", "qux"])
    211        )
    212 
    213        self.assertEqual(
    214            result.updated_files,
    215            set(self.tmppath(p) for p in self.all_files(self.tmpdir)),
    216        )
    217        self.assertEqual(result.existing_files, set())
    218        self.assertEqual(result.removed_files, set())
    219        self.assertEqual(result.removed_directories, set())
    220 
    221        copier.remove("foo")
    222        copier.add("test", GeneratedFile(b"test"))
    223        result = copier.copy(self.tmpdir)
    224        self.assertEqual(self.all_files(self.tmpdir), set(copier.paths()))
    225        self.assertEqual(self.all_dirs(self.tmpdir), set(["qux"]))
    226        self.assertEqual(
    227            result.removed_files,
    228            set(
    229                self.tmppath(p)
    230                for p in ("foo/bar", "foo/qux", "foo/deep/nested/directory/file")
    231            ),
    232        )
    233 
    234    def test_symlink_directory_replaced(self):
    235        """Directory symlinks in destination are replaced if they need to be
    236        real directories."""
    237        if not self.symlink_supported:
    238            return
    239 
    240        dest = self.tmppath("dest")
    241 
    242        copier = FileCopier()
    243        copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
    244 
    245        os.makedirs(self.tmppath("dest/foo"))
    246        dummy = self.tmppath("dummy")
    247        os.mkdir(dummy)
    248        link = self.tmppath("dest/foo/bar")
    249        os.symlink(dummy, link)
    250 
    251        result = copier.copy(dest)
    252 
    253        st = os.lstat(link)
    254        self.assertFalse(stat.S_ISLNK(st.st_mode))
    255        self.assertTrue(stat.S_ISDIR(st.st_mode))
    256 
    257        self.assertEqual(self.all_files(dest), set(copier.paths()))
    258 
    259        self.assertEqual(result.removed_directories, set())
    260        self.assertEqual(len(result.updated_files), 1)
    261 
    262    def test_remove_unaccounted_directory_symlinks(self):
    263        """Directory symlinks in destination that are not in the way are
    264        deleted according to remove_unaccounted and
    265        remove_all_directory_symlinks.
    266        """
    267        if not self.symlink_supported:
    268            return
    269 
    270        dest = self.tmppath("dest")
    271 
    272        copier = FileCopier()
    273        copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
    274 
    275        os.makedirs(self.tmppath("dest/foo"))
    276        dummy = self.tmppath("dummy")
    277        os.mkdir(dummy)
    278 
    279        os.mkdir(self.tmppath("dest/zot"))
    280        link = self.tmppath("dest/zot/zap")
    281        os.symlink(dummy, link)
    282 
    283        # If not remove_unaccounted but remove_empty_directories, then
    284        # the symlinked directory remains (as does its containing
    285        # directory).
    286        result = copier.copy(
    287            dest,
    288            remove_unaccounted=False,
    289            remove_empty_directories=True,
    290            remove_all_directory_symlinks=False,
    291        )
    292 
    293        st = os.lstat(link)
    294        self.assertTrue(stat.S_ISLNK(st.st_mode))
    295        self.assertFalse(stat.S_ISDIR(st.st_mode))
    296 
    297        self.assertEqual(self.all_files(dest), set(copier.paths()))
    298        self.assertEqual(self.all_dirs(dest), set(["foo/bar"]))
    299 
    300        self.assertEqual(result.removed_directories, set())
    301        self.assertEqual(len(result.updated_files), 1)
    302 
    303        # If remove_unaccounted but not remove_empty_directories, then
    304        # only the symlinked directory is removed.
    305        result = copier.copy(
    306            dest,
    307            remove_unaccounted=True,
    308            remove_empty_directories=False,
    309            remove_all_directory_symlinks=False,
    310        )
    311 
    312        st = os.lstat(self.tmppath("dest/zot"))
    313        self.assertFalse(stat.S_ISLNK(st.st_mode))
    314        self.assertTrue(stat.S_ISDIR(st.st_mode))
    315 
    316        self.assertEqual(result.removed_files, set([link]))
    317        self.assertEqual(result.removed_directories, set())
    318 
    319        self.assertEqual(self.all_files(dest), set(copier.paths()))
    320        self.assertEqual(self.all_dirs(dest), set(["foo/bar", "zot"]))
    321 
    322        # If remove_unaccounted and remove_empty_directories, then
    323        # both the symlink and its containing directory are removed.
    324        link = self.tmppath("dest/zot/zap")
    325        os.symlink(dummy, link)
    326 
    327        result = copier.copy(
    328            dest,
    329            remove_unaccounted=True,
    330            remove_empty_directories=True,
    331            remove_all_directory_symlinks=False,
    332        )
    333 
    334        self.assertEqual(result.removed_files, set([link]))
    335        self.assertEqual(result.removed_directories, set([self.tmppath("dest/zot")]))
    336 
    337        self.assertEqual(self.all_files(dest), set(copier.paths()))
    338        self.assertEqual(self.all_dirs(dest), set(["foo/bar"]))
    339 
    340    def test_permissions(self):
    341        """Ensure files without write permission can be deleted."""
    342        with open(self.tmppath("dummy"), "a"):
    343            pass
    344 
    345        p = self.tmppath("no_perms")
    346        with open(p, "a"):
    347            pass
    348 
    349        # Make file and directory unwritable. Reminder: making a directory
    350        # unwritable prevents modifications (including deletes) from the list
    351        # of files in that directory.
    352        os.chmod(p, 0o400)
    353        os.chmod(self.tmpdir, 0o400)
    354 
    355        copier = FileCopier()
    356        copier.add("dummy", GeneratedFile(b"content"))
    357        result = copier.copy(self.tmpdir)
    358        self.assertEqual(result.removed_files_count, 1)
    359        self.assertFalse(os.path.exists(p))
    360 
    361    def test_no_remove(self):
    362        copier = FileCopier()
    363        copier.add("foo", GeneratedFile(b"foo"))
    364 
    365        with open(self.tmppath("bar"), "a"):
    366            pass
    367 
    368        os.mkdir(self.tmppath("emptydir"))
    369        d = self.tmppath("populateddir")
    370        os.mkdir(d)
    371 
    372        with open(self.tmppath("populateddir/foo"), "a"):
    373            pass
    374 
    375        result = copier.copy(self.tmpdir, remove_unaccounted=False)
    376 
    377        self.assertEqual(
    378            self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"])
    379        )
    380        self.assertEqual(self.all_dirs(self.tmpdir), set(["populateddir"]))
    381        self.assertEqual(result.removed_files, set())
    382        self.assertEqual(result.removed_directories, set([self.tmppath("emptydir")]))
    383 
    384    def test_no_remove_empty_directories(self):
    385        copier = FileCopier()
    386        copier.add("foo", GeneratedFile(b"foo"))
    387 
    388        with open(self.tmppath("bar"), "a"):
    389            pass
    390 
    391        os.mkdir(self.tmppath("emptydir"))
    392        d = self.tmppath("populateddir")
    393        os.mkdir(d)
    394 
    395        with open(self.tmppath("populateddir/foo"), "a"):
    396            pass
    397 
    398        result = copier.copy(
    399            self.tmpdir, remove_unaccounted=False, remove_empty_directories=False
    400        )
    401 
    402        self.assertEqual(
    403            self.all_files(self.tmpdir), set(["foo", "bar", "populateddir/foo"])
    404        )
    405        self.assertEqual(self.all_dirs(self.tmpdir), set(["emptydir", "populateddir"]))
    406        self.assertEqual(result.removed_files, set())
    407        self.assertEqual(result.removed_directories, set())
    408 
    409    def test_optional_exists_creates_unneeded_directory(self):
    410        """Demonstrate that a directory not strictly required, but specified
    411        as the path to an optional file, will be unnecessarily created.
    412 
    413        This behaviour is wrong; fixing it is tracked by Bug 972432;
    414        and this test exists to guard against unexpected changes in
    415        behaviour.
    416        """
    417 
    418        dest = self.tmppath("dest")
    419 
    420        copier = FileCopier()
    421        copier.add("foo/bar", ExistingFile(required=False))
    422 
    423        result = copier.copy(dest)
    424 
    425        st = os.lstat(self.tmppath("dest/foo"))
    426        self.assertFalse(stat.S_ISLNK(st.st_mode))
    427        self.assertTrue(stat.S_ISDIR(st.st_mode))
    428 
    429        # What's worse, we have no record that dest was created.
    430        self.assertEqual(len(result.updated_files), 0)
    431 
    432        # But we do have an erroneous record of an optional file
    433        # existing when it does not.
    434        self.assertIn(self.tmppath("dest/foo/bar"), result.existing_files)
    435 
    436    def test_remove_unaccounted_file_registry(self):
    437        """Test FileCopier.copy(remove_unaccounted=FileRegistry())"""
    438 
    439        dest = self.tmppath("dest")
    440 
    441        copier = FileCopier()
    442        copier.add("foo/bar/baz", GeneratedFile(b"foobarbaz"))
    443        copier.add("foo/bar/qux", GeneratedFile(b"foobarqux"))
    444        copier.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga"))
    445        copier.add("foo/toto/tata", GeneratedFile(b"footototata"))
    446 
    447        os.makedirs(os.path.join(dest, "bar"))
    448        with open(os.path.join(dest, "bar", "bar"), "w") as fh:
    449            fh.write("barbar")
    450        os.makedirs(os.path.join(dest, "foo", "toto"))
    451        with open(os.path.join(dest, "foo", "toto", "toto"), "w") as fh:
    452            fh.write("foototototo")
    453 
    454        result = copier.copy(dest, remove_unaccounted=False)
    455 
    456        self.assertEqual(
    457            self.all_files(dest), set(copier.paths()) | {"foo/toto/toto", "bar/bar"}
    458        )
    459        self.assertEqual(
    460            self.all_dirs(dest), {"foo/bar", "foo/hoge", "foo/toto", "bar"}
    461        )
    462 
    463        copier2 = FileCopier()
    464        copier2.add("foo/hoge/fuga", GeneratedFile(b"foohogefuga"))
    465 
    466        # We expect only files copied from the first copier to be removed,
    467        # not the extra file that was there beforehand.
    468        result = copier2.copy(dest, remove_unaccounted=copier)
    469 
    470        self.assertEqual(
    471            self.all_files(dest), set(copier2.paths()) | {"foo/toto/toto", "bar/bar"}
    472        )
    473        self.assertEqual(self.all_dirs(dest), {"foo/hoge", "foo/toto", "bar"})
    474        self.assertEqual(result.updated_files, {self.tmppath("dest/foo/hoge/fuga")})
    475        self.assertEqual(result.existing_files, set())
    476        self.assertEqual(
    477            result.removed_files,
    478            {
    479                self.tmppath(p)
    480                for p in ("dest/foo/bar/baz", "dest/foo/bar/qux", "dest/foo/toto/tata")
    481            },
    482        )
    483        self.assertEqual(result.removed_directories, {self.tmppath("dest/foo/bar")})
    484 
    485 
    486 class TestJarrer(unittest.TestCase):
    487    def check_jar(self, dest, copier):
    488        jar = JarReader(fileobj=dest)
    489        self.assertEqual([f.filename for f in jar], copier.paths())
    490        for f in jar:
    491            self.assertEqual(f.uncompressed_data.read(), copier[f.filename].content)
    492 
    493    def test_jarrer(self):
    494        copier = Jarrer()
    495        copier.add("foo/bar", GeneratedFile(b"foobar"))
    496        copier.add("foo/qux", GeneratedFile(b"fooqux"))
    497        copier.add("foo/deep/nested/directory/file", GeneratedFile(b"fooz"))
    498        copier.add("bar", GeneratedFile(b"bar"))
    499        copier.add("qux/foo", GeneratedFile(b"quxfoo"))
    500        copier.add("qux/bar", GeneratedFile(b""))
    501 
    502        dest = MockDest()
    503        copier.copy(dest)
    504        self.check_jar(dest, copier)
    505 
    506        copier.remove("foo")
    507        copier.add("test", GeneratedFile(b"test"))
    508        copier.copy(dest)
    509        self.check_jar(dest, copier)
    510 
    511        copier.remove("test")
    512        copier.add("test", GeneratedFile(b"replaced-content"))
    513        copier.copy(dest)
    514        self.check_jar(dest, copier)
    515 
    516        copier.copy(dest)
    517        self.check_jar(dest, copier)
    518 
    519        preloaded = ["qux/bar", "bar"]
    520        copier.preload(preloaded)
    521        copier.copy(dest)
    522 
    523        dest.seek(0)
    524        jar = JarReader(fileobj=dest)
    525        self.assertEqual(
    526            [f.filename for f in jar],
    527            preloaded + [p for p in copier.paths() if p not in preloaded],
    528        )
    529        self.assertEqual(jar.last_preloaded, preloaded[-1])
    530 
    531    def test_jarrer_compress(self):
    532        copier = Jarrer()
    533        copier.add("foo/bar", GeneratedFile(b"ffffff"))
    534        copier.add("foo/qux", GeneratedFile(b"ffffff"), compress=False)
    535 
    536        dest = MockDest()
    537        copier.copy(dest)
    538        self.check_jar(dest, copier)
    539 
    540        dest.seek(0)
    541        jar = JarReader(fileobj=dest)
    542        self.assertTrue(jar["foo/bar"].compressed)
    543        self.assertFalse(jar["foo/qux"].compressed)
    544 
    545 
# Hand control to mozunit's runner when this file is executed as a script.
if __name__ == "__main__":
    mozunit.main()