roller.py (15900B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 import atexit 6 import copy 7 import logging 8 import os 9 import signal 10 import sys 11 import time 12 from concurrent.futures import ProcessPoolExecutor 13 from concurrent.futures.process import _python_exit as futures_atexit 14 from itertools import chain 15 from math import ceil 16 from multiprocessing import get_context 17 from multiprocessing.queues import Queue 18 from subprocess import CalledProcessError 19 20 import mozpack.path as mozpath 21 from mozbuild.util import cpu_count 22 from mozversioncontrol import ( 23 InvalidRepoPath, 24 MissingUpstreamRepo, 25 get_repository_object, 26 ) 27 28 from .errors import LintersNotConfigured, NoValidLinter 29 from .parser import Parser 30 from .pathutils import findobject 31 from .result import ResultSummary 32 from .types import supported_types 33 34 SHUTDOWN = False 35 orig_sigint = signal.getsignal(signal.SIGINT) 36 37 # Logger passed down into subprocesses 38 logger = logging.getLogger("lint") 39 handler = logging.StreamHandler() 40 formatter = logging.Formatter("%(levelname)s: %(message)s") 41 handler.setFormatter(formatter) 42 logger.addHandler(handler) 43 44 # Adapter for parent process. 
log = logging.LoggerAdapter(logger, {"lintname": "mozlint", "pid": os.getpid()})


def _setup_logger(log, show_verbose):
    """Configure verbosity for ``log`` (an adapter over the shared ``lint`` logger).

    When ``show_verbose`` is truthy, the first handler of the shared logger is
    switched to a timestamped format that includes the linter name and pid,
    and the level is set to DEBUG. Otherwise the level is set to WARNING.
    """
    if show_verbose:
        formatter = logging.Formatter(
            "%(asctime)s.%(msecs)03d %(lintname)s (%(pid)s) | %(message)s", "%H:%M:%S"
        )
        logger.handlers[0].setFormatter(formatter)
        log.setLevel(logging.DEBUG)
    else:
        log.setLevel(logging.WARNING)


def _run_worker(config, paths, **lintargs):
    """Run a single linter on ``paths`` and collect its output.

    Executed inside a ProcessPoolExecutor worker.

    :param config: The linter definition (as produced by ``Parser``).
    :param paths: The chunk of paths this job should lint.
    :param lintargs: Arguments forwarded to the linter implementation.
    :return: A :class:`ResultSummary` holding this job's issues, suppressed
             warning counts, fixed count and/or failure marker.
    """
    log = logging.LoggerAdapter(
        logger, {"lintname": config.get("name"), "pid": os.getpid()}
    )
    _setup_logger(log, lintargs.get("show_verbose"))
    lintargs["log"] = log
    result = ResultSummary(lintargs["root"])

    # A SIGINT already arrived in this worker: skip the job entirely.
    if SHUTDOWN:
        return result

    # Override warnings setup for code review.
    # Only deactivated when code_review_warnings is set to False in the
    # linter.yml in use.
    if os.environ.get("CODE_REVIEW") == "1" and config.get(
        "code_review_warnings", True
    ):
        lintargs["show_warnings"] = True

    # Override the third-party exclusion.
    # Only applies when include_thirdparty is set in the linter.yml in use.
    if config.get("include_thirdparty", False):
        lintargs["include_thirdparty"] = True

    func = supported_types[config["type"]]
    start_time = time.monotonic()
    try:
        res = func(paths, config, **lintargs)
        # Some linters support fix operations and return
        # a dict of the form {"results": results, "fixed": fixed}.
        if isinstance(res, dict):
            result.fixed += res["fixed"]
            res = res["results"] or []
        elif isinstance(res, list):
            res = res or []
        else:
            log.error(f"Unexpected result type received: {type(res)}")
            # Raise explicitly (not `assert`) so the failure is still
            # recorded below when Python runs with -O.
            raise TypeError(f"Unexpected result type received: {type(res)}")
    except Exception:
        log.exception(f"{config['name']} failed")
        res = 1
    except (KeyboardInterrupt, SystemExit):
        return result
    finally:
        end_time = time.monotonic()
        log.debug(f"Finished in {end_time - start_time:.2f} seconds")
        sys.stdout.flush()

    if not isinstance(res, (list, tuple)):
        # A non-list result is a return code; a truthy one means failure.
        if res:
            result.failed_run.add(config["name"])
    else:
        for r in res:
            if not lintargs.get("show_warnings") and r.level == "warning":
                result.suppressed_warnings[r.path] += 1
                continue

            result.issues[r.path].append(r)

    return result


class InterruptableQueue(Queue):
    """A multiprocessing.Queue that catches KeyboardInterrupt when a worker is
    blocking on it and returns None.

    This is needed to gracefully handle KeyboardInterrupts when a worker is
    blocking on ProcessPoolExecutor's call queue.
    """

    def __init__(self, *args, **kwargs):
        kwargs["ctx"] = get_context()
        super().__init__(*args, **kwargs)

    def get(self, *args, **kwargs):
        try:
            return Queue.get(self, *args, **kwargs)
        except KeyboardInterrupt:
            return None


def _worker_sigint_handler(signum, frame):
    """Sigint handler for the worker subprocesses.

    Tells workers not to process the extra jobs on the call queue that couldn't
    be canceled by the parent process, then defers to the handler that was in
    place when this module was imported.
    """
    global SHUTDOWN  # noqa PLW0603
    SHUTDOWN = True
    # `signal.getsignal` may return SIG_IGN, SIG_DFL or None instead of a
    # callable; calling those would raise TypeError inside the handler.
    if callable(orig_sigint):
        orig_sigint(signum, frame)
    elif orig_sigint != signal.SIG_IGN:
        signal.default_int_handler(signum, frame)


def wrap_futures_atexit():
    """Sometimes futures' atexit handler can spew tracebacks. This wrapper
    suppresses them."""
    try:
        futures_atexit()
    except Exception:
        # Generally `atexit` handlers aren't supposed to raise exceptions, but the
        # futures' handler can sometimes raise when the user presses `CTRL-C`. We
        # suppress all possible exceptions here so users have a nice experience
        # when canceling their lint run. Any exceptions raised by this function
        # won't be useful anyway.
        pass


atexit.unregister(futures_atexit)
atexit.register(wrap_futures_atexit)


class LintRoller:
    """Registers and runs linters.

    :param root: Path to which relative VCS paths will be joined. It is also
                 used to look up the version control repository (if any) and
                 is forwarded to linters as ``lintargs["root"]``.
    :param exclude: Paths appended to every registered linter's ``exclude``.
    :param setupargs: Extra keyword arguments passed only to linter setup
                      functions, on top of ``lintargs``.
    :param lintargs: Arguments to pass to the underlying linter(s).
    """

    MAX_PATHS_PER_JOB = (
        50  # set a max size to prevent command lines that are too long on Windows
    )

    def __init__(self, root, exclude=None, setupargs=None, **lintargs):
        self.parse = Parser(root)
        try:
            self.vcs = get_repository_object(root)
        except InvalidRepoPath:
            # Not a repository: --workdir/--outgoing/--rev are unavailable.
            self.vcs = None

        self.linters = []
        self.lintargs = lintargs
        self.lintargs["root"] = root
        self._setupargs = setupargs or {}

        # result state
        self.result = ResultSummary(
            root,
            # Prevent failing on warnings when the --warnings parameter is set to "soft"
            fail_on_warnings=lintargs.get("show_warnings") != "soft",
        )

        self.root = root
        self.exclude = exclude or []

        _setup_logger(log, lintargs.get("show_verbose"))

    def read(self, paths):
        """Parse one or more linters and add them to the registry.

        :param paths: A path or iterable of paths to linter definitions.
        """
        if isinstance(paths, str):
            paths = (paths,)

        for linter in chain(*[self.parse(p) for p in paths]):
            # Add only the excludes present in paths
            linter["local_exclude"] = linter.get("exclude", [])[:]
            # Add in our global excludes
            linter.setdefault("exclude", []).extend(self.exclude)
            self.linters.append(linter)

    def setup(self, virtualenv_manager=None):
        """Run setup for applicable linters.

        For every linter whose definition has a ``setup`` entry, the object it
        names is resolved with ``findobject`` and called with a deep copy of
        the lint arguments merged with ``setupargs``. A positive return value
        (or an exception) marks the linter as failed and skipped; a negative
        value skips it without counting as a failure. Skipped linters are
        removed from the registry.

        :param virtualenv_manager: Optional object forwarded to each setup
                                   function as ``virtualenv_manager``.
        :return: 0 on success, 1 if any setup failed or all linters were skipped.
        :raises NoValidLinter: If no linters are registered.
        """
        if not self.linters:
            raise NoValidLinter

        for linter in self.linters:
            if "setup" not in linter:
                continue

            try:
                setupargs = copy.deepcopy(self.lintargs)
                setupargs.update(self._setupargs)
                setupargs["name"] = linter["name"]
                setupargs["log"] = logging.LoggerAdapter(
                    logger, {"lintname": linter["name"], "pid": os.getpid()}
                )
                if virtualenv_manager is not None:
                    setupargs["virtualenv_manager"] = virtualenv_manager
                start_time = time.monotonic()
                # A setup function returning None/falsy counts as success.
                res = findobject(linter["setup"])(**setupargs) or 0
                log.debug(
                    f"setup for {linter['name']} finished in "
                    f"{round(time.monotonic() - start_time, 2)} seconds"
                )
            except Exception:
                log.exception(f"{linter['name']} setup failed")
                res = 1

            if res > 0:
                self.result.failed_setup.add(linter["name"])
                self.result.skipped.add(linter["name"])
            elif res < 0:
                # Negative return code means the linter should be skipped for
                # reasons other than a failure.
                self.result.skipped.add(linter["name"])

        self.linters = [
            linter
            for linter in self.linters
            if linter["name"] not in self.result.skipped
        ]

        if self.result.failed_setup:
            log.error(
                "problem with lint setup, skipping {}".format(
                    ", ".join(sorted(self.result.failed_setup))
                )
            )
            return 1

        if not self.linters:
            log.error("all linters were skipped due to setup, nothing to do!")
            return 1

        return 0

    def should_lint_entire_tree(self, vcs_paths: set[str], linter: dict) -> bool:
        """Return `True` if the linter should be run on the entire tree.

        That is the case when one of ``vcs_paths`` is an existing file that
        matches one of the linter's ``support-files`` patterns, unless
        ``--fix`` or ``stdin_filename`` is in use.
        """
        # Don't lint the entire tree when `--fix` is passed to linters.
        if self.lintargs.get("fix"):
            return False

        if self.lintargs.get("stdin_filename"):
            return False

        # Lint the whole tree when a `support-file` is modified.
        return any(
            os.path.isfile(p) and mozpath.match(p, pattern)
            for pattern in linter.get("support-files", [])
            for p in vcs_paths
        )

    def _generate_jobs(self, paths, vcs_paths, num_procs):
        """Yield jobs of the form (<linter:dict>, <paths:list>).

        Each linter's paths are split into chunks of at most
        ``MAX_PATHS_PER_JOB`` entries, sized to spread work over
        ``num_procs`` processes. Linters of type ``global`` get a single job.
        """

        def _get_current_paths(path=self.root):
            # Top-level entries of `path`, as absolute-ish joined paths.
            return [os.path.join(path, p) for p in os.listdir(path)]

        for linter in self.linters:
            if self.should_lint_entire_tree(vcs_paths, linter):
                lpaths = _get_current_paths()
                print(
                    "warning: {} support-file modified, linting entire tree "
                    "(press ctrl-c to cancel)".format(linter["name"])
                )
            elif paths == {self.root}:
                # If the command line is ".", the path will match with the root
                # directory. In this case, get all the paths, so that we can
                # benefit from chunking below.
                lpaths = _get_current_paths()
            else:
                lpaths = paths.union(vcs_paths)

            lpaths = list(lpaths) or _get_current_paths(os.getcwd())
            chunk_size = (
                min(self.MAX_PATHS_PER_JOB, int(ceil(len(lpaths) / num_procs))) or 1
            )
            if linter["type"] == "global":
                # Global linters lint the entire tree in one job.
                chunk_size = len(lpaths) or 1
            assert chunk_size > 0

            while lpaths:
                yield linter, lpaths[:chunk_size]
                lpaths = lpaths[chunk_size:]

    def _collect_results(self, future):
        """Done-callback: merge a finished job's result into ``self.result``."""
        if future.cancelled():
            return

        # Merge this job's results with our global ones.
        try:
            self.result.update(future.result())
        except Exception:
            log.exception("Sub-process raised an error:")

    def roll(self, paths=None, outgoing=None, workdir=None, rev=None, num_procs=None):
        """Run all of the registered linters against the specified file paths.

        :param paths: An iterable of files and/or directories to lint.
        :param outgoing: Lint files touched by commits that are not on the remote repository.
        :param workdir: Lint all files touched in the working directory.
        :param rev: Lint files changed by the given revision(s).
        :param num_procs: The number of processes to use. Default: cpu count
        :return: A :class:`~result.ResultSummary` instance.
        :raises LintersNotConfigured: If no linters are registered.
        """
        if not self.linters:
            raise LintersNotConfigured

        self.result.reset()

        # Need to use a set in case vcs operations specify the same file
        # more than once.
        paths = paths or set()
        if isinstance(paths, str):
            paths = {paths}
        elif isinstance(paths, (list, tuple)):
            paths = set(paths)

        if not self.vcs:
            if workdir or outgoing:
                log.error(
                    "'{}' is not a known repository, can't use --workdir or --outgoing".format(
                        self.lintargs["root"]
                    )
                )
            if rev:
                log.error(
                    "'{}' is not a known repository, can't use --rev".format(
                        self.lintargs["root"]
                    )
                )

        # Calculate files from VCS. Without a repository there is nothing to
        # query (the error was reported above).
        vcs_paths = set()
        if self.vcs:
            try:
                if workdir:
                    vcs_paths.update(self.vcs.get_changed_files("AM", mode=workdir))
                if rev:
                    vcs_paths.update(self.vcs.get_changed_files("AM", rev=rev))
                if outgoing:
                    upstream = outgoing if isinstance(outgoing, str) else None
                    try:
                        vcs_paths.update(
                            self.vcs.get_outgoing_files("AM", upstream=upstream)
                        )
                    except MissingUpstreamRepo:
                        log.warning(
                            "could not find default push, specify a remote for --outgoing"
                        )
            except CalledProcessError as e:
                # `cmd` may be a single string or a sequence of non-str args.
                cmd = e.cmd if isinstance(e.cmd, str) else " ".join(map(str, e.cmd))
                msg = f"command failed: {cmd}"
                if e.output:
                    msg = f"{msg}\n{e.output}"
                log.exception(msg)

        if not (paths or vcs_paths) and (workdir or outgoing):
            if os.environ.get("MOZ_AUTOMATION") and not os.environ.get(
                "PYTEST_CURRENT_TEST"
            ):
                raise Exception(
                    "Despite being a CI lint job, no files were linted. Is the task "
                    "missing explicit paths?"
                )

            log.warning("no files linted")
            return self.result

        # Make sure all paths are absolute. Join `paths` to cwd and `vcs_paths` to root.
        paths = {os.path.abspath(p) for p in paths}
        vcs_paths = {
            p if os.path.isabs(p) else os.path.join(self.root, p) for p in vcs_paths
        }

        num_procs = num_procs or cpu_count()
        jobs = list(self._generate_jobs(paths, vcs_paths, num_procs))

        # Make sure we never spawn more processes than we have jobs.
        num_procs = min(len(jobs), num_procs) or 1
        if sys.platform == "win32":
            # https://github.com/python/cpython/pull/13132
            num_procs = min(num_procs, 61)

        # Installed before the pool is created so worker processes start
        # with it; the parent replaces it with its own handler below.
        signal.signal(signal.SIGINT, _worker_sigint_handler)
        try:
            executor = ProcessPoolExecutor(num_procs)
            executor._call_queue = InterruptableQueue(executor._call_queue._maxsize)

            # Submit jobs to the worker pool. The _collect_results method will be
            # called when a job is finished. We store the futures so that they can
            # be canceled in the event of a KeyboardInterrupt.
            futures = []
            for job in jobs:
                future = executor.submit(_run_worker, *job, **self.lintargs)
                future.add_done_callback(self._collect_results)
                futures.append(future)

            def _parent_sigint_handler(signum, frame):
                """Sigint handler for the parent process.

                Cancels all jobs that have not yet been placed on the call queue.
                The parent process won't exit until all workers have terminated.
                Assuming the linters are implemented properly, this shouldn't take
                more than a couple seconds.
                """
                for pending in futures:
                    pending.cancel()
                executor.shutdown(wait=True)
                log.warning("\nnot all files were linted")
                signal.signal(signal.SIGINT, signal.SIG_IGN)

            signal.signal(signal.SIGINT, _parent_sigint_handler)
            executor.shutdown()
        finally:
            # Always put back the original handler, even if submission or
            # shutdown raised.
            signal.signal(signal.SIGINT, orig_sigint)
        return self.result