tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

tasks_win.py (7087B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this
      3 # file, You can obtain one at http://mozilla.org/MPL/2.0/. */
      4 import subprocess
      5 import sys
      6 from datetime import datetime, timedelta
      7 from queue import Empty, Queue
      8 from threading import Thread
      9 
     10 from .adaptor import xdr_annotate
     11 from .progressbar import ProgressBar
     12 from .results import NullTestOutput, TestOutput, escape_cmdline
     13 
     14 
     15 class EndMarker:
     16    pass
     17 
     18 
     19 class TaskFinishedMarker:
     20    pass
     21 
     22 
     23 class MultiQueue:
     24    def __init__(self, *queues):
     25        self.queues = queues
     26        self.output_queue = Queue(maxsize=1)
     27        for q in queues:
     28            thread = Thread(target=self._queue_getter, args=(q,), daemon=True)
     29            thread.start()
     30 
     31    def _queue_getter(self, q):
     32        while True:
     33            item = q.get()
     34            self.output_queue.put(item)
     35            if item is EndMarker:
     36                return
     37 
     38    def get(self):
     39        return self.output_queue.get()
     40 
     41 
     42 def _do_work(
     43    workerId,
     44    qTasks,
     45    qHeavyTasks,
     46    qResults,
     47    qWatch,
     48    prefix,
     49    tempdir,
     50    run_skipped,
     51    timeout,
     52    show_cmd,
     53 ):
     54    q = qTasks
     55    required_end_markers = 1
     56    if workerId == 0:
     57        # Only one worker handles heavy tests.
     58        q = MultiQueue(qTasks, qHeavyTasks)
     59        required_end_markers = 2
     60 
     61    num_end_markers = 0
     62    while True:
     63        test = q.get()
     64        if test is EndMarker:
     65            num_end_markers += 1
     66            if num_end_markers == required_end_markers:
     67                qWatch.put(EndMarker)
     68                qResults.put(EndMarker)
     69                return
     70            continue
     71 
     72        if not test.enable and not run_skipped:
     73            qResults.put(NullTestOutput(test))
     74            continue
     75 
     76        # Spawn the test task.
     77        cmd = test.get_command(prefix, tempdir)
     78        if show_cmd:
     79            print(escape_cmdline(cmd))
     80        tStart = datetime.now()
     81        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
     82 
     83        # Push the task to the watchdog -- it will kill the task
     84        # if it goes over the timeout while we keep its stdout
     85        # buffer clear on the "main" worker thread.
     86        qWatch.put(proc)
     87        out, err = proc.communicate()
     88        # We're not setting universal_newlines=True in subprocess.Popen due to
     89        # still needing to support Python 3.5, which doesn't have the "encoding"
     90        # parameter to the Popen constructor, so we have to decode the output
     91        # here.
     92        system_encoding = "mbcs" if sys.platform == "win32" else "utf-8"
     93        out = out.decode(system_encoding, errors="replace")
     94        err = err.decode(system_encoding, errors="replace")
     95        qWatch.put(TaskFinishedMarker)
     96 
     97        # Create a result record and forward to result processing.
     98        dt = datetime.now() - tStart
     99        result = TestOutput(
    100            test,
    101            cmd,
    102            out,
    103            err,
    104            proc.returncode,
    105            dt.total_seconds(),
    106            dt > timedelta(seconds=timeout),
    107        )
    108        qResults.put(result)
    109 
    110 
    111 def _do_watch(qWatch, timeout):
    112    while True:
    113        proc = qWatch.get(True)
    114        if proc == EndMarker:
    115            return
    116        try:
    117            fin = qWatch.get(block=True, timeout=timeout)
    118            assert fin is TaskFinishedMarker, "invalid finish marker"
    119        except Empty:
    120            # Timed out, force-kill the test.
    121            try:
    122                proc.terminate()
    123            except OSError as ex:
    124                # If the process finishes after we time out but before we
    125                # terminate, the terminate call will fail. We can safely
    126                # ignore this.
    127                if ex.winerror != 5:
    128                    raise
    129            fin = qWatch.get()
    130            assert fin is TaskFinishedMarker, "invalid finish marker"
    131 
    132 
    133 def run_all_tests(tests, prefix, tempdir, pb, options):
    134    """
    135    Uses scatter-gather to a thread-pool to manage children.
    136    """
    137    qTasks, qHeavyTasks, qResults = Queue(), Queue(), Queue()
    138 
    139    workers = []
    140    watchdogs = []
    141    for i in range(options.worker_count):
    142        qWatch = Queue()
    143        watcher = Thread(target=_do_watch, args=(qWatch, options.timeout))
    144        watcher.setDaemon(True)
    145        watcher.start()
    146        watchdogs.append(watcher)
    147        worker = Thread(
    148            target=_do_work,
    149            args=(
    150                i,
    151                qTasks,
    152                qHeavyTasks,
    153                qResults,
    154                qWatch,
    155                prefix,
    156                tempdir,
    157                options.run_skipped,
    158                options.timeout,
    159                options.show_cmd,
    160            ),
    161        )
    162        worker.setDaemon(True)
    163        worker.start()
    164        workers.append(worker)
    165 
    166    delay = ProgressBar.update_granularity().total_seconds()
    167 
    168    # Before inserting all the tests cases, to be checked in parallel, we are
    169    # only queueing the XDR encoding test case which would be responsible for
    170    # recording the self-hosted code. Once completed, we will proceed by
    171    # queueing the rest of the test cases.
    172    if options.use_xdr:
    173        tests = xdr_annotate(tests, options)
    174        # This loop consumes the first elements of the `tests` iterator, until
    175        # it reaches the self-hosted encoding test case, and leave the
    176        # remaining tests in the iterator to be scheduled on multiple threads.
    177        for test in tests:
    178            if test.selfhosted_xdr_mode == "encode":
    179                qTasks.put(test)
    180                yield qResults.get(block=True)
    181                break
    182            assert not test.enable and not options.run_skipped
    183            yield NullTestOutput(test)
    184 
    185    # Insert all jobs into the queue, followed by the queue-end
    186    # marker, one per worker. This will not block on growing the
    187    # queue, only on waiting for more items in the generator. The
    188    # workers are already started, however, so this will process as
    189    # fast as we can produce tests from the filesystem.
    190    def _do_push(num_workers, qTasks):
    191        for test in tests:
    192            if test.heavy:
    193                qHeavyTasks.put(test)
    194            else:
    195                qTasks.put(test)
    196        for _ in range(num_workers):
    197            qTasks.put(EndMarker)
    198        qHeavyTasks.put(EndMarker)
    199 
    200    pusher = Thread(target=_do_push, args=(len(workers), qTasks))
    201    pusher.setDaemon(True)
    202    pusher.start()
    203 
    204    # Read from the results.
    205    ended = 0
    206    while ended < len(workers):
    207        try:
    208            result = qResults.get(block=True, timeout=delay)
    209            if result is EndMarker:
    210                ended += 1
    211            else:
    212                yield result
    213        except Empty:
    214            pb.poke()
    215 
    216    # Cleanup and exit.
    217    pusher.join()
    218    for worker in workers:
    219        worker.join()
    220    for watcher in watchdogs:
    221        watcher.join()
    222    assert qTasks.empty(), "Send queue not drained"
    223    assert qHeavyTasks.empty(), "Send queue (heavy tasks) not drained"
    224    assert qResults.empty(), "Result queue not drained"