processhandler.py (53137B)
1 # This Source Code Form is subject to the terms of the Mozilla Public 2 # License, v. 2.0. If a copy of the MPL was not distributed with this file, 3 # You can obtain one at http://mozilla.org/MPL/2.0/. 4 5 # The mozprocess ProcessHandler and ProcessHandlerMixin are typically used as 6 # an alternative to the python subprocess module. They have been used in many 7 # Mozilla test harnesses with some success -- but also with on-going concerns, 8 # especially regarding reliability and exception handling. 9 # 10 # New code should try to use the standard subprocess module, and only use 11 # this ProcessHandler if absolutely necessary. 12 13 import codecs 14 import errno 15 import io 16 import os 17 import signal 18 import subprocess 19 import sys 20 import threading 21 import time 22 import traceback 23 from datetime import datetime 24 from queue import Empty, Queue 25 26 # Set the MOZPROCESS_DEBUG environment variable to 1 to see some debugging output 27 MOZPROCESS_DEBUG = os.getenv("MOZPROCESS_DEBUG") 28 29 INTERVAL_PROCESS_ALIVE_CHECK = 0.02 30 31 # We dont use mozinfo because it is expensive to import, see bug 933558. 32 isWin = os.name == "nt" 33 isPosix = os.name == "posix" # includes MacOS X 34 35 if isWin: 36 from ctypes import WinError, addressof, byref, c_longlong, c_ulong, sizeof 37 38 from . import winprocess 39 from .qijo import ( 40 IO_COUNTERS, 41 JOBOBJECT_ASSOCIATE_COMPLETION_PORT, 42 JOBOBJECT_BASIC_LIMIT_INFORMATION, 43 JOBOBJECT_EXTENDED_LIMIT_INFORMATION, 44 JobObjectAssociateCompletionPortInformation, 45 JobObjectExtendedLimitInformation, 46 ) 47 48 49 class ProcessHandlerMixin: 50 """ 51 A class for launching and manipulating local processes. 52 53 :param cmd: command to run. May be a string or a list. If specified as a list, the first 54 element will be interpreted as the command, and all additional elements will be interpreted 55 as arguments to that command. 56 :param args: list of arguments to pass to the command (defaults to None). Must not be set when 57 `cmd` is specified as a list. 58 :param cwd: working directory for command (defaults to None). 59 :param env: is the environment to use for the process (defaults to os.environ). 60 :param ignore_children: causes system to ignore child processes when True, 61 defaults to False (which tracks child processes). 62 :param kill_on_timeout: when True, the process will be killed when a timeout is reached. 63 When False, the caller is responsible for killing the process. 64 Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.) 65 :param processOutputLine: function or list of functions to be called for 66 each line of output produced by the process (defaults to an empty 67 list). 68 :param processStderrLine: function or list of functions to be called 69 for each line of error output - stderr - produced by the process 70 (defaults to an empty list). If this is not specified, stderr lines 71 will be sent to the *processOutputLine* callbacks. 72 :param onTimeout: function or list of functions to be called when the process times out. 73 :param onFinish: function or list of functions to be called when the process terminates 74 normally without timing out. 75 :param kwargs: additional keyword args to pass directly into Popen. 76 77 NOTE: Child processes will be tracked by default. If for any reason 78 we are unable to track child processes and ignore_children is set to False, 79 then we will fall back to only tracking the root process. The fallback 80 will be logged. 81 """ 82 83 class Process(subprocess.Popen): 84 """ 85 Represents our view of a subprocess. 86 It adds a kill() method which allows it to be stopped explicitly. 87 """ 88 89 MAX_IOCOMPLETION_PORT_NOTIFICATION_DELAY = 180 90 TIMEOUT_BEFORE_SIGKILL = 1.0 91 92 def __init__( 93 self, 94 args, 95 bufsize=0, 96 executable=None, 97 stdin=None, 98 stdout=None, 99 stderr=None, 100 preexec_fn=None, 101 close_fds=False, 102 shell=False, 103 cwd=None, 104 env=None, 105 universal_newlines=False, 106 startupinfo=None, 107 creationflags=0, 108 ignore_children=False, 109 encoding="utf-8", 110 ): 111 # Parameter for whether or not we should attempt to track child processes 112 self._ignore_children = ignore_children 113 self._job = None 114 self._io_port = None 115 if isWin: 116 self._cleanup_lock = threading.Lock() 117 118 if not self._ignore_children and not isWin: 119 # Set the process group id for linux systems 120 # Sets process group id to the pid of the parent process 121 # NOTE: This prevents you from using preexec_fn and managing 122 # child processes, TODO: Ideally, find a way around this 123 def setpgidfn(): 124 os.setpgid(0, 0) 125 126 preexec_fn = setpgidfn 127 128 kwargs = { 129 "bufsize": bufsize, 130 "executable": executable, 131 "stdin": stdin, 132 "stdout": stdout, 133 "stderr": stderr, 134 "preexec_fn": preexec_fn, 135 "close_fds": close_fds, 136 "shell": shell, 137 "cwd": cwd, 138 "env": env, 139 "startupinfo": startupinfo, 140 "creationflags": creationflags, 141 } 142 if sys.version_info.minor >= 6 and universal_newlines: 143 kwargs["universal_newlines"] = universal_newlines 144 kwargs["encoding"] = encoding 145 try: 146 subprocess.Popen.__init__(self, args, **kwargs) 147 except OSError: 148 print(args, file=sys.stderr) 149 raise 150 151 def debug(self, msg): 152 if not MOZPROCESS_DEBUG: 153 return 154 thread = threading.current_thread().name 155 print(f"DBG::MOZPROC PID:{self.pid} ({thread}) | {msg}") 156 157 def __del__(self): 158 if isWin: 159 _maxint = sys.maxsize 160 handle = getattr(self, "_handle", None) 161 if handle: 162 # _internal_poll is a Python3 built-in call and requires _handle to be an int on Windows 163 # It's only an AutoHANDLE for legacy Python2 reasons that are non-trivial to remove 164 self._handle = int(self._handle) 165 self._internal_poll(_deadstate=_maxint) 166 # Revert it back to the saved 'handle' (AutoHANDLE) for self._cleanup() 167 self._handle = handle 168 if handle or self._job or self._io_port: 169 self._cleanup() 170 else: 171 subprocess.Popen.__del__(self) 172 173 def send_signal(self, sig=None): 174 if isWin: 175 try: 176 if not self._ignore_children and self._handle and self._job: 177 self.debug("calling TerminateJobObject") 178 winprocess.TerminateJobObject( 179 self._job, winprocess.ERROR_CONTROL_C_EXIT 180 ) 181 elif self._handle: 182 self.debug("calling TerminateProcess") 183 winprocess.TerminateProcess( 184 self._handle, winprocess.ERROR_CONTROL_C_EXIT 185 ) 186 except OSError: 187 self._cleanup() 188 189 traceback.print_exc() 190 raise OSError("Could not terminate process") 191 192 else: 193 194 def send_sig(sig, retries=0): 195 pid = self.detached_pid or self.pid 196 if not self._ignore_children: 197 try: 198 os.killpg(pid, sig) 199 except BaseException as e: 200 # On Mac OSX if the process group contains zombie 201 # processes, killpg results in an EPERM. 202 # In this case, zombie processes need to be reaped 203 # before continuing 204 # Note: A negative pid refers to the entire process 205 # group 206 if retries < 1 and getattr(e, "errno", None) == errno.EPERM: 207 try: 208 os.waitpid(-pid, 0) 209 finally: 210 return send_sig(sig, retries + 1) 211 212 # ESRCH is a "no such process" failure, which is fine because the 213 # application might already have been terminated itself. Any other 214 # error would indicate a problem in killing the process. 215 if getattr(e, "errno", None) != errno.ESRCH: 216 print( 217 "Could not terminate process: %s" % self.pid, 218 file=sys.stderr, 219 ) 220 raise 221 else: 222 os.kill(pid, sig) 223 224 if sig is None and isPosix: 225 # ask the process for termination and wait a bit 226 send_sig(signal.SIGTERM) 227 limit = time.time() + self.TIMEOUT_BEFORE_SIGKILL 228 while time.time() <= limit: 229 if self.poll() is not None: 230 # process terminated nicely 231 break 232 time.sleep(INTERVAL_PROCESS_ALIVE_CHECK) 233 else: 234 # process did not terminate - send SIGKILL to force 235 send_sig(signal.SIGKILL) 236 else: 237 # a signal was explicitly set or not posix 238 send_sig(sig or signal.SIGKILL) 239 240 def kill(self, sig=None, timeout=None): 241 self.send_signal(sig) 242 self.returncode = self.wait(timeout) 243 self._cleanup() 244 return self.returncode 245 246 def poll(self): 247 """Popen.poll 248 Check if child process has terminated. Set and return returncode attribute. 249 """ 250 if isWin: 251 returncode = self._custom_wait(timeout=0) 252 else: 253 returncode = subprocess.Popen.poll(self) 254 if returncode is not None: 255 self._cleanup() 256 return returncode 257 258 def wait(self, timeout=None): 259 """Popen.wait 260 Called to wait for a running process to shut down and return 261 its exit code 262 Returns the main process's exit code 263 """ 264 # This call will be different for each OS 265 self.returncode = self._custom_wait(timeout=timeout) 266 if self.returncode is not None: 267 self._cleanup() 268 return self.returncode 269 270 """ Private Members of Process class """ 271 272 if isWin: 273 # Redefine the execute child so that we can track process groups 274 def _execute_child(self, *args_tuple): 275 ( 276 args, 277 executable, 278 preexec_fn, 279 close_fds, 280 pass_fds, 281 cwd, 282 env, 283 startupinfo, 284 creationflags, 285 shell, 286 p2cread, 287 p2cwrite, 288 c2pread, 289 c2pwrite, 290 errread, 291 errwrite, 292 *_, 293 ) = args_tuple 294 if not isinstance(args, str): 295 args = subprocess.list2cmdline(args) 296 297 # Always or in the create new process group 298 creationflags |= winprocess.CREATE_NEW_PROCESS_GROUP 299 300 if startupinfo is None: 301 startupinfo = winprocess.STARTUPINFO() 302 303 if None not in (p2cread, c2pwrite, errwrite): 304 startupinfo.dwFlags |= winprocess.STARTF_USESTDHANDLES 305 startupinfo.hStdInput = int(p2cread) 306 startupinfo.hStdOutput = int(c2pwrite) 307 startupinfo.hStdError = int(errwrite) 308 if shell: 309 startupinfo.dwFlags |= winprocess.STARTF_USESHOWWINDOW 310 startupinfo.wShowWindow = winprocess.SW_HIDE 311 comspec = os.environ.get("COMSPEC", "cmd.exe") 312 args = comspec + " /c " + args 313 314 # Determine if we can create a job or create nested jobs. 315 can_create_job = winprocess.CanCreateJobObject() 316 can_nest_jobs = self._can_nest_jobs() 317 318 # Ensure we write a warning message if we are falling back 319 if not (can_create_job or can_nest_jobs) and not self._ignore_children: 320 # We can't create job objects AND the user wanted us to 321 # Warn the user about this. 322 print( 323 "ProcessManager UNABLE to use job objects to manage " 324 "child processes", 325 file=sys.stderr, 326 ) 327 328 # set process creation flags 329 creationflags |= winprocess.CREATE_SUSPENDED 330 creationflags |= winprocess.CREATE_UNICODE_ENVIRONMENT 331 if can_create_job: 332 creationflags |= winprocess.CREATE_BREAKAWAY_FROM_JOB 333 if not (can_create_job or can_nest_jobs): 334 # Since we've warned, we just log info here to inform you 335 # of the consequence of setting ignore_children = True 336 print("ProcessManager NOT managing child processes") 337 338 # create the process 339 hp, ht, pid, tid = winprocess.CreateProcess( 340 executable, 341 args, 342 None, 343 None, # No special security 344 1, # Must inherit handles! 345 creationflags, 346 winprocess.EnvironmentBlock(env), 347 cwd, 348 startupinfo, 349 ) 350 self._child_created = True 351 self._handle = hp 352 self._thread = ht 353 self.pid = pid 354 self.tid = tid 355 356 if not self._ignore_children and (can_create_job or can_nest_jobs): 357 try: 358 # We create a new job for this process, so that we can kill 359 # the process and any sub-processes 360 # Create the IO Completion Port 361 self._io_port = winprocess.CreateIoCompletionPort() 362 self._job = winprocess.CreateJobObject() 363 364 # Now associate the io comp port and the job object 365 joacp = JOBOBJECT_ASSOCIATE_COMPLETION_PORT( 366 winprocess.COMPKEY_JOBOBJECT, self._io_port 367 ) 368 winprocess.SetInformationJobObject( 369 self._job, 370 JobObjectAssociateCompletionPortInformation, 371 addressof(joacp), 372 sizeof(joacp), 373 ) 374 375 # Allow subprocesses to break away from us - necessary when 376 # Firefox restarts, or flash with protected mode 377 limit_flags = winprocess.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE 378 if not can_nest_jobs: 379 # This allows sandbox processes to create their own job, 380 # and is necessary to set for older versions of Windows 381 # without nested job support. 382 limit_flags |= winprocess.JOB_OBJECT_LIMIT_BREAKAWAY_OK 383 384 jbli = JOBOBJECT_BASIC_LIMIT_INFORMATION( 385 c_longlong(0), # per process time limit (ignored) 386 c_longlong(0), # per job user time limit (ignored) 387 limit_flags, 388 0, # min working set (ignored) 389 0, # max working set (ignored) 390 0, # active process limit (ignored) 391 None, # affinity (ignored) 392 0, # Priority class (ignored) 393 0, # Scheduling class (ignored) 394 ) 395 396 iocntr = IO_COUNTERS() 397 jeli = JOBOBJECT_EXTENDED_LIMIT_INFORMATION( 398 jbli, # basic limit info struct 399 iocntr, # io_counters (ignored) 400 0, # process mem limit (ignored) 401 0, # job mem limit (ignored) 402 0, # peak process limit (ignored) 403 0, 404 ) # peak job limit (ignored) 405 406 winprocess.SetInformationJobObject( 407 self._job, 408 JobObjectExtendedLimitInformation, 409 addressof(jeli), 410 sizeof(jeli), 411 ) 412 413 # Assign the job object to the process 414 winprocess.AssignProcessToJobObject(self._job, int(hp)) 415 416 # It's overkill, but we use Queue to signal between threads 417 # because it handles errors more gracefully than event or condition. 418 self._process_events = Queue() 419 420 # Spin up our thread for managing the IO Completion Port 421 self._procmgrthread = threading.Thread(target=self._procmgr) 422 except Exception: 423 print( 424 """Exception trying to use job objects; 425 falling back to not using job objects for managing child processes""", 426 file=sys.stderr, 427 ) 428 tb = traceback.format_exc() 429 print(tb, file=sys.stderr) 430 # Ensure no dangling handles left behind 431 self._cleanup_job_io_port() 432 else: 433 self._job = None 434 435 winprocess.ResumeThread(int(ht)) 436 if getattr(self, "_procmgrthread", None): 437 self._procmgrthread.start() 438 ht.Close() 439 440 for i in (p2cread, c2pwrite, errwrite): 441 if i is not None: 442 i.Close() 443 444 # Per: 445 # https://msdn.microsoft.com/en-us/library/windows/desktop/hh448388%28v=vs.85%29.aspx 446 # Nesting jobs came in with windows versions starting with 6.2 according to the table 447 # on this page: 448 # https://msdn.microsoft.com/en-us/library/ms724834%28v=vs.85%29.aspx 449 def _can_nest_jobs(self): 450 winver = sys.getwindowsversion() 451 return winver.major > 6 or winver.major == 6 and winver.minor >= 2 452 453 # Windows Process Manager - watches the IO Completion Port and 454 # keeps track of child processes 455 def _procmgr(self): 456 if not (self._io_port) or not (self._job): 457 return 458 459 try: 460 self._poll_iocompletion_port() 461 except Exception: 462 traceback.print_exc() 463 # If _poll_iocompletion_port threw an exception for some unexpected reason, 464 # send an event that will make _custom_wait throw an Exception. 465 self._process_events.put({}) 466 except KeyboardInterrupt: 467 raise KeyboardInterrupt 468 469 def _poll_iocompletion_port(self): 470 # Watch the IO Completion port for status 471 self._spawned_procs = {} 472 countdowntokill = 0 473 474 self.debug("start polling IO completion port") 475 476 while True: 477 msgid = c_ulong(0) 478 compkey = c_ulong(0) 479 pid = c_ulong(0) 480 portstatus = winprocess.GetQueuedCompletionStatus( 481 self._io_port, byref(msgid), byref(compkey), byref(pid), 5000 482 ) 483 484 # If the countdowntokill has been activated, we need to check 485 # if we should start killing the children or not. 486 if countdowntokill != 0: 487 diff = datetime.now() - countdowntokill 488 # Arbitrarily wait 3 minutes for windows to get its act together 489 # Windows sometimes takes a small nap between notifying the 490 # IO Completion port and actually killing the children, and we 491 # don't want to mistake that situation for the situation of an unexpected 492 # parent abort (which is what we're looking for here). 493 if diff.seconds > self.MAX_IOCOMPLETION_PORT_NOTIFICATION_DELAY: 494 print( 495 "WARNING | IO Completion Port failed to signal " 496 "process shutdown", 497 file=sys.stderr, 498 ) 499 print( 500 "Parent process %s exited with children alive:" 501 % self.pid, 502 file=sys.stderr, 503 ) 504 print( 505 "PIDS: %s" 506 % ", ".join([str(i) for i in self._spawned_procs]), 507 file=sys.stderr, 508 ) 509 print( 510 "Attempting to kill them, but no guarantee of success", 511 file=sys.stderr, 512 ) 513 514 self.send_signal() 515 self._process_events.put({self.pid: "FINISHED"}) 516 break 517 518 if not portstatus: 519 # Check to see what happened 520 errcode = winprocess.GetLastError() 521 if errcode == winprocess.ERROR_ABANDONED_WAIT_0: 522 # Then something has killed the port, break the loop 523 print( 524 "IO Completion Port unexpectedly closed", 525 file=sys.stderr, 526 ) 527 self._process_events.put({self.pid: "FINISHED"}) 528 break 529 elif errcode == winprocess.WAIT_TIMEOUT: 530 # Timeouts are expected, just keep on polling 531 continue 532 else: 533 print( 534 "Error Code %s trying to query IO Completion Port, " 535 "exiting" % errcode, 536 file=sys.stderr, 537 ) 538 raise WinError(errcode) 539 break 540 541 if compkey.value == winprocess.COMPKEY_TERMINATE.value: 542 self.debug("compkeyterminate detected") 543 # Then we're done 544 break 545 546 # Check the status of the IO Port and do things based on it 547 if compkey.value == winprocess.COMPKEY_JOBOBJECT.value: 548 if msgid.value == winprocess.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO: 549 # No processes left, time to shut down 550 # Signal anyone waiting on us that it is safe to shut down 551 self.debug("job object msg active processes zero") 552 self._process_events.put({self.pid: "FINISHED"}) 553 break 554 elif msgid.value == winprocess.JOB_OBJECT_MSG_NEW_PROCESS: 555 # New Process started 556 # Add the child proc to our list in case our parent flakes out on us 557 # without killing everything. 558 if pid.value != self.pid: 559 self._spawned_procs[pid.value] = 1 560 self.debug( 561 "new process detected with pid value: %s" 562 % pid.value 563 ) 564 elif msgid.value == winprocess.JOB_OBJECT_MSG_EXIT_PROCESS: 565 self.debug("process id %s exited normally" % pid.value) 566 # One process exited normally 567 if pid.value == self.pid and len(self._spawned_procs) > 0: 568 # Parent process dying, start countdown timer 569 countdowntokill = datetime.now() 570 elif pid.value in self._spawned_procs: 571 # Child Process died remove from list 572 del self._spawned_procs[pid.value] 573 elif ( 574 msgid.value 575 == winprocess.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS 576 ): 577 # One process existed abnormally 578 self.debug("process id %s exited abnormally" % pid.value) 579 if pid.value == self.pid and len(self._spawned_procs) > 0: 580 # Parent process dying, start countdown timer 581 countdowntokill = datetime.now() 582 elif pid.value in self._spawned_procs: 583 # Child Process died remove from list 584 del self._spawned_procs[pid.value] 585 else: 586 # We don't care about anything else 587 self.debug("We got a message %s" % msgid.value) 588 pass 589 590 def _custom_wait(self, timeout=None): 591 """Custom implementation of wait. 592 593 - timeout: number of seconds before timing out. If None, 594 will wait indefinitely. 595 """ 596 # First, check to see if the process is still running 597 if self._handle: 598 returncode = winprocess.GetExitCodeProcess(self._handle) 599 if returncode != winprocess.STILL_ACTIVE: 600 self.returncode = returncode 601 else: 602 # Dude, the process is like totally dead! 603 return self.returncode 604 605 # On Windows, an unlimited timeout prevents KeyboardInterrupt from 606 # being caught. 607 the_timeout = 0.1 if timeout is None else timeout 608 609 if self._job: 610 self.debug("waiting with IO completion port") 611 # Then we are managing with IO Completion Ports 612 # wait on a signal so we know when we have seen the last 613 # process come through. 614 # We use queues to synchronize between the thread and this 615 # function because events just didn't have robust enough error 616 # handling on pre-2.7 versions 617 try: 618 while True: 619 try: 620 item = self._process_events.get(timeout=the_timeout) 621 except Empty: 622 # The timeout was not given by the user, we just have a 623 # timeout to allow KeyboardInterrupt, so retry. 624 if timeout is None: 625 continue 626 else: 627 raise 628 break 629 630 # re-emit the event in case some other thread is also calling wait() 631 self._process_events.put(item) 632 if item[self.pid] == "FINISHED": 633 self.debug("received 'FINISHED' from _procmgrthread") 634 self._process_events.task_done() 635 except Empty: 636 # There was no event within the expected time. 637 pass 638 except Exception: 639 traceback.print_exc() 640 raise OSError( 641 "IO Completion Port failed to signal process shutdown" 642 ) 643 finally: 644 if self._handle: 645 returncode = winprocess.GetExitCodeProcess(self._handle) 646 if returncode != winprocess.STILL_ACTIVE: 647 self.returncode = returncode 648 649 else: 650 # Not managing with job objects, so all we can reasonably do 651 # is call waitforsingleobject and hope for the best 652 self.debug("waiting without IO completion port") 653 654 if not self._ignore_children: 655 self.debug("NOT USING JOB OBJECTS!!!") 656 # First, make sure we have not already ended 657 if self.returncode is not None: 658 return self.returncode 659 660 rc = None 661 if self._handle: 662 # timeout for WaitForSingleObject is in ms 663 the_timeout = int(the_timeout * 1000) 664 while True: 665 rc = winprocess.WaitForSingleObject( 666 self._handle, the_timeout 667 ) 668 # The timeout was not given by the user, we just have a 669 # timeout to allow KeyboardInterrupt, so retry. 670 if timeout is None and rc == winprocess.WAIT_TIMEOUT: 671 continue 672 break 673 674 if rc == winprocess.WAIT_TIMEOUT: 675 # Timeout happened as asked. 676 pass 677 elif rc == winprocess.WAIT_OBJECT_0: 678 # We caught WAIT_OBJECT_0, which indicates all is well 679 print("Single process terminated successfully") 680 self.returncode = winprocess.GetExitCodeProcess(self._handle) 681 else: 682 # An error occured we should probably throw 683 rc = winprocess.GetLastError() 684 if rc: 685 raise WinError(rc) 686 687 return self.returncode 688 689 def _cleanup_job_io_port(self): 690 """Do the job and IO port cleanup separately because there are 691 cases where we want to clean these without killing _handle 692 (i.e. if we fail to create the job object in the first place) 693 """ 694 if ( 695 getattr(self, "_job") 696 and self._job != winprocess.INVALID_HANDLE_VALUE 697 ): 698 self._job.Close() 699 self._job = None 700 else: 701 # If windows already freed our handle just set it to none 702 # (saw this intermittently while testing) 703 self._job = None 704 705 if ( 706 getattr(self, "_io_port", None) 707 and self._io_port != winprocess.INVALID_HANDLE_VALUE 708 ): 709 self._io_port.Close() 710 self._io_port = None 711 else: 712 self._io_port = None 713 714 if getattr(self, "_procmgrthread", None): 715 self._procmgrthread = None 716 717 def _cleanup(self): 718 self._cleanup_lock.acquire() 719 self._cleanup_job_io_port() 720 if self._thread and self._thread != winprocess.INVALID_HANDLE_VALUE: 721 self._thread.Close() 722 self._thread = None 723 else: 724 self._thread = None 725 726 if self._handle and self._handle != winprocess.INVALID_HANDLE_VALUE: 727 self._handle.Close() 728 self._handle = None 729 else: 730 self._handle = None 731 self._cleanup_lock.release() 732 733 else: 734 735 def _custom_wait(self, timeout=None): 736 """Haven't found any reason to differentiate between these platforms 737 so they all use the same wait callback. If it is necessary to 738 craft different styles of wait, then a new _custom_wait method 739 could be easily implemented. 740 """ 741 # For non-group wait, call base class 742 try: 743 subprocess.Popen.wait(self, timeout=timeout) 744 except subprocess.TimeoutExpired: 745 # We want to return None in this case 746 pass 747 return self.returncode 748 749 def _cleanup(self): 750 pass 751 752 def __init__( 753 self, 754 cmd, 755 args=None, 756 cwd=None, 757 env=None, 758 ignore_children=False, 759 kill_on_timeout=True, 760 processOutputLine=(), 761 processStderrLine=(), 762 onTimeout=(), 763 onFinish=(), 764 **kwargs, 765 ): 766 self.cmd = cmd 767 self.args = args 768 self.cwd = cwd 769 self.didTimeout = False 770 self.didOutputTimeout = False 771 self._ignore_children = ignore_children 772 self.keywordargs = kwargs 773 self.read_buffer = "" 774 775 if env is None: 776 env = os.environ.copy() 777 self.env = env 778 779 # handlers 780 def to_callable_list(arg): 781 if callable(arg): 782 arg = [arg] 783 return CallableList(arg) 784 785 processOutputLine = to_callable_list(processOutputLine) 786 processStderrLine = to_callable_list(processStderrLine) 787 onTimeout = to_callable_list(onTimeout) 788 onFinish = to_callable_list(onFinish) 789 790 def on_timeout(): 791 self.didTimeout = True 792 self.didOutputTimeout = self.reader.didOutputTimeout 793 if kill_on_timeout: 794 self.kill() 795 796 onTimeout.insert(0, on_timeout) 797 798 self._stderr = subprocess.STDOUT 799 if processStderrLine: 800 self._stderr = subprocess.PIPE 801 self.reader = ProcessReader( 802 stdout_callback=processOutputLine, 803 stderr_callback=processStderrLine, 804 finished_callback=onFinish, 805 timeout_callback=onTimeout, 806 ) 807 808 # It is common for people to pass in the entire array with the cmd and 809 # the args together since this is how Popen uses it. Allow for that. 810 if isinstance(self.cmd, list): 811 if self.args is not None: 812 raise TypeError("cmd and args must not both be lists") 813 (self.cmd, self.args) = (self.cmd[0], self.cmd[1:]) 814 elif self.args is None: 815 self.args = [] 816 817 def debug(self, msg): 818 if not MOZPROCESS_DEBUG: 819 return 820 cmd = self.cmd.split(os.sep)[-1:] 821 print(f"DBG::MOZPROC ProcessHandlerMixin {cmd} | {msg}") 822 823 @property 824 def timedOut(self): 825 """True if the process has timed out for any reason.""" 826 return self.didTimeout 827 828 @property 829 def outputTimedOut(self): 830 """True if the process has timed out for no output.""" 831 return self.didOutputTimeout 832 833 @property 834 def commandline(self): 835 """the string value of the command line (command + args)""" 836 return subprocess.list2cmdline([self.cmd] + self.args) 837 838 def run(self, timeout=None, outputTimeout=None): 839 """ 840 Starts the process. 841 842 If timeout is not None, the process will be allowed to continue for 843 that number of seconds before being killed. If the process is killed 844 due to a timeout, the onTimeout handler will be called. 845 846 If outputTimeout is not None, the process will be allowed to continue 847 for that number of seconds without producing any output before 848 being killed. 849 """ 850 self.didTimeout = False 851 self.didOutputTimeout = False 852 853 # default arguments 854 args = dict( 855 stdout=subprocess.PIPE, 856 stderr=self._stderr, 857 cwd=self.cwd, 858 env=self.env, 859 ignore_children=self._ignore_children, 860 ) 861 862 # build process arguments 863 args.update(self.keywordargs) 864 865 # launch the process 866 self.proc = self.Process([self.cmd] + self.args, **args) 867 868 if isPosix: 869 # Keep track of the initial process group in case the process detaches itself 870 self.proc.pgid = self._getpgid(self.proc.pid) 871 self.proc.detached_pid = None 872 873 self.processOutput(timeout=timeout, outputTimeout=outputTimeout) 874 875 def kill(self, sig=None, timeout=None): 876 """ 877 Kills the managed process. 878 879 If you created the process with 'ignore_children=False' (the 880 default) then it will also also kill all child processes spawned by 881 it. If you specified 'ignore_children=True' when creating the 882 process, only the root process will be killed. 883 884 Note that this does not manage any state, save any output etc, 885 it immediately kills the process. 886 887 :param sig: Signal used to kill the process, defaults to SIGKILL 888 (has no effect on Windows) 889 """ 890 if not hasattr(self, "proc"): 891 raise RuntimeError("Process hasn't been started yet") 892 893 self.proc.kill(sig=sig, timeout=timeout) 894 895 # When we kill the the managed process we also have to wait for the 896 # reader thread to be finished. Otherwise consumers would have to assume 897 # that it still has not completely shutdown. 898 rc = self.wait(0) 899 if rc is None: 900 self.debug("kill: wait failed -- process is still alive") 901 return rc 902 903 def poll(self): 904 """Check if child process has terminated 905 906 Returns the current returncode value: 907 - None if the process hasn't terminated yet 908 - A negative number if the process was killed by signal N (Unix only) 909 - '0' if the process ended without failures 910 911 """ 912 if not hasattr(self, "proc"): 913 raise RuntimeError("Process hasn't been started yet") 914 915 # Ensure that we first check for the reader status. Otherwise 916 # we might mark the process as finished while output is still getting 917 # processed. 918 elif not self._ignore_children and self.reader.is_alive(): 919 return None 920 elif hasattr(self, "returncode"): 921 return self.returncode 922 else: 923 return self.proc.poll() 924 925 def processOutput(self, timeout=None, outputTimeout=None): 926 """ 927 Handle process output until the process terminates or times out. 928 929 If timeout is not None, the process will be allowed to continue for 930 that number of seconds before being killed. 931 932 If outputTimeout is not None, the process will be allowed to continue 933 for that number of seconds without producing any output before 934 being killed. 935 """ 936 # this method is kept for backward compatibility 937 if not hasattr(self, "proc"): 938 self.run(timeout=timeout, outputTimeout=outputTimeout) 939 # self.run will call this again 940 return 941 if not self.reader.is_alive(): 942 self.reader.timeout = timeout 943 self.reader.output_timeout = outputTimeout 944 self.reader.start(self.proc) 945 946 def wait(self, timeout=None): 947 """ 948 Waits until all output has been read and the process is 949 terminated. 950 951 If timeout is not None, will return after timeout seconds. 952 This timeout only causes the wait function to return and 953 does not kill the process. 954 955 Returns the process exit code value: 956 - None if the process hasn't terminated yet 957 - A negative number if the process was killed by signal N (Unix only) 958 - '0' if the process ended without failures 959 960 """ 961 self.returncode = self.proc.wait(timeout) 962 if ( 963 self.returncode is not None 964 and self.reader.thread 965 and self.reader.thread is not threading.current_thread() 966 ): 967 # If children are ignored and a child is still running because it's 968 # been daemonized or something, the reader might still be attached 969 # to that child'd output... and joining will deadlock. 970 # So instead, we wait for there to be no more active reading still 971 # happening. 972 if self._ignore_children: 973 while self.reader.is_still_reading(timeout=0.1): 974 time.sleep(0.1) 975 else: 976 self.reader.join() 977 return self.returncode 978 979 @property 980 def pid(self): 981 if not hasattr(self, "proc"): 982 raise RuntimeError("Process hasn't been started yet") 983 984 return self.proc.pid 985 986 @staticmethod 987 def pid_exists(pid): 988 if pid < 0: 989 return False 990 991 if isWin: 992 try: 993 process = winprocess.OpenProcess( 994 winprocess.PROCESS_QUERY_INFORMATION | winprocess.PROCESS_VM_READ, 995 False, 996 pid, 997 ) 998 return winprocess.GetExitCodeProcess(process) == winprocess.STILL_ACTIVE 999 1000 except OSError as e: 1001 # no such process 1002 if e.winerror == winprocess.ERROR_INVALID_PARAMETER: 1003 return False 1004 1005 # access denied 1006 if e.winerror == winprocess.ERROR_ACCESS_DENIED: 1007 return True 1008 1009 # re-raise for any other type of exception 1010 raise 1011 1012 elif isPosix: 1013 try: 1014 os.kill(pid, 0) 1015 except OSError as e: 1016 return e.errno == errno.EPERM 1017 else: 1018 return True 1019 1020 @classmethod 1021 def _getpgid(cls, pid): 1022 try: 1023 return os.getpgid(pid) 1024 except OSError as e: 1025 # Do not raise for "No such process" 1026 if e.errno != errno.ESRCH: 1027 raise 1028 1029 def check_for_detached(self, new_pid): 1030 """Check if the current process has been detached and mark it appropriately. 1031 1032 In case of application restarts the process can spawn itself into a new process group. 1033 From now on the process can no longer be tracked by mozprocess anymore and has to be 1034 marked as detached. If the consumer of mozprocess still knows the new process id it could 1035 check for the detached state. 1036 1037 new_pid is the new process id of the child process. 1038 """ 1039 if not hasattr(self, "proc"): 1040 raise RuntimeError("Process hasn't been started yet") 1041 1042 if isPosix: 1043 new_pgid = self._getpgid(new_pid) 1044 1045 if new_pgid and new_pgid != self.proc.pgid: 1046 self.proc.detached_pid = new_pid 1047 print( 1048 'Child process with id "%s" has been marked as detached because it is no ' 1049 "longer in the managed process group. Keeping reference to the process id " 1050 '"%s" which is the new child process.' % (self.pid, new_pid), 1051 file=sys.stdout, 1052 ) 1053 1054 1055 class CallableList(list): 1056 def __call__(self, *args, **kwargs): 1057 for e in self: 1058 e(*args, **kwargs) 1059 1060 def __add__(self, lst): 1061 return CallableList(list.__add__(self, lst)) 1062 1063 1064 class ProcessReader: 1065 def __init__( 1066 self, 1067 stdout_callback=None, 1068 stderr_callback=None, 1069 finished_callback=None, 1070 timeout_callback=None, 1071 timeout=None, 1072 output_timeout=None, 1073 ): 1074 self.stdout_callback = stdout_callback or (lambda line: True) 1075 self.stderr_callback = stderr_callback or (lambda line: True) 1076 self.finished_callback = finished_callback or (lambda: True) 1077 self.timeout_callback = timeout_callback or (lambda: True) 1078 self.timeout = timeout 1079 self.output_timeout = output_timeout 1080 self.thread = None 1081 self.got_data = threading.Event() 1082 self.didOutputTimeout = False 1083 1084 def debug(self, msg): 1085 if not MOZPROCESS_DEBUG: 1086 return 1087 print(f"DBG::MOZPROC ProcessReader | {msg}") 1088 1089 def _create_stream_reader(self, name, stream, queue, callback): 1090 thread = threading.Thread( 1091 name=name, target=self._read_stream, args=(stream, queue, callback) 1092 ) 1093 thread.daemon = True 1094 thread.start() 1095 return thread 1096 1097 def _read_stream(self, stream, queue, callback): 1098 sentinel = "" if isinstance(stream, io.TextIOBase) else b"" 1099 try: 1100 for line in iter(stream.readline, sentinel): 1101 queue.put((line, callback)) 1102 except ValueError as e: 1103 if "I/O operation on closed file" in str(e): 1104 # Stream was closed by the process, this is normal 1105 pass 1106 else: 1107 raise 1108 # Give a chance to the reading loop to exit without a timeout. 1109 queue.put((b"", None)) 1110 try: 1111 stream.close() 1112 except ValueError: 1113 # Stream might already be closed 1114 pass 1115 1116 def start(self, proc): 1117 queue = Queue() 1118 readers = 0 1119 if proc.stdout: 1120 self._create_stream_reader( 1121 "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback 1122 ) 1123 readers += 1 1124 if proc.stderr and proc.stderr != proc.stdout: 1125 self._create_stream_reader( 1126 "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback 1127 ) 1128 readers += 1 1129 self.thread = threading.Thread( 1130 name="ProcessReader", 1131 target=self._read, 1132 args=(queue, readers), 1133 ) 1134 self.thread.daemon = True 1135 self.thread.start() 1136 self.debug("ProcessReader started") 1137 1138 def _read(self, queue, readers): 1139 start_time = time.time() 1140 timeout = self.timeout 1141 if timeout is not None: 1142 timeout += start_time 1143 output_timeout = self.output_timeout 1144 1145 def get_line(): 1146 queue_timeout = None 1147 if timeout: 1148 queue_timeout = timeout - time.time() 1149 if output_timeout: 1150 if queue_timeout: 1151 queue_timeout = min(queue_timeout, output_timeout) 1152 else: 1153 queue_timeout = output_timeout 1154 return queue.get(timeout=queue_timeout) 1155 1156 try: 1157 # We need to wait for as many `(b"", None)` sentinels as there are 1158 # reader threads setup in start. 1159 for n in range(readers): 1160 for line, callback in iter(get_line, (b"", None)): 1161 self.got_data.set() 1162 try: 1163 callback(line.rstrip()) 1164 except Exception: 1165 traceback.print_exc() 1166 try: 1167 self.finished_callback() 1168 except Exception: 1169 traceback.print_exc() 1170 except Empty: 1171 if timeout and time.time() < timeout or not timeout: 1172 self.didOutputTimeout = True 1173 try: 1174 self.timeout_callback() 1175 except Exception: 1176 traceback.print_exc() 1177 self.debug("_read exited") 1178 1179 def is_alive(self): 1180 if self.thread: 1181 return self.thread.is_alive() 1182 return False 1183 1184 def is_still_reading(self, timeout): 1185 self.got_data.clear() 1186 return self.got_data.wait(timeout) 1187 1188 def join(self, timeout=None): 1189 if self.thread: 1190 self.thread.join(timeout=timeout) 1191 1192 1193 # default output handlers 1194 # these should be callables that take the output line 1195 1196 1197 class StoreOutput: 1198 """accumulate stdout""" 1199 1200 def __init__(self): 1201 self.output = [] 1202 1203 def __call__(self, line): 1204 self.output.append(line) 1205 1206 1207 class StreamOutput: 1208 """pass output to a stream and flush""" 1209 1210 def __init__(self, stream, text=True): 1211 self.stream = stream 1212 self.text = text 1213 1214 def __call__(self, line): 1215 if self.text: 1216 if isinstance(line, bytes): 1217 line = line.decode(errors="ignore") 1218 line += "\n" 1219 else: 1220 if isinstance(line, str): 1221 line = line.encode(errors="ignore") 1222 line += b"\n" 1223 try: 1224 self.stream.write(line) 1225 except TypeError: 1226 print( 1227 "HEY! If you're reading this, you're about to encounter a " 1228 "type error, probably as a result of a conversion from " 1229 "Python 2 to Python 3. This is almost definitely because " 1230 "you're trying to write binary data to a text-encoded " 1231 "stream, or text data to a binary-encoded stream. Check how " 1232 "you're instantiating your ProcessHandler and if the output " 1233 "should be text-encoded, make sure you pass " 1234 "universal_newlines=True.", 1235 file=sys.stderr, 1236 ) 1237 raise 1238 self.stream.flush() 1239 1240 1241 class LogOutput(StreamOutput): 1242 """pass output to a file""" 1243 1244 def __init__(self, filename): 1245 self.file_obj = open(filename, "a") 1246 StreamOutput.__init__(self, self.file_obj, True) 1247 1248 def __del__(self): 1249 if self.file_obj is not None: 1250 self.file_obj.close() 1251 1252 1253 # front end class with the default handlers 1254 1255 1256 class ProcessHandler(ProcessHandlerMixin): 1257 """ 1258 Convenience class for handling processes with default output handlers. 1259 1260 By default, all output is sent to stdout. This can be disabled by setting 1261 the *stream* argument to None. 1262 1263 If processOutputLine keyword argument is specified the function or the 1264 list of functions specified by this argument will be called for each line 1265 of output; the output will not be written to stdout automatically then 1266 if stream is True (the default). 1267 1268 If storeOutput==True, the output produced by the process will be saved 1269 as self.output. 1270 1271 If logfile is not None, the output produced by the process will be 1272 appended to the given file. 1273 """ 1274 1275 def __init__(self, cmd, logfile=None, stream=True, storeOutput=True, **kwargs): 1276 kwargs.setdefault("processOutputLine", []) 1277 if callable(kwargs["processOutputLine"]): 1278 kwargs["processOutputLine"] = [kwargs["processOutputLine"]] 1279 1280 if logfile: 1281 logoutput = LogOutput(logfile) 1282 kwargs["processOutputLine"].append(logoutput) 1283 1284 text = kwargs.get("universal_newlines", False) or kwargs.get("text", False) 1285 1286 if stream is True: 1287 if text: 1288 # The encoding of stdout isn't guaranteed to be utf-8. Fix that. 1289 stdout = codecs.getwriter("utf-8")(sys.stdout.buffer) 1290 else: 1291 stdout = sys.stdout.buffer 1292 1293 if not kwargs["processOutputLine"]: 1294 kwargs["processOutputLine"].append(StreamOutput(stdout, text)) 1295 elif stream: 1296 streamoutput = StreamOutput(stream, text) 1297 kwargs["processOutputLine"].append(streamoutput) 1298 1299 self.output = None 1300 if storeOutput: 1301 storeoutput = StoreOutput() 1302 self.output = storeoutput.output 1303 kwargs["processOutputLine"].append(storeoutput) 1304 1305 ProcessHandlerMixin.__init__(self, cmd, **kwargs)