tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

chrome_test_server_spawner.py (17264B)


      1 # Copyright 2017 The Chromium Authors
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """A "Test Server Spawner" that handles killing/stopping per-test test servers.
      6 
      7 It's used to accept requests from the device to spawn and kill instances of the
      8 chrome test server on the host.
      9 """
     10 # pylint: disable=W0702
     11 
     12 import json
     13 import logging
     14 import os
     15 import select
     16 import struct
     17 import subprocess
     18 import sys
     19 import threading
     20 import time
     21 import urllib
     22 
     23 from http.server import BaseHTTPRequestHandler
     24 from http.server import HTTPServer
     25 
     26 SERVER_TYPES = {
     27    'http': '',
     28    'ftp': '-f',
     29    'ws': '--websocket',
     30 }
     31 
     32 
     33 _DIR_SOURCE_ROOT = os.path.abspath(
     34    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir,
     35                 os.pardir))
     36 
     37 
     38 _logger = logging.getLogger(__name__)
     39 
     40 
     41 # Path that are needed to import necessary modules when launching a testserver.
     42 os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (
     43    ':%s:%s' % (os.path.join(_DIR_SOURCE_ROOT, 'third_party'),
     44                os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver')))
     45 
     46 
     47 def _GetServerTypeCommandLine(server_type):
     48  """Returns the command-line by the given server type.
     49 
     50  Args:
     51    server_type: the server type to be used (e.g. 'http').
     52 
     53  Returns:
     54    A string containing the command-line argument.
     55  """
     56  if server_type not in SERVER_TYPES:
     57    raise NotImplementedError('Unknown server type: %s' % server_type)
     58  return SERVER_TYPES[server_type]
     59 
     60 
     61 class PortForwarder:
     62  def Map(self, port_pairs):
     63    pass
     64 
     65  def GetDevicePortForHostPort(self, host_port):
     66    """Returns the device port that corresponds to a given host port."""
     67    return host_port
     68 
     69  def WaitHostPortAvailable(self, port):
     70    """Returns True if |port| is available."""
     71    return True
     72 
     73  def WaitPortNotAvailable(self, port):
     74    """Returns True if |port| is not available."""
     75    return True
     76 
     77  def WaitDevicePortReady(self, port):
     78    """Returns whether the provided port is used."""
     79    return True
     80 
     81  def Unmap(self, device_port):
     82    """Unmaps specified port"""
     83    pass
     84 
     85 
     86 class TestServerThread(threading.Thread):
     87  """A thread to run the test server in a separate process."""
     88 
     89  def __init__(self, ready_event, arguments, port_forwarder):
     90    """Initialize TestServerThread with the following argument.
     91 
     92    Args:
     93      ready_event: event which will be set when the test server is ready.
     94      arguments: dictionary of arguments to run the test server.
     95      device: An instance of DeviceUtils.
     96      tool: instance of runtime error detection tool.
     97    """
     98    threading.Thread.__init__(self)
     99    self.wait_event = threading.Event()
    100    self.stop_event = threading.Event()
    101    self.ready_event = ready_event
    102    self.ready_event.clear()
    103    self.arguments = arguments
    104    self.port_forwarder = port_forwarder
    105    self.test_server_process = None
    106    self.is_ready = False
    107    self.host_port = 0
    108    self.host_ocsp_port = 0
    109    assert isinstance(self.host_port, int)
    110    # The forwarder device port now is dynamically allocated.
    111    self.forwarder_device_port = 0
    112    self.forwarder_ocsp_device_port = 0
    113    self.process = None
    114    self.command_line = []
    115 
    116  def _WaitToStartAndGetPortFromTestServer(self, pipe_in):
    117    """Waits for the Python test server to start and gets the port it is using.
    118 
    119    The port information is passed by the Python test server with a pipe given
    120    by |pipe_in|. It is written as a result to |self.host_port|.
    121 
    122    Returns:
    123      Whether the port used by the test server was successfully fetched.
    124    """
    125    (in_fds, _, _) = select.select([pipe_in], [], [])
    126    if len(in_fds) == 0:
    127      _logger.error('Failed to wait to the Python test server to be started.')
    128      return False
    129    # First read the data length as an unsigned 4-byte value.  This
    130    # is _not_ using network byte ordering since the Python test server packs
    131    # size as native byte order and all Chromium platforms so far are
    132    # configured to use little-endian.
    133    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    134    # use a unified byte order (either big-endian or little-endian).
    135    data_length = os.read(pipe_in, struct.calcsize('=L'))
    136    if data_length:
    137      (data_length,) = struct.unpack('=L', data_length)
    138      assert data_length
    139    if not data_length:
    140      _logger.error('Failed to get length of server data.')
    141      return False
    142    server_data_json = os.read(pipe_in, data_length)
    143    if not server_data_json:
    144      _logger.error('Failed to get server data.')
    145      return False
    146    _logger.info('Got port json data: %s', server_data_json)
    147 
    148    parsed_server_data = None
    149    try:
    150      parsed_server_data = json.loads(server_data_json)
    151    except ValueError:
    152      pass
    153 
    154    if not isinstance(parsed_server_data, dict):
    155      _logger.error('Failed to parse server_data: %s' % server_data_json)
    156      return False
    157 
    158    if not isinstance(parsed_server_data.get('port'), int):
    159      _logger.error('Failed to get port information from the server data.')
    160      return False
    161 
    162    self.host_port = parsed_server_data['port']
    163    self.host_ocsp_port = parsed_server_data.get('ocsp_port', 0)
    164 
    165    return self.port_forwarder.WaitPortNotAvailable(self.host_port)
    166 
    167  def _GenerateCommandLineArguments(self, pipe_out):
    168    """Generates the command line to run the test server.
    169 
    170    Note that all options are processed by following the definitions in
    171    testserver.py.
    172    """
    173    if self.command_line:
    174      return
    175 
    176    args_copy = dict(self.arguments)
    177 
    178    # Translate the server type.
    179    type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    180    if type_cmd:
    181      self.command_line.append(type_cmd)
    182 
    183    # Use a pipe to get the port given by the Python test server.
    184    self.command_line.append('--startup-pipe=%d' % pipe_out)
    185 
    186    # Pass the remaining arguments as-is.
    187    for key, values in args_copy.items():
    188      if not isinstance(values, list):
    189        values = [values]
    190      for value in values:
    191        if value is None:
    192          self.command_line.append('--%s' % key)
    193        else:
    194          self.command_line.append('--%s=%s' % (key, value))
    195 
    196  def _CloseUnnecessaryFDsForTestServerProcess(self, pipe_out):
    197    # This is required to avoid subtle deadlocks that could be caused by the
    198    # test server child process inheriting undesirable file descriptors such as
    199    # file lock file descriptors. Note stdin, stdout, and stderr (0-2) are left
    200    # alone and redirected with subprocess.Popen. It is important to leave those
    201    # fds filled, or the test server will accidentally open other fds at those
    202    # numbers.
    203    for fd in range(3, 1024):
    204      if fd != pipe_out:
    205        try:
    206          os.close(fd)
    207        except:
    208          pass
    209 
    210  def run(self):
    211    _logger.info('Start running the thread!')
    212    self.wait_event.clear()
    213 
    214    # Set up a pipe for the server to report when it has started.
    215    pipe_in, pipe_out = os.pipe()
    216 
    217    # TODO(crbug.com/40618161): Remove if condition after python3 migration.
    218    if hasattr(os, 'set_inheritable'):
    219      os.set_inheritable(pipe_out, True)
    220 
    221    try:
    222      self._GenerateCommandLineArguments(pipe_out)
    223      # TODO(crbug.com/40618161): When this script is ported to Python 3, replace
    224      # 'vpython3' below with sys.executable.
    225      command = [
    226          'vpython3',
    227          os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver',
    228                       'testserver.py')
    229      ] + self.command_line
    230      _logger.info('Running: %s', command)
    231 
    232      # Disable PYTHONUNBUFFERED because it has a bad interaction with the
    233      # testserver. Remove once this interaction is fixed.
    234      unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
    235 
    236      # Pass _DIR_SOURCE_ROOT as the child's working directory so that relative
    237      # paths in the arguments are resolved correctly. devnull can be replaced
    238      # with subprocess.DEVNULL in Python 3.
    239      with open(os.devnull, 'r+b') as devnull:
    240        self.process = subprocess.Popen(
    241            command,
    242            preexec_fn=lambda: self._CloseUnnecessaryFDsForTestServerProcess(
    243                pipe_out),
    244            stdin=devnull,
    245            # Preserve stdout and stderr from the test server.
    246            stdout=None,
    247            stderr=None,
    248            cwd=_DIR_SOURCE_ROOT,
    249            close_fds=False)
    250 
    251      # Close pipe_out early. If self.process crashes, this will be visible
    252      # in _WaitToStartAndGetPortFromTestServer's select loop.
    253      os.close(pipe_out)
    254      pipe_out = -1
    255      if unbuf:
    256        os.environ['PYTHONUNBUFFERED'] = unbuf
    257      self.is_ready = self._WaitToStartAndGetPortFromTestServer(pipe_in)
    258 
    259      if self.is_ready:
    260        port_map = [(0, self.host_port)]
    261        if self.host_ocsp_port:
    262          port_map.extend([(0, self.host_ocsp_port)])
    263        self.port_forwarder.Map(port_map)
    264 
    265        self.forwarder_device_port = \
    266            self.port_forwarder.GetDevicePortForHostPort(self.host_port)
    267        if self.host_ocsp_port:
    268          self.forwarder_ocsp_device_port = \
    269              self.port_forwarder.GetDevicePortForHostPort(self.host_ocsp_port)
    270 
    271        # Check whether the forwarder is ready on the device.
    272        self.is_ready = self.forwarder_device_port and \
    273            self.port_forwarder.WaitDevicePortReady(self.forwarder_device_port)
    274 
    275      # Wake up the request handler thread.
    276      self.ready_event.set()
    277      # Keep thread running until Stop() gets called.
    278      self.stop_event.wait()
    279      if self.process.poll() is None:
    280        self.process.kill()
    281        # Wait for process to actually terminate.
    282        # (crbug.com/946475)
    283        self.process.wait()
    284 
    285      self.port_forwarder.Unmap(self.forwarder_device_port)
    286      self.process = None
    287      self.is_ready = False
    288    finally:
    289      if pipe_in >= 0:
    290        os.close(pipe_in)
    291      if pipe_out >= 0:
    292        os.close(pipe_out)
    293    _logger.info('Test-server has died.')
    294    self.wait_event.set()
    295 
    296  def Stop(self):
    297    """Blocks until the loop has finished.
    298 
    299    Note that this must be called in another thread.
    300    """
    301    if not self.process:
    302      return
    303    self.stop_event.set()
    304    self.wait_event.wait()
    305 
    306 
    307 class SpawningServerRequestHandler(BaseHTTPRequestHandler):
    308  """A handler used to process http GET/POST request."""
    309 
    310  def _SendResponse(self, response_code, response_reason, additional_headers,
    311                    contents):
    312    """Generates a response sent to the client from the provided parameters.
    313 
    314    Args:
    315      response_code: number of the response status.
    316      response_reason: string of reason description of the response.
    317      additional_headers: dict of additional headers. Each key is the name of
    318                          the header, each value is the content of the header.
    319      contents: string of the contents we want to send to client.
    320    """
    321    self.send_response(response_code, response_reason)
    322    self.send_header('Content-Type', 'text/html')
    323    # Specify the content-length as without it the http(s) response will not
    324    # be completed properly (and the browser keeps expecting data).
    325    self.send_header('Content-Length', len(contents))
    326    for header_name in additional_headers:
    327      self.send_header(header_name, additional_headers[header_name])
    328    self.end_headers()
    329    self.wfile.write(contents.encode('utf8'))
    330    self.wfile.flush()
    331 
    332  def _StartTestServer(self):
    333    """Starts the test server thread."""
    334    _logger.info('Handling request to spawn a test server.')
    335    content_type = self.headers.get('content-type')
    336    if content_type != 'application/json':
    337      raise Exception('Bad content-type for start request.')
    338    content_length = self.headers.get('content-length')
    339    if not content_length:
    340      content_length = 0
    341    try:
    342      content_length = int(content_length)
    343    except:
    344      raise Exception('Bad content-length for start request.')
    345    _logger.info(content_length)
    346    test_server_argument_json = self.rfile.read(content_length)
    347    _logger.info(test_server_argument_json)
    348 
    349    if len(self.server.test_servers) >= self.server.max_instances:
    350      self._SendResponse(400, 'Invalid request', {},
    351                         'Too many test servers running')
    352      return
    353 
    354    ready_event = threading.Event()
    355    new_server = TestServerThread(ready_event,
    356                                  json.loads(test_server_argument_json),
    357                                  self.server.port_forwarder)
    358    new_server.setDaemon(True)
    359    new_server.start()
    360    ready_event.wait()
    361    if new_server.is_ready:
    362      response = {'port': new_server.forwarder_device_port,
    363                  'message': 'started'};
    364      if new_server.forwarder_ocsp_device_port:
    365        response['ocsp_port'] = new_server.forwarder_ocsp_device_port
    366      self._SendResponse(200, 'OK', {}, json.dumps(response))
    367      _logger.info('Test server is running on port %d forwarded to %d.' %
    368              (new_server.forwarder_device_port, new_server.host_port))
    369      port = new_server.forwarder_device_port
    370      assert port not in self.server.test_servers
    371      self.server.test_servers[port] = new_server
    372    else:
    373      new_server.Stop()
    374      self._SendResponse(500, 'Test Server Error.', {}, '')
    375      _logger.info('Encounter problem during starting a test server.')
    376 
    377  def _KillTestServer(self, params):
    378    """Stops the test server instance."""
    379    try:
    380      port = int(params['port'][0])
    381    except ValueError:
    382      port = None
    383    if port == None or port <= 0:
    384      self._SendResponse(400, 'Invalid request.', {}, 'port must be specified')
    385      return
    386 
    387    if port not in self.server.test_servers:
    388      self._SendResponse(400, 'Invalid request.', {},
    389                         "testserver isn't running on port %d" % port)
    390      return
    391 
    392    server = self.server.test_servers.pop(port)
    393 
    394    _logger.info('Handling request to kill a test server on port: %d.', port)
    395    server.Stop()
    396 
    397    # Make sure the status of test server is correct before sending response.
    398    if self.server.port_forwarder.WaitHostPortAvailable(port):
    399      self._SendResponse(200, 'OK', {}, 'killed')
    400      _logger.info('Test server on port %d is killed', port)
    401    else:
    402      # We expect the port to be free, but nothing stops the system from
    403      # binding something else to that port, so don't throw error.
    404      # (crbug.com/946475)
    405      self._SendResponse(200, 'OK', {}, '')
    406      _logger.warn('Port %s is not free after killing test server.' % port)
    407 
    408  def log_message(self, format, *args):
    409    # Suppress the default HTTP logging behavior if the logging level is higher
    410    # than INFO.
    411    if _logger.getEffectiveLevel() <= logging.INFO:
    412      pass
    413 
    414  def do_POST(self):
    415    parsed_path = urllib.parse.urlparse(self.path)
    416    action = parsed_path.path
    417    _logger.info('Action for POST method is: %s.', action)
    418    if action == '/start':
    419      self._StartTestServer()
    420    else:
    421      self._SendResponse(400, 'Unknown request.', {}, '')
    422      _logger.info('Encounter unknown request: %s.', action)
    423 
    424  def do_GET(self):
    425    parsed_path = urllib.parse.urlparse(self.path)
    426    action = parsed_path.path
    427    params = urllib.parse.parse_qs(parsed_path.query, keep_blank_values=1)
    428    _logger.info('Action for GET method is: %s.', action)
    429    for param in params:
    430      _logger.info('%s=%s', param, params[param][0])
    431    if action == '/kill':
    432      self._KillTestServer(params)
    433    elif action == '/ping':
    434      # The ping handler is used to check whether the spawner server is ready
    435      # to serve the requests. We don't need to test the status of the test
    436      # server when handling ping request.
    437      self._SendResponse(200, 'OK', {}, 'ready')
    438      _logger.info('Handled ping request and sent response.')
    439    else:
    440      self._SendResponse(400, 'Unknown request', {}, '')
    441      _logger.info('Encounter unknown request: %s.', action)
    442 
    443 
    444 class SpawningServer(object):
    445  """The class used to start/stop a http server."""
    446 
    447  def __init__(self, test_server_spawner_port, port_forwarder, max_instances):
    448    self.server = HTTPServer(('', test_server_spawner_port),
    449                             SpawningServerRequestHandler)
    450    self.server_port = self.server.server_port
    451    _logger.info('Started test server spawner on port: %d.', self.server_port)
    452 
    453    self.server.port_forwarder = port_forwarder
    454    self.server.test_servers = {}
    455    self.server.max_instances = max_instances
    456 
    457  def _Listen(self):
    458    _logger.info('Starting test server spawner.')
    459    self.server.serve_forever()
    460 
    461  def Start(self):
    462    """Starts the test server spawner."""
    463    listener_thread = threading.Thread(target=self._Listen)
    464    listener_thread.setDaemon(True)
    465    listener_thread.start()
    466 
    467  def Stop(self):
    468    """Stops the test server spawner.
    469 
    470    Also cleans the server state.
    471    """
    472    self.CleanupState()
    473    self.server.shutdown()
    474 
    475  def CleanupState(self):
    476    """Cleans up the spawning server state.
    477 
    478    This should be called if the test server spawner is reused,
    479    to avoid sharing the test server instance.
    480    """
    481    if self.server.test_servers:
    482      _logger.warning('Not all test servers were stopped.')
    483      for port in self.server.test_servers:
    484        _logger.warning('Stopping test server on port %d' % port)
    485        self.server.test_servers[port].Stop()
    486      self.server.test_servers = {}