taskcluster.py (3664B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this 3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 6 import logging 7 8 import taskcluster_urls as liburls 9 from taskcluster import Hooks 10 from taskgraph.util import taskcluster as tc_util 11 from taskgraph.util.taskcluster import ( 12 get_root_url, 13 get_task_definition, 14 get_taskcluster_client, 15 ) 16 17 logger = logging.getLogger(__name__) 18 19 20 def get_index_url(index_path, multiple=False, block_proxy=True): 21 index_tmpl = liburls.api( 22 get_root_url(block_proxy=block_proxy), "index", "v1", "task{}/{}" 23 ) 24 return index_tmpl.format("s" if multiple else "", index_path) 25 26 27 def insert_index(index_path, task_id, data=None): 28 # Find task expiry. 29 expires = get_task_definition(task_id)["expires"] 30 31 index = get_taskcluster_client("index") 32 response = index.insertTask( 33 index_path, 34 { 35 "taskId": task_id, 36 "rank": 0, 37 "data": data or {}, 38 "expires": expires, 39 }, 40 ) 41 return response 42 43 44 def status_task(task_id): 45 """Gets the status of a task given a task_id. 46 47 In testing mode, just logs that it would have retrieved status. 48 49 Args: 50 task_id (str): A task id. 51 52 Returns: 53 dict: A dictionary object as defined here: 54 https://docs.taskcluster.net/docs/reference/platform/queue/api#status 55 """ 56 if tc_util.testing: 57 logger.info(f"Would have gotten status for {task_id}.") 58 else: 59 queue = get_taskcluster_client("queue") 60 response = queue.status(task_id) 61 if response: 62 return response.get("status", {}) 63 64 65 def state_task(task_id): 66 """Gets the state of a task given a task_id. 67 68 In testing mode, just logs that it would have retrieved state. This is a subset of the 69 data returned by :func:`status_task`. 70 71 Args: 72 task_id (str): A task id. 73 74 Returns: 75 str: The state of the task, one of 76 ``pending, running, completed, failed, exception, unknown``. 77 """ 78 if tc_util.testing: 79 logger.info(f"Would have gotten state for {task_id}.") 80 else: 81 status = status_task(task_id).get("state") or "unknown" 82 return status 83 84 85 def trigger_hook(hook_group_id, hook_id, hook_payload): 86 hooks = Hooks({"rootUrl": get_root_url()}) 87 response = hooks.triggerHook(hook_group_id, hook_id, hook_payload) 88 89 logger.info( 90 "Task seen here: {}/tasks/{}".format( 91 get_root_url(), 92 response["status"]["taskId"], 93 ) 94 ) 95 96 97 def list_task_group_tasks(task_group_id): 98 """Generate the tasks in a task group""" 99 queue = get_taskcluster_client("queue") 100 101 tasks = [] 102 103 def pagination_handler(response): 104 tasks.extend(response["tasks"]) 105 106 queue.listTaskGroup(task_group_id, paginationHandler=pagination_handler) 107 108 return tasks 109 110 111 def list_task_group_incomplete_task_ids(task_group_id): 112 states = ("running", "pending", "unscheduled") 113 for task in [t["status"] for t in list_task_group_tasks(task_group_id)]: 114 if task["state"] in states: 115 yield task["taskId"] 116 117 118 def list_task_group_complete_tasks(task_group_id): 119 tasks = {} 120 for task in list_task_group_tasks(task_group_id): 121 if task.get("status", {}).get("state", "") == "completed": 122 tasks[task.get("task", {}).get("metadata", {}).get("name", "")] = task.get( 123 "status", {} 124 ).get("taskId", "") 125 return tasks 126 127 128 def find_task(index_path): 129 index = get_taskcluster_client("index") 130 return index.findTask(index_path)