decision.py (14617B)
# mypy: allow-untyped-defs

"""Decision task for Taskcluster CI.

Reads the GitHub event that triggered the decision task, works out which
tasks from tasks/test.yml should run for that event, and creates them in
Taskcluster as a single task group.
"""

import argparse
import json
import logging
import os
import re
import subprocess
from collections import OrderedDict

import taskcluster

from . import taskgraph


here = os.path.abspath(os.path.dirname(__file__))


logging.basicConfig()
logger = logging.getLogger()


def get_triggers(event):
    """Extract the trigger context from a GitHub event payload.

    :param event: Deserialized GitHub event payload.
    :return: Tuple ``(is_pr, branch)`` where ``is_pr`` is True for a
             pull_request event, and ``branch`` is the pushed branch name
             (with any ``refs/heads/`` prefix stripped) or None.
    """
    # Set some variables that we use to get the commits on the current branch
    ref_prefix = "refs/heads/"
    is_pr = "pull_request" in event
    branch = None
    if not is_pr and "ref" in event:
        branch = event["ref"]
        if branch.startswith(ref_prefix):
            branch = branch[len(ref_prefix):]

    return is_pr, branch


def fetch_event_data(queue):
    """Fetch the GitHub event stored on the current Taskcluster task.

    :param queue: Taskcluster Queue client.
    :return: The serialized GitHub event from the task's
             ``extra.github_event`` field, or None when TASK_ID is unset
             (e.g. under local testing).
    """
    try:
        task_id = os.environ["TASK_ID"]
    except KeyError:
        logger.warning("Missing TASK_ID environment variable")
        # For example under local testing
        return None

    task_data = queue.task(task_id)

    return task_data.get("extra", {}).get("github_event")


def filter_triggers(event, all_tasks):
    """Return the subset of tasks whose ``trigger`` matches the event.

    A task matches if it declares a ``pull-request`` trigger and the event
    is a PR, or if one of its ``branch`` triggers equals the pushed branch
    (a trailing ``*`` in a trigger acts as a prefix wildcard).

    :param event: Deserialized GitHub event payload.
    :param all_tasks: Ordered mapping of task name to task definition.
    :return: OrderedDict of matching tasks, preserving input order.
    """
    is_pr, branch = get_triggers(event)
    triggered = OrderedDict()
    for name, task in all_tasks.items():
        if "trigger" in task:
            if is_pr and "pull-request" in task["trigger"]:
                triggered[name] = task
            elif branch is not None and "branch" in task["trigger"]:
                for trigger_branch in task["trigger"]["branch"]:
                    if (trigger_branch == branch or
                        trigger_branch.endswith("*") and branch.startswith(trigger_branch[:-1])):
                        triggered[name] = task
    logger.info("Triggers match tasks:\n * %s" % "\n * ".join(triggered.keys()))
    return triggered


def get_run_jobs(event):
    """Compute the set of job names affected by the event's commit range.

    Uses ``tools.ci.jobs`` to map changed paths to job names, and merges in
    any jobs explicitly requested via a ``tc-jobs:`` line in the commit or
    PR message.

    :param event: Deserialized GitHub event payload (push or pull_request).
    :return: Set of job-name strings.
    """
    from tools.ci import jobs
    # base..head for a PR, before..after for a branch push.
    revish = "%s..%s" % (event["pull_request"]["base"]["sha"]
                         if "pull_request" in event
                         else event["before"],
                         event["pull_request"]["head"]["sha"]
                         if "pull_request" in event
                         else event["after"])
    logger.info("Looking for changes in range %s" % revish)
    paths = jobs.get_paths(revish=revish)
    # Fix: put a newline after the label so the first path isn't fused onto
    # the same line, matching the other multi-line log messages in this file.
    logger.info("Found changes in paths:\n%s" % "\n".join(paths))
    path_jobs = jobs.get_jobs(paths)
    all_jobs = path_jobs | get_extra_jobs(event)
    logger.info("Including jobs:\n * %s" % "\n * ".join(all_jobs))
    return all_jobs


def get_extra_jobs(event):
    """Extract extra jobs requested via a ``tc-jobs:`` directive.

    Looks at the first commit message (push events) or the PR body
    (pull_request events) for a line of the form
    ``tc-jobs: job1,job2`` and returns the listed job names.

    :param event: Deserialized GitHub event payload.
    :return: Set of job-name strings (possibly empty).
    """
    body = None
    jobs = set()
    if "commits" in event and event["commits"]:
        body = event["commits"][0]["message"]
    elif "pull_request" in event:
        body = event["pull_request"]["body"]

    if not body:
        return jobs

    regexp = re.compile(r"\s*tc-jobs:(.*)$")

    for line in body.splitlines():
        m = regexp.match(line)
        if m:
            items = m.group(1)
            for item in items.split(","):
                jobs.add(item.strip())
            # Only the first tc-jobs line is honored.
            break
    return jobs


def filter_excluded_users(tasks, event):
    """Remove, in place, tasks that exclude the PR submitter.

    Some users' pull requests are excluded from tasks, such as pull
    requests from automated exports; tasks opt in via an
    ``exclude-users`` list.

    :param tasks: Ordered mapping of task name to task definition
                  (mutated in place).
    :param event: Deserialized GitHub event payload.
    """
    try:
        submitter = event["pull_request"]["user"]["login"]
    except KeyError:
        # Just ignore excluded users if the
        # username cannot be pulled from the event.
        logger.debug("Unable to read username from event. Continuing.")
        return

    excluded_tasks = []
    # A separate list of items for tasks is needed to iterate over
    # because removing an item during iteration will raise an error.
    for name, task in list(tasks.items()):
        if submitter in task.get("exclude-users", []):
            excluded_tasks.append(name)
            tasks.pop(name)  # removing excluded task
    if excluded_tasks:
        logger.info(
            f"Tasks excluded for user {submitter}:\n * " +
            "\n * ".join(excluded_tasks)
        )


def filter_schedule_if(event, tasks):
    """Return the subset of tasks whose ``schedule-if`` rules pass.

    Tasks without a ``schedule-if`` key are always scheduled. Tasks with a
    ``run-job`` rule are scheduled if any listed job is in the set of jobs
    affected by the event (or if that set contains ``"all"``).

    :param event: Deserialized GitHub event payload.
    :param tasks: Ordered mapping of task name to task definition.
    :return: OrderedDict of scheduled tasks, preserving input order.
    """
    scheduled = OrderedDict()
    # Computed lazily since get_run_jobs shells out to git.
    run_jobs = None
    for name, task in tasks.items():
        if "schedule-if" in task:
            if "run-job" in task["schedule-if"]:
                if run_jobs is None:
                    run_jobs = get_run_jobs(event)
                if "all" in run_jobs or any(item in run_jobs for item in task["schedule-if"]["run-job"]):
                    scheduled[name] = task
        else:
            scheduled[name] = task
    logger.info("Scheduling rules match tasks:\n * %s" % "\n * ".join(scheduled.keys()))
    return scheduled


def get_fetch_rev(event):
    """Determine the git ref and revisions to fetch for this event.

    :param event: Deserialized GitHub event payload.
    :return: 3-tuple ``(fetch_ref, head_sha, merge_sha)``. For a branch
             push ``merge_sha`` is None; for a PR either SHA may be None
             if ``git ls-remote`` fails for the corresponding ref.
    """
    is_pr, _ = get_triggers(event)
    if is_pr:
        # Try to get the actual rev so that all non-decision tasks are pinned to that
        rv = ["refs/pull/%s/merge" % event["pull_request"]["number"]]
        # For every PR GitHub maintains a 'head' branch with commits from the
        # PR, and a 'merge' branch containing a merge commit between the base
        # branch and the PR.
        for ref_type in ["head", "merge"]:
            ref = "refs/pull/%s/%s" % (event["pull_request"]["number"], ref_type)
            sha = None
            try:
                output = subprocess.check_output(["git", "ls-remote", "origin", ref])
            except subprocess.CalledProcessError:
                import traceback
                logger.error(traceback.format_exc())
                logger.error("Failed to get commit sha1 for %s" % ref)
            else:
                if not output:
                    logger.error("Failed to get commit for %s" % ref)
                else:
                    sha = output.decode("utf-8").split()[0]
            # Always append (possibly None) so the tuple stays length 3.
            rv.append(sha)
        rv = tuple(rv)
    else:
        # For a branch push we have a ref and a head but no merge SHA
        rv = (event["ref"], event["after"], None)
    assert len(rv) == 3
    return rv


def build_full_command(event, task):
    """Build the full container command list for a task.

    Translates the task's ``options`` and ``install`` settings into
    arguments for ``run_tc.py`` and an apt install preamble, and wraps
    everything in a bash invocation.

    :param event: Deserialized GitHub event payload.
    :param task: Task definition from the task graph.
    :return: Argument list suitable for the Taskcluster payload ``command``.
    """
    fetch_ref, head_sha, merge_sha = get_fetch_rev(event)
    cmd_args = {
        "task_name": task["name"],
        "repo_url": event["repository"]["clone_url"],
        "fetch_ref": fetch_ref,
        "task_cmd": task["command"],
        "install_str": "",
    }

    options = task.get("options", {})
    options_args = []
    options_args.append("--ref=%s" % fetch_ref)
    if head_sha is not None:
        options_args.append("--head-rev=%s" % head_sha)
    if merge_sha is not None:
        options_args.append("--merge-rev=%s" % merge_sha)
    if options.get("oom-killer"):
        options_args.append("--oom-killer")
    if options.get("xvfb"):
        options_args.append("--xvfb")
    if not options.get("hosts"):
        options_args.append("--no-hosts")
    else:
        options_args.append("--hosts")
    # Check out the expected SHA unless it is overridden (e.g. to base_head).
    if options.get("checkout"):
        options_args.append("--checkout=%s" % options["checkout"])
    for browser in options.get("browser", []):
        options_args.append("--browser=%s" % browser)
    if options.get("channel"):
        options_args.append("--channel=%s" % options["channel"])
    if options.get("install-certificates"):
        options_args.append("--install-certificates")

    cmd_args["options_str"] = " ".join(str(item) for item in options_args)

    install_packages = task.get("install")
    if install_packages:
        install_items = ["apt update -qqy"]
        install_items.extend("apt install -qqy %s" % item
                             for item in install_packages)
        cmd_args["install_str"] = "\n".join("sudo %s;" % item for item in install_items)

    return ["/bin/bash",
            "--login",
            "-xc",
            """
~/start.sh \
  %(repo_url)s \
  %(fetch_ref)s;
%(install_str)s
cd web-platform-tests;
./tools/ci/run_tc.py %(options_str)s -- %(task_cmd)s;
""" % cmd_args]


def get_owner(event):
    """Return the owner email for the task metadata.

    Uses the pusher's email when it looks like a valid address, otherwise
    falls back to the project's noreply address.

    :param event: Deserialized GitHub event payload.
    :return: Email address string.
    """
    if "pusher" in event:
        pusher = event.get("pusher", {}).get("email", "")
        if pusher and "@" in pusher:
            return pusher
    return "web-platform-tests@users.noreply.github.com"


def create_tc_task(event, task, taskgroup_id, depends_on_ids, env_extra=None):
    """Build a Taskcluster task definition for a single task.

    :param event: Deserialized GitHub event payload.
    :param task: Task definition from the task graph.
    :param taskgroup_id: Task group all created tasks belong to.
    :param depends_on_ids: List of task ids this task depends on.
    :param env_extra: Optional extra environment variables for the payload.
    :return: Tuple ``(task_id, task_data)``.
    """
    command = build_full_command(event, task)
    task_id = taskcluster.slugId()
    task_data = {
        "taskGroupId": taskgroup_id,
        "created": taskcluster.fromNowJSON(""),
        "deadline": taskcluster.fromNowJSON(task["deadline"]),
        "provisionerId": task["provisionerId"],
        "schedulerId": task["schedulerId"],
        "workerType": task["workerType"],
        "scopes": task.get("scopes", []),
        "metadata": {
            "name": task["name"],
            "description": task.get("description", ""),
            "owner": get_owner(event),
            "source": event["repository"]["clone_url"]
        },
        "payload": {
            "artifacts": task.get("artifacts"),
            "command": command,
            "image": task.get("image"),
            "maxRunTime": task.get("maxRunTime"),
            "env": task.get("env", {}),
        },
        "extra": {
            # Stored so downstream tasks can recover the triggering event.
            "github_event": json.dumps(event)
        },
        "routes": ["checks"]
    }
    if "extra" in task:
        task_data["extra"].update(task["extra"])
    if task.get("privileged"):
        if "capabilities" not in task_data["payload"]:
            task_data["payload"]["capabilities"] = {}
        task_data["payload"]["capabilities"]["privileged"] = True
    if env_extra:
        task_data["payload"]["env"].update(env_extra)
    if depends_on_ids:
        task_data["dependencies"] = depends_on_ids
        task_data["requires"] = task.get("requires", "all-completed")
    return task_id, task_data


def get_artifact_data(artifact, task_id_map):
    """Resolve a download-artifact spec to a concrete task id.

    :param artifact: Artifact spec with ``task``, ``glob``, ``dest`` and
                     optional ``extract`` keys.
    :param task_id_map: Mapping of task name to ``(task_id, task_data)``.
    :return: Dict describing the artifact with the resolved task id.
    """
    task_id, data = task_id_map[artifact["task"]]
    return {
        "task": task_id,
        "glob": artifact["glob"],
        "dest": artifact["dest"],
        "extract": artifact.get("extract", False)
    }


def build_task_graph(event, all_tasks, tasks):
    """Create Taskcluster task definitions for all scheduled tasks.

    Dependencies named in ``depends-on`` are created first (recursively),
    even if they were not directly scheduled. A special ``sink-task``, if
    present, is created last and depends on every other required task.

    :param event: Deserialized GitHub event payload.
    :param all_tasks: Full task mapping, used to resolve dependencies.
    :param tasks: Ordered mapping of tasks to schedule.
    :return: OrderedDict of task name to ``(task_id, task_data)``.
    """
    task_id_map = OrderedDict()
    taskgroup_id = os.environ.get("TASK_ID", taskcluster.slugId())
    sink_task_depends_on = []

    def add_task(task_name, task):
        # Recursively create any dependencies first so their ids exist.
        depends_on_ids = []
        if "depends-on" in task:
            for depends_name in task["depends-on"]:
                if depends_name not in task_id_map:
                    add_task(depends_name,
                             all_tasks[depends_name])
                depends_on_ids.append(task_id_map[depends_name][0])
        env_extra = {}
        if "download-artifacts" in task:
            env_extra["TASK_ARTIFACTS"] = json.dumps(
                [get_artifact_data(artifact, task_id_map)
                 for artifact in task["download-artifacts"]])

        task_id, task_data = create_tc_task(event, task, taskgroup_id, depends_on_ids,
                                            env_extra=env_extra)
        task_id_map[task_name] = (task_id, task_data)

        # The string conversion here is because if we use variables they are
        # converted to a string, so it's easier to use a string always
        if str(task.get("required", "True")) != "False" and task_name != "sink-task":
            sink_task_depends_on.append(task_id)

    for task_name, task in tasks.items():
        if task_name == "sink-task":
            # sink-task will be created below at the end of the ordered dict,
            # so that it can depend on all other tasks.
            continue
        add_task(task_name, task)

    # GitHub branch protection for pull requests needs us to name explicit
    # required tasks - which doesn't suffice when using a dynamic task graph.
    # To work around this we declare a sink task that depends on all the other
    # tasks completing, and checks if they have succeeded. We can then
    # make the sink task the sole required task for pull requests.
    sink_task = tasks.get("sink-task")
    if sink_task:
        logger.info("Scheduling sink-task")
        sink_task["command"] += " {}".format(" ".join(sink_task_depends_on))
        task_id_map["sink-task"] = create_tc_task(
            event, sink_task, taskgroup_id, sink_task_depends_on)
    else:
        logger.info("sink-task is not scheduled")

    return task_id_map


def create_tasks(queue, task_id_map):
    """Submit every task definition in ``task_id_map`` to Taskcluster.

    :param queue: Taskcluster Queue client.
    :param task_id_map: Mapping of task name to ``(task_id, task_data)``.
    """
    for (task_id, task_data) in task_id_map.values():
        queue.createTask(task_id, task_data)


def get_event(queue, event_path):
    """Load the triggering GitHub event.

    Sources, in priority order: a file at ``event_path``, the TASK_EVENT
    environment variable, or the current task's metadata via Taskcluster.

    :param queue: Taskcluster Queue client (used for the metadata fallback).
    :param event_path: Optional path to a file containing the serialized event.
    :return: Deserialized event payload.
    :raises OSError: If ``event_path`` is given but cannot be read.
    :raises ValueError: If no event can be found or it is not valid JSON.
    """
    if event_path is not None:
        try:
            with open(event_path) as f:
                event_str = f.read()
        except OSError:
            logger.error("Missing event file at path %s" % event_path)
            raise
    elif "TASK_EVENT" in os.environ:
        event_str = os.environ["TASK_EVENT"]
    else:
        event_str = fetch_event_data(queue)
    if not event_str:
        raise ValueError("Can't find GitHub event definition; for local testing pass --event-path")
    try:
        return json.loads(event_str)
    except ValueError:
        logger.error("Event was not valid JSON")
        raise


def decide(event):
    """Decide which tasks to schedule for the event and build their graph.

    :param event: Deserialized GitHub event payload.
    :return: OrderedDict of task name to ``(task_id, task_data)``.
    """
    all_tasks = taskgraph.load_tasks_from_path(os.path.join(here, "tasks", "test.yml"))

    triggered_tasks = filter_triggers(event, all_tasks)
    scheduled_tasks = filter_schedule_if(event, triggered_tasks)
    filter_excluded_users(scheduled_tasks, event)

    logger.info("UNSCHEDULED TASKS:\n  %s" % "\n  ".join(sorted(set(all_tasks.keys()) -
                                                                set(scheduled_tasks.keys()))))
    logger.info("SCHEDULED TASKS:\n  %s" % "\n  ".join(sorted(scheduled_tasks.keys())))

    task_id_map = build_task_graph(event, all_tasks, scheduled_tasks)
    return task_id_map


def get_parser():
    """Build the command-line argument parser for the decision entry point."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--event-path",
                        help="Path to file containing serialized GitHub event")
    parser.add_argument("--dry-run", action="store_true",
                        help="Don't actually create the tasks, just output the tasks that "
                        "would be created")
    parser.add_argument("--tasks-path",
                        help="Path to file in which to write payload for all scheduled tasks")
    return parser


def run(venv, **kwargs):
    """Entry point: decide on tasks for the current event and create them.

    :param venv: Virtualenv handle (unused; required by the wpt command API).
    :param kwargs: Parsed command-line arguments (``event_path``,
                   ``dry_run``, ``tasks_path``).
    """
    queue = taskcluster.Queue({'rootUrl': os.environ['TASKCLUSTER_PROXY_URL']})
    event = get_event(queue, event_path=kwargs["event_path"])

    task_id_map = decide(event)

    try:
        if not kwargs["dry_run"]:
            create_tasks(queue, task_id_map)
        else:
            print(json.dumps(task_id_map, indent=2))
    finally:
        # Always persist the computed payloads, even if task creation fails.
        if kwargs["tasks_path"]:
            with open(kwargs["tasks_path"], "w") as f:
                json.dump(task_id_map, f, indent=2)