tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

expensive_line_transformer.py (7538B)


      1 # Copyright 2023 The Chromium Authors
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 from abc import ABC, abstractmethod
      6 import logging
      7 import subprocess
      8 import threading
      9 import time
     10 import uuid
     11 
     12 from devil.utils import reraiser_thread
     13 
     14 
     15 class ExpensiveLineTransformer(ABC):
     16  def __init__(self, process_start_timeout, minimum_timeout, per_line_timeout):
     17    self._process_start_timeout = process_start_timeout
     18    self._minimum_timeout = minimum_timeout
     19    self._per_line_timeout = per_line_timeout
     20    self._started = False
     21    # Allow only one thread to call TransformLines() at a time.
     22    self._lock = threading.Lock()
     23    # Ensure that only one thread attempts to kill self._proc in Close().
     24    self._close_lock = threading.Lock()
     25    self._closed_called = False
     26    # Assign to None so that attribute exists if Popen() throws.
     27    self._proc = None
     28    # Start process eagerly to hide start-up latency.
     29    self._proc_start_time = None
     30 
     31  def start(self):
     32    # delay the start of the process, to allow the initialization of the
     33    # descendant classes first.
     34    if self._started:
     35      logging.error('%s: Trying to start an already started command', self.name)
     36      return
     37 
     38    # Start process eagerly to hide start-up latency.
     39    self._proc_start_time = time.time()
     40 
     41    if not self.command:
     42      logging.error('%s: No command available', self.name)
     43      return
     44 
     45    self._proc = subprocess.Popen(self.command,
     46                                  bufsize=1,
     47                                  stdin=subprocess.PIPE,
     48                                  stdout=subprocess.PIPE,
     49                                  universal_newlines=True,
     50                                  close_fds=True)
     51    self._started = True
     52 
     53  def IsClosed(self):
     54    return (not self._started or self._closed_called
     55            or self._proc.returncode is not None)
     56 
     57  def IsBusy(self):
     58    return self._lock.locked()
     59 
     60  def IsReady(self):
     61    return self._started and not self.IsClosed() and not self.IsBusy()
     62 
     63  def TransformLines(self, lines):
     64    """Symbolizes names found in the given lines.
     65 
     66    If anything goes wrong (process crashes, timeout, etc), returns |lines|.
     67 
     68    Args:
     69      lines: A list of strings without trailing newlines.
     70 
     71    Returns:
     72      A list of strings without trailing newlines.
     73    """
     74    if not lines:
     75      return []
     76 
     77    # symbolized output contain more lines than the input, as the symbolized
     78    # stacktraces will be added. To account for the extra output lines, keep
     79    # reading until this eof_line token is reached. Using a format that will
     80    # be considered a "useful line" without modifying its output by
     81    # third_party/android_platform/development/scripts/stack_core.py
     82    eof_line = self.getEofLine()
     83    out_lines = []
     84 
     85    def _reader():
     86      while True:
     87        line = self._proc.stdout.readline()
     88        # Return an empty string at EOF (when stdin is closed).
     89        if not line:
     90          break
     91        line = line[:-1]
     92        if line == eof_line:
     93          break
     94        out_lines.append(line)
     95 
     96    if self.IsBusy():
     97      logging.warning('%s: Having to wait for transformation.', self.name)
     98 
     99    # Allow only one thread to operate at a time.
    100    with self._lock:
    101      if self.IsClosed():
    102        if self._started and not self._closed_called:
    103          logging.warning('%s: Process exited with code=%d.', self.name,
    104                          self._proc.returncode)
    105          self.Close()
    106        return lines
    107 
    108      reader_thread = reraiser_thread.ReraiserThread(_reader)
    109      reader_thread.start()
    110 
    111      try:
    112        self._proc.stdin.write('\n'.join(lines))
    113        self._proc.stdin.write('\n{}\n'.format(eof_line))
    114        self._proc.stdin.flush()
    115        time_since_proc_start = time.time() - self._proc_start_time
    116        timeout = (max(0, self._process_start_timeout - time_since_proc_start) +
    117                   max(self._minimum_timeout,
    118                       len(lines) * self._per_line_timeout))
    119        reader_thread.join(timeout)
    120        if self.IsClosed():
    121          logging.warning('%s: Close() called by another thread during join().',
    122                          self.name)
    123          return lines
    124        if reader_thread.is_alive():
    125          logging.error('%s: Timed out after %f seconds with input:', self.name,
    126                        timeout)
    127          for l in lines:
    128            logging.error(l)
    129          logging.error(eof_line)
    130          logging.error('%s: End of timed out input.', self.name)
    131          logging.error('%s: Timed out output was:', self.name)
    132          for l in out_lines:
    133            logging.error(l)
    134          logging.error('%s: End of timed out output.', self.name)
    135          self.Close()
    136          return lines
    137        return out_lines
    138      except IOError:
    139        logging.exception('%s: Exception during transformation', self.name)
    140        self.Close()
    141        return lines
    142 
    143  def Close(self):
    144    with self._close_lock:
    145      needs_closing = not self.IsClosed()
    146      self._closed_called = True
    147 
    148    if needs_closing:
    149      self._proc.stdin.close()
    150      self._proc.kill()
    151      self._proc.wait()
    152 
    153  def __del__(self):
    154    # self._proc is None when Popen() fails.
    155    if not self._closed_called and self._proc:
    156      logging.error('%s: Forgot to Close()', self.name)
    157      self.Close()
    158 
    159  @property
    160  @abstractmethod
    161  def name(self):
    162    ...
    163 
    164  @property
    165  @abstractmethod
    166  def command(self):
    167    ...
    168 
    169  @staticmethod
    170  def getEofLine():
    171    # Use a format that will be considered a "useful line" without modifying its
    172    # output by third_party/android_platform/development/scripts/stack_core.py
    173    return "Generic useful log header: \'{}\'".format(uuid.uuid4().hex)
    174 
    175 
    176 class ExpensiveLineTransformerPool(ABC):
    177  def __init__(self, max_restarts, pool_size, passthrough_on_failure):
    178    self._max_restarts = max_restarts
    179    self._pool = [self.CreateTransformer() for _ in range(pool_size)]
    180    self._passthrough_on_failure = passthrough_on_failure
    181    # Allow only one thread to select from the pool at a time.
    182    self._lock = threading.Lock()
    183    self._num_restarts = 0
    184 
    185  def __enter__(self):
    186    pass
    187 
    188  def __exit__(self, *args):
    189    self.Close()
    190 
    191  def TransformLines(self, lines):
    192    with self._lock:
    193      assert self._pool, 'TransformLines() called on a closed Pool.'
    194 
    195      # transformation is broken.
    196      if self._num_restarts == self._max_restarts:
    197        if self._passthrough_on_failure:
    198          return lines
    199        raise Exception('%s is broken.' % self.name)
    200 
    201      # Restart any closed transformer.
    202      for i, d in enumerate(self._pool):
    203        if d.IsClosed():
    204          logging.warning('%s: Restarting closed instance.', self.name)
    205          self._pool[i] = self.CreateTransformer()
    206          self._num_restarts += 1
    207          if self._num_restarts == self._max_restarts:
    208            logging.warning('%s: MAX_RESTARTS reached.', self.name)
    209            if self._passthrough_on_failure:
    210              return lines
    211            raise Exception('%s is broken.' % self.name)
    212 
    213      selected = next((x for x in self._pool if x.IsReady()), self._pool[0])
    214      # Rotate the order so that next caller will not choose the same one.
    215      self._pool.remove(selected)
    216      self._pool.append(selected)
    217 
    218    return selected.TransformLines(lines)
    219 
    220  def Close(self):
    221    with self._lock:
    222      for d in self._pool:
    223        d.Close()
    224      self._pool = None
    225 
    226  @abstractmethod
    227  def CreateTransformer(self):
    228    ...
    229 
    230  @property
    231  @abstractmethod
    232  def name(self):
    233    ...