manifest.py (16154B)
import os
from atomicwrites import atomic_write
from copy import deepcopy
from logging import Logger
from multiprocessing import Pool
from typing import (Any, Callable, Container, Dict, IO, Iterator, Iterable, List, Optional, Set, Text,
                    Tuple, Type, Union)

from . import jsonlib
from . import vcs
from .item import (ConformanceCheckerTest,
                   CrashTest,
                   ManifestItem,
                   ManualTest,
                   PrintRefTest,
                   RefTest,
                   SpecItem,
                   SupportFile,
                   TestharnessTest,
                   VisualTest,
                   WebDriverSpecTest)
from .log import get_logger
from .mputil import max_parallelism
from .sourcefile import SourceFile
from .typedata import TypeData


# Bumped whenever the serialized manifest format changes; a mismatch on
# load triggers a full rebuild (see Manifest.from_json).
CURRENT_VERSION: int = 9


class ManifestError(Exception):
    """Base error for manifest loading/validation failures."""
    pass


class ManifestVersionMismatch(ManifestError):
    """Raised when a serialized manifest's version != CURRENT_VERSION."""
    pass


class InvalidCacheError(Exception):
    """Raised when the on-disk tree cache contradicts the manifest contents."""
    pass


# Map from test type name (as stored in the JSON manifest) to the
# ManifestItem subclass that represents it.
item_classes: Dict[Text, Type[ManifestItem]] = {"testharness": TestharnessTest,
                                                "reftest": RefTest,
                                                "print-reftest": PrintRefTest,
                                                "crashtest": CrashTest,
                                                "manual": ManualTest,
                                                "wdspec": WebDriverSpecTest,
                                                "conformancechecker": ConformanceCheckerTest,
                                                "visual": VisualTest,
                                                "spec": SpecItem,
                                                "support": SupportFile}


def compute_manifest_items(source_file: SourceFile) -> Optional[Tuple[Tuple[Text, ...], Text, Set[ManifestItem], Text]]:
    """Compute the manifest entries for a single source file.

    Returns a ``(rel_path_parts, item_type, items, file_hash)`` tuple.
    The Optional return type exists only so this function is
    signature-compatible with compute_manifest_spec_items when passed as
    an ``update_func``; this variant never actually returns None.
    """
    parts = source_file.rel_path_parts
    item_type, items = source_file.manifest_items()
    return parts, item_type, set(items), source_file.hash


def compute_manifest_spec_items(source_file: SourceFile) -> Optional[Tuple[Tuple[Text, ...], Text, Set[ManifestItem], Text]]:
    """Compute the spec-manifest entries for a single source file.

    Returns ``(rel_path_parts, item_type, items, file_hash)``, or None
    when the file contributes no spec items.
    """
    spec = source_file.manifest_spec_items()
    if not spec:
        return None

    item_type, items = spec
    return source_file.rel_path_parts, item_type, set(items), source_file.hash
# Base type for ManifestData: a dict mapping test-type name -> TypeData.
ManifestDataType = Dict[Any, TypeData]


class ManifestData(ManifestDataType):
    def __init__(self, manifest: "Manifest") -> None:
        """Dictionary subclass containing a TypeData instance for each test type,
        keyed by type name"""
        # `initialized` gates __setitem__ so the key set is frozen after
        # construction: one TypeData per entry in item_classes, no more.
        self.initialized: bool = False
        for key, value in item_classes.items():
            self[key] = TypeData(manifest, value)
        self.initialized = True
        # Placeholder slot; only ever set to None here.
        self.json_obj: None = None

    def __setitem__(self, key: Text, value: TypeData) -> None:
        # Reject any mutation of the key set once __init__ has finished.
        if self.initialized:
            raise AttributeError
        dict.__setitem__(self, key, value)

    def paths(self) -> Set[Text]:
        """Get a list of all paths containing test items
        without actually constructing all the items"""
        rv: Set[Text] = set()
        for item_data in self.values():
            # Iterating a TypeData yields path-part tuples; joining them
            # reconstructs the OS-relative path without building items.
            for item in item_data:
                rv.add(os.path.sep.join(item))
        return rv

    def type_by_path(self) -> Dict[Tuple[Text, ...], Text]:
        # Map each path-parts tuple to the test type that owns it.
        rv = {}
        for item_type, item_data in self.items():
            for item in item_data:
                rv[item] = item_type
        return rv


class Manifest:
    def __init__(self, tests_root: Text, url_base: Text = "/") -> None:
        """In-memory manifest of all tests under tests_root, served at url_base."""
        assert url_base is not None
        self._data: ManifestData = ManifestData(self)
        self.tests_root: Text = tests_root
        self.url_base: Text = url_base

    def __iter__(self) -> Iterator[Tuple[Text, Text, Set[ManifestItem]]]:
        # Iterating a Manifest is equivalent to itertypes() over all types.
        return self.itertypes()

    def itertypes(self, *types: Text) -> Iterator[Tuple[Text, Text, Set[ManifestItem]]]:
        """Yield (item_type, rel_path, tests) for the given types (all if none given)."""
        for item_type in (types or sorted(self._data.keys())):
            for path in self._data[item_type]:
                rel_path = os.sep.join(path)
                tests = self._data[item_type][path]
                yield item_type, rel_path, tests

    def iterpath(self, path: Text) -> Iterable[ManifestItem]:
        """Yield every manifest item (of any type) stored for exactly `path`."""
        tpath = tuple(path.split(os.path.sep))

        for type_tests in self._data.values():
            i = type_tests.get(tpath, set())
            assert i is not None
            yield from i

    def iterdir(self, dir_name: Text) -> Iterable[ManifestItem]:
        """Yield every manifest item under the directory `dir_name` (prefix match)."""
        tpath = tuple(dir_name.split(os.path.sep))
        tpath_len = len(tpath)

        for type_tests in self._data.values():
            for path, tests in type_tests.items():
                # Path-parts prefix comparison == "is inside this directory".
                if path[:tpath_len] == tpath:
                    yield from tests

    def update(self, tree: Iterable[Tuple[Text, Optional[Text], bool]], parallel: bool = True,
               update_func: Callable[..., Any] = compute_manifest_items) -> bool:
        """Update the manifest given an iterable of items that make up the updated manifest.

        The iterable must generate (path, file_hash, updated) triples: `updated`
        True for paths whose items are to be (re)computed, False for paths that
        are unchanged and should be kept as-is. This unusual API is designed as
        an optimisation meaning that SourceFile items need not be constructed in
        the case we are not updating a path, but the absence of an item from the
        iterator may be used to remove defunct entries from the manifest.

        Returns True if the manifest changed, False otherwise."""

        logger = get_logger()

        changed = False

        # Create local variable references to these dicts so we avoid the
        # attribute access in the hot loop below
        data = self._data

        types = data.type_by_path()
        # Paths still present in the manifest but not (yet) seen in `tree`;
        # whatever remains at the end is deleted as defunct.
        remaining_manifest_paths = set(types)

        to_update = []

        for path, file_hash, updated in tree:
            path_parts = tuple(path.split(os.path.sep))
            is_new = path_parts not in remaining_manifest_paths

            if not updated and is_new:
                # This is kind of a bandaid; if we ended up here the cache
                # was invalid but we've been using it anyway. That's obviously
                # bad; we should fix the underlying issue that we sometimes
                # use an invalid cache. But at least this fixes the immediate
                # problem
                raise InvalidCacheError

            if not updated:
                remaining_manifest_paths.remove(path_parts)
            else:
                assert self.tests_root is not None
                source_file = SourceFile(self.tests_root,
                                         path,
                                         self.url_base,
                                         file_hash)

                hash_changed: bool = False

                if not is_new:
                    if file_hash is None:
                        file_hash = source_file.hash
                    remaining_manifest_paths.remove(path_parts)
                    old_type = types[path_parts]
                    old_hash = data[old_type].hashes[path_parts]
                    if old_hash != file_hash:
                        # Content changed: drop the stale entry now; the
                        # recomputed items are re-added below.
                        hash_changed = True
                        del data[old_type][path_parts]

                if is_new or hash_changed:
                    to_update.append(source_file)

        if to_update:
            logger.debug("Computing manifest update for %s items" % len(to_update))
            changed = True

        # 25 items was derived experimentally (2020-01) to be approximately the
        # point at which it is quicker to create a Pool and parallelize update.
        pool = None
        processes = max_parallelism()
        if parallel and len(to_update) > 25 and processes > 1:
            pool = Pool(processes)

            # chunksize set > 1 when more than 10000 tests, because
            # chunking is a net-gain once we get to very large numbers
            # of items (again, experimentally, 2020-01)
            chunksize = max(1, len(to_update) // 10000)
            logger.debug("Doing a multiprocessed update. "
                         "Processes: %s, chunksize: %s" % (processes, chunksize))
            results: Iterator[Optional[Tuple[Tuple[Text, ...],
                                             Text,
                                             Set[ManifestItem], Text]]] = pool.imap_unordered(
                update_func,
                to_update,
                chunksize=chunksize)
        else:
            results = map(update_func, to_update)

        for result in results:
            # update_func may return None (e.g. compute_manifest_spec_items
            # for files with no spec items); skip those.
            if not result:
                continue
            rel_path_parts, new_type, manifest_items, file_hash = result
            data[new_type][rel_path_parts] = manifest_items
            data[new_type].hashes[rel_path_parts] = file_hash

        # Make sure to terminate the Pool, to avoid hangs on Python 3.
        # https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
        if pool is not None:
            pool.terminate()

        # Anything never mentioned by `tree` no longer exists; purge it from
        # every type (the path's type is not known here).
        if remaining_manifest_paths:
            changed = True
            for rel_path_parts in remaining_manifest_paths:
                for test_data in data.values():
                    if rel_path_parts in test_data:
                        del test_data[rel_path_parts]

        return changed

    def to_json(self, caller_owns_obj: bool = True) -> Dict[Text, Any]:
        """Dump a manifest into a object which can be serialized as JSON

        If caller_owns_obj is False, then the return value remains
        owned by the manifest; it is _vitally important_ that _no_
        (even read) operation is done on the manifest, as otherwise
        objects within the object graph rooted at the return value can
        be mutated. This essentially makes this mode very dangerous
        and only to be used under extreme care.

        """
        out_items = {
            test_type: type_paths.to_json()
            for test_type, type_paths in self._data.items() if type_paths
        }

        if caller_owns_obj:
            # Deep-copy so the caller may freely mutate the result without
            # corrupting the manifest's internal state.
            out_items = deepcopy(out_items)

        rv: Dict[Text, Any] = {"url_base": self.url_base,
                               "items": out_items,
                               "version": CURRENT_VERSION}
        return rv

    @classmethod
    def from_json(cls,
                  tests_root: Text,
                  obj: Dict[Text, Any],
                  types: Optional[Container[Text]] = None,
                  callee_owns_obj: bool = False) -> "Manifest":
        """Load a manifest from a JSON object

        This loads a manifest for a given local test_root path from an
        object obj, potentially partially loading it to only load the
        types given by types.

        If callee_owns_obj is True, then ownership of obj transfers
        to this function when called, and the caller must never mutate
        the obj or anything referred to in the object graph rooted at
        obj.

        """
        version = obj.get("version")
        if version != CURRENT_VERSION:
            raise ManifestVersionMismatch

        self = cls(tests_root, url_base=obj.get("url_base", "/"))
        # NOTE(review): hasattr(obj, "items") tests for the dict *method*, so
        # it is always true for a dict; presumably intended as a cheap sanity
        # check that obj is mapping-like before the subscript below.
        if not hasattr(obj, "items"):
            raise ManifestError

        for test_type, type_paths in obj["items"].items():
            if test_type not in item_classes:
                raise ManifestError

            if types and test_type not in types:
                continue

            if not callee_owns_obj:
                # We don't own obj, so copy before handing data to TypeData,
                # which keeps (and may lazily mutate) what it is given.
                type_paths = deepcopy(type_paths)

            self._data[test_type].set_json(type_paths)

        return self


def load(tests_root: Text, manifest: Union[IO[bytes], Text], types: Optional[Container[Text]] = None) -> Optional[Manifest]:
    """Deprecated entry point; kept for callers that predate load_and_update."""
    logger = get_logger()

    logger.warning("Prefer load_and_update instead")
    return _load(logger, tests_root, manifest, types)


# Process-wide cache of loaded manifests, keyed by manifest file path.
__load_cache: Dict[Text, Manifest] = {}


def _load(logger: Logger,
          tests_root: Text,
          manifest: Union[IO[bytes], Text],
          types: Optional[Container[Text]] = None,
          allow_cached: bool = True
          ) -> Optional[Manifest]:
    """Load a Manifest from a path or open file, optionally via __load_cache.

    Returns None if the file is missing or unparseable."""
    manifest_path = (manifest if isinstance(manifest, str)
                     else manifest.name)
    if allow_cached and manifest_path in __load_cache:
        return __load_cache[manifest_path]

    if isinstance(manifest, str):
        if os.path.exists(manifest):
            logger.debug("Opening manifest at %s" % manifest)
        else:
            logger.debug("Creating new manifest at %s" % manifest)
        try:
            with open(manifest, encoding="utf-8") as f:
                rv = Manifest.from_json(tests_root,
                                        jsonlib.load(f),
                                        types=types,
                                        callee_owns_obj=True)
        except OSError:
            # Missing/unreadable file: caller treats None as "build fresh".
            return None
        except ValueError:
            logger.warning("%r may be corrupted", manifest)
            return None
    else:
        rv = Manifest.from_json(tests_root,
                                jsonlib.load(manifest),
                                types=types,
                                callee_owns_obj=True)

    if allow_cached:
        __load_cache[manifest_path] = rv
    return rv
def load_and_update(tests_root: Text,
                    manifest_path: Text,
                    url_base: Text,
                    update: bool = True,
                    rebuild: bool = False,
                    paths_to_update: Optional[List[Text]] = None,
                    metadata_path: Optional[Text] = None,
                    cache_root: Optional[Text] = None,
                    working_copy: bool = True,
                    types: Optional[Container[Text]] = None,
                    write_manifest: bool = True,
                    allow_cached: bool = True,
                    parallel: bool = True
                    ) -> Manifest:
    """Load the manifest for tests_root, updating and rewriting it as needed.

    Attempts to load an existing manifest from manifest_path (unless rebuild
    is set); falls back to building a fresh one on version mismatch, load
    failure, or url_base mismatch. When updating, walks the tree via the vcs
    module and, if anything changed and write_manifest is set, writes the
    result back to manifest_path.

    :param update: update the manifest even if it loaded successfully
    :param rebuild: discard any existing manifest and rebuild from scratch
    :param allow_cached: permit use of the in-process manifest cache
    :param parallel: allow multiprocessing for large updates
    :raises InvalidCacheError: if the tree cache is invalid even after a
        complete rebuild
    """

    logger = get_logger()

    manifest = None
    if not rebuild:
        try:
            manifest = _load(logger,
                             tests_root,
                             manifest_path,
                             types=types,
                             allow_cached=allow_cached)
        except ManifestVersionMismatch:
            logger.info("Manifest version changed, rebuilding")
        except ManifestError:
            logger.warning("Failed to load manifest, rebuilding")

        if manifest is not None and manifest.url_base != url_base:
            logger.info("Manifest url base did not match, rebuilding")
            manifest = None

    if manifest is None:
        manifest = Manifest(tests_root, url_base)
        rebuild = True
        update = True

    if rebuild or update:
        logger.info("Updating manifest")
        for retry in range(2):
            try:
                tree = vcs.get_tree(tests_root, manifest, manifest_path, cache_root,
                                    paths_to_update, working_copy, rebuild)
                changed = manifest.update(tree, parallel)
                break
            except InvalidCacheError:
                logger.warning("Manifest cache was invalid, doing a complete rebuild")
                rebuild = True
                # Bug fix: the original used a bare `raise` in the loop's
                # `else` clause, where no exception is active (the handler
                # has exited), producing "RuntimeError: No active exception
                # to re-raise". Re-raise here, inside the handler, once the
                # retry (with a full rebuild) has also failed.
                if retry == 1:
                    raise
        if write_manifest and changed:
            write(manifest, manifest_path)
        tree.dump_caches()

    return manifest


def write(manifest: Manifest, manifest_path: Text) -> None:
    """Atomically serialize `manifest` as JSON to manifest_path,
    creating parent directories as needed."""
    dir_name = os.path.dirname(manifest_path)
    # Bug fix: dirname() is "" for a bare filename, and makedirs("") raises;
    # exist_ok also removes the check-then-create race of the original
    # `if not os.path.exists(dir_name): os.makedirs(dir_name)`.
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    with atomic_write(manifest_path, overwrite=True) as f:
        # Use ',' instead of the default ', ' separator to prevent trailing
        # spaces: https://docs.python.org/2/library/json.html#json.dump
        jsonlib.dump_dist(manifest.to_json(caller_owns_obj=True), f)
        f.write("\n")