# taskgraph.py (5635B)
# mypy: allow-untyped-defs

import json
import os
import re
from collections import OrderedDict
from copy import deepcopy

here = os.path.dirname(__file__)

# Matches ``${dotted.path}`` placeholders that are not preceded by a
# backslash. Compiled once at import time instead of on every substitution.
_VARIABLE_RE = re.compile(r"(?<!\\)\${([^}]+)}")


def first(iterable):
    """Return the first item of a list, dict, iterator or other iterable.

    Raises:
        ValueError: if ``iterable`` cannot be iterated at all.
        StopIteration: if ``iterable`` is empty.
    """
    try:
        iterator = iter(iterable)
    except TypeError:
        raise ValueError("Object isn't iterable") from None
    return next(iterator)


def load_task_file(path):
    """Parse the YAML file at ``path`` and return the resulting data."""
    # Imported here because this is the only function that needs PyYAML; the
    # remaining helpers operate on plain dicts/lists and stay importable
    # without it.
    import yaml

    # Read as UTF-8 explicitly rather than relying on the locale default.
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)


def update_recursive(data, update_data):
    """Merge ``update_data`` into ``data`` in place.

    Keys missing from ``data`` are added as-is. For keys present in both:
    dicts are merged recursively, lists are extended, and any other value
    replaces the existing one.

    Raises:
        ValueError: if ``update_data`` supplies a dict (or list) for a key
            whose existing value in ``data`` is not a dict (or list).
    """
    for key, value in update_data.items():
        if key not in data:
            data[key] = value
            continue

        initial_value = data[key]
        if isinstance(value, dict):
            if not isinstance(initial_value, dict):
                raise ValueError("Variable %s has inconsistent types "
                                 "(expected object)" % key)
            update_recursive(initial_value, value)
        elif isinstance(value, list):
            if not isinstance(initial_value, list):
                raise ValueError("Variable %s has inconsistent types "
                                 "(expected list)" % key)
            initial_value.extend(value)
        else:
            data[key] = value


def resolve_use(task_data, templates):
    """Return a new task dict with the templates named in "use" applied.

    Templates are merged in the order listed (each one deep-copied so the
    shared template is never mutated), then ``task_data`` itself is merged on
    top. The "use" key is removed from the result.

    Raises:
        KeyError: if a name in "use" is not a key of ``templates``.
    """
    rv = {}
    for template_name in task_data.get("use", ()):
        update_recursive(rv, deepcopy(templates[template_name]))
    update_recursive(rv, task_data)
    rv.pop("use", None)
    return rv


def resolve_name(task_data, default_name):
    """Set ``task_data["name"]`` to ``default_name`` if absent; return the dict.

    ``task_data`` is modified in place.
    """
    if "name" not in task_data:
        task_data["name"] = default_name
    return task_data


def resolve_chunks(task_data):
    """Expand a task with a "chunks" count into one task per chunk.

    A task without "chunks" is returned unchanged as a single-item list.
    Otherwise each returned task is a deep copy whose "chunks" value is
    ``{"id": i, "total": total}`` with ``i`` running from 1 to ``total``.
    If "chunks-override" is present, the entry keyed by
    ``task_data["vars"]["test-type"]`` (when not None) replaces the total.
    """
    if "chunks" not in task_data:
        return [task_data]

    total_chunks = task_data["chunks"]
    if "chunks-override" in task_data:
        override = task_data["chunks-override"].get(task_data["vars"]["test-type"])
        if override is not None:
            total_chunks = override

    rv = []
    for i in range(1, total_chunks + 1):
        chunk_data = deepcopy(task_data)
        chunk_data["chunks"] = {"id": i,
                                "total": total_chunks}
        rv.append(chunk_data)
    return rv


def replace_vars(input_string, variables):
    """Substitute ``${a.b.c}`` placeholders in ``input_string``.

    Each placeholder is resolved by successive subscripting of ``variables``
    with the dot-separated parts; the final value is converted with ``str``.
    A placeholder that cannot be resolved is left untouched, which allows a
    later pass (with more variables available) to fill it in.
    """
    # TODO: support replacing as a non-string type?

    def replacer(m):
        repl = variables
        for part in m.group(1).split("."):
            try:
                repl = repl[part]
            except (LookupError, TypeError):
                # Missing key, or an intermediate value that cannot be
                # subscripted with this part: don't substitute.
                return m.group(0)
        return str(repl)

    return _VARIABLE_RE.sub(replacer, input_string)


def sub_variables(data, variables):
    """Return ``data`` with placeholders replaced in every nested string.

    Lists and dicts are rebuilt recursively (dict keys are left as they are);
    values of any other type are returned unchanged.
    """
    if isinstance(data, str):
        return replace_vars(data, variables)
    if isinstance(data, list):
        return [sub_variables(item, variables) for item in data]
    if isinstance(data, dict):
        return {key: sub_variables(value, variables)
                for key, value in data.items()}
    return data


def substitute_variables(task):
    """Substitute placeholders in ``task`` using its own "vars" and "chunks"."""
    variables = {"vars": task.get("vars", {}),
                 "chunks": task.get("chunks", {})}

    return sub_variables(task, variables)


def expand_maps(task):
    """Expand a ``{"$map": {"for": [...], "do": ...}}`` object into tasks.

    A mapping whose first key is not "$map" (including an empty mapping) is
    returned unchanged as a single-item list. Otherwise, for every entry of
    "for" and every ``{name: data}`` item of "do", a task
    ``{name: merged}`` is produced where ``merged`` is a deep copy of the
    "for" entry updated with a deep copy of ``data``. A "do" value that is
    not a list is itself passed through ``expand_maps`` first.

    Raises:
        ValueError: if the "$map" object does not have exactly the
            properties "for" and "do", or a "do" item does not have exactly
            one property.
    """
    # Default of None so that an empty mapping is passed through for the
    # caller's validation instead of leaking StopIteration.
    name = next(iter(task.keys()), None)
    if name != "$map":
        return [task]

    map_data = task["$map"]
    if set(map_data.keys()) != {"for", "do"}:
        raise ValueError("$map objects must have exactly two properties named 'for' "
                         "and 'do' (got %s)" % ("no properties" if not map_data.keys()
                                                else ", ".join(map_data.keys())))
    rv = []
    for for_data in map_data["for"]:
        do_items = map_data["do"]
        if not isinstance(do_items, list):
            do_items = expand_maps(do_items)
        for do_data in do_items:
            task_data = deepcopy(for_data)
            if len(do_data.keys()) != 1:
                raise ValueError("Each item in the 'do' list must be an object "
                                 "with a single property")
            name = first(do_data.keys())
            update_recursive(task_data, deepcopy(do_data[name]))
            rv.append({name: task_data})
    return rv


def load_tasks(tasks_data):
    """Build the final, fully-resolved tasks from parsed task-file data.

    Processing order: expand "$map" objects, substitute variables in task
    names and bodies, apply "use" templates from ``tasks_data["components"]``
    (optional; treated as empty when missing), fill in default names, expand
    chunks, then substitute variables again so chunk values are available.

    Returns:
        An OrderedDict mapping each task's "name" to its task dict.

    Raises:
        ValueError: if a task object does not have exactly one property, or
            two tasks resolve to the same name after map expansion.
    """
    map_resolved_tasks = OrderedDict()
    tasks = []
    templates = tasks_data.get("components") or {}

    for task in tasks_data["tasks"]:
        if len(task.keys()) != 1:
            raise ValueError("Each task must be an object with a single property")
        for expanded_task in expand_maps(task):
            if len(expanded_task.keys()) != 1:
                raise ValueError("Each task must be an object with a single property")
            name = first(expanded_task.keys())
            data = expanded_task[name]
            new_name = sub_variables(name, {"vars": data.get("vars", {})})
            if new_name in map_resolved_tasks:
                raise ValueError("Got duplicate task name %s" % new_name)
            map_resolved_tasks[new_name] = substitute_variables(data)

    for task_default_name, data in map_resolved_tasks.items():
        resolved = resolve_use(data, templates)
        resolved = resolve_name(resolved, task_default_name)
        tasks.extend(resolve_chunks(resolved))

    tasks = [substitute_variables(task_data) for task_data in tasks]
    # NOTE: tasks that end up with the same final "name" (for example chunked
    # tasks whose name does not include ${chunks.id}) overwrite one another
    # here; only the last one is kept.
    return OrderedDict([(t["name"], t) for t in tasks])


def load_tasks_from_path(path):
    """Load the YAML task file at ``path`` and return its resolved tasks."""
    return load_tasks(load_task_file(path))


def run(venv, **kwargs):
    """Print the resolved tasks from ``tasks/test.yml`` as indented JSON.

    ``venv`` and ``kwargs`` are accepted but not used.
    """
    print(json.dumps(load_tasks_from_path(os.path.join(here, "tasks", "test.yml")), indent=2))