tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

vcs.py (11796B)


      1 import abc
      2 import os
      3 import stat
      4 from collections import deque
      5 from os import stat_result
      6 from typing import (Any, Dict, Iterable, Iterator, List, MutableMapping, Optional, Set, Text, Tuple,
      7                    TYPE_CHECKING)
      8 
      9 from . import jsonlib
     10 from .utils import git
     11 
     12 # Cannot do `from ..gitignore import gitignore` because
     13 # relative import beyond toplevel throws *ImportError*!
     14 from gitignore import gitignore  # type: ignore
     15 
     16 
     17 if TYPE_CHECKING:
     18    from .manifest import Manifest  # avoid cyclic import
     19 
     20 GitIgnoreCacheType = MutableMapping[bytes, bool]
     21 
     22 
     23 def get_tree(tests_root: Text,
     24             manifest: "Manifest",
     25             manifest_path: Optional[Text],
     26             cache_root: Optional[Text],
     27             paths_to_update: Optional[List[Text]],
     28             working_copy: bool = True,
     29             rebuild: bool = False) -> "FileSystem":
     30    tree = None
     31    if cache_root is None:
     32        cache_root = os.path.join(tests_root, ".wptcache")
     33    if not os.path.exists(cache_root):
     34        try:
     35            os.makedirs(cache_root)
     36        except OSError:
     37            cache_root = None
     38 
     39    if not working_copy:
     40        raise ValueError("working_copy=False unsupported")
     41 
     42    if tree is None:
     43        tree = FileSystem(tests_root,
     44                          manifest.url_base,
     45                          manifest_path=manifest_path,
     46                          cache_path=cache_root,
     47                          paths_to_update=paths_to_update,
     48                          rebuild=rebuild,
     49                          )
     50    return tree
     51 
     52 
     53 class GitHasher:
     54    def __init__(self, path: Text) -> None:
     55        self.git = git(path)
     56 
     57    def _local_changes(self) -> Set[Text]:
     58        """get a set of files which have changed between HEAD and working copy"""
     59        assert self.git is not None
     60        # note that git runs the command with tests_root as the cwd, which may
     61        # not be the root of the git repo (e.g., within a browser repo)
     62        #
     63        # `git diff-index --relative` without a path still compares all tracked
     64        # files before non-WPT files are filtered out, which can be slow in
     65        # vendor repos. Explicitly pass the CWD (i.e., `tests_root`) as a path
     66        # argument to avoid unnecessary diffing.
     67        cmd = ["diff-index", "--relative", "--no-renames", "--name-only", "-z", "HEAD", os.curdir]
     68        data = self.git(*cmd)
     69        return set(data.split("\0"))
     70 
     71    def hash_cache(self) -> Dict[Text, Optional[Text]]:
     72        """
     73        A dict of rel_path -> current git object id if the working tree matches HEAD else None
     74        """
     75        hash_cache: Dict[Text, Optional[Text]] = {}
     76 
     77        if self.git is None:
     78            return hash_cache
     79 
     80        # note that git runs the command with tests_root as the cwd, which may
     81        # not be the root of the git repo (e.g., within a browser repo)
     82        cmd = ["ls-tree", "-r", "-z", "HEAD"]
     83        local_changes = self._local_changes()
     84        for result in self.git(*cmd).split("\0")[:-1]:  # type: Text
     85            data, rel_path = result.rsplit("\t", 1)
     86            hash_cache[rel_path] = None if rel_path in local_changes else data.split(" ", 3)[2]
     87 
     88        return hash_cache
     89 
     90 
     91 
class FileSystem:
    """Source tree backed by the local filesystem.

    Iteration yields ``(rel_path, hash, updated)`` triples for each file
    under ``paths_to_update`` that survives gitignore filtering, where
    ``hash`` is the git object id when known (else None) and ``updated``
    says whether the file changed since the mtime cache was last written.
    """

    def __init__(self,
                 tests_root: Text,
                 url_base: Text,
                 cache_path: Optional[Text],
                 paths_to_update: Optional[List[Text]] = None,
                 manifest_path: Optional[Text] = None,
                 rebuild: bool = False) -> None:
        self.tests_root = tests_root
        self.url_base = url_base
        # Empty string joins back to tests_root, i.e. walk the whole tree.
        self.paths_to_update = paths_to_update or ['']
        self.ignore_cache = None
        self.mtime_cache = None
        tests_root_bytes = tests_root.encode("utf8")
        if cache_path is not None:
            if manifest_path is not None:
                self.mtime_cache = MtimeCache(cache_path, tests_root, manifest_path, rebuild)
            if gitignore.has_ignore(tests_root_bytes):
                self.ignore_cache = GitIgnoreCache(cache_path, tests_root, rebuild)
        # ignore_cache must be constructed before the PathFilter that uses it.
        self.path_filter = gitignore.PathFilter(tests_root_bytes,
                                                extras=[b".git/"],
                                                cache=self.ignore_cache)
        # NOTE(review): this local deliberately-or-not shadows the imported
        # `git` helper from .utils; it is only used for the two lines below.
        git = GitHasher(tests_root)
        self.hash_cache = git.hash_cache()

    def _make_file_info(self,
                        path: Text,
                        path_stat: os.stat_result) -> Tuple[Text, Optional[Text], bool]:
        """Return ``(path, hash-or-None, updated)`` for a single file.

        When the mtime cache reports the file unchanged, the hash is omitted
        (None) and ``updated`` is False, letting callers skip reprocessing.
        """
        mtime_cache = self.mtime_cache
        if mtime_cache is None or mtime_cache.updated(path, path_stat):
            file_hash = self.hash_cache.get(path, None)
            return path, file_hash, True
        else:
            return path, None, False

    def __iter__(self) -> Iterator[Tuple[Text, Optional[Text], bool]]:
        """Yield file info for every file under the configured paths."""
        for path_to_update in self.paths_to_update:
            path = os.path.join(self.tests_root, path_to_update)
            if os.path.isfile(path):
                path_stat = os.stat(path)
                yield self._make_file_info(path_to_update, path_stat)
            elif os.path.isdir(path):
                # walk() operates on bytes paths; decode each result before
                # rejoining with the (text) relative prefix.
                for dirpath, dirnames, filenames in self.path_filter(
                        walk(path.encode("utf8"))):
                    for filename, path_stat in filenames:
                        path = os.path.join(path_to_update,
                                            os.path.join(dirpath, filename).decode("utf8"))
                        yield self._make_file_info(path, path_stat)

    def dump_caches(self) -> None:
        """Persist any caches that were modified during iteration."""
        for cache in [self.mtime_cache, self.ignore_cache]:
            if cache is not None:
                cache.dump()
    145 
    146 
    147 class CacheFile(metaclass=abc.ABCMeta):
    148    def __init__(self, cache_root: Text, tests_root: Text, rebuild: bool = False) -> None:
    149        self.tests_root = tests_root
    150        if not os.path.exists(cache_root):
    151            os.makedirs(cache_root)
    152        self.path = os.path.join(cache_root, self.file_name)
    153        self.modified = False
    154        self.data = self.load(rebuild)
    155 
    156    @abc.abstractproperty
    157    def file_name(self) -> Text:
    158        pass
    159 
    160    def dump(self) -> None:
    161        if not self.modified:
    162            return
    163        with open(self.path, 'w') as f:
    164            jsonlib.dump_local(self.data, f)
    165 
    166    def load(self, rebuild: bool = False) -> Dict[Text, Any]:
    167        data: Dict[Text, Any] = {}
    168        try:
    169            if not rebuild:
    170                with open(self.path) as f:
    171                    try:
    172                        data = jsonlib.load(f)
    173                    except ValueError:
    174                        pass
    175                data = self.check_valid(data)
    176        except OSError:
    177            pass
    178        return data
    179 
    180    def check_valid(self, data: Dict[Text, Any]) -> Dict[Text, Any]:
    181        """Check if the cached data is valid and return an updated copy of the
    182        cache containing only data that can be used."""
    183        return data
    184 
    185 
    186 class MtimeCache(CacheFile):
    187    file_name = "mtime.json"
    188 
    189    def __init__(self, cache_root: Text, tests_root: Text, manifest_path: Text, rebuild: bool = False) -> None:
    190        self.manifest_path = manifest_path
    191        super().__init__(cache_root, tests_root, rebuild)
    192 
    193    def updated(self, rel_path: Text, stat: stat_result) -> bool:
    194        """Return a boolean indicating whether the file changed since the cache was last updated.
    195 
    196        This implicitly updates the cache with the new mtime data."""
    197        mtime = stat.st_mtime
    198        if mtime != self.data.get(rel_path):
    199            self.modified = True
    200            self.data[rel_path] = mtime
    201            return True
    202        return False
    203 
    204    def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
    205        if data.get("/tests_root") != self.tests_root:
    206            self.modified = True
    207        else:
    208            if self.manifest_path is not None and os.path.exists(self.manifest_path):
    209                mtime = os.path.getmtime(self.manifest_path)
    210                if data.get("/manifest_path") != [self.manifest_path, mtime]:
    211                    self.modified = True
    212            else:
    213                self.modified = True
    214        if self.modified:
    215            data = {}
    216            data["/tests_root"] = self.tests_root
    217        return data
    218 
    219    def dump(self) -> None:
    220        if self.manifest_path is None:
    221            raise ValueError
    222        if not os.path.exists(self.manifest_path):
    223            return
    224        mtime = os.path.getmtime(self.manifest_path)
    225        self.data["/manifest_path"] = [self.manifest_path, mtime]
    226        self.data["/tests_root"] = self.tests_root
    227        super().dump()
    228 
    229 
    230 class GitIgnoreCache(CacheFile, GitIgnoreCacheType):
    231    file_name = "gitignore2.json"
    232 
    233    def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
    234        ignore_path = os.path.join(self.tests_root, ".gitignore")
    235        mtime = os.path.getmtime(ignore_path)
    236        if data.get("/gitignore_file") != [ignore_path, mtime]:
    237            self.modified = True
    238            data = {}
    239            data["/gitignore_file"] = [ignore_path, mtime]
    240        return data
    241 
    242    def __contains__(self, key: Any) -> bool:
    243        try:
    244            key = key.decode("utf-8")
    245        except Exception:
    246            return False
    247 
    248        return key in self.data
    249 
    250    def __getitem__(self, key: bytes) -> bool:
    251        real_key = key.decode("utf-8")
    252        v = self.data[real_key]
    253        assert isinstance(v, bool)
    254        return v
    255 
    256    def __setitem__(self, key: bytes, value: bool) -> None:
    257        real_key = key.decode("utf-8")
    258        if self.data.get(real_key) != value:
    259            self.modified = True
    260            self.data[real_key] = value
    261 
    262    def __delitem__(self, key: bytes) -> None:
    263        real_key = key.decode("utf-8")
    264        del self.data[real_key]
    265 
    266    def __iter__(self) -> Iterator[bytes]:
    267        return (key.encode("utf-8") for key in self.data)
    268 
    269    def __len__(self) -> int:
    270        return len(self.data)
    271 
    272 
    273 def walk(root: bytes) -> Iterable[Tuple[bytes, List[Tuple[bytes, stat_result]], List[Tuple[bytes, stat_result]]]]:
    274    """Re-implementation of os.walk. Returns an iterator over
    275    (dirpath, dirnames, filenames), with some semantic differences
    276    to os.walk.
    277 
    278    This has a similar interface to os.walk, with the important difference
    279    that instead of lists of filenames and directory names, it yields
    280    lists of tuples of the form [(name, stat)] where stat is the result of
    281    os.stat for the file. That allows reusing the same stat data in the
    282    caller. It also always returns the dirpath relative to the root, with
    283    the root iself being returned as the empty string.
    284 
    285    Unlike os.walk the implementation is not recursive."""
    286 
    287    get_stat = os.stat
    288    is_dir = stat.S_ISDIR
    289    is_link = stat.S_ISLNK
    290    join = os.path.join
    291    listdir = os.listdir
    292    relpath = os.path.relpath
    293 
    294    root = os.path.abspath(root)
    295    stack = deque([(root, b"")])
    296 
    297    while stack:
    298        dir_path, rel_path = stack.popleft()
    299        try:
    300            # Note that listdir and error are globals in this module due
    301            # to earlier import-*.
    302            names = listdir(dir_path)
    303        except OSError:
    304            continue
    305 
    306        dirs, non_dirs = [], []
    307        for name in names:
    308            path = join(dir_path, name)
    309            try:
    310                path_stat = get_stat(path)
    311            except OSError:
    312                continue
    313            if is_dir(path_stat.st_mode):
    314                dirs.append((name, path_stat))
    315            else:
    316                non_dirs.append((name, path_stat))
    317 
    318        yield rel_path, dirs, non_dirs
    319        for name, path_stat in dirs:
    320            new_path = join(dir_path, name)
    321            if not is_link(path_stat.st_mode):
    322                stack.append((new_path, relpath(new_path, root)))