tasks_win.py (7087B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"""Thread-based test runner.

Tests are distributed to a pool of worker threads through queues. Every
worker is paired with a watchdog thread that terminates the worker's child
process when it runs longer than the configured timeout.
"""
import subprocess
import sys
from datetime import datetime, timedelta
from queue import Empty, Queue
from threading import Thread

from .adaptor import xdr_annotate
from .progressbar import ProgressBar
from .results import NullTestOutput, TestOutput, escape_cmdline


class EndMarker:
    """Sentinel (used as the class object itself) meaning "no more items"."""

    pass


class TaskFinishedMarker:
    """Sentinel a worker sends to its watchdog once a child has finished."""

    pass


class MultiQueue:
    """Merge several queues into one blocking ``get()``.

    One daemon thread per source queue forwards items into a shared output
    queue. ``EndMarker`` is forwarded as well, so a consumer receives one
    ``EndMarker`` per source queue.
    """

    def __init__(self, *queues):
        self.queues = queues
        # maxsize=1: a forwarding thread blocks in put() until the consumer
        # has taken the previous item, so at most one item per source queue
        # is pulled ahead of the consumer.
        self.output_queue = Queue(maxsize=1)
        for q in queues:
            thread = Thread(target=self._queue_getter, args=(q,), daemon=True)
            thread.start()

    def _queue_getter(self, q):
        """Forward items from ``q`` until (and including) ``EndMarker``."""
        while True:
            item = q.get()
            self.output_queue.put(item)
            if item is EndMarker:
                return

    def get(self):
        """Block until an item from any of the source queues is available."""
        return self.output_queue.get()


def _do_work(
    workerId,
    qTasks,
    qHeavyTasks,
    qResults,
    qWatch,
    prefix,
    tempdir,
    run_skipped,
    timeout,
    show_cmd,
):
    """Worker loop: run tests from the task queue(s) and post the results.

    Worker 0 additionally consumes ``qHeavyTasks``; every other worker reads
    ``qTasks`` only. The loop ends once an ``EndMarker`` has been received
    from each queue this worker reads, at which point ``EndMarker`` is sent
    to both ``qWatch`` and ``qResults``.

    For every spawned child the process object is put on ``qWatch``, followed
    by ``TaskFinishedMarker`` once its output has been collected.

    NOTE(review): if ``subprocess.Popen`` raises, this function exits without
    putting ``EndMarker`` on ``qResults``, and ``run_all_tests`` keeps waiting
    for it.
    """
    q = qTasks
    required_end_markers = 1
    if workerId == 0:
        # Only one worker handles heavy tests.
        q = MultiQueue(qTasks, qHeavyTasks)
        required_end_markers = 2

    num_end_markers = 0
    while True:
        test = q.get()
        if test is EndMarker:
            num_end_markers += 1
            if num_end_markers == required_end_markers:
                qWatch.put(EndMarker)
                qResults.put(EndMarker)
                return
            continue

        if not test.enable and not run_skipped:
            qResults.put(NullTestOutput(test))
            continue

        # Spawn the test task.
        cmd = test.get_command(prefix, tempdir)
        if show_cmd:
            print(escape_cmdline(cmd))
        tStart = datetime.now()
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # Push the task to the watchdog -- it will kill the task
        # if it goes over the timeout while we keep its stdout
        # buffer clear on the "main" worker thread.
        qWatch.put(proc)
        out, err = proc.communicate()
        # We're not setting universal_newlines=True in subprocess.Popen due to
        # still needing to support Python 3.5, which doesn't have the "encoding"
        # parameter to the Popen constructor, so we have to decode the output
        # here.
        system_encoding = "mbcs" if sys.platform == "win32" else "utf-8"
        out = out.decode(system_encoding, errors="replace")
        err = err.decode(system_encoding, errors="replace")
        qWatch.put(TaskFinishedMarker)

        # Create a result record and forward to result processing.
        dt = datetime.now() - tStart
        result = TestOutput(
            test,
            cmd,
            out,
            err,
            proc.returncode,
            dt.total_seconds(),
            # True when the whole run took longer than the timeout.
            dt > timedelta(seconds=timeout),
        )
        qResults.put(result)


def _do_watch(qWatch, timeout):
    """Watchdog loop: terminate children that outlive ``timeout`` seconds.

    Reads pairs of messages from ``qWatch``: a process object followed by
    ``TaskFinishedMarker``. When the marker does not arrive within
    ``timeout`` seconds the process is terminated, after which the marker is
    awaited without a time limit. The loop ends when ``EndMarker`` is
    received in place of a process.
    """
    while True:
        proc = qWatch.get(True)
        if proc is EndMarker:
            return
        try:
            fin = qWatch.get(block=True, timeout=timeout)
        except Empty:
            # Timed out, force-kill the test.
            try:
                proc.terminate()
            except OSError as ex:
                # If the process finishes after we time out but before we
                # terminate, the terminate call will fail. We can safely
                # ignore this (Windows error 5, ERROR_ACCESS_DENIED).
                # ``winerror`` only exists on Windows; use getattr so that
                # any other OSError is re-raised instead of being masked by
                # an AttributeError.
                if getattr(ex, "winerror", None) != 5:
                    raise
            fin = qWatch.get()
        assert fin is TaskFinishedMarker, "invalid finish marker"


def run_all_tests(tests, prefix, tempdir, pb, options):
    """
    Uses scatter-gather to a thread-pool to manage children.

    This is a generator: it yields one result object per test as results
    become available.

    tests:   iterable of test objects to run.
    prefix:  passed through to ``test.get_command``.
    tempdir: passed through to ``test.get_command``.
    pb:      object whose ``poke()`` is called whenever no result arrived
             within one progress-bar update interval.
    options: object providing ``worker_count``, ``timeout``, ``run_skipped``,
             ``show_cmd`` and ``use_xdr``.
    """
    qTasks, qHeavyTasks, qResults = Queue(), Queue(), Queue()

    workers = []
    watchdogs = []
    for i in range(options.worker_count):
        # Each worker gets a private channel to its own watchdog.
        qWatch = Queue()
        watcher = Thread(
            target=_do_watch, args=(qWatch, options.timeout), daemon=True
        )
        watcher.start()
        watchdogs.append(watcher)
        worker = Thread(
            target=_do_work,
            args=(
                i,
                qTasks,
                qHeavyTasks,
                qResults,
                qWatch,
                prefix,
                tempdir,
                options.run_skipped,
                options.timeout,
                options.show_cmd,
            ),
            daemon=True,
        )
        worker.start()
        workers.append(worker)

    delay = ProgressBar.update_granularity().total_seconds()

    # Before inserting all the tests cases, to be checked in parallel, we are
    # only queueing the XDR encoding test case which would be responsible for
    # recording the self-hosted code. Once completed, we will proceed by
    # queueing the rest of the test cases.
    if options.use_xdr:
        tests = xdr_annotate(tests, options)
        # This loop consumes the first elements of the `tests` iterator, until
        # it reaches the self-hosted encoding test case, and leave the
        # remaining tests in the iterator to be scheduled on multiple threads.
        for test in tests:
            if test.selfhosted_xdr_mode == "encode":
                qTasks.put(test)
                yield qResults.get(block=True)
                break
            assert not test.enable and not options.run_skipped
            yield NullTestOutput(test)

    # Insert all jobs into the queue, followed by the queue-end
    # marker, one per worker. This will not block on growing the
    # queue, only on waiting for more items in the generator. The
    # workers are already started, however, so this will process as
    # fast as we can produce tests from the filesystem.
    def _do_push(num_workers, qTasks):
        for test in tests:
            if test.heavy:
                qHeavyTasks.put(test)
            else:
                qTasks.put(test)
        for _ in range(num_workers):
            qTasks.put(EndMarker)
        # Only worker 0 reads the heavy queue, so it needs a single marker.
        qHeavyTasks.put(EndMarker)

    pusher = Thread(target=_do_push, args=(len(workers), qTasks), daemon=True)
    pusher.start()

    # Read from the results.
    ended = 0
    while ended < len(workers):
        try:
            result = qResults.get(block=True, timeout=delay)
            if result is EndMarker:
                ended += 1
            else:
                yield result
        except Empty:
            pb.poke()

    # Cleanup and exit.
    pusher.join()
    for worker in workers:
        worker.join()
    for watcher in watchdogs:
        watcher.join()
    assert qTasks.empty(), "Send queue not drained"
    assert qHeavyTasks.empty(), "Send queue (heavy tasks) not drained"
    assert qResults.empty(), "Result queue not drained"