util.py (16658B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import copy
import logging
import os
import re
from concurrent import futures
from functools import reduce

import jsone
import requests
from slugid import nice as slugid
from taskcluster.exceptions import TaskclusterRestFailure
from taskgraph import create
from taskgraph.optimize.base import optimize_task_graph
from taskgraph.taskgraph import TaskGraph
from taskgraph.util.taskcluster import (
    CONCURRENCY,
    find_task_id,
    get_artifact,
    get_session,
    get_task_definition,
    list_tasks,
    parse_time,
)

from gecko_taskgraph.decision import read_artifact, rename_artifact, write_artifact
from gecko_taskgraph.util.taskcluster import trigger_hook
from gecko_taskgraph.util.taskgraph import find_decision_task

logger = logging.getLogger(__name__)

# Index route used to locate a push's decision task.
INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision"
# hg.mozilla.org pushlog endpoint: repo URL, start push id, end push id.
PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}"


def _tags_within_context(tags, context=None):
    """Determine whether a task's tags match any tag-set in ``context``.

    A context of [] means that it *only* applies to a task group, so no
    task can match it.

    Args:
        tags (dict): the task's tags.
        context (list of dict): tag-sets from an action's ``context``
            property; defaults to the empty list.

    Returns:
        bool: True if ``tags`` is a super-set of at least one tag-set.
    """
    # A mutable default argument ([]) was replaced with the None idiom;
    # the default was never mutated, so behavior is unchanged.
    if context is None:
        context = []
    return any(
        all(tag in tags and tags[tag] == tag_set[tag] for tag in tag_set)
        for tag_set in context
    )


def _extract_applicable_action(actions_json, action_name, task_group_id, task_id):
    """Extract action that applies to the given task or task group.

    A task (as defined by its tags) is said to match a tag-set if its
    tags are a super-set of the tag-set. A tag-set is a set of key-value pairs.

    An action (as defined by its context) is said to be relevant for
    a given task, if the task's tags match one of the tag-sets given
    in the context property of the action.

    The order of the actions is significant. When multiple actions apply to a
    task the first one takes precedence.

    For more details visit:
    https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec

    Raises:
        LookupError: if no action named ``action_name`` applies to the
            given task (or task group, when ``task_id`` is None).
    """
    if task_id:
        tags = get_task_definition(task_id).get("tags")

    for _action in actions_json["actions"]:
        if action_name != _action["name"]:
            continue

        context = _action.get("context", [])
        # Ensure the task is within the context of the action
        if task_id and tags and _tags_within_context(tags, context):
            return _action
        # An empty context means the action applies to the task group.
        if context == []:
            return _action

    available_actions = ", ".join(sorted({a["name"] for a in actions_json["actions"]}))
    raise LookupError(
        f"{action_name} action is not available for this task. Available: {available_actions}"
    )


def trigger_action(action_name, decision_task_id, task_id=None, input=None):
    """Trigger a Taskcluster action hook by name.

    Renders the action's JSON-e ``hookPayload`` with the given input and
    task/task-group context and fires the corresponding hook. Only
    ``hook`` kind actions are supported.

    Args:
        action_name (str): name of the action in actions.json.
        decision_task_id (str): id of the decision task whose
            public/actions.json artifact defines the action.
        task_id (str): optional task the action applies to.
        input (dict): input for the action; defaults to {}.

    Raises:
        ValueError: if no decision task id is given.
        RuntimeError: if actions.json has an unsupported version.
        NotImplementedError: for non-hook action kinds.
    """
    # Mutable default argument ({}) replaced with the None idiom; the
    # input is only read, never mutated, so behavior is unchanged.
    if input is None:
        input = {}
    if not decision_task_id:
        raise ValueError("No decision task. We can't find the actions artifact.")
    actions_json = get_artifact(decision_task_id, "public/actions.json")
    if actions_json["version"] != 1:
        raise RuntimeError("Wrong version of actions.json, unable to continue")

    # These values substitute $eval in the template
    context = {
        "input": input,
        "taskId": task_id,
        "taskGroupId": decision_task_id,
    }
    # https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec#variables
    context.update(actions_json["variables"])
    action = _extract_applicable_action(
        actions_json, action_name, decision_task_id, task_id
    )
    kind = action["kind"]
    if create.testing:
        logger.info(f"Skipped triggering action for {kind} as testing is enabled")
    elif kind == "hook":
        hook_payload = jsone.render(action["hookPayload"], context)
        trigger_hook(action["hookGroupId"], action["hookId"], hook_payload)
    else:
        raise NotImplementedError(f"Unable to submit actions with {kind} kind.")


def get_pushes_from_params_input(parameters, input):
    """Fetch pushes based on decision parameters and action input.

    When ``input["inclusive"]`` is set, the push the action runs on is
    included in the result (depth is extended by one to compensate).
    """
    inclusive_tweak = 1 if input.get("inclusive") else 0
    return get_pushes(
        project=parameters["head_repository"],
        end_id=int(parameters["pushlog_id"]) - (1 - inclusive_tweak),
        depth=input.get("depth", 9) + inclusive_tweak,
    )


def get_pushes(project, end_id, depth, full_response=False):
    """Return the last ``depth`` push ids ending at ``end_id``.

    Pages backwards through the json-pushes endpoint until enough pushes
    have been collected.

    Args:
        project (str): repository URL used in PUSHLOG_TMPL.
        end_id (int): highest push id to include.
        depth (int): number of pushes wanted.
        full_response (bool): when True, return a dict mapping push id to
            the full pushlog entry instead of just the sorted id list.

    Returns:
        list or dict: sorted push ids, or {push_id: push_info}.
    """
    # Accumulate entries from *every* page. The previous implementation
    # looked up push info only in the last response, which raised
    # KeyError in full_response mode whenever depth spanned more than
    # one pushlog request.
    all_pushes = {}
    while True:
        start_id = max(end_id - depth, 0)
        pushlog_url = PUSHLOG_TMPL.format(project, start_id, end_id)
        logger.debug(pushlog_url)
        r = requests.get(pushlog_url)
        r.raise_for_status()
        all_pushes.update(r.json()["pushes"])
        if len(all_pushes) >= depth:
            break

        end_id = start_id - 1
        start_id -= depth
        if start_id < 0:
            break

    # NOTE(review): push ids are string keys, so this sort is
    # lexicographic — preserved from the original implementation.
    pushes = sorted(all_pushes)[-depth:]
    push_dict = {push: all_pushes[push] for push in pushes}
    return push_dict if full_response else pushes


def get_decision_task_id(project, push_id):
    """Return the decision task id for a push, via the Taskcluster index."""
    return find_task_id(INDEX_TMPL.format(project, push_id))


def get_parameters(decision_task_id):
    """Return the parameters artifact of the given decision task."""
    return get_artifact(decision_task_id, "public/parameters.yml")


def get_tasks_with_downstream(labels, full_task_graph, label_to_taskid):
    """Return ``labels`` plus all their downstream tasks that have task ids.

    Used to gather tasks when downstream tasks need to run as well.
    """
    return full_task_graph.graph.transitive_closure(
        set(labels), reverse=True
    ).nodes & set(label_to_taskid.keys())


def fetch_graph_and_labels(parameters, graph_config):
    """Fetch the task graph and label-to-taskid maps for the current push.

    Loads the full task graph and label-to-taskid mapping generated by
    the original decision task, then folds in the mappings produced by
    any action and cron tasks indexed against the same revision.

    Returns:
        tuple: (decision_task_id, full_task_graph, label_to_taskid,
        label_to_taskids) where ``label_to_taskid`` maps each label to
        its most recent task id and ``label_to_taskids`` maps each label
        to *all* task ids that ran for it.
    """
    decision_task_id = find_decision_task(parameters, graph_config)

    # First grab the graph and labels generated during the initial decision task
    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
    logger.info("Load taskgraph from JSON.")
    _, full_task_graph = TaskGraph.from_json(full_task_graph)
    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
    label_to_taskids = {label: [task_id] for label, task_id in label_to_taskid.items()}

    logger.info("Fetching additional tasks from action and cron tasks.")
    # fetch everything in parallel; this avoids serializing any delay in downloading
    # each artifact (such as waiting for the artifact to be mirrored locally)
    # NOTE: the executor was previously bound as "e", which the closures'
    # "except ... as e" shadowed; renamed for clarity.
    with futures.ThreadPoolExecutor(CONCURRENCY) as executor:
        fetches = []

        # fetch any modifications made by action or cron tasks and add the
        # new tasks (single helper replacing the duplicated
        # fetch_action/fetch_cron closures)
        def fetch_label_to_taskid(task_id, kind):
            logger.info(f"fetching label-to-taskid.json for {kind} task {task_id}")
            try:
                run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
                label_to_taskid.update(run_label_to_id)
                for label, existing_task_id in run_label_to_id.items():
                    label_to_taskids.setdefault(label, []).append(existing_task_id)
            except TaskclusterRestFailure as e:
                if e.status_code != 404:
                    raise
                # A 404 just means the task created no new tasks.
                logger.debug(f"No label-to-taskid.json found for {task_id}: {e}")

        head_rev_param = "{}head_rev".format(graph_config["project-repo-param-prefix"])

        # Index namespace suffix per task kind.
        kinds = {
            "action": "taskgraph.actions",
            "cron": "cron",
        }
        for kind, namespace_suffix in kinds.items():
            namespace = "{}.v2.{}.revision.{}.{}".format(
                graph_config["trust-domain"],
                parameters["project"],
                parameters[head_rev_param],
                namespace_suffix,
            )
            for task_id in list_tasks(namespace):
                fetches.append(
                    executor.submit(fetch_label_to_taskid, task_id, kind)
                )

        # now wait for each fetch to complete, raising an exception if there
        # were any issues
        for f in futures.as_completed(fetches):
            f.result()

    return (decision_task_id, full_task_graph, label_to_taskid, label_to_taskids)


def create_task_from_def(task_def, level, action_tag=None):
    """Create a new task from a definition rather than from a label
    that is already in the full-task-graph. The task definition will
    have {'relative-datestamp': '..'} rendered just like in a decision task.
    Use this for entirely new tasks or ones that change internals of the task.
    It is useful if you want to "edit" the full_task_graph and then hand
    it to this function. No dependencies will be scheduled. You must handle
    this yourself. Seeing how create_tasks handles it might prove helpful."""
    task_def["schedulerId"] = f"gecko-level-{level}"
    label = task_def["metadata"]["name"]
    task_id = slugid()
    session = get_session()
    if action_tag:
        task_def.setdefault("tags", {}).setdefault("action", action_tag)
    create.create_task(session, task_id, label, task_def)


def update_parent(task, graph):
    """Record the current (action) task as the parent of ``task``."""
    task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "")
    return task


def update_action_tag(task, graph, action_tag):
    """Tag ``task`` with the action that created it (first tag wins)."""
    task.task.setdefault("tags", {}).setdefault("action", action_tag)
    return task


def update_dependencies(task, graph):
    """Make ``task`` depend on the current task, when one is running."""
    if os.environ.get("TASK_ID"):
        task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"])
    return task


def create_tasks(
    graph_config,
    to_run,
    full_task_graph,
    label_to_taskid,
    params,
    decision_task_id,
    suffix="",
    modifier=lambda t: t,
    action_tag=None,
):
    """Create new tasks. The task definition will have {'relative-datestamp':
    '..'} rendered just like in a decision task. Action callbacks should use
    this function to create new tasks,
    allowing easy debugging with `mach taskgraph action-callback --test`.
    This builds up all required tasks to run in order to run the tasks requested.

    Optionally this function takes a `modifier` function that is passed in each
    task before it is put into a new graph. It should return a valid task. Note
    that this is passed _all_ tasks in the graph, not just the set in to_run. You
    may want to skip modifying tasks not in your to_run list.

    If `suffix` is given, then it is used to give unique names to the resulting
    artifacts. If you call this function multiple times in the same action,
    pass a different suffix each time to avoid overwriting artifacts.

    If you wish to create the tasks in a new group, leave out decision_task_id.

    Returns an updated label_to_taskid containing the new tasks"""
    # triggers registration of strategies
    import gecko_taskgraph.optimize  # noqa

    if suffix != "":
        suffix = f"-{suffix}"
    to_run = set(to_run)

    # Copy to avoid side-effects later
    full_task_graph = copy.deepcopy(full_task_graph)
    label_to_taskid = label_to_taskid.copy()

    target_graph = full_task_graph.graph.transitive_closure(to_run)
    target_task_graph = TaskGraph(
        {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph
    )
    target_task_graph.for_each_task(update_parent)
    if action_tag:
        target_task_graph.for_each_task(update_action_tag, action_tag)
    # Only add a dependency on the decision task when it is a *different*
    # task from the one we are running in.
    if decision_task_id and decision_task_id != os.environ.get("TASK_ID"):
        target_task_graph.for_each_task(update_dependencies)
    optimized_task_graph, label_to_taskid = optimize_task_graph(
        target_task_graph,
        to_run,
        params,
        to_run,
        decision_task_id,
        existing_tasks=label_to_taskid,
    )
    write_artifact(f"task-graph{suffix}.json", optimized_task_graph.to_json())
    write_artifact(f"label-to-taskid{suffix}.json", label_to_taskid)
    write_artifact(f"to-run{suffix}.json", list(to_run))
    create.create_tasks(
        graph_config,
        optimized_task_graph,
        label_to_taskid,
        params,
        decision_task_id,
    )
    return label_to_taskid


def _update_reducer(accumulator, new_value):
    "similar to set or dict `update` method, but returning the modified object"
    accumulator.update(new_value)
    return accumulator


def combine_task_graph_files(suffixes):
    """Combine task-graph-{suffix}.json files into a single task-graph.json file.

    Since Chain of Trust verification requires a task-graph.json file that
    contains all children tasks, we can combine the various task-graph-0.json
    type files into a master task-graph.json file at the end.

    Actions also look for various artifacts, so we combine those in a similar
    fashion.

    In the case where there is only one suffix, we simply rename it to avoid the
    additional cost of uploading two copies of the same data.
    """

    if len(suffixes) == 1:
        for filename in ["task-graph", "label-to-taskid", "to-run"]:
            # BUGFIX: the loop variable was previously ignored and a
            # literal placeholder was formatted into both names, so the
            # artifacts were never renamed correctly.
            rename_artifact(f"{filename}-{suffixes[0]}.json", f"{filename}.json")
        return

    def combine(file_contents, base):
        return reduce(_update_reducer, file_contents, base)

    files = [read_artifact(f"task-graph-{suffix}.json") for suffix in suffixes]
    write_artifact("task-graph.json", combine(files, dict()))

    files = [read_artifact(f"label-to-taskid-{suffix}.json") for suffix in suffixes]
    write_artifact("label-to-taskid.json", combine(files, dict()))

    files = [read_artifact(f"to-run-{suffix}.json") for suffix in suffixes]
    write_artifact("to-run.json", list(combine(files, set())))


def relativize_datestamps(task_def):
    """
    Given a task definition as received from the queue, convert all datestamps
    to {relative_datestamp: ..} format, with the task creation time as "now".
    The result is useful for handing to ``create_task``.
    """
    base = parse_time(task_def["created"])
    # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js
    ts_pattern = re.compile(
        r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]"
        r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?"
        r"(?:z|[+-]\d\d:\d\d)$",
        re.I,
    )

    def recurse(value):
        if isinstance(value, str):
            if ts_pattern.match(value):
                value = parse_time(value)
                diff = value - base
                return {"relative-datestamp": f"{int(diff.total_seconds())} seconds"}
        if isinstance(value, list):
            return [recurse(e) for e in value]
        if isinstance(value, dict):
            return {k: recurse(v) for k, v in value.items()}
        return value

    return recurse(task_def)


def add_args_to_command(cmd_parts, extra_args=None):
    """
    Add custom command line args to a given command.
    args:
        cmd_parts: the raw command as seen by taskcluster
        extra_args: array of args we want to add
    """
    # Mutable default argument ([]) replaced with the None idiom; the
    # default was never mutated in place by callers, so behavior is
    # unchanged.
    if extra_args is None:
        extra_args = []
    # Prevent modification of the caller's copy of cmd_parts
    cmd_parts = copy.deepcopy(cmd_parts)
    cmd_type = "default"
    if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict):
        # windows has single cmd part as dict: 'task-reference', with long string
        cmd_parts = cmd_parts[0]["task-reference"].split(" ")
        cmd_type = "dict"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], str):
        # windows has single cmd part as a long string
        cmd_parts = cmd_parts[0].split(" ")
        cmd_type = "unicode"
    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list):
        # osx has an single value array with an array inside
        cmd_parts = cmd_parts[0]
        cmd_type = "subarray"
    elif len(cmd_parts) == 2 and isinstance(cmd_parts[1], list):
        # osx has an double value array with an array inside each element.
        # The first element is a pre-requisite command while the second
        # is the actual test command.
        cmd_type = "subarray2"

    if cmd_type == "subarray2":
        cmd_parts[1].extend(extra_args)
    else:
        cmd_parts.extend(extra_args)

    # Re-wrap the command into its original shape.
    if cmd_type == "dict":
        cmd_parts = [{"task-reference": " ".join(cmd_parts)}]
    elif cmd_type == "unicode":
        cmd_parts = [" ".join(cmd_parts)]
    elif cmd_type == "subarray":
        cmd_parts = [cmd_parts]
    return cmd_parts