tasks_unix.py (8920B)
# A unix-oriented process dispatcher. Uses a single thread with select and
# waitpid to dispatch tasks. This avoids several deadlocks that are possible
# with fork/exec + threads + Python.

import errno
import os
import select
import signal
import sys
from datetime import datetime, timedelta

from .adaptor import xdr_annotate
from .progressbar import ProgressBar
from .results import NullTestOutput, TestOutput, escape_cmdline


class Task:
    """Bookkeeping for one running child process.

    Holds the child's pid, the read ends of the pipes wired to its
    stdout/stderr, its start time (for timeout accounting), and the byte
    fragments read from each stream so far.
    """

    def __init__(self, test, prefix, tempdir, pid, stdout, stderr):
        self.test = test
        self.cmd = test.get_command(prefix, tempdir)
        self.pid = pid
        # Read-end file descriptors of the stdout/stderr pipes.
        self.stdout = stdout
        self.stderr = stderr
        self.start = datetime.now()
        # Accumulated output fragments (bytes); joined when the task ends.
        self.out = []
        self.err = []


def spawn_test(test, prefix, tempdir, passthrough, run_skipped, show_cmd):
    """Spawn one child, return a task struct.

    Returns None for tests that are disabled (unless run_skipped is set).
    In passthrough mode the current process is replaced by the test via
    execvp and this function does not return.
    """
    if not test.enable and not run_skipped:
        return None

    cmd = test.get_command(prefix, tempdir)
    if show_cmd:
        print(escape_cmdline(cmd))

    if passthrough:
        # Replace this process entirely; the child inherits our stdio.
        os.execvp(cmd[0], cmd)
        return

    (rout, wout) = os.pipe()
    (rerr, werr) = os.pipe()

    # Wire the child's stdout/stderr to the write ends of our pipes and
    # close the read ends in the child so EOF is delivered when it exits.
    file_actions = [
        (os.POSIX_SPAWN_CLOSE, rout),
        (os.POSIX_SPAWN_CLOSE, rerr),
        (os.POSIX_SPAWN_DUP2, wout, 1),
        (os.POSIX_SPAWN_DUP2, werr, 2),
    ]
    pid = os.posix_spawnp(cmd[0], cmd, os.environ, file_actions=file_actions)

    # Close the write ends in the parent; the child holds the only copies.
    os.close(wout)
    os.close(werr)
    return Task(test, prefix, tempdir, pid, rout, rerr)


def get_max_wait(tasks, timeout):
    """
    Return the maximum time we can wait before any task should time out.
    """

    # If we have a progress-meter, we need to wake up to update it frequently.
    wait = ProgressBar.update_granularity()

    # If a timeout is supplied, we need to wake up for the first task to
    # timeout if that is sooner.
    if timeout:
        now = datetime.now()
        timeout_delta = timedelta(seconds=timeout)
        for task in tasks:
            remaining = task.start + timeout_delta - now
            wait = min(wait, remaining)

    # Return the wait time in seconds, clamped to be non-negative (a task
    # may already be overdue, making `remaining` negative).
    return max(wait.total_seconds(), 0)


def flush_input(fd, frags):
    """
    Read any pages sitting in the file descriptor 'fd' into the list 'frags'.
    """
    rv = os.read(fd, 4096)
    frags.append(rv)
    while len(rv) == 4096:
        # If read() returns a full buffer, it may indicate there was 1 buffer
        # worth of data, or that there is more data to read. Poll the socket
        # before we read again to ensure that we will not block indefinitely.
        readable, _, _ = select.select([fd], [], [], 0)
        if not readable:
            return

        rv = os.read(fd, 4096)
        frags.append(rv)


def read_input(tasks, timeout):
    """
    Select on input or errors from the given task list for a max of timeout
    seconds.
    """
    rlist = []
    exlist = []
    outmap = {}  # Fast access to fragment list given fd.
    for t in tasks:
        rlist.append(t.stdout)
        rlist.append(t.stderr)
        outmap[t.stdout] = t.out
        outmap[t.stderr] = t.err
        # This will trigger with a close event when the child dies, allowing
        # us to respond immediately and not leave cores idle.
        exlist.append(t.stdout)

    readable = []
    try:
        readable, _, _ = select.select(rlist, [], exlist, timeout)
    except OverflowError:
        # Log the offending value before propagating; select() rejects
        # timeouts that are too large for the underlying C type.
        print("timeout value", timeout, file=sys.stderr)
        raise

    for fd in readable:
        flush_input(fd, outmap[fd])


def remove_task(tasks, pid):
    """
    Remove a task from the tasks list and return it.

    Raises KeyError if no task with the given pid exists.
    """
    index = None
    for i, t in enumerate(tasks):
        if t.pid == pid:
            index = i
            break
    else:
        raise KeyError(f"No such pid: {pid}")

    out = tasks[index]
    tasks.pop(index)
    return out


def timed_out(task, timeout):
    """
    Return a timedelta with the amount we are overdue, or False if the timeout
    has not yet been reached (or timeout is falsy, indicating there is no
    timeout.)
    """
    if not timeout:
        return False

    elapsed = datetime.now() - task.start
    over = elapsed - timedelta(seconds=timeout)
    return over if over.total_seconds() > 0 else False


def reap_zombies(tasks, timeout):
    """
    Search for children of this process that have finished. If they are tasks,
    then this routine will clean up the child. This method returns a new task
    list that has had the ended tasks removed, followed by the list of finished
    tasks.
    """
    finished = []
    while True:
        try:
            pid, status = os.waitpid(0, os.WNOHANG)
            if pid == 0:
                break
        except OSError as e:
            # ECHILD means there are no children left to wait for.
            if e.errno == errno.ECHILD:
                break
            raise

        ended = remove_task(tasks, pid)
        # Drain any remaining output before closing the pipe fds.
        flush_input(ended.stdout, ended.out)
        flush_input(ended.stderr, ended.err)
        os.close(ended.stdout)
        os.close(ended.stderr)

        # Mirror subprocess convention: negative returncode for a signal.
        returncode = os.WEXITSTATUS(status)
        if os.WIFSIGNALED(status):
            returncode = -os.WTERMSIG(status)

        finished.append(
            TestOutput(
                ended.test,
                ended.cmd,
                b"".join(ended.out).decode("utf-8", "replace"),
                b"".join(ended.err).decode("utf-8", "replace"),
                returncode,
                (datetime.now() - ended.start).total_seconds(),
                timed_out(ended, timeout),
                {"pid": ended.pid},
            )
        )
    return tasks, finished


def kill_undead(tasks, timeout):
    """
    Signal all children that are over the given timeout. Use SIGABRT first to
    generate a stack dump. If it still doesn't die for another 30 seconds, kill
    with SIGKILL.
    """
    for task in tasks:
        over = timed_out(task, timeout)
        if over:
            if over.total_seconds() < 30:
                os.kill(task.pid, signal.SIGABRT)
            else:
                os.kill(task.pid, signal.SIGKILL)


def run_all_tests(tests, prefix, tempdir, pb, options):
    """Dispatch all tests across up to options.worker_count child processes.

    Generator: yields a TestOutput for each finished test (NullTestOutput
    for skipped tests), in completion order.
    """
    # Copy and reverse for fast pop off end.
    tests = list(tests)
    tests.reverse()

    # The set of currently running tests.
    tasks = []

    # Piggy back on the first test to generate the XDR content needed for all
    # other tests to run. To avoid read/write races, we temporarily limit the
    # number of workers.
    wait_for_encoding = False
    worker_count = options.worker_count
    if options.use_xdr and len(tests) > 1:
        # The next loop pops tests, thus we iterate over the tests in reversed
        # order.
        tests = list(xdr_annotate(reversed(tests), options))
        tests.reverse()
        wait_for_encoding = True
        worker_count = 1

    def running_heavy_test():
        return any(task.test.heavy for task in tasks)

    heavy_tests = [t for t in tests if t.heavy]
    light_tests = [t for t in tests if not t.heavy]

    encoding_test = None
    while light_tests or heavy_tests or tasks:
        new_tests = []
        max_new_tests = worker_count - len(tasks)
        if (
            heavy_tests
            and not running_heavy_test()
            and len(new_tests) < max_new_tests
            and not wait_for_encoding
        ):
            # Schedule a heavy test if available. At most one heavy test
            # runs at a time.
            new_tests.append(heavy_tests.pop())
        while light_tests and len(new_tests) < max_new_tests:
            # Schedule as many more light tests as we can.
            new_tests.append(light_tests.pop())

        assert len(tasks) + len(new_tests) <= worker_count
        assert len([x for x in new_tests if x.heavy]) <= 1

        for test in new_tests:
            task = spawn_test(
                test,
                prefix,
                tempdir,
                options.passthrough,
                options.run_skipped,
                options.show_cmd,
            )
            if task:
                tasks.append(task)
                # The first spawned test is the XDR encoder when use_xdr
                # is on; everything else waits for it to finish.
                if not encoding_test:
                    encoding_test = test
            else:
                yield NullTestOutput(test)

        timeout = get_max_wait(tasks, options.timeout)
        read_input(tasks, timeout)

        kill_undead(tasks, options.timeout)
        tasks, finished = reap_zombies(tasks, options.timeout)

        for out in finished:
            yield out
            if wait_for_encoding and out.test == encoding_test:
                assert encoding_test.selfhosted_xdr_mode == "encode"
                # Encoding is done; restore full parallelism.
                wait_for_encoding = False
                worker_count = options.worker_count

        # If we did not finish any tasks, poke the progress bar to show that
        # the test harness is at least not frozen.
        if len(finished) == 0:
            pb.poke()