tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

parallel.py (5853B)


      1 # Copyright 2020 The Chromium Authors
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 """Helpers related to multiprocessing.
      5 
      6 Based on: //tools/binary_size/libsupersize/parallel.py
      7 """
      8 
import atexit
import builtins
import logging
import multiprocessing
import os
import sys
import threading
import traceback
     16 
     17 DISABLE_ASYNC = os.environ.get('DISABLE_ASYNC') == '1'
     18 if DISABLE_ASYNC:
     19  logging.warning('Running in synchronous mode.')
     20 
     21 _all_pools = None
     22 _is_child_process = False
     23 _silence_exceptions = False
     24 
     25 # Used to pass parameters to forked processes without pickling.
     26 _fork_params = None
     27 _fork_kwargs = None
     28 
     29 # Ensure fork is used on MacOS for multiprocessing compatibility.
     30 # Starting from Python 3.8, the "spawn" method is the default on MacOS.
     31 # On Linux hosts this line will be a no-op.
     32 multiprocessing.set_start_method('fork')
     33 
     34 class _ImmediateResult:
     35  def __init__(self, value):
     36    self._value = value
     37 
     38  def get(self):
     39    return self._value
     40 
     41  def wait(self):
     42    pass
     43 
     44  def ready(self):
     45    return True
     46 
     47  def successful(self):
     48    return True
     49 
     50 
     51 class _ExceptionWrapper:
     52  """Used to marshal exception messages back to main process."""
     53 
     54  def __init__(self, msg, exception_type=None):
     55    self.msg = msg
     56    self.exception_type = exception_type
     57 
     58  def MaybeThrow(self):
     59    if self.exception_type:
     60      raise getattr(__builtins__,
     61                    self.exception_type)('Originally caused by: ' + self.msg)
     62 
     63 
     64 class _FuncWrapper:
     65  """Runs on the fork()'ed side to catch exceptions and spread *args."""
     66 
     67  def __init__(self, func):
     68    global _is_child_process
     69    _is_child_process = True
     70    self._func = func
     71 
     72  def __call__(self, index, _=None):
     73    global _fork_kwargs
     74    try:
     75      if _fork_kwargs is None:  # Clarifies _fork_kwargs is map for pylint.
     76        _fork_kwargs = {}
     77      return self._func(*_fork_params[index], **_fork_kwargs)
     78    except Exception as e:
     79      # Only keep the exception type for builtin exception types or else risk
     80      # further marshalling exceptions.
     81      exception_type = None
     82      if hasattr(__builtins__, type(e).__name__):
     83        exception_type = type(e).__name__
     84      # multiprocessing is supposed to catch and return exceptions automatically
     85      # but it doesn't seem to work properly :(.
     86      return _ExceptionWrapper(traceback.format_exc(), exception_type)
     87    except:  # pylint: disable=bare-except
     88      return _ExceptionWrapper(traceback.format_exc())
     89 
     90 
     91 class _WrappedResult:
     92  """Allows for host-side logic to be run after child process has terminated.
     93 
     94  * Unregisters associated pool _all_pools.
     95  * Raises exception caught by _FuncWrapper.
     96  """
     97 
     98  def __init__(self, result, pool=None):
     99    self._result = result
    100    self._pool = pool
    101 
    102  def get(self):
    103    self.wait()
    104    value = self._result.get()
    105    _CheckForException(value)
    106    return value
    107 
    108  def wait(self):
    109    self._result.wait()
    110    if self._pool:
    111      _all_pools.remove(self._pool)
    112      self._pool = None
    113 
    114  def ready(self):
    115    return self._result.ready()
    116 
    117  def successful(self):
    118    return self._result.successful()
    119 
    120 
    121 def _TerminatePools():
    122  """Calls .terminate() on all active process pools.
    123 
    124  Not supposed to be necessary according to the docs, but seems to be required
    125  when child process throws an exception or Ctrl-C is hit.
    126  """
    127  global _silence_exceptions
    128  _silence_exceptions = True
    129  # Child processes cannot have pools, but atexit runs this function because
    130  # it was registered before fork()ing.
    131  if _is_child_process:
    132    return
    133 
    134  def close_pool(pool):
    135    try:
    136      pool.terminate()
    137    except:  # pylint: disable=bare-except
    138      pass
    139 
    140  for i, pool in enumerate(_all_pools):
    141    # Without calling terminate() on a separate thread, the call can block
    142    # forever.
    143    thread = threading.Thread(name='Pool-Terminate-{}'.format(i),
    144                              target=close_pool,
    145                              args=(pool, ))
    146    thread.daemon = True
    147    thread.start()
    148 
    149 
    150 def _CheckForException(value):
    151  if isinstance(value, _ExceptionWrapper):
    152    global _silence_exceptions
    153    if not _silence_exceptions:
    154      value.MaybeThrow()
    155      _silence_exceptions = True
    156      logging.error('Subprocess raised an exception:\n%s', value.msg)
    157    sys.exit(1)
    158 
    159 
    160 def _MakeProcessPool(job_params, **job_kwargs):
    161  global _all_pools
    162  global _fork_params
    163  global _fork_kwargs
    164  assert _fork_params is None
    165  assert _fork_kwargs is None
    166  pool_size = min(len(job_params), multiprocessing.cpu_count())
    167  _fork_params = job_params
    168  _fork_kwargs = job_kwargs
    169  ret = multiprocessing.Pool(pool_size)
    170  _fork_params = None
    171  _fork_kwargs = None
    172  if _all_pools is None:
    173    _all_pools = []
    174    atexit.register(_TerminatePools)
    175  _all_pools.append(ret)
    176  return ret
    177 
    178 
    179 def ForkAndCall(func, args):
    180  """Runs |func| in a fork'ed process.
    181 
    182  Returns:
    183    A Result object (call .get() to get the return value)
    184  """
    185  if DISABLE_ASYNC:
    186    pool = None
    187    result = _ImmediateResult(func(*args))
    188  else:
    189    pool = _MakeProcessPool([args])  # Omit |kwargs|.
    190    result = pool.apply_async(_FuncWrapper(func), (0, ))
    191    pool.close()
    192  return _WrappedResult(result, pool=pool)
    193 
    194 
    195 def BulkForkAndCall(func, arg_tuples, **kwargs):
    196  """Calls |func| in a fork'ed process for each set of args within |arg_tuples|.
    197 
    198  Args:
    199    kwargs: Common keyword arguments to be passed to |func|.
    200 
    201  Yields the return values in order.
    202  """
    203  arg_tuples = list(arg_tuples)
    204  if not arg_tuples:
    205    return
    206 
    207  if DISABLE_ASYNC:
    208    for args in arg_tuples:
    209      yield func(*args, **kwargs)
    210    return
    211 
    212  pool = _MakeProcessPool(arg_tuples, **kwargs)
    213  wrapped_func = _FuncWrapper(func)
    214  try:
    215    for result in pool.imap(wrapped_func, range(len(arg_tuples))):
    216      _CheckForException(result)
    217      yield result
    218  finally:
    219    pool.close()
    220    pool.join()
    221    _all_pools.remove(pool)