vcs.py (11796B)
import abc
import os
import stat
from collections import deque
from os import stat_result
from typing import (Any, Dict, Iterable, Iterator, List, MutableMapping, Optional, Set, Text, Tuple,
                    TYPE_CHECKING)

from . import jsonlib
from .utils import git

# Cannot do `from ..gitignore import gitignore` because
# relative import beyond toplevel throws *ImportError*!
from gitignore import gitignore  # type: ignore


if TYPE_CHECKING:
    from .manifest import Manifest  # avoid cyclic import

# Mapping of utf-8-encoded repo-relative path -> cached "is ignored?" flag.
GitIgnoreCacheType = MutableMapping[bytes, bool]


def get_tree(tests_root: Text,
             manifest: "Manifest",
             manifest_path: Optional[Text],
             cache_root: Optional[Text],
             paths_to_update: Optional[List[Text]],
             working_copy: bool = True,
             rebuild: bool = False) -> "FileSystem":
    """Construct a FileSystem tree over the working copy at ``tests_root``.

    :param cache_root: directory for cache files; defaults to
        ``<tests_root>/.wptcache``.  If the directory cannot be created,
        caching is disabled by passing ``None`` through to ``FileSystem``.
    :param paths_to_update: relative paths to restrict iteration to, or
        ``None``/empty to cover the whole tree.
    :raises ValueError: if ``working_copy`` is False (only working-copy
        trees are supported).
    """
    if cache_root is None:
        cache_root = os.path.join(tests_root, ".wptcache")
    if not os.path.exists(cache_root):
        try:
            os.makedirs(cache_root)
        except OSError:
            # Can't create the cache dir: run without an on-disk cache.
            cache_root = None

    if not working_copy:
        raise ValueError("working_copy=False unsupported")

    return FileSystem(tests_root,
                      manifest.url_base,
                      manifest_path=manifest_path,
                      cache_path=cache_root,
                      paths_to_update=paths_to_update,
                      rebuild=rebuild)


class GitHasher:
    """Compute git object ids for files tracked at ``path``."""

    def __init__(self, path: Text) -> None:
        self.git = git(path)

    def _local_changes(self) -> Set[Text]:
        """get a set of files which have changed between HEAD and working copy"""
        assert self.git is not None
        # note that git runs the command with tests_root as the cwd, which may
        # not be the root of the git repo (e.g., within a browser repo)
        #
        # `git diff-index --relative` without a path still compares all tracked
        # files before non-WPT files are filtered out, which can be slow in
        # vendor repos. Explicitly pass the CWD (i.e., `tests_root`) as a path
        # argument to avoid unnecessary diffing.
        cmd = ["diff-index", "--relative", "--no-renames", "--name-only", "-z", "HEAD", os.curdir]
        data = self.git(*cmd)
        return set(data.split("\0"))

    def hash_cache(self) -> Dict[Text, Optional[Text]]:
        """
        A dict of rel_path -> current git object id if the working tree matches HEAD else None
        """
        hash_cache: Dict[Text, Optional[Text]] = {}

        if self.git is None:
            return hash_cache

        # note that git runs the command with tests_root as the cwd, which may
        # not be the root of the git repo (e.g., within a browser repo)
        cmd = ["ls-tree", "-r", "-z", "HEAD"]
        local_changes = self._local_changes()
        for result in self.git(*cmd).split("\0")[:-1]:  # type: Text
            # Each entry is "<mode> <type> <object-id>\t<rel_path>".
            data, rel_path = result.rsplit("\t", 1)
            hash_cache[rel_path] = None if rel_path in local_changes else data.split(" ", 3)[2]

        return hash_cache


class FileSystem:
    """Iterable of (rel_path, git hash or None, changed?) for files under tests_root."""

    def __init__(self,
                 tests_root: Text,
                 url_base: Text,
                 cache_path: Optional[Text],
                 paths_to_update: Optional[List[Text]] = None,
                 manifest_path: Optional[Text] = None,
                 rebuild: bool = False) -> None:
        self.tests_root = tests_root
        self.url_base = url_base
        # [''] means "everything below tests_root".
        self.paths_to_update = paths_to_update or ['']
        self.ignore_cache = None
        self.mtime_cache = None
        tests_root_bytes = tests_root.encode("utf8")
        if cache_path is not None:
            if manifest_path is not None:
                self.mtime_cache = MtimeCache(cache_path, tests_root, manifest_path, rebuild)
            if gitignore.has_ignore(tests_root_bytes):
                self.ignore_cache = GitIgnoreCache(cache_path, tests_root, rebuild)
        self.path_filter = gitignore.PathFilter(tests_root_bytes,
                                                extras=[b".git/"],
                                                cache=self.ignore_cache)
        # `hasher` (not `git`) so we don't shadow the imported `git` helper.
        hasher = GitHasher(tests_root)
        self.hash_cache = hasher.hash_cache()

    def _make_file_info(self,
                        path: Text,
                        path_stat: os.stat_result) -> Tuple[Text, Optional[Text], bool]:
        """Return (rel_path, git hash or None, changed?) for one file.

        A file counts as changed when there is no mtime cache or the cache
        reports a different mtime (which also records the new mtime)."""
        mtime_cache = self.mtime_cache
        if mtime_cache is None or mtime_cache.updated(path, path_stat):
            file_hash = self.hash_cache.get(path, None)
            return path, file_hash, True
        else:
            return path, None, False

    def __iter__(self) -> Iterator[Tuple[Text, Optional[Text], bool]]:
        """Yield file info for every non-ignored file under paths_to_update."""
        for path_to_update in self.paths_to_update:
            path = os.path.join(self.tests_root, path_to_update)
            if os.path.isfile(path):
                path_stat = os.stat(path)
                yield self._make_file_info(path_to_update, path_stat)
            elif os.path.isdir(path):
                for dirpath, dirnames, filenames in self.path_filter(
                        walk(path.encode("utf8"))):
                    for filename, path_stat in filenames:
                        # `rel_path` rather than rebinding `path` from the
                        # enclosing loop.
                        rel_path = os.path.join(path_to_update,
                                                os.path.join(dirpath, filename).decode("utf8"))
                        yield self._make_file_info(rel_path, path_stat)

    def dump_caches(self) -> None:
        """Persist any caches that were modified during iteration."""
        for cache in [self.mtime_cache, self.ignore_cache]:
            if cache is not None:
                cache.dump()


class CacheFile(metaclass=abc.ABCMeta):
    """Base class for a JSON-backed cache stored as a file under cache_root."""

    def __init__(self, cache_root: Text, tests_root: Text, rebuild: bool = False) -> None:
        self.tests_root = tests_root
        if not os.path.exists(cache_root):
            os.makedirs(cache_root)
        self.path = os.path.join(cache_root, self.file_name)
        # Set to True whenever in-memory data diverges from what's on disk.
        self.modified = False
        self.data = self.load(rebuild)

    # `abc.abstractproperty` is deprecated; stack @property on
    # @abc.abstractmethod instead. Subclasses satisfy this with a plain
    # class attribute.
    @property
    @abc.abstractmethod
    def file_name(self) -> Text:
        """Basename of the cache file within cache_root."""

    def dump(self) -> None:
        """Write the cache to disk, but only if it was modified."""
        if not self.modified:
            return
        with open(self.path, 'w') as f:
            jsonlib.dump_local(self.data, f)

    def load(self, rebuild: bool = False) -> Dict[Text, Any]:
        """Read and validate the cache; return {} for missing/corrupt data."""
        data: Dict[Text, Any] = {}
        try:
            if not rebuild:
                with open(self.path) as f:
                    try:
                        data = jsonlib.load(f)
                    except ValueError:
                        # Corrupt JSON: fall through with empty data.
                        pass
            # check_valid may itself touch the filesystem; OSError from it is
            # treated the same as a missing cache file.
            data = self.check_valid(data)
        except OSError:
            pass
        return data

    def check_valid(self, data: Dict[Text, Any]) -> Dict[Text, Any]:
        """Check if the cached data is valid and return an updated copy of the
        cache containing only data that can be used."""
        return data
class MtimeCache(CacheFile):
    """Cache of per-file mtimes, used to skip re-hashing unchanged files.

    The whole cache is invalidated when tests_root or the manifest file
    itself changes (see check_valid)."""
    file_name = "mtime.json"

    def __init__(self, cache_root: Text, tests_root: Text, manifest_path: Text, rebuild: bool = False) -> None:
        self.manifest_path = manifest_path
        super().__init__(cache_root, tests_root, rebuild)

    def updated(self, rel_path: Text, stat: stat_result) -> bool:
        """Return a boolean indicating whether the file changed since the cache was last updated.

        This implicitly updates the cache with the new mtime data."""
        mtime = stat.st_mtime
        if mtime != self.data.get(rel_path):
            self.modified = True
            self.data[rel_path] = mtime
            return True
        return False

    def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        # The cached mtimes are only meaningful for the same tests_root and
        # an unchanged manifest file; otherwise start from an empty cache.
        if data.get("/tests_root") != self.tests_root:
            self.modified = True
        else:
            if self.manifest_path is not None and os.path.exists(self.manifest_path):
                mtime = os.path.getmtime(self.manifest_path)
                if data.get("/manifest_path") != [self.manifest_path, mtime]:
                    self.modified = True
            else:
                self.modified = True
        if self.modified:
            data = {}
            data["/tests_root"] = self.tests_root
        return data

    def dump(self) -> None:
        """Persist the cache, stamping it with the current manifest mtime.

        :raises ValueError: if no manifest path was configured."""
        if self.manifest_path is None:
            raise ValueError
        if not os.path.exists(self.manifest_path):
            return
        mtime = os.path.getmtime(self.manifest_path)
        self.data["/manifest_path"] = [self.manifest_path, mtime]
        self.data["/tests_root"] = self.tests_root
        super().dump()


class GitIgnoreCache(CacheFile, GitIgnoreCacheType):
    """Persisted mapping of utf-8 path bytes -> "is ignored?" flag.

    Keys are stored internally as text (JSON), but the mapping interface
    accepts and yields bytes to match gitignore.PathFilter."""
    file_name = "gitignore2.json"

    def check_valid(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
        # Invalidate the whole cache whenever .gitignore itself changes.
        ignore_path = os.path.join(self.tests_root, ".gitignore")
        mtime = os.path.getmtime(ignore_path)
        if data.get("/gitignore_file") != [ignore_path, mtime]:
            self.modified = True
            data = {}
        data["/gitignore_file"] = [ignore_path, mtime]
        return data

    def __contains__(self, key: Any) -> bool:
        try:
            key = key.decode("utf-8")
        except (AttributeError, UnicodeDecodeError):
            # Not bytes, or not valid utf-8: cannot be a cached key.
            return False

        return key in self.data

    def __getitem__(self, key: bytes) -> bool:
        real_key = key.decode("utf-8")
        v = self.data[real_key]
        assert isinstance(v, bool)
        return v

    def __setitem__(self, key: bytes, value: bool) -> None:
        real_key = key.decode("utf-8")
        # Only mark the cache dirty when the stored value actually changes.
        if self.data.get(real_key) != value:
            self.modified = True
            self.data[real_key] = value

    def __delitem__(self, key: bytes) -> None:
        real_key = key.decode("utf-8")
        del self.data[real_key]

    def __iter__(self) -> Iterator[bytes]:
        return (key.encode("utf-8") for key in self.data)

    def __len__(self) -> int:
        return len(self.data)


def walk(root: bytes) -> Iterable[Tuple[bytes, List[Tuple[bytes, stat_result]], List[Tuple[bytes, stat_result]]]]:
    """Re-implementation of os.walk. Returns an iterator over
    (dirpath, dirnames, filenames), with some semantic differences
    to os.walk.

    This has a similar interface to os.walk, with the important difference
    that instead of lists of filenames and directory names, it yields
    lists of tuples of the form [(name, stat)] where stat is the result of
    os.stat for the file. That allows reusing the same stat data in the
    caller. It also always returns the dirpath relative to the root, with
    the root itself being returned as the empty string.

    Unlike os.walk the implementation is not recursive."""

    # Bind hot functions to locals: each runs once per entry in the tree,
    # so the attribute lookups add up.
    get_stat = os.stat
    is_dir = stat.S_ISDIR
    is_link = stat.S_ISLNK
    join = os.path.join
    listdir = os.listdir
    relpath = os.path.relpath

    root = os.path.abspath(root)
    stack = deque([(root, b"")])

    while stack:
        dir_path, rel_path = stack.popleft()
        try:
            names = listdir(dir_path)
        except OSError:
            # Unreadable or vanished directory: skip it entirely.
            continue

        dirs, non_dirs = [], []
        for name in names:
            path = join(dir_path, name)
            try:
                path_stat = get_stat(path)
            except OSError:
                # e.g. broken symlink or file removed mid-walk.
                continue
            if is_dir(path_stat.st_mode):
                dirs.append((name, path_stat))
            else:
                non_dirs.append((name, path_stat))

        yield rel_path, dirs, non_dirs
        for name, path_stat in dirs:
            new_path = join(dir_path, name)
            # NOTE(review): path_stat comes from os.stat, which follows
            # symlinks, so S_ISLNK appears never to be set here and
            # symlinked directories are in fact descended — confirm whether
            # os.lstat was intended.
            if not is_link(path_stat.st_mode):
                stack.append((new_path, relpath(new_path, root)))