tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

tasks_unix.py (8920B)


      1 # A unix-oriented process dispatcher.  Uses a single thread with select and
      2 # waitpid to dispatch tasks.  This avoids several deadlocks that are possible
      3 # with fork/exec + threads + Python.
      4 
      5 import errno
      6 import os
      7 import select
      8 import signal
      9 import sys
     10 from datetime import datetime, timedelta
     11 
     12 from .adaptor import xdr_annotate
     13 from .progressbar import ProgressBar
     14 from .results import NullTestOutput, TestOutput, escape_cmdline
     15 
     16 
     17 class Task:
     18    def __init__(self, test, prefix, tempdir, pid, stdout, stderr):
     19        self.test = test
     20        self.cmd = test.get_command(prefix, tempdir)
     21        self.pid = pid
     22        self.stdout = stdout
     23        self.stderr = stderr
     24        self.start = datetime.now()
     25        self.out = []
     26        self.err = []
     27 
     28 
     29 def spawn_test(test, prefix, tempdir, passthrough, run_skipped, show_cmd):
     30    """Spawn one child, return a task struct."""
     31    if not test.enable and not run_skipped:
     32        return None
     33 
     34    cmd = test.get_command(prefix, tempdir)
     35    if show_cmd:
     36        print(escape_cmdline(cmd))
     37 
     38    if passthrough:
     39        os.execvp(cmd[0], cmd)
     40        return
     41 
     42    (rout, wout) = os.pipe()
     43    (rerr, werr) = os.pipe()
     44 
     45    file_actions = [
     46        (os.POSIX_SPAWN_CLOSE, rout),
     47        (os.POSIX_SPAWN_CLOSE, rerr),
     48        (os.POSIX_SPAWN_DUP2, wout, 1),
     49        (os.POSIX_SPAWN_DUP2, werr, 2),
     50    ]
     51    pid = os.posix_spawnp(cmd[0], cmd, os.environ, file_actions=file_actions)
     52 
     53    os.close(wout)
     54    os.close(werr)
     55    return Task(test, prefix, tempdir, pid, rout, rerr)
     56 
     57 
     58 def get_max_wait(tasks, timeout):
     59    """
     60    Return the maximum time we can wait before any task should time out.
     61    """
     62 
     63    # If we have a progress-meter, we need to wake up to update it frequently.
     64    wait = ProgressBar.update_granularity()
     65 
     66    # If a timeout is supplied, we need to wake up for the first task to
     67    # timeout if that is sooner.
     68    if timeout:
     69        now = datetime.now()
     70        timeout_delta = timedelta(seconds=timeout)
     71        for task in tasks:
     72            remaining = task.start + timeout_delta - now
     73            wait = min(wait, remaining)
     74 
     75    # Return the wait time in seconds, clamped between zero and max_wait.
     76    return max(wait.total_seconds(), 0)
     77 
     78 
     79 def flush_input(fd, frags):
     80    """
     81    Read any pages sitting in the file descriptor 'fd' into the list 'frags'.
     82    """
     83    rv = os.read(fd, 4096)
     84    frags.append(rv)
     85    while len(rv) == 4096:
     86        # If read() returns a full buffer, it may indicate there was 1 buffer
     87        # worth of data, or that there is more data to read.  Poll the socket
     88        # before we read again to ensure that we will not block indefinitly.
     89        readable, _, _ = select.select([fd], [], [], 0)
     90        if not readable:
     91            return
     92 
     93        rv = os.read(fd, 4096)
     94        frags.append(rv)
     95 
     96 
     97 def read_input(tasks, timeout):
     98    """
     99    Select on input or errors from the given task list for a max of timeout
    100    seconds.
    101    """
    102    rlist = []
    103    exlist = []
    104    outmap = {}  # Fast access to fragment list given fd.
    105    for t in tasks:
    106        rlist.append(t.stdout)
    107        rlist.append(t.stderr)
    108        outmap[t.stdout] = t.out
    109        outmap[t.stderr] = t.err
    110        # This will trigger with a close event when the child dies, allowing
    111        # us to respond immediately and not leave cores idle.
    112        exlist.append(t.stdout)
    113 
    114    readable = []
    115    try:
    116        readable, _, _ = select.select(rlist, [], exlist, timeout)
    117    except OverflowError:
    118        print >> sys.stderr, "timeout value", timeout
    119        raise
    120 
    121    for fd in readable:
    122        flush_input(fd, outmap[fd])
    123 
    124 
    125 def remove_task(tasks, pid):
    126    """
    127    Remove a task from the tasks list and return it.
    128    """
    129    index = None
    130    for i, t in enumerate(tasks):
    131        if t.pid == pid:
    132            index = i
    133            break
    134    else:
    135        raise KeyError(f"No such pid: {pid}")
    136 
    137    out = tasks[index]
    138    tasks.pop(index)
    139    return out
    140 
    141 
    142 def timed_out(task, timeout):
    143    """
    144    Return a timedelta with the amount we are overdue, or False if the timeout
    145    has not yet been reached (or timeout is falsy, indicating there is no
    146    timeout.)
    147    """
    148    if not timeout:
    149        return False
    150 
    151    elapsed = datetime.now() - task.start
    152    over = elapsed - timedelta(seconds=timeout)
    153    return over if over.total_seconds() > 0 else False
    154 
    155 
    156 def reap_zombies(tasks, timeout):
    157    """
    158    Search for children of this process that have finished. If they are tasks,
    159    then this routine will clean up the child. This method returns a new task
    160    list that has had the ended tasks removed, followed by the list of finished
    161    tasks.
    162    """
    163    finished = []
    164    while True:
    165        try:
    166            pid, status = os.waitpid(0, os.WNOHANG)
    167            if pid == 0:
    168                break
    169        except OSError as e:
    170            if e.errno == errno.ECHILD:
    171                break
    172            raise e
    173 
    174        ended = remove_task(tasks, pid)
    175        flush_input(ended.stdout, ended.out)
    176        flush_input(ended.stderr, ended.err)
    177        os.close(ended.stdout)
    178        os.close(ended.stderr)
    179 
    180        returncode = os.WEXITSTATUS(status)
    181        if os.WIFSIGNALED(status):
    182            returncode = -os.WTERMSIG(status)
    183 
    184        finished.append(
    185            TestOutput(
    186                ended.test,
    187                ended.cmd,
    188                b"".join(ended.out).decode("utf-8", "replace"),
    189                b"".join(ended.err).decode("utf-8", "replace"),
    190                returncode,
    191                (datetime.now() - ended.start).total_seconds(),
    192                timed_out(ended, timeout),
    193                {"pid": ended.pid},
    194            )
    195        )
    196    return tasks, finished
    197 
    198 
    199 def kill_undead(tasks, timeout):
    200    """
    201    Signal all children that are over the given timeout. Use SIGABRT first to
    202    generate a stack dump. If it still doesn't die for another 30 seconds, kill
    203    with SIGKILL.
    204    """
    205    for task in tasks:
    206        over = timed_out(task, timeout)
    207        if over:
    208            if over.total_seconds() < 30:
    209                os.kill(task.pid, signal.SIGABRT)
    210            else:
    211                os.kill(task.pid, signal.SIGKILL)
    212 
    213 
    214 def run_all_tests(tests, prefix, tempdir, pb, options):
    215    # Copy and reverse for fast pop off end.
    216    tests = list(tests)
    217    tests = tests[:]
    218    tests.reverse()
    219 
    220    # The set of currently running tests.
    221    tasks = []
    222 
    223    # Piggy back on the first test to generate the XDR content needed for all
    224    # other tests to run. To avoid read/write races, we temporarily limit the
    225    # number of workers.
    226    wait_for_encoding = False
    227    worker_count = options.worker_count
    228    if options.use_xdr and len(tests) > 1:
    229        # The next loop pops tests, thus we iterate over the tests in reversed
    230        # order.
    231        tests = list(xdr_annotate(reversed(tests), options))
    232        tests = tests[:]
    233        tests.reverse()
    234        wait_for_encoding = True
    235        worker_count = 1
    236 
    237    def running_heavy_test():
    238        return any(task.test.heavy for task in tasks)
    239 
    240    heavy_tests = [t for t in tests if t.heavy]
    241    light_tests = [t for t in tests if not t.heavy]
    242 
    243    encoding_test = None
    244    while light_tests or heavy_tests or tasks:
    245        new_tests = []
    246        max_new_tests = worker_count - len(tasks)
    247        if (
    248            heavy_tests
    249            and not running_heavy_test()
    250            and len(new_tests) < max_new_tests
    251            and not wait_for_encoding
    252        ):
    253            # Schedule a heavy test if available.
    254            new_tests.append(heavy_tests.pop())
    255        while light_tests and len(new_tests) < max_new_tests:
    256            # Schedule as many more light tests as we can.
    257            new_tests.append(light_tests.pop())
    258 
    259        assert len(tasks) + len(new_tests) <= worker_count
    260        assert len([x for x in new_tests if x.heavy]) <= 1
    261 
    262        for test in new_tests:
    263            task = spawn_test(
    264                test,
    265                prefix,
    266                tempdir,
    267                options.passthrough,
    268                options.run_skipped,
    269                options.show_cmd,
    270            )
    271            if task:
    272                tasks.append(task)
    273                if not encoding_test:
    274                    encoding_test = test
    275            else:
    276                yield NullTestOutput(test)
    277 
    278        timeout = get_max_wait(tasks, options.timeout)
    279        read_input(tasks, timeout)
    280 
    281        kill_undead(tasks, options.timeout)
    282        tasks, finished = reap_zombies(tasks, options.timeout)
    283 
    284        for out in finished:
    285            yield out
    286            if wait_for_encoding and out.test == encoding_test:
    287                assert encoding_test.selfhosted_xdr_mode == "encode"
    288                wait_for_encoding = False
    289                worker_count = options.worker_count
    290 
    291        # If we did not finish any tasks, poke the progress bar to show that
    292        # the test harness is at least not frozen.
    293        if len(finished) == 0:
    294            pb.poke()