tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

lint.py (42477B)


      1 import abc
      2 import argparse
      3 import ast
      4 import json
      5 import logging
      6 import multiprocessing
      7 import os
      8 import re
      9 import subprocess
     10 import sys
     11 import tempfile
     12 from collections import defaultdict
     13 from typing import (Any, Callable, Dict, IO, Iterable, List, Optional, Sequence, Set, Text, Tuple,
     14                    Type, TypeVar)
     15 
     16 from urllib.parse import urlsplit, urljoin, parse_qs
     17 
     18 try:
     19    from xml.etree import cElementTree as ElementTree
     20 except ImportError:
     21    from xml.etree import ElementTree as ElementTree  # type: ignore
     22 
     23 from . import fnmatch
     24 from . import rules
     25 from .. import localpaths
     26 from ..ci.tc.github_checks_output import get_gh_checks_outputter, GitHubChecksOutputter
     27 from ..gitignore.gitignore import PathFilter
     28 from ..wpt import testfiles
     29 from ..manifest.mputil import max_parallelism
     30 from ..manifest.vcs import walk
     31 
     32 from ..manifest.sourcefile import SourceFile, js_meta_re, python_meta_re, space_chars, get_any_variants
     33 
     34 from ..metadata.yaml.load import load_data_to_dict
     35 from ..metadata.meta.schema import META_YML_FILENAME, MetaFile
     36 from ..metadata.webfeatures.schema import WEB_FEATURES_YML_FILENAME, WebFeaturesFile
     37 
     38 # The Ignorelist is a two level dictionary. The top level is indexed by
     39 # error names (e.g. 'TRAILING WHITESPACE'). Each of those then has a map of
     40 # file patterns (e.g. 'foo/*') to a set of specific line numbers for the
     41 # exception. The line numbers are optional; if missing the entire file
     42 # ignores the error.
     43 Ignorelist = Dict[str, Dict[str, Set[Optional[int]]]]
     44 
     45 # Define an arbitrary typevar
     46 T = TypeVar("T")
     47 
     48 
     49 logger: Optional[logging.Logger] = None
     50 
     51 
     52 def setup_logging(prefix: bool = False) -> None:
     53    global logger
     54    if logger is None:
     55        logger = logging.getLogger(os.path.basename(os.path.splitext(__file__)[0]))
     56        handler: logging.Handler = logging.StreamHandler(sys.stdout)
     57        # Only add a handler if the parent logger is missing a handler
     58        parent = logger.parent
     59        assert isinstance(parent, logging.Logger)
     60        if parent and len(parent.handlers) == 0:
     61            handler = logging.StreamHandler(sys.stdout)
     62            logger.addHandler(handler)
     63    if prefix:
     64        format = logging.BASIC_FORMAT
     65    else:
     66        format = "%(message)s"
     67    formatter = logging.Formatter(format)
     68    for handler in logger.handlers:
     69        handler.setFormatter(formatter)
     70    logger.setLevel(logging.DEBUG)
     71 
     72 
# Configure logging as an import side effect so that any consumer of this
# module gets usable output without extra setup.
setup_logging()


# Template shown alongside reported lint errors.  The four placeholders are
# presumably filled with (error name, path, error name, path) to produce a
# copy-pasteable lint.ignore line — confirm at the call site (not visible
# in this chunk).
ERROR_MSG = """You must fix all errors; for details on how to fix them, see
https://web-platform-tests.org/writing-tests/lint-tool.html

However, instead of fixing a particular error, it's sometimes
OK to add a line to the lint.ignore file in the root of the
web-platform-tests directory to make the lint tool ignore it.

For example, to make the lint tool ignore all '%s'
errors in the %s file,
you could add the following line to the lint.ignore file.

%s: %s"""
     88 
     89 
     90 def all_filesystem_paths(repo_root: Text, subdir: Optional[Text] = None) -> Iterable[Text]:
     91    path_filter = PathFilter(repo_root.encode("utf8"),
     92                             extras=[b".git/"])
     93    if subdir:
     94        expanded_path = subdir.encode("utf8")
     95        subdir_str = expanded_path
     96    else:
     97        expanded_path = repo_root.encode("utf8")
     98    for dirpath, dirnames, filenames in path_filter(walk(expanded_path)):
     99        for filename, _ in filenames:
    100            path = os.path.join(dirpath, filename)
    101            if subdir:
    102                path = os.path.join(subdir_str, path)
    103            assert not os.path.isabs(path), path
    104            yield path.decode("utf8")
    105 
    106 
    107 def _all_files_equal(paths: Iterable[Text]) -> bool:
    108    """
    109    Checks all the paths are files that are byte-for-byte identical
    110 
    111    :param paths: the list of paths to compare
    112    :returns: True if they are all identical
    113    """
    114    paths = list(paths)
    115    if len(paths) < 2:
    116        return True
    117 
    118    first = paths.pop()
    119    size = os.path.getsize(first)
    120    if any(os.path.getsize(path) != size for path in paths):
    121        return False
    122 
    123    # Chunk this to avoid eating up memory and file descriptors
    124    bufsize = 4096*4  # 16KB, a "reasonable" number of disk sectors
    125    groupsize = 8  # Hypothesised to be large enough in the common case that everything fits in one group
    126    with open(first, "rb") as first_f:
    127        for start in range(0, len(paths), groupsize):
    128            path_group = paths[start:start+groupsize]
    129            first_f.seek(0)
    130            try:
    131                files = [open(x, "rb") for x in path_group]
    132                for _ in range(0, size, bufsize):
    133                    a = first_f.read(bufsize)
    134                    for f in files:
    135                        b = f.read(bufsize)
    136                        if a != b:
    137                            return False
    138            finally:
    139                for f in files:
    140                    f.close()
    141 
    142    return True
    143 
    144 
    145 def check_path_length(repo_root: Text, path: Text) -> List[rules.Error]:
    146    if len(path) + 1 > 150:
    147        return [rules.PathLength.error(path, (path, len(path) + 1))]
    148    return []
    149 
    150 
    151 def check_file_type(repo_root: Text, path: Text) -> List[rules.Error]:
    152    if os.path.islink(path):
    153        return [rules.FileType.error(path, (path, "symlink"))]
    154    return []
    155 
    156 
    157 def check_worker_collision(repo_root: Text, path: Text) -> List[rules.Error]:
    158    endings = [(".any.html", ".any.js"),
    159               (".any.worker.html", ".any.js"),
    160               (".worker.html", ".worker.js")]
    161    for path_ending, generated in endings:
    162        if path.endswith(path_ending):
    163            return [rules.WorkerCollision.error(path, (path_ending, generated))]
    164    return []
    165 
    166 
    167 def check_gitignore_file(repo_root: Text, path: Text) -> List[rules.Error]:
    168    if not path.endswith(".gitignore"):
    169        return []
    170 
    171    path_parts = path.split(os.path.sep)
    172    if len(path_parts) == 1:
    173        return []
    174 
    175    if path_parts[-1] != ".gitignore":
    176        return []
    177 
    178    if (path_parts[0] in ["tools", "docs"] or
    179        path_parts[:2] == ["resources", "webidl2"]):
    180        return []
    181 
    182    return [rules.GitIgnoreFile.error(path)]
    183 
    184 
    185 def check_mojom_js(repo_root: Text, path: Text) -> List[rules.Error]:
    186    if path.endswith(".mojom.js"):
    187        return [rules.MojomJSFile.error(path)]
    188    return []
    189 
    190 
    191 def check_ahem_copy(repo_root: Text, path: Text) -> List[rules.Error]:
    192    lpath = path.lower()
    193    if "ahem" in lpath and lpath.endswith(".ttf"):
    194        return [rules.AhemCopy.error(path)]
    195    return []
    196 
    197 
    198 def check_tentative_directories(repo_root: Text, path: Text) -> List[rules.Error]:
    199    path_parts = path.split(os.path.sep)
    200    for directory in path_parts[:-1]:
    201        if "tentative" in directory and directory != "tentative":
    202            return [rules.TentativeDirectoryName.error(path)]
    203    return []
    204 
    205 
    206 def check_git_ignore(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    207    errors = []
    208 
    209    with tempfile.TemporaryFile('w+', newline='') as f:
    210        for path in paths:
    211            f.write('%s\n' % os.path.join(repo_root, path))
    212        f.seek(0)
    213        try:
    214            matches = subprocess.check_output(
    215                ["git", "check-ignore", "--verbose", "--no-index", "--stdin"], stdin=f)
    216            for match in matches.strip().split(b'\n'):
    217                match_filter, path_bytes = match.split()
    218                _, _, filter_string = match_filter.split(b':')
    219                # If the matching filter reported by check-ignore is a special-case exception,
    220                # that's fine. Otherwise, it requires a new special-case exception.
    221                if filter_string[0:1] != b'!':
    222                    path = path_bytes.decode("utf8")
    223                    errors.append(rules.IgnoredPath.error(path, (path,)))
    224        except subprocess.CalledProcessError:
    225            # Nonzero return code means that no match exists.
    226            pass
    227    return errors
    228 
    229 
# Specification-URL patterns; each captures the spec shortname (the first
# interesting path component) from drafts.csswg.org, www.w3.org/TR (or
# w3c.org/TR), and dev.w3.org URLs respectively.  Their usage is not
# visible in this chunk — presumably spec-link validation elsewhere in
# this module.
drafts_csswg_re = re.compile(r"https?\:\/\/drafts\.csswg\.org\/([^/?#]+)")
w3c_tr_re = re.compile(r"https?\:\/\/www\.w3c?\.org\/TR\/([^/?#]+)")
w3c_dev_re = re.compile(r"https?\:\/\/dev\.w3c?\.org\/[^/?#]+\/([^/?#]+)")
    233 
    234 
    235 def check_unique_testharness_basenames(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    236    """
    237    Checks that all testharness files have unique basename paths.
    238 
    239    The 'basename path' refers to the entire path excluding the extension. For
    240    example, 'foo/bar/baz.html' and 'foo/bar/baz.xhtml' have the same basename
    241    path, but 'foo/bar/baz.html' and 'foo/qux/baz.html' do not.
    242 
    243    Testharness files with identical basenames have caused issues in downstream
    244    infrastructure (see https://github.com/web-platform-tests/wpt/issues/7570),
    245    and may cause confusion in general.
    246 
    247    :param repo_root: the repository root
    248    :param paths: list of all paths
    249    :returns: a list of errors found in ``paths``
    250    """
    251 
    252    errors = []
    253    file_dict = defaultdict(list)
    254    for path in paths:
    255        source_file = SourceFile(repo_root, path, "/")
    256        if "testharness" not in source_file.possible_types:
    257            continue
    258        file_name, file_extension = os.path.splitext(path)
    259        file_dict[file_name].append(file_extension)
    260    for k, v in file_dict.items():
    261        if len(v) == 1:
    262            continue
    263        context = (', '.join(v),)
    264        for extension in v:
    265            errors.append(rules.DuplicateBasenamePath.error(k + extension, context))
    266    return errors
    267 
    268 
    269 def check_unique_case_insensitive_paths(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    270    seen: Dict[Text, Text] = {}
    271    errors = []
    272    for path in paths:
    273        lower_path = path.lower()
    274        if lower_path in seen:
    275            context = (seen[lower_path],)
    276            errors.append(rules.DuplicatePathCaseInsensitive.error(path, context))
    277        else:
    278            seen[lower_path] = path
    279    return errors
    280 
    281 
    282 def parse_ignorelist(f: IO[Text]) -> Tuple[Ignorelist, Set[Text]]:
    283    """
    284    Parse the ignorelist file given by `f`, and return the parsed structure.
    285 
    286    :returns: a tuple of an Ignorelist and a set of files that are completely
    287              skipped by the linter (i.e. have a '*' entry).
    288    """
    289 
    290    data: Ignorelist = defaultdict(lambda:defaultdict(set))
    291    skipped_files: Set[Text] = set()
    292 
    293    for line in f:
    294        line = line.strip()
    295        if not line or line.startswith("#"):
    296            continue
    297        parts = [item.strip() for item in line.split(":")]
    298 
    299        if len(parts) == 2:
    300            error_types_s, file_match = parts
    301            line_number: Optional[int] = None
    302        else:
    303            error_types_s, file_match, line_number_s = parts
    304            line_number = int(line_number_s)
    305 
    306        error_types = {item.strip() for item in error_types_s.split(",")}
    307        file_match = os.path.normcase(file_match)
    308 
    309        if "*" in error_types:
    310            skipped_files.add(file_match)
    311        else:
    312            for error_type in error_types:
    313                data[error_type][file_match].add(line_number)
    314 
    315    return data, skipped_files
    316 
    317 
    318 def filter_ignorelist_errors(data: Ignorelist, errors: Sequence[rules.Error]) -> List[rules.Error]:
    319    """
    320    Filter out those errors that are ignored in `data`.
    321    """
    322 
    323    if not errors:
    324        return []
    325 
    326    skipped = [False for item in range(len(errors))]
    327 
    328    for i, (error_type, msg, path, line) in enumerate(errors):
    329        normpath = os.path.normcase(path)
    330        # Allow skipping all lint errors except the IGNORED PATH lint,
    331        # which explains how to fix it correctly and shouldn't be skipped.
    332        if error_type in data and error_type != "IGNORED PATH":
    333            wl_files = data[error_type]
    334            for file_match, allowed_lines in wl_files.items():
    335                if None in allowed_lines or line in allowed_lines:
    336                    if fnmatch.fnmatchcase(normpath, file_match):
    337                        skipped[i] = True
    338 
    339    return [item for i, item in enumerate(errors) if not skipped[i]]
    340 
    341 
# One shared instance of every line-based regexp rule; check_regexp_line()
# filters these by applies(path) and runs the remainder over each line.
regexps = [item() for item in  # type: ignore
           [rules.TrailingWhitespaceRegexp,
            rules.TabsRegexp,
            rules.CRRegexp,
            rules.SetTimeoutRegexp,
            rules.W3CTestOrgRegexp,
            rules.WebPlatformTestRegexp,
            rules.Webidl2Regexp,
            rules.ConsoleRegexp,
            rules.GenerateTestsRegexp,
            rules.PrintRegexp,
            rules.LayoutTestsRegexp,
            rules.MissingDepsRegexp,
            rules.SpecialPowersRegexp,
            rules.AssertThrowsRegexp,
            rules.PromiseRejectsRegexp,
            rules.AssertPreconditionRegexp,
            rules.HTMLInvalidSyntaxRegexp,
            rules.TestDriverInternalRegexp]]
    361 
    362 
    363 def check_regexp_line(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    364    errors: List[rules.Error] = []
    365 
    366    applicable_regexps = [regexp for regexp in regexps if regexp.applies(path)]
    367 
    368    for i, line in enumerate(f):
    369        for regexp in applicable_regexps:
    370            if regexp.search(line):
    371                errors.append((regexp.name, regexp.description, path, i+1))
    372 
    373    return errors
    374 
    375 
def check_parsed(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    """Lint checks that require the file parsed as markup.

    Covers spec links for css/ tests, reftest references, timeout and
    variant metadata, ordering and paths of the testharness/testdriver
    scripts, and the reftest-wait class requirement.

    :param repo_root: the repository root
    :param path: the path of the file within the repository
    :param f: a file-like object with the file contents
    :returns: a list of errors found in the file
    """
    source_file = SourceFile(repo_root, path, "/", contents=f.read())

    errors: List[rules.Error] = []

    # css/ tests must link to the spec they test, unless they are support
    # files, references, tentative tests, or crashtests.
    if path.startswith("css/"):
        if (source_file.type != "support" and
            not source_file.name_is_reference and
            not source_file.name_is_tentative and
            not source_file.name_is_crashtest and
            not source_file.spec_links):
            return [rules.MissingLink.error(path)]

    if source_file.name_is_non_test:
        return []

    # Non-markup files have nothing further to check here.
    if source_file.markup_type is None:
        return []

    if source_file.root is None:
        return [rules.ParseFailed.error(path)]

    test_type = source_file.type

    # The detected content type and the filename suffix must agree.
    if test_type == "manual" and not source_file.name_is_manual:
        errors.append(rules.ContentManual.error(path))

    if test_type == "visual" and not source_file.name_is_visual:
        errors.append(rules.ContentVisual.error(path))

    # Validate each reftest reference link.
    about_blank_parts = urlsplit("about:blank")
    for reftest_node in source_file.reftest_nodes:
        href = reftest_node.attrib.get("href", "").strip(space_chars)
        parts = urlsplit(href)

        # about:blank is a legitimate reference target.
        if parts == about_blank_parts:
            continue

        # References must be same-host relative URLs.
        if (parts.scheme or parts.netloc):
            errors.append(rules.AbsoluteUrlRef.error(path, (href,)))
            continue

        ref_url = urljoin(source_file.url, href)
        ref_parts = urlsplit(ref_url)

        # A test must not reference itself.
        if source_file.url == ref_url:
            errors.append(rules.SameFileRef.error(path))
            continue

        assert ref_parts.path != ""

        # The referenced file must actually exist in the repository.
        reference_file = os.path.join(repo_root, ref_parts.path[1:])
        reference_rel = reftest_node.attrib.get("rel", "")

        if not os.path.isfile(reference_file):
            errors.append(rules.NonexistentRef.error(path,
                                                     (reference_rel, href)))

    # Only (print-)reftests may carry reference links.
    if source_file.reftest_nodes:
        if test_type not in ("print-reftest", "reftest"):
            errors.append(rules.ReferenceInOtherType.error(path, (test_type,)))

    # <meta name=timeout>: at most one, and "long" is the only valid value.
    if len(source_file.timeout_nodes) > 1:
        errors.append(rules.MultipleTimeout.error(path))

    for timeout_node in source_file.timeout_nodes:
        timeout_value = timeout_node.attrib.get("content", "").lower()
        if timeout_value != "long":
            errors.append(rules.InvalidTimeout.error(path, (timeout_value,)))

    # <meta name=variant> must carry a well-formed content attribute.
    if source_file.content_is_ref_node or source_file.content_is_testharness:
        for element in source_file.variant_nodes:
            if "content" not in element.attrib:
                errors.append(rules.VariantMissing.error(path))
            else:
                variant = element.attrib["content"]
                if is_variant_malformed(variant):
                    value = f"{path} `<meta name=variant>` 'content' attribute"
                    errors.append(rules.MalformedVariant.error(path, (value,)))

    # Script elements whose relative document order is checked below.
    required_elements: List[Text] = []

    # testharness tests need testharness.js plus exactly one
    # testharnessreport.js.
    testharnessreport_nodes: List[ElementTree.Element] = []
    if source_file.testharness_nodes:
        if test_type not in ("testharness", "manual"):
            errors.append(rules.TestharnessInOtherType.error(path, (test_type,)))
        if len(source_file.testharness_nodes) > 1:
            errors.append(rules.MultipleTestharness.error(path))

        testharnessreport_nodes = source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src='/resources/testharnessreport.js']")
        if not testharnessreport_nodes:
            errors.append(rules.MissingTestharnessReport.error(path))
        else:
            if len(testharnessreport_nodes) > 1:
                errors.append(rules.MultipleTestharnessReport.error(path))

        required_elements.extend(key for key, value in {"testharness": True,
                                                        "testharnessreport": len(testharnessreport_nodes) > 0,
                                                        "timeout": len(source_file.timeout_nodes) > 0}.items()
                                 if value)

    # testdriver.js usage: single include, supported test types only, and a
    # matching testdriver-vendor.js include.
    testdriver_vendor_nodes: List[ElementTree.Element] = []
    if source_file.testdriver_nodes:
        if test_type not in {"testharness", "reftest", "print-reftest", "crashtest", "support"}:
            errors.append(rules.TestdriverInUnsupportedType.error(path, (test_type,)))

        if len(source_file.testdriver_nodes) > 1:
            errors.append(rules.MultipleTestdriver.error(path))

        testdriver_vendor_nodes = source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src='/resources/testdriver-vendor.js']")
        if not testdriver_vendor_nodes:
            errors.append(rules.MissingTestdriverVendor.error(path))
        else:
            if len(testdriver_vendor_nodes) > 1:
                errors.append(rules.MultipleTestdriverVendor.error(path))

        required_elements.append("testdriver")
        if len(testdriver_vendor_nodes) > 0:
            required_elements.append("testdriver-vendor")

    # Walk the document once to verify the relative ordering of the
    # required script/meta elements (e.g. testharness.js before
    # testharnessreport.js, timeout meta before testharness.js).
    if required_elements:
        seen_elements = defaultdict(bool)

        for elem in source_file.root.iter():
            if source_file.timeout_nodes and elem == source_file.timeout_nodes[0]:
                seen_elements["timeout"] = True
                if seen_elements["testharness"]:
                    errors.append(rules.LateTimeout.error(path))

            elif source_file.testharness_nodes and elem == source_file.testharness_nodes[0]:
                seen_elements["testharness"] = True

            elif testharnessreport_nodes and elem == testharnessreport_nodes[0]:
                seen_elements["testharnessreport"] = True
                if not seen_elements["testharness"]:
                    errors.append(rules.EarlyTestharnessReport.error(path))

            elif source_file.testdriver_nodes and elem == source_file.testdriver_nodes[0]:
                seen_elements["testdriver"] = True

            elif testdriver_vendor_nodes and elem == testdriver_vendor_nodes[0]:
                seen_elements["testdriver-vendor"] = True
                if not seen_elements["testdriver"]:
                    errors.append(rules.EarlyTestdriverVendor.error(path))

            # Stop walking once every required element has been seen.
            if all(seen_elements[name] for name in required_elements):
                break

    # Validate the src of every script element that includes one of the
    # harness scripts.
    for element in source_file.root.findall(".//{http://www.w3.org/1999/xhtml}script[@src]"):
        src = element.attrib["src"]

        def is_path_correct(script: Text, src: Text) -> bool:
            """
            If the `src` relevant to the `script`, check that the `src` is the
            correct path for `script`.
            :param script: the script name to check the `src` for.
            :param src: the included path.
            :return: if the `src` irrelevant to the `script`, or if the `src`
                     path is the correct path.
            """
            if script == src:
                # The src does not provide the full path.
                return False

            if "/%s" % script not in src:
                # The src is not relevant to the script.
                return True

            return ("%s" % src).startswith("/resources/%s" % script)

        def is_query_string_correct(script: Text, src: Text,
                allowed_query_string_params: Dict[str, List[str]]) -> bool:
            """
            Checks if the query string in a script tag's `src` is valid.

            Specifically, it verifies that the query string parameters and their
            values are among those allowed for the given script. It handles vendor
            prefixes (parameters or values containing a colon) by allowing them
            unconditionally.

            :param script: the name of the script (e.g., "testharness.js"). Used
                           to verify is the given `src` is related to the
                           script.
            :param src: the full `src` attribute value from the script tag.
            :param allowed_query_string_params: A dictionary where keys are
                                                allowed parameter names and
                                                values are lists of allowed
                                                values for each parameter.
            :return: if the query string is empty or contains only allowed
                     params.
            """
            if not ("%s" % src).startswith("/resources/%s?" % script):
                # The src is not related to the script.
                return True

            try:
                query_string = urlsplit(urljoin(source_file.url, src)).query
                query_string_params = parse_qs(query_string,
                                               keep_blank_values=True)
            except ValueError:
                # Parsing error means that the query string is incorrect.
                return False

            for param_name in query_string_params:
                if param_name not in allowed_query_string_params:
                    return False

                for param_value in query_string_params[param_name]:
                    if ':' in param_value:
                        # Allow for vendor-specific values in query parameters.
                        continue
                    if param_value not in allowed_query_string_params[
                            param_name]:
                        return False
            return True

        if (not is_path_correct("testharness.js", src) or
                not is_query_string_correct("testharness.js", src, {})):
            errors.append(rules.TestharnessPath.error(path))

        if (not is_path_correct("testharnessreport.js", src) or
                not is_query_string_correct("testharnessreport.js", src, {})):
            errors.append(rules.TestharnessReportPath.error(path))

        # testdriver.js may carry a restricted "feature" query parameter.
        if not is_path_correct("testdriver.js", src):
            errors.append(rules.TestdriverPath.error(path))
        if not is_query_string_correct("testdriver.js", src,
                                       {'feature': ['bidi', 'extensions']}):
            errors.append(rules.TestdriverUnsupportedQueryParameter.error(path))

        if (not is_path_correct("testdriver-vendor.js", src) or
                not is_query_string_correct("testdriver-vendor.js", src, {})):
            errors.append(rules.TestdriverVendorPath.error(path))

        # Tests that load reftest-wait.js must also set class=reftest-wait
        # on the root element.
        script_path = None
        try:
            script_path = urlsplit(urljoin(source_file.url, src)).path
        except ValueError:
            # This happens if the contents of src isn't something that looks like a URL to Python
            pass
        if (script_path == "/common/reftest-wait.js" and
            "reftest-wait" not in source_file.root.attrib.get("class", "").split()):
            errors.append(rules.MissingReftestWait.error(path))

    return errors
    621 
    622 
    623 def is_variant_malformed(variant: str) -> bool:
    624    return (variant == "" or variant[0] not in ("?", "#") or
    625            len(variant) == 1 or (variant[0] == "?" and variant[1] == "#"))
    626 
    627 
    628 class ASTCheck(metaclass=abc.ABCMeta):
    629    @abc.abstractproperty
    630    def rule(self) -> Type[rules.Rule]:
    631        pass
    632 
    633    @abc.abstractmethod
    634    def check(self, root: ast.AST) -> List[int]:
    635        pass
    636 
    637 class OpenModeCheck(ASTCheck):
    638    rule = rules.OpenNoMode
    639 
    640    def check(self, root: ast.AST) -> List[int]:
    641        errors = []
    642        for node in ast.walk(root):
    643            if isinstance(node, ast.Call):
    644                if hasattr(node.func, "id") and node.func.id in ("open", "file"):
    645                    if (len(node.args) < 2 and
    646                        all(item.arg != "mode" for item in node.keywords)):
    647                        errors.append(node.lineno)
    648        return errors
    649 
    650 ast_checkers = [item() for item in [OpenModeCheck]]
    651 
    652 def check_python_ast(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    653    if not path.endswith(".py"):
    654        return []
    655 
    656    try:
    657        root = ast.parse(f.read())
    658    except SyntaxError as e:
    659        return [rules.ParseFailed.error(path, line_no=e.lineno)]
    660 
    661    errors = []
    662    for checker in ast_checkers:
    663        for lineno in checker.check(root):
    664            errors.append(checker.rule.error(path, line_no=lineno))
    665    return errors
    666 
    667 
    668 broken_js_metadata = re.compile(br"//\s*META:")
    669 broken_python_metadata = re.compile(br"#\s*META:")
    670 
    671 
    672 def check_global_metadata(value: bytes) -> Iterable[Tuple[Type[rules.Rule], Tuple[Any, ...]]]:
    673    global_values = {item.strip().decode("utf8") for item in value.split(b",") if item.strip()}
    674 
    675    # TODO: this could check for duplicates and such
    676    for global_value in global_values:
    677        if not get_any_variants(global_value):
    678            yield (rules.UnknownGlobalMetadata, ())
    679 
    680 
def check_script_metadata(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    """
    Check the ``// META:`` / ``# META:`` comment block of a script test.

    :param repo_root: the repository root (unused here; all content lints
        share this signature)
    :param path: the path of the file within the repository
    :param f: a binary file-like object with the file contents
    :returns: a list of errors found in ``f``
    """
    # Pick the metadata comment syntax from the file type; any other file
    # has no script metadata to check.
    if path.endswith((".window.js", ".worker.js", ".any.js")):
        meta_re = js_meta_re
        broken_metadata = broken_js_metadata
    elif path.endswith(".py"):
        meta_re = python_meta_re
        broken_metadata = broken_python_metadata
    else:
        return []

    # ``done`` flips to True at the first line that is not a well-formed
    # META comment; metadata seen after that point is misplaced.
    done = False
    errors = []
    for line_no, line in enumerate(f, 1):
        assert isinstance(line, bytes), line

        m = meta_re.match(line)
        if m:
            key, value = m.groups()
            if key == b"global":
                for rule_class, context in check_global_metadata(value):
                    errors.append(rule_class.error(path, context, line_no))
            elif key == b"timeout":
                # "long" is the only accepted explicit timeout value.
                if value != b"long":
                    errors.append(rules.UnknownTimeoutMetadata.error(path,
                                                                     line_no=line_no))
            elif key == b"variant":
                if is_variant_malformed(value.decode()):
                    # ``value`` is rebound to the text interpolated into
                    # the MalformedVariant error message.
                    value = f"{path} `META: variant=...` value"
                    errors.append(rules.MalformedVariant.error(path, (value,), line_no))
            elif key == b"script":
                # NOTE(review): presumably flagged because the harness
                # scripts are already injected for these test types —
                # confirm against the rule descriptions.
                if value == b"/resources/testharness.js":
                    errors.append(rules.MultipleTestharness.error(path, line_no=line_no))
                elif value == b"/resources/testharnessreport.js":
                    errors.append(rules.MultipleTestharnessReport.error(path, line_no=line_no))
            elif key not in (b"title", b"quic"):
                errors.append(rules.UnknownMetadata.error(path, line_no=line_no))
        else:
            done = True

        if done:
            # After the metadata block has ended, flag anything that
            # still looks like metadata: well-formed (stray), indented,
            # or using a broken comment form.
            if meta_re.match(line):
                errors.append(rules.StrayMetadata.error(path, line_no=line_no))
            elif meta_re.search(line):
                errors.append(rules.IndentedMetadata.error(path, line_no=line_no))
            elif broken_metadata.search(line):
                errors.append(rules.BrokenMetadata.error(path, line_no=line_no))

    return errors
    729 
    730 
# CSS that requests the Ahem font by name, e.g. "font-family: ahem".
ahem_font_re = re.compile(br"font.*:.*ahem", flags=re.IGNORECASE)
# Ahem can appear either in the global location or in the support
# directory for legacy Mozilla imports
ahem_stylesheet_re = re.compile(br"\/fonts\/ahem\.css|support\/ahem.css",
                               flags=re.IGNORECASE)
    736 
    737 
    738 def check_ahem_system_font(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    739    if not path.endswith((".html", ".htm", ".xht", ".xhtml")):
    740        return []
    741    contents = f.read()
    742    errors = []
    743    if ahem_font_re.search(contents) and not ahem_stylesheet_re.search(contents):
    744        errors.append(rules.AhemSystemFont.error(path))
    745    return errors
    746 
    747 
    748 def check_meta_file(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    749    if os.path.basename(path) != META_YML_FILENAME:
    750        return []
    751    try:
    752        MetaFile(load_data_to_dict(f))
    753    except Exception:
    754        return [rules.InvalidMetaFile.error(path)]
    755    return []
    756 
    757 
    758 def check_web_features_file(repo_root: Text, path: Text, f: IO[bytes]) -> List[rules.Error]:
    759    if os.path.basename(path) != WEB_FEATURES_YML_FILENAME:
    760        return []
    761    try:
    762        web_features_file: WebFeaturesFile = WebFeaturesFile(load_data_to_dict(f))
    763    except Exception:
    764        return [rules.InvalidWebFeaturesFile.error(path)]
    765    errors = []
    766    base_dir = os.path.join(repo_root, os.path.dirname(path))
    767    files_in_directory = [
    768        f for f in os.listdir(base_dir) if os.path.isfile(os.path.join(base_dir, f))]
    769    for feature in web_features_file.features:
    770        if isinstance(feature.files, list):
    771            for file in feature.files:
    772                if not file.match_files(files_in_directory):
    773                    errors.append(rules.MissingTestInWebFeaturesFile.error(path, (file)))
    774 
    775    return errors
    776 
    777 
    778 def check_path(repo_root: Text, path: Text) -> List[rules.Error]:
    779    """
    780    Runs lints that check the file path.
    781 
    782    :param repo_root: the repository root
    783    :param path: the path of the file within the repository
    784    :returns: a list of errors found in ``path``
    785    """
    786 
    787    errors = []
    788    for path_fn in path_lints:
    789        errors.extend(path_fn(repo_root, path))
    790    return errors
    791 
    792 
    793 def check_all_paths(repo_root: Text, paths: List[Text]) -> List[rules.Error]:
    794    """
    795    Runs lints that check all paths globally.
    796 
    797    :param repo_root: the repository root
    798    :param paths: a list of all the paths within the repository
    799    :returns: a list of errors found in ``f``
    800    """
    801 
    802    errors = []
    803    for paths_fn in all_paths_lints():
    804        errors.extend(paths_fn(repo_root, paths))
    805    return errors
    806 
    807 
    808 def check_file_contents(repo_root: Text, path: Text, f: Optional[IO[bytes]] = None) -> List[rules.Error]:
    809    """
    810    Runs lints that check the file contents.
    811 
    812    :param repo_root: the repository root
    813    :param path: the path of the file within the repository
    814    :param f: a file-like object with the file contents
    815    :returns: a list of errors found in ``f``
    816    """
    817    if f is None:
    818        f = open(os.path.join(repo_root, path), 'rb')
    819    with f:
    820        errors = []
    821        for file_fn in file_lints:
    822            errors.extend(file_fn(repo_root, path, f))
    823            f.seek(0)
    824        return errors
    825 
    826 
    827 def check_file_contents_apply(args: Tuple[Text, Text]) -> List[rules.Error]:
    828    return check_file_contents(*args)
    829 
    830 
    831 def output_errors_text(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
    832    for error_type, description, path, line_number in errors:
    833        pos_string = path
    834        if line_number:
    835            pos_string += ":%s" % line_number
    836        log(f"{pos_string}: {description} ({error_type})")
    837 
    838 
    839 def output_errors_markdown(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
    840    if not errors:
    841        return
    842    heading = """Got lint errors:
    843 
    844 | Error Type | Position | Message |
    845 |------------|----------|---------|"""
    846    for line in heading.split("\n"):
    847        log(line)
    848    for error_type, description, path, line_number in errors:
    849        pos_string = path
    850        if line_number:
    851            pos_string += ":%s" % line_number
    852        log(f"{error_type} | {pos_string} | {description} |")
    853 
    854 
    855 def output_errors_json(log: Callable[[Any], None], errors: List[rules.Error]) -> None:
    856    for error_type, error, path, line_number in errors:
    857        # We use 'print' rather than the log function to ensure that the output
    858        # is valid JSON (e.g. with no logger preamble).
    859        print(json.dumps({"path": path, "lineno": line_number,
    860                          "rule": error_type, "message": error}))
    861 
    862 
    863 def output_errors_github_checks(outputter: GitHubChecksOutputter, errors: List[rules.Error], first_reported: bool) -> None:
    864    """Output errors to the GitHub Checks output markdown format.
    865 
    866    :param outputter: the GitHub Checks outputter
    867    :param errors: a list of error tuples (error type, message, path, line number)
    868    :param first_reported: True if these are the first reported errors
    869    """
    870    if first_reported:
    871        outputter.output(
    872            "\nChanges in this PR contain lint errors, listed below. These "
    873            "errors must either be fixed or added to the list of ignored "
    874            "errors; see [the documentation]("
    875            "https://web-platform-tests.org/writing-tests/lint-tool.html). "
    876            "For help, please tag `@web-platform-tests/wpt-core-team` in a "
    877            "comment.\n")
    878        outputter.output("```")
    879    output_errors_text(outputter.output, errors)
    880 
    881 
    882 def output_error_count(error_count: Dict[Text, int]) -> None:
    883    if not error_count:
    884        return
    885 
    886    assert logger is not None
    887    by_type = " ".join("%s: %d" % item for item in error_count.items())
    888    count = sum(error_count.values())
    889    logger.info("")
    890    if count == 1:
    891        logger.info(f"There was 1 error ({by_type})")
    892    else:
    893        logger.info("There were %d errors (%s)" % (count, by_type))
    894 
    895 
    896 def changed_files(wpt_root: Text) -> List[Text]:
    897    revish = testfiles.get_revish(revish=None)
    898    changed, _ = testfiles.files_changed(revish, None, include_uncommitted=True, include_new=True)
    899    return [os.path.relpath(item, wpt_root) for item in changed]
    900 
    901 
    902 def lint_paths(kwargs: Dict[Text, Any], wpt_root: Text) -> List[Text]:
    903    if kwargs.get("paths"):
    904        paths = []
    905        for path in kwargs.get("paths", []):
    906            if os.path.isdir(path):
    907                path_dir = list(all_filesystem_paths(wpt_root, path))
    908                paths.extend(path_dir)
    909            elif os.path.isfile(path):
    910                paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
    911    elif kwargs["all"]:
    912        paths = list(all_filesystem_paths(wpt_root))
    913    elif kwargs["paths_file"]:
    914        paths = []
    915        with open(kwargs["paths_file"], 'r', newline='') as f:
    916            for line in f.readlines():
    917                path = line.strip()
    918                if os.path.isdir(path):
    919                    path_dir = list(all_filesystem_paths(wpt_root, path))
    920                    paths.extend(path_dir)
    921                elif os.path.isfile(path):
    922                    paths.append(os.path.relpath(os.path.abspath(path), wpt_root))
    923    else:
    924        changed_paths = changed_files(wpt_root)
    925        force_all = False
    926        for path in changed_paths:
    927            path = path.replace(os.path.sep, "/")
    928            if path == "lint.ignore" or path.startswith("tools/lint/"):
    929                force_all = True
    930                break
    931        paths = (list(changed_paths) if not force_all
    932                 else list(all_filesystem_paths(wpt_root)))
    933 
    934    return paths
    935 
    936 
    937 def create_parser() -> argparse.ArgumentParser:
    938    parser = argparse.ArgumentParser()
    939    parser.add_argument("paths", nargs="*",
    940                        help="List of paths to lint")
    941    parser.add_argument("--json", action="store_true",
    942                        help="Output machine-readable JSON format")
    943    parser.add_argument("--markdown", action="store_true",
    944                        help="Output markdown")
    945    parser.add_argument("--repo-root", type=str,
    946                        help="The WPT directory. Use this "
    947                        "option if the lint script exists outside the repository")
    948    parser.add_argument("--ignore-glob", type=str, action="append",
    949                        help="Additional file glob to ignore (repeat to add more). "
    950                        "Globs are matched against paths relative to REPO_ROOT "
    951                        "using fnmatch, except that path separators are normalized.")
    952    parser.add_argument("--all", action="store_true", help="If no paths are passed, try to lint the whole "
    953                        "working directory, not just files that changed")
    954    parser.add_argument("--github-checks-text-file", type=str,
    955                        help="Path to GitHub checks output file for Taskcluster runs")
    956    parser.add_argument("-j", "--jobs", type=int, default=0,
    957                        help="Level to parallelism to use (defaults to 0, which detects the number of CPUs)")
    958    parser.add_argument("--paths-file", help="File containing a list of files to lint, one per line")
    959    return parser
    960 
    961 
    962 def main(venv: Any = None, **kwargs: Any) -> int:
    963 
    964    assert logger is not None
    965    if kwargs.get("json") and kwargs.get("markdown"):
    966        logger.critical("Cannot specify --json and --markdown")
    967        sys.exit(2)
    968 
    969    repo_root = kwargs.get('repo_root') or localpaths.repo_root
    970    output_format = {(True, False): "json",
    971                     (False, True): "markdown",
    972                     (False, False): "normal"}[(kwargs.get("json", False),
    973                                                kwargs.get("markdown", False))]
    974 
    975    if output_format == "markdown":
    976        setup_logging(True)
    977 
    978    paths = lint_paths(kwargs, repo_root)
    979 
    980    ignore_glob = kwargs.get("ignore_glob", [])
    981 
    982    github_checks_outputter = get_gh_checks_outputter(kwargs["github_checks_text_file"])
    983 
    984    jobs = kwargs.get("jobs", 0)
    985 
    986    return lint(repo_root, paths, output_format, ignore_glob, github_checks_outputter, jobs)
    987 
    988 
# Best experimental guess at a decent cut-off for using the parallel path
# in lint() below; smaller batches are checked serially.
MIN_FILES_FOR_PARALLEL = 80
    991 
    992 
def lint(repo_root: Text,
         paths: List[Text],
         output_format: Text,
         ignore_glob: Optional[List[Text]] = None,
         github_checks_outputter: Optional[GitHubChecksOutputter] = None,
         jobs: int = 0) -> int:
    """Run all lints over ``paths`` and report the errors found.

    :param repo_root: the repository root
    :param paths: paths relative to ``repo_root`` to check
    :param output_format: "json", "markdown" or "normal"
    :param ignore_glob: additional glob patterns of files to skip
    :param github_checks_outputter: optional GitHub Checks output helper
    :param jobs: parallelism level; 0 auto-detects from the CPU count
    :returns: the total number of errors found
    """
    error_count: Dict[Text, int] = defaultdict(int)
    # (error type, path) of the most recently reported error; used for
    # the final ERROR_MSG hint.
    last = None

    if jobs == 0:
        jobs = max_parallelism()

    with open(os.path.join(repo_root, "lint.ignore")) as f:
        ignorelist, skipped_files = parse_ignorelist(f)

    if ignore_glob:
        skipped_files |= set(ignore_glob)

    output_errors = {"json": output_errors_json,
                     "markdown": output_errors_markdown,
                     "normal": output_errors_text}[output_format]

    def process_errors(errors: List[rules.Error]) -> Optional[Tuple[Text, Text]]:
        """
        Filters and prints the errors, and updates the ``error_count`` object.

        :param errors: a list of error tuples (error type, message, path, line number)
        :returns: ``None`` if there were no errors, or
                  a tuple of the error type and the path otherwise
        """

        errors = filter_ignorelist_errors(ignorelist, errors)
        if not errors:
            return None

        assert logger is not None
        output_errors(logger.error, errors)

        if github_checks_outputter:
            # Only the first batch of reported errors gets the
            # explanatory header and opens the code fence.
            first_output = len(error_count) == 0
            output_errors_github_checks(github_checks_outputter, errors, first_output)

        for error_type, error, path, line in errors:
            error_count[error_type] += 1

        # NOTE: ``path`` here is the loop variable left over from the
        # counting loop above, i.e. the path of the last error.
        return (errors[-1][0], path)

    to_check_content = []
    skip = set()

    for path in paths:
        abs_path = os.path.join(repo_root, path)
        if not os.path.exists(abs_path):
            skip.add(path)
            continue

        if any(fnmatch.fnmatch(path, file_match) for file_match in skipped_files):
            skip.add(path)
            continue

        # Path-based lints run for every surviving path (including
        # directories)...
        errors = check_path(repo_root, path)
        last = process_errors(errors) or last

        # ...but content lints only apply to regular files.
        if not os.path.isdir(abs_path):
            to_check_content.append((repo_root, path))

    paths = [p for p in paths if p not in skip]

    if jobs > 1 and len(to_check_content) >= MIN_FILES_FOR_PARALLEL:
        pool = multiprocessing.Pool(jobs)
        # submit this job first, as it's the longest running
        all_paths_result = pool.apply_async(check_all_paths, (repo_root, paths))
        # each item tends to be quick, so pass things in large chunks to avoid too much IPC overhead
        errors_it = pool.imap_unordered(check_file_contents_apply, to_check_content, chunksize=40)
        pool.close()
        for errors in errors_it:
            last = process_errors(errors) or last

        errors = all_paths_result.get()
        pool.join()
        last = process_errors(errors) or last
    else:
        # Serial path: content lints, then the global all-paths lints.
        for item in to_check_content:
            errors = check_file_contents(*item)
            last = process_errors(errors) or last

        errors = check_all_paths(repo_root, paths)
        last = process_errors(errors) or last

    if output_format in ("normal", "markdown"):
        output_error_count(error_count)
        if error_count:
            assert last is not None
            assert logger is not None
            for line in (ERROR_MSG % (last[0], last[1], last[0], last[1])).split("\n"):
                logger.info(line)

    if error_count and github_checks_outputter:
        # Close the code fence opened by output_errors_github_checks.
        github_checks_outputter.output("```")

    return sum(error_count.values())
   1094 
   1095 
# Lints that check only the file path; each is called as
# lint_fn(repo_root, path).
path_lints = [check_file_type, check_path_length, check_worker_collision, check_ahem_copy,
              check_mojom_js, check_tentative_directories, check_gitignore_file]
# Lints that check the contents of an individual file; each is called as
# lint_fn(repo_root, path, file object).
file_lints = [check_regexp_line, check_parsed, check_python_ast, check_script_metadata,
              check_ahem_system_font, check_meta_file, check_web_features_file]
   1100 
   1101 
   1102 def all_paths_lints() -> Any:
   1103    paths = [check_unique_testharness_basenames,
   1104             check_unique_case_insensitive_paths]
   1105    # Don't break users of the lint that don't have git installed.
   1106    try:
   1107        subprocess.check_output(["git", "--version"])
   1108        paths += [check_git_ignore]
   1109    except (subprocess.CalledProcessError, FileNotFoundError):
   1110        print('No git present; skipping .gitignore lint.')
   1111    return paths
   1112 
   1113 
   1114 if __name__ == "__main__":
   1115    args = create_parser().parse_args()
   1116    error_count = main(**vars(args))
   1117    if error_count > 0:
   1118        sys.exit(1)