__init__.py (19271B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
"""
Convert a job description into a task description.

Jobs descriptions are similar to task descriptions, but they specify how to run
the job at a higher level, using a "run" field that can be interpreted by
run-using handlers in `taskcluster/gecko_taskgraph/transforms/job`.
"""

import logging
import os

import mozpack.path as mozpath
from packaging.version import Version
from taskgraph.transforms.base import TransformSequence
from taskgraph.transforms.run import rewrite_when_to_optimization
from taskgraph.util import json
from taskgraph.util.copy import deepcopy
from taskgraph.util.python_path import import_sibling_modules
from taskgraph.util.schema import Schema, validate_schema
from taskgraph.util.taskcluster import get_artifact_prefix
from voluptuous import Any, Coerce, Exclusive, Extra, Optional, Required

from gecko_taskgraph.transforms.cached_tasks import order_tasks
from gecko_taskgraph.transforms.task import task_description_schema
from gecko_taskgraph.util.workertypes import worker_type_implementation

logger = logging.getLogger(__name__)

# Schema for a build description
job_description_schema = Schema({
    # The name of the job and the job's label. At least one must be specified,
    # and the label will be generated from the name if necessary, by prepending
    # the kind.
    Optional("name"): str,
    Optional("label"): str,
    # the following fields are passed directly through to the task description,
    # possibly modified by the run implementation. See
    # taskcluster/gecko_taskgraph/transforms/task.py for the schema details.
    Required("description"): task_description_schema["description"],
    Optional("attributes"): task_description_schema["attributes"],
    Optional("task-from"): task_description_schema["task-from"],
    Optional("dependencies"): task_description_schema["dependencies"],
    Optional("if-dependencies"): task_description_schema["if-dependencies"],
    Optional("soft-dependencies"): task_description_schema["soft-dependencies"],
    Optional("requires"): task_description_schema["requires"],
    Optional("expires-after"): task_description_schema["expires-after"],
    Optional("expiration-policy"): task_description_schema["expiration-policy"],
    Optional("routes"): task_description_schema["routes"],
    Optional("scopes"): task_description_schema["scopes"],
    Optional("tags"): task_description_schema["tags"],
    Optional("extra"): task_description_schema["extra"],
    Optional("treeherder"): task_description_schema["treeherder"],
    Optional("index"): task_description_schema["index"],
    Optional("run-on-repo-type"): task_description_schema["run-on-repo-type"],
    Optional("run-on-projects"): task_description_schema["run-on-projects"],
    Optional("run-on-git-branches"): task_description_schema["run-on-git-branches"],
    Optional("shipping-phase"): task_description_schema["shipping-phase"],
    Optional("shipping-product"): task_description_schema["shipping-product"],
    Optional("always-target"): task_description_schema["always-target"],
    Exclusive("optimization", "optimization"): task_description_schema["optimization"],
    Optional("use-sccache"): task_description_schema["use-sccache"],
    Optional("use-python"): Any("system", "default", Coerce(Version)),
    # Fetch uv binary and add it to PATH
    Optional("use-uv"): bool,
    Optional("priority"): task_description_schema["priority"],
    # The "when" section contains descriptions of the circumstances under which
    # this task should be included in the task graph. This will be converted
    # into an optimization, so it cannot be specified in a job description that
    # also gives 'optimization'.
    Exclusive("when", "optimization"): Any(
        None,
        {
            # This task only needs to be run if a file matching one of the given
            # patterns has changed in the push. The patterns use the mozpack
            # match function (python/mozbuild/mozpack/path.py).
            Optional("files-changed"): [str],
        },
    ),
    # A list of artifacts to install from 'fetch' tasks.
    Optional("fetches"): {
        str: [
            str,
            {
                Required("artifact"): str,
                Optional("dest"): str,
                Optional("extract"): bool,
                Optional("verify-hash"): bool,
            },
        ],
    },
    # A description of how to run this job.
    "run": {
        # The key to a job implementation in a peer module to this one
        "using": str,
        # Base work directory used to set up the task.
        Optional("workdir"): str,
        # Any remaining content is verified against that job implementation's
        # own schema.
        Extra: object,
    },
    Required("worker-type"): task_description_schema["worker-type"],
    # This object will be passed through to the task description, with additions
    # provided by the job's run-using function
    Optional("worker"): dict,
})

transforms = TransformSequence()
transforms.add_validate(job_description_schema)
transforms.add(rewrite_when_to_optimization)


@transforms.add
def set_implementation(config, jobs):
    """Resolve the worker implementation and OS for each job's worker-type and
    record them in the job's tags and its worker definition."""
    for job in jobs:
        # Renamed from `os` to avoid shadowing the module-level `import os`.
        impl, worker_os = worker_type_implementation(
            config.graph_config, config.params, job["worker-type"]
        )
        if worker_os:
            job.setdefault("tags", {})["os"] = worker_os
        if impl:
            job.setdefault("tags", {})["worker-implementation"] = impl
        worker = job.setdefault("worker", {})
        # The implementation must not already be set; it is derived solely
        # from the worker-type.
        assert "implementation" not in worker
        worker["implementation"] = impl
        if worker_os:
            worker["os"] = worker_os
        yield job


@transforms.add
def set_label(config, jobs):
    """Ensure every job has a label, deriving it from the kind and name when
    only a name is given; the name is then dropped."""
    for job in jobs:
        if "label" not in job:
            if "name" not in job:
                raise Exception("job has neither a name nor a label")
            job["label"] = f"{config.kind}-{job['name']}"
        if job.get("name"):
            del job["name"]
        yield job
@transforms.add
def make_task_description(config, jobs):
    """Given a build description, create a task description"""
    # import plugin modules first, before iterating over jobs
    import_sibling_modules(exceptions=("common.py",))

    for job in jobs:
        # only docker-worker uses a fixed absolute path to find directories
        if job["worker"]["implementation"] == "docker-worker":
            job["run"].setdefault("workdir", "/builds/worker")

        taskdesc = deepcopy(job)

        # fill in some empty defaults to make run implementations easier
        taskdesc.setdefault("attributes", {})
        taskdesc.setdefault("dependencies", {})
        taskdesc.setdefault("if-dependencies", [])
        taskdesc.setdefault("soft-dependencies", [])
        taskdesc.setdefault("routes", [])
        taskdesc.setdefault("scopes", [])
        taskdesc.setdefault("extra", {})

        # give the function for job.run.using on this worker implementation a
        # chance to set up the task description.
        configure_taskdesc_for_run(
            config, job, taskdesc, job["worker"]["implementation"]
        )
        del taskdesc["run"]

        # yield only the task description, discarding the job description
        yield taskdesc


def get_attribute(mapping, key, attributes, attribute_name):
    """Get `attribute_name` from the given `attributes` dict, and if there
    is a corresponding value, set `key` in `mapping` to that value.

    (First parameter renamed from `dict`, which shadowed the builtin; all
    in-file callers pass it positionally.)"""
    value = attributes.get(attribute_name)
    if value is not None:
        mapping[key] = value


def get_platform(job):
    """Map a job's worker OS (and worker-type, for linux arm64) to the
    platform prefix used by toolchain artifact names.

    Raises ValueError for any worker.os not in {win*, linux*, macosx*}."""
    if "win" in job["worker"]["os"]:
        return "win64"
    elif "linux" in job["worker"]["os"]:
        platform = "linux64"
        if "aarch64" in job["worker-type"] or "arm64" in job["worker-type"]:
            return f"{platform}-aarch64"
        return platform
    elif "macosx" in job["worker"]["os"]:
        return "macosx64"
    else:
        raise ValueError(f"unexpected worker.os value {job['worker']['os']}")


@transforms.add
def use_system_python(config, jobs):
    """Honour the `use-python` key: when a non-"system" value is given, add a
    toolchain fetch for the requested Python and point MOZ_PYTHON_HOME at it."""
    for job in jobs:
        taskcluster_python = job.pop("use-python", "system")
        if taskcluster_python == "system":
            yield job
        else:
            if taskcluster_python == "default":
                python_version = "python"  # the taskcluster default alias
            else:
                python_version = f"python-{taskcluster_python}"

            fetches = job.setdefault("fetches", {})
            toolchain = fetches.setdefault("toolchain", [])
            platform = get_platform(job)

            toolchain.append(f"{platform}-{python_version}")

            worker = job.setdefault("worker", {})
            env = worker.setdefault("env", {})

            moz_fetches_dir = env.get("MOZ_FETCHES_DIR", "fetches")
            moz_python_home = mozpath.join(moz_fetches_dir, "python")
            env["MOZ_PYTHON_HOME"] = moz_python_home

            yield job


@transforms.add
def use_uv(config, jobs):
    """Honour the `use-uv` key: add a toolchain fetch for the uv binary and
    point MOZ_UV_HOME at it."""
    for job in jobs:
        if not job.pop("use-uv", False):
            yield job
        else:
            fetches = job.setdefault("fetches", {})
            toolchain = fetches.setdefault("toolchain", [])
            platform = get_platform(job)

            toolchain.append(f"{platform}-uv")

            worker = job.setdefault("worker", {})
            env = worker.setdefault("env", {})
            moz_fetches_dir = env.get("MOZ_FETCHES_DIR", "fetches")
            env["MOZ_UV_HOME"] = os.path.join(moz_fetches_dir, "uv")

            yield job
@transforms.add
def add_perfherder_fetch_content_artifact(config, jobs):
    """For any job that declares fetches, register a perfherder JSON artifact
    (written by fetch-content) and expose its path in the environment."""
    for job in jobs:
        if not job.get("fetches"):
            yield job
            continue

        worker = job.setdefault("worker", {})
        env = worker.setdefault("env", {})
        artifacts = worker.setdefault("artifacts", [])
        # docker-worker tasks use a fixed absolute path; other workers use a
        # path relative to the task directory.
        perfherder_fetch_content_json_path = (
            "/builds/worker/perf/perfherder-data-fetch-content.json"
            if worker.get("implementation") == "docker-worker"
            else "./perf/perfherder-data-fetch-content.json"
        )
        artifacts.append({
            "type": "file",
            "name": "public/fetch/perfherder-data-fetch-content.json",
            "path": perfherder_fetch_content_json_path,
        })
        env["PERFHERDER_FETCH_CONTENT_JSON_PATH"] = perfherder_fetch_content_json_path

        yield job


@transforms.add
def use_fetches(config, jobs):
    """Resolve each job's `fetches` into dependencies, scopes and a
    MOZ_FETCHES environment variable consumed by run-task/fetch-content."""
    # Maps from fetch/toolchain task label to its artifact metadata, collected
    # from task attributes.
    artifact_names = {}
    extra_env = {}
    should_extract = {}
    aliases = {}
    tasks = []

    if config.kind in ("toolchain", "fetch"):
        jobs = list(jobs)
        tasks.extend((config.kind, j) for j in jobs)

    tasks.extend(
        (task.kind, task.__dict__)
        for task in config.kind_dependencies_tasks.values()
        if task.kind in ("fetch", "toolchain")
    )
    for kind, task in tasks:
        get_attribute(
            artifact_names, task["label"], task["attributes"], f"{kind}-artifact"
        )
        get_attribute(extra_env, task["label"], task["attributes"], f"{kind}-env")
        get_attribute(
            should_extract, task["label"], task["attributes"], f"{kind}-extract"
        )
        # The alias attribute may be absent, a single string, or a list.
        value = task["attributes"].get(f"{kind}-alias")
        if not value:
            value = []
        elif isinstance(value, str):
            value = [value]
        for alias in value:
            fully_qualified = f"{kind}-{alias}"
            label = task["label"]
            if fully_qualified == label:
                raise Exception(f"The alias {alias} of task {label} points to itself!")
            aliases[fully_qualified] = label

    artifact_prefixes = {}
    for job in order_tasks(config, jobs):
        artifact_prefixes[job["label"]] = get_artifact_prefix(job)

        fetches = job.pop("fetches", None)
        if not fetches:
            yield job
            continue

        job_fetches = []
        name = job.get("name") or job.get("label").replace(f"{config.kind}-", "")
        dependencies = job.setdefault("dependencies", {})
        worker = job.setdefault("worker", {})
        env = worker.setdefault("env", {})
        prefix = get_artifact_prefix(job)
        has_sccache = False
        for kind, artifacts in fetches.items():
            if kind in ("fetch", "toolchain"):
                for fetch_name in artifacts:
                    label = f"{kind}-{fetch_name}"
                    label = aliases.get(label, label)
                    if label not in artifact_names:
                        raise Exception(
                            f"Missing fetch job for {config.kind}-{name}: {fetch_name}"
                        )
                    if label in extra_env:
                        env.update(extra_env[label])

                    path = artifact_names[label]

                    dependencies[label] = label
                    job_fetches.append({
                        "artifact": path,
                        "task": f"<{label}>",
                        "extract": should_extract.get(label, True),
                    })

                    if kind == "toolchain" and fetch_name.endswith("-sccache"):
                        has_sccache = True
            else:
                # Fetching from an arbitrary kind requires an existing
                # dependency on a task of that kind.
                if kind not in dependencies:
                    raise Exception(
                        f"{name} can't fetch {kind} artifacts because "
                        f"it has no {kind} dependencies!"
                    )
                dep_label = dependencies[kind]
                if dep_label in artifact_prefixes:
                    prefix = artifact_prefixes[dep_label]
                else:
                    if dep_label not in config.kind_dependencies_tasks:
                        raise Exception(
                            f"{name} can't fetch {kind} artifacts because "
                            f"there are no tasks with label {dependencies[kind]} in kind dependencies!"
                        )

                    prefix = get_artifact_prefix(
                        config.kind_dependencies_tasks[dep_label]
                    )

                for artifact in artifacts:
                    # Artifacts may be plain path strings or dicts with
                    # dest/extract/verify-hash options.
                    if isinstance(artifact, str):
                        path = artifact
                        dest = None
                        extract = True
                        verify_hash = False
                    else:
                        path = artifact["artifact"]
                        dest = artifact.get("dest")
                        extract = artifact.get("extract", True)
                        verify_hash = artifact.get("verify-hash", False)

                    fetch = {
                        # A leading "/" means the path is already absolute
                        # (relative to the task), so skip the prefix.
                        "artifact": (
                            f"{prefix}/{path}" if not path.startswith("/") else path[1:]
                        ),
                        "task": f"<{kind}>",
                        "extract": extract,
                    }
                    if dest is not None:
                        fetch["dest"] = dest
                    if verify_hash:
                        fetch["verify-hash"] = verify_hash
                    job_fetches.append(fetch)

        if job.get("use-sccache") and not has_sccache:
            raise Exception("Must provide an sccache toolchain if using sccache.")

        job_artifact_prefixes = {
            mozpath.dirname(fetch["artifact"])
            for fetch in job_fetches
            if not fetch["artifact"].startswith("public/")
        }
        if job_artifact_prefixes:
            # Use taskcluster-proxy and request appropriate scope. For example, add
            # 'scopes: [queue:get-artifact:path/to/*]' for 'path/to/artifact.tar.xz'.
            worker["taskcluster-proxy"] = True
            for prefix in sorted(job_artifact_prefixes):
                scope = f"queue:get-artifact:{prefix}/*"
                if scope not in job.setdefault("scopes", []):
                    job["scopes"].append(scope)

        # Detect two fetches that would land on the same local filename.
        artifacts = {}
        for f in job_fetches:
            _, __, artifact = f["artifact"].rpartition("/")
            if "dest" in f:
                artifact = f"{f['dest']}/{artifact}"
            task = f["task"][1:-1]
            if artifact in artifacts:
                raise Exception(
                    f"Task {name} depends on {artifacts[artifact]} and {task} "
                    f"that both provide {artifact}"
                )
            artifacts[artifact] = task

        env["MOZ_FETCHES"] = {
            "task-reference": json.dumps(
                sorted(job_fetches, key=lambda x: sorted(x.items())), sort_keys=True
            )
        }

        # The path is normalized to an absolute path in run-task
        env.setdefault("MOZ_FETCHES_DIR", "fetches")

        yield job


# A registry of all functions decorated with run_job_using
registry = {}


def run_job_using(worker_implementation, run_using, schema=None, defaults=None):
    """Register the decorated function as able to set up a task description for
    jobs with the given worker implementation and `run.using` property. If
    `schema` is given, the job's run field will be verified to match it.

    The decorated function should have the signature `using_foo(config, job, taskdesc)`
    and should modify the task description in-place. The skeleton of
    the task description is already set up, but without a payload."""
    # Use a None sentinel rather than a mutable `{}` default argument; the
    # registered value is always a dict, so consumers are unaffected.
    if defaults is None:
        defaults = {}

    def wrap(func):
        for_run_using = registry.setdefault(run_using, {})
        if worker_implementation in for_run_using:
            raise Exception(
                f"run_job_using({run_using!r}, {worker_implementation!r}) already exists: {for_run_using[worker_implementation]!r}"
            )
        for_run_using[worker_implementation] = (func, schema, defaults)
        return func

    return wrap
The skeleton of 448 the task description is already set up, but without a payload.""" 449 450 def wrap(func): 451 for_run_using = registry.setdefault(run_using, {}) 452 if worker_implementation in for_run_using: 453 raise Exception( 454 f"run_job_using({run_using!r}, {worker_implementation!r}) already exists: {for_run_using[worker_implementation]!r}" 455 ) 456 for_run_using[worker_implementation] = (func, schema, defaults) 457 return func 458 459 return wrap 460 461 462 @run_job_using( 463 "always-optimized", "always-optimized", Schema({"using": "always-optimized"}) 464 ) 465 def always_optimized(config, job, taskdesc): 466 pass 467 468 469 def configure_taskdesc_for_run(config, job, taskdesc, worker_implementation): 470 """ 471 Run the appropriate function for this job against the given task 472 description. 473 474 This will raise an appropriate error if no function exists, or if the job's 475 run is not valid according to the schema. 476 """ 477 run_using = job["run"]["using"] 478 if run_using not in registry: 479 raise Exception(f"no functions for run.using {run_using!r}") 480 481 if worker_implementation not in registry[run_using]: 482 raise Exception( 483 f"no functions for run.using {run_using!r} on {worker_implementation!r}" 484 ) 485 486 func, schema, defaults = registry[run_using][worker_implementation] 487 for k, v in defaults.items(): 488 job["run"].setdefault(k, v) 489 490 if schema: 491 validate_schema( 492 schema, 493 job["run"], 494 "In job.run using {!r}/{!r} for job {!r}:".format( 495 job["run"]["using"], worker_implementation, job["label"] 496 ), 497 ) 498 func(config, job, taskdesc)