morph.py (9422B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""
Graph morphs are modifications to task-graphs that take place *after* the
optimization phase.

These graph morphs are largely invisible to developers running `./mach`
locally, so they should be limited to changes that do not modify the meaning of
the graph.
"""

# Note that the translations of `{'task-reference': '..'}` and
# `artifact-reference` are handled in the optimization phase (since
# optimization involves dealing with taskIds directly).  Similarly,
# `{'relative-datestamp': '..'}` is handled at the last possible moment during
# task creation.

import copy
import logging
import os
import re

from slugid import nice as slugid
from taskgraph.graph import Graph
from taskgraph.morph import register_morph
from taskgraph.task import Task
from taskgraph.taskgraph import TaskGraph

from .util.workertypes import get_worker_type

here = os.path.abspath(os.path.dirname(__file__))
logger = logging.getLogger(__name__)
# Maximum number of routes allowed on a single task (see add_index_tasks).
MAX_ROUTES = 64


def amend_taskgraph(taskgraph, label_to_taskid, to_add):
    """Add the given tasks to the taskgraph, returning a new taskgraph.

    Args:
        taskgraph: the graph being morphed.  Its ``tasks`` mapping and edge set
            are copied, not mutated.
        label_to_taskid: mapping of task label to taskId.  It is updated IN
            PLACE with an entry for every added task, and also returned.
        to_add: iterable of ``Task`` objects that already have ``task_id`` set.

    Returns:
        A ``(taskgraph, label_to_taskid)`` tuple.  The new ``TaskGraph`` contains
        the original tasks plus ``to_add``, keyed by taskId.  It also has one
        ``(task_id, dep_task_id, depname)`` edge per dependency of each added task.

    Raises:
        ValueError: if an added task's label is already in ``label_to_taskid``.
    """
    new_tasks = taskgraph.tasks.copy()
    new_edges = set(taskgraph.graph.edges)
    for task in to_add:
        # An explicit check instead of ``assert``: asserts are stripped under
        # ``python -O``, and a duplicate label would silently overwrite the
        # existing label -> taskId mapping.
        if task.label in label_to_taskid:
            raise ValueError(
                f"task label {task.label!r} is already present in the task graph"
            )
        new_tasks[task.task_id] = task
        label_to_taskid[task.label] = task.task_id
        for depname, dep in task.dependencies.items():
            new_edges.add((task.task_id, dep, depname))

    taskgraph = TaskGraph(new_tasks, Graph(set(new_tasks), new_edges))
    return taskgraph, label_to_taskid


def derive_misc_task(
    target_task,
    purpose,
    image,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    dependencies,
):
    """Create the shell of a task that depends on `dependencies` and on the
    given docker image.

    Args:
        target_task: the task this new task is derived from.  Its label and
            description are reused, as are its deadline and metadata
            owner/source.
        purpose: prefix for the new label (``{purpose}-{target label}``) and
            the description.
        image: docker image name.  The image task is looked up in
            ``label_to_taskid`` under ``docker-image-{image}``.
        taskgraph: the graph being morphed.  It is only used to check whether
            the docker-image task is part of it.
        label_to_taskid: mapping of task label to taskId.
        parameters: passed to ``get_worker_type`` to resolve the ``misc``
            worker.
        graph_config: passed to ``get_worker_type`` to resolve the ``misc``
            worker.
        dependencies: mapping of dependency name to taskId.  It is copied,
            never mutated.

    Returns:
        A new ``Task`` of kind ``misc`` with a freshly generated ``task_id``.
        Its scopes list is empty and its payload has no command, so the caller
        fills those in.

    Raises:
        KeyError: if ``docker-image-{image}`` is not in ``label_to_taskid``.
    """
    label = f"{purpose}-{target_task.label}"

    # this is why all docker image tasks are included in the target task graph: we
    # need to find them in label_to_taskid, even if nothing else required them
    image_taskid = label_to_taskid["docker-image-" + image]

    provisioner_id, worker_type = get_worker_type(
        graph_config,
        parameters,
        "misc",
    )

    deps = copy.copy(dependencies)
    deps["docker-image"] = image_taskid

    task_def = {
        "provisionerId": provisioner_id,
        "workerType": worker_type,
        # The task definition always lists the image task, even when (below)
        # it is dropped from the graph-level dependencies.
        "dependencies": list(deps.values()),
        "created": {"relative-datestamp": "0 seconds"},
        "deadline": target_task.task["deadline"],
        # no point existing past the parent task's deadline
        "expires": target_task.task["deadline"],
        "metadata": {
            "name": label,
            "description": f"{purpose} for {target_task.description}",
            "owner": target_task.task["metadata"]["owner"],
            "source": target_task.task["metadata"]["source"],
        },
        "scopes": [],
        "payload": {
            "image": {
                "path": "public/image.tar.zst",
                "taskId": image_taskid,
                "type": "task-image",
            },
            "features": {"taskclusterProxy": True},
            "maxRunTime": 600,
        },
    }

    if image_taskid not in taskgraph.tasks:
        # The task above depends on the replaced docker-image, not one in this
        # current graph, so keep it out of the graph-level dependencies
        # (presumably to avoid an edge to a node outside the graph).
        del deps["docker-image"]

    task = Task(
        kind="misc",
        label=label,
        attributes={},
        task=task_def,
        dependencies=deps,
    )
    task.task_id = slugid()
    return task


# these regular expressions capture route prefixes for which we have a star
# scope, allowing them to be summarized.  Each should correspond to a star scope
# in each Gecko `assume:repo:hg.mozilla.org/...` role.
SCOPE_SUMMARY_REGEXPS = [
    re.compile(r"(index:insert-task:docker\.images\.v1\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:gecko\.v2\.trunk\.revision\.).*"),
    re.compile(r"(index:insert-task:gecko\.v2\.[^.]*\.).*"),
    re.compile(r"(index:insert-task:comm\.v2\.[^.]*\.).*"),
]


def _summarize_index_scope(scope):
    """Collapse ``scope`` to a star scope using SCOPE_SUMMARY_REGEXPS.

    The first matching regexp wins, and its captured prefix gets a ``*``
    appended.  If no regexp matches, ``scope`` is returned unchanged.
    """
    for summary_re in SCOPE_SUMMARY_REGEXPS:
        match = summary_re.match(scope)
        if match:
            return match.group(1) + "*"
    return scope


def make_index_task(
    parent_task,
    taskgraph,
    label_to_taskid,
    parameters,
    graph_config,
    index_paths,
    index_rank,
    purpose,
    dependencies,
):
    """Create a misc task that inserts ``parent_task`` into ``index_paths``.

    The task runs ``insert-indexes.js`` with each index path as an argument.
    It runs in the ``index-task`` docker image, with ``TARGET_TASKID`` and
    ``INDEX_RANK`` in its environment.

    Args:
        parent_task: the task whose taskId is inserted into the index.
        taskgraph: forwarded to ``derive_misc_task``.
        label_to_taskid: forwarded to ``derive_misc_task``.
        parameters: forwarded to ``derive_misc_task``.
        graph_config: forwarded to ``derive_misc_task``.
        index_paths: iterable of index namespaces (without the ``index.``
            prefix).  Any iterable is accepted; it is consumed once.
        index_rank: rank passed as the ``INDEX_RANK`` environment variable.
        purpose: label/description prefix for the new task.
        dependencies: mapping of dependency name to taskId for the new task.

    Returns:
        The new ``Task`` (not yet added to the graph).
    """
    # Materialize once: the paths are used for both the scopes and the command.
    index_paths = list(index_paths)

    task = derive_misc_task(
        parent_task,
        purpose,
        "index-task",
        taskgraph,
        label_to_taskid,
        parameters,
        graph_config,
        dependencies,
    )

    # we need to "summarize" the scopes, otherwise a particularly
    # namespace-heavy index task might have more scopes than can fit in a
    # temporary credential.
    task.task["scopes"] = sorted(
        {_summarize_index_scope(f"index:insert-task:{path}") for path in index_paths}
    )

    task.task["payload"]["command"] = ["insert-indexes.js", *index_paths]
    task.task["payload"]["env"] = {
        "TARGET_TASKID": parent_task.task_id,
        "INDEX_RANK": f"{index_rank}",
    }
    return task


@register_morph
def add_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    The TaskCluster queue only allows 64 routes on a task. In the event a task
    exceeds this limit, this graph morph adds "index tasks" that depend on it
    and do the index insertions directly, avoiding the limit on task.routes.

    The ``index.`` routes are removed from the over-limit task, and the new
    index task inserts those paths instead.  Returns the (possibly amended)
    ``(taskgraph, label_to_taskid)``.
    """
    logger.debug("Morphing: adding index tasks")

    # Add indexes for tasks that exceed MAX_ROUTES.
    added = []
    for task in taskgraph.tasks.values():
        routes = task.task.get("routes", [])
        if len(routes) <= MAX_ROUTES:
            continue
        index_paths = [r.split(".", 1)[1] for r in routes if r.startswith("index.")]
        if not index_paths:
            # Moving routes out would not help: there are no index routes, so
            # an index task would have nothing to insert.
            logger.warning(
                "Task %s has %d routes (limit %d) but none are index routes; "
                "not adding an index task",
                task.label,
                len(routes),
                MAX_ROUTES,
            )
            continue
        task.task["routes"] = [r for r in routes if not r.startswith("index.")]
        added.append(
            make_index_task(
                task,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=index_paths,
                index_rank=task.task.get("extra", {}).get("index", {}).get("rank", 0),
                purpose="index-task",
                dependencies={"parent": task.task_id},
            )
        )

    if added:
        taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added)
        logger.info("Added %d index tasks", len(added))

    return taskgraph, label_to_taskid


@register_morph
def add_eager_cache_index_tasks(taskgraph, label_to_taskid, parameters, graph_config):
    """
    Some tasks (e.g. cached tasks) we want to exist in the index before they even
    run/complete. Our current use is to allow us to depend on an unfinished cached
    task in future pushes. This graph morph adds "eager-index tasks" that depend on
    the decision task and do the index insertions directly, which does not need to
    wait on the pointed at task to complete.

    Tasks whose ``eager_indexes`` attribute is missing or empty are skipped.
    Eager index tasks use rank 0 and have no in-graph dependencies.
    """
    logger.debug("Morphing: Adding eager cached index's")

    added = []
    for task in taskgraph.tasks.values():
        eager_indexes = task.attributes.get("eager_indexes")
        if not eager_indexes:
            continue
        added.append(
            make_index_task(
                task,
                taskgraph,
                label_to_taskid,
                parameters,
                graph_config,
                index_paths=eager_indexes,
                index_rank=0,  # Be sure complete tasks get priority
                purpose="eager-index",
                dependencies={},
            )
        )

    if added:
        taskgraph, label_to_taskid = amend_taskgraph(taskgraph, label_to_taskid, added)
        logger.info("Added %d eager index tasks", len(added))
    return taskgraph, label_to_taskid


@register_morph
def add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    """Registered morph; delegates to ``_add_try_task_duplicates``."""
    return _add_try_task_duplicates(
        taskgraph, label_to_taskid, parameters, graph_config
    )


# this shim function exists so we can call it from the unittests.
# this works around an issue with
# third_party/python/taskcluster_taskgraph/taskgraph/morph.py#40
def _add_try_task_duplicates(taskgraph, label_to_taskid, parameters, graph_config):
    """Set ``task_duplicates`` on tasks selected for rebuild in try_task_config.

    ``try_task_config["tasks"]`` may hold exact labels or glob entries ending
    in ``-*``.  A glob entry matches any label that is its prefix followed by a
    numeric chunk component.  That chunk is the last dash-separated part, or the
    second-to-last for labels ending in ``-cf``.  When ``rebuild`` is set,
    every matching task gets ``attributes["task_duplicates"] = rebuild``.
    Tasks are mutated in place; the inputs are returned unchanged otherwise.
    """
    try_config = parameters.get("try_task_config") or {}
    requested = try_config.get("tasks", [])
    # Glob entries are stored without their "-*" suffix so they can be
    # compared against chunk-less labels.
    glob_tasks = {x.strip("-*") for x in requested if x.endswith("-*")}
    # Exact labels are the non-glob entries.  (Subtracting ``glob_tasks`` would
    # instead remove an exact label equal to some glob's prefix and keep the
    # "-*" entries themselves.)
    tasks = {x for x in requested if not x.endswith("-*")}

    rebuild = try_config.get("rebuild")
    if not rebuild:
        return taskgraph, label_to_taskid

    for task in taskgraph.tasks.values():
        chunk_index = -2 if task.label.endswith("-cf") else -1
        label_parts = task.label.split("-")
        label_no_chunk = "-".join(label_parts[:chunk_index])

        if label_parts[chunk_index].isnumeric() and label_no_chunk in glob_tasks:
            task.attributes["task_duplicates"] = rebuild
        elif task.label in tasks:
            task.attributes["task_duplicates"] = rebuild
    return taskgraph, label_to_taskid