tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

processhandler.py (21616B)


      1 # This Source Code Form is subject to the terms of the Mozilla Public
      2 # License, v. 2.0. If a copy of the MPL was not distributed with this file,
      3 # You can obtain one at http://mozilla.org/MPL/2.0/.
      4 
# The Marionette ProcessHandler and ProcessHandlerMixin classes are used
# only by Marionette, as an alternative to the mozprocess package.
#
# They are needed because Marionette allows the application to restart
# itself, in which case the application forks a new process. To keep
# tracking that process -- including being able to terminate it and to
# receive its log output via stdout and stderr -- the psutil package is
# used. To avoid side effects for other consumers of mozprocess, all
# necessary helper classes have been duplicated here for now.
     14 
     15 import codecs
     16 import os
     17 import signal
     18 import subprocess
     19 import sys
     20 import threading
     21 import time
     22 import traceback
     23 from queue import Empty, Queue
     24 
     25 import psutil
     26 
     27 # Set the MOZPROCESS_DEBUG environment variable to 1 to see some debugging output
     28 MOZPROCESS_DEBUG = os.getenv("MOZPROCESS_DEBUG")
     29 
     30 INTERVAL_PROCESS_ALIVE_CHECK = 0.02
     31 
     32 # For not self-managed processes the returncode seems to not be available.
     33 # Use `8` to indicate this specific situation for now.
     34 UNKNOWN_RETURNCODE = 8
     35 
     36 # We dont use mozinfo because it is expensive to import, see bug 933558.
     37 isPosix = os.name == "posix"  # includes MacOS X
     38 
     39 
     40 class ProcessHandlerMixin:
     41    """
     42    A class for launching and manipulating local processes.
     43 
     44    :param cmd: command to run. May be a string or a list. If specified as a list, the first
     45      element will be interpreted as the command, and all additional elements will be interpreted
     46      as arguments to that command.
     47    :param args: list of arguments to pass to the command (defaults to None). Must not be set when
     48      `cmd` is specified as a list.
     49    :param cwd: working directory for command (defaults to None).
     50    :param env: is the environment to use for the process (defaults to os.environ).
     51    :param kill_on_timeout: when True, the process will be killed when a timeout is reached.
     52      When False, the caller is responsible for killing the process.
     53      Failure to do so could cause a call to wait() to hang indefinitely. (Defaults to True.)
     54    :param processOutputLine: function or list of functions to be called for
     55        each line of output produced by the process (defaults to an empty
     56        list).
     57    :param processStderrLine: function or list of functions to be called
     58        for each line of error output - stderr - produced by the process
     59        (defaults to an empty list). If this is not specified, stderr lines
     60        will be sent to the *processOutputLine* callbacks.
     61    :param onTimeout: function or list of functions to be called when the process times out.
     62    :param onFinish: function or list of functions to be called when the process terminates
     63      normally without timing out.
     64    :param kwargs: additional keyword args to pass directly into Popen.
     65 
     66    NOTE: Child processes will be tracked by default.
     67    """
     68 
     69    def __init__(
     70        self,
     71        cmd,
     72        args=None,
     73        cwd=None,
     74        env=None,
     75        kill_on_timeout=True,
     76        processOutputLine=(),
     77        processStderrLine=(),
     78        onTimeout=(),
     79        onFinish=(),
     80        **kwargs,
     81    ):
     82        self.args = args
     83        self.cmd = cmd
     84        self.cwd = cwd
     85        self.keywordargs = kwargs
     86 
     87        self.didTimeout = False
     88        self.didOutputTimeout = False
     89        self.proc = None
     90 
     91        if env is None:
     92            env = os.environ.copy()
     93        self.env = env
     94 
     95        # handlers
     96        def to_callable_list(arg):
     97            if callable(arg):
     98                arg = [arg]
     99            return CallableList(arg)
    100 
    101        processOutputLine = to_callable_list(processOutputLine)
    102        processStderrLine = to_callable_list(processStderrLine)
    103        onTimeout = to_callable_list(onTimeout)
    104        onFinish = to_callable_list(onFinish)
    105 
    106        def on_timeout():
    107            self.didTimeout = True
    108            self.didOutputTimeout = self.reader.didOutputTimeout
    109            if kill_on_timeout:
    110                self.kill()
    111 
    112        onTimeout.insert(0, on_timeout)
    113 
    114        self._stderr = subprocess.STDOUT
    115        if processStderrLine:
    116            self._stderr = subprocess.PIPE
    117        self.reader = ProcessReader(
    118            stdout_callback=processOutputLine,
    119            stderr_callback=processStderrLine,
    120            finished_callback=onFinish,
    121            timeout_callback=onTimeout,
    122        )
    123 
    124        # It is common for people to pass in the entire array with the cmd and
    125        # the args together since this is how Popen uses it.  Allow for that.
    126        if isinstance(self.cmd, list):
    127            if self.args is not None:
    128                raise TypeError("cmd and args must not both be lists")
    129            (self.cmd, self.args) = (self.cmd[0], self.cmd[1:])
    130        elif self.args is None:
    131            self.args = []
    132 
    133    def _has_valid_proc(func):
    134        def wrapper(self, *args, **kwargs):
    135            if self.proc is None:
    136                raise RuntimeError("Process hasn't been started yet")
    137            return func(self, *args, **kwargs)
    138 
    139        return wrapper
    140 
    141    @property
    142    @_has_valid_proc
    143    def pid(self):
    144        return self.proc.pid
    145 
    146    @staticmethod
    147    def pid_exists(pid):
    148        return psutil.pid_exists(pid)
    149 
    150    @property
    151    def timedOut(self):
    152        """True if the process has timed out for any reason."""
    153        return self.didTimeout
    154 
    155    @property
    156    def outputTimedOut(self):
    157        """True if the process has timed out for no output."""
    158        return self.didOutputTimeout
    159 
    160    @property
    161    def commandline(self):
    162        """the string value of the command line (command + args)"""
    163        return subprocess.list2cmdline([self.cmd] + self.args)
    164 
    165    def _debug(self, msg):
    166        if not MOZPROCESS_DEBUG:
    167            return
    168 
    169        print(f"DBG::MARIONETTE ProcessHandler {self.pid} | {msg}", file=sys.stdout)
    170 
    171    @_has_valid_proc
    172    def kill(self, sig=None, timeout=None):
    173        """Kills the managed process and all its child processes.
    174 
    175        :param sig: Signal to use to kill the process. (Defaults to SIGKILL)
    176 
    177        :param timeout: If not None, wait this number of seconds for the
    178        process to exit.
    179 
    180        Note that this does not manage any state, save any output etc,
    181        it immediately kills the process.
    182        """
    183        if hasattr(self, "returncode"):
    184            return self.returncode
    185 
    186        if self.proc.is_running():
    187            processes = [self.proc] + self.proc.children(recursive=True)
    188 
    189            if sig is None:
    190                # TODO: try SIGTERM first to sanely shutdown the application
    191                # and to not break later when Windows support gets added.
    192                sig = signal.SIGKILL
    193 
    194            # Do we need that?
    195            for process in processes:
    196                try:
    197                    self._debug(f"Killing process: {process}")
    198                    process.send_signal(sig)
    199                except psutil.NoSuchProcess:
    200                    pass
    201            psutil.wait_procs(processes, timeout=timeout)
    202 
    203        # When we kill the the managed process we also have to wait for the
    204        # reader thread to be finished. Otherwise consumers would have to assume
    205        # that it still has not completely shutdown.
    206        self.returncode = self.wait(0)
    207        if self.returncode is None:
    208            self._debug("kill: wait failed -- process is still alive")
    209 
    210        return self.returncode
    211 
    212    @_has_valid_proc
    213    def poll(self):
    214        """Check if child process has terminated
    215 
    216        Returns the current returncode value:
    217        - None if the process hasn't terminated yet
    218        - A negative number if the process was killed by signal N (Unix only)
    219        - '0' if the process ended without failures
    220 
    221        """
    222        if hasattr(self, "returncode"):
    223            return self.returncode
    224 
    225        # If the process that is observed wasn't started with Popen there is
    226        # no `poll()` method available. Use `wait()` instead and do not wait
    227        # for the reader thread because it would cause extra delays.
    228        return self.wait(0, wait_reader=False)
    229 
    230    def processOutput(self, timeout=None, outputTimeout=None):
    231        """
    232        Handle process output until the process terminates or times out.
    233 
    234        :param timeout: If not None, the process will be allowed to continue
    235        for that number of seconds before being killed.
    236 
    237        :outputTimeout: If not None, the process will be allowed to continue
    238        for that number of seconds without producing any output before
    239        being killed.
    240        """
    241        # this method is kept for backward compatibility
    242        if not hasattr(self, "proc"):
    243            self.run(timeout=timeout, outputTimeout=outputTimeout)
    244            # self.run will call this again
    245            return
    246        if not self.reader.is_alive():
    247            self.reader.timeout = timeout
    248            self.reader.output_timeout = outputTimeout
    249            self.reader.start(self.proc)
    250 
    251    def run(self, timeout=None, outputTimeout=None):
    252        """
    253        Starts the process.
    254 
    255        :param timeout: If not None, the process will be allowed to continue for
    256        that number of seconds before being killed. If the process is killed
    257        due to a timeout, the onTimeout handler will be called.
    258 
    259        :outputTimeout: If not None, the process will be allowed to continue
    260        for that number of seconds without producing any output before
    261        being killed.
    262        """
    263        self.didTimeout = False
    264        self.didOutputTimeout = False
    265 
    266        # default arguments
    267        args = dict(
    268            stdout=subprocess.PIPE,
    269            stderr=self._stderr,
    270            cwd=self.cwd,
    271            env=self.env,
    272        )
    273 
    274        # build process arguments
    275        args.update(self.keywordargs)
    276 
    277        # launch the process
    278        self.proc = psutil.Popen([self.cmd] + self.args, **args)
    279 
    280        self.processOutput(timeout=timeout, outputTimeout=outputTimeout)
    281 
    282    @_has_valid_proc
    283    def update_process(self, new_pid, timeout=None):
    284        """Update the internally managed process for the provided process ID.
    285 
    286        When the application restarts itself, such as during an update, the new
    287        process is essentially a fork of itself. To continue monitoring this
    288        process, the process ID needs to be updated accordingly.
    289 
    290        :param new_pid: The ID of the new (forked) process to track.
    291 
    292        :timeout: If not None, the old process will be allowed to continue for
    293        that number of seconds before being killed.
    294        """
    295        if isPosix:
    296            if new_pid == self.pid:
    297                return
    298 
    299            print(
    300                'Child process with id "%s" has been marked as detached because it is no '
    301                "longer in the managed process group. Keeping reference to the process id "
    302                '"%s" which is the new child process.' % (self.pid, new_pid),
    303                file=sys.stdout,
    304            )
    305 
    306            returncode = self.wait(timeout, wait_reader=False)
    307            if returncode is None:
    308                # If the process is still running force kill it.
    309                returncode = self.kill()
    310 
    311            if hasattr(self, "returncode"):
    312                del self.returncode
    313 
    314            self.proc = psutil.Process(new_pid)
    315            self._debug(
    316                f"New process status: {self.proc} (terminal={self.proc.terminal()})"
    317            )
    318 
    319            return returncode
    320 
    321    @_has_valid_proc
    322    def wait(self, timeout=None, wait_reader=True):
    323        """
    324        Waits until the process is terminated.
    325 
    326        :param timeout: If not None, will return after timeout seconds.
    327        This timeout only causes the wait function to return and
    328        does not kill the process.
    329 
    330        :param wait_reader: If set to True, it waits not only for the process
    331        to exit but also for all output to be fully read. (Defaults to True).
    332 
    333        Returns the process exit code value:
    334        - None if the process hasn't terminated yet
    335        - A negative number if the process was killed by signal N (Unix only)
    336        - '0' if the process ended without failures
    337 
    338        """
    339        # Thread.join() blocks the main thread until the reader thread is finished
    340        # wake up once a second in case a keyboard interrupt is sent
    341        if (
    342            wait_reader
    343            and self.reader.thread
    344            and self.reader.thread is not threading.current_thread()
    345        ):
    346            count = 0
    347            while self.reader.is_alive():
    348                if timeout is not None and count > timeout:
    349                    self._debug("wait timeout for reader thread")
    350                    return None
    351                self.reader.join(timeout=1)
    352                count += 1
    353 
    354        try:
    355            self.proc.wait(timeout)
    356            self._debug(f"Process status after wait: {self.proc}")
    357 
    358            if not isinstance(self.proc, psutil.Popen):
    359                self._debug(
    360                    "Not self-managed processes do not have a returncode. "
    361                    f"Setting its value to {UNKNOWN_RETURNCODE}."
    362                )
    363                self.returncode = UNKNOWN_RETURNCODE
    364 
    365            else:
    366                self.returncode = self.proc.returncode
    367 
    368            return self.returncode
    369        except psutil.TimeoutExpired:
    370            return None
    371 
    372 
    373 class CallableList(list):
    374    def __call__(self, *args, **kwargs):
    375        for e in self:
    376            e(*args, **kwargs)
    377 
    378    def __add__(self, lst):
    379        return CallableList(list.__add__(self, lst))
    380 
    381 
    382 class ProcessReader:
    383    def __init__(
    384        self,
    385        stdout_callback=None,
    386        stderr_callback=None,
    387        finished_callback=None,
    388        timeout_callback=None,
    389        timeout=None,
    390        output_timeout=None,
    391    ):
    392        self.stdout_callback = stdout_callback or (lambda line: True)
    393        self.stderr_callback = stderr_callback or (lambda line: True)
    394        self.finished_callback = finished_callback or (lambda: True)
    395        self.timeout_callback = timeout_callback or (lambda: True)
    396        self.timeout = timeout
    397        self.output_timeout = output_timeout
    398        self.thread = None
    399        self.didOutputTimeout = False
    400 
    401    def debug(self, msg):
    402        if not MOZPROCESS_DEBUG:
    403            return
    404 
    405        print(f"DBG::MARIONETTE ProcessReader | {msg}", file=sys.stdout)
    406 
    407    def _create_stream_reader(self, name, stream, queue, callback):
    408        thread = threading.Thread(
    409            name=name, target=self._read_stream, args=(stream, queue, callback)
    410        )
    411        thread.daemon = True
    412        thread.start()
    413        return thread
    414 
    415    def _read_stream(self, stream, queue, callback):
    416        while True:
    417            line = stream.readline()
    418            if not line:
    419                break
    420            queue.put((line, callback))
    421        stream.close()
    422 
    423    def start(self, proc):
    424        queue = Queue()
    425        stdout_reader = None
    426        if proc.stdout:
    427            stdout_reader = self._create_stream_reader(
    428                "ProcessReaderStdout", proc.stdout, queue, self.stdout_callback
    429            )
    430        stderr_reader = None
    431        if proc.stderr and proc.stderr != proc.stdout:
    432            stderr_reader = self._create_stream_reader(
    433                "ProcessReaderStderr", proc.stderr, queue, self.stderr_callback
    434            )
    435        self.thread = threading.Thread(
    436            name="ProcessReader",
    437            target=self._read,
    438            args=(stdout_reader, stderr_reader, queue),
    439        )
    440        self.thread.daemon = True
    441        self.thread.start()
    442        self.debug("ProcessReader started")
    443 
    444    def _read(self, stdout_reader, stderr_reader, queue):
    445        start_time = time.time()
    446        timed_out = False
    447        timeout = self.timeout
    448        if timeout is not None:
    449            timeout += start_time
    450        output_timeout = self.output_timeout
    451        if output_timeout is not None:
    452            output_timeout += start_time
    453 
    454        while (stdout_reader and stdout_reader.is_alive()) or (
    455            stderr_reader and stderr_reader.is_alive()
    456        ):
    457            has_line = True
    458            try:
    459                line, callback = queue.get(True, INTERVAL_PROCESS_ALIVE_CHECK)
    460            except Empty:
    461                has_line = False
    462            now = time.time()
    463            if not has_line:
    464                if output_timeout is not None and now > output_timeout:
    465                    timed_out = True
    466                    self.didOutputTimeout = True
    467                    break
    468            else:
    469                if output_timeout is not None:
    470                    output_timeout = now + self.output_timeout
    471                callback(line.rstrip())
    472            if timeout is not None and now > timeout:
    473                timed_out = True
    474                break
    475        self.debug("_read loop exited")
    476        # process remaining lines to read
    477        while not queue.empty():
    478            line, callback = queue.get(False)
    479            try:
    480                callback(line.rstrip())
    481            except Exception:
    482                traceback.print_exc()
    483        if timed_out:
    484            try:
    485                self.timeout_callback()
    486            except Exception:
    487                traceback.print_exc()
    488        if stdout_reader:
    489            stdout_reader.join()
    490        if stderr_reader:
    491            stderr_reader.join()
    492        if not timed_out:
    493            try:
    494                self.finished_callback()
    495            except Exception:
    496                traceback.print_exc()
    497        self.debug("_read exited")
    498 
    499    def is_alive(self):
    500        if self.thread:
    501            return self.thread.is_alive()
    502        return False
    503 
    504    def join(self, timeout=None):
    505        if self.thread:
    506            self.thread.join(timeout=timeout)
    507 
    508 
# Default output handlers.
# Each one is a callable that takes a single line of output.
    511 
    512 
    513 class StoreOutput:
    514    """accumulate stdout"""
    515 
    516    def __init__(self):
    517        self.output = []
    518 
    519    def __call__(self, line):
    520        self.output.append(line)
    521 
    522 
    523 class StreamOutput:
    524    """pass output to a stream and flush"""
    525 
    526    def __init__(self, stream, text=True):
    527        self.stream = stream
    528        self.text = text
    529 
    530    def __call__(self, line):
    531        if self.text:
    532            if isinstance(line, bytes):
    533                line = line.decode(errors="ignore")
    534            line += "\n"
    535        else:
    536            if isinstance(line, str):
    537                line = line.encode(errors="ignore")
    538            line += b"\n"
    539        try:
    540            self.stream.write(line)
    541        except TypeError:
    542            print(
    543                "HEY! If you're reading this, you're about to encounter a "
    544                "type error, probably as a result of a conversion from "
    545                "Python 2 to Python 3. This is almost definitely because "
    546                "you're trying to write binary data to a text-encoded "
    547                "stream, or text data to a binary-encoded stream. Check how "
    548                "you're instantiating your ProcessHandler and if the output "
    549                "should be text-encoded, make sure you pass "
    550                "universal_newlines=True.",
    551                file=sys.stderr,
    552            )
    553            raise
    554        self.stream.flush()
    555 
    556 
    557 class LogOutput(StreamOutput):
    558    """pass output to a file"""
    559 
    560    def __init__(self, filename):
    561        self.file_obj = open(filename, "a")
    562        StreamOutput.__init__(self, self.file_obj, True)
    563 
    564    def __del__(self):
    565        if self.file_obj is not None:
    566            self.file_obj.close()
    567 
    568 
    569 # front end class with the default handlers
    570 
    571 
    572 class ProcessHandler(ProcessHandlerMixin):
    573    """
    574    Convenience class for handling processes with default output handlers.
    575 
    576    By default, all output is sent to stdout. This can be disabled by setting
    577    the *stream* argument to None.
    578 
    579    If processOutputLine keyword argument is specified the function or the
    580    list of functions specified by this argument will be called for each line
    581    of output; the output will not be written to stdout automatically then
    582    if stream is True (the default).
    583 
    584    If storeOutput==True, the output produced by the process will be saved
    585    as self.output.
    586 
    587    If logfile is not None, the output produced by the process will be
    588    appended to the given file.
    589    """
    590 
    591    def __init__(self, cmd, logfile=None, stream=True, storeOutput=True, **kwargs):
    592        kwargs.setdefault("processOutputLine", [])
    593        if callable(kwargs["processOutputLine"]):
    594            kwargs["processOutputLine"] = [kwargs["processOutputLine"]]
    595 
    596        if logfile:
    597            logoutput = LogOutput(logfile)
    598            kwargs["processOutputLine"].append(logoutput)
    599 
    600        text = kwargs.get("universal_newlines", False) or kwargs.get("text", False)
    601 
    602        if stream is True:
    603            if text:
    604                # The encoding of stdout isn't guaranteed to be utf-8. Fix that.
    605                stdout = codecs.getwriter("utf-8")(sys.stdout.buffer)
    606            else:
    607                stdout = sys.stdout.buffer
    608 
    609            if not kwargs["processOutputLine"]:
    610                kwargs["processOutputLine"].append(StreamOutput(stdout, text))
    611        elif stream:
    612            streamoutput = StreamOutput(stream, text)
    613            kwargs["processOutputLine"].append(streamoutput)
    614 
    615        self.output = None
    616        if storeOutput:
    617            storeoutput = StoreOutput()
    618            self.output = storeoutput.output
    619            kwargs["processOutputLine"].append(storeoutput)
    620 
    621        ProcessHandlerMixin.__init__(self, cmd, **kwargs)