tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

processhandler.py (53137B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this file,
      3 # You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
      5 # The mozprocess ProcessHandler and ProcessHandlerMixin are typically used as
      6 # an alternative to the python subprocess module. They have been used in many
      7 # Mozilla test harnesses with some success -- but also with on-going concerns,
      8 # especially regarding reliability and exception handling.
      9 #
     10 # New code should try to use the standard subprocess module, and only use
     11 # this ProcessHandler if absolutely necessary.
     12 
     13 import codecs
     14 import errno
     15 import io
     16 import os
     17 import signal
     18 import subprocess
     19 import sys
     20 import threading
     21 import time
     22 import traceback
     23 from datetime import datetime
     24 from queue import Empty, Queue
     25 
# Set the MOZPROCESS_DEBUG environment variable to 1 to see some debugging output.
# The value is only tested for truthiness, so any non-empty string enables it.
MOZPROCESS_DEBUG = os.getenv("MOZPROCESS_DEBUG")

# Seconds to sleep between poll() checks while waiting for a signalled
# process to exit.
INTERVAL_PROCESS_ALIVE_CHECK = 0.02

# We don't use mozinfo because it is expensive to import, see bug 933558.
isWin = os.name == "nt"
isPosix = os.name == "posix"  # includes MacOS X
     34 
     35 if isWin:
     36    from ctypes import WinError, addressof, byref, c_longlong, c_ulong, sizeof
     37 
     38    from . import winprocess
     39    from .qijo import (
     40        IO_COUNTERS,
     41        JOBOBJECT_ASSOCIATE_COMPLETION_PORT,
     42        JOBOBJECT_BASIC_LIMIT_INFORMATION,
     43        JOBOBJECT_EXTENDED_LIMIT_INFORMATION,
     44        JobObjectAssociateCompletionPortInformation,
     45        JobObjectExtendedLimitInformation,
     46    )
     47 
     48 
     49 class ProcessHandlerMixin:
     50    """
     51    A class for launching and manipulating local processes.
     52 
     53    :param cmd: command to run. May be a string or a list. If specified as a list, the first
     54      element will be interpreted as the command, and all additional elements will be interpreted
     55      as arguments to that command.
     56    :param args: list of arguments to pass to the command (defaults to None). Must not be set when
     57      `cmd` is specified as a list.
     58    :param cwd: working directory for command (defaults to None).
     59    :param env: is the environment to use for the process (defaults to os.environ).
     60    :param ignore_children: causes system to ignore child processes when True,
     61      defaults to False (which tracks child processes).
     62    :param kill_on_timeout: when True, the process will be killed when a timeout is reached.
     63      When False, the caller is responsible for killing the process.
     64      Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.)
     65    :param processOutputLine: function or list of functions to be called for
     66        each line of output produced by the process (defaults to an empty
     67        list).
     68    :param processStderrLine: function or list of functions to be called
     69        for each line of error output - stderr - produced by the process
     70        (defaults to an empty list). If this is not specified, stderr lines
     71        will be sent to the *processOutputLine* callbacks.
     72    :param onTimeout: function or list of functions to be called when the process times out.
     73    :param onFinish: function or list of functions to be called when the process terminates
     74      normally without timing out.
     75    :param kwargs: additional keyword args to pass directly into Popen.
     76 
     77    NOTE: Child processes will be tracked by default.  If for any reason
     78    we are unable to track child processes and ignore_children is set to False,
     79    then we will fall back to only tracking the root process.  The fallback
     80    will be logged.
     81    """
     82 
     83    class Process(subprocess.Popen):
     84        """
     85        Represents our view of a subprocess.
     86        It adds a kill() method which allows it to be stopped explicitly.
     87        """
     88 
     89        MAX_IOCOMPLETION_PORT_NOTIFICATION_DELAY = 180
     90        TIMEOUT_BEFORE_SIGKILL = 1.0
     91 
     92        def __init__(
     93            self,
     94            args,
     95            bufsize=0,
     96            executable=None,
     97            stdin=None,
     98            stdout=None,
     99            stderr=None,
    100            preexec_fn=None,
    101            close_fds=False,
    102            shell=False,
    103            cwd=None,
    104            env=None,
    105            universal_newlines=False,
    106            startupinfo=None,
    107            creationflags=0,
    108            ignore_children=False,
    109            encoding="utf-8",
    110        ):
    111            # Parameter for whether or not we should attempt to track child processes
    112            self._ignore_children = ignore_children
    113            self._job = None
    114            self._io_port = None
    115            if isWin:
    116                self._cleanup_lock = threading.Lock()
    117 
    118            if not self._ignore_children and not isWin:
    119                # Set the process group id for linux systems
    120                # Sets process group id to the pid of the parent process
    121                # NOTE: This prevents you from using preexec_fn and managing
    122                #       child processes, TODO: Ideally, find a way around this
    123                def setpgidfn():
    124                    os.setpgid(0, 0)
    125 
    126                preexec_fn = setpgidfn
    127 
    128            kwargs = {
    129                "bufsize": bufsize,
    130                "executable": executable,
    131                "stdin": stdin,
    132                "stdout": stdout,
    133                "stderr": stderr,
    134                "preexec_fn": preexec_fn,
    135                "close_fds": close_fds,
    136                "shell": shell,
    137                "cwd": cwd,
    138                "env": env,
    139                "startupinfo": startupinfo,
    140                "creationflags": creationflags,
    141            }
    142            if sys.version_info.minor >= 6 and universal_newlines:
    143                kwargs["universal_newlines"] = universal_newlines
    144                kwargs["encoding"] = encoding
    145            try:
    146                subprocess.Popen.__init__(self, args, **kwargs)
    147            except OSError:
    148                print(args, file=sys.stderr)
    149                raise
    150 
    151        def debug(self, msg):
    152            if not MOZPROCESS_DEBUG:
    153                return
    154            thread = threading.current_thread().name
    155            print(f"DBG::MOZPROC PID:{self.pid} ({thread}) | {msg}")
    156 
    157        def __del__(self):
    158            if isWin:
    159                _maxint = sys.maxsize
    160                handle = getattr(self, "_handle", None)
    161                if handle:
    162                    # _internal_poll is a Python3 built-in call and requires _handle to be an int on Windows
    163                    # It's only an AutoHANDLE for legacy Python2 reasons that are non-trivial to remove
    164                    self._handle = int(self._handle)
    165                    self._internal_poll(_deadstate=_maxint)
    166                    # Revert it back to the saved 'handle' (AutoHANDLE) for self._cleanup()
    167                    self._handle = handle
    168                if handle or self._job or self._io_port:
    169                    self._cleanup()
    170            else:
    171                subprocess.Popen.__del__(self)
    172 
    173        def send_signal(self, sig=None):
    174            if isWin:
    175                try:
    176                    if not self._ignore_children and self._handle and self._job:
    177                        self.debug("calling TerminateJobObject")
    178                        winprocess.TerminateJobObject(
    179                            self._job, winprocess.ERROR_CONTROL_C_EXIT
    180                        )
    181                    elif self._handle:
    182                        self.debug("calling TerminateProcess")
    183                        winprocess.TerminateProcess(
    184                            self._handle, winprocess.ERROR_CONTROL_C_EXIT
    185                        )
    186                except OSError:
    187                    self._cleanup()
    188 
    189                    traceback.print_exc()
    190                    raise OSError("Could not terminate process")
    191 
    192            else:
    193 
    194                def send_sig(sig, retries=0):
    195                    pid = self.detached_pid or self.pid
    196                    if not self._ignore_children:
    197                        try:
    198                            os.killpg(pid, sig)
    199                        except BaseException as e:
    200                            # On Mac OSX if the process group contains zombie
    201                            # processes, killpg results in an EPERM.
    202                            # In this case, zombie processes need to be reaped
    203                            # before continuing
    204                            # Note: A negative pid refers to the entire process
    205                            # group
    206                            if retries < 1 and getattr(e, "errno", None) == errno.EPERM:
    207                                try:
    208                                    os.waitpid(-pid, 0)
    209                                finally:
    210                                    return send_sig(sig, retries + 1)
    211 
    212                            # ESRCH is a "no such process" failure, which is fine because the
    213                            # application might already have been terminated itself. Any other
    214                            # error would indicate a problem in killing the process.
    215                            if getattr(e, "errno", None) != errno.ESRCH:
    216                                print(
    217                                    "Could not terminate process: %s" % self.pid,
    218                                    file=sys.stderr,
    219                                )
    220                                raise
    221                    else:
    222                        os.kill(pid, sig)
    223 
    224                if sig is None and isPosix:
    225                    # ask the process for termination and wait a bit
    226                    send_sig(signal.SIGTERM)
    227                    limit = time.time() + self.TIMEOUT_BEFORE_SIGKILL
    228                    while time.time() <= limit:
    229                        if self.poll() is not None:
    230                            # process terminated nicely
    231                            break
    232                        time.sleep(INTERVAL_PROCESS_ALIVE_CHECK)
    233                    else:
    234                        # process did not terminate - send SIGKILL to force
    235                        send_sig(signal.SIGKILL)
    236                else:
    237                    # a signal was explicitly set or not posix
    238                    send_sig(sig or signal.SIGKILL)
    239 
    240        def kill(self, sig=None, timeout=None):
    241            self.send_signal(sig)
    242            self.returncode = self.wait(timeout)
    243            self._cleanup()
    244            return self.returncode
    245 
    246        def poll(self):
    247            """Popen.poll
    248            Check if child process has terminated. Set and return returncode attribute.
    249            """
    250            if isWin:
    251                returncode = self._custom_wait(timeout=0)
    252            else:
    253                returncode = subprocess.Popen.poll(self)
    254            if returncode is not None:
    255                self._cleanup()
    256            return returncode
    257 
    258        def wait(self, timeout=None):
    259            """Popen.wait
    260            Called to wait for a running process to shut down and return
    261            its exit code
    262            Returns the main process's exit code
    263            """
    264            # This call will be different for each OS
    265            self.returncode = self._custom_wait(timeout=timeout)
    266            if self.returncode is not None:
    267                self._cleanup()
    268            return self.returncode
    269 
    270        """ Private Members of Process class """
    271 
    272        if isWin:
    273            # Redefine the execute child so that we can track process groups
            def _execute_child(self, *args_tuple):
                """
                Windows replacement for Popen's private ``_execute_child``.

                Creates the process suspended through ``winprocess``, and,
                unless children are ignored or job objects are unavailable,
                places it in a job object tied to an IO completion port
                before resuming it. The job and port are what allow child
                processes to be tracked and killed together.

                :param args_tuple: the positional arguments Popen passes to
                  ``_execute_child``; only the first sixteen are used, any
                  further ones are ignored.
                """
                (
                    args,
                    executable,
                    preexec_fn,
                    close_fds,
                    pass_fds,
                    cwd,
                    env,
                    startupinfo,
                    creationflags,
                    shell,
                    p2cread,
                    p2cwrite,
                    c2pread,
                    c2pwrite,
                    errread,
                    errwrite,
                    *_,
                ) = args_tuple
                # CreateProcess is handed a single command line string.
                if not isinstance(args, str):
                    args = subprocess.list2cmdline(args)

                # Always or in the create new process group
                creationflags |= winprocess.CREATE_NEW_PROCESS_GROUP

                if startupinfo is None:
                    startupinfo = winprocess.STARTUPINFO()

                # Only redirect the standard handles when all three are set.
                if None not in (p2cread, c2pwrite, errwrite):
                    startupinfo.dwFlags |= winprocess.STARTF_USESTDHANDLES
                    startupinfo.hStdInput = int(p2cread)
                    startupinfo.hStdOutput = int(c2pwrite)
                    startupinfo.hStdError = int(errwrite)
                if shell:
                    # Run through the command interpreter with a hidden window.
                    startupinfo.dwFlags |= winprocess.STARTF_USESHOWWINDOW
                    startupinfo.wShowWindow = winprocess.SW_HIDE
                    comspec = os.environ.get("COMSPEC", "cmd.exe")
                    args = comspec + " /c " + args

                # Determine if we can create a job or create nested jobs.
                can_create_job = winprocess.CanCreateJobObject()
                can_nest_jobs = self._can_nest_jobs()

                # Ensure we write a warning message if we are falling back
                if not (can_create_job or can_nest_jobs) and not self._ignore_children:
                    # We can't create job objects AND the user wanted us to
                    # Warn the user about this.
                    print(
                        "ProcessManager UNABLE to use job objects to manage "
                        "child processes",
                        file=sys.stderr,
                    )

                # set process creation flags
                # The process starts suspended so it can be assigned to the
                # job before it runs; it is resumed further down.
                creationflags |= winprocess.CREATE_SUSPENDED
                creationflags |= winprocess.CREATE_UNICODE_ENVIRONMENT
                if can_create_job:
                    creationflags |= winprocess.CREATE_BREAKAWAY_FROM_JOB
                if not (can_create_job or can_nest_jobs):
                    # Since we've warned, we just log info here to inform you
                    # of the consequence of setting ignore_children = True
                    # NOTE: this message is printed (to stdout) whenever job
                    # objects are unavailable, whatever ignore_children is.
                    print("ProcessManager NOT managing child processes")

                # create the process
                hp, ht, pid, tid = winprocess.CreateProcess(
                    executable,
                    args,
                    None,
                    None,  # No special security
                    1,  # Must inherit handles!
                    creationflags,
                    winprocess.EnvironmentBlock(env),
                    cwd,
                    startupinfo,
                )
                # Record the process/thread handles and ids on the instance.
                self._child_created = True
                self._handle = hp
                self._thread = ht
                self.pid = pid
                self.tid = tid

                if not self._ignore_children and (can_create_job or can_nest_jobs):
                    try:
                        # We create a new job for this process, so that we can kill
                        # the process and any sub-processes
                        # Create the IO Completion Port
                        self._io_port = winprocess.CreateIoCompletionPort()
                        self._job = winprocess.CreateJobObject()

                        # Now associate the io comp port and the job object
                        joacp = JOBOBJECT_ASSOCIATE_COMPLETION_PORT(
                            winprocess.COMPKEY_JOBOBJECT, self._io_port
                        )
                        winprocess.SetInformationJobObject(
                            self._job,
                            JobObjectAssociateCompletionPortInformation,
                            addressof(joacp),
                            sizeof(joacp),
                        )

                        # Allow subprocesses to break away from us - necessary when
                        # Firefox restarts, or flash with protected mode
                        limit_flags = winprocess.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
                        if not can_nest_jobs:
                            # This allows sandbox processes to create their own job,
                            # and is necessary to set for older versions of Windows
                            # without nested job support.
                            limit_flags |= winprocess.JOB_OBJECT_LIMIT_BREAKAWAY_OK

                        jbli = JOBOBJECT_BASIC_LIMIT_INFORMATION(
                            c_longlong(0),  # per process time limit (ignored)
                            c_longlong(0),  # per job user time limit (ignored)
                            limit_flags,
                            0,  # min working set (ignored)
                            0,  # max working set (ignored)
                            0,  # active process limit (ignored)
                            None,  # affinity (ignored)
                            0,  # Priority class (ignored)
                            0,  # Scheduling class (ignored)
                        )

                        iocntr = IO_COUNTERS()
                        jeli = JOBOBJECT_EXTENDED_LIMIT_INFORMATION(
                            jbli,  # basic limit info struct
                            iocntr,  # io_counters (ignored)
                            0,  # process mem limit (ignored)
                            0,  # job mem limit (ignored)
                            0,  # peak process limit (ignored)
                            0,
                        )  # peak job limit (ignored)

                        winprocess.SetInformationJobObject(
                            self._job,
                            JobObjectExtendedLimitInformation,
                            addressof(jeli),
                            sizeof(jeli),
                        )

                        # Assign the job object to the process
                        winprocess.AssignProcessToJobObject(self._job, int(hp))

                        # It's overkill, but we use Queue to signal between threads
                        # because it handles errors more gracefully than event or condition.
                        self._process_events = Queue()

                        # Spin up our thread for managing the IO Completion Port
                        # (created here, started only after the process is resumed)
                        self._procmgrthread = threading.Thread(target=self._procmgr)
                    except Exception:
                        # Job setup is best effort: report the failure and
                        # carry on with the process unmanaged.
                        print(
                            """Exception trying to use job objects;
falling back to not using job objects for managing child processes""",
                            file=sys.stderr,
                        )
                        tb = traceback.format_exc()
                        print(tb, file=sys.stderr)
                        # Ensure no dangling handles left behind
                        self._cleanup_job_io_port()
                else:
                    self._job = None

                # Let the (so far suspended) process start running.
                winprocess.ResumeThread(int(ht))
                if getattr(self, "_procmgrthread", None):
                    self._procmgrthread.start()
                ht.Close()

                # Close this side's copies of the child's std handles.
                for i in (p2cread, c2pwrite, errwrite):
                    if i is not None:
                        i.Close()
    443 
    444            # Per:
    445            # https://msdn.microsoft.com/en-us/library/windows/desktop/hh448388%28v=vs.85%29.aspx
    446            # Nesting jobs came in with windows versions starting with 6.2 according to the table
    447            # on this page:
    448            # https://msdn.microsoft.com/en-us/library/ms724834%28v=vs.85%29.aspx
    449            def _can_nest_jobs(self):
    450                winver = sys.getwindowsversion()
    451                return winver.major > 6 or winver.major == 6 and winver.minor >= 2
    452 
    453            # Windows Process Manager - watches the IO Completion Port and
    454            # keeps track of child processes
    455            def _procmgr(self):
    456                if not (self._io_port) or not (self._job):
    457                    return
    458 
    459                try:
    460                    self._poll_iocompletion_port()
    461                except Exception:
    462                    traceback.print_exc()
    463                    # If _poll_iocompletion_port threw an exception for some unexpected reason,
    464                    # send an event that will make _custom_wait throw an Exception.
    465                    self._process_events.put({})
    466                except KeyboardInterrupt:
    467                    raise KeyboardInterrupt
    468 
    469            def _poll_iocompletion_port(self):
    470                # Watch the IO Completion port for status
    471                self._spawned_procs = {}
    472                countdowntokill = 0
    473 
    474                self.debug("start polling IO completion port")
    475 
    476                while True:
    477                    msgid = c_ulong(0)
    478                    compkey = c_ulong(0)
    479                    pid = c_ulong(0)
    480                    portstatus = winprocess.GetQueuedCompletionStatus(
    481                        self._io_port, byref(msgid), byref(compkey), byref(pid), 5000
    482                    )
    483 
    484                    # If the countdowntokill has been activated, we need to check
    485                    # if we should start killing the children or not.
    486                    if countdowntokill != 0:
    487                        diff = datetime.now() - countdowntokill
    488                        # Arbitrarily wait 3 minutes for windows to get its act together
    489                        # Windows sometimes takes a small nap between notifying the
    490                        # IO Completion port and actually killing the children, and we
    491                        # don't want to mistake that situation for the situation of an unexpected
    492                        # parent abort (which is what we're looking for here).
    493                        if diff.seconds > self.MAX_IOCOMPLETION_PORT_NOTIFICATION_DELAY:
    494                            print(
    495                                "WARNING | IO Completion Port failed to signal "
    496                                "process shutdown",
    497                                file=sys.stderr,
    498                            )
    499                            print(
    500                                "Parent process %s exited with children alive:"
    501                                % self.pid,
    502                                file=sys.stderr,
    503                            )
    504                            print(
    505                                "PIDS: %s"
    506                                % ", ".join([str(i) for i in self._spawned_procs]),
    507                                file=sys.stderr,
    508                            )
    509                            print(
    510                                "Attempting to kill them, but no guarantee of success",
    511                                file=sys.stderr,
    512                            )
    513 
    514                            self.send_signal()
    515                            self._process_events.put({self.pid: "FINISHED"})
    516                            break
    517 
    518                    if not portstatus:
    519                        # Check to see what happened
    520                        errcode = winprocess.GetLastError()
    521                        if errcode == winprocess.ERROR_ABANDONED_WAIT_0:
    522                            # Then something has killed the port, break the loop
    523                            print(
    524                                "IO Completion Port unexpectedly closed",
    525                                file=sys.stderr,
    526                            )
    527                            self._process_events.put({self.pid: "FINISHED"})
    528                            break
    529                        elif errcode == winprocess.WAIT_TIMEOUT:
    530                            # Timeouts are expected, just keep on polling
    531                            continue
    532                        else:
    533                            print(
    534                                "Error Code %s trying to query IO Completion Port, "
    535                                "exiting" % errcode,
    536                                file=sys.stderr,
    537                            )
    538                            raise WinError(errcode)
    539                            break
    540 
    541                    if compkey.value == winprocess.COMPKEY_TERMINATE.value:
    542                        self.debug("compkeyterminate detected")
    543                        # Then we're done
    544                        break
    545 
    546                    # Check the status of the IO Port and do things based on it
    547                    if compkey.value == winprocess.COMPKEY_JOBOBJECT.value:
    548                        if msgid.value == winprocess.JOB_OBJECT_MSG_ACTIVE_PROCESS_ZERO:
    549                            # No processes left, time to shut down
    550                            # Signal anyone waiting on us that it is safe to shut down
    551                            self.debug("job object msg active processes zero")
    552                            self._process_events.put({self.pid: "FINISHED"})
    553                            break
    554                        elif msgid.value == winprocess.JOB_OBJECT_MSG_NEW_PROCESS:
    555                            # New Process started
    556                            # Add the child proc to our list in case our parent flakes out on us
    557                            # without killing everything.
    558                            if pid.value != self.pid:
    559                                self._spawned_procs[pid.value] = 1
    560                                self.debug(
    561                                    "new process detected with pid value: %s"
    562                                    % pid.value
    563                                )
    564                        elif msgid.value == winprocess.JOB_OBJECT_MSG_EXIT_PROCESS:
    565                            self.debug("process id %s exited normally" % pid.value)
    566                            # One process exited normally
    567                            if pid.value == self.pid and len(self._spawned_procs) > 0:
    568                                # Parent process dying, start countdown timer
    569                                countdowntokill = datetime.now()
    570                            elif pid.value in self._spawned_procs:
    571                                # Child Process died remove from list
    572                                del self._spawned_procs[pid.value]
    573                        elif (
    574                            msgid.value
    575                            == winprocess.JOB_OBJECT_MSG_ABNORMAL_EXIT_PROCESS
    576                        ):
    577                            # One process existed abnormally
    578                            self.debug("process id %s exited abnormally" % pid.value)
    579                            if pid.value == self.pid and len(self._spawned_procs) > 0:
    580                                # Parent process dying, start countdown timer
    581                                countdowntokill = datetime.now()
    582                            elif pid.value in self._spawned_procs:
    583                                # Child Process died remove from list
    584                                del self._spawned_procs[pid.value]
    585                        else:
    586                            # We don't care about anything else
    587                            self.debug("We got a message %s" % msgid.value)
    588                            pass
    589 
            def _custom_wait(self, timeout=None):
                """Custom implementation of wait for Windows.

                When a job object is in use (``self._job``), this waits for an
                item on the ``self._process_events`` queue; otherwise it waits
                directly on the process handle with ``WaitForSingleObject``.

                - timeout: number of seconds before timing out. If None,
                  will wait indefinitely (in 0.1 second slices, see below).

                Returns ``self.returncode``; it is only updated here once the
                exit code read from the handle is no longer ``STILL_ACTIVE``.

                Raises OSError if waiting on the event queue fails with anything
                other than ``Empty``, and WinError if ``WaitForSingleObject``
                returns an unexpected value and ``GetLastError`` is non-zero.
                """
                # First, check to see if the process is still running
                if self._handle:
                    returncode = winprocess.GetExitCodeProcess(self._handle)
                    if returncode != winprocess.STILL_ACTIVE:
                        self.returncode = returncode
                else:
                    # No process handle left: nothing to wait on, report
                    # whatever return code has been recorded so far.
                    return self.returncode

                # On Windows, an unlimited timeout prevents KeyboardInterrupt from
                # being caught, so "wait forever" is emulated by retrying short
                # waits.
                the_timeout = 0.1 if timeout is None else timeout

                if self._job:
                    self.debug("waiting with IO completion port")
                    # Then we are managing with IO Completion Ports
                    # wait on a signal so we know when we have seen the last
                    # process come through.
                    # We use queues to synchronize between the thread and this
                    # function because events just didn't have robust enough error
                    # handling on pre-2.7 versions
                    try:
                        while True:
                            try:
                                item = self._process_events.get(timeout=the_timeout)
                            except Empty:
                                # The timeout was not given by the user, we just have a
                                # timeout to allow KeyboardInterrupt, so retry.
                                if timeout is None:
                                    continue
                                else:
                                    # A user-supplied timeout elapsed; handled by
                                    # the outer ``except Empty`` below.
                                    raise
                            break

                        # re-emit the event in case some other thread is also calling wait()
                        self._process_events.put(item)
                        if item[self.pid] == "FINISHED":
                            self.debug("received 'FINISHED' from _procmgrthread")
                            self._process_events.task_done()
                    except Empty:
                        # There was no event within the expected time.
                        pass
                    except Exception:
                        # Anything else (e.g. an item without an entry for our
                        # pid) is reported as a failure of the completion port.
                        traceback.print_exc()
                        raise OSError(
                            "IO Completion Port failed to signal process shutdown"
                        )
                    finally:
                        # Whatever happened above, refresh the return code from
                        # the process handle if the process has exited.
                        if self._handle:
                            returncode = winprocess.GetExitCodeProcess(self._handle)
                            if returncode != winprocess.STILL_ACTIVE:
                                self.returncode = returncode

                else:
                    # Not managing with job objects, so all we can reasonably do
                    # is call waitforsingleobject and hope for the best
                    self.debug("waiting without IO completion port")

                    if not self._ignore_children:
                        self.debug("NOT USING JOB OBJECTS!!!")
                    # First, make sure we have not already ended
                    if self.returncode is not None:
                        return self.returncode

                    rc = None
                    if self._handle:
                        # timeout for WaitForSingleObject is in ms
                        the_timeout = int(the_timeout * 1000)
                        while True:
                            rc = winprocess.WaitForSingleObject(
                                self._handle, the_timeout
                            )
                            # The timeout was not given by the user, we just have a
                            # timeout to allow KeyboardInterrupt, so retry.
                            if timeout is None and rc == winprocess.WAIT_TIMEOUT:
                                continue
                            break

                    if rc == winprocess.WAIT_TIMEOUT:
                        # Timeout happened as asked.
                        pass
                    elif rc == winprocess.WAIT_OBJECT_0:
                        # We caught WAIT_OBJECT_0, which indicates all is well
                        # NOTE(review): this prints unconditionally to stdout,
                        # unlike the other messages here which go through
                        # self.debug() -- confirm whether that is intended.
                        print("Single process terminated successfully")
                        self.returncode = winprocess.GetExitCodeProcess(self._handle)
                    else:
                        # An error occurred; raise it if Windows recorded one
                        rc = winprocess.GetLastError()
                        if rc:
                            raise WinError(rc)

                return self.returncode
    688 
    689            def _cleanup_job_io_port(self):
    690                """Do the job and IO port cleanup separately because there are
    691                cases where we want to clean these without killing _handle
    692                (i.e. if we fail to create the job object in the first place)
    693                """
    694                if (
    695                    getattr(self, "_job")
    696                    and self._job != winprocess.INVALID_HANDLE_VALUE
    697                ):
    698                    self._job.Close()
    699                    self._job = None
    700                else:
    701                    # If windows already freed our handle just set it to none
    702                    # (saw this intermittently while testing)
    703                    self._job = None
    704 
    705                if (
    706                    getattr(self, "_io_port", None)
    707                    and self._io_port != winprocess.INVALID_HANDLE_VALUE
    708                ):
    709                    self._io_port.Close()
    710                    self._io_port = None
    711                else:
    712                    self._io_port = None
    713 
    714                if getattr(self, "_procmgrthread", None):
    715                    self._procmgrthread = None
    716 
    717            def _cleanup(self):
    718                self._cleanup_lock.acquire()
    719                self._cleanup_job_io_port()
    720                if self._thread and self._thread != winprocess.INVALID_HANDLE_VALUE:
    721                    self._thread.Close()
    722                    self._thread = None
    723                else:
    724                    self._thread = None
    725 
    726                if self._handle and self._handle != winprocess.INVALID_HANDLE_VALUE:
    727                    self._handle.Close()
    728                    self._handle = None
    729                else:
    730                    self._handle = None
    731                self._cleanup_lock.release()
    732 
    733        else:
    734 
    735            def _custom_wait(self, timeout=None):
    736                """Haven't found any reason to differentiate between these platforms
    737                so they all use the same wait callback.  If it is necessary to
    738                craft different styles of wait, then a new _custom_wait method
    739                could be easily implemented.
    740                """
    741                # For non-group wait, call base class
    742                try:
    743                    subprocess.Popen.wait(self, timeout=timeout)
    744                except subprocess.TimeoutExpired:
    745                    # We want to return None in this case
    746                    pass
    747                return self.returncode
    748 
    749            def _cleanup(self):
    750                pass
    751 
    def __init__(
        self,
        cmd,
        args=None,
        cwd=None,
        env=None,
        ignore_children=False,
        kill_on_timeout=True,
        processOutputLine=(),
        processStderrLine=(),
        onTimeout=(),
        onFinish=(),
        **kwargs,
    ):
        """Record the command to run and wire up the output reader.

        :param cmd: the command, or a list holding the command followed by
                    its arguments (in which case *args* must be None).
        :param args: list of arguments; defaults to an empty list.
        :param cwd: stored as ``self.cwd``.
        :param env: stored as ``self.env``; a copy of ``os.environ`` if None.
        :param ignore_children: stored as ``self._ignore_children``.
        :param kill_on_timeout: when true, ``self.kill()`` is called as part
                    of the timeout handling.
        :param processOutputLine: callable or list of callables handed to the
                    reader as its stdout callback.
        :param processStderrLine: same, for stderr.  When non-empty, stderr
                    gets its own pipe instead of being merged into stdout.
        :param onTimeout: callable or list of callables run on timeout, after
                    the internal bookkeeping handler.
        :param onFinish: callable or list of callables handed to the reader as
                    its finished callback.
        :param kwargs: kept in ``self.keywordargs``.
        :raises TypeError: if *cmd* is a list and *args* is also given.
        """
        self.cmd = cmd
        self.args = args
        self.cwd = cwd
        self.didTimeout = False
        self.didOutputTimeout = False
        self._ignore_children = ignore_children
        self.keywordargs = kwargs
        self.read_buffer = ""
        self.env = os.environ.copy() if env is None else env

        # handlers: a bare callable is accepted wherever a list of them is
        def as_handlers(handlers):
            return CallableList([handlers] if callable(handlers) else handlers)

        stdout_handlers = as_handlers(processOutputLine)
        stderr_handlers = as_handlers(processStderrLine)
        timeout_handlers = as_handlers(onTimeout)
        finish_handlers = as_handlers(onFinish)

        def record_timeout():
            self.didTimeout = True
            self.didOutputTimeout = self.reader.didOutputTimeout
            if kill_on_timeout:
                self.kill()

        # Our own bookkeeping must run before any user supplied handler.
        timeout_handlers.insert(0, record_timeout)

        self._stderr = subprocess.PIPE if stderr_handlers else subprocess.STDOUT
        self.reader = ProcessReader(
            stdout_callback=stdout_handlers,
            stderr_callback=stderr_handlers,
            finished_callback=finish_handlers,
            timeout_callback=timeout_handlers,
        )

        # It is common for people to pass in the entire array with the cmd and
        # the args together since this is how Popen uses it.  Allow for that.
        if not isinstance(self.cmd, list):
            if self.args is None:
                self.args = []
        elif self.args is not None:
            raise TypeError("cmd and args must not both be lists")
        else:
            self.cmd, self.args = self.cmd[0], self.cmd[1:]
    816 
    def debug(self, msg):
        """Print *msg* as a debug line if MOZPROCESS_DEBUG is set.

        The line is tagged with the last ``os.sep``-separated component of
        ``self.cmd`` so output from several handlers can be told apart.
        """
        if not MOZPROCESS_DEBUG:
            return
        # Index rather than slice: a slice yields a one-element list, which
        # would be printed as "['name']" instead of "name".
        cmd = self.cmd.split(os.sep)[-1]
        print(f"DBG::MOZPROC ProcessHandlerMixin {cmd} | {msg}")
    822 
    823    @property
    824    def timedOut(self):
    825        """True if the process has timed out for any reason."""
    826        return self.didTimeout
    827 
    828    @property
    829    def outputTimedOut(self):
    830        """True if the process has timed out for no output."""
    831        return self.didOutputTimeout
    832 
    833    @property
    834    def commandline(self):
    835        """the string value of the command line (command + args)"""
    836        return subprocess.list2cmdline([self.cmd] + self.args)
    837 
    def run(self, timeout=None, outputTimeout=None):
        """
        Launch the process and start handling its output.

        :param timeout: if not None, number of seconds the process may run
                        before being killed; the onTimeout handler is called
                        when that happens.
        :param outputTimeout: if not None, number of seconds the process may
                        go without producing any output before being killed.
        """
        self.didTimeout = False
        self.didOutputTimeout = False

        # Defaults first, then whatever extra keyword arguments were given to
        # the constructor, which therefore win over the defaults.
        popen_kwargs = {
            "stdout": subprocess.PIPE,
            "stderr": self._stderr,
            "cwd": self.cwd,
            "env": self.env,
            "ignore_children": self._ignore_children,
        }
        popen_kwargs.update(self.keywordargs)

        # launch the process
        argv = [self.cmd] + self.args
        self.proc = self.Process(argv, **popen_kwargs)

        if isPosix:
            # Keep track of the initial process group in case the process detaches itself
            self.proc.pgid = self._getpgid(self.proc.pid)
            self.proc.detached_pid = None

        self.processOutput(timeout=timeout, outputTimeout=outputTimeout)
    874 
    875    def kill(self, sig=None, timeout=None):
    876        """
    877        Kills the managed process.
    878 
    879        If you created the process with 'ignore_children=False' (the
    880        default) then it will also also kill all child processes spawned by
    881        it. If you specified 'ignore_children=True' when creating the
    882        process, only the root process will be killed.
    883 
    884        Note that this does not manage any state, save any output etc,
    885        it immediately kills the process.
    886 
    887        :param sig: Signal used to kill the process, defaults to SIGKILL
    888                    (has no effect on Windows)
    889        """
    890        if not hasattr(self, "proc"):
    891            raise RuntimeError("Process hasn't been started yet")
    892 
    893        self.proc.kill(sig=sig, timeout=timeout)
    894 
    895        # When we kill the the managed process we also have to wait for the
    896        # reader thread to be finished. Otherwise consumers would have to assume
    897        # that it still has not completely shutdown.
    898        rc = self.wait(0)
    899        if rc is None:
    900            self.debug("kill: wait failed -- process is still alive")
    901        return rc
    902 
    903    def poll(self):
    904        """Check if child process has terminated
    905 
    906        Returns the current returncode value:
    907        - None if the process hasn't terminated yet
    908        - A negative number if the process was killed by signal N (Unix only)
    909        - '0' if the process ended without failures
    910 
    911        """
    912        if not hasattr(self, "proc"):
    913            raise RuntimeError("Process hasn't been started yet")
    914 
    915        # Ensure that we first check for the reader status. Otherwise
    916        # we might mark the process as finished while output is still getting
    917        # processed.
    918        elif not self._ignore_children and self.reader.is_alive():
    919            return None
    920        elif hasattr(self, "returncode"):
    921            return self.returncode
    922        else:
    923            return self.proc.poll()
    924 
    925    def processOutput(self, timeout=None, outputTimeout=None):
    926        """
    927        Handle process output until the process terminates or times out.
    928 
    929        If timeout is not None, the process will be allowed to continue for
    930        that number of seconds before being killed.
    931 
    932        If outputTimeout is not None, the process will be allowed to continue
    933        for that number of seconds without producing any output before
    934        being killed.
    935        """
    936        # this method is kept for backward compatibility
    937        if not hasattr(self, "proc"):
    938            self.run(timeout=timeout, outputTimeout=outputTimeout)
    939            # self.run will call this again
    940            return
    941        if not self.reader.is_alive():
    942            self.reader.timeout = timeout
    943            self.reader.output_timeout = outputTimeout
    944            self.reader.start(self.proc)
    945 
    946    def wait(self, timeout=None):
    947        """
    948        Waits until all output has been read and the process is
    949        terminated.
    950 
    951        If timeout is not None, will return after timeout seconds.
    952        This timeout only causes the wait function to return and
    953        does not kill the process.
    954 
    955        Returns the process exit code value:
    956        - None if the process hasn't terminated yet
    957        - A negative number if the process was killed by signal N (Unix only)
    958        - '0' if the process ended without failures
    959 
    960        """
    961        self.returncode = self.proc.wait(timeout)
    962        if (
    963            self.returncode is not None
    964            and self.reader.thread
    965            and self.reader.thread is not threading.current_thread()
    966        ):
    967            # If children are ignored and a child is still running because it's
    968            # been daemonized or something, the reader might still be attached
    969            # to that child'd output... and joining will deadlock.
    970            # So instead, we wait for there to be no more active reading still
    971            # happening.
    972            if self._ignore_children:
    973                while self.reader.is_still_reading(timeout=0.1):
    974                    time.sleep(0.1)
    975            else:
    976                self.reader.join()
    977        return self.returncode
    978 
    979    @property
    980    def pid(self):
    981        if not hasattr(self, "proc"):
    982            raise RuntimeError("Process hasn't been started yet")
    983 
    984        return self.proc.pid
    985 
    986    @staticmethod
    987    def pid_exists(pid):
    988        if pid < 0:
    989            return False
    990 
    991        if isWin:
    992            try:
    993                process = winprocess.OpenProcess(
    994                    winprocess.PROCESS_QUERY_INFORMATION | winprocess.PROCESS_VM_READ,
    995                    False,
    996                    pid,
    997                )
    998                return winprocess.GetExitCodeProcess(process) == winprocess.STILL_ACTIVE
    999 
   1000            except OSError as e:
   1001                # no such process
   1002                if e.winerror == winprocess.ERROR_INVALID_PARAMETER:
   1003                    return False
   1004 
   1005                # access denied
   1006                if e.winerror == winprocess.ERROR_ACCESS_DENIED:
   1007                    return True
   1008 
   1009                # re-raise for any other type of exception
   1010                raise
   1011 
   1012        elif isPosix:
   1013            try:
   1014                os.kill(pid, 0)
   1015            except OSError as e:
   1016                return e.errno == errno.EPERM
   1017            else:
   1018                return True
   1019 
   1020    @classmethod
   1021    def _getpgid(cls, pid):
   1022        try:
   1023            return os.getpgid(pid)
   1024        except OSError as e:
   1025            # Do not raise for "No such process"
   1026            if e.errno != errno.ESRCH:
   1027                raise
   1028 
   1029    def check_for_detached(self, new_pid):
   1030        """Check if the current process has been detached and mark it appropriately.
   1031 
   1032        In case of application restarts the process can spawn itself into a new process group.
   1033        From now on the process can no longer be tracked by mozprocess anymore and has to be
   1034        marked as detached. If the consumer of mozprocess still knows the new process id it could
   1035        check for the detached state.
   1036 
   1037        new_pid is the new process id of the child process.
   1038        """
   1039        if not hasattr(self, "proc"):
   1040            raise RuntimeError("Process hasn't been started yet")
   1041 
   1042        if isPosix:
   1043            new_pgid = self._getpgid(new_pid)
   1044 
   1045            if new_pgid and new_pgid != self.proc.pgid:
   1046                self.proc.detached_pid = new_pid
   1047                print(
   1048                    'Child process with id "%s" has been marked as detached because it is no '
   1049                    "longer in the managed process group. Keeping reference to the process id "
   1050                    '"%s" which is the new child process.' % (self.pid, new_pid),
   1051                    file=sys.stdout,
   1052                )
   1053 
   1054 
   1055 class CallableList(list):
   1056    def __call__(self, *args, **kwargs):
   1057        for e in self:
   1058            e(*args, **kwargs)
   1059 
   1060    def __add__(self, lst):
   1061        return CallableList(list.__add__(self, lst))
   1062 
   1063 
   1064 class ProcessReader:
   1065    def __init__(
   1066        self,
   1067        stdout_callback=None,
   1068        stderr_callback=None,
   1069        finished_callback=None,
   1070        timeout_callback=None,
   1071        timeout=None,
   1072        output_timeout=None,
   1073    ):
   1074        self.stdout_callback = stdout_callback or (lambda line: True)
   1075        self.stderr_callback = stderr_callback or (lambda line: True)
   1076        self.finished_callback = finished_callback or (lambda: True)
   1077        self.timeout_callback = timeout_callback or (lambda: True)
   1078        self.timeout = timeout
   1079        self.output_timeout = output_timeout
   1080        self.thread = None
   1081        self.got_data = threading.Event()
   1082        self.didOutputTimeout = False
   1083 
   1084    def debug(self, msg):
   1085        if not MOZPROCESS_DEBUG:
   1086            return
   1087        print(f"DBG::MOZPROC ProcessReader | {msg}")
   1088 
   1089    def _create_stream_reader(self, name, stream, queue, callback):
   1090        thread = threading.Thread(
   1091            name=name, target=self._read_stream, args=(stream, queue, callback)
   1092        )
   1093        thread.daemon = True
   1094        thread.start()
   1095        return thread
   1096 
   1097    def _read_stream(self, stream, queue, callback):
   1098        sentinel = "" if isinstance(stream, io.TextIOBase) else b""
   1099        try:
   1100            for line in iter(stream.readline, sentinel):
   1101                queue.put((line, callback))
   1102        except ValueError as e:
   1103            if "I/O operation on closed file" in str(e):
   1104                # Stream was closed by the process, this is normal
   1105                pass
   1106            else:
   1107                raise
   1108        # Give a chance to the reading loop to exit without a timeout.
   1109        queue.put((b"", None))
   1110        try:
   1111            stream.close()
   1112        except ValueError:
   1113            # Stream might already be closed
   1114            pass
   1115 
   1116    def start(self, proc):
   1117        queue = Queue()
   1118        readers = 0
   1119        if proc.stdout:
   1120            self._create_stream_reader(
   1121                "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback
   1122            )
   1123            readers += 1
   1124        if proc.stderr and proc.stderr != proc.stdout:
   1125            self._create_stream_reader(
   1126                "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback
   1127            )
   1128            readers += 1
   1129        self.thread = threading.Thread(
   1130            name="ProcessReader",
   1131            target=self._read,
   1132            args=(queue, readers),
   1133        )
   1134        self.thread.daemon = True
   1135        self.thread.start()
   1136        self.debug("ProcessReader started")
   1137 
   1138    def _read(self, queue, readers):
   1139        start_time = time.time()
   1140        timeout = self.timeout
   1141        if timeout is not None:
   1142            timeout += start_time
   1143        output_timeout = self.output_timeout
   1144 
   1145        def get_line():
   1146            queue_timeout = None
   1147            if timeout:
   1148                queue_timeout = timeout - time.time()
   1149            if output_timeout:
   1150                if queue_timeout:
   1151                    queue_timeout = min(queue_timeout, output_timeout)
   1152                else:
   1153                    queue_timeout = output_timeout
   1154            return queue.get(timeout=queue_timeout)
   1155 
   1156        try:
   1157            # We need to wait for as many `(b"", None)` sentinels as there are
   1158            # reader threads setup in start.
   1159            for n in range(readers):
   1160                for line, callback in iter(get_line, (b"", None)):
   1161                    self.got_data.set()
   1162                    try:
   1163                        callback(line.rstrip())
   1164                    except Exception:
   1165                        traceback.print_exc()
   1166            try:
   1167                self.finished_callback()
   1168            except Exception:
   1169                traceback.print_exc()
   1170        except Empty:
   1171            if timeout and time.time() < timeout or not timeout:
   1172                self.didOutputTimeout = True
   1173            try:
   1174                self.timeout_callback()
   1175            except Exception:
   1176                traceback.print_exc()
   1177        self.debug("_read exited")
   1178 
   1179    def is_alive(self):
   1180        if self.thread:
   1181            return self.thread.is_alive()
   1182        return False
   1183 
   1184    def is_still_reading(self, timeout):
   1185        self.got_data.clear()
   1186        return self.got_data.wait(timeout)
   1187 
   1188    def join(self, timeout=None):
   1189        if self.thread:
   1190            self.thread.join(timeout=timeout)
   1191 
   1192 
   1193 # default output handlers
   1194 # these should be callables that take the output line
   1195 
   1196 
   1197 class StoreOutput:
   1198    """accumulate stdout"""
   1199 
   1200    def __init__(self):
   1201        self.output = []
   1202 
   1203    def __call__(self, line):
   1204        self.output.append(line)
   1205 
   1206 
   1207 class StreamOutput:
   1208    """pass output to a stream and flush"""
   1209 
   1210    def __init__(self, stream, text=True):
   1211        self.stream = stream
   1212        self.text = text
   1213 
   1214    def __call__(self, line):
   1215        if self.text:
   1216            if isinstance(line, bytes):
   1217                line = line.decode(errors="ignore")
   1218            line += "\n"
   1219        else:
   1220            if isinstance(line, str):
   1221                line = line.encode(errors="ignore")
   1222            line += b"\n"
   1223        try:
   1224            self.stream.write(line)
   1225        except TypeError:
   1226            print(
   1227                "HEY! If you're reading this, you're about to encounter a "
   1228                "type error, probably as a result of a conversion from "
   1229                "Python 2 to Python 3. This is almost definitely because "
   1230                "you're trying to write binary data to a text-encoded "
   1231                "stream, or text data to a binary-encoded stream. Check how "
   1232                "you're instantiating your ProcessHandler and if the output "
   1233                "should be text-encoded, make sure you pass "
   1234                "universal_newlines=True.",
   1235                file=sys.stderr,
   1236            )
   1237            raise
   1238        self.stream.flush()
   1239 
   1240 
   1241 class LogOutput(StreamOutput):
   1242    """pass output to a file"""
   1243 
   1244    def __init__(self, filename):
   1245        self.file_obj = open(filename, "a")
   1246        StreamOutput.__init__(self, self.file_obj, True)
   1247 
   1248    def __del__(self):
   1249        if self.file_obj is not None:
   1250            self.file_obj.close()
   1251 
   1252 
   1253 # front end class with the default handlers
   1254 
   1255 
   1256 class ProcessHandler(ProcessHandlerMixin):
   1257    """
   1258    Convenience class for handling processes with default output handlers.
   1259 
   1260    By default, all output is sent to stdout. This can be disabled by setting
   1261    the *stream* argument to None.
   1262 
   1263    If processOutputLine keyword argument is specified the function or the
   1264    list of functions specified by this argument will be called for each line
   1265    of output; the output will not be written to stdout automatically then
   1266    if stream is True (the default).
   1267 
   1268    If storeOutput==True, the output produced by the process will be saved
   1269    as self.output.
   1270 
   1271    If logfile is not None, the output produced by the process will be
   1272    appended to the given file.
   1273    """
   1274 
   1275    def __init__(self, cmd, logfile=None, stream=True, storeOutput=True, **kwargs):
   1276        kwargs.setdefault("processOutputLine", [])
   1277        if callable(kwargs["processOutputLine"]):
   1278            kwargs["processOutputLine"] = [kwargs["processOutputLine"]]
   1279 
   1280        if logfile:
   1281            logoutput = LogOutput(logfile)
   1282            kwargs["processOutputLine"].append(logoutput)
   1283 
   1284        text = kwargs.get("universal_newlines", False) or kwargs.get("text", False)
   1285 
   1286        if stream is True:
   1287            if text:
   1288                # The encoding of stdout isn't guaranteed to be utf-8. Fix that.
   1289                stdout = codecs.getwriter("utf-8")(sys.stdout.buffer)
   1290            else:
   1291                stdout = sys.stdout.buffer
   1292 
   1293            if not kwargs["processOutputLine"]:
   1294                kwargs["processOutputLine"].append(StreamOutput(stdout, text))
   1295        elif stream:
   1296            streamoutput = StreamOutput(stream, text)
   1297            kwargs["processOutputLine"].append(streamoutput)
   1298 
   1299        self.output = None
   1300        if storeOutput:
   1301            storeoutput = StoreOutput()
   1302            self.output = storeoutput.output
   1303            kwargs["processOutputLine"].append(storeoutput)
   1304 
   1305        ProcessHandlerMixin.__init__(self, cmd, **kwargs)