tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

util.py (16658B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 
      6 import copy
      7 import logging
      8 import os
      9 import re
     10 from concurrent import futures
     11 from functools import reduce
     12 
     13 import jsone
     14 import requests
     15 from slugid import nice as slugid
     16 from taskcluster.exceptions import TaskclusterRestFailure
     17 from taskgraph import create
     18 from taskgraph.optimize.base import optimize_task_graph
     19 from taskgraph.taskgraph import TaskGraph
     20 from taskgraph.util.taskcluster import (
     21    CONCURRENCY,
     22    find_task_id,
     23    get_artifact,
     24    get_session,
     25    get_task_definition,
     26    list_tasks,
     27    parse_time,
     28 )
     29 
     30 from gecko_taskgraph.decision import read_artifact, rename_artifact, write_artifact
     31 from gecko_taskgraph.util.taskcluster import trigger_hook
     32 from gecko_taskgraph.util.taskgraph import find_decision_task
     33 
     34 logger = logging.getLogger(__name__)
     35 
     36 INDEX_TMPL = "gecko.v2.{}.pushlog-id.{}.decision"
     37 PUSHLOG_TMPL = "{}/json-pushes?version=2&startID={}&endID={}"
     38 
     39 
     40 def _tags_within_context(tags, context=[]):
     41    """A context of [] means that it *only* applies to a task group"""
     42    return any(
     43        all(tag in tags and tags[tag] == tag_set[tag] for tag in tag_set.keys())
     44        for tag_set in context
     45    )
     46 
     47 
     48 def _extract_applicable_action(actions_json, action_name, task_group_id, task_id):
     49    """Extract action that applies to the given task or task group.
     50 
     51    A task (as defined by its tags) is said to match a tag-set if its
     52    tags are a super-set of the tag-set. A tag-set is a set of key-value pairs.
     53 
     54    An action (as defined by its context) is said to be relevant for
     55    a given task, if the task's tags match one of the tag-sets given
     56    in the context property of the action.
     57 
     58    The order of the actions is significant. When multiple actions apply to a
     59    task the first one takes precedence.
     60 
     61    For more details visit:
     62    https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec
     63    """
     64    if task_id:
     65        tags = get_task_definition(task_id).get("tags")
     66 
     67    for _action in actions_json["actions"]:
     68        if action_name != _action["name"]:
     69            continue
     70 
     71        context = _action.get("context", [])
     72        # Ensure the task is within the context of the action
     73        if task_id and tags and _tags_within_context(tags, context):
     74            return _action
     75        if context == []:
     76            return _action
     77 
     78    available_actions = ", ".join(sorted({a["name"] for a in actions_json["actions"]}))
     79    raise LookupError(
     80        f"{action_name} action is not available for this task. Available: {available_actions}"
     81    )
     82 
     83 
     84 def trigger_action(action_name, decision_task_id, task_id=None, input={}):
     85    if not decision_task_id:
     86        raise ValueError("No decision task. We can't find the actions artifact.")
     87    actions_json = get_artifact(decision_task_id, "public/actions.json")
     88    if actions_json["version"] != 1:
     89        raise RuntimeError("Wrong version of actions.json, unable to continue")
     90 
     91    # These values substitute $eval in the template
     92    context = {
     93        "input": input,
     94        "taskId": task_id,
     95        "taskGroupId": decision_task_id,
     96    }
     97    # https://docs.taskcluster.net/docs/manual/design/conventions/actions/spec#variables
     98    context.update(actions_json["variables"])
     99    action = _extract_applicable_action(
    100        actions_json, action_name, decision_task_id, task_id
    101    )
    102    kind = action["kind"]
    103    if create.testing:
    104        logger.info(f"Skipped triggering action for {kind} as testing is enabled")
    105    elif kind == "hook":
    106        hook_payload = jsone.render(action["hookPayload"], context)
    107        trigger_hook(action["hookGroupId"], action["hookId"], hook_payload)
    108    else:
    109        raise NotImplementedError(f"Unable to submit actions with {kind} kind.")
    110 
    111 
    112 def get_pushes_from_params_input(parameters, input):
    113    inclusive_tweak = 1 if input.get("inclusive") else 0
    114    return get_pushes(
    115        project=parameters["head_repository"],
    116        end_id=int(parameters["pushlog_id"]) - (1 - inclusive_tweak),
    117        depth=input.get("depth", 9) + inclusive_tweak,
    118    )
    119 
    120 
    121 def get_pushes(project, end_id, depth, full_response=False):
    122    pushes = []
    123    while True:
    124        start_id = max(end_id - depth, 0)
    125        pushlog_url = PUSHLOG_TMPL.format(project, start_id, end_id)
    126        logger.debug(pushlog_url)
    127        r = requests.get(pushlog_url)
    128        r.raise_for_status()
    129        pushes = pushes + list(r.json()["pushes"].keys())
    130        if len(pushes) >= depth:
    131            break
    132 
    133        end_id = start_id - 1
    134        start_id -= depth
    135        if start_id < 0:
    136            break
    137 
    138    pushes = sorted(pushes)[-depth:]
    139    push_dict = {push: r.json()["pushes"][push] for push in pushes}
    140    return push_dict if full_response else pushes
    141 
    142 
    143 def get_decision_task_id(project, push_id):
    144    return find_task_id(INDEX_TMPL.format(project, push_id))
    145 
    146 
    147 def get_parameters(decision_task_id):
    148    return get_artifact(decision_task_id, "public/parameters.yml")
    149 
    150 
    151 def get_tasks_with_downstream(labels, full_task_graph, label_to_taskid):
    152    # Used to gather tasks when downstream tasks need to run as well
    153    return full_task_graph.graph.transitive_closure(
    154        set(labels), reverse=True
    155    ).nodes & set(label_to_taskid.keys())
    156 
    157 
    158 def fetch_graph_and_labels(parameters, graph_config):
    159    decision_task_id = find_decision_task(parameters, graph_config)
    160 
    161    # First grab the graph and labels generated during the initial decision task
    162    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
    163    logger.info("Load taskgraph from JSON.")
    164    _, full_task_graph = TaskGraph.from_json(full_task_graph)
    165    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
    166    label_to_taskids = {label: [task_id] for label, task_id in label_to_taskid.items()}
    167 
    168    logger.info("Fetching additional tasks from action and cron tasks.")
    169    # fetch everything in parallel; this avoids serializing any delay in downloading
    170    # each artifact (such as waiting for the artifact to be mirrored locally)
    171    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
    172        fetches = []
    173 
    174        # fetch any modifications made by action tasks and add the new tasks
    175        def fetch_action(task_id):
    176            logger.info(f"fetching label-to-taskid.json for action task {task_id}")
    177            try:
    178                run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
    179                label_to_taskid.update(run_label_to_id)
    180                for label, existing_task_id in run_label_to_id.items():
    181                    label_to_taskids.setdefault(label, []).append(existing_task_id)
    182            except TaskclusterRestFailure as e:
    183                if e.status_code != 404:
    184                    raise
    185                logger.debug(f"No label-to-taskid.json found for {task_id}: {e}")
    186 
    187        head_rev_param = "{}head_rev".format(graph_config["project-repo-param-prefix"])
    188 
    189        namespace = "{}.v2.{}.revision.{}.taskgraph.actions".format(
    190            graph_config["trust-domain"],
    191            parameters["project"],
    192            parameters[head_rev_param],
    193        )
    194        for task_id in list_tasks(namespace):
    195            fetches.append(e.submit(fetch_action, task_id))
    196 
    197        # Similarly for cron tasks..
    198        def fetch_cron(task_id):
    199            logger.info(f"fetching label-to-taskid.json for cron task {task_id}")
    200            try:
    201                run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
    202                label_to_taskid.update(run_label_to_id)
    203                for label, existing_task_id in run_label_to_id.items():
    204                    label_to_taskids.setdefault(label, []).append(existing_task_id)
    205            except TaskclusterRestFailure as e:
    206                if e.status_code != 404:
    207                    raise
    208                logger.debug(f"No label-to-taskid.json found for {task_id}: {e}")
    209 
    210        namespace = "{}.v2.{}.revision.{}.cron".format(
    211            graph_config["trust-domain"],
    212            parameters["project"],
    213            parameters[head_rev_param],
    214        )
    215        for task_id in list_tasks(namespace):
    216            fetches.append(e.submit(fetch_cron, task_id))
    217 
    218        # now wait for each fetch to complete, raising an exception if there
    219        # were any issues
    220        for f in futures.as_completed(fetches):
    221            f.result()
    222 
    223    return (decision_task_id, full_task_graph, label_to_taskid, label_to_taskids)
    224 
    225 
    226 def create_task_from_def(task_def, level, action_tag=None):
    227    """Create a new task from a definition rather than from a label
    228    that is already in the full-task-graph. The task definition will
    229    have {relative-datestamp': '..'} rendered just like in a decision task.
    230    Use this for entirely new tasks or ones that change internals of the task.
    231    It is useful if you want to "edit" the full_task_graph and then hand
    232    it to this function. No dependencies will be scheduled. You must handle
    233    this yourself. Seeing how create_tasks handles it might prove helpful."""
    234    task_def["schedulerId"] = f"gecko-level-{level}"
    235    label = task_def["metadata"]["name"]
    236    task_id = slugid()
    237    session = get_session()
    238    if action_tag:
    239        task_def.setdefault("tags", {}).setdefault("action", action_tag)
    240    create.create_task(session, task_id, label, task_def)
    241 
    242 
    243 def update_parent(task, graph):
    244    task.task.setdefault("extra", {})["parent"] = os.environ.get("TASK_ID", "")
    245    return task
    246 
    247 
    248 def update_action_tag(task, graph, action_tag):
    249    task.task.setdefault("tags", {}).setdefault("action", action_tag)
    250    return task
    251 
    252 
    253 def update_dependencies(task, graph):
    254    if os.environ.get("TASK_ID"):
    255        task.task.setdefault("dependencies", []).append(os.environ["TASK_ID"])
    256    return task
    257 
    258 
    259 def create_tasks(
    260    graph_config,
    261    to_run,
    262    full_task_graph,
    263    label_to_taskid,
    264    params,
    265    decision_task_id,
    266    suffix="",
    267    modifier=lambda t: t,
    268    action_tag=None,
    269 ):
    270    """Create new tasks.  The task definition will have {relative-datestamp':
    271    '..'} rendered just like in a decision task.  Action callbacks should use
    272    this function to create new tasks,
    273    allowing easy debugging with `mach taskgraph action-callback --test`.
    274    This builds up all required tasks to run in order to run the tasks requested.
    275 
    276    Optionally this function takes a `modifier` function that is passed in each
    277    task before it is put into a new graph. It should return a valid task. Note
    278    that this is passed _all_ tasks in the graph, not just the set in to_run. You
    279    may want to skip modifying tasks not in your to_run list.
    280 
    281    If `suffix` is given, then it is used to give unique names to the resulting
    282    artifacts.  If you call this function multiple times in the same action,
    283    pass a different suffix each time to avoid overwriting artifacts.
    284 
    285    If you wish to create the tasks in a new group, leave out decision_task_id.
    286 
    287    Returns an updated label_to_taskid containing the new tasks"""
    288    # triggers registration of strategies
    289    import gecko_taskgraph.optimize  # noqa
    290 
    291    if suffix != "":
    292        suffix = f"-{suffix}"
    293    to_run = set(to_run)
    294 
    295    #  Copy to avoid side-effects later
    296    full_task_graph = copy.deepcopy(full_task_graph)
    297    label_to_taskid = label_to_taskid.copy()
    298 
    299    target_graph = full_task_graph.graph.transitive_closure(to_run)
    300    target_task_graph = TaskGraph(
    301        {l: modifier(full_task_graph[l]) for l in target_graph.nodes}, target_graph
    302    )
    303    target_task_graph.for_each_task(update_parent)
    304    if action_tag:
    305        target_task_graph.for_each_task(update_action_tag, action_tag)
    306    if decision_task_id and decision_task_id != os.environ.get("TASK_ID"):
    307        target_task_graph.for_each_task(update_dependencies)
    308    optimized_task_graph, label_to_taskid = optimize_task_graph(
    309        target_task_graph,
    310        to_run,
    311        params,
    312        to_run,
    313        decision_task_id,
    314        existing_tasks=label_to_taskid,
    315    )
    316    write_artifact(f"task-graph{suffix}.json", optimized_task_graph.to_json())
    317    write_artifact(f"label-to-taskid{suffix}.json", label_to_taskid)
    318    write_artifact(f"to-run{suffix}.json", list(to_run))
    319    create.create_tasks(
    320        graph_config,
    321        optimized_task_graph,
    322        label_to_taskid,
    323        params,
    324        decision_task_id,
    325    )
    326    return label_to_taskid
    327 
    328 
    329 def _update_reducer(accumulator, new_value):
    330    "similar to set or dict `update` method, but returning the modified object"
    331    accumulator.update(new_value)
    332    return accumulator
    333 
    334 
    335 def combine_task_graph_files(suffixes):
    336    """Combine task-graph-{suffix}.json files into a single task-graph.json file.
    337 
    338    Since Chain of Trust verification requires a task-graph.json file that
    339    contains all children tasks, we can combine the various task-graph-0.json
    340    type files into a master task-graph.json file at the end.
    341 
    342    Actions also look for various artifacts, so we combine those in a similar
    343    fashion.
    344 
    345    In the case where there is only one suffix, we simply rename it to avoid the
    346    additional cost of uploading two copies of the same data.
    347    """
    348 
    349    if len(suffixes) == 1:
    350        for filename in ["task-graph", "label-to-taskid", "to-run"]:
    351            rename_artifact(f"{filename}-{suffixes[0]}.json", f"{filename}.json")
    352        return
    353 
    354    def combine(file_contents, base):
    355        return reduce(_update_reducer, file_contents, base)
    356 
    357    files = [read_artifact(f"task-graph-{suffix}.json") for suffix in suffixes]
    358    write_artifact("task-graph.json", combine(files, dict()))
    359 
    360    files = [read_artifact(f"label-to-taskid-{suffix}.json") for suffix in suffixes]
    361    write_artifact("label-to-taskid.json", combine(files, dict()))
    362 
    363    files = [read_artifact(f"to-run-{suffix}.json") for suffix in suffixes]
    364    write_artifact("to-run.json", list(combine(files, set())))
    365 
    366 
    367 def relativize_datestamps(task_def):
    368    """
    369    Given a task definition as received from the queue, convert all datestamps
    370    to {relative_datestamp: ..} format, with the task creation time as "now".
    371    The result is useful for handing to ``create_task``.
    372    """
    373    base = parse_time(task_def["created"])
    374    # borrowed from https://github.com/epoberezkin/ajv/blob/master/lib/compile/formats.js
    375    ts_pattern = re.compile(
    376        r"^\d\d\d\d-[0-1]\d-[0-3]\d[t\s]"
    377        r"(?:[0-2]\d:[0-5]\d:[0-5]\d|23:59:60)(?:\.\d+)?"
    378        r"(?:z|[+-]\d\d:\d\d)$",
    379        re.I,
    380    )
    381 
    382    def recurse(value):
    383        if isinstance(value, str):
    384            if ts_pattern.match(value):
    385                value = parse_time(value)
    386                diff = value - base
    387                return {"relative-datestamp": f"{int(diff.total_seconds())} seconds"}
    388        if isinstance(value, list):
    389            return [recurse(e) for e in value]
    390        if isinstance(value, dict):
    391            return {k: recurse(v) for k, v in value.items()}
    392        return value
    393 
    394    return recurse(task_def)
    395 
    396 
    397 def add_args_to_command(cmd_parts, extra_args=[]):
    398    """
    399    Add custom command line args to a given command.
    400    args:
    401      cmd_parts: the raw command as seen by taskcluster
    402      extra_args: array of args we want to add
    403    """
    404    # Prevent modification of the caller's copy of cmd_parts
    405    cmd_parts = copy.deepcopy(cmd_parts)
    406    cmd_type = "default"
    407    if len(cmd_parts) == 1 and isinstance(cmd_parts[0], dict):
    408        # windows has single cmd part as dict: 'task-reference', with long string
    409        cmd_parts = cmd_parts[0]["task-reference"].split(" ")
    410        cmd_type = "dict"
    411    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], str):
    412        # windows has single cmd part as a long string
    413        cmd_parts = cmd_parts[0].split(" ")
    414        cmd_type = "unicode"
    415    elif len(cmd_parts) == 1 and isinstance(cmd_parts[0], list):
    416        # osx has an single value array with an array inside
    417        cmd_parts = cmd_parts[0]
    418        cmd_type = "subarray"
    419    elif len(cmd_parts) == 2 and isinstance(cmd_parts[1], list):
    420        # osx has an double value array with an array inside each element.
    421        # The first element is a pre-requisite command while the second
    422        # is the actual test command.
    423        cmd_type = "subarray2"
    424 
    425    if cmd_type == "subarray2":
    426        cmd_parts[1].extend(extra_args)
    427    else:
    428        cmd_parts.extend(extra_args)
    429 
    430    if cmd_type == "dict":
    431        cmd_parts = [{"task-reference": " ".join(cmd_parts)}]
    432    elif cmd_type == "unicode":
    433        cmd_parts = [" ".join(cmd_parts)]
    434    elif cmd_type == "subarray":
    435        cmd_parts = [cmd_parts]
    436    return cmd_parts