tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

local_test_server_spawner.py (2839B)


      1 # Copyright 2014 The Chromium Authors
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 
      6 import json
      7 import time
      8 
      9 from devil.android import forwarder
     10 from devil.android import ports
     11 from pylib.base import test_server
     12 from pylib.constants import host_paths
     13 
     14 with host_paths.SysPath(host_paths.BUILD_UTIL_PATH):
     15  from lib.common import chrome_test_server_spawner
     16 
     17 # The tests should not need more than one test server instance.
     18 MAX_TEST_SERVER_INSTANCES = 1
     19 
     20 
     21 def _WaitUntil(predicate, max_attempts=5):
     22  """Blocks until the provided predicate (function) is true.
     23 
     24  Returns:
     25    Whether the provided predicate was satisfied once (before the timeout).
     26  """
     27  sleep_time_sec = 0.025
     28  for _ in range(1, max_attempts):
     29    if predicate():
     30      return True
     31    time.sleep(sleep_time_sec)
     32    sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
     33  return False
     34 
     35 
     36 class PortForwarderAndroid(chrome_test_server_spawner.PortForwarder):
     37 
     38  def __init__(self, device):
     39    self.device = device
     40 
     41  def Map(self, port_pairs):
     42    forwarder.Forwarder.Map(port_pairs, self.device)
     43 
     44  def GetDevicePortForHostPort(self, host_port):
     45    return forwarder.Forwarder.DevicePortForHostPort(host_port)
     46 
     47  def WaitHostPortAvailable(self, port):
     48    return _WaitUntil(lambda: ports.IsHostPortAvailable(port))
     49 
     50  def WaitPortNotAvailable(self, port):
     51    return _WaitUntil(lambda: not ports.IsHostPortAvailable(port))
     52 
     53  def WaitDevicePortReady(self, port):
     54    return _WaitUntil(lambda: ports.IsDevicePortUsed(self.device, port))
     55 
     56  def Unmap(self, device_port):
     57    forwarder.Forwarder.UnmapDevicePort(device_port, self.device)
     58 
     59 
     60 class LocalTestServerSpawner(test_server.TestServer):
     61 
     62  def __init__(self, port, device):
     63    super().__init__()
     64    self._device = device
     65    self._spawning_server = chrome_test_server_spawner.SpawningServer(
     66        port, PortForwarderAndroid(device), MAX_TEST_SERVER_INSTANCES)
     67 
     68  @property
     69  def server_address(self):
     70    return self._spawning_server.server.server_address
     71 
     72  @property
     73  def port(self):
     74    return self.server_address[1]
     75 
     76  #override
     77  def SetUp(self):
     78    # See net/test/spawned_test_server/remote_test_server.h for description of
     79    # the fields in the config file.
     80    test_server_config = json.dumps({
     81      'spawner_url_base': 'http://localhost:%d' % self.port
     82    })
     83    self._device.WriteFile(
     84        '%s/net-test-server-config' % self._device.GetExternalStoragePath(),
     85        test_server_config)
     86    forwarder.Forwarder.Map([(self.port, self.port)], self._device)
     87    self._spawning_server.Start()
     88 
     89  #override
     90  def Reset(self):
     91    self._spawning_server.CleanupState()
     92 
     93  #override
     94  def TearDown(self):
     95    self.Reset()
     96    self._spawning_server.Stop()
     97    forwarder.Forwarder.UnmapDevicePort(self.port, self._device)