processhandler.py (21616B)
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.

# The Marionette ProcessHandler and ProcessHandlerMixin classes are only
# utilized by Marionette as an alternative to the mozprocess package.
#
# This necessity arises because Marionette supports the application to
# restart itself and, under such conditions, fork its process. To maintain
# the ability to track the process, including permissions to terminate
# the process and receive log entries via stdout and stderr, the psutil
# package is utilized. To prevent any side effects for consumers of
# mozprocess, all necessary helper classes have been duplicated for now.

import codecs
import functools
import os
import signal
import subprocess
import sys
import threading
import time
import traceback
from queue import Empty, Queue

import psutil

# Set the MOZPROCESS_DEBUG environment variable to 1 to see some debugging output
MOZPROCESS_DEBUG = os.getenv("MOZPROCESS_DEBUG")

# Maximum time (in seconds) the reader blocks on its queue before re-checking
# whether the stream reader threads are still alive and whether a timeout hit.
INTERVAL_PROCESS_ALIVE_CHECK = 0.02

# For not self-managed processes the returncode seems to not be available.
# Use `8` to indicate this specific situation for now.
UNKNOWN_RETURNCODE = 8

# We don't use mozinfo because it is expensive to import, see bug 933558.
isPosix = os.name == "posix"  # includes MacOS X


class ProcessHandlerMixin:
    """
    A class for launching and manipulating local processes.

    :param cmd: command to run. May be a string or a list. If specified as a list, the first
      element will be interpreted as the command, and all additional elements will be interpreted
      as arguments to that command.
    :param args: list of arguments to pass to the command (defaults to None). Must not be set when
      `cmd` is specified as a list.
    :param cwd: working directory for command (defaults to None).
    :param env: is the environment to use for the process (defaults to os.environ).
    :param kill_on_timeout: when True, the process will be killed when a timeout is reached.
      When False, the caller is responsible for killing the process.
      Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.)
    :param processOutputLine: function or list of functions to be called for
        each line of output produced by the process (defaults to an empty
        list).
    :param processStderrLine: function or list of functions to be called
        for each line of error output - stderr - produced by the process
        (defaults to an empty list). If this is not specified, stderr lines
        will be sent to the *processOutputLine* callbacks.
    :param onTimeout: function or list of functions to be called when the process times out.
    :param onFinish: function or list of functions to be called when the process terminates
      normally without timing out.
    :param kwargs: additional keyword args to pass directly into Popen.

    NOTE: Child processes will be tracked by default.
    """

    def __init__(
        self,
        cmd,
        args=None,
        cwd=None,
        env=None,
        kill_on_timeout=True,
        processOutputLine=(),
        processStderrLine=(),
        onTimeout=(),
        onFinish=(),
        **kwargs,
    ):
        self.args = args
        self.cmd = cmd
        self.cwd = cwd
        self.keywordargs = kwargs

        self.didTimeout = False
        self.didOutputTimeout = False
        # Stays None until run() launches the process.
        self.proc = None

        if env is None:
            env = os.environ.copy()
        self.env = env

        # handlers
        def to_callable_list(arg):
            # Accept either a single callable or an iterable of callables and
            # always hand back a fresh CallableList.
            if callable(arg):
                arg = [arg]
            return CallableList(arg)

        processOutputLine = to_callable_list(processOutputLine)
        processStderrLine = to_callable_list(processStderrLine)
        onTimeout = to_callable_list(onTimeout)
        onFinish = to_callable_list(onFinish)

        def on_timeout():
            self.didTimeout = True
            self.didOutputTimeout = self.reader.didOutputTimeout
            if kill_on_timeout:
                self.kill()

        # The internal handler runs before any user supplied timeout handler.
        onTimeout.insert(0, on_timeout)

        # Without dedicated stderr callbacks stderr is merged into stdout.
        self._stderr = subprocess.STDOUT
        if processStderrLine:
            self._stderr = subprocess.PIPE
        self.reader = ProcessReader(
            stdout_callback=processOutputLine,
            stderr_callback=processStderrLine,
            finished_callback=onFinish,
            timeout_callback=onTimeout,
        )

        # It is common for people to pass in the entire array with the cmd and
        # the args together since this is how Popen uses it.  Allow for that.
        if isinstance(self.cmd, list):
            if self.args is not None:
                raise TypeError("cmd and args must not both be lists")
            (self.cmd, self.args) = (self.cmd[0], self.cmd[1:])
        elif self.args is None:
            self.args = []

    def _has_valid_proc(func):
        """Decorator that raises RuntimeError when no process was started yet."""

        # functools.wraps keeps the name and docstring of the wrapped method.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if self.proc is None:
                raise RuntimeError("Process hasn't been started yet")
            return func(self, *args, **kwargs)

        return wrapper

    @property
    @_has_valid_proc
    def pid(self):
        """The process ID of the currently managed process."""
        return self.proc.pid

    @staticmethod
    def pid_exists(pid):
        """Return whether a process with the given ID exists (via psutil)."""
        return psutil.pid_exists(pid)

    @property
    def timedOut(self):
        """True if the process has timed out for any reason."""
        return self.didTimeout

    @property
    def outputTimedOut(self):
        """True if the process has timed out for no output."""
        return self.didOutputTimeout

    @property
    def commandline(self):
        """the string value of the command line (command + args)"""
        return subprocess.list2cmdline([self.cmd] + self.args)

    def _debug(self, msg):
        """Print a debug message when MOZPROCESS_DEBUG is set."""
        if not MOZPROCESS_DEBUG:
            return

        print(f"DBG::MARIONETTE ProcessHandler {self.pid} | {msg}", file=sys.stdout)

    @_has_valid_proc
    def kill(self, sig=None, timeout=None):
        """Kills the managed process and all its child processes.

        :param sig: Signal to use to kill the process. (Defaults to SIGKILL)

        :param timeout: If not None, wait this number of seconds for the
                        process to exit.

        Note that this does not manage any state, save any output etc,
        it immediately kills the process.

        Returns the process exit code, or None if the process could not be
        confirmed as terminated.
        """
        # Only short-circuit on a real exit code. A cached `None` from an
        # earlier failed attempt must not prevent another try.
        if getattr(self, "returncode", None) is not None:
            return self.returncode

        if self.proc.is_running():
            processes = [self.proc] + self.proc.children(recursive=True)

            if sig is None:
                # TODO: try SIGTERM first to sanely shutdown the application
                # and to not break later when Windows support gets added.
                sig = signal.SIGKILL

            # Do we need that?
            for process in processes:
                try:
                    self._debug(f"Killing process: {process}")
                    process.send_signal(sig)
                except psutil.NoSuchProcess:
                    # The process exited on its own in the meantime.
                    pass
            psutil.wait_procs(processes, timeout=timeout)

        # When we kill the managed process we also have to wait for the
        # reader thread to be finished. Otherwise consumers would have to assume
        # that it still has not completely shutdown.
        self.returncode = self.wait(0)
        if self.returncode is None:
            self._debug("kill: wait failed -- process is still alive")

        return self.returncode

    @_has_valid_proc
    def poll(self):
        """Check if child process has terminated

        Returns the current returncode value:
        - None if the process hasn't terminated yet
        - A negative number if the process was killed by signal N (Unix only)
        - '0' if the process ended without failures

        """
        # A cached `None` (e.g. stored by a failed kill()) is not a final
        # state, so only a real exit code is returned without re-checking.
        if getattr(self, "returncode", None) is not None:
            return self.returncode

        # If the process that is observed wasn't started with Popen there is
        # no `poll()` method available. Use `wait()` instead and do not wait
        # for the reader thread because it would cause extra delays.
        return self.wait(0, wait_reader=False)

    def processOutput(self, timeout=None, outputTimeout=None):
        """
        Handle process output until the process terminates or times out.

        :param timeout: If not None, the process will be allowed to continue
          for that number of seconds before being killed.

        :outputTimeout: If not None, the process will be allowed to continue
          for that number of seconds without producing any output before
          being killed.
        """
        # this method is kept for backward compatibility
        # `proc` is initialised to None in __init__, so test the value rather
        # than the mere presence of the attribute.
        if getattr(self, "proc", None) is None:
            self.run(timeout=timeout, outputTimeout=outputTimeout)
            # self.run will call this again
            return
        if not self.reader.is_alive():
            self.reader.timeout = timeout
            self.reader.output_timeout = outputTimeout
            self.reader.start(self.proc)

    def run(self, timeout=None, outputTimeout=None):
        """
        Starts the process.

        :param timeout: If not None, the process will be allowed to continue for
          that number of seconds before being killed. If the process is killed
          due to a timeout, the onTimeout handler will be called.

        :outputTimeout: If not None, the process will be allowed to continue
          for that number of seconds without producing any output before
          being killed.
        """
        self.didTimeout = False
        self.didOutputTimeout = False

        # default arguments
        args = dict(
            stdout=subprocess.PIPE,
            stderr=self._stderr,
            cwd=self.cwd,
            env=self.env,
        )

        # build process arguments; caller supplied Popen kwargs win
        args.update(self.keywordargs)

        # launch the process
        self.proc = psutil.Popen([self.cmd] + self.args, **args)

        self.processOutput(timeout=timeout, outputTimeout=outputTimeout)

    @_has_valid_proc
    def update_process(self, new_pid, timeout=None):
        """Update the internally managed process for the provided process ID.

        When the application restarts itself, such as during an update, the new
        process is essentially a fork of itself. To continue monitoring this
        process, the process ID needs to be updated accordingly.

        :param new_pid: The ID of the new (forked) process to track.

        :timeout: If not None, the old process will be allowed to continue for
          that number of seconds before being killed.

        Returns the return code of the old process, or None on POSIX when
        `new_pid` is already the managed process.
        """
        if isPosix:
            if new_pid == self.pid:
                return

            print(
                'Child process with id "%s" has been marked as detached because it is no '
                "longer in the managed process group. Keeping reference to the process id "
                '"%s" which is the new child process.' % (self.pid, new_pid),
                file=sys.stdout,
            )

        returncode = self.wait(timeout, wait_reader=False)
        if returncode is None:
            # If the process is still running force kill it.
            returncode = self.kill()

        # The cached return code belongs to the old process only.
        if hasattr(self, "returncode"):
            del self.returncode

        self.proc = psutil.Process(new_pid)
        self._debug(
            f"New process status: {self.proc} (terminal={self.proc.terminal()})"
        )

        return returncode

    @_has_valid_proc
    def wait(self, timeout=None, wait_reader=True):
        """
        Waits until the process is terminated.

        :param timeout: If not None, will return after timeout seconds.
                        This timeout only causes the wait function to return and
                        does not kill the process.

        :param wait_reader: If set to True, it waits not only for the process
                            to exit but also for all output to be fully read. (Defaults to True).

        Returns the process exit code value:
        - None if the process hasn't terminated yet
        - A negative number if the process was killed by signal N (Unix only)
        - '0' if the process ended without failures

        """
        # Thread.join() blocks the main thread until the reader thread is finished
        # wake up once a second in case a keyboard interrupt is sent
        if (
            wait_reader
            and self.reader.thread
            and self.reader.thread is not threading.current_thread()
        ):
            # `count` is the number of one second join attempts made so far.
            count = 0
            while self.reader.is_alive():
                if timeout is not None and count > timeout:
                    self._debug("wait timeout for reader thread")
                    return None
                self.reader.join(timeout=1)
                count += 1

        try:
            self.proc.wait(timeout)
            self._debug(f"Process status after wait: {self.proc}")

            if not isinstance(self.proc, psutil.Popen):
                self._debug(
                    "Not self-managed processes do not have a returncode. "
                    f"Setting its value to {UNKNOWN_RETURNCODE}."
                )
                self.returncode = UNKNOWN_RETURNCODE

            else:
                self.returncode = self.proc.returncode

            return self.returncode
        except psutil.TimeoutExpired:
            return None


class CallableList(list):
    """A list of callables that, when called, calls every element in order."""

    def __call__(self, *args, **kwargs):
        for e in self:
            e(*args, **kwargs)

    def __add__(self, lst):
        # Keep the CallableList type when concatenating.
        return CallableList(list.__add__(self, lst))


class ProcessReader:
    """Reads the output streams of a process on background threads.

    One thread per stream puts every line into a shared queue; a dispatcher
    thread takes the lines from the queue, calls the matching callback and
    enforces the overall and the output timeout.

    :param stdout_callback: callable invoked with each line read from stdout.
    :param stderr_callback: callable invoked with each line read from stderr.
    :param finished_callback: callable invoked once reading finished without
      a timeout.
    :param timeout_callback: callable invoked when a timeout was hit.
    :param timeout: overall time limit in seconds, or None for no limit.
    :param output_timeout: maximum time in seconds without any output, or
      None for no limit.
    """

    def __init__(
        self,
        stdout_callback=None,
        stderr_callback=None,
        finished_callback=None,
        timeout_callback=None,
        timeout=None,
        output_timeout=None,
    ):
        # Falsy callbacks (None, or an empty CallableList) become no-ops.
        self.stdout_callback = stdout_callback or (lambda line: True)
        self.stderr_callback = stderr_callback or (lambda line: True)
        self.finished_callback = finished_callback or (lambda: True)
        self.timeout_callback = timeout_callback or (lambda: True)
        self.timeout = timeout
        self.output_timeout = output_timeout
        self.thread = None
        self.didOutputTimeout = False

    def debug(self, msg):
        """Print a debug message when MOZPROCESS_DEBUG is set."""
        if not MOZPROCESS_DEBUG:
            return

        print(f"DBG::MARIONETTE ProcessReader | {msg}", file=sys.stdout)

    def _create_stream_reader(self, name, stream, queue, callback):
        """Start and return a daemon thread that reads `stream` into `queue`."""
        thread = threading.Thread(
            name=name, target=self._read_stream, args=(stream, queue, callback)
        )
        thread.daemon = True
        thread.start()
        return thread

    def _read_stream(self, stream, queue, callback):
        """Queue `(line, callback)` for every line until the stream is exhausted."""
        while True:
            line = stream.readline()
            if not line:
                # An empty read means end of stream.
                break
            queue.put((line, callback))
        stream.close()

    def start(self, proc):
        """Start reading the stdout/stderr streams of `proc`.

        :param proc: process object exposing `stdout` and `stderr` attributes.
        """
        queue = Queue()
        stdout_reader = None
        if proc.stdout:
            stdout_reader = self._create_stream_reader(
                "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback
            )
        stderr_reader = None
        if proc.stderr and proc.stderr != proc.stdout:
            stderr_reader = self._create_stream_reader(
                "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback
            )
        self.thread = threading.Thread(
            name="ProcessReader",
            target=self._read,
            args=(stdout_reader, stderr_reader, queue),
        )
        self.thread.daemon = True
        self.thread.start()
        self.debug("ProcessReader started")

    def _read(self, stdout_reader, stderr_reader, queue):
        """Dispatch queued lines to their callbacks and enforce the timeouts.

        Runs on the "ProcessReader" thread until both stream readers finished
        or a timeout was hit. Afterwards either the timeout callback or the
        finished callback gets called.
        """
        start_time = time.time()
        timed_out = False
        # Convert the relative limits into absolute deadlines.
        timeout = self.timeout
        if timeout is not None:
            timeout += start_time
        output_timeout = self.output_timeout
        if output_timeout is not None:
            output_timeout += start_time

        while (stdout_reader and stdout_reader.is_alive()) or (
            stderr_reader and stderr_reader.is_alive()
        ):
            has_line = True
            try:
                line, callback = queue.get(True, INTERVAL_PROCESS_ALIVE_CHECK)
            except Empty:
                has_line = False
            now = time.time()
            if not has_line:
                if output_timeout is not None and now > output_timeout:
                    timed_out = True
                    self.didOutputTimeout = True
                    break
            else:
                # Every received line pushes the output deadline forward.
                if output_timeout is not None:
                    output_timeout = now + self.output_timeout
                callback(line.rstrip())
            if timeout is not None and now > timeout:
                timed_out = True
                break
        self.debug("_read loop exited")
        # process remaining lines to read
        while not queue.empty():
            line, callback = queue.get(False)
            try:
                callback(line.rstrip())
            except Exception:
                traceback.print_exc()
        if timed_out:
            try:
                self.timeout_callback()
            except Exception:
                traceback.print_exc()
        if stdout_reader:
            stdout_reader.join()
        if stderr_reader:
            stderr_reader.join()
        if not timed_out:
            try:
                self.finished_callback()
            except Exception:
                traceback.print_exc()
        self.debug("_read exited")

    def is_alive(self):
        """Return True while the dispatcher thread is running."""
        if self.thread:
            return self.thread.is_alive()
        return False

    def join(self, timeout=None):
        """Wait for the dispatcher thread; no-op when it was never started."""
        if self.thread:
            self.thread.join(timeout=timeout)


# default output handlers
# these should be callables that take the output line


class StoreOutput:
    """accumulate stdout"""

    def __init__(self):
        self.output = []

    def __call__(self, line):
        self.output.append(line)


class StreamOutput:
    """pass output to a stream and flush

    :param stream: object with `write()` and `flush()` methods.
    :param text: when True lines are written as str, otherwise as bytes.
    """

    def __init__(self, stream, text=True):
        self.stream = stream
        self.text = text

    def __call__(self, line):
        # Convert the line to the type the stream expects and restore the
        # newline that the reader stripped.
        if self.text:
            if isinstance(line, bytes):
                line = line.decode(errors="ignore")
            line += "\n"
        else:
            if isinstance(line, str):
                line = line.encode(errors="ignore")
            line += b"\n"
        try:
            self.stream.write(line)
        except TypeError:
            print(
                "HEY! If you're reading this, you're about to encounter a "
                "type error, probably as a result of a conversion from "
                "Python 2 to Python 3. This is almost definitely because "
                "you're trying to write binary data to a text-encoded "
                "stream, or text data to a binary-encoded stream. Check how "
                "you're instantiating your ProcessHandler and if the output "
                "should be text-encoded, make sure you pass "
                "universal_newlines=True.",
                file=sys.stderr,
            )
            raise
        self.stream.flush()


class LogOutput(StreamOutput):
    """pass output to a file"""

    def __init__(self, filename):
        self.file_obj = open(filename, "a")
        StreamOutput.__init__(self, self.file_obj, True)

    def __del__(self):
        # `file_obj` is missing when open() failed in __init__, but __del__
        # still runs for the partially constructed instance.
        file_obj = getattr(self, "file_obj", None)
        if file_obj is not None:
            file_obj.close()


# front end class with the default handlers


class ProcessHandler(ProcessHandlerMixin):
    """
    Convenience class for handling processes with default output handlers.

    By default, all output is sent to stdout. This can be disabled by setting
    the *stream* argument to None.

    If processOutputLine keyword argument is specified the function or the
    list of functions specified by this argument will be called for each line
    of output; the output will not be written to stdout automatically then
    if stream is True (the default).

    If storeOutput==True, the output produced by the process will be saved
    as self.output.

    If logfile is not None, the output produced by the process will be
    appended to the given file.
    """

    def __init__(self, cmd, logfile=None, stream=True, storeOutput=True, **kwargs):
        # Always work on a private list so that appending the default handlers
        # neither mutates the caller's list nor fails for tuples.
        handlers = kwargs.get("processOutputLine")
        if handlers is None:
            handlers = []
        elif callable(handlers):
            handlers = [handlers]
        else:
            handlers = list(handlers)
        kwargs["processOutputLine"] = handlers

        if logfile:
            logoutput = LogOutput(logfile)
            handlers.append(logoutput)

        text = kwargs.get("universal_newlines", False) or kwargs.get("text", False)

        if stream is True:
            if text:
                # The encoding of stdout isn't guaranteed to be utf-8. Fix that.
                stdout = codecs.getwriter("utf-8")(sys.stdout.buffer)
            else:
                stdout = sys.stdout.buffer

            # Only fall back to stdout when no other handler is registered.
            if not handlers:
                handlers.append(StreamOutput(stdout, text))
        elif stream:
            streamoutput = StreamOutput(stream, text)
            handlers.append(streamoutput)

        self.output = None
        if storeOutput:
            storeoutput = StoreOutput()
            self.output = storeoutput.output
            handlers.append(storeoutput)

        ProcessHandlerMixin.__init__(self, cmd, **kwargs)