tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

roller.py (15900B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 import atexit
      6 import copy
      7 import logging
      8 import os
      9 import signal
     10 import sys
     11 import time
     12 from concurrent.futures import ProcessPoolExecutor
     13 from concurrent.futures.process import _python_exit as futures_atexit
     14 from itertools import chain
     15 from math import ceil
     16 from multiprocessing import get_context
     17 from multiprocessing.queues import Queue
     18 from subprocess import CalledProcessError
     19 
     20 import mozpack.path as mozpath
     21 from mozbuild.util import cpu_count
     22 from mozversioncontrol import (
     23    InvalidRepoPath,
     24    MissingUpstreamRepo,
     25    get_repository_object,
     26 )
     27 
     28 from .errors import LintersNotConfigured, NoValidLinter
     29 from .parser import Parser
     30 from .pathutils import findobject
     31 from .result import ResultSummary
     32 from .types import supported_types
     33 
     34 SHUTDOWN = False
     35 orig_sigint = signal.getsignal(signal.SIGINT)
     36 
     37 # Logger passed down into subprocesses
     38 logger = logging.getLogger("lint")
     39 handler = logging.StreamHandler()
     40 formatter = logging.Formatter("%(levelname)s: %(message)s")
     41 handler.setFormatter(formatter)
     42 logger.addHandler(handler)
     43 
     44 # Adapter for parent process.
     45 log = logging.LoggerAdapter(logger, {"lintname": "mozlint", "pid": os.getpid()})
     46 
     47 
     48 def _setup_logger(log, show_verbose):
     49    if show_verbose:
     50        formatter = logging.Formatter(
     51            "%(asctime)s.%(msecs)03d %(lintname)s (%(pid)s) | %(message)s", "%H:%M:%S"
     52        )
     53        logger.handlers[0].setFormatter(formatter)
     54        log.setLevel(logging.DEBUG)
     55    else:
     56        log.setLevel(logging.WARNING)
     57 
     58 
     59 def _run_worker(config, paths, **lintargs):
     60    log = logging.LoggerAdapter(
     61        logger, {"lintname": config.get("name"), "pid": os.getpid()}
     62    )
     63    _setup_logger(log, lintargs.get("show_verbose"))
     64    lintargs["log"] = log
     65    result = ResultSummary(lintargs["root"])
     66 
     67    if SHUTDOWN:
     68        return result
     69 
     70    # Override warnings setup for code review
     71    # Only disactivating when code_review_warnings is set to False on a linter.yml in use
     72    if os.environ.get("CODE_REVIEW") == "1" and config.get(
     73        "code_review_warnings", True
     74    ):
     75        lintargs["show_warnings"] = True
     76 
     77    # Override ignore thirdparty
     78    # Only deactivating include_thirdparty is set on a linter.yml in use
     79    if config.get("include_thirdparty", False):
     80        lintargs["include_thirdparty"] = True
     81 
     82    func = supported_types[config["type"]]
     83    start_time = time.monotonic()
     84    try:
     85        res = func(paths, config, **lintargs)
     86        # Some linters support fixed operations
     87        # dict returned - {"results":results,"fixed":fixed}
     88        if isinstance(res, dict):
     89            result.fixed += res["fixed"]
     90            res = res["results"] or []
     91        elif isinstance(res, list):
     92            res = res or []
     93        else:
     94            log.error(f"Unexpected result type received: {type(res)}")
     95            assert False
     96    except Exception:
     97        log.exception(f"{config['name']} failed")
     98        res = 1
     99    except (KeyboardInterrupt, SystemExit):
    100        return result
    101    finally:
    102        end_time = time.monotonic()
    103        log.debug(f"Finished in {end_time - start_time:.2f} seconds")
    104        sys.stdout.flush()
    105 
    106    if not isinstance(res, (list, tuple)):
    107        if res:
    108            result.failed_run.add(config["name"])
    109    else:
    110        for r in res:
    111            if not lintargs.get("show_warnings") and r.level == "warning":
    112                result.suppressed_warnings[r.path] += 1
    113                continue
    114 
    115            result.issues[r.path].append(r)
    116 
    117    return result
    118 
    119 
    120 class InterruptableQueue(Queue):
    121    """A multiprocessing.Queue that catches KeyboardInterrupt when a worker is
    122    blocking on it and returns None.
    123 
    124    This is needed to gracefully handle KeyboardInterrupts when a worker is
    125    blocking on ProcessPoolExecutor's call queue.
    126    """
    127 
    128    def __init__(self, *args, **kwargs):
    129        kwargs["ctx"] = get_context()
    130        super().__init__(*args, **kwargs)
    131 
    132    def get(self, *args, **kwargs):
    133        try:
    134            return Queue.get(self, *args, **kwargs)
    135        except KeyboardInterrupt:
    136            return None
    137 
    138 
    139 def _worker_sigint_handler(signum, frame):
    140    """Sigint handler for the worker subprocesses.
    141 
    142    Tells workers not to process the extra jobs on the call queue that couldn't
    143    be canceled by the parent process.
    144    """
    145    global SHUTDOWN  # noqa PLW0603
    146    SHUTDOWN = True
    147    orig_sigint(signum, frame)
    148 
    149 
    150 def wrap_futures_atexit():
    151    """Sometimes futures' atexit handler can spew tracebacks. This wrapper
    152    suppresses them."""
    153    try:
    154        futures_atexit()
    155    except Exception:
    156        # Generally `atexit` handlers aren't supposed to raise exceptions, but the
    157        # futures' handler can sometimes raise when the user presses `CTRL-C`. We
    158        # suppress all possible exceptions here so users have a nice experience
    159        # when canceling their lint run. Any exceptions raised by this function
    160        # won't be useful anyway.
    161        pass
    162 
    163 
    164 atexit.unregister(futures_atexit)
    165 atexit.register(wrap_futures_atexit)
    166 
    167 
    168 class LintRoller:
    169    """Registers and runs linters.
    170 
    171    :param root: Path to which relative paths will be joined. If
    172                 unspecified, root will either be determined from
    173                 version control or cwd.
    174    :param lintargs: Arguments to pass to the underlying linter(s).
    175    """
    176 
    177    MAX_PATHS_PER_JOB = (
    178        50  # set a max size to prevent command lines that are too long on Windows
    179    )
    180 
    181    def __init__(self, root, exclude=None, setupargs=None, **lintargs):
    182        self.parse = Parser(root)
    183        try:
    184            self.vcs = get_repository_object(root)
    185        except InvalidRepoPath:
    186            self.vcs = None
    187 
    188        self.linters = []
    189        self.lintargs = lintargs
    190        self.lintargs["root"] = root
    191        self._setupargs = setupargs or {}
    192 
    193        # result state
    194        self.result = ResultSummary(
    195            root,
    196            # Prevent failing on warnings when the --warnings parameter is set to "soft"
    197            fail_on_warnings=lintargs.get("show_warnings") != "soft",
    198        )
    199 
    200        self.root = root
    201        self.exclude = exclude or []
    202 
    203        _setup_logger(log, lintargs.get("show_verbose"))
    204 
    205    def read(self, paths):
    206        """Parse one or more linters and add them to the registry.
    207 
    208        :param paths: A path or iterable of paths to linter definitions.
    209        """
    210        if isinstance(paths, str):
    211            paths = (paths,)
    212 
    213        for linter in chain(*[self.parse(p) for p in paths]):
    214            # Add only the excludes present in paths
    215            linter["local_exclude"] = linter.get("exclude", [])[:]
    216            # Add in our global excludes
    217            linter.setdefault("exclude", []).extend(self.exclude)
    218            self.linters.append(linter)
    219 
    220    def setup(self, virtualenv_manager=None):
    221        """Run setup for applicable linters"""
    222        if not self.linters:
    223            raise NoValidLinter
    224 
    225        for linter in self.linters:
    226            if "setup" not in linter:
    227                continue
    228 
    229            try:
    230                setupargs = copy.deepcopy(self.lintargs)
    231                setupargs.update(self._setupargs)
    232                setupargs["name"] = linter["name"]
    233                setupargs["log"] = logging.LoggerAdapter(
    234                    logger, {"lintname": linter["name"], "pid": os.getpid()}
    235                )
    236                if virtualenv_manager is not None:
    237                    setupargs["virtualenv_manager"] = virtualenv_manager
    238                start_time = time.monotonic()
    239                res = (
    240                    findobject(linter["setup"])(
    241                        **setupargs,
    242                    )
    243                    or 0
    244                )
    245                log.debug(
    246                    f"setup for {linter['name']} finished in "
    247                    f"{round(time.monotonic() - start_time, 2)} seconds"
    248                )
    249            except Exception:
    250                log.exception(f"{linter['name']} setup failed")
    251                res = 1
    252 
    253            if res > 0:
    254                self.result.failed_setup.add(linter["name"])
    255                self.result.skipped.add(linter["name"])
    256            elif res < 0:
    257                # Negative return code means the linter should be skipped for
    258                # reasons other than a failure.
    259                self.result.skipped.add(linter["name"])
    260 
    261        self.linters = [l for l in self.linters if l["name"] not in self.result.skipped]
    262 
    263        if self.result.failed_setup:
    264            log.error(
    265                "problem with lint setup, skipping {}".format(
    266                    ", ".join(sorted(self.result.failed_setup))
    267                )
    268            )
    269            return 1
    270 
    271        if not self.linters:
    272            log.error("all linters were skipped due to setup, nothing to do!")
    273            return 1
    274 
    275        return 0
    276 
    277    def should_lint_entire_tree(self, vcs_paths: set[str], linter: dict) -> bool:
    278        """Return `True` if the linter should be run on the entire tree."""
    279        # Don't lint the entire tree when `--fix` is passed to linters.
    280        if "fix" in self.lintargs and self.lintargs["fix"]:
    281            return False
    282 
    283        if self.lintargs.get("stdin_filename"):
    284            return False
    285 
    286        # Lint the whole tree when a `support-file` is modified.
    287        return any(
    288            os.path.isfile(p) and mozpath.match(p, pattern)
    289            for pattern in linter.get("support-files", [])
    290            for p in vcs_paths
    291        )
    292 
    293    def _generate_jobs(self, paths, vcs_paths, num_procs):
    294        def __get_current_paths(path=self.root):
    295            return [os.path.join(path, p) for p in os.listdir(path)]
    296 
    297        """A job is of the form (<linter:dict>, <paths:list>)."""
    298        for linter in self.linters:
    299            if self.should_lint_entire_tree(vcs_paths, linter):
    300                lpaths = __get_current_paths()
    301                print(
    302                    "warning: {} support-file modified, linting entire tree "
    303                    "(press ctrl-c to cancel)".format(linter["name"])
    304                )
    305            elif paths == {self.root}:
    306                # If the command line is ".", the path will match with the root
    307                # directory. In this case, get all the paths, so that we can
    308                # benefit from chunking below.
    309                lpaths = __get_current_paths()
    310            else:
    311                lpaths = paths.union(vcs_paths)
    312 
    313            lpaths = list(lpaths) or __get_current_paths(os.getcwd())
    314            chunk_size = (
    315                min(self.MAX_PATHS_PER_JOB, int(ceil(len(lpaths) / num_procs))) or 1
    316            )
    317            if linter["type"] == "global":
    318                # Global linters lint the entire tree in one job.
    319                chunk_size = len(lpaths) or 1
    320            assert chunk_size > 0
    321 
    322            while lpaths:
    323                yield linter, lpaths[:chunk_size]
    324                lpaths = lpaths[chunk_size:]
    325 
    326    def _collect_results(self, future):
    327        if future.cancelled():
    328            return
    329 
    330        # Merge this job's results with our global ones.
    331        try:
    332            self.result.update(future.result())
    333        except Exception:
    334            log.exception("Sub-process raised an error:")
    335 
    336    def roll(self, paths=None, outgoing=None, workdir=None, rev=None, num_procs=None):
    337        """Run all of the registered linters against the specified file paths.
    338 
    339        :param paths: An iterable of files and/or directories to lint.
    340        :param outgoing: Lint files touched by commits that are not on the remote repository.
    341        :param workdir: Lint all files touched in the working directory.
    342        :param num_procs: The number of processes to use. Default: cpu count
    343        :return: A :class:`~result.ResultSummary` instance.
    344        """
    345        if not self.linters:
    346            raise LintersNotConfigured
    347 
    348        self.result.reset()
    349 
    350        # Need to use a set in case vcs operations specify the same file
    351        # more than once.
    352        paths = paths or set()
    353        if isinstance(paths, str):
    354            paths = set([paths])
    355        elif isinstance(paths, (list, tuple)):
    356            paths = set(paths)
    357 
    358        if not self.vcs and (workdir or outgoing):
    359            log.error(
    360                "'{}' is not a known repository, can't use --workdir or --outgoing".format(
    361                    self.lintargs["root"]
    362                )
    363            )
    364 
    365        # Calculate files from VCS
    366        vcs_paths = set()
    367        try:
    368            if workdir:
    369                vcs_paths.update(self.vcs.get_changed_files("AM", mode=workdir))
    370            if rev:
    371                vcs_paths.update(self.vcs.get_changed_files("AM", rev=rev))
    372            if outgoing:
    373                upstream = outgoing if isinstance(outgoing, str) else None
    374                try:
    375                    vcs_paths.update(
    376                        self.vcs.get_outgoing_files("AM", upstream=upstream)
    377                    )
    378                except MissingUpstreamRepo:
    379                    log.warning(
    380                        "could not find default push, specify a remote for --outgoing"
    381                    )
    382        except CalledProcessError as e:
    383            msg = f"command failed: {' '.join(e.cmd)}"
    384            if e.output:
    385                msg = f"{msg}\n{e.output}"
    386            log.exception(msg)
    387 
    388        if not (paths or vcs_paths) and (workdir or outgoing):
    389            if os.environ.get("MOZ_AUTOMATION") and not os.environ.get(
    390                "PYTEST_CURRENT_TEST"
    391            ):
    392                raise Exception(
    393                    "Despite being a CI lint job, no files were linted. Is the task "
    394                    "missing explicit paths?"
    395                )
    396 
    397            log.warning("no files linted")
    398            return self.result
    399 
    400        # Make sure all paths are absolute. Join `paths` to cwd and `vcs_paths` to root.
    401        paths = set(map(os.path.abspath, paths))
    402        vcs_paths = set([
    403            os.path.join(self.root, p) if not os.path.isabs(p) else p for p in vcs_paths
    404        ])
    405 
    406        num_procs = num_procs or cpu_count()
    407        jobs = list(self._generate_jobs(paths, vcs_paths, num_procs))
    408 
    409        # Make sure we never spawn more processes than we have jobs.
    410        num_procs = min(len(jobs), num_procs) or 1
    411        if sys.platform == "win32":
    412            # https://github.com/python/cpython/pull/13132
    413            num_procs = min(num_procs, 61)
    414 
    415        signal.signal(signal.SIGINT, _worker_sigint_handler)
    416        executor = ProcessPoolExecutor(num_procs)
    417        executor._call_queue = InterruptableQueue(executor._call_queue._maxsize)
    418 
    419        # Submit jobs to the worker pool. The _collect_results method will be
    420        # called when a job is finished. We store the futures so that they can
    421        # be canceled in the event of a KeyboardInterrupt.
    422        futures = []
    423        for job in jobs:
    424            future = executor.submit(_run_worker, *job, **self.lintargs)
    425            future.add_done_callback(self._collect_results)
    426            futures.append(future)
    427 
    428        def _parent_sigint_handler(signum, frame):
    429            """Sigint handler for the parent process.
    430 
    431            Cancels all jobs that have not yet been placed on the call queue.
    432            The parent process won't exit until all workers have terminated.
    433            Assuming the linters are implemented properly, this shouldn't take
    434            more than a couple seconds.
    435            """
    436            [f.cancel() for f in futures]
    437            executor.shutdown(wait=True)
    438            log.warning("\nnot all files were linted")
    439            signal.signal(signal.SIGINT, signal.SIG_IGN)
    440 
    441        signal.signal(signal.SIGINT, _parent_sigint_handler)
    442        executor.shutdown()
    443        signal.signal(signal.SIGINT, orig_sigint)
    444        return self.result